1
0
Fork 0
mirror of https://github.com/fog/fog.git synced 2022-11-09 13:51:43 -05:00
fog--fog/lib/fog/rackspace/queues.rb
Paul Thornthwaite 0e1daf3ddd [GH-2711] Replace Fog::Connection with XML shim
Unlike last attempt this replaces Fog::Connection with
Fog::XML::Connection which should be directly compatible.

Fog::Connection is there for old PRs but should be removed real soon.

Providers using JSON should be able to replace "XML" with "Core" within
their code to cut down on the dependency.

If I get the time I may attempt to clean up some but testing with Mock
will mean that is mostly educated guesswork.
2014-02-27 00:54:17 +00:00

401 lines
12 KiB
Ruby

require 'fog/rackspace/core'
module Fog
module Rackspace
class Queues < Fog::Service
# Mix the shared Fog::Rackspace::Errors module into this service.
include Fog::Rackspace::Errors
# Service-scoped error classes. Each one subclasses a shared Rackspace error so that callers
# can rescue failures under the Fog::Rackspace::Queues namespace.
class ServiceError < Fog::Rackspace::Errors::ServiceError; end
class InternalServerError < Fog::Rackspace::Errors::InternalServerError; end
class BadRequest < Fog::Rackspace::Errors::BadRequest; end
# NOTE(review): this derives from the shared BadRequest error, so it is a sibling of (not a
# subclass of) the BadRequest class declared just above -- confirm that is intentional.
class MethodNotAllowed < Fog::Rackspace::Errors::BadRequest; end
# Options declared as required for this service.
requires :rackspace_api_key, :rackspace_username
# Options declared as recognized (optional) for this service.
recognizes :rackspace_auth_url
recognizes :rackspace_auth_token
recognizes :rackspace_region
recognizes :rackspace_queues_url
recognizes :rackspace_queues_client_id
# Models and collections registered for this service, located under model_path.
model_path 'fog/rackspace/models/queues'
model :queue
collection :queues
model :message
collection :messages
model :claim
collection :claims
# Request methods registered for this service, located under request_path.
request_path 'fog/rackspace/requests/queues'
# Queue operations.
request :list_queues
request :get_queue
request :create_queue
request :delete_queue
request :get_queue_stats
# Message operations.
request :list_messages
request :get_message
request :create_message
request :delete_message
# Claim operations.
request :create_claim
request :get_claim
request :update_claim
request :delete_claim
# Behaviour shared by the Real and Mock Queues services: option handling, the arguments
# passed to authentication, endpoint resolution and client id access.
module Common
  # Copy the supplied service options into instance state.
  #
  # @param options [Hash] Service options. Reads :rackspace_api_key, :rackspace_username,
  #   :rackspace_queues_client_id (a UUID from Fog::UUID is used when absent),
  #   :rackspace_auth_url, :rackspace_queues_url, :connection_options (defaults to {}) and
  #   :rackspace_region (defaults to :ord).
  # @raise [Fog::Errors::NotImplemented] If `v2_authentication?` is not truthy.
  def apply_options(options)
    @rackspace_api_key = options[:rackspace_api_key]
    @rackspace_username = options[:rackspace_username]
    @rackspace_queues_client_id = options[:rackspace_queues_client_id] || Fog::UUID.uuid
    @rackspace_auth_url = options[:rackspace_auth_url]
    @rackspace_must_reauthenticate = false
    @connection_options = options[:connection_options] || {}
    @rackspace_region = options[:rackspace_region] || :ord

    # :rackspace_queues_url is a recognized option and `endpoint_uri` prefers
    # @rackspace_endpoint, but nothing stored it, so an explicitly configured endpoint was
    # ignored. A single trailing slash is dropped; a missing or empty value leaves
    # @rackspace_endpoint untouched so the previous fallback behaviour is preserved.
    queues_url = options[:rackspace_queues_url].to_s.sub(%r{/\z}, '')
    @rackspace_endpoint = queues_url unless queues_url.empty?

    # Checked last because it runs after @rackspace_auth_url has been assigned.
    raise Fog::Errors::NotImplemented, "V2 authentication required for Queues" unless v2_authentication?
  end

  # @return [Symbol] The service name, :cloudQueues.
  def service_name
    :cloudQueues
  end

  # @return [Symbol, String] The region stored by {#apply_options}.
  def region
    @rackspace_region
  end

  # Resolve the endpoint through the superclass and remember the result in @uri.
  #
  # @param service_endpoint_url [String, nil] Endpoint used only when no
  #   @rackspace_endpoint has been configured.
  # @return The value returned by the superclass implementation.
  def endpoint_uri(service_endpoint_url=nil)
    @uri = super(@rackspace_endpoint || service_endpoint_url, :rackspace_queues_url)
  end

  # Authenticate through the superclass using the credentials stored by {#apply_options}.
  #
  # @param options [Hash] Accepted for signature compatibility; its contents are not used.
  # @return The value returned by the superclass implementation.
  def authenticate(options={})
    super({
      :rackspace_api_key => @rackspace_api_key,
      :rackspace_username => @rackspace_username,
      :rackspace_auth_url => @rackspace_auth_url,
      :connection_options => @connection_options
    })
  end

  # @return [String] The client id stored for this service object.
  def client_id
    @rackspace_queues_client_id
  end

  # @param client_id [String] Replacement client id for this service object.
  def client_id=(client_id)
    @rackspace_queues_client_id = client_id
  end
