1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Avoid calling processor during hard shutdown, fixes #997

This commit is contained in:
Mike Perham 2013-06-10 22:20:15 -07:00
parent 84172d512b
commit 06acbd4f60
7 changed files with 41 additions and 13 deletions

View file

@ -3,6 +3,7 @@
- Revert back to Celluloid's TaskFiber for job processing which has proven to be more - Revert back to Celluloid's TaskFiber for job processing which has proven to be more
stable than TaskThread. [#985] stable than TaskThread. [#985]
- Avoid possible lockup during hard shutdown [#997]
At this point, if you are experiencing stability issues with Sidekiq in At this point, if you are experiencing stability issues with Sidekiq in
Ruby 1.9, please try Ruby 2.0. It seems to be more stable. Ruby 1.9, please try Ruby 2.0. It seems to be more stable.

View file

@ -25,10 +25,11 @@ module Sidekiq
@done_callback = nil @done_callback = nil
@in_progress = {} @in_progress = {}
@threads = {}
@done = false @done = false
@busy = [] @busy = []
@fetcher = Fetcher.new(current_actor, options) @fetcher = Fetcher.new(current_actor, options)
@ready = @count.times.map { Processor.new_link(current_actor) } @ready = @count.times.map { Processor.new_link(current_actor).tap {|p| p.proxy_id = p.object_id} }
end end
def stop(options={}) def stop(options={})
@ -63,6 +64,7 @@ module Sidekiq
watchdog('Manager#processor_done died') do watchdog('Manager#processor_done died') do
@done_callback.call(processor) if @done_callback @done_callback.call(processor) if @done_callback
@in_progress.delete(processor.object_id) @in_progress.delete(processor.object_id)
@threads.delete(processor.object_id)
@busy.delete(processor) @busy.delete(processor)
if stopped? if stopped?
processor.terminate if processor.alive? processor.terminate if processor.alive?
@ -77,10 +79,13 @@ module Sidekiq
def processor_died(processor, reason) def processor_died(processor, reason)
watchdog("Manager#processor_died died") do watchdog("Manager#processor_died died") do
@in_progress.delete(processor.object_id) @in_progress.delete(processor.object_id)
@threads.delete(processor.object_id)
@busy.delete(processor) @busy.delete(processor)
unless stopped? unless stopped?
@ready << Processor.new_link(current_actor) @ready << Processor.new_link(current_actor).tap do |p|
p.proxy_id = p.object_id
end
dispatch dispatch
else else
signal(:shutdown) if @busy.empty? signal(:shutdown) if @busy.empty?
@ -105,6 +110,14 @@ module Sidekiq
end end
end end
# A hack worthy of Rube Goldberg. We need to be able
# to hard stop a working thread. But there's no way for us to
# get handle to the underlying thread performing work for a processor
# so we have it call us and tell us.
def real_thread(proxy_id, thr)
@threads[proxy_id] = thr
end
def procline(tag) def procline(tag)
"sidekiq #{Sidekiq::VERSION} #{tag}[#{@busy.size} of #{@count} busy]#{stopped? ? ' stopping' : ''}" "sidekiq #{Sidekiq::VERSION} #{tag}[#{@busy.size} of #{@count} busy]#{stopped? ? ' stopping' : ''}"
end end
@ -145,10 +158,9 @@ module Sidekiq
# it is worse to lose a job than to run it twice. # it is worse to lose a job than to run it twice.
Sidekiq::Fetcher.strategy.bulk_requeue(@in_progress.values) Sidekiq::Fetcher.strategy.bulk_requeue(@in_progress.values)
logger.debug { "Terminating worker threads" } logger.debug { "Terminating #{@busy.size} busy worker threads" }
@busy.each do |processor| @busy.each do |processor|
if processor.alive? if processor.alive? && t = @threads.delete(processor.object_id)
t = processor.bare_object.actual_work_thread
t.raise Shutdown t.raise Shutdown
end end
end end

View file

@ -43,13 +43,12 @@ module Sidekiq
class RetryJobs class RetryJobs
include Sidekiq::Util include Sidekiq::Util
# delayed_job uses the same basic formula
DEFAULT_MAX_RETRY_ATTEMPTS = 25 DEFAULT_MAX_RETRY_ATTEMPTS = 25
def call(worker, msg, queue) def call(worker, msg, queue)
yield yield
rescue Sidekiq::Shutdown rescue Sidekiq::Shutdown
# ignore, will be pushed back onto queue # ignore, will be pushed back onto queue during hard_shutdown
raise raise
rescue Exception => e rescue Exception => e
raise e unless msg['retry'] raise e unless msg['retry']
@ -110,6 +109,7 @@ module Sidekiq
end end
end end
# delayed_job uses the same basic formula
def seconds_to_delay(count) def seconds_to_delay(count)
(count ** 4) + 15 + (rand(30)*(count+1)) (count ** 4) + 15 + (rand(30)*(count+1))
end end

View file

@ -24,10 +24,7 @@ module Sidekiq
end end
end end
# store the actual working thread so we attr_accessor :proxy_id
# can later kill if it necessary during
# hard shutdown.
attr_accessor :actual_work_thread
def initialize(boss) def initialize(boss)
@boss = boss @boss = boss
@ -37,8 +34,9 @@ module Sidekiq
msgstr = work.message msgstr = work.message
queue = work.queue_name queue = work.queue_name
@actual_work_thread = Thread.current
do_defer do do_defer do
@boss.async.real_thread(proxy_id, Thread.current)
begin begin
msg = Sidekiq.load_json(msgstr) msg = Sidekiq.load_json(msgstr)
klass = msg['class'].constantize klass = msg['class'].constantize

View file

@ -20,7 +20,7 @@ class WorkController < ApplicationController
def long def long
50.times do |x| 50.times do |x|
HardWorker.perform_async('bob', 10, x) HardWorker.perform_async('bob', 15, x)
end end
render :text => 'enqueued' render :text => 'enqueued'
end end

View file

@ -82,6 +82,8 @@ class TestMiddleware < Minitest::Test
processor = Sidekiq::Processor.new(boss) processor = Sidekiq::Processor.new(boss)
actor = Minitest::Mock.new actor = Minitest::Mock.new
actor.expect(:processor_done, nil, [processor]) actor.expect(:processor_done, nil, [processor])
actor.expect(:real_thread, nil, [nil, Celluloid::Thread])
boss.expect(:async, actor, [])
boss.expect(:async, actor, []) boss.expect(:async, actor, [])
processor.process(Sidekiq::BasicFetch::UnitOfWork.new('queue:default', msg)) processor.process(Sidekiq::BasicFetch::UnitOfWork.new('queue:default', msg))
assert_equal %w(2 before 3 before 0 before work_performed 0 after 3 after 2 after), $recorder.flatten assert_equal %w(2 before 3 before 0 before work_performed 0 after 3 after 2 after), $recorder.flatten

View file

@ -31,6 +31,8 @@ class TestProcessor < Minitest::Test
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] }) msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
actor = Minitest::Mock.new actor = Minitest::Mock.new
actor.expect(:processor_done, nil, [@processor]) actor.expect(:processor_done, nil, [@processor])
actor.expect(:real_thread, nil, [nil, Celluloid::Thread])
@boss.expect(:async, actor, [])
@boss.expect(:async, actor, []) @boss.expect(:async, actor, [])
@processor.process(work(msg)) @processor.process(work(msg))
@boss.verify @boss.verify
@ -38,6 +40,9 @@ class TestProcessor < Minitest::Test
end end
it 'passes exceptions to ExceptionHandler' do it 'passes exceptions to ExceptionHandler' do
actor = Minitest::Mock.new
actor.expect(:real_thread, nil, [nil, Celluloid::Thread])
@boss.expect(:async, actor, [])
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] }) msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
begin begin
@processor.process(work(msg)) @processor.process(work(msg))
@ -51,6 +56,9 @@ class TestProcessor < Minitest::Test
it 're-raises exceptions after handling' do it 're-raises exceptions after handling' do
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] }) msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
re_raise = false re_raise = false
actor = Minitest::Mock.new
actor.expect(:real_thread, nil, [nil, Celluloid::Thread])
@boss.expect(:async, actor, [])
begin begin
@processor.process(work(msg)) @processor.process(work(msg))
@ -67,6 +75,8 @@ class TestProcessor < Minitest::Test
processor = ::Sidekiq::Processor.new(@boss) processor = ::Sidekiq::Processor.new(@boss)
actor = Minitest::Mock.new actor = Minitest::Mock.new
actor.expect(:processor_done, nil, [processor]) actor.expect(:processor_done, nil, [processor])
actor.expect(:real_thread, nil, [nil, Celluloid::Thread])
@boss.expect(:async, actor, [])
@boss.expect(:async, actor, []) @boss.expect(:async, actor, [])
processor.process(work(msgstr)) processor.process(work(msgstr))
assert_equal [['myarg']], msg['args'] assert_equal [['myarg']], msg['args']
@ -93,8 +103,10 @@ class TestProcessor < Minitest::Test
def successful_job def successful_job
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] }) msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
actor = Minitest::Mock.new actor = Minitest::Mock.new
actor.expect(:real_thread, nil, [nil, Celluloid::Thread])
actor.expect(:processor_done, nil, [@processor]) actor.expect(:processor_done, nil, [@processor])
@boss.expect(:async, actor, []) @boss.expect(:async, actor, [])
@boss.expect(:async, actor, [])
@processor.process(work(msg)) @processor.process(work(msg))
end end
@ -118,6 +130,9 @@ class TestProcessor < Minitest::Test
let(:failed_today_key) { "stat:failed:#{Time.now.utc.to_date}" } let(:failed_today_key) { "stat:failed:#{Time.now.utc.to_date}" }
def failed_job def failed_job
actor = Minitest::Mock.new
actor.expect(:real_thread, nil, [nil, Celluloid::Thread])
@boss.expect(:async, actor, [])
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] }) msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
begin begin
@processor.process(work(msg)) @processor.process(work(msg))