2013-05-10 23:43:53 -04:00
|
|
|
require 'helper'
|
|
|
|
require 'sidekiq/manager'
|
|
|
|
|
2013-09-22 17:38:33 -04:00
|
|
|
# Specs for Sidekiq::Manager: bookkeeping of the ready/busy processor pools,
# assignment of work, shutdown, and the heartbeat.
class TestManager < Sidekiq::Test
  describe 'manager' do
    # Every example starts from an empty Redis database.
    before do
      Sidekiq.redis { |conn| conn.flushdb }
    end

    it 'creates N processor instances' do
      mgr = Sidekiq::Manager.new(options)
      assert_equal options[:concurrency], mgr.ready.size
      assert_equal [], mgr.busy
    end

    it 'assigns work to a processor' do
      uow = Object.new
      processor = Minitest::Mock.new
      # The manager is expected to call processor.async.process(uow).
      processor.expect(:async, processor, [])
      processor.expect(:process, nil, [uow])

      mgr = Sidekiq::Manager.new(options)
      mgr.ready << processor
      mgr.assign(uow)
      assert_equal 1, mgr.busy.size

      processor.verify
    end

    it 'requeues work if stopping' do
      uow = Minitest::Mock.new
      uow.expect(:requeue, nil, [])

      mgr = Sidekiq::Manager.new(options)
      mgr.fetcher = Sidekiq::BasicFetch.new({:queues => []})
      mgr.stop
      # Work handed over after stop must be requeued rather than processed.
      mgr.assign(uow)
      uow.verify
    end

    it 'shuts down the system' do
      mgr = Sidekiq::Manager.new(options)
      mgr.fetcher = Sidekiq::BasicFetch.new({:queues => []})
      mgr.stop

      assert mgr.busy.empty?
      assert mgr.ready.empty?
    end

    it 'returns finished processors to the ready pool' do
      fetcher = Minitest::Mock.new
      # Finishing a unit of work is expected to trigger fetcher.async.fetch.
      fetcher.expect :async, fetcher, []
      fetcher.expect :fetch, nil, []

      mgr = Sidekiq::Manager.new(options)
      mgr.fetcher = fetcher
      init_size = mgr.ready.size
      processor = mgr.ready.pop
      mgr.busy << processor
      mgr.processor_done(processor)

      assert_equal 0, mgr.busy.size
      assert_equal init_size, mgr.ready.size
      fetcher.verify
    end

    it 'throws away dead processors' do
      fetcher = Minitest::Mock.new
      fetcher.expect :async, fetcher, []
      fetcher.expect :fetch, nil, []

      mgr = Sidekiq::Manager.new(options)
      mgr.fetcher = fetcher
      init_size = mgr.ready.size
      processor = mgr.ready.pop
      mgr.busy << processor
      mgr.processor_died(processor, 'ignored')

      # The pool is back at its original size, but the dead processor itself
      # must not be part of it any more.
      assert_equal 0, mgr.busy.size
      assert_equal init_size, mgr.ready.size
      refute mgr.ready.include?(processor)
      fetcher.verify
    end

    describe 'heartbeat' do
      before do
        # The examples below expect heartbeat to rewrite $0. Capture the
        # title before any heartbeat runs so it can really be restored.
        @proctitle = $0

        uow = Object.new

        @processor = Minitest::Mock.new
        @processor.expect(:async, @processor, [])
        @processor.expect(:process, nil, [uow])

        # Put exactly one processor into the busy state.
        @mgr = Sidekiq::Manager.new(options)
        @mgr.ready << @processor
        @mgr.assign(uow)

        @processor.verify
      end

      # Runs even when an assertion fails, so a modified process title never
      # leaks into other tests.
      after do
        $0 = @proctitle if @proctitle
      end

      describe 'when manager is active' do
        before do
          @mgr.heartbeat('identity', heartbeat_data)
        end

        it 'sets useful info to proctitle' do
          assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy]", $0
        end

        it 'stores process info in redis' do
          info = Sidekiq.redis { |c| c.hmget('identity', 'busy') }
          assert_equal ["1"], info
          expires = Sidekiq.redis { |c| c.pttl('identity') }
          # PTTL is read some time after the key was written; allow for the
          # elapsed milliseconds instead of demanding a near-exact match.
          assert_in_delta 60000, expires, 500
        end
      end

      describe 'when manager is stopped' do
        before do
          # Minitest::Mock#expect takes (name, retval, args). The return
          # values are spelled out here: alive? answers truthy, terminate
          # returns nothing of interest.
          @processor.expect(:alive?, true, [])
          @processor.expect(:terminate, nil, [])

          @mgr.stop
          @mgr.processor_done(@processor)
          @mgr.heartbeat('identity', heartbeat_data)

          @processor.verify
        end

        it 'indicates status in proctitle' do
          assert_equal "sidekiq #{Sidekiq::VERSION} myapp [0 of 3 busy] stopping", $0
        end

        it 'stores process info in redis' do
          info = Sidekiq.redis { |c| c.hmget('identity', 'busy') }
          assert_equal ["0"], info
          expires = Sidekiq.redis { |c| c.pttl('identity') }
          # See the note in the active-manager example about the tolerance.
          assert_in_delta 60000, expires, 500
        end
      end
    end

    # Options passed to every Sidekiq::Manager built in these specs.
    def options
      { :concurrency => 3, :queues => ['default'] }
    end

    # Payload passed to Manager#heartbeat; the values show up in the
    # process title asserted above ("myapp", "of 3 busy").
    def heartbeat_data
      { 'concurrency' => 3, 'tag' => 'myapp' }
    end
  end
end
|