1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Merge branch 'patch-4' of git://github.com/dimko/sidekiq into dimko-patch-4

This commit is contained in:
Mike Perham 2013-05-24 20:12:34 -07:00
commit 3dda03d000
4 changed files with 61 additions and 30 deletions

View file

@ -250,7 +250,7 @@ module Sidekiq
results.map do |message| results.map do |message|
msg = Sidekiq.load_json(message) msg = Sidekiq.load_json(message)
msg['retry_count'] = msg['retry_count'] - 1 msg['retry_count'] = msg['retry_count'] - 1
conn.lpush("queue:#{msg['queue']}", Sidekiq.dump_json(msg)) Sidekiq::Client.push(msg)
end end
end end
end end

View file

@ -129,7 +129,7 @@ module Sidekiq
normalized_item = Sidekiq::Worker::ClassMethods::DEFAULT_OPTIONS.merge(item) normalized_item = Sidekiq::Worker::ClassMethods::DEFAULT_OPTIONS.merge(item)
end end
normalized_item['jid'] = SecureRandom.hex(12) normalized_item['jid'] ||= SecureRandom.hex(12)
normalized_item['enqueued_at'] = Time.now.to_f normalized_item['enqueued_at'] = Time.now.to_f
normalized_item normalized_item
end end

View file

@ -39,10 +39,7 @@ module Sidekiq
# the queue, it's because another process already popped it so we can move on to the # the queue, it's because another process already popped it so we can move on to the
# next one. # next one.
if conn.zrem(sorted_set, message) if conn.zrem(sorted_set, message)
conn.multi do Sidekiq::Client.push(msg)
conn.sadd('queues', msg['queue'])
conn.lpush("queue:#{msg['queue']}", Sidekiq.dump_json(msg))
end
logger.debug { "enqueued #{sorted_set}: #{message}" } logger.debug { "enqueued #{sorted_set}: #{message}" }
end end
end end

View file

