Repository: https://github.com/mperham/sidekiq.git
Under just the right conditions, we could lose a job:

- Job raises an error
- Retry subsystem catches the error and tries to create a retry in Redis, but this raises a "Redis down" exception
- Processor catches the Redis exception and thinks a retry was created
- Redis comes back online just in time for the job to be acknowledged and lost

That's a very specific and rare set of steps, but it can happen.

Instead, have the Retry subsystem raise a specific error signaling that it created a retry. There will be three common cases:

1. Job is successful: job is acknowledged.
2. Job fails, retry is created, Processor rescues the specific error: job is acknowledged.
3. Sidekiq::Shutdown is raised: job is not acknowledged.

Now there is another case:

4. Job fails, retry fails, Processor rescues Exception: job is NOT acknowledged. Sidekiq Pro's super_fetch will rescue the orphaned job at some point in the future.
commit c650e9b150
parent 2f37600568
3 changed files with 28 additions and 19 deletions
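
The flow described above condenses into a short, self-contained Ruby sketch. This is not the Sidekiq source: Shutdown, Handled, with_retry, push_retry_to_redis, process and Work below are simplified stand-ins for Sidekiq::Shutdown, Sidekiq::JobRetry::Handled, JobRetry#global/#local, the retry persistence code, Processor#process and a fetched unit of work.

# Toy model of the acknowledgement contract. Every name here is a stand-in.
class Shutdown < Interrupt; end
class Handled < RuntimeError; end

def push_retry_to_redis(job, error)
  # pretend a retry record was written; in the failure scenario described
  # above this call itself raises because Redis is down
end

def with_retry(job)
  yield
rescue Shutdown
  raise                        # case 3: shutting down, leave the job alone
rescue Exception => e
  push_retry_to_redis(job, e)  # may raise again during a Redis outage (case 4)
  raise Handled                # case 2: a retry now exists, say so explicitly
end

def process(work)
  with_retry(work) { work.run }
  work.acknowledge             # case 1: success
rescue Handled
  work.acknowledge             # case 2: retry persisted, safe to acknowledge
rescue Shutdown
  # case 3: not acknowledged; the job goes back onto the queue
rescue Exception
  # case 4: creating the retry failed too; leave the job unacknowledged so a
  # recovery mechanism (Sidekiq Pro's super_fetch) can rescue it later
end

Work = Struct.new(:payload) do
  def run; raise "boom"; end
  def acknowledge; puts "acknowledged"; end
end

process(Work.new("job"))       # prints "acknowledged" via case 2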
Changes.md

@@ -5,6 +5,7 @@
 HEAD
 ---------
 
+- Fix edge case where a job failure during Redis outage could result in a lost job [#4141]
 - Better handling of malformed job arguments in payload [#4095]
 - Restore bootstrap's dropdown css component [#4099, urkle]
 - Allow `Sidekiq::Worker#set` to be chained
lib/sidekiq/job_retry.rb

@@ -56,7 +56,8 @@ module Sidekiq
   # end
   #
   class JobRetry
-    class Skip < ::RuntimeError; end
+    class Handled < ::RuntimeError; end
+    class Skip < Handled; end
 
     include Sidekiq::Util
 
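A side effect of the new hierarchy in this hunk: because Skip now inherits from Handled, a single rescue Handled clause covers both the "retry was created" signal and user code that raised Skip, while existing raise Sidekiq::JobRetry::Skip call sites keep working. A tiny plain-Ruby illustration (only the class names are borrowed from the hunk above):

class Handled < ::RuntimeError; end
class Skip < Handled; end

begin
  raise Skip, "worker asked to skip retries"
rescue Handled => ex
  puts ex.class   # => Skip; one rescue clause now covers both signals
end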
@@ -71,7 +72,7 @@ module Sidekiq
     # require the worker to be instantiated.
     def global(msg, queue)
       yield
-    rescue Skip => ex
+    rescue Handled => ex
       raise ex
     rescue Sidekiq::Shutdown => ey
       # ignore, will be pushed back onto queue during hard_shutdown
@@ -92,7 +93,7 @@ module Sidekiq
         end
       end
 
-      raise e
+      raise Handled
     end
 
 
@@ -106,7 +107,7 @@ module Sidekiq
     # calling the handle_exception handlers.
     def local(worker, msg, queue)
       yield
-    rescue Skip => ex
+    rescue Handled => ex
       raise ex
     rescue Sidekiq::Shutdown => ey
       # ignore, will be pushed back onto queue during hard_shutdown
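With these job_retry.rb changes, global and local no longer re-raise the original error after recording a retry; they raise Handled, and the original error remains reachable because Ruby attaches the exception currently being rescued as the cause of any exception raised inside a rescue block. That is why the processor hunk below reads e = h.cause ? h.cause : h. A minimal sketch of the mechanism, independent of Sidekiq (guarded is a hypothetical stand-in for global/local):

class Handled < RuntimeError; end

def guarded
  yield
rescue Exception
  # the retry would be written to Redis here; raising inside a rescue block
  # makes Ruby record the original error as the new exception's #cause
  raise Handled
end

begin
  guarded { raise ArgumentError, "boom" }
rescue Handled => h
  puts h.cause.class     # => ArgumentError
  puts h.cause.message   # => boom
end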
lib/sidekiq/processor.rb

@@ -147,21 +147,19 @@ module Sidekiq
       jobstr = work.job
       queue = work.queue_name
 
-      ack = false
+      # Treat malformed JSON as a special case: job goes straight to the morgue.
+      job_hash = nil
       begin
-        # Treat malformed JSON as a special case: job goes straight to the morgue.
-        job_hash = nil
-        begin
-          job_hash = Sidekiq.load_json(jobstr)
-        rescue => ex
-          handle_exception(ex, { :context => "Invalid JSON for job", :jobstr => jobstr })
-          # we can't notify because the job isn't a valid hash payload.
-          DeadSet.new.kill(jobstr, notify_failure: false)
-          ack = true
-          raise
-        end
+        job_hash = Sidekiq.load_json(jobstr)
+      rescue => ex
+        handle_exception(ex, { :context => "Invalid JSON for job", :jobstr => jobstr })
+        # we can't notify because the job isn't a valid hash payload.
+        DeadSet.new.kill(jobstr, notify_failure: false)
+        return work.acknowledge
+      end
 
-        ack = true
+      ack = true
+      begin
         dispatch(job_hash, queue) do |worker|
           Sidekiq.server_middleware.invoke(worker, job_hash, queue) do
             execute_job(worker, cloned(job_hash['args']))
@@ -172,10 +170,19 @@ module Sidekiq
         # within the timeout. Don't acknowledge the work since
         # we didn't properly finish it.
         ack = false
-      rescue Exception => ex
-        e = ex.is_a?(::Sidekiq::JobRetry::Skip) && ex.cause ? ex.cause : ex
+      rescue Sidekiq::JobRetry::Handled => h
+        # this is the common case: job raised error and Sidekiq::JobRetry::Handled
+        # signals that we created a retry successfully. We can acknowledge the job.
+        e = h.cause ? h.cause : h
         handle_exception(e, { :context => "Job raised exception", :job => job_hash, :jobstr => jobstr })
         raise e
+      rescue Exception => ex
+        # Unexpected error! This is very bad and indicates an exception that got past
+        # the retry subsystem (e.g. network partition). We won't acknowledge the job
+        # so it can be rescued when using Sidekiq Pro.
+        ack = false
+        handle_exception(ex, { :context => "Internal exception!", :job => job_hash, :jobstr => jobstr })
+        raise ex
       ensure
         work.acknowledge if ack
       end
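The net effect of the processor changes above is an ack-in-ensure pattern: ack becomes true once the payload parses, a rescue clause flips it to false only when the job must survive (shutdown, or an error that escaped the retry subsystem), and the ensure block acknowledges exactly once. Reduced to a plain-Ruby skeleton (Unit and run are hypothetical stand-ins, not Sidekiq API):

class Handled < RuntimeError; end    # stands in for Sidekiq::JobRetry::Handled

Unit = Struct.new(:name) do
  def acknowledge; puts "acked #{name}"; end
end

def run(work)
  ack = true
  yield
rescue Handled
  # a retry is already persisted: keep ack true, the job may be acknowledged
rescue Exception
  ack = false   # shutdown or an error that got past the retry subsystem
  raise
ensure
  work.acknowledge if ack
end

run(Unit.new("job-1")) { :ok }             # prints "acked job-1"
run(Unit.new("job-2")) { raise Handled }   # also acked: a retry exists

In the real Processor, the clauses that keep the job alive are the Sidekiq::Shutdown rescue and the bare Exception handler shown in the last hunk.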