1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Refactor fetch strategy, fix queueing to lpush and rpop.

This commit is contained in:
Mike Perham 2013-01-05 21:17:08 -08:00
parent 633983ae86
commit 7408387ee0
8 changed files with 92 additions and 40 deletions

View file

@ -99,7 +99,7 @@ module Sidekiq
else
_, pushed = conn.multi do
conn.sadd('queues', normed['queue'])
conn.rpush("queue:#{normed['queue']}", payload)
conn.lpush("queue:#{normed['queue']}", payload)
end
end
end

View file

@ -12,11 +12,10 @@ module Sidekiq
TIMEOUT = 1
def initialize(mgr, queues, strict)
def initialize(mgr, options)
klass = Sidekiq.options[:fetch] || BasicFetch
@mgr = mgr
@strictly_ordered_queues = strict
@queues = queues.map { |q| "queue:#{q}" }
@unique_queues = @queues.uniq
@strategy = klass.new(options)
end
# Fetching is straightforward: the Manager makes a fetch
@ -32,10 +31,10 @@ module Sidekiq
return if Sidekiq::Fetcher.done?
begin
queue, msg = Sidekiq.redis { |conn| conn.blpop(*queues_cmd) }
work = @strategy.retrieve_work
if msg
@mgr.async.assign(msg, queue.gsub(/.*queue:/, ''))
if work
@mgr.async.assign(work)
else
after(0) { fetch }
end
@ -58,8 +57,34 @@ module Sidekiq
def self.done?
@done
end
end
private
class BasicFetch
def initialize(options)
@strictly_ordered_queues = !!options[:strict]
@queues = options[:queues].map { |q| "queue:#{q}" }
@unique_queues = @queues.uniq
end
def retrieve_work
UnitOfWork.new(*Sidekiq.redis { |conn| conn.brpop(*queues_cmd) })
end
UnitOfWork = Struct.new(:queue, :message) do
def acknowledge
# nothing to do
end
def queue_name
queue.gsub(/.*queue:/, '')
end
def requeue
Sidekiq.redis do |conn|
conn.rpush(queue, message)
end
end
end
# Creating the Redis#blpop command takes into account any
# configured queue weights. By default Redis#blpop returns
@ -67,10 +92,10 @@ module Sidekiq
# recreate the queue command each time we invoke Redis#blpop
# to honor weights and avoid queue starvation.
def queues_cmd
return @unique_queues.dup << TIMEOUT if @strictly_ordered_queues
return @unique_queues.dup << Sidekiq::Fetcher::TIMEOUT if @strictly_ordered_queues
queues = @queues.sample(@unique_queues.size).uniq
queues.concat(@unique_queues - queues)
queues << TIMEOUT
queues << Sidekiq::Fetcher::TIMEOUT
end
end
end

View file

@ -25,7 +25,7 @@ module Sidekiq
@in_progress = {}
@done = false
@busy = []
@fetcher = Fetcher.new(current_actor, options[:queues], !!options[:strict])
@fetcher = Fetcher.new(current_actor, options)
@ready = @count.times.map { Processor.new_link(current_actor) }
procline(options[:tag] ? "#{options[:tag]} " : '')
end
@ -86,21 +86,19 @@ module Sidekiq
end
end
def assign(msg, queue)
def assign(work)
watchdog("Manager#assign died") do
if stopped?
# Race condition between Manager#stop if Fetcher
# is blocked on redis and gets a message after
# all the ready Processors have been stopped.
# Push the message back to redis.
Sidekiq.redis do |conn|
conn.lpush("queue:#{queue}", msg)
end
work.requeue
else
processor = @ready.pop
@in_progress[processor.object_id] = [msg, queue]
@in_progress[processor.object_id] = work
@busy << processor
processor.async.process(msg, queue)
processor.async.process(work)
end
end
end

View file

@ -30,7 +30,9 @@ module Sidekiq
@boss = boss
end
def process(msgstr, queue)
def process(work)
msgstr = work.message
queue = work.queue_name
defer do
begin
msg = Sidekiq.load_json(msgstr)
@ -46,6 +48,8 @@ module Sidekiq
rescue Exception => ex
handle_exception(ex, msg || { :message => msgstr })
raise
ensure
work.acknowledge
end
end
@boss.async.processor_done(current_actor)

View file

