diff --git a/lib/fog/aws.rb b/lib/fog/aws.rb index 8000c40cb..43c3ec386 100644 --- a/lib/fog/aws.rb +++ b/lib/fog/aws.rb @@ -12,6 +12,7 @@ module Fog service(:cloud_watch, 'aws/cloud_watch') service(:dns, 'aws/dns') service(:elb, 'aws/elb') + service(:emr, 'aws/emr') service(:iam, 'aws/iam') service(:rds, 'aws/rds') service(:ses, 'aws/ses') @@ -19,7 +20,6 @@ module Fog service(:sns, 'aws/sns') service(:sqs, 'aws/sqs') service(:storage, 'aws/storage') - service(:emr, 'aws/emr') def self.indexed_param(key, values) params = {} diff --git a/lib/fog/aws/parsers/emr/add_instance_groups.rb b/lib/fog/aws/parsers/emr/add_instance_groups.rb index 912faa3f7..c6437797d 100644 --- a/lib/fog/aws/parsers/emr/add_instance_groups.rb +++ b/lib/fog/aws/parsers/emr/add_instance_groups.rb @@ -9,8 +9,7 @@ module Fog super case name when 'InstanceGroupIds' - @in_instance_group_ids = true - @instance_groups_ids = [] + @response['InstanceGroupIds'] = [] end end @@ -18,11 +17,8 @@ module Fog case name when 'JobFlowId' @response[name] = value - when 'InstanceGroupIds' - @in_instance_group_ids = false - @response['InstanceGroupIds'] = @instance_group_ids - when 'InstanceGroupId' - @instance_group_ids << value + when 'member' + @response['InstanceGroupIds'] << value end end end diff --git a/lib/fog/aws/parsers/emr/describe_job_flows.rb b/lib/fog/aws/parsers/emr/describe_job_flows.rb index 1839ca085..748df584c 100644 --- a/lib/fog/aws/parsers/emr/describe_job_flows.rb +++ b/lib/fog/aws/parsers/emr/describe_job_flows.rb @@ -69,10 +69,6 @@ module Fog 'MasterInstanceId', 'MasterInstanceType', 'MasterPublicDnsName', 'NormalizedInstanceHours', 'SlaveInstanceType', 'TerminationProtected' @instance[name] = value - when 'BidPrice', 'CreationDateTime', 'EndDateTime', 'InstanceGroupId', - 'InstanceRequestCount', 'InstanceRole', 'InstanceRunningCount', 'InstanceType', - 'LastStateChangeReason', 'Market', 'Name', 'ReadyDateTime', 'StartDateTime', 'State' - @instance_group_detail[name] = value when 'member' @instance['InstanceGroups'] << @instance_group_detail @instance_group_detail = {} @@ -82,6 +78,16 @@ module Fog end end + if @context.last == 'InstanceGroups' + case name + when 'member' + @instance['InstanceGroups'] << @instance_group_detail + @instance_group_detail = {} + else + @instance_group_detail[name] = value + end + end + if @context.last == 'Args' if name == 'member' if @context.include?('Steps') @@ -117,7 +123,7 @@ module Fog when 'AmiVersion', 'JobFlowId', 'LogUri', 'Name' @flow[name] = value when 'member' - @response['JobFlows'] = @flow + @response['JobFlows'] << @flow @flow = {'Instances' => [], 'ExecutionStatusDetail' => {}, 'BootstrapActions' => [], 'Steps' => []} end end diff --git a/lib/fog/aws/requests/emr/add_instance_groups.rb b/lib/fog/aws/requests/emr/add_instance_groups.rb index e8e2a7877..70deac37b 100644 --- a/lib/fog/aws/requests/emr/add_instance_groups.rb +++ b/lib/fog/aws/requests/emr/add_instance_groups.rb @@ -27,8 +27,8 @@ module Fog end request({ - 'Action' => 'CreateDBInstance', - 'JobFlowId' => db_name, + 'Action' => 'AddInstanceGroups', + 'JobFlowId' => job_flow_id, :parser => Fog::Parsers::AWS::EMR::AddInstanceGroups.new, }.merge(options)) end @@ -36,8 +36,7 @@ module Fog end class Mock - - def add_instance_groups(db_name, options={}) + def add_instance_groups(job_flow_id, options={}) Fog::Mock.not_implemented end diff --git a/lib/fog/bin/aws.rb b/lib/fog/bin/aws.rb index 04e2958b5..bb940c1c9 100644 --- a/lib/fog/bin/aws.rb +++ b/lib/fog/bin/aws.rb @@ -17,6 +17,8 @@ class AWS < Fog::Bin Fog::DNS::AWS when :elb Fog::AWS::ELB + when :emr + Fog::AWS::EMR when :iam Fog::AWS::IAM when :sdb, :simpledb @@ -59,6 +61,8 @@ class AWS < Fog::Bin Fog::DNS.new(:provider => 'AWS') when :elb Fog::AWS::ELB.new + when :emr + Fog::AWS::EMR.new when :iam Fog::AWS::IAM.new when :rds diff --git a/tests/aws/requests/emr/helper.rb b/tests/aws/requests/emr/helper.rb new file mode 100644 index 000000000..8dfed64a7 --- /dev/null +++ b/tests/aws/requests/emr/helper.rb @@ -0,0 +1,172 @@ +class AWS + + module EMR + + module Formats + BASIC = { + 'RequestId' => String + } + + RUN_JOB_FLOW = BASIC.merge({ + 'JobFlowId' => String + }) + + ADD_INSTANCE_GROUPS = { + 'JobFlowId' => String, + 'InstanceGroupIds' => Array + } + + SIMPLE_DESCRIBE_JOB_FLOW = { + 'JobFlows' => [{ + 'Name' => String, + 'BootstrapActions' => { + 'ScriptBootstrapActionConfig' => { + 'Args' => Array + } + }, + 'ExecutionStatusDetail' => { + 'CreationDateTime' => String, + 'State' => String, + 'LastStateChangeReason' => String + }, + 'Steps' => [{ + 'ActionOnFailure' => String, + 'Name' => String, + 'StepConfig' => { + 'HadoopJarStepConfig' => { + 'MainClass' => String, + 'Jar' => String, + 'Args' => Array, + 'Properties' => Array + } + }, + 'ExecutionStatusDetail' => { + 'CreationDateTime' => String, + 'State' => String + } + }], + 'JobFlowId' => String, + 'Instances' => { + 'InstanceCount' => String, + 'NormalizedInstanceHours' => String, + 'KeepJobFlowAliveWhenNoSteps' => String, + 'Placement' => { + 'AvailabilityZone' => String + }, + 'MasterInstanceType' => String, + 'SlaveInstanceType' => String, + 'InstanceGroups' => Array, + 'TerminationProtected' => String, + 'HadoopVersion' => String + } + }] + } + + JOB_FLOW_WITHOUT_CHANGE = { + 'JobFlows' => [{ + 'Name' => String, + 'BootstrapActions' => { + 'ScriptBootstrapActionConfig' => { + 'Args' => Array + } + }, + 'ExecutionStatusDetail' => { + 'CreationDateTime' => String, + 'State' => String, + 'LastStateChangeReason' => NilClass + }, + 'Steps' => [{ + 'ActionOnFailure' => String, + 'Name' => String, + 'StepConfig' => { + 'HadoopJarStepConfig' => { + 'MainClass' => String, + 'Jar' => String, + 'Args' => Array, + 'Properties' => Array + } + }, + 'ExecutionStatusDetail' => { + 'CreationDateTime' => String, + 'State' => String + } + }], + 'JobFlowId' => String, + 'Instances' => { + 'InstanceCount' => String, + 'NormalizedInstanceHours' => String, + 'KeepJobFlowAliveWhenNoSteps' => String, + 'Placement' => { + 'AvailabilityZone' => String + }, + 'MasterInstanceType' => String, + 'SlaveInstanceType' => String, + 'InstanceGroups' => Array, + 'TerminationProtected' => String, + 'HadoopVersion' => String + } + }] + } + + DESCRIBE_JOB_FLOW_WITH_INSTANCE_GROUPS = { + 'JobFlows' => [{ + 'Name' => String, + 'BootstrapActions' => { + 'ScriptBootstrapActionConfig' => { + 'Args' => Array + } + }, + 'ExecutionStatusDetail' => { + 'CreationDateTime' => String, + 'State' => String, + 'LastStateChangeReason' => NilClass + }, + 'Steps' => [{ + 'ActionOnFailure' => String, + 'Name' => String, + 'StepConfig' => { + 'HadoopJarStepConfig' => { + 'MainClass' => String, + 'Jar' => String, + 'Args' => Array, + 'Properties' => Array + } + }, + 'ExecutionStatusDetail' => { + 'CreationDateTime' => String, + 'State' => String + } + }], + 'JobFlowId' => String, + 'Instances' => { + 'InstanceCount' => String, + 'NormalizedInstanceHours' => String, + 'KeepJobFlowAliveWhenNoSteps' => String, + 'Placement' => { + 'AvailabilityZone' => String + }, + 'InstanceGroups' => [{ + 'Name' => String, + 'InstanceRole' => String, + 'CreationDateTime' => String, + 'LastStateChangeReason' => nil, + 'InstanceGroupId' => String, + 'Market' => String, + 'InstanceType' => String, + 'State' => String, + 'InstanceRunningCount' => String, + 'InstanceRequestCount' => String + }], + 'MasterInstanceType' => String, + 'SlaveInstanceType' => String, + 'InstanceGroups' => Array, + 'TerminationProtected' => String, + 'HadoopVersion' => String + } + }] + } + + end + end + +end diff --git a/tests/aws/requests/emr/instance_group_tests.rb b/tests/aws/requests/emr/instance_group_tests.rb new file mode 100644 index 000000000..b263fc7cc --- /dev/null +++ b/tests/aws/requests/emr/instance_group_tests.rb @@ -0,0 +1,105 @@ +Shindo.tests('AWS::EMR | instance groups', ['aws', 'emr']) do + + @job_flow_name = "fog_job_flow_#{Time.now.to_f.to_s.gsub('.','')}" + + @job_flow_options = { + 'Instances' => { + 'MasterInstanceType' => 'm1.small', + 'SlaveInstanceType' => 'm1.small', + 'InstanceCount' => 2, + 'Placement' => { + 'AvailabilityZone' => 'us-east-1a' + }, + 'KeepJobFlowAliveWhenNoSteps' => false, + 'TerminationProtected' => false, + 'HadoopVersion' => '0.20' + } + } + + @job_flow_steps = { + 'Steps' => [{ + 'Name' => 'Dummy streaming job', + 'ActionOnFailure' => 'CONTINUE', + 'HadoopJarStep' => { + 'Jar' => '/home/hadoop/contrib/streaming/hadoop-streaming.jar', + 'MainClass' => nil, + 'Args' => %w(-input s3n://elasticmapreduce/samples/wordcount/input -output hdfs:///examples/output/2011-11-03T090856 -mapper s3n://elasticmapreduce/samples/wordcount/wordSplitter.py -reducer aggregate) + } + }] + } + + @instance_group_name = "fog_instance_group_#{Time.now.to_f.to_s.gsub('.','')}" + @instance_groups = { + 'InstanceGroups' => [{ + 'Name' => @instance_group_name, + 'InstanceRole' => 'TASK', + 'InstanceType' => 'm1.small', + 'InstanceCount' => 2 + }] + } + + result = AWS[:emr].run_job_flow(@job_flow_name, @job_flow_options).body + @job_flow_id = result['JobFlowId'] + + tests('success') do + + tests("#add_instance_groups").formats(AWS::EMR::Formats::ADD_INSTANCE_GROUPS) do + pending if Fog.mocking? + + result = AWS[:emr].add_instance_groups(@job_flow_id, @instance_groups).body + + @instance_group_id = result['InstanceGroupIds'].first + + result + end + + tests("#describe_job_flows_with_instance_groups").formats(AWS::EMR::Formats::DESCRIBE_JOB_FLOW_WITH_INSTANCE_GROUPS) do + pending if Fog.mocking? + + result = AWS[:emr].describe_job_flows('JobFlowIds' => [@job_flow_id]).body + + result + end + + tests("#modify_instance_groups").formats(AWS::EMR::Formats::BASIC) do + pending if Fog.mocking? + + # Add a step so the state doesn't go directly from STARTING to SHUTTING_DOWN + AWS[:emr].add_job_flow_steps(@job_flow_id, @job_flow_steps) + + # Wait until job has started before modifying the instance group + begin + sleep 10 + + result = AWS[:emr].describe_job_flows('JobFlowIds' => [@job_flow_id]).body + job_flow = result['JobFlows'].first + state = job_flow['ExecutionStatusDetail']['State'] + print "." + end while(state == 'STARTING') + + # Check results + result = AWS[:emr].modify_instance_groups('InstanceGroups' => [{'InstanceGroupId' => @instance_group_id, 'InstanceCount' => 4}]).body + + # Check the it actually modified the instance count + tests("modify worked?") do + ig_res = AWS[:emr].describe_job_flows('JobFlowIds' => [@job_flow_id]).body + + matched = false + jf = ig_res['JobFlows'].first + jf['Instances']['InstanceGroups'].each do | ig | + if ig['InstanceGroupId'] == @instance_group_id + matched = true if ig['InstanceRequestCount'].to_i == 4 + end + end + + matched + end + + result + end + + end + + + AWS[:emr].terminate_job_flows('JobFlowIds' => [@job_flow_id]) +end \ No newline at end of file diff --git a/tests/aws/requests/emr/job_flow_tests.rb b/tests/aws/requests/emr/job_flow_tests.rb new file mode 100644 index 000000000..e5b8268e9 --- /dev/null +++ b/tests/aws/requests/emr/job_flow_tests.rb @@ -0,0 +1,86 @@ +Shindo.tests('AWS::EMR | job flows', ['aws', 'emr']) do + + @job_flow_name = "fog_job_flow_#{Time.now.to_f.to_s.gsub('.','')}" + + @job_flow_options = { + 'Instances' => { + 'MasterInstanceType' => 'm1.small', + 'SlaveInstanceType' => 'm1.small', + 'InstanceCount' => 2, + 'Placement' => { + 'AvailabilityZone' => 'us-east-1a' + }, + 'KeepJobFlowAliveWhenNoSteps' => false, + 'TerminationProtected' => false, + 'HadoopVersion' => '0.20' + } + } + + @step_name = "fog_job_flow_step_#{Time.now.to_f.to_s.gsub('.','')}" + + @job_flow_steps = { + 'Steps' => [{ + 'Name' => @step_name, + 'ActionOnFailure' => 'CONTINUE', + 'HadoopJarStep' => { + 'Jar' => 'FakeJar', + 'MainClass' => 'FakeMainClass', + 'Args' => ['arg1', 'arg2'] + } + }] + } + + @job_flow_id = nil + + tests('success') do + + tests("#run_job_flow").formats(AWS::EMR::Formats::RUN_JOB_FLOW) do + pending if Fog.mocking? + + result = AWS[:emr].run_job_flow(@job_flow_name, @job_flow_options).body + @job_flow_id = result['JobFlowId'] + + result + end + + tests("#add_job_flow_steps").formats(AWS::EMR::Formats::BASIC) do + pending if Fog.mocking? + + result = AWS[:emr].add_job_flow_steps(@job_flow_id, @job_flow_steps).body + + result + end + + tests("#set_termination_protection").formats(AWS::EMR::Formats::BASIC) do + + result = AWS[:emr].set_termination_protection(true, 'JobFlowIds' => [@job_flow_id]).body + + test("protected?") do + res = AWS[:emr].describe_job_flows('JobFlowIds' => [@job_flow_id]).body + jf = res['JobFlows'].first + + jf['Instances']['TerminationProtected'] == 'true' + end + + result + end + + tests("#terminate_job_flow").formats(AWS::EMR::Formats::BASIC) do + pending if Fog.mocking? + AWS[:emr].set_termination_protection(false, 'JobFlowIds' => [@job_flow_id]) + + result = AWS[:emr].terminate_job_flows('JobFlowIds' => [@job_flow_id]).body + + result + end + + tests("#describe_job_flows").formats(AWS::EMR::Formats::SIMPLE_DESCRIBE_JOB_FLOW) do + pending if Fog.mocking? + + result = AWS[:emr].describe_job_flows('JobFlowIds' => [@job_flow_id]).body + + result + end + + end +end \ No newline at end of file