mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Merge pull request #2545 from remind101/middleware_exceptions_ack_fix
Don't ack unless we attempted to start working on the job
This commit is contained in:
commit
f874161f53
2 changed files with 105 additions and 1 deletions
|
@ -40,7 +40,7 @@ module Sidekiq
|
|||
|
||||
@boss.async.real_thread(proxy_id, Thread.current)
|
||||
|
||||
ack = true
|
||||
ack = false
|
||||
begin
|
||||
msg = Sidekiq.load_json(msgstr)
|
||||
klass = msg['class'].constantize
|
||||
|
@ -49,9 +49,14 @@ module Sidekiq
|
|||
|
||||
stats(worker, msg, queue) do
|
||||
Sidekiq.server_middleware.invoke(worker, msg, queue) do
|
||||
# Only ack if we either attempted to start this job or
|
||||
# successfully completed it. This prevents us from
|
||||
# losing jobs if a middleware raises an exception before yielding
|
||||
ack = true
|
||||
execute_job(worker, cloned(msg['args']))
|
||||
end
|
||||
end
|
||||
ack = true
|
||||
rescue Sidekiq::Shutdown
|
||||
# Had to force kill this job because it didn't finish
|
||||
# within the timeout. Don't acknowledge the work since
|
||||
|
|
|
@ -88,6 +88,105 @@ class TestProcessor < Sidekiq::Test
|
|||
assert_equal [['myarg']], msg['args']
|
||||
end
|
||||
|
||||
describe 'acknowledgement' do
|
||||
class ExceptionRaisingMiddleware
|
||||
def initialize(raise_before_yield, raise_after_yield, skip)
|
||||
@raise_before_yield = raise_before_yield
|
||||
@raise_after_yield = raise_after_yield
|
||||
@skip = skip
|
||||
end
|
||||
|
||||
def call(worker, item, queue)
|
||||
raise TEST_EXCEPTION if @raise_before_yield
|
||||
yield unless @skip
|
||||
raise TEST_EXCEPTION if @raise_after_yield
|
||||
end
|
||||
end
|
||||
|
||||
let(:raise_before_yield) { false }
|
||||
let(:raise_after_yield) { false }
|
||||
let(:skip_job) { false }
|
||||
let(:worker_args) { ['myarg'] }
|
||||
let(:work) { MiniTest::Mock.new }
|
||||
let(:actor) { Minitest::Mock.new }
|
||||
|
||||
before do
|
||||
work.expect(:queue_name, 'queues:default')
|
||||
work.expect(:message, Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => worker_args }))
|
||||
Sidekiq.server_middleware do |chain|
|
||||
chain.prepend ExceptionRaisingMiddleware, raise_before_yield, raise_after_yield, skip_job
|
||||
end
|
||||
|
||||
actor.expect(:real_thread, nil, [nil, Thread])
|
||||
@boss.expect(:async, actor, [])
|
||||
end
|
||||
|
||||
after do
|
||||
Sidekiq.server_middleware do |chain|
|
||||
chain.remove ExceptionRaisingMiddleware
|
||||
end
|
||||
work.verify
|
||||
end
|
||||
|
||||
describe 'middleware throws an exception before processing the work' do
|
||||
let(:raise_before_yield) { true }
|
||||
|
||||
it 'does not ack' do
|
||||
begin
|
||||
@processor.process(work)
|
||||
flunk "Expected #process to raise exception"
|
||||
rescue TestException
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe 'middleware throws an exception after processing the work' do
|
||||
let(:raise_after_yield) { true }
|
||||
|
||||
it 'acks the job' do
|
||||
work.expect(:acknowledge, nil)
|
||||
begin
|
||||
@processor.process(work)
|
||||
flunk "Expected #process to raise exception"
|
||||
rescue TestException
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe 'middleware decides to skip work' do
|
||||
let(:skip_job) { true }
|
||||
|
||||
it 'acks the job' do
|
||||
work.expect(:acknowledge, nil)
|
||||
@boss.expect(:async, actor, [])
|
||||
actor.expect(:processor_done, nil, [@processor])
|
||||
@processor.process(work)
|
||||
end
|
||||
end
|
||||
|
||||
describe 'worker raises an exception' do
|
||||
let(:worker_args) { ['boom'] }
|
||||
|
||||
it 'acks the job' do
|
||||
work.expect(:acknowledge, nil)
|
||||
begin
|
||||
@processor.process(work)
|
||||
flunk "Expected #process to raise exception"
|
||||
rescue TestException
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe 'everything goes well' do
|
||||
it 'acks the job' do
|
||||
work.expect(:acknowledge, nil)
|
||||
@boss.expect(:async, actor, [])
|
||||
actor.expect(:processor_done, nil, [@processor])
|
||||
@processor.process(work)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe 'stats' do
|
||||
before do
|
||||
Sidekiq.redis {|c| c.flushdb }
|
||||
|
|
Loading…
Add table
Reference in a new issue