mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
clear worker set upon clean shutdown, fixes #840
This commit is contained in:
parent
32a26173a1
commit
511cca1a22
1 changed files with 17 additions and 13 deletions
|
@ -42,6 +42,8 @@ module Sidekiq
|
|||
@ready.each { |x| x.terminate if x.alive? }
|
||||
@ready.clear
|
||||
|
||||
clear_worker_set
|
||||
|
||||
return after(0) { signal(:shutdown) } if @busy.empty?
|
||||
hard_shutdown_in timeout if shutdown
|
||||
end
|
||||
|
@ -107,6 +109,21 @@ module Sidekiq
|
|||
|
||||
private
|
||||
|
||||
def clear_worker_set
|
||||
# Clearing workers in Redis
|
||||
# NOTE: we do this before terminating worker threads because the
|
||||
# process will likely receive a hard shutdown soon anyway, which
|
||||
# means the threads will killed.
|
||||
logger.debug { "Clearing workers in redis" }
|
||||
Sidekiq.redis do |conn|
|
||||
workers = conn.smembers('workers')
|
||||
workers_to_remove = workers.select do |worker_name|
|
||||
worker_name =~ /:#{process_id}-/
|
||||
end
|
||||
conn.srem('workers', workers_to_remove) if !workers_to_remove.empty?
|
||||
end
|
||||
end
|
||||
|
||||
def hard_shutdown_in(delay)
|
||||
logger.info { "Pausing up to #{delay} seconds to allow workers to finish..." }
|
||||
|
||||
|
@ -124,19 +141,6 @@ module Sidekiq
|
|||
# it is worse to lose a job than to run it twice.
|
||||
Sidekiq::Fetcher.strategy.bulk_requeue(@in_progress.values)
|
||||
|
||||
# Clearing workers in Redis
|
||||
# NOTE: we do this before terminating worker threads because the
|
||||
# process will likely receive a hard shutdown soon anyway, which
|
||||
# means the threads will killed.
|
||||
logger.debug { "Clearing workers in redis" }
|
||||
Sidekiq.redis do |conn|
|
||||
workers = conn.smembers('workers')
|
||||
workers_to_remove = workers.select do |worker_name|
|
||||
worker_name =~ /:#{process_id}-/
|
||||
end
|
||||
conn.srem('workers', workers_to_remove) if !workers_to_remove.empty?
|
||||
end
|
||||
|
||||
logger.debug { "Terminating worker threads" }
|
||||
@busy.each do |processor|
|
||||
t = processor.bare_object.actual_work_thread
|
||||
|
|
Loading…
Reference in a new issue