1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Clear workers in redis before terminating worker threads. #623

This commit is contained in:
Jason Ardell 2013-01-16 16:56:15 -05:00
parent e2c0614804
commit e695e0554f

View file

@ -113,19 +113,13 @@ module Sidekiq
logger.info("Still waiting for #{@busy.size} busy workers") logger.info("Still waiting for #{@busy.size} busy workers")
Sidekiq.redis do |conn| Sidekiq.redis do |conn|
logger.debug { "Clearing workers in redis" }
workers = conn.smembers('workers')
workers_to_remove = workers.select do |worker_name|
worker_name =~ /:#{process_id}-/
end
conn.srem('workers', workers_to_remove)
# Re-enqueue terminated jobs # Re-enqueue terminated jobs
# NOTE: You may notice that we may push a job back to redis before # NOTE: You may notice that we may push a job back to redis before
# the worker thread is terminated. This is ok because Sidekiq's # the worker thread is terminated. This is ok because Sidekiq's
# contract says that jobs are run AT LEAST once. Process termination # contract says that jobs are run AT LEAST once. Process termination
# is delayed until we're certain the jobs are back in Redis because # is delayed until we're certain the jobs are back in Redis because
# it is worse to lose a job than to run it twice. # it is worse to lose a job than to run it twice.
logger.debug { "Re-queueing terminated jobs" }
jobs_to_requeue = {} jobs_to_requeue = {}
@busy.each do |processor| @busy.each do |processor|
# processor is an actor proxy and we can't call any methods # processor is an actor proxy and we can't call any methods
@ -139,7 +133,18 @@ module Sidekiq
conn.rpush(queue, jobs) conn.rpush(queue, jobs)
end end
# Lastly, terminate worker threads # Clearing workers in Redis
# NOTE: we do this before terminating worker threads because the
# process will likely receive a hard shutdown soon anyway, which
# means the threads will killed.
logger.debug { "Clearing workers in redis" }
workers = conn.smembers('workers')
workers_to_remove = workers.select do |worker_name|
worker_name =~ /:#{process_id}-/
end
conn.srem('workers', workers_to_remove)
logger.debug { "Terminate worker threads" }
@busy.each do |processor| @busy.each do |processor|
processor.terminate if processor.alive? processor.terminate if processor.alive?
end end