diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index 0f5df4a6..f5b94d58 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -99,7 +99,7 @@ module Sidekiq else _, pushed = conn.multi do conn.sadd('queues', normed['queue']) - conn.rpush("queue:#{normed['queue']}", payload) + conn.lpush("queue:#{normed['queue']}", payload) end end end diff --git a/lib/sidekiq/fetch.rb b/lib/sidekiq/fetch.rb index a2b292d0..f1cc32be 100644 --- a/lib/sidekiq/fetch.rb +++ b/lib/sidekiq/fetch.rb @@ -12,11 +12,10 @@ module Sidekiq TIMEOUT = 1 - def initialize(mgr, queues, strict) + def initialize(mgr, options) + klass = Sidekiq.options[:fetch] || BasicFetch @mgr = mgr - @strictly_ordered_queues = strict - @queues = queues.map { |q| "queue:#{q}" } - @unique_queues = @queues.uniq + @strategy = klass.new(options) end # Fetching is straightforward: the Manager makes a fetch @@ -32,10 +31,10 @@ module Sidekiq return if Sidekiq::Fetcher.done? begin - queue, msg = Sidekiq.redis { |conn| conn.blpop(*queues_cmd) } + work = @strategy.retrieve_work - if msg - @mgr.async.assign(msg, queue.gsub(/.*queue:/, '')) + if work + @mgr.async.assign(work) else after(0) { fetch } end @@ -58,8 +57,34 @@ module Sidekiq def self.done? @done end + end - private + class BasicFetch + def initialize(options) + @strictly_ordered_queues = !!options[:strict] + @queues = options[:queues].map { |q| "queue:#{q}" } + @unique_queues = @queues.uniq + end + + def retrieve_work + UnitOfWork.new(*Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }) + end + + UnitOfWork = Struct.new(:queue, :message) do + def acknowledge + # nothing to do + end + + def queue_name + queue.gsub(/.*queue:/, '') + end + + def requeue + Sidekiq.redis do |conn| + conn.rpush(queue, message) + end + end + end # Creating the Redis#blpop command takes into account any # configured queue weights. By default Redis#blpop returns @@ -67,10 +92,10 @@ module Sidekiq # recreate the queue command each time we invoke Redis#blpop # to honor weights and avoid queue starvation. def queues_cmd - return @unique_queues.dup << TIMEOUT if @strictly_ordered_queues + return @unique_queues.dup << Sidekiq::Fetcher::TIMEOUT if @strictly_ordered_queues queues = @queues.sample(@unique_queues.size).uniq queues.concat(@unique_queues - queues) - queues << TIMEOUT + queues << Sidekiq::Fetcher::TIMEOUT end end end diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index c2d1363a..3cc99428 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -25,7 +25,7 @@ module Sidekiq @in_progress = {} @done = false @busy = [] - @fetcher = Fetcher.new(current_actor, options[:queues], !!options[:strict]) + @fetcher = Fetcher.new(current_actor, options) @ready = @count.times.map { Processor.new_link(current_actor) } procline(options[:tag] ? "#{options[:tag]} " : '') end @@ -86,21 +86,19 @@ module Sidekiq end end - def assign(msg, queue) + def assign(work) watchdog("Manager#assign died") do if stopped? # Race condition between Manager#stop if Fetcher # is blocked on redis and gets a message after # all the ready Processors have been stopped. # Push the message back to redis. - Sidekiq.redis do |conn| - conn.lpush("queue:#{queue}", msg) - end + work.requeue else processor = @ready.pop - @in_progress[processor.object_id] = [msg, queue] + @in_progress[processor.object_id] = work @busy << processor - processor.async.process(msg, queue) + processor.async.process(work) end end end diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index be26b474..e0433f3c 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -30,7 +30,9 @@ module Sidekiq @boss = boss end - def process(msgstr, queue) + def process(work) + msgstr = work.message + queue = work.queue_name defer do begin msg = Sidekiq.load_json(msgstr) @@ -46,6 +48,8 @@ module Sidekiq rescue Exception => ex handle_exception(ex, msg || { :message => msgstr }) raise + ensure + work.acknowledge end end @boss.async.processor_done(current_actor) diff --git a/test/test_client.rb b/test/test_client.rb index 9ba8d232..b6247982 100644 --- a/test/test_client.rb +++ b/test/test_client.rb @@ -42,7 +42,7 @@ class TestClient < MiniTest::Unit::TestCase end it 'pushes messages to redis' do - @redis.expect :rpush, 1, ['queue:foo', String] + @redis.expect :lpush, 1, ['queue:foo', String] pushed = Sidekiq::Client.push('queue' => 'foo', 'class' => MyWorker, 'args' => [1, 2]) assert pushed assert_equal 24, pushed.size @@ -50,7 +50,7 @@ class TestClient < MiniTest::Unit::TestCase end it 'pushes messages to redis using a String class' do - @redis.expect :rpush, 1, ['queue:foo', String] + @redis.expect :lpush, 1, ['queue:foo', String] pushed = Sidekiq::Client.push('queue' => 'foo', 'class' => 'MyWorker', 'args' => [1, 2]) assert pushed assert_equal 24, pushed.size @@ -66,28 +66,28 @@ class TestClient < MiniTest::Unit::TestCase end it 'handles perform_async' do - @redis.expect :rpush, 1, ['queue:default', String] + @redis.expect :lpush, 1, ['queue:default', String] pushed = MyWorker.perform_async(1, 2) assert pushed @redis.verify end it 'handles perform_async on failure' do - @redis.expect :rpush, nil, ['queue:default', String] + @redis.expect :lpush, nil, ['queue:default', String] pushed = MyWorker.perform_async(1, 2) refute pushed @redis.verify end it 'enqueues messages to redis' do - @redis.expect :rpush, 1, ['queue:default', String] + @redis.expect :lpush, 1, ['queue:default', String] pushed = Sidekiq::Client.enqueue(MyWorker, 1, 2) assert pushed @redis.verify end it 'enqueues messages to redis' do - @redis.expect :rpush, 1, ['queue:custom_queue', String] + @redis.expect :lpush, 1, ['queue:custom_queue', String] pushed = Sidekiq::Client.enqueue_to(:custom_queue, MyWorker, 1, 2) assert pushed @redis.verify @@ -99,7 +99,7 @@ class TestClient < MiniTest::Unit::TestCase end it 'enqueues to the named queue' do - @redis.expect :rpush, 1, ['queue:flimflam', String] + @redis.expect :lpush, 1, ['queue:flimflam', String] pushed = QueuedWorker.perform_async(1, 2) assert pushed @redis.verify @@ -117,6 +117,9 @@ class TestClient < MiniTest::Unit::TestCase end describe 'bulk' do + after do + Sidekiq::Queue.new.clear + end it 'can push a large set of jobs at once' do a = Time.now count = Sidekiq::Client.push_bulk('class' => QueuedWorker, 'args' => (1..1_000).to_a.map { |x| Array(x) }) diff --git a/test/test_fetch.rb b/test/test_fetch.rb index dfbea6f4..cca14dca 100644 --- a/test/test_fetch.rb +++ b/test/test_fetch.rb @@ -2,12 +2,30 @@ require 'helper' require 'sidekiq/fetch' class TestFetcher < MiniTest::Unit::TestCase - describe 'Fetcher#queues_cmd' do - describe 'when queues are strictly ordered' do - it 'returns the unique ordered queues properly based on priority and order they were passed in' do - fetcher = Sidekiq::Fetcher.new nil, %w[high medium low default], true - assert_equal (%w[queue:high queue:medium queue:low queue:default] << 1), fetcher._send_(:queues_cmd) - end + + def setup + Sidekiq.redis do |conn| + conn.del('queue:basic') + conn.rpush('queue:basic', 'msg') end end + + def test_basic_fetch_retrieve + fetch = Sidekiq::BasicFetch.new(:queues => ['basic', 'bar']) + uow = fetch.retrieve_work + refute_nil uow + assert_equal 'basic', uow.queue_name + assert_equal 'msg', uow.message + q = Sidekiq::Queue.new('basic') + assert_equal 0, q.size + uow.requeue + assert_equal 1, q.size + assert_nil uow.acknowledge + end + + def test_basic_fetch_strict_retrieve + fetch = Sidekiq::BasicFetch.new(:queues => ['basic', 'bar', 'bar'], :strict => true) + cmd = fetch.queues_cmd + assert_equal cmd, ['queue:basic', 'queue:bar', 1] + end end diff --git a/test/test_middleware.rb b/test/test_middleware.rb index 0ddbf403..33632dcf 100644 --- a/test/test_middleware.rb +++ b/test/test_middleware.rb @@ -41,7 +41,7 @@ class TestMiddleware < MiniTest::Unit::TestCase def call(*args) end end - + class AnotherCustomMiddleware def initialize(name, recorder) @name = name @@ -83,7 +83,7 @@ class TestMiddleware < MiniTest::Unit::TestCase actor = MiniTest::Mock.new actor.expect(:processor_done, nil, [processor]) boss.expect(:async, actor, []) - processor.process(msg, 'default') + processor.process(Sidekiq::BasicFetch::UnitOfWork.new('queue:default', msg)) assert_equal %w(2 before 3 before 0 before work_performed 0 after 3 after 2 after), $recorder.flatten end diff --git a/test/test_processor.rb b/test/test_processor.rb index 918de2ca..1fc24c4f 100644 --- a/test/test_processor.rb +++ b/test/test_processor.rb @@ -23,12 +23,16 @@ class TestProcessor < MiniTest::Unit::TestCase end end + def work(msg, queue='queue:default') + Sidekiq::BasicFetch::UnitOfWork.new(queue, msg) + end + it 'processes as expected' do msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] }) actor = MiniTest::Mock.new actor.expect(:processor_done, nil, [@processor]) @boss.expect(:async, actor, []) - @processor.process(msg, 'default') + @processor.process(work(msg)) @boss.verify assert_equal 1, $invokes end @@ -36,7 +40,7 @@ class TestProcessor < MiniTest::Unit::TestCase it 'passes exceptions to ExceptionHandler' do msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] }) begin - @processor.process(msg, 'default') + @processor.process(work(msg)) flunk "Expected #process to raise exception" rescue TestException end @@ -49,7 +53,7 @@ class TestProcessor < MiniTest::Unit::TestCase re_raise = false begin - @processor.process(msg, 'default') + @processor.process(work(msg)) rescue TestException re_raise = true end @@ -64,7 +68,7 @@ class TestProcessor < MiniTest::Unit::TestCase actor = MiniTest::Mock.new actor.expect(:processor_done, nil, [processor]) @boss.expect(:async, actor, []) - processor.process(msgstr, 'default') + processor.process(work(msgstr)) assert_equal [['myarg']], msg['args'] end @@ -79,7 +83,7 @@ class TestProcessor < MiniTest::Unit::TestCase actor = MiniTest::Mock.new actor.expect(:processor_done, nil, [@processor]) @boss.expect(:async, actor, []) - @processor.process(msg, 'default') + @processor.process(work(msg)) end it 'increments processed stat' do @@ -100,7 +104,7 @@ class TestProcessor < MiniTest::Unit::TestCase def failed_job msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] }) begin - @processor.process(msg, 'default') + @processor.process(work(msg)) rescue TestException end end