1
0
Fork 0
mirror of https://github.com/fog/fog.git synced 2022-11-09 13:51:43 -05:00
fog--fog/lib/fog/rackspace/queues.rb

407 lines
13 KiB
Ruby
Raw Normal View History

require 'fog/rackspace/core'
module Fog
module Rackspace
class Queues < Fog::Service
# Pull the Fog::Rackspace::Errors module into this service class.
include Fog::Rackspace::Errors
# Queues-specific error classes. Real#request raises these in place of the
# corresponding Excon HTTP errors.
class ServiceError < Fog::Rackspace::Errors::ServiceError; end
class InternalServerError < Fog::Rackspace::Errors::InternalServerError; end
class BadRequest < Fog::Rackspace::Errors::BadRequest; end
# NOTE(review): MethodNotAllowed inherits from Errors::BadRequest (not from this
# class's own BadRequest) -- confirm that this hierarchy is intended.
class MethodNotAllowed < Fog::Rackspace::Errors::BadRequest; end
# Service options. The API key and username are declared with `requires`; the
# remaining options are declared with `recognizes`.
requires :rackspace_api_key, :rackspace_username
recognizes :rackspace_auth_url
recognizes :rackspace_auth_token
recognizes :rackspace_region
recognizes :rackspace_queues_url
recognizes :rackspace_queues_client_id
# Models and collections, declared relative to model_path.
model_path 'fog/rackspace/models/queues'
model :queue
collection :queues
model :message
collection :messages
model :claim
collection :claims
# Request methods, declared relative to request_path.
request_path 'fog/rackspace/requests/queues'
# Queue requests.
request :list_queues
request :get_queue
request :create_queue
request :delete_queue
request :get_queue_stats
# Message requests.
request :list_messages
request :get_message
request :create_message
request :delete_message
# Claim requests.
request :create_claim
request :get_claim
request :update_claim
request :delete_claim
# Behavior shared by the Mock and Real service implementations: option handling,
# endpoint resolution, authentication parameters and the client id accessor.
#
# The including class is expected to supply `v2_authentication?` and the `super`
# implementations of `endpoint_uri` and `authenticate`.
module Common
  # Copy the recognized service options into instance variables.
  #
  # A client id is generated with Fog::UUID.uuid when none is supplied. The region
  # falls back to :ord, with a deprecation warning when neither a region nor an
  # explicit queues URL was given.
  #
  # @param options [Hash] Service options (:rackspace_api_key, :rackspace_username,
  #   :rackspace_queues_client_id, :rackspace_auth_url, :rackspace_queues_url,
  #   :connection_options, :rackspace_region).
  # @raise [Fog::Errors::NotImplemented] If `v2_authentication?` is false.
  # @return [Object] The effective region.
  def apply_options(options)
    @rackspace_api_key = options[:rackspace_api_key]
    @rackspace_username = options[:rackspace_username]
    @rackspace_queues_client_id = options[:rackspace_queues_client_id] || Fog::UUID.uuid
    @rackspace_auth_url = options[:rackspace_auth_url]
    @rackspace_queues_url = options[:rackspace_queues_url]
    @rackspace_must_reauthenticate = false
    @connection_options = options[:connection_options] || {}
    @rackspace_region = options[:rackspace_region]

    unless v2_authentication?
      raise Fog::Errors::NotImplemented.new("V2 authentication required for Queues")
    end

    unless @rackspace_region || @rackspace_queues_url
      Fog::Logger.deprecation("Default region support will be removed in an upcoming release. Please switch to manually setting your endpoint. This requires setting the :rackspace_region option.")
    end

    @rackspace_region ||= :ord
  end

  # Name under which this service is identified.
  #
  # @return [Symbol]
  def service_name
    :cloudQueues
  end

  # The region selected by {#apply_options}.
  #
  # @return [Object]
  def region
    @rackspace_region
  end

  # Resolve the service endpoint and remember it in @uri. An explicitly configured
  # :rackspace_queues_url takes precedence over the URL passed in.
  #
  # @param service_endpoint_url [String, nil] Endpoint to use when no queues URL was
  #   configured.
  # @return [Object] Whatever the superclass implementation returns.
  def endpoint_uri(service_endpoint_url=nil)
    @uri = super(@rackspace_queues_url || service_endpoint_url, :rackspace_queues_url)
  end

  # Authenticate using the credentials captured by {#apply_options}.
  #
  # @param options [Hash] Accepted for interface compatibility; not read here.
  # @return [Object] Whatever the superclass implementation returns.
  def authenticate(options={})
    super({
      :rackspace_api_key  => @rackspace_api_key,
      :rackspace_username => @rackspace_username,
      :rackspace_auth_url => @rackspace_auth_url,
      :connection_options => @connection_options
    })
  end

  # The client id configured or generated for this service object.
  #
  # @return [String]
  def client_id
    @rackspace_queues_client_id
  end

  # Replace the client id for this service object.
  #
  # @param client_id [String] The new client id.
  def client_id=(client_id)
    @rackspace_queues_client_id = client_id
  end
