diff --git a/lib/sidekiq/launcher.rb b/lib/sidekiq/launcher.rb index a59eb916..2a00f81b 100644 --- a/lib/sidekiq/launcher.rb +++ b/lib/sidekiq/launcher.rb @@ -10,6 +10,7 @@ module Sidekiq # immediately. class Launcher include Actor + include Util trap_exit :actor_died @@ -19,6 +20,7 @@ module Sidekiq @manager = Sidekiq::Manager.new_link options @poller = Sidekiq::Scheduled::Poller.new_link @fetcher = Sidekiq::Fetcher.new_link @manager, options + @manager.fetcher = @fetcher @done = false @options = options end @@ -31,18 +33,22 @@ module Sidekiq end def run - manager.async.start(fetcher) - poller.async.poll(true) + watchdog('Launcher#stop') do + manager.async.start + poller.async.poll(true) + end end def stop - @done = true - Sidekiq::Fetcher.done! - fetcher.async.terminate if fetcher.alive? - poller.async.terminate if poller.alive? + watchdog('Launcher#stop') do + @done = true + Sidekiq::Fetcher.done! + fetcher.async.terminate if fetcher.alive? + poller.async.terminate if poller.alive? - manager.async.stop(:shutdown => true, :timeout => @options[:timeout]) - manager.wait(:shutdown) + manager.async.stop(:shutdown => true, :timeout => @options[:timeout]) + manager.wait(:shutdown) + end end def procline(tag) diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index 994bc81a..227352eb 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -17,6 +17,7 @@ module Sidekiq attr_reader :ready attr_reader :busy + attr_accessor :fetcher def initialize(options={}) logger.debug { options.inspect } @@ -52,8 +53,7 @@ module Sidekiq end end - def start(fetcher) - @fetcher = fetcher + def start @ready.each { dispatch } end diff --git a/lib/sidekiq/util.rb b/lib/sidekiq/util.rb index 1dc1ce60..a8422b7f 100644 --- a/lib/sidekiq/util.rb +++ b/lib/sidekiq/util.rb @@ -15,6 +15,7 @@ module Sidekiq yield rescue Exception => ex handle_exception(ex, { :context => last_words }) + raise ex end def logger diff --git a/test/test_manager.rb b/test/test_manager.rb index 2ff7b162..22c941f5 100644 --- a/test/test_manager.rb +++ b/test/test_manager.rb @@ -43,7 +43,11 @@ class TestManager < Minitest::Test end it 'returns finished processors to the ready pool' do + fetcher = MiniTest::Mock.new + fetcher.expect :async, fetcher, [] + fetcher.expect :fetch, nil, [] mgr = Sidekiq::Manager.new(options) + mgr.fetcher = fetcher init_size = mgr.ready.size processor = mgr.ready.pop mgr.busy << processor @@ -51,10 +55,15 @@ class TestManager < Minitest::Test assert_equal 0, mgr.busy.size assert_equal init_size, mgr.ready.size + fetcher.verify end it 'throws away dead processors' do + fetcher = MiniTest::Mock.new + fetcher.expect :async, fetcher, [] + fetcher.expect :fetch, nil, [] mgr = Sidekiq::Manager.new(options) + mgr.fetcher = fetcher init_size = mgr.ready.size processor = mgr.ready.pop mgr.busy << processor @@ -63,6 +72,7 @@ class TestManager < Minitest::Test assert_equal 0, mgr.busy.size assert_equal init_size, mgr.ready.size refute mgr.ready.include?(processor) + fetcher.verify end def options