1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Groundwork for transfering reliable fetch working queue back to public queue on graceful shutdown.

This commit is contained in:
Jonathan Hyman 2013-11-21 06:09:30 -05:00
parent 6b0629d54f
commit 5bcda3e5f1
3 changed files with 22 additions and 2 deletions

View file

@ -47,6 +47,10 @@ module Sidekiq
end end
end end
def clean_up
@strategy.clean_up
end
def handle_fetch_exception(ex) def handle_fetch_exception(ex)
if !@down if !@down
logger.error("Error fetching message: #{ex}") logger.error("Error fetching message: #{ex}")
@ -90,6 +94,9 @@ module Sidekiq
UnitOfWork.new(*work) if work UnitOfWork.new(*work) if work
end end
def clean_up
end
def self.bulk_requeue(inprogress) def self.bulk_requeue(inprogress)
Sidekiq.logger.debug { "Re-queueing terminated jobs" } Sidekiq.logger.debug { "Re-queueing terminated jobs" }
jobs_to_requeue = {} jobs_to_requeue = {}

View file

@ -19,6 +19,8 @@ module Sidekiq
attr_reader :busy attr_reader :busy
attr_accessor :fetcher attr_accessor :fetcher
SPIN_TIME_FOR_GRACEFUL_SHUTDOWN = 3
def initialize(options={}) def initialize(options={})
logger.debug { options.inspect } logger.debug { options.inspect }
@count = options[:concurrency] || 25 @count = options[:concurrency] || 25
@ -47,12 +49,21 @@ module Sidekiq
@ready.clear @ready.clear
clear_worker_set clear_worker_set
clean_up_for_graceful_shutdown
return after(0) { signal(:shutdown) } if @busy.empty?
hard_shutdown_in timeout if shutdown hard_shutdown_in timeout if shutdown
end end
end end
def clean_up_for_graceful_shutdown
if @busy.empty?
@fetcher.clean_up
return after(0) { signal(:shutdown) }
end
after(SPIN_TIME_FOR_GRACEFUL_SHUTDOWN) { clean_up_for_graceful_shutdown }
end
def start def start
@ready.each { dispatch } @ready.each { dispatch }
end end
@ -159,7 +170,7 @@ module Sidekiq
# it is worse to lose a job than to run it twice. # it is worse to lose a job than to run it twice.
Sidekiq::Fetcher.strategy.bulk_requeue(@in_progress.values) Sidekiq::Fetcher.strategy.bulk_requeue(@in_progress.values)
logger.debug { "Terminating #{@busy.size} busy worker threads" } logger.warn { "Terminating #{@busy.size} busy worker threads" }
@busy.each do |processor| @busy.each do |processor|
if processor.alive? && t = @threads.delete(processor.object_id) if processor.alive? && t = @threads.delete(processor.object_id)
t.raise Shutdown t.raise Shutdown

View file

@ -29,6 +29,7 @@ class TestManager < Sidekiq::Test
uow.expect(:requeue, nil, []) uow.expect(:requeue, nil, [])
mgr = Sidekiq::Manager.new(options) mgr = Sidekiq::Manager.new(options)
mgr.fetcher = Sidekiq::BasicFetch.new({:queues => []})
mgr.stop mgr.stop
mgr.assign(uow) mgr.assign(uow)
uow.verify uow.verify
@ -36,6 +37,7 @@ class TestManager < Sidekiq::Test
it 'shuts down the system' do it 'shuts down the system' do
mgr = Sidekiq::Manager.new(options) mgr = Sidekiq::Manager.new(options)
mgr.fetcher = Sidekiq::BasicFetch.new({:queues => []})
mgr.stop mgr.stop
assert mgr.busy.empty? assert mgr.busy.empty?