1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00
mperham--sidekiq/lib/sidekiq/processor.rb
Mike Perham 3b5862492a Remove job duplication edge case with Sidekiq Pro, fixes #3388
In #2531, we saw how an IO exception in the logger could cause a job to fail and be deleted before it reached the RetryJobs block, causing job loss.  To fix this, we disabled job acknowledgement until job execution starts but this has the bad side effect of duplicating jobs if the user is running a reliable scheme and the error happens after the RetryJobs middleware but before execution starts.

Instead we flip the middleware ordering; logging now happens within the retry block.  We would lose context-specific logging within retry so we move the context log setup out of the middleware into the Processor.  With these changes, we can properly retry and acknowledge even if there are errors within the initial server middleware and executor calls.

This code path has been reimplemented in Sidekiq 5.0 so this change only applies to 4.x.
2017-03-15 10:03:47 -07:00

201 lines
5.1 KiB
Ruby

# frozen_string_literal: true
require 'sidekiq/util'
require 'sidekiq/fetch'
require 'thread'
require 'concurrent/map'
require 'concurrent/atomic/atomic_fixnum'
module Sidekiq
##
# The Processor is a standalone thread which:
#
# 1. fetches a job from Redis
# 2. executes the job
# a. instantiate the Worker
# b. run the middleware chain
# c. call #perform
#
# A Processor can exit due to shutdown (processor_stopped)
# or due to an error during job execution (processor_died)
#
# If an error occurs in the job execution, the
# Processor calls the Manager to create a new one
# to replace itself and exits.
#
class Processor
include Util
attr_reader :thread
attr_reader :job
# Builds a processor bound to the given manager.
#
# mgr - the owning manager; its `options` hash is consulted for a custom
#       :fetch class and is handed to the fetch strategy's constructor.
#
# The reloader and executor callables are read from the global
# Sidekiq.options. No thread is created here; see #start.
def initialize(mgr)
  @mgr = mgr
  @thread = nil
  @job = nil
  @done = false
  @down = false

  # Fall back to the stock fetcher when the manager does not configure one.
  fetch_class = mgr.options[:fetch] || Sidekiq::BasicFetch
  @strategy = fetch_class.new(mgr.options)

  @reloader = Sidekiq.options[:reloader]
  @executor = Sidekiq.options[:executor]
end
# Asks the processor to stop once its current job is finished by setting
# the done flag checked in the run loop.
#
# wait - when true and a thread has been started, blocks on the thread and
#        returns its value; otherwise returns nil.
def terminate(wait=false)
  @done = true
  @thread.value if @thread && wait
end
# Forcibly stops the processor by raising Sidekiq::Shutdown inside its
# thread.
#
# #terminate does not interrupt a running job because there is no way to
# know how long that job will take; #kill is what gets called once the
# shutdown timeout has passed.
#
# wait - when true, blocks on the thread after raising and returns its
#        value; otherwise returns nil. Also returns nil if no thread has
#        been started.
def kill(wait=false)
  @done = true
  worker_thread = @thread
  if worker_thread
    worker_thread.raise ::Sidekiq::Shutdown
    worker_thread.value if wait
  end
end
# Spawns the processor thread running #run, unless one already exists.
# Returns the (new or existing) thread object.
def start
  return @thread if @thread
  @thread = safe_thread("processor", &method(:run))
end
# Methods defined below this line are private, except when the $TESTING
# global is set, in which case they keep the default (public) visibility --
# presumably so the test suite can call them directly.
private unless $TESTING
# Thread body: keeps processing jobs until the done flag is set, then
# reports back to the manager.
#
# - normal exit or Sidekiq::Shutdown -> @mgr.processor_stopped(self)
# - any other exception              -> @mgr.processor_died(self, ex)
#
# Exception (not just StandardError) is rescued so that every failure
# is reported to the manager rather than escaping the thread.
def run
  process_one until @done
  @mgr.processor_stopped(self)
rescue Sidekiq::Shutdown
  @mgr.processor_stopped(self)
rescue Exception => ex
  @mgr.processor_died(self, ex)
end
# Fetches one unit of work and processes it, if there is any.
#
# @job holds the unit while it is being processed and is reset to nil
# afterwards. If #process raises, @job is left pointing at the failed
# unit because the reset is not reached.
def process_one
  if (@job = fetch)
    process(@job)
  end
  @job = nil
end
# Pulls one unit of work from the fetch strategy.
#
# If a previous fetch failed (@down holds the time of the first failure),
# a successful fetch logs how long the outage lasted and clears @down.
#
# Returns the fetched work, nil when interrupted by Sidekiq::Shutdown, or
# whatever #handle_fetch_exception returns for any other StandardError.
def get_one
  work = @strategy.retrieve_work
  if @down
    logger.info { "Redis is online, #{Time.now - @down} sec downtime" }
    @down = nil
  end
  work
rescue Sidekiq::Shutdown
  nil
rescue => ex
  handle_fetch_exception(ex)
end
# Returns the next unit of work, or nil when there is none.
#
# If work arrives after the processor has been told to stop (@done), it is
# handed back via `requeue` and nil is returned instead.
def fetch
  work = get_one
  return work unless work && @done

  work.requeue
  nil
