require 'sidekiq/scheduled'
module Sidekiq
module Middleware
module Server
##
# Automatically retry jobs that fail in Sidekiq.
#
# Sidekiq's retry support assumes a typical development lifecycle:
#
#   0. push some code changes with a bug in it
#   1. bug causes message processing to fail, sidekiq's middleware captures
#      the message and pushes it onto a retry queue
#   2. sidekiq retries messages in the retry queue multiple times with
#      an exponential delay, the message continues to fail
#   3. after a few days, a developer deploys a fix.  the message is
#      reprocessed successfully.
#   4. if 3 never happens, sidekiq will eventually give up and throw the
#      message away.  If the worker defines a method called 'retries_exhausted',
#      this will be called before throwing the message away.  If the
#      'retries_exhausted' method throws an exception, it's dropped and logged.
#
# A message looks like:
#
#     { 'class' => 'HardWorker', 'args' => [1, 2, 'foo'] }
#
# The 'retry' option also accepts a number (in place of 'true'):
#
#     { 'class' => 'HardWorker', 'args' => [1, 2, 'foo'], 'retry' => 5 }
#
# The job will be retried this number of times before giving up. (If simply
# 'true', Sidekiq retries 25 times)
#
# We'll add a bit more data to the message to support retries:
#
#   * 'queue' - the queue to use
#   * 'retry_count' - number of times we've retried so far.
#   * 'error_message' - the message from the exception
#   * 'error_class' - the exception class
#   * 'failed_at' - the first time it failed
#   * 'retried_at' - the last time it was retried
#
# We don't store the backtrace as that can add a lot of overhead
# to the message and everyone is using Airbrake, right?
class RetryJobs
  include Sidekiq::Util

  # Retry limit used when a job sets 'retry' => true rather than a number.
  DEFAULT_MAX_RETRY_ATTEMPTS = 25

  # Middleware entry point.  Runs the job (via yield); on failure, records
  # retry metadata on +msg+ and schedules it in the 'retry' sorted set, or
  # invokes the retries-exhausted handling once the attempt limit is hit.
  # The original error is always re-raised so the processor can report it.
  def call(worker, msg, queue)
    yield
  rescue Sidekiq::Shutdown
    # ignore, will be pushed back onto queue during hard_shutdown
    raise
  rescue Exception => e
    # Deliberately broad rescue: any job failure must be captured so the
    # retry metadata can be recorded.  The error is re-raised below.
    raise e unless msg['retry']
    max_retry_attempts = retry_attempts_from(msg['retry'], DEFAULT_MAX_RETRY_ATTEMPTS)

    msg['queue'] = if msg['retry_queue']
      msg['retry_queue']
    else
      queue
    end
    msg['error_message'] = e.message
    msg['error_class'] = e.class.name
    count = if msg['retry_count']
      msg['retried_at'] = Time.now.utc
      msg['retry_count'] += 1
    else
      msg['failed_at'] = Time.now.utc
      msg['retry_count'] = 0
    end

    if msg['backtrace'] == true
      msg['error_backtrace'] = e.backtrace
    elsif msg['backtrace'] == false
      # do nothing
    elsif msg['backtrace'].to_i != 0
      # store exactly the requested number of backtrace lines
      # (0...n, not 0..n, which would keep n+1 entries)
      msg['error_backtrace'] = e.backtrace[0...msg['backtrace'].to_i]
    end

    if count < max_retry_attempts
      delay = delay_for(worker, count)
      logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
      retry_at = Time.now.to_f + delay
      payload = Sidekiq.dump_json(msg)
      Sidekiq.redis do |conn|
        conn.zadd('retry', retry_at.to_s, payload)
      end
    else
      # Goodbye dear message, you (re)tried your best I'm sure.
      retries_exhausted(worker, msg)
    end

    raise e
  end

  # Called once a job has used up all its retries.  Invokes the worker's
  # deprecated #retries_exhausted method or its sidekiq_retries_exhausted
  # block, when present.  Any error raised by the callback is logged and
  # swallowed so it cannot mask the original job failure.
  def retries_exhausted(worker, msg)
    logger.debug { "Dropping message after hitting the retry maximum: #{msg}" }
    if worker.respond_to?(:retries_exhausted)
      logger.warn { "Defining #{worker.class.name}#retries_exhausted as a method is deprecated, use `sidekiq_retries_exhausted` callback instead http://git.io/Ijju8g" }
      worker.retries_exhausted(*msg['args'])
    elsif worker.sidekiq_retries_exhausted_block?
      worker.sidekiq_retries_exhausted_block.call(*msg['args'])
    end
  rescue Exception => e
    # best-effort callback: never let it escape
    handle_exception(e, "Error calling retries_exhausted")
  end

  # Returns msg_retry when the job specified a numeric retry limit,
  # otherwise the given default.  (Integer, not the removed Fixnum.)
  def retry_attempts_from(msg_retry, default)
    if msg_retry.is_a?(Integer)
      msg_retry
    else
      default
    end
  end

  # Delay before the next attempt: the worker's sidekiq_retry_in block
  # when defined and returning a truthy value, else the default backoff.
  def delay_for(worker, count)
    worker.sidekiq_retry_in_block? && retry_in(worker, count) || seconds_to_delay(count)
  end

  # delayed_job uses the same basic formula
  def seconds_to_delay(count)
    (count ** 4) + 15 + (rand(30) * (count + 1))
  end

  # Asks the worker's sidekiq_retry_in block for a custom delay.
  # Returns nil (so delay_for falls back to the default) if it raises.
  def retry_in(worker, count)
    begin
      worker.sidekiq_retry_in_block.call(count)
    rescue Exception => e
      logger.error { "Failure scheduling retry using the defined `sidekiq_retry_in` in #{worker.class.name}, falling back to default: #{e.message}" }
      nil
    end
  end
end
end
end
end