diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index a9de8f32..ef72bdba 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -4,133 +4,135 @@ require 'sidekiq/middleware/chain' module Sidekiq class Client + class << self - def self.default_middleware - Middleware::Chain.new do - end - end - - def self.registered_workers - Sidekiq.redis { |x| x.smembers('workers') } - end - - def self.registered_queues - Sidekiq.redis { |x| x.smembers('queues') } - end - - ## - # The main method used to push a job to Redis. Accepts a number of options: - # - # queue - the named queue to use, default 'default' - # class - the worker class to call, required - # args - an array of simple arguments to the perform method, must be JSON-serializable - # retry - whether to retry this job if it fails, true or false, default true - # backtrace - whether to save any error backtrace, default false - # - # All options must be strings, not symbols. NB: because we are serializing to JSON, all - # symbols in 'args' will be converted to strings. - # - # Returns nil if not pushed to Redis or a unique Job ID if pushed. - # - # Example: - # Sidekiq::Client.push('queue' => 'my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar']) - # - def self.push(item) - normed = normalize_item(item) - normed, payload = process_single(item['class'], normed) - - pushed = false - pushed = raw_push(normed, payload) if normed - pushed ? normed['jid'] : nil - end - - ## - # Push a large number of jobs to Redis. In practice this method is only - # useful if you are pushing tens of thousands of jobs or more. This method - # basically cuts down on the redis round trip latency. - # - # Takes the same arguments as Client.push except that args is expected to be - # an Array of Arrays. All other keys are duplicated for each job. Each job - # is run through the client middleware pipeline and each job gets its own Job ID - # as normal. - # - # Returns the number of jobs pushed or nil if the pushed failed. The number of jobs - # pushed can be less than the number given if the middleware stopped processing for one - # or more jobs. - def self.push_bulk(items) - normed = normalize_item(items) - payloads = items['args'].map do |args| - _, payload = process_single(items['class'], normed.merge('args' => args, 'jid' => SecureRandom.hex(12))) - payload - end.compact - - pushed = raw_push(normed, payloads) - pushed ? payloads.size : nil - end - - # Resque compatibility helpers. - # - # Example usage: - # Sidekiq::Client.enqueue(MyWorker, 'foo', 1, :bat => 'bar') - # - # Messages are enqueued to the 'default' queue. - # - def self.enqueue(klass, *args) - klass.client_push('class' => klass, 'args' => args) - end - - # Example usage: - # Sidekiq::Client.enqueue_to(:queue_name, MyWorker, 'foo', 1, :bat => 'bar') - # - def self.enqueue_to(queue, klass, *args) - klass.client_push('queue' => queue, 'class' => klass, 'args' => args) - end - - private - - def self.raw_push(normed, payload) # :nodoc: - pushed = false - Sidekiq.redis do |conn| - if normed['at'] && payload.is_a?(Array) - pushed = conn.zadd('schedule', payload.map {|hash| [normed['at'].to_s, hash]}) - elsif normed['at'] - pushed = conn.zadd('schedule', normed['at'].to_s, payload) - else - _, pushed = conn.multi do - conn.sadd('queues', normed['queue']) - conn.lpush("queue:#{normed['queue']}", payload) - end + def default_middleware + Middleware::Chain.new do end end - pushed - end - def self.process_single(worker_class, item) - queue = item['queue'] - - Sidekiq.client_middleware.invoke(worker_class, item, queue) do - payload = Sidekiq.dump_json(item) - return item, payload - end - end - - def self.normalize_item(item) - raise(ArgumentError, "Message must be a Hash of the form: { 'class' => SomeWorker, 'args' => ['bob', 1, :foo => 'bar'] }") unless item.is_a?(Hash) - raise(ArgumentError, "Message must include a class and set of arguments: #{item.inspect}") if !item['class'] || !item['args'] - raise(ArgumentError, "Message args must be an Array") unless item['args'].is_a?(Array) - raise(ArgumentError, "Message class must be either a Class or String representation of the class name") unless item['class'].is_a?(Class) || item['class'].is_a?(String) - - if item['class'].is_a?(Class) - raise(ArgumentError, "Message must include a Sidekiq::Worker class, not class name: #{item['class'].ancestors.inspect}") if !item['class'].respond_to?('get_sidekiq_options') - normalized_item = item['class'].get_sidekiq_options.merge(item) - normalized_item['class'] = normalized_item['class'].to_s - else - normalized_item = Sidekiq::Worker::ClassMethods::DEFAULT_OPTIONS.merge(item) + def registered_workers + Sidekiq.redis { |x| x.smembers('workers') } end - normalized_item['jid'] = SecureRandom.hex(12) - normalized_item - end + def registered_queues + Sidekiq.redis { |x| x.smembers('queues') } + end + ## + # The main method used to push a job to Redis. Accepts a number of options: + # + # queue - the named queue to use, default 'default' + # class - the worker class to call, required + # args - an array of simple arguments to the perform method, must be JSON-serializable + # retry - whether to retry this job if it fails, true or false, default true + # backtrace - whether to save any error backtrace, default false + # + # All options must be strings, not symbols. NB: because we are serializing to JSON, all + # symbols in 'args' will be converted to strings. + # + # Returns nil if not pushed to Redis or a unique Job ID if pushed. + # + # Example: + # Sidekiq::Client.push('queue' => 'my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar']) + # + def push(item) + normed = normalize_item(item) + payload = process_single(item['class'], normed) + + pushed = false + pushed = raw_push([payload]) if payload + pushed ? payload['jid'] : nil + end + + ## + # Push a large number of jobs to Redis. In practice this method is only + # useful if you are pushing tens of thousands of jobs or more. This method + # basically cuts down on the redis round trip latency. + # + # Takes the same arguments as Client.push except that args is expected to be + # an Array of Arrays. All other keys are duplicated for each job. Each job + # is run through the client middleware pipeline and each job gets its own Job ID + # as normal. + # + # Returns the number of jobs pushed or nil if the pushed failed. The number of jobs + # pushed can be less than the number given if the middleware stopped processing for one + # or more jobs. + def push_bulk(items) + normed = normalize_item(items) + payloads = items['args'].map do |args| + raise ArgumentError, "Bulk arguments must be an Array of Arrays: [[1], [2]]" if !args.is_a?(Array) + process_single(items['class'], normed.merge('args' => args, 'jid' => SecureRandom.hex(12))) + end.compact + + pushed = false + pushed = raw_push(payloads) if !payloads.empty? + pushed ? payloads.size : nil + end + + # Resque compatibility helpers. + # + # Example usage: + # Sidekiq::Client.enqueue(MyWorker, 'foo', 1, :bat => 'bar') + # + # Messages are enqueued to the 'default' queue. + # + def enqueue(klass, *args) + klass.client_push('class' => klass, 'args' => args) + end + + # Example usage: + # Sidekiq::Client.enqueue_to(:queue_name, MyWorker, 'foo', 1, :bat => 'bar') + # + def enqueue_to(queue, klass, *args) + klass.client_push('queue' => queue, 'class' => klass, 'args' => args) + end + + private + + def raw_push(payloads) + pushed = false + Sidekiq.redis do |conn| + if payloads.first['at'] + pushed = conn.zadd('schedule', payloads.map {|hash| [hash['at'].to_s, Sidekiq.dump_json(hash)]}) + else + q = payloads.first['queue'] + to_push = payloads.map { |entry| Sidekiq.dump_json(entry) } + _, pushed = conn.multi do + conn.sadd('queues', q) + conn.lpush("queue:#{q}", to_push) + end + end + end + pushed + end + + def process_single(worker_class, item) + queue = item['queue'] + + Sidekiq.client_middleware.invoke(worker_class, item, queue) do + item + end + end + + def normalize_item(item) + raise(ArgumentError, "Message must be a Hash of the form: { 'class' => SomeWorker, 'args' => ['bob', 1, :foo => 'bar'] }") unless item.is_a?(Hash) + raise(ArgumentError, "Message must include a class and set of arguments: #{item.inspect}") if !item['class'] || !item['args'] + raise(ArgumentError, "Message args must be an Array") unless item['args'].is_a?(Array) + raise(ArgumentError, "Message class must be either a Class or String representation of the class name") unless item['class'].is_a?(Class) || item['class'].is_a?(String) + + if item['class'].is_a?(Class) + raise(ArgumentError, "Message must include a Sidekiq::Worker class, not class name: #{item['class'].ancestors.inspect}") if !item['class'].respond_to?('get_sidekiq_options') + normalized_item = item['class'].get_sidekiq_options.merge(item) + normalized_item['class'] = normalized_item['class'].to_s + else + normalized_item = Sidekiq::Worker::ClassMethods::DEFAULT_OPTIONS.merge(item) + end + + normalized_item['jid'] = SecureRandom.hex(12) + normalized_item + end + + end end end diff --git a/lib/sidekiq/testing/inline.rb b/lib/sidekiq/testing/inline.rb index 62643859..32123c4e 100644 --- a/lib/sidekiq/testing/inline.rb +++ b/lib/sidekiq/testing/inline.rb @@ -28,9 +28,10 @@ module Sidekiq # singleton_class.class_eval do alias_method :raw_push_old, :raw_push - def raw_push(normed, payload) - Array(payload).each do |hash| - normed['class'].constantize.new.perform(*Sidekiq.load_json(hash)['args']) + def raw_push(payload) + [payload].flatten.each do |item| + marshalled = Sidekiq.load_json(Sidekiq.dump_json(item)) + marshalled['class'].constantize.new.perform(*marshalled['args']) end true diff --git a/myapp/app/controllers/work_controller.rb b/myapp/app/controllers/work_controller.rb index cc087bac..f2dd9feb 100644 --- a/myapp/app/controllers/work_controller.rb +++ b/myapp/app/controllers/work_controller.rb @@ -12,6 +12,12 @@ class WorkController < ApplicationController render :text => 'enqueued' end + def bulk + Sidekiq::Client.push_bulk('class' => HardWorker, + 'args' => [['bob', 1, 1], ['mike', 1, 2]]) + render :text => 'enbulked' + end + def long 50.times do |x| HardWorker.perform_async('bob', 10, x) diff --git a/myapp/config/routes.rb b/myapp/config/routes.rb index c7edef5c..90309453 100644 --- a/myapp/config/routes.rb +++ b/myapp/config/routes.rb @@ -7,4 +7,5 @@ Myapp::Application.routes.draw do get "work/post" => "work#delayed_post" get "work/long" => "work#long" get "work/crash" => "work#crash" + get "work/bulk" => "work#bulk" end diff --git a/test/test_client.rb b/test/test_client.rb index 369774fc..2b3d3e15 100644 --- a/test/test_client.rb +++ b/test/test_client.rb @@ -46,7 +46,7 @@ class TestClient < MiniTest::Unit::TestCase end it 'pushes messages to redis' do - @redis.expect :lpush, 1, ['queue:foo', String] + @redis.expect :lpush, 1, ['queue:foo', Array] pushed = Sidekiq::Client.push('queue' => 'foo', 'class' => MyWorker, 'args' => [1, 2]) assert pushed assert_equal 24, pushed.size @@ -54,7 +54,7 @@ class TestClient < MiniTest::Unit::TestCase end it 'pushes messages to redis using a String class' do - @redis.expect :lpush, 1, ['queue:foo', String] + @redis.expect :lpush, 1, ['queue:foo', Array] pushed = Sidekiq::Client.push('queue' => 'foo', 'class' => 'MyWorker', 'args' => [1, 2]) assert pushed assert_equal 24, pushed.size @@ -70,28 +70,28 @@ class TestClient < MiniTest::Unit::TestCase end it 'handles perform_async' do - @redis.expect :lpush, 1, ['queue:default', String] + @redis.expect :lpush, 1, ['queue:default', Array] pushed = MyWorker.perform_async(1, 2) assert pushed @redis.verify end it 'handles perform_async on failure' do - @redis.expect :lpush, nil, ['queue:default', String] + @redis.expect :lpush, nil, ['queue:default', Array] pushed = MyWorker.perform_async(1, 2) refute pushed @redis.verify end it 'enqueues messages to redis' do - @redis.expect :lpush, 1, ['queue:default', String] + @redis.expect :lpush, 1, ['queue:default', Array] pushed = Sidekiq::Client.enqueue(MyWorker, 1, 2) assert pushed @redis.verify end it 'enqueues messages to redis' do - @redis.expect :lpush, 1, ['queue:custom_queue', String] + @redis.expect :lpush, 1, ['queue:custom_queue', Array] pushed = Sidekiq::Client.enqueue_to(:custom_queue, MyWorker, 1, 2) assert pushed @redis.verify @@ -103,7 +103,7 @@ class TestClient < MiniTest::Unit::TestCase end it 'enqueues to the named queue' do - @redis.expect :lpush, 1, ['queue:flimflam', String] + @redis.expect :lpush, 1, ['queue:flimflam', Array] pushed = QueuedWorker.perform_async(1, 2) assert pushed @redis.verify @@ -176,11 +176,11 @@ class TestClient < MiniTest::Unit::TestCase describe 'item normalization' do it 'defaults retry to true' do - assert_equal true, Sidekiq::Client.normalize_item('class' => QueuedWorker, 'args' => [])['retry'] + assert_equal true, Sidekiq::Client.send(:normalize_item, 'class' => QueuedWorker, 'args' => [])['retry'] end it "does not normalize numeric retry's" do - assert_equal 2, Sidekiq::Client.normalize_item('class' => CWorker, 'args' => [])['retry'] + assert_equal 2, Sidekiq::Client.send(:normalize_item, 'class' => CWorker, 'args' => [])['retry'] end end end diff --git a/test/test_scheduling.rb b/test/test_scheduling.rb index 44c88a03..e01a3f4c 100644 --- a/test/test_scheduling.rb +++ b/test/test_scheduling.rb @@ -18,20 +18,20 @@ class TestScheduling < MiniTest::Unit::TestCase end it 'schedules a job via interval' do - @redis.expect :zadd, true, ['schedule', String, String] + @redis.expect :zadd, true, ['schedule', Array] assert ScheduledWorker.perform_in(600, 'mike') @redis.verify end it 'schedules a job via timestamp' do - @redis.expect :zadd, true, ['schedule', String, String] + @redis.expect :zadd, true, ['schedule', Array] assert ScheduledWorker.perform_in(5.days.from_now, 'mike') @redis.verify end it 'schedules multiple jobs at once' do @redis.expect :zadd, true, ['schedule', Array] - assert Sidekiq::Client.push_bulk('class' => ScheduledWorker, 'args' => ['mike', 'mike'], 'at' => 600) + assert Sidekiq::Client.push_bulk('class' => ScheduledWorker, 'args' => [['mike'], ['mike']], 'at' => 600) @redis.verify end end diff --git a/test/test_testing_inline.rb b/test/test_testing_inline.rb index 87aedc0a..ea068204 100644 --- a/test/test_testing_inline.rb +++ b/test/test_testing_inline.rb @@ -81,10 +81,10 @@ class TestInline < MiniTest::Unit::TestCase end it 'stubs the push_bulk call when in testing mode' do - assert Sidekiq::Client.push_bulk({'class' => InlineWorker, 'args' => [true, true]}) + assert Sidekiq::Client.push_bulk({'class' => InlineWorker, 'args' => [[true], [true]]}) assert_raises InlineError do - Sidekiq::Client.push_bulk({'class' => InlineWorker, 'args' => [true, false]}) + Sidekiq::Client.push_bulk({'class' => InlineWorker, 'args' => [[true], [false]]}) end end