1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Refactor Processor to avoid costly json cloning (#4316)

This commit is contained in:
fatkodima 2019-10-08 07:48:39 +03:00 committed by Mike Perham
parent 84d638294d
commit 60361b03a3
4 changed files with 38 additions and 49 deletions

View file

@ -74,7 +74,7 @@ module Sidekiq
# The global retry handler requires only the barest of data.
# We want to be able to retry as much as possible so we don't
# require the worker to be instantiated.
def global(msg, queue)
def global(jobstr, queue)
yield
rescue Handled => ex
raise ex
@ -85,6 +85,7 @@ module Sidekiq
# ignore, will be pushed back onto queue during hard_shutdown
raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e)
msg = Sidekiq.load_json(jobstr)
if msg["retry"]
attempt_retry(nil, msg, queue, e)
else
@ -106,7 +107,7 @@ module Sidekiq
# exception so the global block does not reprocess the error. The
# Skip exception is unwrapped within Sidekiq::Processor#process before
# calling the handle_exception handlers.
def local(worker, msg, queue)
def local(worker, jobstr, queue)
yield
rescue Handled => ex
raise ex
@ -117,6 +118,7 @@ module Sidekiq
# ignore, will be pushed back onto queue during hard_shutdown
raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e)
msg = Sidekiq.load_json(jobstr)
if msg["retry"].nil?
msg["retry"] = worker.class.get_sidekiq_options["retry"]
end

View file

@ -111,16 +111,19 @@ module Sidekiq
nil
end
def dispatch(job_hash, queue)
def dispatch(job_hash, queue, jobstr)
# since middleware can mutate the job hash
# we clone here so we report the original
# we need to clone it to report the original
# job structure to the Web UI
pristine = json_clone(job_hash)
# or to push back to redis when retrying.
# To avoid costly and, most of the time, useless cloning here,
# we pass original String of JSON to respected methods
# to re-parse it there if we need access to the original, untouched job
@job_logger.prepare(job_hash) do
@retrier.global(pristine, queue) do
@retrier.global(jobstr, queue) do
@job_logger.call(job_hash, queue) do
stats(pristine, queue) do
stats(jobstr, queue) do
# Rails 5 requires a Reloader to wrap code execution. In order to
# constantize the worker and instantiate an instance, we have to call
# the Reloader. It handles code loading, db connection management, etc.
@ -129,7 +132,7 @@ module Sidekiq
klass = constantize(job_hash["class"])
worker = klass.new
worker.jid = job_hash["jid"]
@retrier.local(worker, pristine, queue) do
@retrier.local(worker, jobstr, queue) do
yield worker
end
end
@ -156,7 +159,7 @@ module Sidekiq
ack = false
begin
dispatch(job_hash, queue) do |worker|
dispatch(job_hash, queue, jobstr) do |worker|
Sidekiq.server_middleware.invoke(worker, job_hash, queue) do
execute_job(worker, job_hash["args"])
end
@ -247,8 +250,8 @@ module Sidekiq
FAILURE = Counter.new
WORKER_STATE = SharedWorkerState.new
def stats(job_hash, queue)
WORKER_STATE.set(tid, {queue: queue, payload: job_hash, run_at: Time.now.to_i})
def stats(jobstr, queue)
WORKER_STATE.set(tid, {queue: queue, payload: jobstr, run_at: Time.now.to_i})
begin
yield
@ -273,26 +276,5 @@ module Sidekiq
constant.const_get(name, false)
end
end
# Deep clone the arguments passed to the worker so that if
# the job fails, what is pushed back onto Redis hasn't
# been mutated by the worker.
def json_clone(obj)
if String === obj
obj.dup
elsif Integer === obj || Float === obj || TrueClass === obj || FalseClass === obj || NilClass === obj
obj
elsif Array === obj
obj.map { |e| json_clone(e) }
elsif Hash === obj
duped = {}
obj.each_pair do |key, value|
duped[key] = json_clone(value)
end
duped
else
obj.dup
end
end
end
end

View file

@ -29,13 +29,17 @@ describe Sidekiq::JobRetry do
@handler ||= Sidekiq::JobRetry.new(options)
end
def job(options={})
@job ||= { 'class' => 'Bob', 'args' => [1,2,'foo'], 'retry' => true }.merge(options)
def jobstr(options={})
Sidekiq.dump_json({ 'class' => 'Bob', 'args' => [1,2,'foo'], 'retry' => true }.merge(options))
end
def job
Sidekiq::RetrySet.new.first
end
it 'retries with a nil worker' do
assert_raises RuntimeError do
handler.global(job, 'default') do
handler.global(jobstr, 'default') do
raise "boom"
end
end
@ -44,7 +48,7 @@ describe Sidekiq::JobRetry do
it 'allows disabling retry' do
assert_raises RuntimeError do
handler.local(worker, job('retry' => false), 'default') do
handler.local(worker, jobstr('retry' => false), 'default') do
raise "kerblammo!"
end
end
@ -53,7 +57,7 @@ describe Sidekiq::JobRetry do
it 'allows a numeric retry' do
assert_raises RuntimeError do
handler.local(worker, job('retry' => 2), 'default') do
handler.local(worker, jobstr('retry' => 2), 'default') do
raise "kerblammo!"
end
end
@ -63,7 +67,7 @@ describe Sidekiq::JobRetry do
it 'allows 0 retry => no retry and dead queue' do
assert_raises RuntimeError do
handler.local(worker, job('retry' => 0), 'default') do
handler.local(worker, jobstr('retry' => 0), 'default') do
raise "kerblammo!"
end
end
@ -75,7 +79,7 @@ describe Sidekiq::JobRetry do
skip 'skipped! test requires ruby 2.1+' if RUBY_VERSION <= '2.1.0'
assert_raises RuntimeError do
handler.local(worker, job, 'default') do
handler.local(worker, jobstr, 'default') do
raise "kerblammo! #{195.chr}"
end
end
@ -87,7 +91,7 @@ describe Sidekiq::JobRetry do
# error classes that override #message
it 'handles error message that raises an error' do
assert_raises RuntimeError do
handler.local(worker, job, 'default') do
handler.local(worker, jobstr, 'default') do
raise BadErrorMessage.new
end
end
@ -98,8 +102,9 @@ describe Sidekiq::JobRetry do
it 'allows a max_retries option in initializer' do
max_retries = 7
1.upto(max_retries + 1) do
1.upto(max_retries + 1) do |i|
assert_raises RuntimeError do
job = i > 1 ? jobstr('retry_count' => i - 2) : jobstr
handler(:max_retries => max_retries).local(worker, job, 'default') do
raise "kerblammo!"
end
@ -113,7 +118,7 @@ describe Sidekiq::JobRetry do
it 'saves backtraces' do
c = nil
assert_raises RuntimeError do
handler.local(worker, job('backtrace' => true), 'default') do
handler.local(worker, jobstr('backtrace' => true), 'default') do
c = caller(0); raise "kerblammo!"
end
end
@ -126,7 +131,7 @@ describe Sidekiq::JobRetry do
it 'saves partial backtraces' do
c = nil
assert_raises RuntimeError do
handler.local(worker, job('backtrace' => 3), 'default') do
handler.local(worker, jobstr('backtrace' => 3), 'default') do
c = caller(0)[0...3]; raise "kerblammo!"
end
end
@ -139,7 +144,7 @@ describe Sidekiq::JobRetry do
it 'handles a new failed message' do
assert_raises RuntimeError do
handler.local(worker, job, 'default') do
handler.local(worker, jobstr, 'default') do
raise "kerblammo!"
end
end
@ -187,7 +192,7 @@ describe Sidekiq::JobRetry do
rs = Sidekiq::RetrySet.new
assert_equal 0, rs.size
assert_raises Sidekiq::Shutdown do
handler.local(worker, job, 'default') do
handler.local(worker, jobstr, 'default') do
begin
raise Sidekiq::Shutdown
rescue Interrupt
@ -204,7 +209,7 @@ describe Sidekiq::JobRetry do
it 'allows a retry queue' do
assert_raises RuntimeError do
handler.local(worker, job("retry_queue" => 'retryx'), 'default') do
handler.local(worker, jobstr("retry_queue" => 'retryx'), 'default') do
raise "kerblammo!"
end
end
@ -220,7 +225,7 @@ describe Sidekiq::JobRetry do
now = Time.now.to_f
msg = {"queue"=>"default", "error_message"=>"kerblammo!", "error_class"=>"RuntimeError", "failed_at"=>now, "retry_count"=>10}
assert_raises RuntimeError do
handler.local(worker, job(msg), 'default') do
handler.local(worker, jobstr(msg), 'default') do
raise "kerblammo!"
end
end
@ -241,7 +246,7 @@ describe Sidekiq::JobRetry do
now = Time.now.to_f
msg = {"queue"=>"default", "error_message"=>"kerblammo!", "error_class"=>"RuntimeError", "failed_at"=>now, "retry_count"=>25}
assert_raises RuntimeError do
handler.local(worker, job(msg), 'default') do
handler.local(worker, jobstr(msg), 'default') do
raise "kerblammo!"
end
end

View file

@ -55,7 +55,7 @@ describe 'sidekiq_retries_exhausted' do
end
def job(options={})
@job ||= {'class' => 'Bob', 'args' => [1, 2, 'foo']}.merge(options)
@job ||= Sidekiq.dump_json({'class' => 'Bob', 'args' => [1, 2, 'foo']}.merge(options))
end
it 'does not run exhausted block when job successful on first run' do