mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Remove job duplication edge case with Sidekiq Pro, fixes #3388
In #2531, we saw how an IO exception in the logger could cause a job to fail and be deleted before it reached the RetryJobs block, causing job loss. To fix this, we disabled job acknowledgement until job execution starts but this has the bad side effect of duplicating jobs if the user is running a reliable scheme and the error happens after the RetryJobs middleware but before execution starts. Instead we flip the middleware ordering; logging now happens within the retry block. We would lose context-specific logging within retry so we move the context log setup out of the middleware into the Processor. With these changes, we can properly retry and acknowledge even if there are errors within the initial server middleware and executor calls. This code path has been reimplemented in Sidekiq 5.0 so this change only applies to 4.x.
This commit is contained in:
parent
9e7cc9afda
commit
3b5862492a
4 changed files with 27 additions and 26 deletions
|
@ -149,8 +149,8 @@ module Sidekiq
|
|||
require 'sidekiq/middleware/server/logging'
|
||||
|
||||
Middleware::Chain.new do |m|
|
||||
m.add Middleware::Server::Logging
|
||||
m.add Middleware::Server::RetryJobs
|
||||
m.add Middleware::Server::Logging
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -4,28 +4,19 @@ module Sidekiq
|
|||
class Logging
|
||||
|
||||
def call(worker, item, queue)
|
||||
Sidekiq::Logging.with_context(log_context(worker, item)) do
|
||||
begin
|
||||
start = Time.now
|
||||
logger.info("start".freeze)
|
||||
yield
|
||||
logger.info("done: #{elapsed(start)} sec")
|
||||
rescue Exception
|
||||
logger.info("fail: #{elapsed(start)} sec")
|
||||
raise
|
||||
end
|
||||
begin
|
||||
start = Time.now
|
||||
logger.info("start".freeze)
|
||||
yield
|
||||
logger.info("done: #{elapsed(start)} sec")
|
||||
rescue Exception
|
||||
logger.info("fail: #{elapsed(start)} sec")
|
||||
raise
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
# If we're using a wrapper class, like ActiveJob, use the "wrapped"
|
||||
# attribute to expose the underlying thing.
|
||||
def log_context(worker, item)
|
||||
klass = item['wrapped'.freeze] || worker.class.to_s
|
||||
"#{klass} JID-#{item['jid'.freeze]}#{" BID-#{item['bid'.freeze]}" if item['bid'.freeze]}"
|
||||
end
|
||||
|
||||
def elapsed(start)
|
||||
(Time.now - start).round(3)
|
||||
end
|
||||
|
|
|
@ -129,13 +129,15 @@ module Sidekiq
|
|||
worker.jid = job_hash['jid'.freeze]
|
||||
|
||||
stats(worker, job_hash, queue) do
|
||||
Sidekiq.server_middleware.invoke(worker, job_hash, queue) do
|
||||
@executor.call do
|
||||
# Only ack if we either attempted to start this job or
|
||||
# successfully completed it. This prevents us from
|
||||
# losing jobs if a middleware raises an exception before yielding
|
||||
ack = true
|
||||
execute_job(worker, cloned(job_hash['args'.freeze]))
|
||||
Sidekiq::Logging.with_context(log_context(job_hash)) do
|
||||
ack = true
|
||||
Sidekiq.server_middleware.invoke(worker, job_hash, queue) do
|
||||
@executor.call do
|
||||
# Only ack if we either attempted to start this job or
|
||||
# successfully completed it. This prevents us from
|
||||
# losing jobs if a middleware raises an exception before yielding
|
||||
execute_job(worker, cloned(job_hash['args'.freeze]))
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -154,6 +156,13 @@ module Sidekiq
|
|||
end
|
||||
end
|
||||
|
||||
# If we're using a wrapper class, like ActiveJob, use the "wrapped"
|
||||
# attribute to expose the underlying thing.
|
||||
def log_context(item)
|
||||
klass = item['wrapped'.freeze] || item['class'.freeze]
|
||||
"#{klass} JID-#{item['jid'.freeze]}#{" BID-#{item['bid'.freeze]}" if item['bid'.freeze]}"
|
||||
end
|
||||
|
||||
def execute_job(worker, cloned_args)
|
||||
worker.perform(*cloned_args)
|
||||
end
|
||||
|
|
|
@ -152,7 +152,8 @@ class TestProcessor < Sidekiq::Test
|
|||
describe 'middleware throws an exception before processing the work' do
|
||||
let(:raise_before_yield) { true }
|
||||
|
||||
it 'does not ack' do
|
||||
it 'acks the job' do
|
||||
work.expect(:acknowledge, nil)
|
||||
begin
|
||||
@processor.process(work)
|
||||
flunk "Expected #process to raise exception"
|
||||
|
|
Loading…
Reference in a new issue