end
# Reacts to a failed fetch.
#
# On the first failure of an outage (@down not yet set) the time is
# recorded in @down and the error plus its backtrace are logged; further
# failures during the same outage are not logged again. Every call then
# sleeps for one second so a broken connection is not hammered.
#
# ex - the exception raised by the fetch strategy.
#
# Always returns nil, i.e. "no work fetched".
def handle_fetch_exception(ex)
  unless @down
    @down = Time.now
    logger.error("Error fetching job: #{ex}")
    # Exception#backtrace is nil for an exception that was never raised
    # (or whose backtrace was cleared); don't let logging blow up on it.
    (ex.backtrace || []).each do |bt|
      logger.error(bt)
    end
  end
  sleep(1)
  nil
end
# Runs a single unit of fetched work: parses the payload, builds the worker
# and executes it inside the reloader, stats, logging-context, server
# middleware and executor wrappers, then acknowledges the work if
# appropriate.
#
# work - the unit returned by the fetch strategy. This method reads
#        work.job (the JSON payload string) and work.queue_name, and may
#        call work.acknowledge.
#
# Any exception other than Sidekiq::Shutdown is reported through
# handle_exception and then re-raised to the caller.
def process(work)
jobstr = work.job
queue = work.queue_name
# Whether work.acknowledge should be called in the ensure clause below.
ack = false
begin
job_hash = Sidekiq.load_json(jobstr)
@reloader.call do
klass = job_hash['class'.freeze].constantize
worker = klass.new
worker.jid = job_hash['jid'.freeze]
stats(worker, job_hash, queue) do
Sidekiq::Logging.with_context(log_context(job_hash)) do
# From this point on the work is acknowledged even if the middleware
# chain or the executor raises. An exception raised before this line
# (bad JSON, unknown class, worker construction, ...) leaves ack false,
# so the work is not acknowledged.
ack = true
Sidekiq.server_middleware.invoke(worker, job_hash, queue) do
@executor.call do
# The worker gets a deep copy of the args so the original job_hash
# is not mutated by the job.
execute_job(worker, cloned(job_hash['args'.freeze]))
end
end
end
end
# Reached only when the reloader call returned without raising.
ack = true
end
rescue Sidekiq::Shutdown
# Had to force kill this job because it didn't finish
# within the timeout. Don't acknowledge the work since
# we didn't properly finish it.
ack = false
rescue Exception => ex
# job_hash is nil here if parsing the payload itself failed.
handle_exception(ex, { :context => "Job raised exception", :job => job_hash, :jobstr => jobstr })
raise
ensure
work.acknowledge if ack
end
end
# Builds the logging context string for a job: "<class> JID-<jid>", with a
# trailing " BID-<bid>" when the job hash carries a 'bid'.
#
# Wrapper classes such as ActiveJob put the real job class under
# 'wrapped'; that name is preferred over 'class' when present.
def log_context(item)
  job_name = item['wrapped'.freeze] || item['class'.freeze]
  bid = item['bid'.freeze]
  batch_suffix = bid ? " BID-#{bid}" : ''
  "#{job_name} JID-#{item['jid'.freeze]}#{batch_suffix}"
end
# Invokes the worker's #perform, splatting cloned_args into its parameters.
#
# worker      - the worker instance built in #process.
# cloned_args - the argument array to pass; #process supplies a deep copy
#               of the job's 'args'.
#
# Returns whatever #perform returns.
def execute_job(worker, cloned_args)
worker.perform(*cloned_args)
end
# Short identifier for the thread that first calls this method: the
# thread's object_id rendered in base 36. The value is computed once and
# cached in @str for the lifetime of the processor.
def thread_identity
  return @str if @str
  @str = Thread.current.object_id.to_s(36)
end
# Record of the job each processor is currently running, keyed by
# thread_identity. `stats` stores an entry ({:queue, :payload, :run_at})
# before the job runs and deletes it afterwards.
WORKER_STATE = Concurrent::Map.new
# Incremented by `stats` once per job, whether it succeeded or raised.
PROCESSED = Concurrent::AtomicFixnum.new
# Incremented by `stats` each time the job block raises.
FAILURE = Concurrent::AtomicFixnum.new
# Tracks a job in the process-wide statistics while the given block runs.
#
# Before yielding, an entry for this processor (queue, a deep copy of the
# job hash, and the start time in epoch seconds) is stored in WORKER_STATE.
# Afterwards the entry is removed and PROCESSED is incremented; FAILURE is
# incremented as well if the block raised, and the exception is re-raised.
#
# worker   - the worker instance (not used here).
# job_hash - the parsed job payload.
# queue    - the name of the queue the job came from.
#
# Returns the block's value.
def stats(worker, job_hash, queue)
  key = thread_identity
  entry = { queue: queue, payload: cloned(job_hash), run_at: Time.now.to_i }
  WORKER_STATE[key] = entry

  # The counters only apply once the state has been registered, so the
  # rescue/ensure deliberately starts here rather than at method level.
  begin
    yield
  rescue Exception
    FAILURE.increment
    raise
  ensure
    WORKER_STATE.delete(key)
    PROCESSED.increment
  end
end
# Returns a deep copy of the given object by round-tripping it through
# Marshal. The worker is handed the copy, so anything it mutates does not
# leak into the original payload that would be pushed back to Redis if
# the job fails.
def cloned(ary)
  snapshot = Marshal.dump(ary)
  Marshal.load(snapshot)
end
end
end