
Merge pull request #1572 from kbarrette/data_pipeline

[AWS] Rudimentary AWS Data Pipeline functionality
Merged by Wesley Beary on 2013-02-18 09:54:10 -08:00 in commit b6eb57196d
15 changed files with 591 additions and 0 deletions

@@ -13,6 +13,7 @@ module Fog
service(:compute, 'aws/compute', 'Compute')
service(:cloud_formation, 'aws/cloud_formation', 'CloudFormation')
service(:cloud_watch, 'aws/cloud_watch', 'CloudWatch')
service(:data_pipeline, 'aws/data_pipeline', 'DataPipeline')
service(:dynamodb, 'aws/dynamodb', 'DynamoDB')
service(:dns, 'aws/dns', 'DNS')
service(:elasticache, 'aws/elasticache', 'Elasticache')

@@ -0,0 +1,114 @@
require 'fog/aws'
module Fog
module AWS
class DataPipeline < Fog::Service
extend Fog::AWS::CredentialFetcher::ServiceMethods
requires :aws_access_key_id, :aws_secret_access_key
recognizes :region, :host, :path, :port, :scheme, :persistent, :use_iam_profile, :aws_session_token, :aws_credentials_expire_at
request_path 'fog/aws/requests/data_pipeline'
request :activate_pipeline
request :create_pipeline
request :delete_pipeline
request :describe_pipelines
request :list_pipelines
request :put_pipeline_definition
model_path 'fog/aws/models/data_pipeline'
model :pipeline
collection :pipelines
class Mock
def initialize(options={})
Fog::Mock.not_implemented
end
end
class Real
attr_reader :region
include Fog::AWS::CredentialFetcher::ConnectionMethods
# Initialize connection to DataPipeline
#
# ==== Notes
# options parameter must include values for :aws_access_key_id and
# :aws_secret_access_key in order to create a connection
#
# ==== Examples
# datapipeline = Fog::AWS::DataPipeline.new(
# :aws_access_key_id => your_aws_access_key_id,
# :aws_secret_access_key => your_aws_secret_access_key
# )
#
# ==== Parameters
# * options<~Hash> - config arguments for connection. Defaults to {}.
# * region<~String> - optional region to use. For instance, 'eu-west-1', 'us-east-1', etc.
#
# ==== Returns
# * DataPipeline object with connection to AWS.
def initialize(options={})
@use_iam_profile = options[:use_iam_profile]
@connection_options = options[:connection_options] || {}
@version = '2012-10-29'
@region = options[:region] || 'us-east-1'
@host = options[:host] || "datapipeline.#{@region}.amazonaws.com"
@path = options[:path] || '/'
@persistent = options[:persistent] || false
@port = options[:port] || 443
@scheme = options[:scheme] || 'https'
@connection = Fog::Connection.new("#{@scheme}://#{@host}:#{@port}#{@path}", @persistent, @connection_options)
setup_credentials(options)
end
def reload
@connection.reset
end
private
def setup_credentials(options)
@aws_access_key_id = options[:aws_access_key_id]
@aws_secret_access_key = options[:aws_secret_access_key]
@aws_session_token = options[:aws_session_token]
@aws_credentials_expire_at = options[:aws_credentials_expire_at]
@signer = Fog::AWS::SignatureV4.new(@aws_access_key_id, @aws_secret_access_key, @region, 'datapipeline')
end
def request(params)
refresh_credentials_if_expired
# Params for all DataPipeline requests
params.merge!({
:expects => 200,
:host => @host,
:method => :post,
:path => '/',
})
date = Fog::Time.now
params[:headers] = {
'Date' => date.to_date_header,
'Host' => @host,
'X-Amz-Date' => date.to_iso8601_basic,
'Content-Type' => 'application/x-amz-json-1.1',
'Content-Length' => params[:body].bytesize.to_s,
}.merge!(params[:headers] || {})
params[:headers]['x-amz-security-token'] = @aws_session_token if @aws_session_token
params[:headers]['Authorization'] = @signer.sign(params, date)
response = @connection.request(params)
response
end
end
end
end
end
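
For context, a minimal usage sketch of the new service; the credentials below are placeholders:

require 'fog/aws'

# Placeholder credentials; supply real keys, or pass :use_iam_profile => true.
datapipeline = Fog::AWS::DataPipeline.new(
  :aws_access_key_id     => 'YOUR_ACCESS_KEY_ID',
  :aws_secret_access_key => 'YOUR_SECRET_ACCESS_KEY',
  :region                => 'us-east-1'
)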

@@ -0,0 +1,67 @@
require 'fog/core/model'
module Fog
module AWS
class DataPipeline
class Pipeline < Fog::Model
identity :id, :aliases => 'pipelineId'
attribute :name
attribute :description
attribute :user_id, :aliases => 'userId'
attribute :account_id, :aliases => 'accountId'
attribute :state, :aliases => 'pipelineState'
attribute :unique_id, :aliases => 'uniqueId'
def initialize(attributes={})
# Extract the 'fields' portion of a response into attributes
if attributes.include?('fields')
string_fields = attributes['fields'].select { |f| f.include?('stringValue') }
field_attributes = Hash[string_fields.map { |f| [f['key'][/^@(.+)$/, 1], f['stringValue']] }]
merge_attributes(field_attributes)
end
super
end
def save
requires :name
requires :unique_id
data = service.create_pipeline(unique_id, name)
merge_attributes(data)
true
end
def activate
requires :id
service.activate_pipeline(id)
true
end
def put(objects)
requires :id
service.put_pipeline_definition(id, objects)
true
end
def destroy
requires :id
service.delete_pipeline(id)
true
end
end
end
end
end
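
For context, a sketch of the model lifecycle defined above; the pipeline name, unique ID, and definition object are illustrative:

# Assumes a configured Fog::AWS[:data_pipeline] service; names are hypothetical.
pipeline = Fog::AWS[:data_pipeline].pipelines.new(
  :name      => 'example-pipeline',
  :unique_id => 'example-pipeline-unique-id'
)
pipeline.save       # CreatePipeline
pipeline.put([{ 'id' => 'Default', 'role' => 'example-role' }]) # PutPipelineDefinition
pipeline.activate   # ActivatePipeline
pipeline.destroy    # DeletePipeline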

@@ -0,0 +1,36 @@
require 'fog/core/collection'
require 'fog/aws/models/data_pipeline/pipeline'
module Fog
module AWS
class DataPipeline
class Pipelines < Fog::Collection
model Fog::AWS::DataPipeline::Pipeline
def all
ids = []
marker = nil
begin
result = service.list_pipelines(:marker => marker)
marker = result['marker']
ids.concat(result['pipelineIdList'].map { |id| id['id'] })
end while result['hasMoreResults'] && marker
load(service.describe_pipelines(ids)['pipelineDescriptionList'])
end
def get(id)
data = service.describe_pipelines([id])['pipelineDescriptionList'].first
new(data)
rescue Excon::Errors::BadRequest => error
data = Fog::JSON.decode(error.response.body)
raise unless data['__type'] == 'PipelineDeletedException' || data['__type'] == 'PipelineNotFoundException'
nil
end
end
end
end
end
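
For context, a usage sketch of the collection; #all pages through list_pipelines and then hydrates models via describe_pipelines, and #get returns nil for deleted or unknown pipelines (the ID below is hypothetical):

pipelines = Fog::AWS[:data_pipeline].pipelines
pipelines.all                       # every pipeline visible to the account
pipelines.get('df-00000000000000')  # a single Pipeline model, or nil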

@@ -0,0 +1,35 @@
module Fog
module AWS
class DataPipeline
class Real
# Activate a pipeline
# http://docs.aws.amazon.com/datapipeline/latest/APIReference/API_ActivatePipeline.html
# ==== Parameters
# * PipelineId <~String> - The ID of the pipeline to activate
# ==== Returns
# * response<~Excon::Response>:
# * body<~Hash>:
def activate_pipeline(id)
params = { 'pipelineId' => id }
response = request({
:body => Fog::JSON.encode(params),
:headers => { 'X-Amz-Target' => 'DataPipeline.ActivatePipeline' },
})
Fog::JSON.decode(response.body)
end
end
class Mock
def activate_pipeline(id)
Fog::Mock.not_implemented
end
end
end
end
end

@@ -0,0 +1,41 @@
module Fog
module AWS
class DataPipeline
class Real
# Create a pipeline
# http://docs.aws.amazon.com/datapipeline/latest/APIReference/API_CreatePipeline.html
# ==== Parameters
# * UniqueId <~String> - A unique ID for the pipeline
# * Name <~String> - The name of the pipeline
# * Description <~String> - Description of the pipeline
# ==== Returns
# * response<~Excon::Response>:
# * body<~Hash>:
def create_pipeline(unique_id, name, description=nil)
params = {
'uniqueId' => unique_id,
'name' => name,
}
params['description'] = description if description
response = request({
:body => Fog::JSON.encode(params),
:headers => { 'X-Amz-Target' => 'DataPipeline.CreatePipeline' },
})
Fog::JSON.decode(response.body)
end
end
class Mock
def create_pipeline(unique_id, name, description=nil)
Fog::Mock.not_implemented
end
end
end
end
end
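
For context, a usage sketch; the unique ID acts as an idempotency token, so retries do not create duplicate pipelines (names below are illustrative):

response = Fog::AWS[:data_pipeline].create_pipeline(
  'example-unique-id',  # idempotency token, stable across retries
  'example-pipeline',   # display name
  'an example pipeline' # optional description
)
response['pipelineId']  # AWS-assigned ID, e.g. a 'df-...' string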

@@ -0,0 +1,35 @@
module Fog
module AWS
class DataPipeline
class Real
# Delete a pipeline
# http://docs.aws.amazon.com/datapipeline/latest/APIReference/API_DeletePipeline.html
# ==== Parameters
# * PipelineId <~String> - The ID of the pipeline to delete
# ==== Returns
# * response<~Excon::Response>:
# * body<~Hash>:
def delete_pipeline(id)
params = { 'pipelineId' => id }
response = request({
:body => Fog::JSON.encode(params),
:headers => { 'X-Amz-Target' => 'DataPipeline.DeletePipeline' },
})
Fog::JSON.decode(response.body)
end
end
class Mock
def delete_pipeline(id)
Fog::Mock.not_implemented
end
end
end
end
end

@@ -0,0 +1,36 @@
module Fog
module AWS
class DataPipeline
class Real
# Describe pipelines
# http://docs.aws.amazon.com/datapipeline/latest/APIReference/API_DescribePipelines.html
# ==== Parameters
# * PipelineIds <~Array> - IDs of the pipelines to retrieve information for
# ==== Returns
# * response<~Excon::Response>:
# * body<~Hash>:
def describe_pipelines(ids)
params = {}
params['pipelineIds'] = ids
response = request({
:body => Fog::JSON.encode(params),
:headers => { 'X-Amz-Target' => 'DataPipeline.DescribePipelines' },
})
Fog::JSON.decode(response.body)
end
end
class Mock
def describe_pipelines(ids)
Fog::Mock.not_implemented
end
end
end
end
end
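
For context, a usage sketch; the call takes an array of pipeline IDs and returns one description per pipeline (the ID below is hypothetical):

ids = ['df-00000000000000']
Fog::AWS[:data_pipeline].describe_pipelines(ids)['pipelineDescriptionList']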

@@ -0,0 +1,36 @@
module Fog
module AWS
class DataPipeline
class Real
# List all pipelines
# http://docs.aws.amazon.com/datapipeline/latest/APIReference/API_ListPipelines.html
# ==== Parameters
# * Marker <~String> - The starting point for the results to be returned.
# ==== Returns
# * response<~Excon::Response>:
# * body<~Hash>:
def list_pipelines(options={})
params = {}
params['marker'] = options[:marker] if options[:marker]
response = request({
:body => Fog::JSON.encode(params),
:headers => { 'X-Amz-Target' => 'DataPipeline.ListPipelines' },
})
Fog::JSON.decode(response.body)
end
end
class Mock
def list_pipelines(options={})
Fog::Mock.not_implemented
end
end
end
end
end
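
For context, a sketch of paging through results with the marker, mirroring what Pipelines#all does (assumes a configured service):

ids = []
marker = nil
begin
  page = Fog::AWS[:data_pipeline].list_pipelines(:marker => marker)
  marker = page['marker']
  ids.concat(page['pipelineIdList'].map { |p| p['id'] })
end while page['hasMoreResults'] && marker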

@@ -0,0 +1,72 @@
module Fog
module AWS
class DataPipeline
class Real
# Put raw pipeline definition JSON
# http://docs.aws.amazon.com/datapipeline/latest/APIReference/API_PutPipelineDefinition.html
# ==== Parameters
# * PipelineId <~String> - The ID of the pipeline
# * PipelineObjects <~Array> - Objects in the pipeline
# ==== Returns
# * response<~Excon::Response>:
# * body<~Hash>:
def put_pipeline_definition(id, objects)
params = {
'pipelineId' => id,
'pipelineObjects' => transform_objects(objects),
}
response = request({
:body => Fog::JSON.encode(params),
:headers => { 'X-Amz-Target' => 'DataPipeline.PutPipelineDefinition' },
})
Fog::JSON.decode(response.body)
end
# Take a list of pipeline object hashes as specified in the Data Pipeline JSON format
# and transform them into the format expected by the API
private
def transform_objects(objects)
output = []
objects.each do |object|
new_object = {}
new_object['id'] = object.delete('id')
new_object['name'] = object.delete('name') || new_object['id']
new_object['fields'] = []
object.each do |key, value|
if value.is_a?(Hash)
new_object['fields'] << { 'key' => key, 'refValue' => value['ref'] }
elsif value.is_a?(Array)
value.each do |v|
new_object['fields'] << { 'key' => key, 'stringValue' => v }
end
else
new_object['fields'] << { 'key' => key, 'stringValue' => value }
end
end
output << new_object
end
output
end
end
class Mock
def put_pipeline_definition(id, objects)
Fog::Mock.not_implemented
end
end
end
end
end
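
For context, a sketch of what transform_objects produces for a single object in Data Pipeline JSON form; the names are illustrative:

object = {
  'id'       => 'Default',
  'schedule' => { 'ref' => 'Nightly' },  # Hash values become refValue fields
  'role'     => 'example-role',          # scalar values become stringValue fields
}
# transform_objects([object]) returns:
# [{ 'id'     => 'Default',
#    'name'   => 'Default', # falls back to the id when no name is given
#    'fields' => [
#      { 'key' => 'schedule', 'refValue'    => 'Nightly' },
#      { 'key' => 'role',     'stringValue' => 'example-role' } ] }]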

@@ -15,6 +15,8 @@ class AWS < Fog::Bin
Fog::AWS::CloudWatch
when :compute
Fog::Compute::AWS
when :data_pipeline
Fog::AWS::DataPipeline
when :ddb, :dynamodb
Fog::AWS::DynamoDB
when :dns
@@ -68,6 +70,8 @@ class AWS < Fog::Bin
when :compute
Fog::Logger.warning("AWS[:compute] is not recommended, use Compute[:aws] for portability")
Fog::Compute.new(:provider => 'AWS')
when :data_pipeline
Fog::AWS::DataPipeline.new
when :ddb, :dynamodb
Fog::AWS::DynamoDB.new
when :dns

@@ -0,0 +1,8 @@
Shindo.tests("AWS::DataPipeline | pipelines", ['aws', 'data_pipeline']) do
pending if Fog.mocking?
unique_id = uniq_id
model_tests(Fog::AWS[:data_pipeline].pipelines, { :id => unique_id, :name => "#{unique_id}-name", :unique_id => unique_id }) do
@instance.wait_for { state }
end
end

@@ -0,0 +1,8 @@
Shindo.tests("AWS::DataPipeline | pipelines", ['aws', 'data_pipeline']) do
pending if Fog.mocking?
unique_id = uniq_id
collection_tests(Fog::AWS[:data_pipeline].pipelines, { :id => unique_id, :name => "#{unique_id}-name", :unique_id => unique_id }) do
@instance.wait_for { state }
end
end

@@ -0,0 +1,44 @@
class AWS
module DataPipeline
module Formats
BASIC = {
'pipelineId' => String,
}
LIST_PIPELINES = {
"hasMoreResults" => Fog::Nullable::Boolean,
"marker" => Fog::Nullable::String,
"pipelineIdList" => [
{
"id" => String,
"name" => String,
}
]
}
DESCRIBE_PIPELINES = {
"pipelineDescriptionList" => [
{
"description" => Fog::Nullable::String,
"name" => String,
"pipelineId" => String,
"fields" => [
{
"key" => String,
"refValue" => Fog::Nullable::String,
"stringValue" => Fog::Nullable::String,
}
]
}
]
}
PUT_PIPELINE_DEFINITION = {
"errored" => Fog::Boolean,
"validationErrors" => Fog::Nullable::Array,
}
end
end
end
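
For context, a response that would satisfy the LIST_PIPELINES format above; all values are illustrative:

sample = {
  'hasMoreResults' => false,
  'marker'         => nil,
  'pipelineIdList' => [
    { 'id' => 'df-00000000000000', 'name' => 'example-pipeline' },
  ],
}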

@@ -0,0 +1,54 @@
Shindo.tests('AWS::DataPipeline | pipeline_tests', ['aws', 'data_pipeline']) do
pending if Fog.mocking?
@pipeline_id = nil
tests('success') do
tests("#create_pipeline").formats(AWS::DataPipeline::Formats::BASIC) do
unique_id = 'fog-test-pipeline-unique-id'
name = 'fog-test-pipeline-name'
description = 'Fog test pipeline'
result = Fog::AWS[:data_pipeline].create_pipeline(unique_id, name, description)
@pipeline_id = result['pipelineId']
result
end
tests("#list_pipelines").formats(AWS::DataPipeline::Formats::LIST_PIPELINES) do
Fog::AWS[:data_pipeline].list_pipelines
end
tests("#describe_pipelines").formats(AWS::DataPipeline::Formats::DESCRIBE_PIPELINES) do
ids = [@pipeline_id]
Fog::AWS[:data_pipeline].describe_pipelines(ids)
end
tests("#put_pipeline_definition").formats(AWS::DataPipeline::Formats::PUT_PIPELINE_DEFINITION) do
objects = [
{
"id" => "Nightly",
"type" => "Schedule",
"startDateTime" => Time.now.strftime("%Y-%m-%dT%H:%M:%S"),
"period" => "24 hours",
},
{
"id" => "Default",
"role" => "role-dumps",
"resourceRole" => "role-dumps-inst",
"schedule" => { "ref" => "Nightly" },
},
]
Fog::AWS[:data_pipeline].put_pipeline_definition(@pipeline_id, objects)
end
tests("#activate_pipeline") do
Fog::AWS[:data_pipeline].activate_pipeline(@pipeline_id)
end
tests("#delete_pipeline") do
Fog::AWS[:data_pipeline].delete_pipeline(@pipeline_id)
end
end
end