mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Update Celluloid API usage, use .async, fixes #500
This commit is contained in:
parent
083681ea18
commit
110cc1dc42
7 changed files with 24 additions and 16 deletions
|
@ -12,7 +12,7 @@ end
|
|||
trap 'USR1' do
|
||||
Sidekiq.logger.info "Received USR1, no longer accepting new work"
|
||||
mgr = Sidekiq::CLI.instance.manager
|
||||
mgr.stop! if mgr
|
||||
mgr.async.stop if mgr
|
||||
end
|
||||
|
||||
trap 'TTIN' do
|
||||
|
@ -79,13 +79,13 @@ module Sidekiq
|
|||
poller = Sidekiq::Scheduled::Poller.new
|
||||
begin
|
||||
logger.info 'Starting processing, hit Ctrl-C to stop'
|
||||
@manager.start!
|
||||
poller.poll!(true)
|
||||
@manager.async.start
|
||||
poller.async.poll(true)
|
||||
sleep
|
||||
rescue Interrupt
|
||||
logger.info 'Shutting down'
|
||||
poller.terminate! if poller.alive?
|
||||
@manager.stop!(:shutdown => true, :timeout => options[:timeout])
|
||||
poller.async.terminate if poller.alive?
|
||||
@manager.async.stop(:shutdown => true, :timeout => options[:timeout])
|
||||
@manager.wait(:shutdown)
|
||||
# Explicitly exit so busy Processor threads can't block
|
||||
# process shutdown.
|
||||
|
|
|
@ -37,7 +37,7 @@ module Sidekiq
|
|||
Sidekiq.redis { |conn| queue, msg = conn.blpop(*queues_cmd) }
|
||||
|
||||
if msg
|
||||
@mgr.assign!(msg, queue.gsub(/.*queue:/, ''))
|
||||
@mgr.async.assign(msg, queue.gsub(/.*queue:/, ''))
|
||||
else
|
||||
after(0) { fetch }
|
||||
end
|
||||
|
|
|
@ -37,7 +37,7 @@ module Sidekiq
|
|||
|
||||
@done = true
|
||||
Sidekiq::Fetcher.done!
|
||||
@fetcher.terminate! if @fetcher.alive?
|
||||
@fetcher.async.terminate if @fetcher.alive?
|
||||
|
||||
logger.info { "Shutting down #{@ready.size} quiet workers" }
|
||||
@ready.each { |x| x.terminate if x.alive? }
|
||||
|
@ -108,7 +108,7 @@ module Sidekiq
|
|||
processor = @ready.pop
|
||||
@in_progress[processor.object_id] = [msg, queue]
|
||||
@busy << processor
|
||||
processor.process!(msg, queue)
|
||||
processor.async.process(msg, queue)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -146,7 +146,7 @@ module Sidekiq
|
|||
raise "BUG: No processors, cannot continue!" if @ready.empty? && @busy.empty?
|
||||
raise "No ready processor!?" if @ready.empty?
|
||||
|
||||
@fetcher.fetch!
|
||||
@fetcher.async.fetch
|
||||
end
|
||||
|
||||
def stopped?
|
||||
|
|
|
@ -47,7 +47,7 @@ module Sidekiq
|
|||
raise
|
||||
end
|
||||
end
|
||||
@boss.processor_done!(current_actor)
|
||||
@boss.async.processor_done(current_actor)
|
||||
end
|
||||
|
||||
# See http://github.com/tarcieri/celluloid/issues/22
|
||||
|
|
|
@ -52,7 +52,9 @@ class TestMiddleware < MiniTest::Unit::TestCase
|
|||
|
||||
boss = MiniTest::Mock.new
|
||||
processor = Sidekiq::Processor.new(boss)
|
||||
boss.expect(:processor_done!, nil, [processor])
|
||||
actor = MiniTest::Mock.new
|
||||
actor.expect(:processor_done, nil, [processor])
|
||||
boss.expect(:async, actor, [])
|
||||
processor.process(msg, 'default')
|
||||
assert_equal %w(0 before work_performed 0 after), $recorder.flatten
|
||||
end
|
||||
|
|
|
@ -25,7 +25,9 @@ class TestProcessor < MiniTest::Unit::TestCase
|
|||
|
||||
it 'processes as expected' do
|
||||
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
|
||||
@boss.expect(:processor_done!, nil, [@processor])
|
||||
actor = MiniTest::Mock.new
|
||||
actor.expect(:processor_done, nil, [@processor])
|
||||
@boss.expect(:async, actor, [])
|
||||
@processor.process(msg, 'default')
|
||||
@boss.verify
|
||||
assert_equal 1, $invokes
|
||||
|
@ -59,7 +61,9 @@ class TestProcessor < MiniTest::Unit::TestCase
|
|||
msg = { 'class' => MockWorker.to_s, 'args' => [['myarg']] }
|
||||
msgstr = Sidekiq.dump_json(msg)
|
||||
processor = ::Sidekiq::Processor.new(@boss)
|
||||
@boss.expect(:processor_done!, nil, [processor])
|
||||
actor = MiniTest::Mock.new
|
||||
actor.expect(:processor_done, nil, [processor])
|
||||
@boss.expect(:async, actor, [])
|
||||
processor.process(msgstr, 'default')
|
||||
assert_equal [['myarg']], msg['args']
|
||||
end
|
||||
|
|
|
@ -21,6 +21,7 @@ class TestStats < MiniTest::Unit::TestCase
|
|||
it 'updates global stats in the success case' do
|
||||
msg = Sidekiq.dump_json({ 'class' => DumbWorker.to_s, 'args' => [""] })
|
||||
boss = MiniTest::Mock.new
|
||||
actor = MiniTest::Mock.new
|
||||
|
||||
@redis.with do |conn|
|
||||
|
||||
|
@ -28,9 +29,10 @@ class TestStats < MiniTest::Unit::TestCase
|
|||
assert_equal 0, set.size
|
||||
|
||||
processor = Sidekiq::Processor.new(boss)
|
||||
boss.expect(:processor_done!, nil, [processor])
|
||||
boss.expect(:processor_done!, nil, [processor])
|
||||
boss.expect(:processor_done!, nil, [processor])
|
||||
3.times do
|
||||
actor.expect(:processor_done, nil, [processor])
|
||||
boss.expect(:async, actor, [])
|
||||
end
|
||||
|
||||
assert_equal 0, Sidekiq.info[:failed]
|
||||
assert_equal 0, Sidekiq.info[:processed]
|
||||
|
|
Loading…
Reference in a new issue