1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00
mperham--sidekiq/lib/sidekiq/processor.rb
Eugene Kenny f49b4f11db Use Rails executor if code reloading is disabled (#3221)
We have to release the current thread's Active Record connection after
performing each job, in case another thread is waiting to use it.

On Rails 4 and earlier, this is handled with middleware. On Rails 5 in
development mode, the reloader does it by delegating to the executor.
However on Rails 5 in production mode, we're not adding the middleware
or enabling the reloader, so connections will never be released.

We can call the executor directly to have it release the connection for
us in this case. By calling it inside the middleware stack, the job will
be retried if the executor raises, avoiding the problem with lost jobs
that led to the reloader being disabled in production.
2016-11-12 09:17:36 -05:00

192 lines
4.7 KiB
Ruby

# frozen_string_literal: true
require 'sidekiq/util'
require 'sidekiq/fetch'
require 'thread'
require 'concurrent/map'
require 'concurrent/atomic/atomic_fixnum'
module Sidekiq
##
# The Processor is a standalone thread which:
#
# 1. fetches a job from Redis
# 2. executes the job
# a. instantiate the Worker
# b. run the middleware chain
# c. call #perform
#
# A Processor can exit due to shutdown (processor_stopped)
# or due to an error during job execution (processor_died)
#
# If an error occurs in the job execution, the
# Processor calls the Manager to create a new one
# to replace itself and exits.
#
class Processor
include Util
attr_reader :thread
attr_reader :job
# Wire this processor to its manager and pull per-process
# configuration: the fetch strategy plus the Rails reloader and
# executor hooks from the global Sidekiq options.
def initialize(mgr)
  @mgr = mgr
  @down = false
  @done = false
  @job = nil
  @thread = nil
  fetcher_class = mgr.options[:fetch] || Sidekiq::BasicFetch
  @strategy = fetcher_class.new(mgr.options)
  @reloader = Sidekiq.options[:reloader]
  @executor = Sidekiq.options[:executor]
end
# Ask the processor to stop after the job it is currently running.
# When +wait+ is true, block until the processor thread has exited.
def terminate(wait=false)
  @done = true
  return unless @thread
  @thread.value if wait
end
# Forcibly stop the processor by raising Sidekiq::Shutdown inside its
# thread. Unlike the other actors' terminate, this does not wait for
# the in-flight job (we don't know how long it will take); it is meant
# to be called after the shutdown timeout has already passed.
# When +wait+ is true, block until the thread exits.
def kill(wait=false)
  @done = true
  return unless @thread
  @thread.raise ::Sidekiq::Shutdown
  @thread.value if wait
end
# Spawn the processor's worker thread; idempotent — repeated calls
# reuse the already-started thread.
def start
  @thread ||= safe_thread("processor") { run }
end
private unless $TESTING
# Main loop: keep fetching and processing jobs until asked to stop.
# Reports back to the manager on exit — processor_stopped for a clean
# stop or shutdown, processor_died for any other exception so the
# manager can replace this processor. (rescue Exception is deliberate:
# any death must be reported, not just StandardError.)
def run
  process_one until @done
  @mgr.processor_stopped(self)
rescue Sidekiq::Shutdown
  @mgr.processor_stopped(self)
rescue Exception => ex
  @mgr.processor_died(self, ex)
end
# Fetch and run a single job. The job is held in @job for the
# duration so the manager can see what this processor was working on
# if it dies mid-job.
def process_one
  @job = fetch
  process(@job) unless @job.nil?
  @job = nil
end
# Ask the fetch strategy for the next unit of work (this blocks on
# Redis inside retrieve_work).
#
# Returns the fetched work, or nil when nothing was retrieved: the
# strategy returned nothing, a Sidekiq::Shutdown interrupted the
# blocking fetch (swallowed so the run loop can exit cleanly), or the
# fetch raised and was handled by handle_fetch_exception.
def get_one
  begin
    work = @strategy.retrieve_work
    # If we were previously marked down and this fetch succeeded,
    # Redis is back: log the outage duration and clear the marker.
    (logger.info { "Redis is online, #{Time.now - @down} sec downtime" }; @down = nil) if @down
    work
  rescue Sidekiq::Shutdown
    # Deliberately empty — fall through returning nil on shutdown.
  rescue => ex
    handle_fetch_exception(ex)
  end
end
# Fetch a job, refusing to start new work during shutdown: if @done
# was set while we were blocked fetching, the job is pushed back to
# Redis (requeue) and nil is returned instead.
def fetch
  work = get_one
  return work unless work && @done
  work.requeue
  nil
end
# Log a fetch failure once per outage: @down records when the outage
# started and suppresses repeat logging until get_one clears it on
# recovery. Sleeps one second to avoid hammering a dead Redis, then
# returns nil so callers see "no work fetched".
def handle_fetch_exception(ex)
  unless @down
    @down = Time.now
    logger.error("Error fetching job: #{ex}")
    ex.backtrace.each { |line| logger.error(line) }
  end
  sleep(1)
  nil
end
# Execute one fetched unit of work end to end: decode the JSON job
# payload, instantiate the worker class, and run #perform inside the
# stats tracker, the server middleware chain, and the Rails
# reloader/executor hooks.
#
# Ack protocol: the work is acknowledged (removed from Redis) in the
# ensure clause only when `ack` is true — set just before #perform is
# attempted (or after the reloader block completes) and reset if the
# job was force-killed by Sidekiq::Shutdown — so unstarted or
# force-killed jobs remain fetchable and are not lost.
def process(work)
  jobstr = work.job
  queue = work.queue_name
  ack = false
  begin
    job_hash = Sidekiq.load_json(jobstr)
    # The reloader wraps each job so code reloading / app-level
    # per-job hooks (e.g. releasing AR connections in development)
    # happen around execution — see file history for context.
    @reloader.call do
      klass = job_hash['class'.freeze].constantize
      worker = klass.new
      worker.jid = job_hash['jid'.freeze]
      stats(worker, job_hash, queue) do
        Sidekiq.server_middleware.invoke(worker, job_hash, queue) do
          # The executor (Rails 5 production) releases the Active
          # Record connection after the job; calling it inside the
          # middleware stack means an executor failure still retries
          # the job.
          @executor.call do
            # Only ack if we either attempted to start this job or
            # successfully completed it. This prevents us from
            # losing jobs if a middleware raises an exception before yielding.
            ack = true
            execute_job(worker, cloned(job_hash['args'.freeze]))
          end
        end
      end
      ack = true
    end
  rescue Sidekiq::Shutdown
    # Had to force kill this job because it didn't finish
    # within the timeout. Don't acknowledge the work since
    # we didn't properly finish it.
    ack = false
  rescue Exception => ex
    # Record the failure, then re-raise so the processor dies and is
    # replaced by the manager (see #run's processor_died path).
    handle_exception(ex, { :context => "Job raised exception", :job => job_hash, :jobstr => jobstr })
    raise
  ensure
    work.acknowledge if ack
  end
end
# Invoke the worker's #perform with the (deep-cloned) job arguments.
# NOTE(review): kept as its own method, presumably as a seam for tests
# or subclasses to hook job execution — confirm before inlining.
def execute_job(worker, cloned_args)
  worker.perform(*cloned_args)
end
# Memoized identity string for the current thread (its object_id in
# base 36); used as the key for this processor's WORKER_STATE entry.
def thread_identity
  return @str if @str
  @str = Thread.current.object_id.to_s(36)
end
# Class-wide bookkeeping shared by every Processor:
# WORKER_STATE maps thread identity -> info about the job that thread
# is currently running; PROCESSED and FAILURE are lifetime counters.
WORKER_STATE = Concurrent::Map.new
PROCESSED = Concurrent::AtomicFixnum.new
FAILURE = Concurrent::AtomicFixnum.new
# Wrap a job execution (the yielded block) with stats tracking: the
# job is visible in WORKER_STATE while it runs; FAILURE is bumped when
# the block raises (the error is re-raised); PROCESSED is bumped and
# the state entry removed no matter what.
# NOTE: the WORKER_STATE insert sits outside the begin/ensure, so a
# failure in cloned() would propagate without touching any counter.
def stats(worker, job_hash, queue)
  tid = thread_identity
  WORKER_STATE[tid] = {:queue => queue, :payload => cloned(job_hash), :run_at => Time.now.to_i }
  begin
    yield
  rescue Exception
    FAILURE.increment
    raise
  ensure
    WORKER_STATE.delete(tid)
    PROCESSED.increment
  end
end
# Deep-copy the worker's arguments via a Marshal round-trip so the
# worker can mutate them freely: if the job later fails and is pushed
# back onto Redis, the original (unmutated) payload is what gets
# retried.
def cloned(ary)
  snapshot = Marshal.dump(ary)
  Marshal.load(snapshot)
end
end
end