1
0
Fork 0
mirror of https://github.com/fog/fog.git synced 2022-11-09 13:51:43 -05:00

Changes to allow EMR control through fog

* Better method to turn options hash into a k/v list
* run_job_flow and add_instance_groups added as methods on EMR
* Ability to fire Hive job added
This commit is contained in:
Bob Briski 2011-09-13 21:46:41 -07:00
parent 3d7adcf7ca
commit 1976031982
6 changed files with 366 additions and 1 deletions

View file

@ -19,6 +19,7 @@ module Fog
service(:sns, 'aws/sns')
service(:sqs, 'aws/sqs')
service(:storage, 'aws/storage')
service(:emr, 'aws/emr')
def self.indexed_param(key, values)
params = {}
@ -26,10 +27,34 @@ module Fog
key << '.%d'
end
[*values].each_with_index do |value, index|
params[format(key, index + 1)] = value
if value.respond_to?('keys')
k = format(key, index + 1)
value.each do | vkey, vvalue |
params["#{k}.#{vkey}"] = vvalue
end
else
params[format(key, index + 1)] = value
end
end
params
end
def self.serialize_keys(key, value, options = {})
case value
when Hash
value.each do | k, v |
options.merge!(serialize_keys("#{key}.#{k}", v))
end
return options
when Array
value.each_with_index do | it, idx |
options.merge!(serialize_keys("#{key}.member.#{(idx + 1)}", it))
end
return options
else
return {key => value}
end
end
def self.indexed_filters(filters)
params = {}

133
lib/fog/aws/emr.rb Normal file
View file

@ -0,0 +1,133 @@
require File.expand_path(File.join(File.dirname(__FILE__), '..', 'aws'))
module Fog
module AWS
class EMR < Fog::Service
class IdentifierTaken < Fog::Errors::Error; end
requires :aws_access_key_id, :aws_secret_access_key
recognizes :region, :host, :path, :port, :scheme, :persistent
request_path 'fog/aws/requests/emr'
request :add_instance_groups
# request :add_job_flow_steps
# request :describe_job_flows
# request :modify_instance_groups
request :run_job_flow
# request :set_termination_protection
# request :terminate_job_flows
# model_path 'fog/aws/models/rds'
# model :server
# collection :servers
# model :snapshot
# collection :snapshots
# model :parameter_group
# collection :parameter_groups
#
# model :parameter
# collection :parameters
#
# model :security_group
# collection :security_groups
class Mock
def initialize(options={})
Fog::Mock.not_implemented
end
end
class Real
# Initialize connection to EMR
#
# ==== Notes
# options parameter must include values for :aws_access_key_id and
# :aws_secret_access_key in order to create a connection
#
# ==== Examples
# emr = EMR.new(
# :aws_access_key_id => your_aws_access_key_id,
# :aws_secret_access_key => your_aws_secret_access_key
# )
#
# ==== Parameters
# * options<~Hash> - config arguments for connection. Defaults to {}.
# * region<~String> - optional region to use, in ['eu-west-1', 'us-east-1', 'us-west-1'i, 'ap-southeast-1']
#
# ==== Returns
# * EMR object with connection to AWS.
def initialize(options={})
@aws_access_key_id = options[:aws_access_key_id]
@aws_secret_access_key = options[:aws_secret_access_key]
@connection_options = options[:connection_options] || {}
@hmac = Fog::HMAC.new('sha256', @aws_secret_access_key)
options[:region] ||= 'us-east-1'
@host = options[:host] || 'elasticmapreduce.amazonaws.com'
@path = options[:path] || '/'
@persistent = options[:persistent] || false
@port = options[:port] || 443
@scheme = options[:scheme] || 'https'
@connection = Fog::Connection.new("#{@scheme}://#{@host}:#{@port}#{@path}", @persistent, @connection_options)
end
def reload
@connection.reset
end
private
def request(params)
idempotent = params.delete(:idempotent)
parser = params.delete(:parser)
body = Fog::AWS.signed_params(
params,
{
:aws_access_key_id => @aws_access_key_id,
:hmac => @hmac,
:host => @host,
:path => @path,
:port => @port,
:version => '2009-03-31' #'2010-07-28'
}
)
begin
response = @connection.request({
:body => body,
:expects => 200,
:headers => { 'Content-Type' => 'application/x-www-form-urlencoded' },
:idempotent => idempotent,
:host => @host,
:method => 'POST',
:parser => parser
})
rescue Excon::Errors::HTTPStatusError => error
if match = error.message.match(/<Code>(.*)<\/Code>[\s\\\w]+<Message>(.*)<\/Message>/m)
# case match[1].split('.').last
# when 'DBInstanceNotFound', 'DBParameterGroupNotFound', 'DBSnapshotNotFound', 'DBSecurityGroupNotFound'
# raise Fog::AWS::RDS::NotFound.slurp(error, match[2])
# when 'DBParameterGroupAlreadyExists'
# raise Fog::AWS::RDS::IdentifierTaken.slurp(error, match[2])
# else
# raise
# end
raise
else
raise
end
end
response
end
end
end
end
end