@ -14,38 +14,72 @@ class TestScheduled < Minitest::Test
Sidekiq.redis do |conn| Sidekiq.redis do |conn|
conn.flushdb conn.flushdb
end end
@error_1 = { 'class' => ScheduledWorker.name, 'args' => [0], 'queue' => 'queue_1' }
@error_2 = { 'class' => ScheduledWorker.name, 'args' => [1], 'queue' => 'queue_2' }
@error_3 = { 'class' => ScheduledWorker.name, 'args' => [2], 'queue' => 'queue_3' }
@future_1 = { 'class' => ScheduledWorker.name, 'args' => [3], 'queue' => 'queue_4' }
@future_2 = { 'class' => ScheduledWorker.name, 'args' => [4], 'queue' => 'queue_5' }
@future_3 = { 'class' => ScheduledWorker.name, 'args' => [5], 'queue' => 'queue_6' }
@retry = Sidekiq::RetrySet.new
@scheduled = Sidekiq::ScheduledSet.new
@poller = Sidekiq::Scheduled::Poller.new
end
class Stopper
def call(worker_class, message, queue)
yield if message['args'].first.odd?
end
end
it 'executes client middleware' do
Sidekiq.client_middleware.add Stopper
begin
@retry.schedule (Time.now - 60).to_f, @error_1
@retry.schedule (Time.now - 60).to_f, @error_2
@scheduled.schedule (Time.now - 60).to_f, @future_2
@scheduled.schedule (Time.now - 60).to_f, @future_3
@poller.poll
Sidekiq.redis do |conn|
assert_equal 0, conn.llen("queue:queue_1")
assert_equal 1, conn.llen("queue:queue_2")
assert_equal 0, conn.llen("queue:queue_5")
assert_equal 1, conn.llen("queue:queue_6")
end
ensure
Sidekiq.client_middleware.remove Stopper
end
end end
it 'should empty the retry and scheduled queues up to the current time' do it 'should empty the retry and scheduled queues up to the current time' do
enqueued_time = Time.new(2013, 2, 4) enqueued_time = Time.new(2013, 2, 4)
Time.stub(:now, enqueued_time) do Time.stub(:now, enqueued_time) do
@retry.schedule (Time.now - 60).to_f, @error_1
@retry.schedule (Time.now - 50).to_f, @error_2
@retry.schedule (Time.now + 60).to_f, @error_3
@scheduled.schedule (Time.now - 60).to_f, @future_1
@scheduled.schedule (Time.now - 50).to_f, @future_2
@scheduled.schedule (Time.now + 60).to_f, @future_3
@poller.poll
Sidekiq.redis do |conn| Sidekiq.redis do |conn|
error_1 = { 'class' => ScheduledWorker.name, 'args' => ["error_1"], 'queue' => 'queue_1' } assert_equal 1, conn.llen("queue:queue_1")
error_2 = { 'class' => ScheduledWorker.name, 'args' => ["error_2"], 'queue' => 'queue_2' } assert_equal enqueued_time.to_f, Sidekiq.load_json(conn.lrange("queue:queue_1", 0, -1)[0])['enqueued_at']
error_3 = { 'class' => ScheduledWorker.name, 'args' => ["error_3"], 'queue' => 'queue_3' } assert_equal 1, conn.llen("queue:queue_2")
future_1 = { 'class' => ScheduledWorker.name, 'args' => ["future_1"], 'queue' => 'queue_4' } assert_equal enqueued_time.to_f, Sidekiq.load_json(conn.lrange("queue:queue_2", 0, -1)[0])['enqueued_at']
future_2 = { 'class' => ScheduledWorker.name, 'args' => ["future_2"], 'queue' => 'queue_5' } assert_equal 1, conn.llen("queue:queue_4")
future_3 = { 'class' => ScheduledWorker.name, 'args' => ["future_3"], 'queue' => 'queue_6' } assert_equal enqueued_time.to_f, Sidekiq.load_json(conn.lrange("queue:queue_4", 0, -1)[0])['enqueued_at']
assert_equal 1, conn.llen("queue:queue_5")
conn.zadd("retry", (Time.now - 60).to_f.to_s, Sidekiq.dump_json(error_1)) assert_equal enqueued_time.to_f, Sidekiq.load_json(conn.lrange("queue:queue_5", 0, -1)[0])['enqueued_at']
conn.zadd("retry", (Time.now - 50).to_f.to_s, Sidekiq.dump_json(error_2))
conn.zadd("retry", (Time.now + 60).to_f.to_s, Sidekiq.dump_json(error_3))
conn.zadd("schedule", (Time.now - 60).to_f.to_s, Sidekiq.dump_json(future_1))
conn.zadd("schedule", (Time.now - 50).to_f.to_s, Sidekiq.dump_json(future_2))
conn.zadd("schedule", (Time.now + 60).to_f.to_s, Sidekiq.dump_json(future_3))
poller = Sidekiq::Scheduled::Poller.new
poller.poll
assert_equal [Sidekiq.dump_json(error_1.merge('enqueued_at' => enqueued_time.to_f))], conn.lrange("queue:queue_1", 0, -1)
assert_equal [Sidekiq.dump_json(error_2.merge('enqueued_at' => enqueued_time.to_f))], conn.lrange("queue:queue_2", 0, -1)
assert_equal [Sidekiq.dump_json(future_1.merge('enqueued_at' => enqueued_time.to_f))], conn.lrange("queue:queue_4", 0, -1)
assert_equal [Sidekiq.dump_json(future_2.merge('enqueued_at' => enqueued_time.to_f))], conn.lrange("queue:queue_5", 0, -1)
assert_equal [Sidekiq.dump_json(error_3)], conn.zrange("retry", 0, -1)
assert_equal [Sidekiq.dump_json(future_3)], conn.zrange("schedule", 0, -1)
end end
assert_equal 1, @retry.size
assert_equal 1, @scheduled.size
end end
end end
end end