end
class Mock < Fog::Rackspace::Service
# Shared option handling, authentication arguments and endpoint resolution.
include Common
# Path prefix used when building the href values of mock messages and claims.
PATH_BASE = "/v1/queues"
# In-memory stand-in for a single queue: an ordered list of messages plus the claims
# currently registered against it, keyed by claim id.
class MockQueue
  attr_accessor :name, :metadata, :messages, :claims

  # @param name [String] Name of the queue.
  def initialize(name)
    @name = name
    @metadata = {}
    @messages = []
    @claims = {}
    # Message ids are issued sequentially, starting from a random 24-hex-digit value.
    @id_counter = Fog::Mock.random_hex(24).to_i(16)
  end

  # Count every message currently on the queue.
  #
  # @return [Integer]
  def total
    @messages.length
  end

  # Count the messages that are currently held by a claim.
  #
  # @return [Integer]
  def claimed
    @messages.count(&:claimed?)
  end

  # Count the messages that are not held by any claim.
  #
  # @return [Integer]
  def free
    @messages.reject(&:claimed?).size
  end

  # The message at the head of the queue (the earliest one still present).
  #
  # @return [MockMessage, nil] `nil` when the queue is empty.
  def oldest
    @messages.first
  end

  # The message at the tail of the queue (the latest one appended).
  #
  # @return [MockMessage, nil] `nil` when the queue is empty.
  def newest
    @messages.last
  end

  # Publish a message at the tail of the queue, assigning it the next sequential id.
  #
  # @param client_id [String] UUID for the service object.
  # @param data [Hash] Message payload.
  # @param ttl [Integer] Number of seconds that the message should exist.
  # @return [MockMessage] The message object that was created.
  def add_message(client_id, data, ttl)
    message_id = @id_counter.to_s(16)
    @id_counter += 1
    MockMessage.new(message_id, self, client_id, data, ttl).tap do |published|
      @messages << published
    end
  end

  # Register a new MockClaim against this queue.
  #
  # @param ttl [Integer] Time-to-live for the claim.
  # @param grace [Integer] Grace period that's added to messages included in this claim.
  # @return [MockClaim] The claim that was created.
  def add_claim(ttl, grace)
    created = MockClaim.new(self, ttl, grace)
    # An assignment evaluates to its right-hand side, so the new claim is returned.
    claims[created.id] = created
  end

  # Look up a registered MockClaim, insisting that it exists.
  #
  # @param claim_id [String] An ID of an existing claim.
  # @raise [Fog::Rackspace::Queues::NotFound] If no MockClaim with this ID exists.
  # @return [MockClaim] The requested MockClaim.
  def claim!(claim_id)
    found = claims[claim_id]
    raise NotFound unless found
    found
  end

  # Drop messages whose ttl has run out, then retire every claim that has either expired
  # or no longer holds any messages, releasing whatever messages it still held.
  def ageoff
    messages.reject!(&:expired?)
    claims.keys.each do |claim_id|
      stale = claims[claim_id]
      next unless stale.expired? || stale.messages.empty?

      stale.messages.each { |held| held.claim = nil }
      claims.delete(claim_id)
    end
  end
