diff --git a/lib/fog/aws.rb b/lib/fog/aws.rb index 2bbde77d3..8000c40cb 100644 --- a/lib/fog/aws.rb +++ b/lib/fog/aws.rb @@ -19,6 +19,7 @@ module Fog service(:sns, 'aws/sns') service(:sqs, 'aws/sqs') service(:storage, 'aws/storage') + service(:emr, 'aws/emr') def self.indexed_param(key, values) params = {} @@ -26,10 +27,34 @@ module Fog key << '.%d' end [*values].each_with_index do |value, index| - params[format(key, index + 1)] = value + if value.respond_to?('keys') + k = format(key, index + 1) + value.each do | vkey, vvalue | + params["#{k}.#{vkey}"] = vvalue + end + else + params[format(key, index + 1)] = value + end end params end + + def self.serialize_keys(key, value, options = {}) + case value + when Hash + value.each do | k, v | + options.merge!(serialize_keys("#{key}.#{k}", v)) + end + return options + when Array + value.each_with_index do | it, idx | + options.merge!(serialize_keys("#{key}.member.#{(idx + 1)}", it)) + end + return options + else + return {key => value} + end + end def self.indexed_filters(filters) params = {} diff --git a/lib/fog/aws/emr.rb b/lib/fog/aws/emr.rb new file mode 100644 index 000000000..dce6d8a42 --- /dev/null +++ b/lib/fog/aws/emr.rb @@ -0,0 +1,133 @@ +require File.expand_path(File.join(File.dirname(__FILE__), '..', 'aws')) + +module Fog + module AWS + class EMR < Fog::Service + + class IdentifierTaken < Fog::Errors::Error; end + + requires :aws_access_key_id, :aws_secret_access_key + recognizes :region, :host, :path, :port, :scheme, :persistent + + request_path 'fog/aws/requests/emr' + + request :add_instance_groups +# request :add_job_flow_steps +# request :describe_job_flows +# request :modify_instance_groups + request :run_job_flow +# request :set_termination_protection +# request :terminate_job_flows + + # model_path 'fog/aws/models/rds' + # model :server + # collection :servers + # model :snapshot + # collection :snapshots + # model :parameter_group + # collection :parameter_groups + # + # model :parameter + # collection :parameters + # + # model :security_group + # collection :security_groups + + class Mock + + def initialize(options={}) + Fog::Mock.not_implemented + end + + end + + class Real + + # Initialize connection to EMR + # + # ==== Notes + # options parameter must include values for :aws_access_key_id and + # :aws_secret_access_key in order to create a connection + # + # ==== Examples + # emr = EMR.new( + # :aws_access_key_id => your_aws_access_key_id, + # :aws_secret_access_key => your_aws_secret_access_key + # ) + # + # ==== Parameters + # * options<~Hash> - config arguments for connection. Defaults to {}. + # * region<~String> - optional region to use, in ['eu-west-1', 'us-east-1', 'us-west-1'i, 'ap-southeast-1'] + # + # ==== Returns + # * EMR object with connection to AWS. + def initialize(options={}) + @aws_access_key_id = options[:aws_access_key_id] + @aws_secret_access_key = options[:aws_secret_access_key] + @connection_options = options[:connection_options] || {} + @hmac = Fog::HMAC.new('sha256', @aws_secret_access_key) + + options[:region] ||= 'us-east-1' + @host = options[:host] || 'elasticmapreduce.amazonaws.com' + @path = options[:path] || '/' + @persistent = options[:persistent] || false + @port = options[:port] || 443 + @scheme = options[:scheme] || 'https' + @connection = Fog::Connection.new("#{@scheme}://#{@host}:#{@port}#{@path}", @persistent, @connection_options) + end + + def reload + @connection.reset + end + + private + + def request(params) + idempotent = params.delete(:idempotent) + parser = params.delete(:parser) + + body = Fog::AWS.signed_params( + params, + { + :aws_access_key_id => @aws_access_key_id, + :hmac => @hmac, + :host => @host, + :path => @path, + :port => @port, + :version => '2009-03-31' #'2010-07-28' + } + ) + + begin + response = @connection.request({ + :body => body, + :expects => 200, + :headers => { 'Content-Type' => 'application/x-www-form-urlencoded' }, + :idempotent => idempotent, + :host => @host, + :method => 'POST', + :parser => parser + }) + rescue Excon::Errors::HTTPStatusError => error + if match = error.message.match(/(.*)<\/Code>[\s\\\w]+(.*)<\/Message>/m) + # case match[1].split('.').last + # when 'DBInstanceNotFound', 'DBParameterGroupNotFound', 'DBSnapshotNotFound', 'DBSecurityGroupNotFound' + # raise Fog::AWS::RDS::NotFound.slurp(error, match[2]) + # when 'DBParameterGroupAlreadyExists' + # raise Fog::AWS::RDS::IdentifierTaken.slurp(error, match[2]) + # else + # raise + # end + raise + else + raise + end + end + + response + end + + end + end + end +end diff --git a/lib/fog/aws/parsers/emr/add_instance_groups.rb b/lib/fog/aws/parsers/emr/add_instance_groups.rb new file mode 100644 index 000000000..912faa3f7 --- /dev/null +++ b/lib/fog/aws/parsers/emr/add_instance_groups.rb @@ -0,0 +1,32 @@ +module Fog + module Parsers + module AWS + module EMR + + class AddInstanceGroups < Fog::Parsers::Base + + def start_element(name, attrs = []) + super + case name + when 'InstanceGroupIds' + @in_instance_group_ids = true + @instance_groups_ids = [] + end + end + + def end_element(name) + case name + when 'JobFlowId' + @response[name] = value + when 'InstanceGroupIds' + @in_instance_group_ids = false + @response['InstanceGroupIds'] = @instance_group_ids + when 'InstanceGroupId' + @instance_group_ids << value + end + end + end + end + end + end +end diff --git a/lib/fog/aws/parsers/emr/run_job_flow.rb b/lib/fog/aws/parsers/emr/run_job_flow.rb new file mode 100644 index 000000000..b0c94e062 --- /dev/null +++ b/lib/fog/aws/parsers/emr/run_job_flow.rb @@ -0,0 +1,22 @@ +module Fog + module Parsers + module AWS + module EMR + + class RunJobFlow < Fog::Parsers::Base + + def start_element(name, attrs = []) + super + end + + def end_element(name) + case name + when 'JobFlowId' + @response[name] = value + end + end + end + end + end + end +end diff --git a/lib/fog/aws/requests/emr/add_instance_groups.rb b/lib/fog/aws/requests/emr/add_instance_groups.rb new file mode 100644 index 000000000..e8e2a7877 --- /dev/null +++ b/lib/fog/aws/requests/emr/add_instance_groups.rb @@ -0,0 +1,47 @@ +module Fog + module AWS + class EMR + class Real + + require 'fog/aws/parsers/emr/add_instance_groups' + + # adds an instance group to a running cluster + # http://docs.amazonwebservices.com/ElasticMapReduce/latest/API/API_AddInstanceGroups.html + # ==== Parameters + # * JobFlowId <~String> - Job flow in which to add the instance groups + # * InstanceGroups<~Array> - Instance Groups to add + # * 'BidPrice'<~String> - Bid price for each Amazon EC2 instance in the instance group when launching nodes as Spot Instances, expressed in USD. + # * 'InstanceCount'<~Integer> - Target number of instances for the instance group + # * 'InstanceRole'<~String> - MASTER | CORE | TASK The role of the instance group in the cluster + # * 'InstanceType'<~String> - The Amazon EC2 instance type for all instances in the instance group + # * 'MarketType'<~String> - ON_DEMAND | SPOT Market type of the Amazon EC2 instances used to create a cluster node + # * 'Name'<~String> - Friendly name given to the instance group. + # + # ==== Returns + # * response<~Excon::Response>: + # * body<~Hash>: + def add_instance_groups(job_flow_id, options={}) + + if instance_groups = options.delete('InstanceGroups') + options.merge!(Fog::AWS.indexed_param('InstanceGroups.member.%d', [*instance_groups])) + end + + request({ + 'Action' => 'CreateDBInstance', + 'JobFlowId' => db_name, + :parser => Fog::Parsers::AWS::EMR::AddInstanceGroups.new, + }.merge(options)) + end + + end + + class Mock + + def add_instance_groups(db_name, options={}) + Fog::Mock.not_implemented + end + + end + end + end +end diff --git a/lib/fog/aws/requests/emr/run_job_flow.rb b/lib/fog/aws/requests/emr/run_job_flow.rb new file mode 100644 index 000000000..c64c055f9 --- /dev/null +++ b/lib/fog/aws/requests/emr/run_job_flow.rb @@ -0,0 +1,106 @@ +module Fog + module AWS + class EMR + class Real + + require 'fog/aws/parsers/emr/run_job_flow' + + # creates and starts running a new job flow + # http://docs.amazonwebservices.com/ElasticMapReduce/latest/API/API_RunJobFlow.html + # ==== Parameters + # * AdditionalInfo <~String> - A JSON string for selecting additional features. + # * BootstrapActions <~Array> - A list of bootstrap actions that will be run before Hadoop is started on the cluster nodes. + # * 'Name'<~String> - The name of the bootstrap action + # * 'ScriptBootstrapAction'<~Array> - The script run by the bootstrap action + # * 'Args' <~Array> - A list of command line arguments to pass to the bootstrap action script + # * 'Path' <~String> - Location of the script to run during a bootstrap action. Can be either a location in Amazon S3 or on a local file system. + # * Instances <~Array> - A specification of the number and type of Amazon EC2 instances on which to run the job flow. + # * 'Ec2KeyName'<~String> - Specifies the name of the Amazon EC2 key pair that can be used to ssh to the master node as the user called "hadoop. + # * 'HadoopVersion'<~String> - "0.18" | "0.20" Specifies the Hadoop version for the job flow + # * 'InstanceCount'<~Integer> - The number of Amazon EC2 instances used to execute the job flow + # * 'InstanceGroups'<~Array> - Configuration for the job flow's instance groups + # * 'BidPrice' <~String> - Bid price for each Amazon EC2 instance in the instance group when launching nodes as Spot Instances, expressed in USD. + # * 'InstanceCount'<~Integer> - Target number of instances for the instance group + # * 'InstanceRole'<~String> - MASTER | CORE | TASK The role of the instance group in the cluster + # * 'InstanceType'<~String> - The Amazon EC2 instance type for all instances in the instance group + # * 'MarketType'<~String> - ON_DEMAND | SPOT Market type of the Amazon EC2 instances used to create a cluster node + # * 'Name'<~String> - Friendly name given to the instance group. + # * 'KeepJobFlowAliveWhenNoSteps' <~Boolean> - Specifies whether the job flow should terminate after completing all steps + # * 'MasterInstanceType'<~String> - The EC2 instance type of the master node + # * 'Placement'<~Array> - Specifies the Availability Zone the job flow will run in + # * 'AvailabilityZone' <~String> - The Amazon EC2 Availability Zone for the job flow. + # * 'SlaveInstanceType'<~String> - The EC2 instance type of the slave nodes + # * 'TerminationProtected'<~Boolean> - Specifies whether to lock the job flow to prevent the Amazon EC2 instances from being terminated by API call, user intervention, or in the event of a job flow error + # * LogUri <~String> - Specifies the location in Amazon S3 to write the log files of the job flow. If a value is not provided, logs are not created + # * Name <~String> - The name of the job flow + # * Steps <~Array> - A list of steps to be executed by the job flow + # * 'ActionOnFailure'<~String> - TERMINATE_JOB_FLOW | CANCEL_AND_WAIT | CONTINUE Specifies the action to take if the job flow step fails + # * 'HadoopJarStep'<~Array> - Specifies the JAR file used for the job flow step + # * 'Args'<~String list> - A list of command line arguments passed to the JAR file's main function when executed. + # * 'Jar'<~String> - A path to a JAR file run during the step. + # * 'MainClass'<~String> - The name of the main class in the specified Java file. If not specified, the JAR file should specify a Main-Class in its manifest file + # * 'Properties'<~Array> - A list of Java properties that are set when the step runs. You can use these properties to pass key value pairs to your main function + # * 'Key'<~String> - The unique identifier of a key value pair + # * 'Value'<~String> - The value part of the identified key + # * 'Name'<~String> - The name of the job flow step + # + # ==== Returns + # * response<~Excon::Response>: + # * body<~Hash>: + def run_job_flow(name, options={}) + + if bootstrap_actions = options.delete('BootstrapActions') + options.merge!(Fog::AWS.serialize_keys('BootstrapActions', bootstrap_actions)) + end + + if instances = options.delete('Instances') + options.merge!(Fog::AWS.serialize_keys('Instances', instances)) + end + + if steps = options.delete('Steps') + options.merge!(Fog::AWS.serialize_keys('Steps', steps)) + end + + request({ + 'Action' => 'RunJobFlow', + 'Name' => name, + :parser => Fog::Parsers::AWS::EMR::RunJobFlow.new, + }.merge(options)) + end + + def run_hive(name, options={}) + steps = [] + steps << { + 'Name' => 'Setup Hive', + 'HadoopJarStep' => { + 'Jar' => 's3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar', + 'Args' => ['s3://us-east-1.elasticmapreduce/libs/hive/hive-script', '--base-path', 's3://us-east-1.elasticmapreduce/libs/hive/', '--install-hive']}, + 'ActionOnFailure' => 'TERMINATE_JOB_FLOW' + } + steps << { + 'Name' => 'Install Hive Site Configuration', + 'HadoopJarStep' => { + 'Jar' => 's3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar', + 'Args' => ['s3://us-east-1.elasticmapreduce/libs/hive/hive-script', '--base-path', 's3://us-east-1.elasticmapreduce/libs/hive/', '--install-hive-site', '--hive-site=s3://raybeam.okl/prod/hive/hive-site.xml']}, + 'ActionOnFailure' => 'TERMINATE_JOB_FLOW' + } + options['Steps'] = steps + + if not options['Instances'].nil? + options['Instances']['KeepJobFlowAliveWhenNoSteps'] = true + end + + run_job_flow name, options + end + end + + class Mock + + def run_job_flow(db_name, options={}) + Fog::Mock.not_implemented + end + + end + end + end +end