end
class Mock < Fog::Rackspace::Service
include Common
PATH_BASE = "/v1/queues"
# In-memory stand-in for a single queue, holding its messages and claims.
class MockQueue
  attr_accessor :name, :metadata, :messages, :claims

  # @param name [String] Name of the queue.
  def initialize(name)
    @name = name
    @metadata = {}
    @messages = []
    @claims = {}
    # Message ids are consecutive hex numbers starting from a random seed.
    @id_counter = Fog::Mock.random_hex(24).to_i(16)
  end

  # Number of messages currently on the queue.
  #
  # @return [Integer]
  def total
    @messages.size
  end

  # Number of messages that currently belong to a claim.
  #
  # @return [Integer]
  def claimed
    @messages.count(&:claimed?)
  end

  # Number of messages that do not belong to any claim.
  #
  # @return [Integer]
  def free
    @messages.reject(&:claimed?).size
  end

  # The message that was published first, if any.
  #
  # @return [MockMessage, nil]
  def oldest
    @messages[0]
  end

  # The message that was published last, if any.
  #
  # @return [MockMessage, nil]
  def newest
    @messages[-1]
  end

  # Publish a new message at the end of the queue.
  #
  # @param client_id [String] UUID for the service object.
  # @param data [Hash] Message payload.
  # @param ttl [Integer] Number of seconds that the message should exist.
  # @return [MockMessage] The message that was created.
  def add_message(client_id, data, ttl)
    message_id = @id_counter.to_s(16)
    @id_counter += 1
    MockMessage.new(message_id, self, client_id, data, ttl).tap do |created|
      @messages << created
    end
  end

  # Register a new claim on this queue.
  #
  # @param ttl [Integer] Time-to-live for the claim.
  # @param grace [Integer] Grace period that's added to messages included in the claim.
  # @return [MockClaim] The claim that was created.
  def add_claim(ttl, grace)
    MockClaim.new(self, ttl, grace).tap do |created|
      claims[created.id] = created
    end
  end

  # Look up an existing claim, failing loudly when it is unknown.
  #
  # @param claim_id [String] An ID of an existing claim.
  # @raise [NotFound] If no claim with this ID exists.
  # @return [MockClaim] The requested claim.
  def claim!(claim_id)
    found = claims[claim_id]
    raise NotFound.new unless found
    found
  end

  # Drop expired messages, then drop every claim that has expired or no longer
  # holds any message, releasing the messages it still held.
  def ageoff
    messages.delete_if { |message| message.expired? }
    claims.keys.each do |claim_id|
      claim = claims[claim_id]
      next unless claim.expired? || claim.messages.empty?
      claim.messages.each { |message| message.claim = nil }
      claims.delete(claim_id)
    end
  end
end
# One message published to an in-memory MockQueue.
class MockMessage
  attr_accessor :id, :queue, :data, :ttl, :producer_id
  attr_accessor :claim, :created

  # Build a message. Callers should go through {MockQueue#add_message}.
  #
  # @param id [String] Identifier of the message within its queue.
  # @param queue [MockQueue] The queue that holds the message.
  # @param client_id [String] Id of the client that produced the message.
  # @param data [Hash] Message payload.
  # @param ttl [Integer] Number of seconds that the message should exist.
  def initialize(id, queue, client_id, data, ttl)
    @id = id
    @queue = queue
    @producer_id = client_id
    @data = data
    @ttl = ttl
    @created = Time.now.to_i
    @claim = nil
  end

  # Seconds elapsed since the message was created.
  #
  # @return [Integer]
  def age
    now = Time.now.to_i
    now - @created
  end

  # URI path that identifies this message.
  #
  # @return [String]
  def href
    queue_name = @queue.name
    "#{PATH_BASE}/#{queue_name}/messages/#{@id}"
  end

  # Whether a claim currently holds this message.
  #
  # @return [Boolean]
  def claimed?
    return false if @claim.nil?
    true
  end

  # Whether the message has outlived its ttl.
  #
  # @return [Boolean]
  def expired?
    elapsed = age
    elapsed > ttl
  end

  # Raise the ttl, when needed, so that the message survives until the end of
  # life dictated by its claim. Does nothing for an unclaimed message and never
  # shortens the ttl.
  def extend_life
    if @claim
      lifetime = claim.message_end_of_life - @created
      @ttl = lifetime if lifetime > @ttl
    end
  end

  # Render the message as a GET payload.
  #
  # @return [Hash]
  def to_h
    payload = {}
    payload["body"] = @data
    payload["age"] = age
    payload["ttl"] = @ttl
    payload["href"] = href
    payload
  end
