mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
commit
9c81f99772
4 changed files with 23 additions and 18 deletions
|
@ -17,9 +17,10 @@ module Sidekiq
|
|||
attr_reader :manager, :poller, :fetcher
|
||||
|
||||
def initialize(options)
|
||||
@manager = Sidekiq::Manager.new_link options
|
||||
@condvar = Celluloid::Condition.new
|
||||
@manager = Sidekiq::Manager.new_link(@condvar, options)
|
||||
@poller = Sidekiq::Scheduled::Poller.new_link
|
||||
@fetcher = Sidekiq::Fetcher.new_link @manager, options
|
||||
@fetcher = Sidekiq::Fetcher.new_link(@manager, options)
|
||||
@manager.fetcher = @fetcher
|
||||
@done = false
|
||||
@options = options
|
||||
|
@ -49,7 +50,8 @@ module Sidekiq
|
|||
poller.terminate if poller.alive?
|
||||
|
||||
manager.async.stop(:shutdown => true, :timeout => @options[:timeout])
|
||||
manager.wait(:shutdown) if manager.alive?
|
||||
@condvar.wait
|
||||
manager.terminate
|
||||
|
||||
# Requeue everything in case there was a worker who grabbed work while stopped
|
||||
# This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
|
||||
|
|
|
@ -22,11 +22,12 @@ module Sidekiq
|
|||
|
||||
SPIN_TIME_FOR_GRACEFUL_SHUTDOWN = 1
|
||||
|
||||
def initialize(options={})
|
||||
def initialize(condvar, options={})
|
||||
logger.debug { options.inspect }
|
||||
@options = options
|
||||
@count = options[:concurrency] || 25
|
||||
@done_callback = nil
|
||||
@finished = condvar
|
||||
|
||||
@in_progress = {}
|
||||
@threads = {}
|
||||
|
@ -183,7 +184,7 @@ module Sidekiq
|
|||
end
|
||||
end
|
||||
|
||||
signal_shutdown
|
||||
@finished.signal
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -204,11 +205,7 @@ module Sidekiq
|
|||
|
||||
def shutdown
|
||||
requeue
|
||||
signal_shutdown
|
||||
end
|
||||
|
||||
def signal_shutdown
|
||||
after(0) { signal(:shutdown) }
|
||||
@finished.signal
|
||||
end
|
||||
|
||||
def requeue
|
||||
|
|
|
@ -17,7 +17,7 @@ Gem::Specification.new do |gem|
|
|||
gem.add_dependency 'redis', '>= 3.0.6'
|
||||
gem.add_dependency 'redis-namespace', '>= 1.3.1'
|
||||
gem.add_dependency 'connection_pool', '>= 2.0.0'
|
||||
gem.add_dependency 'celluloid', '0.15.2'
|
||||
gem.add_dependency 'celluloid', '>= 0.16.0'
|
||||
gem.add_dependency 'json'
|
||||
gem.add_development_dependency 'sinatra'
|
||||
gem.add_development_dependency 'minitest', '~> 5.3.3'
|
||||
|
|
|
@ -9,8 +9,14 @@ class TestManager < Sidekiq::Test
|
|||
Sidekiq.redis {|c| c.flushdb }
|
||||
end
|
||||
|
||||
def new_manager(opts)
|
||||
condvar = Minitest::Mock.new
|
||||
condvar.expect(:signal, nil, [])
|
||||
Sidekiq::Manager.new(condvar, opts)
|
||||
end
|
||||
|
||||
it 'creates N processor instances' do
|
||||
mgr = Sidekiq::Manager.new(options)
|
||||
mgr = new_manager(options)
|
||||
assert_equal options[:concurrency], mgr.ready.size
|
||||
assert_equal [], mgr.busy
|
||||
end
|
||||
|
@ -21,7 +27,7 @@ class TestManager < Sidekiq::Test
|
|||
processor.expect(:async, processor, [])
|
||||
processor.expect(:process, nil, [uow])
|
||||
|
||||
mgr = Sidekiq::Manager.new(options)
|
||||
mgr = new_manager(options)
|
||||
mgr.ready << processor
|
||||
mgr.assign(uow)
|
||||
assert_equal 1, mgr.busy.size
|
||||
|
@ -33,7 +39,7 @@ class TestManager < Sidekiq::Test
|
|||
uow = Minitest::Mock.new
|
||||
uow.expect(:requeue, nil, [])
|
||||
|
||||
mgr = Sidekiq::Manager.new(options)
|
||||
mgr = new_manager(options)
|
||||
mgr.fetcher = Sidekiq::BasicFetch.new({:queues => []})
|
||||
mgr.stop
|
||||
mgr.assign(uow)
|
||||
|
@ -41,7 +47,7 @@ class TestManager < Sidekiq::Test
|
|||
end
|
||||
|
||||
it 'shuts down the system' do
|
||||
mgr = Sidekiq::Manager.new(options)
|
||||
mgr = new_manager(options)
|
||||
mgr.fetcher = Sidekiq::BasicFetch.new({:queues => []})
|
||||
mgr.stop
|
||||
|
||||
|
@ -53,7 +59,7 @@ class TestManager < Sidekiq::Test
|
|||
fetcher = MiniTest::Mock.new
|
||||
fetcher.expect :async, fetcher, []
|
||||
fetcher.expect :fetch, nil, []
|
||||
mgr = Sidekiq::Manager.new(options)
|
||||
mgr = new_manager(options)
|
||||
mgr.fetcher = fetcher
|
||||
init_size = mgr.ready.size
|
||||
processor = mgr.ready.pop
|
||||
|
@ -69,7 +75,7 @@ class TestManager < Sidekiq::Test
|
|||
fetcher = MiniTest::Mock.new
|
||||
fetcher.expect :async, fetcher, []
|
||||
fetcher.expect :fetch, nil, []
|
||||
mgr = Sidekiq::Manager.new(options)
|
||||
mgr = new_manager(options)
|
||||
mgr.fetcher = fetcher
|
||||
init_size = mgr.ready.size
|
||||
processor = mgr.ready.pop
|
||||
|
@ -90,7 +96,7 @@ class TestManager < Sidekiq::Test
|
|||
@processor.expect(:async, @processor, [])
|
||||
@processor.expect(:process, nil, [uow])
|
||||
|
||||
@mgr = Sidekiq::Manager.new(options)
|
||||
@mgr = new_manager(options)
|
||||
@mgr.ready << @processor
|
||||
@mgr.assign(uow)
|
||||
|
||||
|
|
Loading…
Reference in a new issue