1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Handle multiple jobs with same score, fixes #1771

This commit is contained in:
Mike Perham 2014-06-11 20:31:39 -07:00
parent 819c4ed66b
commit 7968583fbe
2 changed files with 55 additions and 14 deletions

View file

@ -301,32 +301,56 @@ module Sidekiq
end
def add_to_queue
Sidekiq.redis do |conn|
results = conn.multi do
conn.zrangebyscore('schedule', score, score)
conn.zremrangebyscore('schedule', score, score)
end.first
results.map do |message|
msg = Sidekiq.load_json(message)
Sidekiq::Client.push(msg)
end
remove_job do |message|
msg = Sidekiq.load_json(message)
Sidekiq::Client.push(msg)
end
end
def retry
raise "Retry not available on jobs which have not failed" unless item["failed_at"]
remove_job do |message|
msg = Sidekiq.load_json(message)
msg['retry_count'] = msg['retry_count'] - 1
Sidekiq::Client.push(msg)
end
end
private
def remove_job
Sidekiq.redis do |conn|
results = conn.multi do
conn.zrangebyscore(parent.name, score, score)
conn.zremrangebyscore(parent.name, score, score)
end.first
results.map do |message|
msg = Sidekiq.load_json(message)
msg['retry_count'] = msg['retry_count'] - 1
Sidekiq::Client.push(msg)
if results.size == 1
yield results.first
else
# multiple jobs with the same score
# find the one with the right JID and push it
hash = results.group_by do |message|
if message.index(jid)
msg = Sidekiq.load_json(message)
msg['jid'] == jid
else
false
end
end
message = hash[true].first
yield message
# push the rest back onto the sorted set
conn.multi do
hash[false].each do |message|
conn.zadd(parent.name, score.to_f.to_s, message)
end
end
end
end
end
end
class SortedSet

View file

@ -219,6 +219,7 @@ class TestApi < Sidekiq::Test
end
it "can move scheduled job to queue" do
remain_id = ApiWorker.perform_in(100, 1, 'jason')
job_id = ApiWorker.perform_in(100, 1, 'jason')
job = Sidekiq::ScheduledSet.new.find_job(job_id)
q = Sidekiq::Queue.new
@ -226,8 +227,24 @@ class TestApi < Sidekiq::Test
queued_job = q.find_job(job_id)
refute_nil queued_job
assert_equal queued_job.jid, job_id
assert_nil Sidekiq::ScheduledSet.new.find_job(job_id)
refute_nil Sidekiq::ScheduledSet.new.find_job(remain_id)
end
it "handles multiple scheduled jobs when moving to queue" do
jids = Sidekiq::Client.push_bulk('class' => ApiWorker,
'args' => [[1, 'jason'], [2, 'jason']],
'at' => Time.now.to_f)
assert_equal 2, jids.size
(remain_id, job_id) = jids
job = Sidekiq::ScheduledSet.new.find_job(job_id)
assert_nil job
q = Sidekiq::Queue.new
job.add_to_queue
queued_job = q.find_job(job_id)
refute_nil queued_job
assert_equal queued_job.jid, job_id
assert_nil Sidekiq::ScheduledSet.new.find_job(job_id)
refute_nil Sidekiq::ScheduledSet.new.find_job(remain_id)
end
it 'can find job by id in sorted sets' do