mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Rewrite shutdown signalling, fixes #2056
There's a race condition between sending the shutdown message and waiting for the "shutdown finished" signal to come back. Instead use a shared CondVar which is not racy.
This commit is contained in:
parent
279beef14c
commit
b4a6e9460d
4 changed files with 23 additions and 18 deletions
|
@ -17,9 +17,10 @@ module Sidekiq
|
||||||
attr_reader :manager, :poller, :fetcher
|
attr_reader :manager, :poller, :fetcher
|
||||||
|
|
||||||
def initialize(options)
|
def initialize(options)
|
||||||
@manager = Sidekiq::Manager.new_link options
|
@condvar = Celluloid::Condition.new
|
||||||
|
@manager = Sidekiq::Manager.new_link(@condvar, options)
|
||||||
@poller = Sidekiq::Scheduled::Poller.new_link
|
@poller = Sidekiq::Scheduled::Poller.new_link
|
||||||
@fetcher = Sidekiq::Fetcher.new_link @manager, options
|
@fetcher = Sidekiq::Fetcher.new_link(@manager, options)
|
||||||
@manager.fetcher = @fetcher
|
@manager.fetcher = @fetcher
|
||||||
@done = false
|
@done = false
|
||||||
@options = options
|
@options = options
|
||||||
|
@ -49,7 +50,8 @@ module Sidekiq
|
||||||
poller.terminate if poller.alive?
|
poller.terminate if poller.alive?
|
||||||
|
|
||||||
manager.async.stop(:shutdown => true, :timeout => @options[:timeout])
|
manager.async.stop(:shutdown => true, :timeout => @options[:timeout])
|
||||||
manager.wait(:shutdown) if manager.alive?
|
@condvar.wait
|
||||||
|
manager.terminate
|
||||||
|
|
||||||
# Requeue everything in case there was a worker who grabbed work while stopped
|
# Requeue everything in case there was a worker who grabbed work while stopped
|
||||||
# This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
|
# This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
|
||||||
|
|
|
@ -22,11 +22,12 @@ module Sidekiq
|
||||||
|
|
||||||
SPIN_TIME_FOR_GRACEFUL_SHUTDOWN = 1
|
SPIN_TIME_FOR_GRACEFUL_SHUTDOWN = 1
|
||||||
|
|
||||||
def initialize(options={})
|
def initialize(condvar, options={})
|
||||||
logger.debug { options.inspect }
|
logger.debug { options.inspect }
|
||||||
@options = options
|
@options = options
|
||||||
@count = options[:concurrency] || 25
|
@count = options[:concurrency] || 25
|
||||||
@done_callback = nil
|
@done_callback = nil
|
||||||
|
@finished = condvar
|
||||||
|
|
||||||
@in_progress = {}
|
@in_progress = {}
|
||||||
@threads = {}
|
@threads = {}
|
||||||
|
@ -183,7 +184,7 @@ module Sidekiq
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
signal_shutdown
|
@finished.signal
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -204,11 +205,7 @@ module Sidekiq
|
||||||
|
|
||||||
def shutdown
|
def shutdown
|
||||||
requeue
|
requeue
|
||||||
signal_shutdown
|
@finished.signal
|
||||||
end
|
|
||||||
|
|
||||||
def signal_shutdown
|
|
||||||
after(0) { signal(:shutdown) }
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def requeue
|
def requeue
|
||||||
|
|
|
@ -17,7 +17,7 @@ Gem::Specification.new do |gem|
|
||||||
gem.add_dependency 'redis', '>= 3.0.6'
|
gem.add_dependency 'redis', '>= 3.0.6'
|
||||||
gem.add_dependency 'redis-namespace', '>= 1.3.1'
|
gem.add_dependency 'redis-namespace', '>= 1.3.1'
|
||||||
gem.add_dependency 'connection_pool', '>= 2.0.0'
|
gem.add_dependency 'connection_pool', '>= 2.0.0'
|
||||||
gem.add_dependency 'celluloid', '0.15.2'
|
gem.add_dependency 'celluloid', '>= 0.16.0'
|
||||||
gem.add_dependency 'json'
|
gem.add_dependency 'json'
|
||||||
gem.add_development_dependency 'sinatra'
|
gem.add_development_dependency 'sinatra'
|
||||||
gem.add_development_dependency 'minitest', '~> 5.3.3'
|
gem.add_development_dependency 'minitest', '~> 5.3.3'
|
||||||
|
|
|
@ -9,8 +9,14 @@ class TestManager < Sidekiq::Test
|
||||||
Sidekiq.redis {|c| c.flushdb }
|
Sidekiq.redis {|c| c.flushdb }
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def new_manager(opts)
|
||||||
|
condvar = Minitest::Mock.new
|
||||||
|
condvar.expect(:signal, nil, [])
|
||||||
|
Sidekiq::Manager.new(condvar, opts)
|
||||||
|
end
|
||||||
|
|
||||||
it 'creates N processor instances' do
|
it 'creates N processor instances' do
|
||||||
mgr = Sidekiq::Manager.new(options)
|
mgr = new_manager(options)
|
||||||
assert_equal options[:concurrency], mgr.ready.size
|
assert_equal options[:concurrency], mgr.ready.size
|
||||||
assert_equal [], mgr.busy
|
assert_equal [], mgr.busy
|
||||||
end
|
end
|
||||||
|
@ -21,7 +27,7 @@ class TestManager < Sidekiq::Test
|
||||||
processor.expect(:async, processor, [])
|
processor.expect(:async, processor, [])
|
||||||
processor.expect(:process, nil, [uow])
|
processor.expect(:process, nil, [uow])
|
||||||
|
|
||||||
mgr = Sidekiq::Manager.new(options)
|
mgr = new_manager(options)
|
||||||
mgr.ready << processor
|
mgr.ready << processor
|
||||||
mgr.assign(uow)
|
mgr.assign(uow)
|
||||||
assert_equal 1, mgr.busy.size
|
assert_equal 1, mgr.busy.size
|
||||||
|
@ -33,7 +39,7 @@ class TestManager < Sidekiq::Test
|
||||||
uow = Minitest::Mock.new
|
uow = Minitest::Mock.new
|
||||||
uow.expect(:requeue, nil, [])
|
uow.expect(:requeue, nil, [])
|
||||||
|
|
||||||
mgr = Sidekiq::Manager.new(options)
|
mgr = new_manager(options)
|
||||||
mgr.fetcher = Sidekiq::BasicFetch.new({:queues => []})
|
mgr.fetcher = Sidekiq::BasicFetch.new({:queues => []})
|
||||||
mgr.stop
|
mgr.stop
|
||||||
mgr.assign(uow)
|
mgr.assign(uow)
|
||||||
|
@ -41,7 +47,7 @@ class TestManager < Sidekiq::Test
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'shuts down the system' do
|
it 'shuts down the system' do
|
||||||
mgr = Sidekiq::Manager.new(options)
|
mgr = new_manager(options)
|
||||||
mgr.fetcher = Sidekiq::BasicFetch.new({:queues => []})
|
mgr.fetcher = Sidekiq::BasicFetch.new({:queues => []})
|
||||||
mgr.stop
|
mgr.stop
|
||||||
|
|
||||||
|
@ -53,7 +59,7 @@ class TestManager < Sidekiq::Test
|
||||||
fetcher = MiniTest::Mock.new
|
fetcher = MiniTest::Mock.new
|
||||||
fetcher.expect :async, fetcher, []
|
fetcher.expect :async, fetcher, []
|
||||||
fetcher.expect :fetch, nil, []
|
fetcher.expect :fetch, nil, []
|
||||||
mgr = Sidekiq::Manager.new(options)
|
mgr = new_manager(options)
|
||||||
mgr.fetcher = fetcher
|
mgr.fetcher = fetcher
|
||||||
init_size = mgr.ready.size
|
init_size = mgr.ready.size
|
||||||
processor = mgr.ready.pop
|
processor = mgr.ready.pop
|
||||||
|
@ -69,7 +75,7 @@ class TestManager < Sidekiq::Test
|
||||||
fetcher = MiniTest::Mock.new
|
fetcher = MiniTest::Mock.new
|
||||||
fetcher.expect :async, fetcher, []
|
fetcher.expect :async, fetcher, []
|
||||||
fetcher.expect :fetch, nil, []
|
fetcher.expect :fetch, nil, []
|
||||||
mgr = Sidekiq::Manager.new(options)
|
mgr = new_manager(options)
|
||||||
mgr.fetcher = fetcher
|
mgr.fetcher = fetcher
|
||||||
init_size = mgr.ready.size
|
init_size = mgr.ready.size
|
||||||
processor = mgr.ready.pop
|
processor = mgr.ready.pop
|
||||||
|
@ -90,7 +96,7 @@ class TestManager < Sidekiq::Test
|
||||||
@processor.expect(:async, @processor, [])
|
@processor.expect(:async, @processor, [])
|
||||||
@processor.expect(:process, nil, [uow])
|
@processor.expect(:process, nil, [uow])
|
||||||
|
|
||||||
@mgr = Sidekiq::Manager.new(options)
|
@mgr = new_manager(options)
|
||||||
@mgr.ready << @processor
|
@mgr.ready << @processor
|
||||||
@mgr.assign(uow)
|
@mgr.assign(uow)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue