1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Enable sidekiq_retry_in to signal :kill or :discard dynamically, fixes #5406 (#5432)

This commit is contained in:
Mike Perham 2022-07-14 15:56:58 -07:00 committed by GitHub
parent f9f976aa43
commit 0676a5202e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 93 additions and 39 deletions

View file

@ -91,7 +91,7 @@ module Sidekiq
msg = Sidekiq.load_json(jobstr)
if msg["retry"]
attempt_retry(nil, msg, queue, e)
process_retry(nil, msg, queue, e)
else
Sidekiq.death_handlers.each do |handler|
handler.call(msg, e)
@ -128,7 +128,7 @@ module Sidekiq
end
raise e unless msg["retry"]
attempt_retry(jobinst, msg, queue, e)
process_retry(jobinst, msg, queue, e)
# We've handled this error associated with this job, don't
# need to handle it at the global level
raise Skip
@ -139,7 +139,7 @@ module Sidekiq
# Note that +jobinst+ can be nil here if an error is raised before we can
# instantiate the job instance. All access must be guarded and
# best effort.
def attempt_retry(jobinst, msg, queue, exception)
def process_retry(jobinst, msg, queue, exception)
max_retry_attempts = retry_attempts_from(msg["retry"], @max_retries)
msg["queue"] = (msg["retry_queue"] || queue)
@ -170,19 +170,50 @@ module Sidekiq
msg["error_backtrace"] = compress_backtrace(lines)
end
if count < max_retry_attempts
delay = delay_for(jobinst, count, exception)
# Logging here can break retries if the logging device raises ENOSPC #3979
# logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
retry_at = Time.now.to_f + delay
payload = Sidekiq.dump_json(msg)
redis do |conn|
conn.zadd("retry", retry_at.to_s, payload)
end
else
# Goodbye dear message, you (re)tried your best I'm sure.
retries_exhausted(jobinst, msg, exception)
# Goodbye dear message, you (re)tried your best I'm sure.
return retries_exhausted(jobinst, msg, exception) if count >= max_retry_attempts
strategy, delay = delay_for(jobinst, count, exception)
case strategy
when :discard
return # poof!
when :kill
return retries_exhausted(jobinst, msg, exception)
end
# Logging here can break retries if the logging device raises ENOSPC #3979
# logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
jitter = rand(10) * (count + 1)
retry_at = Time.now.to_f + delay + jitter
payload = Sidekiq.dump_json(msg)
redis do |conn|
conn.zadd("retry", retry_at.to_s, payload)
end
end
# returns (strategy, seconds)
def delay_for(jobinst, count, exception)
rv = begin
# sidekiq_retry_in can return two different things:
# 1. When to retry next, as an integer of seconds
# 2. A symbol which re-routes the job elsewhere, e.g. :discard, :kill, :default
jobinst&.sidekiq_retry_in_block&.call(count, exception)
rescue Exception => e
handle_exception(e, {context: "Failure scheduling retry using the defined `sidekiq_retry_in` in #{jobinst.class.name}, falling back to default"})
nil
end
delay = if Integer === rv && rv > 0
rv
elsif rv == :discard
return [:discard, nil] # do nothing, job goes poof
elsif rv == :kill
return [:kill, nil]
else
(count**4) + 15
end
[:default, delay]
end
def retries_exhausted(jobinst, msg, exception)
@ -216,22 +247,6 @@ module Sidekiq
end
end
def delay_for(jobinst, count, exception)
jitter = rand(10) * (count + 1)
if jobinst&.sidekiq_retry_in_block
custom_retry_in = retry_in(jobinst, count, exception).to_i
return custom_retry_in + jitter if custom_retry_in > 0
end
(count**4) + 15 + jitter
end
def retry_in(jobinst, count, exception)
jobinst.sidekiq_retry_in_block.call(count, exception)
rescue Exception => e
handle_exception(e, {context: "Failure scheduling retry using the defined `sidekiq_retry_in` in #{jobinst.class.name}, falling back to default"})
nil
end
def exception_caused_by_shutdown?(e, checked_causes = [])
return false unless e.cause

View file

@ -278,7 +278,7 @@ describe Sidekiq::Processor do
assert_equal "boom", msg["args"].first
}
@processor.instance_variable_get(:@retrier).stub(:attempt_retry, retry_stub) do
@processor.instance_variable_get(:@retrier).stub(:process_retry, retry_stub) do
msg = Sidekiq.dump_json(job_data)
begin
@processor.process(work(msg))

View file

@ -269,6 +269,10 @@ describe Sidekiq::JobRetry do
sidekiq_retry_in do |count, exception|
case exception
when RuntimeError
:kill
when Interrupt
:discard
when SpecialError
nil
when ArgumentError
@ -288,33 +292,68 @@ describe Sidekiq::JobRetry do
end
it "retries with a default delay" do
refute_equal 4, handler.__send__(:delay_for, worker, 2, StandardError.new)
strat, count = handler.__send__(:delay_for, worker, 2, StandardError.new)
assert_equal :default, strat
refute_equal 4, count
end
it "retries with a custom delay and exception 1" do
assert_includes 4..35, handler.__send__(:delay_for, CustomWorkerWithException, 2, ArgumentError.new)
strat, count = handler.__send__(:delay_for, CustomWorkerWithException, 2, ArgumentError.new)
assert_equal :default, strat
assert_includes 4..35, count
end
it "supports discard" do
strat, count = handler.__send__(:delay_for, CustomWorkerWithException, 2, Interrupt.new)
assert_equal :discard, strat
assert_nil count
end
it "supports kill" do
strat, count = handler.__send__(:delay_for, CustomWorkerWithException, 2, RuntimeError.new)
assert_equal :kill, strat
assert_nil count
end
it "retries with a custom delay and exception 2" do
assert_includes 4..35, handler.__send__(:delay_for, CustomWorkerWithException, 2, StandardError.new)
strat, count = handler.__send__(:delay_for, CustomWorkerWithException, 2, StandardError.new)
assert_equal :default, strat
assert_includes 4..35, count
end
it "retries with a default delay and exception in case of configured with nil" do
refute_equal 8, handler.__send__(:delay_for, CustomWorkerWithException, 2, SpecialError.new)
refute_equal 4, handler.__send__(:delay_for, CustomWorkerWithException, 2, SpecialError.new)
strat, count = handler.__send__(:delay_for, CustomWorkerWithException, 2, SpecialError.new)
assert_equal :default, strat
refute_equal 8, count
refute_equal 4, count
end
it "retries with a custom delay without exception" do
assert_includes 4..35, handler.__send__(:delay_for, CustomWorkerWithoutException, 2, StandardError.new)
strat, count = handler.__send__(:delay_for, CustomWorkerWithoutException, 2, StandardError.new)
assert_equal :default, strat
assert_includes 4..35, count
end
it "falls back to the default retry on exception" do
output = capture_logging do
refute_equal 4, handler.__send__(:delay_for, ErrorWorker, 2, StandardError.new)
strat, count = handler.__send__(:delay_for, ErrorWorker, 2, StandardError.new)
assert_equal :default, strat
refute_equal 4, count
end
assert_match(/Failure scheduling retry using the defined `sidekiq_retry_in`/,
output, "Log entry missing for sidekiq_retry_in")
end
it "kills when configured on special exceptions" do
ds = Sidekiq::DeadSet.new
assert_equal 0, ds.size
assert_raises Sidekiq::JobRetry::Skip do
handler.local(CustomWorkerWithException, jobstr({"class" => "CustomWorkerWithException"}), "default") do
raise "oops"
end
end
assert_equal 1, ds.size
end
end
describe "handles errors withouth cause" do