From e2c06148049b4cff7586265a540e16be73c4f476 Mon Sep 17 00:00:00 2001 From: Jason Ardell Date: Wed, 16 Jan 2013 16:48:51 -0500 Subject: [PATCH] Improve efficiency and reliability of re-queueing when workers are terminated. #623 --- lib/sidekiq/manager.rb | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index 7115ef50..83f26c7d 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -120,13 +120,28 @@ module Sidekiq end conn.srem('workers', workers_to_remove) + # Re-enqueue terminated jobs + # NOTE: You may notice that we may push a job back to redis before + # the worker thread is terminated. This is ok because Sidekiq's + # contract says that jobs are run AT LEAST once. Process termination + # is delayed until we're certain the jobs are back in Redis because + # it is worse to lose a job than to run it twice. + jobs_to_requeue = {} @busy.each do |processor| # processor is an actor proxy and we can't call any methods # that would go to the actor (since it's busy). Instead # we'll use the object_id to track the worker's data here. - processor.terminate if processor.alive? unit_of_work = @in_progress[processor.object_id] - conn.rpush(unit_of_work.queue, unit_of_work.message) + jobs_to_requeue[unit_of_work.queue] ||= [] + jobs_to_requeue[unit_of_work.queue] << unit_of_work.message + end + jobs_to_requeue.each do |queue, jobs| + conn.rpush(queue, jobs) + end + + # Lastly, terminate worker threads + @busy.each do |processor| + processor.terminate if processor.alive? end end logger.info("Pushed #{@busy.size} messages back to Redis")