From 3b5862492a6f8ff114cc11ec751f311b0f49483f Mon Sep 17 00:00:00 2001
From: Mike Perham
Date: Wed, 15 Mar 2017 10:03:47 -0700
Subject: [PATCH] Remove job duplication edge case with Sidekiq Pro, fixes #3388

In #2531, we saw how an IO exception in the logger could cause a job to
fail and be deleted before it reached the RetryJobs block, causing job
loss. To fix this, we disabled job acknowledgement until job execution
starts but this has the bad side effect of duplicating jobs if the user
is running a reliable scheme and the error happens after the RetryJobs
middleware but before execution starts.

Instead we flip the middleware ordering; logging now happens within the
retry block. We would lose context-specific logging within retry so we
move the context log setup out of the middleware into the Processor.
With these changes, we can properly retry and acknowledge even if there
are errors within the initial server middleware and executor calls.

This code path has been reimplemented in Sidekiq 5.0 so this change only
applies to 4.x.
--- lib/sidekiq.rb | 2 +- lib/sidekiq/middleware/server/logging.rb | 25 ++++++++---------------- lib/sidekiq/processor.rb | 23 +++++++++++++++------- test/test_processor.rb | 3 ++- 4 files changed, 27 insertions(+), 26 deletions(-) diff --git a/lib/sidekiq.rb b/lib/sidekiq.rb index ffcbb477..67174347 100644 --- a/lib/sidekiq.rb +++ b/lib/sidekiq.rb @@ -149,8 +149,8 @@ module Sidekiq require 'sidekiq/middleware/server/logging' Middleware::Chain.new do |m| - m.add Middleware::Server::Logging m.add Middleware::Server::RetryJobs + m.add Middleware::Server::Logging end end diff --git a/lib/sidekiq/middleware/server/logging.rb b/lib/sidekiq/middleware/server/logging.rb index 37ca106a..99f6dd90 100644 --- a/lib/sidekiq/middleware/server/logging.rb +++ b/lib/sidekiq/middleware/server/logging.rb @@ -4,28 +4,19 @@ module Sidekiq class Logging def call(worker, item, queue) - Sidekiq::Logging.with_context(log_context(worker, item)) do - begin - start = Time.now - logger.info("start".freeze) - yield - logger.info("done: #{elapsed(start)} sec") - rescue Exception - logger.info("fail: #{elapsed(start)} sec") - raise - end + begin + start = Time.now + logger.info("start".freeze) + yield + logger.info("done: #{elapsed(start)} sec") + rescue Exception + logger.info("fail: #{elapsed(start)} sec") + raise end end private - # If we're using a wrapper class, like ActiveJob, use the "wrapped" - # attribute to expose the underlying thing. 
- def log_context(worker, item) - klass = item['wrapped'.freeze] || worker.class.to_s - "#{klass} JID-#{item['jid'.freeze]}#{" BID-#{item['bid'.freeze]}" if item['bid'.freeze]}" - end - def elapsed(start) (Time.now - start).round(3) end diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index d3dfb347..7d91ac2c 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -129,13 +129,15 @@ module Sidekiq worker.jid = job_hash['jid'.freeze] stats(worker, job_hash, queue) do - Sidekiq.server_middleware.invoke(worker, job_hash, queue) do - @executor.call do - # Only ack if we either attempted to start this job or - # successfully completed it. This prevents us from - # losing jobs if a middleware raises an exception before yielding - ack = true - execute_job(worker, cloned(job_hash['args'.freeze])) + Sidekiq::Logging.with_context(log_context(job_hash)) do + ack = true + Sidekiq.server_middleware.invoke(worker, job_hash, queue) do + @executor.call do + # Only ack if we either attempted to start this job or + # successfully completed it. This prevents us from + # losing jobs if a middleware raises an exception before yielding + execute_job(worker, cloned(job_hash['args'.freeze])) + end end end end @@ -154,6 +156,13 @@ module Sidekiq end end + # If we're using a wrapper class, like ActiveJob, use the "wrapped" + # attribute to expose the underlying thing. 
+ def log_context(item) + klass = item['wrapped'.freeze] || item['class'.freeze] + "#{klass} JID-#{item['jid'.freeze]}#{" BID-#{item['bid'.freeze]}" if item['bid'.freeze]}" + end + def execute_job(worker, cloned_args) worker.perform(*cloned_args) end diff --git a/test/test_processor.rb b/test/test_processor.rb index 6b78a5dd..d4bfa200 100644 --- a/test/test_processor.rb +++ b/test/test_processor.rb @@ -152,7 +152,8 @@ class TestProcessor < Sidekiq::Test describe 'middleware throws an exception before processing the work' do let(:raise_before_yield) { true } - it 'does not ack' do + it 'acks the job' do + work.expect(:acknowledge, nil) begin @processor.process(work) flunk "Expected #process to raise exception"