end
# One message held by an in-memory MockQueue, together with its producer, its lifetime and
# the claim (if any) that currently holds it.
class MockMessage
  attr_accessor :id, :queue, :data, :ttl, :producer_id
  attr_accessor :claim, :created

  # Build a message stamped with the current time. Prefer {MockQueue#add_message}.
  #
  # @param id [String] Identifier of the message.
  # @param queue [MockQueue] Queue that owns the message.
  # @param client_id [String] Id of the client that produced the message.
  # @param data [Hash] Message payload.
  # @param ttl [Integer] Lifetime of the message, in seconds.
  def initialize(id, queue, client_id, data, ttl)
    @id = id
    @queue = queue
    @producer_id = client_id
    @data = data
    @ttl = ttl
    @claim = nil
    @created = Time.now.to_i
  end

  # Seconds elapsed since the message was created.
  #
  # @return [Integer]
  def age
    Time.now.to_i - @created
  end

  # URI path that identifies this message within its queue.
  #
  # @return [String]
  def href
    format("%s/%s/messages/%s", PATH_BASE, @queue.name, @id)
  end

  # Whether a claim currently holds this message.
  #
  # @return [Boolean]
  def claimed?
    !@claim.nil?
  end

  # Whether the message has outlived its ttl.
  #
  # @return [Boolean]
  def expired?
    age > ttl
  end

  # Lengthen the {#ttl} so the message survives until its claim's end-of-life (claim
  # lifetime plus grace period). The ttl is never shortened, and nothing happens when the
  # message is unclaimed.
  def extend_life
    if @claim
      lifetime = claim.message_end_of_life - @created
      @ttl = lifetime if lifetime > @ttl
    end
  end

  # Render this message as a GET payload.
  #
  # @return [Hash]
  def to_h
    { "body" => @data, "age" => age, "ttl" => @ttl, "href" => href }
  end
end
# A consumer's reservation over a set of messages on a queue while it processes them.
class MockClaim
  attr_accessor :id, :queue, :ttl, :grace

  # Build a claim with a random 24-hex-digit id, stamped with the current time. Clients
  # should use {MockQueue#add_claim} instead.
  #
  # @param queue [MockQueue] The queue that owns this claim.
  # @param ttl [Integer] Duration, in seconds, that this claim should last.
  # @param grace [Integer] Extra life granted to messages within this claim after this
  #   claim expires, to give another consumer a chance to process it.
  def initialize(queue, ttl, grace)
    @id = Fog::Mock.random_hex(24)
    @queue = queue
    @ttl = ttl
    @grace = grace
    touch!
  end

  # Restart the claim's clock by recording the present as its creation time.
  def touch!
    @created = Time.now.to_i
  end

  # The moment at which messages held by this claim should expire: creation time plus the
  # claim's ttl plus its grace period.
  #
  # @return [Integer] Seconds since the epoch.
  def message_end_of_life
    [@created, @ttl, @grace].inject { |sum, part| sum + part }
  end

  # Seconds elapsed since the claim was created or last touched.
  #
  # @return [Integer]
  def age
    Time.now.to_i - @created
  end

  # Whether the claim has outlived its ttl.
  #
  # @return [Boolean]
  def expired?
    age > ttl
  end

  # The messages on the owning queue whose claim is this one.
  #
  # @return [Array<MockMessage>]
  def messages
    @queue.messages.select { |candidate| candidate.claim == self }
  end

  # Render this claim, including the messages it holds, as a GET payload.
  #
  # @return [Hash]
  def to_h
    held = messages.map(&:to_h)
    {
      "age" => age,
      "href" => format("%s/%s/claims/%s", PATH_BASE, @queue.name, @id),
      "ttl" => @ttl,
      "messages" => held
    }
  end
