1
0
Fork 0
mirror of https://github.com/fog/fog.git synced 2022-11-09 13:51:43 -05:00
fog--fog/tests/aws/requests/emr/instance_group_tests.rb

105 lines
3.3 KiB
Ruby
Raw Normal View History

Shindo.tests('AWS::EMR | instance groups', ['aws', 'emr']) do
@job_flow_name = "fog_job_flow_#{Time.now.to_f.to_s.gsub('.','')}"
@job_flow_options = {
'Instances' => {
'MasterInstanceType' => 'm1.small',
'SlaveInstanceType' => 'm1.small',
'InstanceCount' => 2,
'Placement' => {
'AvailabilityZone' => 'us-east-1a'
},
'KeepJobFlowAliveWhenNoSteps' => false,
'TerminationProtected' => false,
'HadoopVersion' => '0.20'
}
}
@job_flow_steps = {
'Steps' => [{
'Name' => 'Dummy streaming job',
'ActionOnFailure' => 'CONTINUE',
'HadoopJarStep' => {
'Jar' => '/home/hadoop/contrib/streaming/hadoop-streaming.jar',
'MainClass' => nil,
'Args' => %w(-input s3n://elasticmapreduce/samples/wordcount/input -output hdfs:///examples/output/2011-11-03T090856 -mapper s3n://elasticmapreduce/samples/wordcount/wordSplitter.py -reducer aggregate)
}
}]
}
@instance_group_name = "fog_instance_group_#{Time.now.to_f.to_s.gsub('.','')}"
@instance_groups = {
'InstanceGroups' => [{
'Name' => @instance_group_name,
'InstanceRole' => 'TASK',
'InstanceType' => 'm1.small',
'InstanceCount' => 2
}]
}
result = AWS[:emr].run_job_flow(@job_flow_name, @job_flow_options).body
@job_flow_id = result['JobFlowId']
tests('success') do
tests("#add_instance_groups").formats(AWS::EMR::Formats::ADD_INSTANCE_GROUPS) do
pending if Fog.mocking?
result = AWS[:emr].add_instance_groups(@job_flow_id, @instance_groups).body
@instance_group_id = result['InstanceGroupIds'].first
result
end
tests("#describe_job_flows_with_instance_groups").formats(AWS::EMR::Formats::DESCRIBE_JOB_FLOW_WITH_INSTANCE_GROUPS) do
pending if Fog.mocking?
result = AWS[:emr].describe_job_flows('JobFlowIds' => [@job_flow_id]).body
result
end
tests("#modify_instance_groups").formats(AWS::EMR::Formats::BASIC) do
pending if Fog.mocking?
# Add a step so the state doesn't go directly from STARTING to SHUTTING_DOWN
AWS[:emr].add_job_flow_steps(@job_flow_id, @job_flow_steps)
# Wait until job has started before modifying the instance group
begin
sleep 10
result = AWS[:emr].describe_job_flows('JobFlowIds' => [@job_flow_id]).body
job_flow = result['JobFlows'].first
state = job_flow['ExecutionStatusDetail']['State']
print "."
end while(state == 'STARTING')
# Check results
result = AWS[:emr].modify_instance_groups('InstanceGroups' => [{'InstanceGroupId' => @instance_group_id, 'InstanceCount' => 4}]).body
# Check the it actually modified the instance count
tests("modify worked?") do
ig_res = AWS[:emr].describe_job_flows('JobFlowIds' => [@job_flow_id]).body
matched = false
jf = ig_res['JobFlows'].first
jf['Instances']['InstanceGroups'].each do | ig |
if ig['InstanceGroupId'] == @instance_group_id
matched = true if ig['InstanceRequestCount'].to_i == 4
end
end
matched
end
result
end
end
AWS[:emr].terminate_job_flows('JobFlowIds' => [@job_flow_id])
end