end
# A reservation recording that a consumer is working on a set of messages from
# a queue.
class MockClaim
  attr_accessor :id, :queue, :ttl, :grace

  # Build a claim. Callers should go through {MockQueue#add_claim}.
  #
  # @param queue [MockQueue] The queue that owns this claim.
  # @param ttl [Integer] Duration, in seconds, that this claim should last.
  # @param grace [Integer] Extra seconds granted to the claimed messages after the
  #   claim's ttl, as used by {#message_end_of_life}.
  def initialize(queue, ttl, grace)
    @queue = queue
    @id = Fog::Mock.random_hex(24)
    @ttl = ttl
    @grace = grace
    touch!
  end

  # Restart the claim's clock from the current time.
  def touch!
    @created = Time.now.to_i
  end

  # Moment at which messages held by this claim should expire.
  #
  # @return [Integer] Seconds since the epoch.
  def message_end_of_life
    [@created, @ttl, @grace].inject { |sum, part| sum + part }
  end

  # Seconds elapsed since the claim was created or last touched.
  #
  # @return [Integer]
  def age
    now = Time.now.to_i
    now - @created
  end

  # Whether the claim has outlived its ttl.
  #
  # @return [Boolean]
  def expired?
    elapsed = age
    elapsed > ttl
  end

  # The messages on the owning queue that point back at this claim.
  #
  # @return [Array<MockMessage>]
  def messages
    @queue.messages.select do |candidate|
      candidate.claim == self
    end
  end

  # Render the claim, including its messages, as a GET payload.
  #
  # @return [Hash]
  def to_h
    serialized = messages.map { |message| message.to_h }
    queue_name = @queue.name
    {
      "age"      => age,
      "href"     => "#{PATH_BASE}/#{queue_name}/claims/#{@id}",
      "ttl"      => @ttl,
      "messages" => serialized
    }
  end
end
# Build a mock service object: record the options, then run authentication and
# endpoint resolution, in that order.
#
# @param options [Hash] Service options, passed to apply_options.
def initialize(options = {})
apply_options(options)
authenticate
endpoint_uri
end
# Class-wide store of mock state. Looking up a key that is not present yet
# creates, stores and returns an empty Hash for it.
#
# @return [Hash]
def self.data
  return @data if @data

  @data = Hash.new { |store, key| store[key] = {} }
end
# The slice of the class-wide mock store that belongs to this service object's
# username.
#
# @return [Hash]
def data
  store = self.class.data
  store[@rackspace_username]
end
# Build a MockQueue and store it under its name. A queue already stored under
# that name is replaced without warning.
#
# @param queue_name [String] Valid queue name.
# @return [MockQueue] The MockQueue that was created.
def add_queue(queue_name)
  MockQueue.new(queue_name).tap do |created|
    data[queue_name] = created
  end
end
# Look up a MockQueue by name without raising.
#
# @param queue_name [String] Valid queue name.
# @return [MockQueue, nil] The queue with the specified name, or `nil` if it
#   doesn't exist.
def mock_queue(queue_name)
  queues = data
  queues[queue_name]
end
# Look up a MockQueue by name, failing loudly when it does not exist.
#
# @param queue_name [String] Valid queue name.
# @raise [Fog::Rackspace::Queues::NotFound] If there is no queue with the specified name.
# @return [MockQueue] The queue with the specified name.
def mock_queue!(queue_name)
  found = mock_queue(queue_name)
  raise NotFound.new unless found
  found
end
# Ask every stored queue to discard whatever has outlived its ttl. Invoked at
# the start of every request.
def ageoff
  queues = data.values
  queues.each { |queue| queue.ageoff }
end
# Age off stale messages and claims, then hand the request to the superclass.
#
# @param params [Hash] Request parameters, passed through unchanged.
def request(params)
  ageoff
  super(params)
end
end
# Service implementation that issues requests over a Fog::Core::Connection.
class Real < Fog::Rackspace::Service
  include Common

  # Record the options, authenticate, and open a connection to the endpoint.
  #
  # @param options [Hash] Service options; :persistent (default false) is passed
  #   to the connection.
  def initialize(options = {})
    apply_options(options)
    authenticate

    @persistent = options[:persistent] || false
    endpoint = endpoint_uri.to_s
    @connection = Fog::Core::Connection.new(endpoint, @persistent, @connection_options)
  end

  # Perform a request through the superclass, re-raising Excon HTTP errors as
  # the matching Queues error classes.
  #
  # @param params [Hash] Request parameters.
  # @param parse_json [Boolean] Passed through to the superclass.
  # @raise [NotFound, BadRequest, InternalServerError, MethodNotAllowed, ServiceError]
  def request(params, parse_json = true, &block)
    begin
      super(params, parse_json, &block)
    rescue Excon::Errors::NotFound => e
      raise NotFound.slurp(e, self)
    rescue Excon::Errors::BadRequest => e
      raise BadRequest.slurp(e, self)
    rescue Excon::Errors::InternalServerError => e
      raise InternalServerError.slurp(e, self)
    rescue Excon::Errors::MethodNotAllowed => e
      raise MethodNotAllowed.slurp(e, self)
    rescue Excon::Errors::HTTPStatusError => e
      # Any other HTTP status error maps to the generic service error.
      raise ServiceError.slurp(e, self)
    end
  end
end
end
end
end