From c7828f1fb8a7578fe291349ccd0130c16c547e14 Mon Sep 17 00:00:00 2001 From: Dimko Date: Fri, 24 May 2013 16:22:24 +0400 Subject: [PATCH] Scheduled and retry jobs now executes client middleware --- lib/sidekiq/api.rb | 2 +- lib/sidekiq/client.rb | 2 +- lib/sidekiq/scheduled.rb | 5 +-- test/test_scheduled.rb | 82 ++++++++++++++++++++++++++++------------ 4 files changed, 61 insertions(+), 30 deletions(-) diff --git a/lib/sidekiq/api.rb b/lib/sidekiq/api.rb index 100b542f..1b700082 100644 --- a/lib/sidekiq/api.rb +++ b/lib/sidekiq/api.rb @@ -246,7 +246,7 @@ module Sidekiq results.map do |message| msg = Sidekiq.load_json(message) msg['retry_count'] = msg['retry_count'] - 1 - conn.lpush("queue:#{msg['queue']}", Sidekiq.dump_json(msg)) + Sidekiq::Client.push(msg) end end end diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index b1796341..9870110c 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -129,7 +129,7 @@ module Sidekiq normalized_item = Sidekiq::Worker::ClassMethods::DEFAULT_OPTIONS.merge(item) end - normalized_item['jid'] = SecureRandom.hex(12) + normalized_item['jid'] ||= SecureRandom.hex(12) normalized_item['enqueued_at'] = Time.now.to_f normalized_item end diff --git a/lib/sidekiq/scheduled.rb b/lib/sidekiq/scheduled.rb index 7e1ac808..53574f3b 100644 --- a/lib/sidekiq/scheduled.rb +++ b/lib/sidekiq/scheduled.rb @@ -39,10 +39,7 @@ module Sidekiq # the queue, it's because another process already popped it so we can move on to the # next one. if conn.zrem(sorted_set, message) - conn.multi do - conn.sadd('queues', msg['queue']) - conn.lpush("queue:#{msg['queue']}", Sidekiq.dump_json(msg)) - end + Sidekiq::Client.push(msg) logger.debug { "enqueued #{sorted_set}: #{message}" } end end diff --git a/test/test_scheduled.rb b/test/test_scheduled.rb index efd2a09c..37ea5040 100644 --- a/test/test_scheduled.rb +++ b/test/test_scheduled.rb @@ -14,38 +14,72 @@ class TestScheduled < Minitest::Test Sidekiq.redis do |conn| conn.flushdb end + + @error_1 = { 'class' => ScheduledWorker.name, 'args' => [0], 'queue' => 'queue_1' } + @error_2 = { 'class' => ScheduledWorker.name, 'args' => [1], 'queue' => 'queue_2' } + @error_3 = { 'class' => ScheduledWorker.name, 'args' => [2], 'queue' => 'queue_3' } + @future_1 = { 'class' => ScheduledWorker.name, 'args' => [3], 'queue' => 'queue_4' } + @future_2 = { 'class' => ScheduledWorker.name, 'args' => [4], 'queue' => 'queue_5' } + @future_3 = { 'class' => ScheduledWorker.name, 'args' => [5], 'queue' => 'queue_6' } + + @retry = Sidekiq::RetrySet.new + @scheduled = Sidekiq::ScheduledSet.new + @poller = Sidekiq::Scheduled::Poller.new + end + + class Stopper + def call(worker_class, message, queue) + yield if message['args'].first.odd? + end + end + + it 'executes client middleware' do + Sidekiq.client_middleware.add Stopper + begin + @retry.schedule (Time.now - 60).to_f, @error_1 + @retry.schedule (Time.now - 60).to_f, @error_2 + @scheduled.schedule (Time.now - 60).to_f, @future_2 + @scheduled.schedule (Time.now - 60).to_f, @future_3 + + @poller.poll + + Sidekiq.redis do |conn| + assert_equal 0, conn.llen("queue:queue_1") + assert_equal 1, conn.llen("queue:queue_2") + assert_equal 0, conn.llen("queue:queue_5") + assert_equal 1, conn.llen("queue:queue_6") + end + ensure + Sidekiq.client_middleware.remove Stopper + end end it 'should empty the retry and scheduled queues up to the current time' do enqueued_time = Time.new(2013, 2, 4) Time.stub(:now, enqueued_time) do + @retry.schedule (Time.now - 60).to_f, @error_1 + @retry.schedule (Time.now - 50).to_f, @error_2 + @retry.schedule (Time.now + 60).to_f, @error_3 + @scheduled.schedule (Time.now - 60).to_f, @future_1 + @scheduled.schedule (Time.now - 50).to_f, @future_2 + @scheduled.schedule (Time.now + 60).to_f, @future_3 + + @poller.poll + Sidekiq.redis do |conn| - error_1 = { 'class' => ScheduledWorker.name, 'args' => ["error_1"], 'queue' => 'queue_1' } - error_2 = { 'class' => ScheduledWorker.name, 'args' => ["error_2"], 'queue' => 'queue_2' } - error_3 = { 'class' => ScheduledWorker.name, 'args' => ["error_3"], 'queue' => 'queue_3' } - future_1 = { 'class' => ScheduledWorker.name, 'args' => ["future_1"], 'queue' => 'queue_4' } - future_2 = { 'class' => ScheduledWorker.name, 'args' => ["future_2"], 'queue' => 'queue_5' } - future_3 = { 'class' => ScheduledWorker.name, 'args' => ["future_3"], 'queue' => 'queue_6' } - - conn.zadd("retry", (Time.now - 60).to_f.to_s, Sidekiq.dump_json(error_1)) - conn.zadd("retry", (Time.now - 50).to_f.to_s, Sidekiq.dump_json(error_2)) - conn.zadd("retry", (Time.now + 60).to_f.to_s, Sidekiq.dump_json(error_3)) - conn.zadd("schedule", (Time.now - 60).to_f.to_s, Sidekiq.dump_json(future_1)) - conn.zadd("schedule", (Time.now - 50).to_f.to_s, Sidekiq.dump_json(future_2)) - conn.zadd("schedule", (Time.now + 60).to_f.to_s, Sidekiq.dump_json(future_3)) - - poller = Sidekiq::Scheduled::Poller.new - poller.poll - - assert_equal [Sidekiq.dump_json(error_1.merge('enqueued_at' => enqueued_time.to_f))], conn.lrange("queue:queue_1", 0, -1) - assert_equal [Sidekiq.dump_json(error_2.merge('enqueued_at' => enqueued_time.to_f))], conn.lrange("queue:queue_2", 0, -1) - assert_equal [Sidekiq.dump_json(future_1.merge('enqueued_at' => enqueued_time.to_f))], conn.lrange("queue:queue_4", 0, -1) - assert_equal [Sidekiq.dump_json(future_2.merge('enqueued_at' => enqueued_time.to_f))], conn.lrange("queue:queue_5", 0, -1) - - assert_equal [Sidekiq.dump_json(error_3)], conn.zrange("retry", 0, -1) - assert_equal [Sidekiq.dump_json(future_3)], conn.zrange("schedule", 0, -1) + assert_equal 1, conn.llen("queue:queue_1") + assert_equal enqueued_time.to_f, Sidekiq.load_json(conn.lrange("queue:queue_1", 0, -1)[0])['enqueued_at'] + assert_equal 1, conn.llen("queue:queue_2") + assert_equal enqueued_time.to_f, Sidekiq.load_json(conn.lrange("queue:queue_2", 0, -1)[0])['enqueued_at'] + assert_equal 1, conn.llen("queue:queue_4") + assert_equal enqueued_time.to_f, Sidekiq.load_json(conn.lrange("queue:queue_4", 0, -1)[0])['enqueued_at'] + assert_equal 1, conn.llen("queue:queue_5") + assert_equal enqueued_time.to_f, Sidekiq.load_json(conn.lrange("queue:queue_5", 0, -1)[0])['enqueued_at'] end + + assert_equal 1, @retry.size + assert_equal 1, @scheduled.size end end end