end
# Build a mock service: store the options, authenticate, then resolve the endpoint URI.
# The calls run in this order because each step relies on state set by the previous one.
#
# @param options [Hash] Service options; see Common#apply_options for the keys that are read.
def initialize(options = {})
apply_options(options)
authenticate
endpoint_uri
end
# Class-level store of mock state. Looking up a key that is not present yet creates and
# remembers an empty Hash for it.
#
# @return [Hash] The memoized store, shared by every instance of this class.
def self.data
  @data ||= Hash.new { |store, key| store[key] = {} }
end
# The slice of the class-level mock store that belongs to this service's username.
#
# @return [Hash] Entry of the class-level store keyed by @rackspace_username.
def data
  shared_store = self.class.data
  shared_store[@rackspace_username]
end
# Build a MockQueue and store it under its name, silently replacing any queue that was
# already stored under that name.
#
# @param queue_name [String] Valid queue name.
# @return [MockQueue] The MockQueue that was created.
def add_queue(queue_name)
  # An assignment evaluates to its right-hand side, so the new queue is returned.
  data[queue_name] = MockQueue.new(queue_name)
end
# Look up the MockQueue stored under the given name in this service's mock data.
#
# @param queue_name [String] Valid queue name.
# @return [MockQueue, nil] The queue with the specified name, or `nil` if nothing is
#   stored under that name.
def mock_queue(queue_name)
data[queue_name]
end
# Look up a MockQueue by name, insisting that it exists.
#
# @param queue_name [String] Valid queue name.
# @raise [Fog::Rackspace::Queues::NotFound] If there is no queue with the specified name.
# @return [MockQueue] The queue with the specified name.
def mock_queue!(queue_name)
  queue = mock_queue(queue_name)
  raise NotFound unless queue
  queue
end
# Ask every queue in this service's mock data to drop messages and claims that have
# outlived their ttl values. Invoked before every request.
def ageoff
  data.values.each(&:ageoff)
end
# Expire stale mock messages and claims, then hand the request to the superclass.
#
# @param params Request description, forwarded unchanged by the bare `super`
#   (presumably a Hash -- confirm against the superclass).
# @return The value returned by the superclass implementation.
def request(params)
# Age off first so the request is handled against current mock state.
ageoff
super
end
end
# Service implementation that issues requests over a connection to the resolved endpoint.
class Real < Fog::Rackspace::Service
  include Common

  # Store the options, authenticate, then open a connection to the resolved endpoint.
  #
  # @param options [Hash] Service options; see Common#apply_options for the keys that are
  #   read. :persistent (default false) is passed on to the connection.
  def initialize(options = {})
    apply_options(options)
    authenticate

    @persistent = options[:persistent] || false
    endpoint = endpoint_uri.to_s
    @connection = Fog::XML::Connection.new(endpoint, @persistent, @connection_options)
  end

  # Perform a request through the superclass, re-raising HTTP status failures as this
  # service's own error classes.
  #
  # @param params Request description forwarded to the superclass.
  # @param parse_json [Boolean] Forwarded to the superclass.
  # @raise [NotFound, BadRequest, InternalServerError, MethodNotAllowed, ServiceError]
  #   Depending on the class of the Excon status error that was raised.
  def request(params, parse_json = true, &block)
    super(params, parse_json, &block)
  rescue Excon::Errors::HTTPStatusError => error
    # The specific statuses are checked first, in the same order as the original rescue
    # chain; anything else falls through to the generic ServiceError.
    wrapper = case error
              when Excon::Errors::NotFound then NotFound
              when Excon::Errors::BadRequest then BadRequest
              when Excon::Errors::InternalServerError then InternalServerError
              when Excon::Errors::MethodNotAllowed then MethodNotAllowed
              else ServiceError
              end
    raise wrapper.slurp(error, self)
  end
end
end
end
end