View file

@ -0,0 +1,32 @@
module Fog
module Parsers
module AWS
module EMR
class AddInstanceGroups < Fog::Parsers::Base
def start_element(name, attrs = [])
super
case name
when 'InstanceGroupIds'
@in_instance_group_ids = true
@instance_groups_ids = []
end
end
def end_element(name)
case name
when 'JobFlowId'
@response[name] = value
when 'InstanceGroupIds'
@in_instance_group_ids = false
@response['InstanceGroupIds'] = @instance_group_ids
when 'InstanceGroupId'
@instance_group_ids << value
end
end
end
end
end
end
end

View file

@ -0,0 +1,22 @@
module Fog
module Parsers
module AWS
module EMR
class RunJobFlow < Fog::Parsers::Base
def start_element(name, attrs = [])
super
end
def end_element(name)
case name
when 'JobFlowId'
@response[name] = value
end
end
end
end
end
end
end

View file

@ -0,0 +1,47 @@
module Fog
module AWS
class EMR
class Real
require 'fog/aws/parsers/emr/add_instance_groups'
# adds an instance group to a running cluster
# http://docs.amazonwebservices.com/ElasticMapReduce/latest/API/API_AddInstanceGroups.html
# ==== Parameters
# * JobFlowId <~String> - Job flow in which to add the instance groups
# * InstanceGroups<~Array> - Instance Groups to add
# * 'BidPrice'<~String> - Bid price for each Amazon EC2 instance in the instance group when launching nodes as Spot Instances, expressed in USD.
# * 'InstanceCount'<~Integer> - Target number of instances for the instance group
# * 'InstanceRole'<~String> - MASTER | CORE | TASK The role of the instance group in the cluster
# * 'InstanceType'<~String> - The Amazon EC2 instance type for all instances in the instance group
# * 'MarketType'<~String> - ON_DEMAND | SPOT Market type of the Amazon EC2 instances used to create a cluster node
# * 'Name'<~String> - Friendly name given to the instance group.
#
# ==== Returns
# * response<~Excon::Response>:
# * body<~Hash>:
def add_instance_groups(job_flow_id, options={})
if instance_groups = options.delete('InstanceGroups')
options.merge!(Fog::AWS.indexed_param('InstanceGroups.member.%d', [*instance_groups]))
end
request({
'Action' => 'CreateDBInstance',
'JobFlowId' => db_name,
:parser => Fog::Parsers::AWS::EMR::AddInstanceGroups.new,
}.merge(options))
end
end
class Mock
def add_instance_groups(db_name, options={})
Fog::Mock.not_implemented
end
end
end
end
end

View file

@ -0,0 +1,106 @@
module Fog
module AWS
class EMR
class Real
require 'fog/aws/parsers/emr/run_job_flow'
# creates and starts running a new job flow
# http://docs.amazonwebservices.com/ElasticMapReduce/latest/API/API_RunJobFlow.html
# ==== Parameters
# * AdditionalInfo <~String> - A JSON string for selecting additional features.
# * BootstrapActions <~Array> - A list of bootstrap actions that will be run before Hadoop is started on the cluster nodes.
# * 'Name'<~String> - The name of the bootstrap action
# * 'ScriptBootstrapAction'<~Array> - The script run by the bootstrap action
# * 'Args' <~Array> - A list of command line arguments to pass to the bootstrap action script
# * 'Path' <~String> - Location of the script to run during a bootstrap action. Can be either a location in Amazon S3 or on a local file system.
# * Instances <~Array> - A specification of the number and type of Amazon EC2 instances on which to run the job flow.
# * 'Ec2KeyName'<~String> - Specifies the name of the Amazon EC2 key pair that can be used to ssh to the master node as the user called "hadoop.
# * 'HadoopVersion'<~String> - "0.18" | "0.20" Specifies the Hadoop version for the job flow
# * 'InstanceCount'<~Integer> - The number of Amazon EC2 instances used to execute the job flow
# * 'InstanceGroups'<~Array> - Configuration for the job flow's instance groups
# * 'BidPrice' <~String> - Bid price for each Amazon EC2 instance in the instance group when launching nodes as Spot Instances, expressed in USD.
# * 'InstanceCount'<~Integer> - Target number of instances for the instance group
# * 'InstanceRole'<~String> - MASTER | CORE | TASK The role of the instance group in the cluster
# * 'InstanceType'<~String> - The Amazon EC2 instance type for all instances in the instance group
# * 'MarketType'<~String> - ON_DEMAND | SPOT Market type of the Amazon EC2 instances used to create a cluster node
# * 'Name'<~String> - Friendly name given to the instance group.
# * 'KeepJobFlowAliveWhenNoSteps' <~Boolean> - Specifies whether the job flow should terminate after completing all steps
# * 'MasterInstanceType'<~String> - The EC2 instance type of the master node
# * 'Placement'<~Array> - Specifies the Availability Zone the job flow will run in
# * 'AvailabilityZone' <~String> - The Amazon EC2 Availability Zone for the job flow.
# * 'SlaveInstanceType'<~String> - The EC2 instance type of the slave nodes
# * 'TerminationProtected'<~Boolean> - Specifies whether to lock the job flow to prevent the Amazon EC2 instances from being terminated by API call, user intervention, or in the event of a job flow error
# * LogUri <~String> - Specifies the location in Amazon S3 to write the log files of the job flow. If a value is not provided, logs are not created
# * Name <~String> - The name of the job flow
# * Steps <~Array> - A list of steps to be executed by the job flow
# * 'ActionOnFailure'<~String> - TERMINATE_JOB_FLOW | CANCEL_AND_WAIT | CONTINUE Specifies the action to take if the job flow step fails
# * 'HadoopJarStep'<~Array> - Specifies the JAR file used for the job flow step
# * 'Args'<~String list> - A list of command line arguments passed to the JAR file's main function when executed.
# * 'Jar'<~String> - A path to a JAR file run during the step.
# * 'MainClass'<~String> - The name of the main class in the specified Java file. If not specified, the JAR file should specify a Main-Class in its manifest file
# * 'Properties'<~Array> - A list of Java properties that are set when the step runs. You can use these properties to pass key value pairs to your main function
# * 'Key'<~String> - The unique identifier of a key value pair
# * 'Value'<~String> - The value part of the identified key
# * 'Name'<~String> - The name of the job flow step
#
# ==== Returns
# * response<~Excon::Response>:
# * body<~Hash>:
def run_job_flow(name, options={})
if bootstrap_actions = options.delete('BootstrapActions')
options.merge!(Fog::AWS.serialize_keys('BootstrapActions', bootstrap_actions))
end
if instances = options.delete('Instances')
options.merge!(Fog::AWS.serialize_keys('Instances', instances))
end
if steps = options.delete('Steps')
options.merge!(Fog::AWS.serialize_keys('Steps', steps))
end
request({
'Action' => 'RunJobFlow',
'Name' => name,
:parser => Fog::Parsers::AWS::EMR::RunJobFlow.new,
}.merge(options))
end
def run_hive(name, options={})
steps = []
steps << {
'Name' => 'Setup Hive',
'HadoopJarStep' => {
'Jar' => 's3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar',
'Args' => ['s3://us-east-1.elasticmapreduce/libs/hive/hive-script', '--base-path', 's3://us-east-1.elasticmapreduce/libs/hive/', '--install-hive']},
'ActionOnFailure' => 'TERMINATE_JOB_FLOW'
}
steps << {
'Name' => 'Install Hive Site Configuration',
'HadoopJarStep' => {
'Jar' => 's3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar',
'Args' => ['s3://us-east-1.elasticmapreduce/libs/hive/hive-script', '--base-path', 's3://us-east-1.elasticmapreduce/libs/hive/', '--install-hive-site', '--hive-site=s3://raybeam.okl/prod/hive/hive-site.xml']},
'ActionOnFailure' => 'TERMINATE_JOB_FLOW'
}
options['Steps'] = steps
if not options['Instances'].nil?
options['Instances']['KeepJobFlowAliveWhenNoSteps'] = true
end
run_job_flow name, options
end
end
class Mock
def run_job_flow(db_name, options={})
Fog::Mock.not_implemented
end
end
end
end
end