mirror of
https://github.com/fog/fog.git
synced 2022-11-09 13:51:43 -05:00
persistent connections
This commit is contained in:
parent
ccdebb0163
commit
22995aff83
5 changed files with 79 additions and 108 deletions
125
lib/fog/aws.rb
125
lib/fog/aws.rb
|
@ -2,99 +2,84 @@ require File.dirname(__FILE__) + '/aws/simpledb'
|
||||||
require File.dirname(__FILE__) + '/aws/s3'
|
require File.dirname(__FILE__) + '/aws/s3'
|
||||||
|
|
||||||
require 'rubygems'
|
require 'rubygems'
|
||||||
require 'eventmachine'
|
require 'openssl'
|
||||||
|
require 'socket'
|
||||||
require 'uri'
|
require 'uri'
|
||||||
|
|
||||||
module Fog
|
module Fog
|
||||||
module AWS
|
module AWS
|
||||||
class Connection < EventMachine::Connection
|
class Connection
|
||||||
attr_accessor :scheme
|
|
||||||
attr_reader :request
|
|
||||||
|
|
||||||
def post_init
|
def initialize(url)
|
||||||
@connected = EM::DefaultDeferrable.new
|
@uri = URI.parse(url)
|
||||||
|
@connection = TCPSocket.open(@uri.host, @uri.port)
|
||||||
|
if @uri.scheme == 'https'
|
||||||
|
@ssl_context = OpenSSL::SSL::SSLContext.new
|
||||||
|
@ssl_context.verify_mode = OpenSSL::SSL::VERIFY_NONE
|
||||||
|
@connection = OpenSSL::SSL::SSLSocket.new(@connection, @ssl_context)
|
||||||
|
@connection.sync_close = true
|
||||||
|
@connection.connect
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def connection_completed
|
def request(params)
|
||||||
start_tls if @scheme == 'https'
|
params = {
|
||||||
@connected.succeed
|
:headers => {}
|
||||||
end
|
}.merge(params)
|
||||||
|
uri = URI.parse(params[:url])
|
||||||
def send(request)
|
|
||||||
@request = request
|
|
||||||
@connected.callback { @request.execute }
|
|
||||||
@request
|
|
||||||
end
|
|
||||||
|
|
||||||
def receive_data(data)
|
|
||||||
p data
|
|
||||||
@request.receive_data(data)
|
|
||||||
end
|
|
||||||
|
|
||||||
end
|
|
||||||
|
|
||||||
class Request
|
|
||||||
include EventMachine::Deferrable
|
|
||||||
|
|
||||||
attr_accessor :body, :headers, :method, :parser, :url
|
|
||||||
attr_reader :response
|
|
||||||
|
|
||||||
def initialize(connection)
|
|
||||||
@connection = connection
|
|
||||||
@headers ||= {}
|
|
||||||
@response ||= Fog::AWS::Response.new
|
|
||||||
end
|
|
||||||
|
|
||||||
def execute
|
|
||||||
uri = URI.parse(@url)
|
|
||||||
path = "#{uri.path}"
|
path = "#{uri.path}"
|
||||||
path << "?#{uri.query}" if uri.query
|
path << "?#{uri.query}" if uri.query
|
||||||
host = "#{uri.host}"
|
host = "#{uri.host}"
|
||||||
host << ":#{uri.port}" unless uri.port == 80
|
host << ":#{uri.port}" if uri.scheme == "http" && uri.port != 80
|
||||||
@headers.merge!({'Host' => host})
|
host << ":#{uri.port}" if uri.scheme == "https" && uri.port != 443
|
||||||
request = "#{method} #{path} HTTP/1.1\r\n"
|
|
||||||
for key, value in headers
|
request = "#{params[:method]} #{path} HTTP/1.1\r\n"
|
||||||
|
params[:headers]['Host'] = uri.host
|
||||||
|
params[:headers]['Content-Length'] = (params[:body].length) if params[:body]
|
||||||
|
for key, value in params[:headers]
|
||||||
request << "#{key}: #{value}\r\n"
|
request << "#{key}: #{value}\r\n"
|
||||||
end
|
end
|
||||||
request << "\r\n#{@body}" if @body
|
|
||||||
request << "\r\n"
|
request << "\r\n"
|
||||||
p request
|
request << params[:body] if params[:body]
|
||||||
@connection.send_data(request)
|
@connection.write(request)
|
||||||
|
|
||||||
|
response = AWS::Response.new
|
||||||
|
@connection.readline =~ /\AHTTP\/1.1 ([\d]{3})/
|
||||||
|
response.status = $1.to_i
|
||||||
|
while true
|
||||||
|
data = @connection.readline
|
||||||
|
break if data == "\r\n"
|
||||||
|
if header = data.match(/(.*):\s(.*)\r\n/)
|
||||||
|
response.headers[header[1]] = header[2]
|
||||||
|
end
|
||||||
|
end
|
||||||
|
if response.headers['Content-Length']
|
||||||
|
content_length = response.headers['Content-Length'].to_i
|
||||||
|
response.body << @connection.read(content_length)
|
||||||
|
elsif response.headers['Transfer-Encoding'] == 'chunked'
|
||||||
|
while true
|
||||||
|
@connection.readline =~ /([a-f0-9]*)\r\n/i
|
||||||
|
chunk_size = $1.to_i(16) + 2 # 2 = "/r/n".length
|
||||||
|
response.body << @connection.read(chunk_size)
|
||||||
|
break if $1.to_i(16) == 0
|
||||||
|
end
|
||||||
|
end
|
||||||
|
response
|
||||||
end
|
end
|
||||||
|
|
||||||
def receive_data(data)
|
|
||||||
p data
|
|
||||||
unless @data
|
|
||||||
if data =~ /\AHTTP\/1\.[01] ([\d]{3})/
|
|
||||||
@response.status = $1.to_i
|
|
||||||
else
|
|
||||||
@response.status = 0
|
|
||||||
end
|
|
||||||
@headers, @data = data.split("\r\n\r\n")
|
|
||||||
for header in @headers.split("\r\n")
|
|
||||||
if data = header.match(/(.*):\s(.*)/)
|
|
||||||
@response.headers[data[1]] = data[2]
|
|
||||||
end
|
|
||||||
end
|
|
||||||
if @parser && @data
|
|
||||||
Nokogiri::XML::SAX::Parser.new(@parser).parse(@data.split(/<\?xml.*\?>/)[1])
|
|
||||||
@response.body = @parser.response
|
|
||||||
elsif @data
|
|
||||||
@response.body = @data
|
|
||||||
end
|
|
||||||
set_deferred_status(:succeeded, self)
|
|
||||||
EventMachine.stop_event_loop
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
class Response
|
class Response
|
||||||
|
|
||||||
attr_accessor :status, :headers, :body
|
attr_accessor :status, :headers, :body
|
||||||
|
|
||||||
def initialize
|
def initialize
|
||||||
|
@body = ''
|
||||||
@headers = {}
|
@headers = {}
|
||||||
end
|
end
|
||||||
end
|
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -36,13 +36,7 @@ module Fog
|
||||||
@host = options[:host] || 's3.amazonaws.com'
|
@host = options[:host] || 's3.amazonaws.com'
|
||||||
@port = options[:port] || 443
|
@port = options[:port] || 443
|
||||||
@scheme = options[:scheme] || 'https'
|
@scheme = options[:scheme] || 'https'
|
||||||
|
@connection = AWS::Connection.new("#{@scheme}://#{@host}:#{@port}")
|
||||||
EventMachine::run {
|
|
||||||
@connection = EventMachine.connect(@host, @port, Fog::AWS::Connection) {|connection|
|
|
||||||
connection.scheme = @scheme
|
|
||||||
}
|
|
||||||
EventMachine.stop_event_loop
|
|
||||||
}
|
|
||||||
end
|
end
|
||||||
|
|
||||||
# List information about S3 buckets for authorized user
|
# List information about S3 buckets for authorized user
|
||||||
|
@ -246,7 +240,7 @@ module Fog
|
||||||
metadata
|
metadata
|
||||||
end
|
end
|
||||||
|
|
||||||
def request(method, url, parser, headers = {}, data = nil)
|
def request(method, url, parser, headers = {}, body = nil)
|
||||||
uri = URI.parse(url)
|
uri = URI.parse(url)
|
||||||
headers['Date'] = Time.now.utc.strftime("%a, %d %b %Y %H:%M:%S +0000")
|
headers['Date'] = Time.now.utc.strftime("%a, %d %b %Y %H:%M:%S +0000")
|
||||||
params = [
|
params = [
|
||||||
|
@ -262,22 +256,18 @@ module Fog
|
||||||
signature = Base64.encode64(hmac.digest).strip
|
signature = Base64.encode64(hmac.digest).strip
|
||||||
headers['Authorization'] = "AWS #{@aws_access_key_id}:#{signature}"
|
headers['Authorization'] = "AWS #{@aws_access_key_id}:#{signature}"
|
||||||
|
|
||||||
response = nil
|
response = @connection.request({
|
||||||
EventMachine::run {
|
:body => body,
|
||||||
http = EventMachine.connect(@host, @port, Fog::AWS::Connection) {|connection|
|
:headers => headers,
|
||||||
connection.scheme = @scheme
|
:method => method,
|
||||||
}
|
:url => url
|
||||||
|
})
|
||||||
|
|
||||||
request = Fog::AWS::Request.new(http)
|
if parser && !response.body.empty?
|
||||||
request.body = data
|
Nokogiri::XML::SAX::Parser.new(parser).parse(response.body.split(/<\?xml.*\?>/)[1])
|
||||||
request.headers = headers
|
response.body = parser.response
|
||||||
request.method = method
|
end
|
||||||
request.parser = parser
|
|
||||||
request.url = url
|
|
||||||
http.send(request)
|
|
||||||
|
|
||||||
request.callback {|request| response = request.response}
|
|
||||||
}
|
|
||||||
response
|
response
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,7 @@ module Fog
|
||||||
@nil_string = options[:nil_string]|| 'nil'
|
@nil_string = options[:nil_string]|| 'nil'
|
||||||
@port = options[:port] || 443
|
@port = options[:port] || 443
|
||||||
@scheme = options[:scheme] || 'https'
|
@scheme = options[:scheme] || 'https'
|
||||||
|
@connection = AWS::Connection.new("#{@scheme}://#{@host}:#{@port}")
|
||||||
end
|
end
|
||||||
|
|
||||||
# Create a SimpleDB domain
|
# Create a SimpleDB domain
|
||||||
|
@ -295,24 +296,20 @@ module Fog
|
||||||
# FIXME: use 'POST' for larger requests
|
# FIXME: use 'POST' for larger requests
|
||||||
# method = query.length > 2000 ? 'POST' : 'GET'
|
# method = query.length > 2000 ? 'POST' : 'GET'
|
||||||
method = 'GET'
|
method = 'GET'
|
||||||
string_to_sign = "#{method}\n#{@host + (@port == 80 ? "" : ":#{@port}")}\n/\n" << query.chop
|
string_to_sign = "#{method}\n#{@host}\n/\n" << query.chop
|
||||||
hmac = @hmac.update(string_to_sign)
|
hmac = @hmac.update(string_to_sign)
|
||||||
query << "Signature=#{CGI.escape(Base64.encode64(hmac.digest).strip).gsub(/\+/, '%20')}"
|
query << "Signature=#{CGI.escape(Base64.encode64(hmac.digest).strip).gsub(/\+/, '%20')}"
|
||||||
|
|
||||||
response = nil
|
response = @connection.request({
|
||||||
EventMachine::run {
|
:method => method,
|
||||||
http = EventMachine.connect(@host, @port, Fog::AWS::Connection) {|connection|
|
:url => "#{@scheme}://#{@host}:#{@port}/#{method == 'GET' ? "?#{query}" : ""}"
|
||||||
connection.scheme = @scheme
|
})
|
||||||
}
|
|
||||||
|
|
||||||
request = Fog::AWS::Request.new(http)
|
if parser && !response.body.empty?
|
||||||
request.method = method
|
Nokogiri::XML::SAX::Parser.new(parser).parse(response.body.split(/<\?xml.*\?>/)[1])
|
||||||
request.parser = parser
|
response.body = parser.response
|
||||||
request.url = "#{@scheme}://#{@host}:#{@port}/#{method == 'GET' ? "?#{query}" : ""}"
|
end
|
||||||
http.send(request)
|
|
||||||
|
|
||||||
request.callback {|http| response = request.response}
|
|
||||||
}
|
|
||||||
response
|
response
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,7 @@ describe 'S3.get_service' do
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'should include foggetservice in get_service' do
|
it 'should include foggetservice in get_service' do
|
||||||
lambda { s3.get_service }.should eventually { |expected| expected.body[:buckets].collect { |bucket| bucket[:name] }.should include('list_domains') }
|
lambda { s3.get_service }.should eventually { |expected| expected.body[:buckets].collect { |bucket| bucket[:name] }.should include('foggetservice') }
|
||||||
end
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
|
@ -5,7 +5,6 @@ $LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
|
||||||
require 'fog'
|
require 'fog'
|
||||||
|
|
||||||
Spec::Runner.configure do |config|
|
Spec::Runner.configure do |config|
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
require 'fog/aws'
|
require 'fog/aws'
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue