Client API update:

- Add API for configuring options per Worker class
- Removed the Client API issues preventing it working on Ruby 1.8
- Cleanups to various APIs for upcoming 1.0 release.
This commit is contained in:
Mike Perham 2012-04-01 19:53:45 -07:00
parent b58d125659
commit 2080412119
10 changed files with 88 additions and 45 deletions

View File

@ -1,12 +1,15 @@
HEAD
-----------
- Add version CLI option
- Client-side API changes, added sidekiq\_options for Sidekiq::Worker.
As a side effect of this change, the client API works on Ruby 1.8.
It's not officially supported but should work [#103]
- NO POLL! Sidekiq no longer polls Redis, leading to lower network
utilization and lower latency for message processing. As a side
effect of this change, queue weights are no longer supported. If you
wish to process multiple queues, list them in the order you want
them processed: `sidekiq -q critical -q high -q default -q low`
- Add --version CLI option
0.10.1
-----------

View File

@ -6,10 +6,6 @@ require 'sidekiq/middleware/client/unique_jobs'
module Sidekiq
class Client
def self.middleware
raise "Sidekiq::Client.middleware is now Sidekiq.client_middleware"
end
def self.default_middleware
Middleware::Chain.new do |m|
m.add Middleware::Client::UniqueJobs
@ -24,22 +20,25 @@ module Sidekiq
Sidekiq.redis { |x| x.smembers('queues') }
end
# DEPRECATED
def self.queue_mappings
@queue_mappings ||= {}
end
# Example usage:
# Sidekiq::Client.push('my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar'])
def self.push(queue=nil, item)
raise(ArgumentError, "Message must be a Hash of the form: { 'class' => SomeClass, 'args' => ['bob', 1, :foo => 'bar'] }") unless item.is_a?(Hash)
# Sidekiq::Client.push('queue' => 'my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar'])
def self.push(item)
raise(ArgumentError, "Message must be a Hash of the form: { 'class' => SomeWorker, 'args' => ['bob', 1, :foo => 'bar'] }") unless item.is_a?(Hash)
raise(ArgumentError, "Message must include a class and set of arguments: #{item.inspect}") if !item['class'] || !item['args']
raise(ArgumentError, "Message must include a Sidekiq::Worker class, not class name: #{item['class'].ancestors.inspect}") if !item['class'].is_a?(Class) || !item['class'].respond_to?('get_sidekiq_options')
queue = queue || queue_mappings[item['class'].to_s] || 'default'
item['class'] = item['class'].to_s if !item['class'].is_a?(String)
item['retry'] = !!item['class'].get_sidekiq_options['retry']
queue = item['queue'] || item['class'].get_sidekiq_options['queue'] || queue_mappings[item['class'].to_s] || 'default'
worker_class = item['class']
item['class'] = item['class'].to_s
pushed = false
Sidekiq.client_middleware.invoke(item, queue) do
Sidekiq.client_middleware.invoke(worker_class, item, queue) do
payload = MultiJson.encode(item)
Sidekiq.redis do |conn|
conn.multi do
@ -52,16 +51,14 @@ module Sidekiq
pushed
end
# Please use .push if possible instead.
#
# Example usage:
# Redis compatibility helper. Example usage:
#
# Sidekiq::Client.enqueue(MyWorker, 'foo', 1, :bat => 'bar')
#
# Messages are enqueued to the 'default' queue.
#
def self.enqueue(klass, *args)
push(nil, { 'class' => klass.name, 'args' => args })
push('class' => klass, 'args' => args)
end
end
end

View File

@ -1,6 +1,6 @@
module Sidekiq
module Extensions
class Proxy < ::BasicObject
class Proxy < (RUBY_VERSION < '1.9' ? Object : BasicObject)
def initialize(performable, target)
@performable = performable
@target = target

View File

@ -7,23 +7,27 @@ module Sidekiq
class UniqueJobs
HASH_KEY_EXPIRATION = 30 * 60
def call(item, queue)
payload_hash = Digest::MD5.hexdigest(MultiJson.encode(item))
unique = false
def call(worker_class, item, queue)
enabled = worker_class.get_sidekiq_options['unique']
if enabled
payload_hash = Digest::MD5.hexdigest(MultiJson.encode(item))
unique = false
Sidekiq.redis do |conn|
conn.watch(payload_hash)
Sidekiq.redis do |conn|
conn.watch(payload_hash)
if conn.get(payload_hash)
conn.unwatch
else
unique = conn.multi do
conn.setex(payload_hash, HASH_KEY_EXPIRATION, 1)
if conn.get(payload_hash)
conn.unwatch
else
unique = conn.multi do
conn.setex(payload_hash, HASH_KEY_EXPIRATION, 1)
end
end
end
yield if unique
else
yield
end
yield if unique
end
end

View File

@ -27,6 +27,8 @@ module Sidekiq
def call(worker, msg, queue)
yield
rescue => e
raise unless msg['retry']
msg['queue'] = queue
msg['error_message'] = e.message
msg['error_class'] = e.class.name

View File

