From 7968583fbe9c5fbe1afa484ac093d5d40b74b293 Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Wed, 11 Jun 2014 20:31:39 -0700 Subject: [PATCH] Handle multiple jobs with same score, fixes #1771 --- lib/sidekiq/api.rb | 50 ++++++++++++++++++++++++++++++++++------------ test/test_api.rb | 19 +++++++++++++++++- 2 files changed, 55 insertions(+), 14 deletions(-) diff --git a/lib/sidekiq/api.rb b/lib/sidekiq/api.rb index 88971163..0ba73e14 100644 --- a/lib/sidekiq/api.rb +++ b/lib/sidekiq/api.rb @@ -301,32 +301,56 @@ module Sidekiq end def add_to_queue - Sidekiq.redis do |conn| - results = conn.multi do - conn.zrangebyscore('schedule', score, score) - conn.zremrangebyscore('schedule', score, score) - end.first - results.map do |message| - msg = Sidekiq.load_json(message) - Sidekiq::Client.push(msg) - end + remove_job do |message| + msg = Sidekiq.load_json(message) + Sidekiq::Client.push(msg) end end def retry raise "Retry not available on jobs which have not failed" unless item["failed_at"] + remove_job do |message| + msg = Sidekiq.load_json(message) + msg['retry_count'] = msg['retry_count'] - 1 + Sidekiq::Client.push(msg) + end + end + + private + + def remove_job Sidekiq.redis do |conn| results = conn.multi do conn.zrangebyscore(parent.name, score, score) conn.zremrangebyscore(parent.name, score, score) end.first - results.map do |message| - msg = Sidekiq.load_json(message) - msg['retry_count'] = msg['retry_count'] - 1 - Sidekiq::Client.push(msg) + + if results.size == 1 + yield results.first + else + # multiple jobs with the same score + # find the one with the right JID and push it + hash = results.group_by do |message| + if message.index(jid) + msg = Sidekiq.load_json(message) + msg['jid'] == jid + else + false + end + end + message = hash[true].first + yield message + + # push the rest back onto the sorted set + conn.multi do + hash[false].each do |message| + conn.zadd(parent.name, score.to_f.to_s, message) + end + end end end end + end class SortedSet diff --git a/test/test_api.rb b/test/test_api.rb index 3ba7a282..95598ff6 100644 --- a/test/test_api.rb +++ b/test/test_api.rb @@ -219,6 +219,7 @@ class TestApi < Sidekiq::Test end it "can move scheduled job to queue" do + remain_id = ApiWorker.perform_in(100, 1, 'jason') job_id = ApiWorker.perform_in(100, 1, 'jason') job = Sidekiq::ScheduledSet.new.find_job(job_id) q = Sidekiq::Queue.new @@ -226,8 +227,24 @@ class TestApi < Sidekiq::Test queued_job = q.find_job(job_id) refute_nil queued_job assert_equal queued_job.jid, job_id + assert_nil Sidekiq::ScheduledSet.new.find_job(job_id) + refute_nil Sidekiq::ScheduledSet.new.find_job(remain_id) + end + + it "handles multiple scheduled jobs when moving to queue" do + jids = Sidekiq::Client.push_bulk('class' => ApiWorker, + 'args' => [[1, 'jason'], [2, 'jason']], + 'at' => Time.now.to_f) + assert_equal 2, jids.size + (remain_id, job_id) = jids job = Sidekiq::ScheduledSet.new.find_job(job_id) - assert_nil job + q = Sidekiq::Queue.new + job.add_to_queue + queued_job = q.find_job(job_id) + refute_nil queued_job + assert_equal queued_job.jid, job_id + assert_nil Sidekiq::ScheduledSet.new.find_job(job_id) + refute_nil Sidekiq::ScheduledSet.new.find_job(remain_id) end it 'can find job by id in sorted sets' do