From 5bcda3e5f1ad885ce555f70f6b34002bfc99e0a3 Mon Sep 17 00:00:00 2001 From: Jonathan Hyman Date: Thu, 21 Nov 2013 06:09:30 -0500 Subject: [PATCH] Groundwork for transfering reliable fetch working queue back to public queue on graceful shutdown. --- lib/sidekiq/fetch.rb | 7 +++++++ lib/sidekiq/manager.rb | 15 +++++++++++++-- test/test_manager.rb | 2 ++ 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/lib/sidekiq/fetch.rb b/lib/sidekiq/fetch.rb index caa21629..b0910c83 100644 --- a/lib/sidekiq/fetch.rb +++ b/lib/sidekiq/fetch.rb @@ -47,6 +47,10 @@ module Sidekiq end end + def clean_up + @strategy.clean_up + end + def handle_fetch_exception(ex) if !@down logger.error("Error fetching message: #{ex}") @@ -90,6 +94,9 @@ module Sidekiq UnitOfWork.new(*work) if work end + def clean_up + end + def self.bulk_requeue(inprogress) Sidekiq.logger.debug { "Re-queueing terminated jobs" } jobs_to_requeue = {} diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index 227352eb..1369b696 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -19,6 +19,8 @@ module Sidekiq attr_reader :busy attr_accessor :fetcher + SPIN_TIME_FOR_GRACEFUL_SHUTDOWN = 3 + def initialize(options={}) logger.debug { options.inspect } @count = options[:concurrency] || 25 @@ -47,12 +49,21 @@ module Sidekiq @ready.clear clear_worker_set + clean_up_for_graceful_shutdown - return after(0) { signal(:shutdown) } if @busy.empty? hard_shutdown_in timeout if shutdown end end + def clean_up_for_graceful_shutdown + if @busy.empty? + @fetcher.clean_up + return after(0) { signal(:shutdown) } + end + + after(SPIN_TIME_FOR_GRACEFUL_SHUTDOWN) { clean_up_for_graceful_shutdown } + end + def start @ready.each { dispatch } end @@ -159,7 +170,7 @@ module Sidekiq # it is worse to lose a job than to run it twice. Sidekiq::Fetcher.strategy.bulk_requeue(@in_progress.values) - logger.debug { "Terminating #{@busy.size} busy worker threads" } + logger.warn { "Terminating #{@busy.size} busy worker threads" } @busy.each do |processor| if processor.alive? && t = @threads.delete(processor.object_id) t.raise Shutdown diff --git a/test/test_manager.rb b/test/test_manager.rb index 9af7d078..584000fb 100644 --- a/test/test_manager.rb +++ b/test/test_manager.rb @@ -29,6 +29,7 @@ class TestManager < Sidekiq::Test uow.expect(:requeue, nil, []) mgr = Sidekiq::Manager.new(options) + mgr.fetcher = Sidekiq::BasicFetch.new({:queues => []}) mgr.stop mgr.assign(uow) uow.verify @@ -36,6 +37,7 @@ class TestManager < Sidekiq::Test it 'shuts down the system' do mgr = Sidekiq::Manager.new(options) + mgr.fetcher = Sidekiq::BasicFetch.new({:queues => []}) mgr.stop assert mgr.busy.empty?