@ -26,12 +26,35 @@ module Sidekiq
module ClassMethods
def perform_async(*args)
Sidekiq::Client.push('class' => self.name, 'args' => args)
Sidekiq::Client.push('class' => self, 'args' => args)
end
def queue(name)
puts "DEPRECATED: `queue :name` is now `sidekiq_options :queue => :name`"
Sidekiq::Client.queue_mappings[self.name] = name.to_s
end
##
# Allows customization for this type of Worker.
# Legal options:
#
# :unique - enable the UniqueJobs middleware for this Worker, default *true*
# :queue - use a named queue for this Worker, default 'default'
# :retry - enable the RetryJobs middleware for this Worker, default *true*
def sidekiq_options(opts={})
@sidekiq_options = get_sidekiq_options.merge(stringify_keys(opts || {}))
end
def get_sidekiq_options # :nodoc:
@sidekiq_options || { 'unique' => true, 'retry' => true, 'queue' => 'default' }
end
def stringify_keys(hash) # :nodoc:
hash.keys.each do |key|
hash[key.to_s] = hash.delete(key)
end
hash
end
end
end
end

View File

@ -9,19 +9,21 @@ class TestClient < MiniTest::Unit::TestCase
Sidekiq.redis {|c| c.flushdb }
end
class QueueWorker
include Sidekiq::Worker
sidekiq_options :queue => 'customqueue'
end
it 'does not push duplicate messages when configured for unique only' do
Sidekiq.client_middleware.entries.clear
Sidekiq.client_middleware do |chain|
chain.add Sidekiq::Middleware::Client::UniqueJobs
end
10.times { Sidekiq::Client.push('customqueue', 'class' => 'Foo', 'args' => [1, 2]) }
QueueWorker.sidekiq_options :unique => true
10.times { Sidekiq::Client.push('class' => QueueWorker, 'args' => [1, 2]) }
assert_equal 1, Sidekiq.redis {|c| c.llen("queue:customqueue") }
end
it 'does push duplicate messages when not configured for unique only' do
Sidekiq.client_middleware.remove(Sidekiq::Middleware::Client::UniqueJobs)
10.times { Sidekiq::Client.push('customqueue2', 'class' => 'Foo', 'args' => [1, 2]) }
assert_equal 10, Sidekiq.redis {|c| c.llen("queue:customqueue2") }
QueueWorker.sidekiq_options :unique => false
10.times { Sidekiq::Client.push('class' => QueueWorker, 'args' => [1, 2]) }
assert_equal 10, Sidekiq.redis {|c| c.llen("queue:customqueue") }
end
end
@ -56,7 +58,7 @@ class TestClient < MiniTest::Unit::TestCase
it 'pushes messages to redis' do
@redis.expect :rpush, 1, ['queue:foo', String]
pushed = Sidekiq::Client.push('foo', 'class' => 'Foo', 'args' => [1, 2])
pushed = Sidekiq::Client.push('queue' => 'foo', 'class' => MyWorker, 'args' => [1, 2])
assert pushed
@redis.verify
end
@ -82,7 +84,7 @@ class TestClient < MiniTest::Unit::TestCase
class QueuedWorker
include Sidekiq::Worker
queue :flimflam
sidekiq_options :queue => :flimflam
end
it 'enqueues to the named queue' do

View File

@ -26,8 +26,8 @@ class TestManager < MiniTest::Unit::TestCase
end
it 'processes messages' do
Sidekiq::Client.push(:foo, 'class' => IntegrationWorker, 'args' => [1, 2])
Sidekiq::Client.push(:foo, 'class' => IntegrationWorker, 'args' => [1, 3])
Sidekiq::Client.push('queue' => :foo, 'class' => IntegrationWorker, 'args' => [1, 2])
Sidekiq::Client.push('queue' => :foo, 'class' => IntegrationWorker, 'args' => [1, 3])
q = TimedQueue.new
mgr = Sidekiq::Manager.new(:queues => [:foo], :concurrency => 2)

View File

@ -12,9 +12,21 @@ class TestRetry < MiniTest::Unit::TestCase
def @redis.with; yield self; end
end
it 'allows disabling retry' do
msg = { 'class' => 'Bob', 'args' => [1,2,'foo'], 'retry' => false }
msg2 = msg.dup
handler = Sidekiq::Middleware::Server::RetryJobs.new
assert_raises RuntimeError do
handler.call('', msg2, 'default') do
raise "kerblammo!"
end
end
assert_equal msg, msg2
end
it 'handles a new failed message' do
@redis.expect :zadd, 1, ['retry', String, String]
msg = { 'class' => 'Bob', 'args' => [1,2,'foo'] }
msg = { 'class' => 'Bob', 'args' => [1,2,'foo'], 'retry' => true }
handler = Sidekiq::Middleware::Server::RetryJobs.new
assert_raises RuntimeError do
handler.call('', msg, 'default') do
@ -32,7 +44,7 @@ class TestRetry < MiniTest::Unit::TestCase
it 'handles a recurring failed message' do
@redis.expect :zadd, 1, ['retry', String, String]
now = Time.now.utc
msg = {"class"=>"Bob", "args"=>[1, 2, "foo"], "queue"=>"default", "error_message"=>"kerblammo!", "error_class"=>"RuntimeError", "failed_at"=>now, "retry_count"=>10}
msg = {"class"=>"Bob", "args"=>[1, 2, "foo"], 'retry' => true, "queue"=>"default", "error_message"=>"kerblammo!", "error_class"=>"RuntimeError", "failed_at"=>now, "retry_count"=>10}
handler = Sidekiq::Middleware::Server::RetryJobs.new
assert_raises RuntimeError do
handler.call('', msg, 'default') do

View File

@ -38,7 +38,7 @@ class TestWeb < MiniTest::Unit::TestCase
assert_match last_response.body, /default/
refute_match last_response.body, /foo/
assert Sidekiq::Client.push(:foo, 'class' => WebWorker, 'args' => [1, 3])
assert Sidekiq::Client.push('queue' => :foo, 'class' => WebWorker, 'args' => [1, 3])
get '/'
assert_equal 200, last_response.status