diff --git a/lib/sidekiq/scheduled.rb b/lib/sidekiq/scheduled.rb index 9c526bac..da355deb 100644 --- a/lib/sidekiq/scheduled.rb +++ b/lib/sidekiq/scheduled.rb @@ -28,17 +28,21 @@ module Sidekiq now = Time.now.to_f.to_s Sidekiq.redis do |conn| SETS.each do |sorted_set| - (messages, _) = conn.multi do - conn.zrangebyscore(sorted_set, '-inf', now) - conn.zremrangebyscore(sorted_set, '-inf', now) - end - - messages.each do |message| - logger.debug { "enqueued #{sorted_set}: #{message}" } + # Get the next item in the queue if it's score (time to execute) is <= now. + # We need to go through the list one at a time to reduce the risk of something + # going wrong between the time jobs are popped from the scheduled queue and when + # they are pushed onto a work queue and losing the jobs. + while message = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do msg = Sidekiq.load_json(message) - conn.multi do - conn.sadd('queues', msg['queue']) - conn.rpush("queue:#{msg['queue']}", message) + # Pop item off the queue and add it to the work queue. If the job can't be popped from + # the queue, it's because another process already popped it so we can move on to the + # next one. + if conn.zrem(sorted_set, message) + conn.multi do + conn.sadd('queues', msg['queue']) + conn.rpush("queue:#{msg['queue']}", message) + end + logger.debug("enqueued #{sorted_set}: #{message}") if logger.debug? end end end diff --git a/test/test_retry.rb b/test/test_retry.rb index 7b411594..a035c03a 100644 --- a/test/test_retry.rb +++ b/test/test_retry.rb @@ -199,26 +199,4 @@ class TestRetry < MiniTest::Unit::TestCase end end - describe 'poller' do - before do - @redis = MiniTest::Mock.new - Sidekiq.instance_variable_set(:@redis, @redis) - - def @redis.with; yield self; end - end - - it 'should poll like a bad mother...SHUT YO MOUTH' do - fake_msg = Sidekiq.dump_json({ 'class' => 'Bob', 'args' => [1,2], 'queue' => 'someq' }) - @redis.expect :multi, [[fake_msg], 1], [] - @redis.expect :multi, [[], nil], [] - @redis.expect :multi, [[], nil], [] - @redis.expect :multi, [[], nil], [] - - inst = Sidekiq::Scheduled::Poller.new - inst.poll - - @redis.verify - end - end - end diff --git a/test/test_scheduled.rb b/test/test_scheduled.rb new file mode 100644 index 00000000..84c7586b --- /dev/null +++ b/test/test_scheduled.rb @@ -0,0 +1,48 @@ +require 'helper' +require 'sidekiq/scheduled' + +class TestScheduled < MiniTest::Unit::TestCase + class ScheduledWorker + include Sidekiq::Worker + def perform(x) + end + end + + describe 'poller' do + before do + Sidekiq.redis = REDIS + Sidekiq.redis do |conn| + conn.flushdb + end + end + + it 'should empty the retry and scheduled queues up to the current time' do + Sidekiq.redis do |conn| + error_1 = Sidekiq.dump_json('class' => ScheduledWorker.name, 'args' => ["error_1"], 'queue' => 'queue_1') + error_2 = Sidekiq.dump_json('class' => ScheduledWorker.name, 'args' => ["error_2"], 'queue' => 'queue_2') + error_3 = Sidekiq.dump_json('class' => ScheduledWorker.name, 'args' => ["error_3"], 'queue' => 'queue_3') + future_1 = Sidekiq.dump_json('class' => ScheduledWorker.name, 'args' => ["future_1"], 'queue' => 'queue_4') + future_2 = Sidekiq.dump_json('class' => ScheduledWorker.name, 'args' => ["future_2"], 'queue' => 'queue_5') + future_3 = Sidekiq.dump_json('class' => ScheduledWorker.name, 'args' => ["future_3"], 'queue' => 'queue_6') + + conn.zadd("retry", (Time.now - 60).to_f.to_s, error_1) + conn.zadd("retry", (Time.now - 50).to_f.to_s, error_2) + conn.zadd("retry", (Time.now + 60).to_f.to_s, error_3) + conn.zadd("schedule", (Time.now - 60).to_f.to_s, future_1) + conn.zadd("schedule", (Time.now - 50).to_f.to_s, future_2) + conn.zadd("schedule", (Time.now + 60).to_f.to_s, future_3) + + poller = Sidekiq::Scheduled::Poller.new + poller.poll + poller.terminate + + assert_equal [error_1], conn.lrange("queue:queue_1", 0, -1) + assert_equal [error_2], conn.lrange("queue:queue_2", 0, -1) + assert_equal [error_3], conn.zrange("retry", 0, -1) + assert_equal [future_1], conn.lrange("queue:queue_4", 0, -1) + assert_equal [future_2], conn.lrange("queue:queue_5", 0, -1) + assert_equal [future_3], conn.zrange("schedule", 0, -1) + end + end + end +end