mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Change watchdog to propagate exceptions so Sidekiq quickly dies, #1194.
This commit is contained in:
parent
1e24e783a0
commit
a7b422af9b
4 changed files with 27 additions and 10 deletions
|
@ -10,6 +10,7 @@ module Sidekiq
|
|||
# immediately.
|
||||
class Launcher
|
||||
include Actor
|
||||
include Util
|
||||
|
||||
trap_exit :actor_died
|
||||
|
||||
|
@ -19,6 +20,7 @@ module Sidekiq
|
|||
@manager = Sidekiq::Manager.new_link options
|
||||
@poller = Sidekiq::Scheduled::Poller.new_link
|
||||
@fetcher = Sidekiq::Fetcher.new_link @manager, options
|
||||
@manager.fetcher = @fetcher
|
||||
@done = false
|
||||
@options = options
|
||||
end
|
||||
|
@ -31,18 +33,22 @@ module Sidekiq
|
|||
end
|
||||
|
||||
def run
|
||||
manager.async.start(fetcher)
|
||||
poller.async.poll(true)
|
||||
watchdog('Launcher#stop') do
|
||||
manager.async.start
|
||||
poller.async.poll(true)
|
||||
end
|
||||
end
|
||||
|
||||
def stop
|
||||
@done = true
|
||||
Sidekiq::Fetcher.done!
|
||||
fetcher.async.terminate if fetcher.alive?
|
||||
poller.async.terminate if poller.alive?
|
||||
watchdog('Launcher#stop') do
|
||||
@done = true
|
||||
Sidekiq::Fetcher.done!
|
||||
fetcher.async.terminate if fetcher.alive?
|
||||
poller.async.terminate if poller.alive?
|
||||
|
||||
manager.async.stop(:shutdown => true, :timeout => @options[:timeout])
|
||||
manager.wait(:shutdown)
|
||||
manager.async.stop(:shutdown => true, :timeout => @options[:timeout])
|
||||
manager.wait(:shutdown)
|
||||
end
|
||||
end
|
||||
|
||||
def procline(tag)
|
||||
|
|
|
@ -17,6 +17,7 @@ module Sidekiq
|
|||
|
||||
attr_reader :ready
|
||||
attr_reader :busy
|
||||
attr_accessor :fetcher
|
||||
|
||||
def initialize(options={})
|
||||
logger.debug { options.inspect }
|
||||
|
@ -52,8 +53,7 @@ module Sidekiq
|
|||
end
|
||||
end
|
||||
|
||||
def start(fetcher)
|
||||
@fetcher = fetcher
|
||||
def start
|
||||
@ready.each { dispatch }
|
||||
end
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@ module Sidekiq
|
|||
yield
|
||||
rescue Exception => ex
|
||||
handle_exception(ex, { :context => last_words })
|
||||
raise ex
|
||||
end
|
||||
|
||||
def logger
|
||||
|
|
|
@ -43,7 +43,11 @@ class TestManager < Minitest::Test
|
|||
end
|
||||
|
||||
it 'returns finished processors to the ready pool' do
|
||||
fetcher = MiniTest::Mock.new
|
||||
fetcher.expect :async, fetcher, []
|
||||
fetcher.expect :fetch, nil, []
|
||||
mgr = Sidekiq::Manager.new(options)
|
||||
mgr.fetcher = fetcher
|
||||
init_size = mgr.ready.size
|
||||
processor = mgr.ready.pop
|
||||
mgr.busy << processor
|
||||
|
@ -51,10 +55,15 @@ class TestManager < Minitest::Test
|
|||
|
||||
assert_equal 0, mgr.busy.size
|
||||
assert_equal init_size, mgr.ready.size
|
||||
fetcher.verify
|
||||
end
|
||||
|
||||
it 'throws away dead processors' do
|
||||
fetcher = MiniTest::Mock.new
|
||||
fetcher.expect :async, fetcher, []
|
||||
fetcher.expect :fetch, nil, []
|
||||
mgr = Sidekiq::Manager.new(options)
|
||||
mgr.fetcher = fetcher
|
||||
init_size = mgr.ready.size
|
||||
processor = mgr.ready.pop
|
||||
mgr.busy << processor
|
||||
|
@ -63,6 +72,7 @@ class TestManager < Minitest::Test
|
|||
assert_equal 0, mgr.busy.size
|
||||
assert_equal init_size, mgr.ready.size
|
||||
refute mgr.ready.include?(processor)
|
||||
fetcher.verify
|
||||
end
|
||||
|
||||
def options
|
||||
|
|
Loading…
Reference in a new issue