From 0002ea89efb59d36cfda2c4e38d0eed680932ff7 Mon Sep 17 00:00:00 2001 From: Bob Briski Date: Fri, 16 Sep 2011 16:59:28 -0700 Subject: [PATCH] Added EMR functions for AWS * describe_job_flows does not return Properties for the HadoopJarStepConfig * All other methods work as advertised --- lib/fog/aws/emr.rb | 10 +- lib/fog/aws/parsers/emr/add_job_flow_steps.rb | 17 +++ lib/fog/aws/parsers/emr/describe_job_flows.rb | 134 ++++++++++++++++++ .../aws/parsers/emr/modify_instance_groups.rb | 17 +++ lib/fog/aws/parsers/emr/run_job_flow.rb | 7 +- .../parsers/emr/set_termination_protection.rb | 17 +++ .../aws/parsers/emr/terminate_job_flows.rb | 17 +++ .../aws/requests/emr/add_job_flow_steps.rb | 49 +++++++ .../aws/requests/emr/describe_job_flows.rb | 108 ++++++++++++++ .../requests/emr/modify_instance_groups.rb | 40 ++++++ .../emr/set_termination_protection.rb | 39 +++++ .../aws/requests/emr/terminate_job_flows.rb | 37 +++++ lib/fog/core/connection.rb | 2 +- 13 files changed, 483 insertions(+), 11 deletions(-) create mode 100644 lib/fog/aws/parsers/emr/add_job_flow_steps.rb create mode 100644 lib/fog/aws/parsers/emr/describe_job_flows.rb create mode 100644 lib/fog/aws/parsers/emr/modify_instance_groups.rb create mode 100644 lib/fog/aws/parsers/emr/set_termination_protection.rb create mode 100644 lib/fog/aws/parsers/emr/terminate_job_flows.rb create mode 100644 lib/fog/aws/requests/emr/add_job_flow_steps.rb create mode 100644 lib/fog/aws/requests/emr/describe_job_flows.rb create mode 100644 lib/fog/aws/requests/emr/modify_instance_groups.rb create mode 100644 lib/fog/aws/requests/emr/set_termination_protection.rb create mode 100644 lib/fog/aws/requests/emr/terminate_job_flows.rb diff --git a/lib/fog/aws/emr.rb b/lib/fog/aws/emr.rb index dce6d8a42..b39ff8eda 100644 --- a/lib/fog/aws/emr.rb +++ b/lib/fog/aws/emr.rb @@ -12,12 +12,12 @@ module Fog request_path 'fog/aws/requests/emr' request :add_instance_groups -# request :add_job_flow_steps -# request :describe_job_flows -# request :modify_instance_groups + request :add_job_flow_steps + request :describe_job_flows + request :modify_instance_groups request :run_job_flow -# request :set_termination_protection -# request :terminate_job_flows + request :set_termination_protection + request :terminate_job_flows # model_path 'fog/aws/models/rds' # model :server diff --git a/lib/fog/aws/parsers/emr/add_job_flow_steps.rb b/lib/fog/aws/parsers/emr/add_job_flow_steps.rb new file mode 100644 index 000000000..2a93b70f4 --- /dev/null +++ b/lib/fog/aws/parsers/emr/add_job_flow_steps.rb @@ -0,0 +1,17 @@ +module Fog + module Parsers + module AWS + module EMR + + class AddJobFlowSteps < Fog::Parsers::Base + def end_element(name) + case name + when 'RequestId' + @response[name] = value + end + end + end + end + end + end +end diff --git a/lib/fog/aws/parsers/emr/describe_job_flows.rb b/lib/fog/aws/parsers/emr/describe_job_flows.rb new file mode 100644 index 000000000..1839ca085 --- /dev/null +++ b/lib/fog/aws/parsers/emr/describe_job_flows.rb @@ -0,0 +1,134 @@ +module Fog + module Parsers + module AWS + module EMR + + class DescribeJobFlows < Fog::Parsers::Base + + def reset + @context = [] + @contexts = ['BootstrapActions', 'ExecutionStatusDetail', 'Instances', 'Steps', 'InstanceGroups', 'Args'] + + @response = { 'JobFlows' => [] } + @bootstrap_actions = {'ScriptBootstrapActionConfig' => {'Args' => []}} + @instance = { 'InstanceGroups' => [], 'Placement' => {}} + @step = { + 'ExecutionStatusDetail' => {}, + 'StepConfig' => { + 'HadoopJarStepConfig' => { + 'Args' => [], + 'Properties' => [] + } + } + } + @flow = {'Instances' => [], 'ExecutionStatusDetail' => {}, 'BootstrapActions' => [], 'Steps' => []} + @instance_group_detail = {} + @execution_status_detail = {} + end + + def start_element(name, attrs = []) + super + if @contexts.include?(name) + @context.push(name) + end + end + + def end_element(name) + if @context.last == 'BootstrapActions' + case name + when 'Name' + @bootstrap_actions[name] = value + when 'Path' + @bootstrap_actions['ScriptBootstrapActionConfig'][name] = value + when 'BootstrapActions' + @flow['BootstrapActions'] = @bootstrap_actions + @bootstrap_actions = {'ScriptBootstrapActionConfig' => {'Args' => []}} + end + end + + if @context.last == 'ExecutionStatusDetail' + case name + when 'CreationDateTime', 'EndDateTime', 'LastStateChangeReason', + 'ReadyDateTime', 'StartDateTime', 'State' + @execution_status_detail[name] = value + when 'ExecutionStatusDetail' + if @context.include?('Steps') + @step['ExecutionStatusDetail'] = @execution_status_detail + else + @flow['ExecutionStatusDetail'] = @execution_status_detail + end + @execution_status_detail = {} + end + end + + if @context.last == 'Instances' + case name + when 'AvailabilityZone' + @instance['Placement'][name] = value + when 'Ec2KeyName', 'HadoopVersion', 'InstanceCount', 'KeepJobFlowAliveWhenNoSteps', + 'MasterInstanceId', 'MasterInstanceType', 'MasterPublicDnsName', 'NormalizedInstanceHours', + 'SlaveInstanceType', 'TerminationProtected' + @instance[name] = value + when 'BidPrice', 'CreationDateTime', 'EndDateTime', 'InstanceGroupId', + 'InstanceRequestCount', 'InstanceRole', 'InstanceRunningCount', 'InstanceType', + 'LastStateChangeReason', 'Market', 'Name', 'ReadyDateTime', 'StartDateTime', 'State' + @instance_group_detail[name] = value + when 'member' + @instance['InstanceGroups'] << @instance_group_detail + @instance_group_detail = {} + when 'Instances' + @flow['Instances'] = @instance + @instance = { 'InstanceGroups' => [], 'Placement' => {}} + end + end + + if @context.last == 'Args' + if name == 'member' + if @context.include?('Steps') + @step['StepConfig']['HadoopJarStepConfig']['Args'] << value.strip + else + @bootstrap_actions['ScriptBootstrapActionConfig']['Args'] << value + end + end + end + + if @context.last == 'Steps' + case name + when 'ActionOnFailure', 'Name' + @step[name] = value + when 'Jar', 'MainClass' + @step['StepConfig']['HadoopJarStepConfig'][name] = value + when 'member' + @flow['Steps'] << @step + @step = { + 'ExecutionStatusDetail' => {}, + 'StepConfig' => { + 'HadoopJarStepConfig' => { + 'Args' => [], + 'Properties' => [] + } + } + } + end + end + + if @context.empty? + case name + when 'AmiVersion', 'JobFlowId', 'LogUri', 'Name' + @flow[name] = value + when 'member' + @response['JobFlows'] = @flow + @flow = {'Instances' => [], 'ExecutionStatusDetail' => {}, 'BootstrapActions' => [], 'Steps' => []} + end + end + + if @context.last == name + @context.pop + end + end + end + + end + end + end +end diff --git a/lib/fog/aws/parsers/emr/modify_instance_groups.rb b/lib/fog/aws/parsers/emr/modify_instance_groups.rb new file mode 100644 index 000000000..f4daf4376 --- /dev/null +++ b/lib/fog/aws/parsers/emr/modify_instance_groups.rb @@ -0,0 +1,17 @@ +module Fog + module Parsers + module AWS + module EMR + + class ModifyInstanceGroups < Fog::Parsers::Base + def end_element(name) + case name + when 'RequestId' + @response[name] = value + end + end + end + end + end + end +end diff --git a/lib/fog/aws/parsers/emr/run_job_flow.rb b/lib/fog/aws/parsers/emr/run_job_flow.rb index b0c94e062..4ef3a3790 100644 --- a/lib/fog/aws/parsers/emr/run_job_flow.rb +++ b/lib/fog/aws/parsers/emr/run_job_flow.rb @@ -4,15 +4,12 @@ module Fog module EMR class RunJobFlow < Fog::Parsers::Base - - def start_element(name, attrs = []) - super - end - def end_element(name) case name when 'JobFlowId' @response[name] = value + when 'RequestId' + @response[name] = value end end end diff --git a/lib/fog/aws/parsers/emr/set_termination_protection.rb b/lib/fog/aws/parsers/emr/set_termination_protection.rb new file mode 100644 index 000000000..fc303bca8 --- /dev/null +++ b/lib/fog/aws/parsers/emr/set_termination_protection.rb @@ -0,0 +1,17 @@ +module Fog + module Parsers + module AWS + module EMR + + class SetTerminationProtection < Fog::Parsers::Base + def end_element(name) + case name + when 'RequestId' + @response[name] = value + end + end + end + end + end + end +end diff --git a/lib/fog/aws/parsers/emr/terminate_job_flows.rb b/lib/fog/aws/parsers/emr/terminate_job_flows.rb new file mode 100644 index 000000000..e555ff891 --- /dev/null +++ b/lib/fog/aws/parsers/emr/terminate_job_flows.rb @@ -0,0 +1,17 @@ +module Fog + module Parsers + module AWS + module EMR + + class TerminateJobFlows < Fog::Parsers::Base + def end_element(name) + case name + when 'RequestId' + @response[name] = value + end + end + end + end + end + end +end diff --git a/lib/fog/aws/requests/emr/add_job_flow_steps.rb b/lib/fog/aws/requests/emr/add_job_flow_steps.rb new file mode 100644 index 000000000..00d70c56b --- /dev/null +++ b/lib/fog/aws/requests/emr/add_job_flow_steps.rb @@ -0,0 +1,49 @@ +module Fog + module AWS + class EMR + class Real + + require 'fog/aws/parsers/emr/add_job_flow_steps' + + # adds new steps to a running job flow. + # http://docs.amazonwebservices.com/ElasticMapReduce/latest/API/API_AddJobFlowSteps.html + # ==== Parameters + # * JobFlowId <~String> - A string that uniquely identifies the job flow + # * Steps <~Array> - A list of steps to be executed by the job flow + # * 'ActionOnFailure'<~String> - TERMINATE_JOB_FLOW | CANCEL_AND_WAIT | CONTINUE Specifies the action to take if the job flow step fails + # * 'HadoopJarStep'<~Array> - Specifies the JAR file used for the job flow step + # * 'Args'<~String list> - A list of command line arguments passed to the JAR file's main function when executed. + # * 'Jar'<~String> - A path to a JAR file run during the step. + # * 'MainClass'<~String> - The name of the main class in the specified Java file. If not specified, the JAR file should specify a Main-Class in its manifest file + # * 'Properties'<~Array> - A list of Java properties that are set when the step runs. You can use these properties to pass key value pairs to your main function + # * 'Key'<~String> - The unique identifier of a key value pair + # * 'Value'<~String> - The value part of the identified key + # * 'Name'<~String> - The name of the job flow step + # + # ==== Returns + # * response<~Excon::Response>: + # * body<~Hash>: + def add_job_flow_steps(job_flow_id, options={}) + + if steps = options.delete('Steps') + options.merge!(Fog::AWS.serialize_keys('Steps', steps)) + end + + request({ + 'Action' => 'AddJobFlowSteps', + 'JobFlowId' => job_flow_id, + :parser => Fog::Parsers::AWS::EMR::AddJobFlowSteps.new, + }.merge(options)) + end + end + + class Mock + + def add_job_flow_steps(db_name, options={}) + Fog::Mock.not_implemented + end + + end + end + end +end diff --git a/lib/fog/aws/requests/emr/describe_job_flows.rb b/lib/fog/aws/requests/emr/describe_job_flows.rb new file mode 100644 index 000000000..e57f1756b --- /dev/null +++ b/lib/fog/aws/requests/emr/describe_job_flows.rb @@ -0,0 +1,108 @@ +module Fog + module AWS + class EMR + class Real + + require 'fog/aws/parsers/emr/describe_job_flows' + + # returns a list of job flows that match all of the supplied parameters. + # http://docs.amazonwebservices.com/ElasticMapReduce/latest/API/API_DescribeJobFlows.html + # ==== Parameters + # * CreatedAfter <~DateTime> - Return only job flows created after this date and time + # * CreatedBefore <~DateTime> - Return only job flows created before this date and time + # * JobFlowIds <~String list> - Return only job flows whose job flow ID is contained in this list + # * JobFlowStates <~String list> - RUNNING | WAITING | SHUTTING_DOWN | STARTING Return only job flows whose state is contained in this list + # + # ==== Returns + # * response<~Excon::Response>: + # * body<~Hash>: + # * JobFlows <~Array> - A list of job flows matching the parameters supplied. + # * AmiVersion <~String> - A list of bootstrap actions that will be run before Hadoop is started on the cluster nodes. + # * 'BootstrapActions'<~Array> - A list of the bootstrap actions run by the job flow + # * 'BootstrapConfig <~Array> - A description of the bootstrap action + # * 'Name' <~String> - The name of the bootstrap action + # * 'ScriptBootstrapAction' <~Array> - The script run by the bootstrap action. + # * 'Args' <~String list> - A list of command line arguments to pass to the bootstrap action script. + # * 'Path' <~String> - Location of the script to run during a bootstrap action. + # * 'ExecutionStatusDetail'<~Array> - Describes the execution status of the job flow + # * 'CreationDateTime <~DateTime> - The creation date and time of the job flow. + # * 'EndDateTime <~DateTime> - The completion date and time of the job flow. + # * 'LastStateChangeReason <~String> - Description of the job flow last changed state. + # * 'ReadyDateTime <~DateTime> - The date and time when the job flow was ready to start running bootstrap actions. + # * 'StartDateTime <~DateTime> - The start date and time of the job flow. + # * 'State <~DateTime> - COMPLETED | FAILED | TERMINATED | RUNNING | SHUTTING_DOWN | STARTING | WAITING | BOOTSTRAPPING The state of the job flow. + # * Instances <~Array> - A specification of the number and type of Amazon EC2 instances on which to run the job flow. + # * 'Ec2KeyName'<~String> - Specifies the name of the Amazon EC2 key pair that can be used to ssh to the master node as the user called "hadoop. + # * 'HadoopVersion'<~String> - "0.18" | "0.20" Specifies the Hadoop version for the job flow + # * 'InstanceCount'<~Integer> - The number of Amazon EC2 instances used to execute the job flow + # * 'InstanceGroups'<~Array> - Configuration for the job flow's instance groups + # * 'BidPrice' <~String> - Bid price for each Amazon EC2 instance in the instance group when launching nodes as Spot Instances, expressed in USD. + # * 'CreationDateTime' <~DateTime> - The date/time the instance group was created. + # * 'EndDateTime' <~DateTime> - The date/time the instance group was terminated. + # * 'InstanceGroupId' <~String> - Unique identifier for the instance group. + # * 'InstanceRequestCount'<~Integer> - Target number of instances for the instance group + # * 'InstanceRole'<~String> - MASTER | CORE | TASK The role of the instance group in the cluster + # * 'InstanceRunningCount'<~Integer> - Actual count of running instances + # * 'InstanceType'<~String> - The Amazon EC2 instance type for all instances in the instance group + # * 'LastStateChangeReason'<~String> - Details regarding the state of the instance group + # * 'Market'<~String> - ON_DEMAND | SPOT Market type of the Amazon EC2 instances used to create a cluster + # * 'Name'<~String> - Friendly name for the instance group + # * 'ReadyDateTime'<~DateTime> - The date/time the instance group was available to the cluster + # * 'StartDateTime'<~DateTime> - The date/time the instance group was started + # * 'State'<~String> - PROVISIONING | STARTING | BOOTSTRAPPING | RUNNING | RESIZING | ARRESTED | SHUTTING_DOWN | TERMINATED | FAILED | ENDED State of instance group + # * 'KeepJobFlowAliveWhenNoSteps' <~Boolean> - Specifies whether the job flow should terminate after completing all steps + # * 'MasterInstanceId'<~String> - The Amazon EC2 instance identifier of the master node + # * 'MasterInstanceType'<~String> - The EC2 instance type of the master node + # * 'MasterPublicDnsName'<~String> - The DNS name of the master node + # * 'NormalizedInstanceHours'<~Integer> - An approximation of the cost of the job flow, represented in m1.small/hours. + # * 'Placement'<~Array> - Specifies the Availability Zone the job flow will run in + # * 'AvailabilityZone' <~String> - The Amazon EC2 Availability Zone for the job flow. + # * 'SlaveInstanceType'<~String> - The EC2 instance type of the slave nodes + # * 'TerminationProtected'<~Boolean> - Specifies whether to lock the job flow to prevent the Amazon EC2 instances from being terminated by API call, user intervention, or in the event of a job flow error + # * LogUri <~String> - Specifies the location in Amazon S3 to write the log files of the job flow. If a value is not provided, logs are not created + # * Name <~String> - The name of the job flow + # * Steps <~Array> - A list of steps to be executed by the job flow + # * 'ExecutionStatusDetail'<~Array> - Describes the execution status of the job flow + # * 'CreationDateTime <~DateTime> - The creation date and time of the job flow. + # * 'EndDateTime <~DateTime> - The completion date and time of the job flow. + # * 'LastStateChangeReason <~String> - Description of the job flow last changed state. + # * 'ReadyDateTime <~DateTime> - The date and time when the job flow was ready to start running bootstrap actions. + # * 'StartDateTime <~DateTime> - The start date and time of the job flow. + # * 'State <~DateTime> - COMPLETED | FAILED | TERMINATED | RUNNING | SHUTTING_DOWN | STARTING | WAITING | BOOTSTRAPPING The state of the job flow. + # * StepConfig <~Array> - The step configuration + # * 'ActionOnFailure'<~String> - TERMINATE_JOB_FLOW | CANCEL_AND_WAIT | CONTINUE Specifies the action to take if the job flow step fails + # * 'HadoopJarStep'<~Array> - Specifies the JAR file used for the job flow step + # * 'Args'<~String list> - A list of command line arguments passed to the JAR file's main function when executed. + # * 'Jar'<~String> - A path to a JAR file run during the step. + # * 'MainClass'<~String> - The name of the main class in the specified Java file. If not specified, the JAR file should specify a Main-Class in its manifest file + # * 'Properties'<~Array> - A list of Java properties that are set when the step runs. You can use these properties to pass key value pairs to your main function + # * 'Key'<~String> - The unique identifier of a key value pair + # * 'Value'<~String> - The value part of the identified key + # * 'Name'<~String> - The name of the job flow step + def describe_job_flows(options={}) + + if job_ids = options.delete('JobFlowIds') + options.merge!(Fog::AWS.serialize_keys('JobFlowIds', job_ids)) + end + + if job_states = options.delete('JobFlowStates') + options.merge!(Fog::AWS.serialize_keys('JobFlowStates', job_states)) + end + + request({ + 'Action' => 'DescribeJobFlows', + :parser => Fog::Parsers::AWS::EMR::DescribeJobFlows.new, + }.merge(options)) + end + end + + class Mock + + def describe_job_flows(db_name, options={}) + Fog::Mock.not_implemented + end + + end + end + end +end diff --git a/lib/fog/aws/requests/emr/modify_instance_groups.rb b/lib/fog/aws/requests/emr/modify_instance_groups.rb new file mode 100644 index 000000000..147a432b6 --- /dev/null +++ b/lib/fog/aws/requests/emr/modify_instance_groups.rb @@ -0,0 +1,40 @@ +module Fog + module AWS + class EMR + class Real + + require 'fog/aws/parsers/emr/modify_instance_groups' + + # modifies the number of nodes and configuration settings of an instance group.. + # http://docs.amazonwebservices.com/ElasticMapReduce/latest/API/API_ModifyInstanceGroups.html + # ==== Parameters + # * InstanceGroups <~InstanceGroupModifyConfig list> - Instance groups to change + # * InstanceCount <~Integer> - Target size for instance group + # * InstanceGroupId <~String> - Unique ID of the instance group to expand or shrink + # + # ==== Returns + # * response<~Excon::Response>: + # * body<~Hash> + def modify_instance_groups(options={}) + + if job_ids = options.delete('InstanceGroups') + options.merge!(Fog::AWS.serialize_keys('InstanceGroups', job_ids)) + end + + request({ + 'Action' => 'ModifyInstanceGroups', + :parser => Fog::Parsers::AWS::EMR::ModifyInstanceGroups.new, + }.merge(options)) + end + end + + class Mock + + def modify_instance_groups(options={}) + Fog::Mock.not_implemented + end + + end + end + end +end diff --git a/lib/fog/aws/requests/emr/set_termination_protection.rb b/lib/fog/aws/requests/emr/set_termination_protection.rb new file mode 100644 index 000000000..b25eedff0 --- /dev/null +++ b/lib/fog/aws/requests/emr/set_termination_protection.rb @@ -0,0 +1,39 @@ +module Fog + module AWS + class EMR + class Real + + require 'fog/aws/parsers/emr/set_termination_protection' + + # locks a job flow so the Amazon EC2 instances in the cluster cannot be terminated by user intervention. + # http://docs.amazonwebservices.com/ElasticMapReduce/latest/API/API_SetTerminationProtection.html + # ==== Parameters + # * JobFlowIds <~String list> - list of strings that uniquely identify the job flows to protect + # * TerminationProtected <~Boolean> - indicates whether to protect the job flow + # + # ==== Returns + # * response<~Excon::Response>: + # * body<~Hash> + def set_termination_protection(is_protected, options={}) + + if job_ids = options.delete('JobFlowIds') + options.merge!(Fog::AWS.serialize_keys('JobFlowIds', job_ids)) + end + request({ + 'Action' => 'SetTerminationProtection', + 'TerminationProtected' => is_protected, + :parser => Fog::Parsers::AWS::EMR::SetTerminationProtection.new, + }.merge(options)) + end + end + + class Mock + + def set_termination_protection(db_name, options={}) + Fog::Mock.not_implemented + end + + end + end + end +end diff --git a/lib/fog/aws/requests/emr/terminate_job_flows.rb b/lib/fog/aws/requests/emr/terminate_job_flows.rb new file mode 100644 index 000000000..66ec4b881 --- /dev/null +++ b/lib/fog/aws/requests/emr/terminate_job_flows.rb @@ -0,0 +1,37 @@ +module Fog + module AWS + class EMR + class Real + + require 'fog/aws/parsers/emr/terminate_job_flows' + + # shuts a list of job flows down. + # http://docs.amazonwebservices.com/ElasticMapReduce/latest/API/API_TerminateJobFlows.html + # ==== Parameters + # * JobFlowIds <~String list> - list of strings that uniquely identify the job flows to protect + # + # ==== Returns + # * response<~Excon::Response>: + # * body<~Hash> + def terminate_job_flows(options={}) + + if job_ids = options.delete('JobFlowIds') + options.merge!(Fog::AWS.serialize_keys('JobFlowIds', job_ids)) + end + request({ + 'Action' => 'TerminateJobFlows', + :parser => Fog::Parsers::AWS::EMR::TerminateJobFlows.new, + }.merge(options)) + end + end + + class Mock + + def terminate_job_flows(db_name, options={}) + Fog::Mock.not_implemented + end + + end + end + end +end diff --git a/lib/fog/core/connection.rb b/lib/fog/core/connection.rb index 95f51c785..0b515cecc 100644 --- a/lib/fog/core/connection.rb +++ b/lib/fog/core/connection.rb @@ -18,7 +18,7 @@ module Fog end response = @excon.request(params, &block) - + if parser body.finish response.body = parser.response