mirror of
https://github.com/fog/fog.git
synced 2022-11-09 13:51:43 -05:00
first pass at replacing curb with eventmachine connection for flexibility/speed
This commit is contained in:
parent
80d064be89
commit
3381648dc4
6 changed files with 126 additions and 69 deletions
|
@ -1,2 +1,61 @@
|
|||
require File.dirname(__FILE__) + '/aws/simpledb'
|
||||
require File.dirname(__FILE__) + '/aws/s3'
|
||||
|
||||
require 'rubygems'
|
||||
require 'eventmachine'
|
||||
require 'uri'
|
||||
|
||||
module Fog
|
||||
module AWS
|
||||
class Connection < EventMachine::Connection
|
||||
include EventMachine::Deferrable
|
||||
|
||||
attr_accessor :headers, :method, :url, :parser, :status
|
||||
|
||||
def response
|
||||
@parser.response
|
||||
end
|
||||
|
||||
def post_init
|
||||
@data = nil
|
||||
@headers = {}
|
||||
end
|
||||
|
||||
def connection_completed
|
||||
uri = URI.parse(@url)
|
||||
if uri.scheme == 'https'
|
||||
start_tls
|
||||
else
|
||||
request
|
||||
end
|
||||
end
|
||||
|
||||
def ssl_handshake_completed
|
||||
request
|
||||
end
|
||||
|
||||
def request
|
||||
uri = URI.parse(@url)
|
||||
path = "#{uri.path}#{uri.query.nil? ? "" : "?#{uri.query}"}"
|
||||
host = "#{uri.host}#{uri.port == 80 ? "" : ":#{uri.port}"}"
|
||||
headers.merge!({'Host' => host})
|
||||
send_data("#{method} #{path} HTTP/1.1\r\n#{headers.collect {|k,v| "#{k}: #{v}\r\n"}.join('')}\r\n")
|
||||
end
|
||||
|
||||
def receive_data(data)
|
||||
unless @data
|
||||
if data =~ /\AHTTP\/1\.[01] ([\d]{3})/
|
||||
@status = $1.to_i
|
||||
else
|
||||
@status = 0
|
||||
end
|
||||
@data = data.split(/<\?xml.*\?>/)[1]
|
||||
Nokogiri::XML::SAX::Parser.new(@parser).parse(@data)
|
||||
set_deferred_status(:succeeded, self)
|
||||
EventMachine.stop_event_loop
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,7 +1,6 @@
|
|||
require 'rubygems'
|
||||
require 'base64'
|
||||
require 'cgi'
|
||||
require 'curb'
|
||||
require 'hmac-sha1'
|
||||
|
||||
require File.dirname(__FILE__) + '/s3/parsers'
|
||||
|
@ -34,46 +33,44 @@ module Fog
|
|||
@host = options[:host] || 's3.amazonaws.com'
|
||||
@port = options[:port] || 443
|
||||
@scheme = options[:scheme] || 'https'
|
||||
@connection = Curl::Easy.new("#{@scheme}://#{@host}:#{@port}")
|
||||
end
|
||||
|
||||
def get_service
|
||||
request(:get, "#{@scheme}://#{@host}:#{@port}", Fog::Parsers::AWS::S3::GetServiceParser.new)
|
||||
request('GET', "#{@scheme}://#{@host}:#{@port}/", Fog::Parsers::AWS::S3::GetServiceParser.new)
|
||||
end
|
||||
|
||||
def put_bucket(name)
|
||||
request(:put, "#{@scheme}://#{name}.#{@host}:#{@port}", Fog::Parsers::AWS::S3::BasicParser.new)
|
||||
request('PUT', "#{@scheme}://#{name}.#{@host}:#{@port}/", Fog::Parsers::AWS::S3::BasicParser.new)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def request(method, url, parser, data=nil)
|
||||
@connection.headers['Date'] = Time.now.utc.strftime("%a, %d %b %Y %H:%M:%S +0000")
|
||||
headers = { 'Date' => Time.now.utc.strftime("%a, %d %b %Y %H:%M:%S +0000") }
|
||||
params = [
|
||||
method.to_s.upcase,
|
||||
method,
|
||||
content_md5 = '',
|
||||
content_type = '',
|
||||
@connection.headers['Date'],
|
||||
headers['Date'],
|
||||
canonicalized_amz_headers = nil,
|
||||
canonicalized_resource = '/'
|
||||
]
|
||||
string_to_sign = params.delete_if {|value| value.nil?}.join("\n")
|
||||
hmac = @hmac.update(string_to_sign)
|
||||
signature = Base64.encode64(hmac.digest).strip
|
||||
headers['Authorization'] = "AWS #{@aws_access_key_id}:#{signature}"
|
||||
|
||||
@connection.url = url
|
||||
@connection.headers['Authorization'] = "AWS #{@aws_access_key_id}:#{signature}"
|
||||
case method
|
||||
when :get
|
||||
p @connection.url
|
||||
@connection.http_get
|
||||
when :put
|
||||
@connection.http_put(data)
|
||||
end
|
||||
p @connection.headers
|
||||
p @connection.body_str
|
||||
Nokogiri::XML::SAX::Parser.new(parser).parse(@connection.body_str)
|
||||
parser.result
|
||||
response = nil
|
||||
EventMachine::run {
|
||||
http = EventMachine.connect(@host, @port, Fog::AWS::Connection) {|connection|
|
||||
connection.headers = headers
|
||||
connection.method = method
|
||||
connection.parser = parser
|
||||
connection.url = url
|
||||
}
|
||||
http.callback {|http| response = http.response}
|
||||
}
|
||||
response
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -7,14 +7,14 @@ module Fog
|
|||
|
||||
class BasicParser < Fog::Parsers::Base
|
||||
|
||||
attr_reader :result
|
||||
attr_reader :response
|
||||
|
||||
def initialize
|
||||
reset
|
||||
end
|
||||
|
||||
def reset
|
||||
@result = {}
|
||||
@response = {}
|
||||
end
|
||||
|
||||
def characters(string)
|
||||
|
@ -31,20 +31,20 @@ module Fog
|
|||
|
||||
def reset
|
||||
@bucket = {}
|
||||
@result = { :owner => {}, :buckets => [] }
|
||||
@response = { :owner => {}, :buckets => [] }
|
||||
end
|
||||
|
||||
def end_element(name)
|
||||
case name
|
||||
when 'Bucket'
|
||||
@result[:buckets] << @bucket
|
||||
@response[:buckets] << @bucket
|
||||
@bucket = {}
|
||||
when 'CreationDate'
|
||||
@bucket[:creation_date] = @value
|
||||
when 'DisplayName'
|
||||
@result[:owner][:display_name] = @value
|
||||
@response[:owner][:display_name] = @value
|
||||
when 'ID'
|
||||
@result[:owner][:id] = @value
|
||||
@response[:owner][:id] = @value
|
||||
when 'Name'
|
||||
@bucket[:name] = @value
|
||||
end
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
require 'rubygems'
|
||||
require 'base64'
|
||||
require 'cgi'
|
||||
require 'curb'
|
||||
require 'hmac-sha2'
|
||||
|
||||
require File.dirname(__FILE__) + '/simpledb/parsers'
|
||||
|
@ -36,7 +35,6 @@ module Fog
|
|||
@nil_string = options[:nil_string]|| 'nil'
|
||||
@port = options[:port] || 443
|
||||
@scheme = options[:scheme] || 'https'
|
||||
@connection = Curl::Easy.new("#{@scheme}://#{@host}:#{@port}")
|
||||
end
|
||||
|
||||
# Create a SimpleDB domain
|
||||
|
@ -285,20 +283,23 @@ module Fog
|
|||
query << "#{key}=#{CGI.escape(params[key]).gsub(/\+/, '%20')}&"
|
||||
end
|
||||
|
||||
method = query.length > 2000 ? 'POST' : 'GET'
|
||||
string_to_sign = "#{method}\n#{@host}\n/\n" << query.chop
|
||||
# FIXME: use 'POST' for larger requests
|
||||
# method = query.length > 2000 ? 'POST' : 'GET'
|
||||
method = 'GET'
|
||||
string_to_sign = "#{method}\n#{@host + (@port == 80 ? "" : ":#{@port}")}\n/\n" << query.chop
|
||||
hmac = @hmac.update(string_to_sign)
|
||||
query << "Signature=#{CGI.escape(Base64.encode64(hmac.digest).strip).gsub(/\+/, '%20')}"
|
||||
|
||||
if method == 'GET'
|
||||
@connection.url = "#{@scheme}://#{@host}:#{@port}/?#{query}"
|
||||
@connection.http_get
|
||||
else
|
||||
@connection.url = "#{@scheme}://#{@host}:#{@port}"
|
||||
@connection.http_post(query)
|
||||
end
|
||||
Nokogiri::XML::SAX::Parser.new(parser).parse(@connection.body_str)
|
||||
parser.result
|
||||
response = nil
|
||||
EventMachine::run {
|
||||
http = EventMachine.connect(@host, @port, Fog::AWS::Connection) {|connection|
|
||||
connection.method = method
|
||||
connection.parser = parser
|
||||
connection.url = "#{@scheme}://#{@host}:#{@port}/#{method == 'GET' ? "?#{query}" : ""}"
|
||||
}
|
||||
http.callback {|http| response = http.response}
|
||||
}
|
||||
response
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -13,8 +13,8 @@ module Fog
|
|||
|
||||
def end_element(name)
|
||||
case(name)
|
||||
when 'BoxUsage' then result[:box_usage] = @value.to_f
|
||||
when 'RequestId' then result[:request_id] = @value
|
||||
when 'BoxUsage' then response[:box_usage] = @value.to_f
|
||||
when 'RequestId' then response[:request_id] = @value
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -27,15 +27,15 @@ module Fog
|
|||
class ListDomainsParser < Fog::Parsers::AWS::SimpleDB::BasicParser
|
||||
|
||||
def reset
|
||||
@result = { :domains => [] }
|
||||
@response = { :domains => [] }
|
||||
end
|
||||
|
||||
def end_element(name)
|
||||
case(name)
|
||||
when 'BoxUsage' then result[:box_usage] = @value.to_f
|
||||
when 'DomainName' then result[:domains] << @value
|
||||
when 'NextToken' then result[:next_token] = @value
|
||||
when 'RequestId' then result[:request_id] = @value
|
||||
when 'BoxUsage' then response[:box_usage] = @value.to_f
|
||||
when 'DomainName' then response[:domains] << @value
|
||||
when 'NextToken' then response[:next_token] = @value
|
||||
when 'RequestId' then response[:request_id] = @value
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -44,20 +44,20 @@ module Fog
|
|||
class DomainMetadataParser < Fog::Parsers::AWS::SimpleDB::BasicParser
|
||||
|
||||
def reset
|
||||
@result = {}
|
||||
@response = {}
|
||||
end
|
||||
|
||||
def end_element(name)
|
||||
case name
|
||||
when 'AttributeNameCount' then result[:attribute_name_count] = @value.to_i
|
||||
when 'AttributeNamesSizeBytes' then result[:attribute_names_size_bytes] = @value.to_i
|
||||
when 'AttributeValueCount' then result[:attribute_value_count] = @value.to_i
|
||||
when 'AttributeValuesSizeBytes' then result[:attribute_values_size_bytes] = @value.to_i
|
||||
when 'BoxUsage' then result[:box_usage] = @value.to_f
|
||||
when 'ItemCount' then result[:item_count] = @value.to_i
|
||||
when 'ItemNamesSizeBytes' then result[:item_names_size_bytes] = @value.to_i
|
||||
when 'RequestId' then result[:request_id] = @value
|
||||
when 'Timestamp' then result[:timestamp] = @value
|
||||
when 'AttributeNameCount' then response[:attribute_name_count] = @value.to_i
|
||||
when 'AttributeNamesSizeBytes' then response[:attribute_names_size_bytes] = @value.to_i
|
||||
when 'AttributeValueCount' then response[:attribute_value_count] = @value.to_i
|
||||
when 'AttributeValuesSizeBytes' then response[:attribute_values_size_bytes] = @value.to_i
|
||||
when 'BoxUsage' then response[:box_usage] = @value.to_f
|
||||
when 'ItemCount' then response[:item_count] = @value.to_i
|
||||
when 'ItemNamesSizeBytes' then response[:item_names_size_bytes] = @value.to_i
|
||||
when 'RequestId' then response[:request_id] = @value
|
||||
when 'Timestamp' then response[:timestamp] = @value
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -67,15 +67,15 @@ module Fog
|
|||
|
||||
def reset
|
||||
@attribute = nil
|
||||
@result = { :attributes => {} }
|
||||
@response = { :attributes => {} }
|
||||
end
|
||||
|
||||
def end_element(name)
|
||||
case name
|
||||
when 'BoxUsage' then result[:box_usage] = @value.to_f
|
||||
when 'BoxUsage' then response[:box_usage] = @value.to_f
|
||||
when 'Name' then @attribute = @value
|
||||
when 'RequestId' then result[:request_id] = @value
|
||||
when 'Value' then (result[:attributes][@attribute] ||= []) << sdb_decode(@value)
|
||||
when 'RequestId' then response[:request_id] = @value
|
||||
when 'Value' then (response[:attributes][@attribute] ||= []) << sdb_decode(@value)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -85,17 +85,17 @@ module Fog
|
|||
|
||||
def reset
|
||||
@item_name = @attribute_name = nil
|
||||
@result = { :items => {} }
|
||||
@response = { :items => {} }
|
||||
end
|
||||
|
||||
def end_element(name)
|
||||
case name
|
||||
when 'BoxUsage' then result[:box_usage] = @value.to_f
|
||||
when 'BoxUsage' then response[:box_usage] = @value.to_f
|
||||
when 'Item' then @item_name = @attribute_name = nil
|
||||
when 'Name' then @item_name.nil? ? @item_name = @value : @attribute_name = @value
|
||||
when 'NextToken' then result[:next_token] = @value
|
||||
when 'RequestId' then result[:request_id] = @value
|
||||
when 'Value' then ((result[:items][@item_name] ||= {})[@attribute_name] ||= []) << sdb_decode(@value)
|
||||
when 'NextToken' then response[:next_token] = @value
|
||||
when 'RequestId' then response[:request_id] = @value
|
||||
when 'Value' then ((response[:items][@item_name] ||= {})[@attribute_name] ||= []) << sdb_decode(@value)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -5,14 +5,14 @@ module Fog
|
|||
module Parsers
|
||||
class Base < Nokogiri::XML::SAX::Document
|
||||
|
||||
attr_reader :result
|
||||
attr_reader :response
|
||||
|
||||
def initialize
|
||||
reset
|
||||
end
|
||||
|
||||
def reset
|
||||
@result = {}
|
||||
@response = {}
|
||||
end
|
||||
|
||||
def characters(string)
|
||||
|
|
Loading…
Add table
Reference in a new issue