@ -42,7 +42,7 @@ class TestClient < MiniTest::Unit::TestCase
end
it 'pushes messages to redis' do
@redis.expect :rpush, 1, ['queue:foo', String]
@redis.expect :lpush, 1, ['queue:foo', String]
pushed = Sidekiq::Client.push('queue' => 'foo', 'class' => MyWorker, 'args' => [1, 2])
assert pushed
assert_equal 24, pushed.size
@ -50,7 +50,7 @@ class TestClient < MiniTest::Unit::TestCase
end
it 'pushes messages to redis using a String class' do
@redis.expect :rpush, 1, ['queue:foo', String]
@redis.expect :lpush, 1, ['queue:foo', String]
pushed = Sidekiq::Client.push('queue' => 'foo', 'class' => 'MyWorker', 'args' => [1, 2])
assert pushed
assert_equal 24, pushed.size
@ -66,28 +66,28 @@ class TestClient < MiniTest::Unit::TestCase
end
it 'handles perform_async' do
@redis.expect :rpush, 1, ['queue:default', String]
@redis.expect :lpush, 1, ['queue:default', String]
pushed = MyWorker.perform_async(1, 2)
assert pushed
@redis.verify
end
it 'handles perform_async on failure' do
@redis.expect :rpush, nil, ['queue:default', String]
@redis.expect :lpush, nil, ['queue:default', String]
pushed = MyWorker.perform_async(1, 2)
refute pushed
@redis.verify
end
it 'enqueues messages to redis' do
@redis.expect :rpush, 1, ['queue:default', String]
@redis.expect :lpush, 1, ['queue:default', String]
pushed = Sidekiq::Client.enqueue(MyWorker, 1, 2)
assert pushed
@redis.verify
end
it 'enqueues messages to redis' do
@redis.expect :rpush, 1, ['queue:custom_queue', String]
@redis.expect :lpush, 1, ['queue:custom_queue', String]
pushed = Sidekiq::Client.enqueue_to(:custom_queue, MyWorker, 1, 2)
assert pushed
@redis.verify
@ -99,7 +99,7 @@ class TestClient < MiniTest::Unit::TestCase
end
it 'enqueues to the named queue' do
@redis.expect :rpush, 1, ['queue:flimflam', String]
@redis.expect :lpush, 1, ['queue:flimflam', String]
pushed = QueuedWorker.perform_async(1, 2)
assert pushed
@redis.verify
@ -117,6 +117,9 @@ class TestClient < MiniTest::Unit::TestCase
end
describe 'bulk' do
after do
Sidekiq::Queue.new.clear
end
it 'can push a large set of jobs at once' do
a = Time.now
count = Sidekiq::Client.push_bulk('class' => QueuedWorker, 'args' => (1..1_000).to_a.map { |x| Array(x) })

View file

@ -2,12 +2,30 @@ require 'helper'
require 'sidekiq/fetch'
class TestFetcher < MiniTest::Unit::TestCase
describe 'Fetcher#queues_cmd' do
describe 'when queues are strictly ordered' do
it 'returns the unique ordered queues properly based on priority and order they were passed in' do
fetcher = Sidekiq::Fetcher.new nil, %w[high medium low default], true
assert_equal (%w[queue:high queue:medium queue:low queue:default] << 1), fetcher._send_(:queues_cmd)
def setup
Sidekiq.redis do |conn|
conn.del('queue:basic')
conn.rpush('queue:basic', 'msg')
end
end
def test_basic_fetch_retrieve
fetch = Sidekiq::BasicFetch.new(:queues => ['basic', 'bar'])
uow = fetch.retrieve_work
refute_nil uow
assert_equal 'basic', uow.queue_name
assert_equal 'msg', uow.message
q = Sidekiq::Queue.new('basic')
assert_equal 0, q.size
uow.requeue
assert_equal 1, q.size
assert_nil uow.acknowledge
end
def test_basic_fetch_strict_retrieve
fetch = Sidekiq::BasicFetch.new(:queues => ['basic', 'bar', 'bar'], :strict => true)
cmd = fetch.queues_cmd
assert_equal cmd, ['queue:basic', 'queue:bar', 1]
end
end

View file

@ -83,7 +83,7 @@ class TestMiddleware < MiniTest::Unit::TestCase
actor = MiniTest::Mock.new
actor.expect(:processor_done, nil, [processor])
boss.expect(:async, actor, [])
processor.process(msg, 'default')
processor.process(Sidekiq::BasicFetch::UnitOfWork.new('queue:default', msg))
assert_equal %w(2 before 3 before 0 before work_performed 0 after 3 after 2 after), $recorder.flatten
end

View file

@ -23,12 +23,16 @@ class TestProcessor < MiniTest::Unit::TestCase
end
end
def work(msg, queue='queue:default')
Sidekiq::BasicFetch::UnitOfWork.new(queue, msg)
end
it 'processes as expected' do
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
actor = MiniTest::Mock.new
actor.expect(:processor_done, nil, [@processor])
@boss.expect(:async, actor, [])
@processor.process(msg, 'default')
@processor.process(work(msg))
@boss.verify
assert_equal 1, $invokes
end
@ -36,7 +40,7 @@ class TestProcessor < MiniTest::Unit::TestCase
it 'passes exceptions to ExceptionHandler' do
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
begin
@processor.process(msg, 'default')
@processor.process(work(msg))
flunk "Expected #process to raise exception"
rescue TestException
end
@ -49,7 +53,7 @@ class TestProcessor < MiniTest::Unit::TestCase
re_raise = false
begin
@processor.process(msg, 'default')
@processor.process(work(msg))
rescue TestException
re_raise = true
end
@ -64,7 +68,7 @@ class TestProcessor < MiniTest::Unit::TestCase
actor = MiniTest::Mock.new
actor.expect(:processor_done, nil, [processor])
@boss.expect(:async, actor, [])
processor.process(msgstr, 'default')
processor.process(work(msgstr))
assert_equal [['myarg']], msg['args']
end
@ -79,7 +83,7 @@ class TestProcessor < MiniTest::Unit::TestCase
actor = MiniTest::Mock.new
actor.expect(:processor_done, nil, [@processor])
@boss.expect(:async, actor, [])
@processor.process(msg, 'default')
@processor.process(work(msg))
end
it 'increments processed stat' do
@ -100,7 +104,7 @@ class TestProcessor < MiniTest::Unit::TestCase
def failed_job
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
begin
@processor.process(msg, 'default')
@processor.process(work(msg))
rescue TestException
end
end