From 08fd2b9cf50b8dae6914495b442d6b8af5539e34 Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Sat, 17 Mar 2012 13:41:53 -0700 Subject: [PATCH] HOT new automatic retry feature. Needs testing. --- lib/sidekiq/middleware/server/retry_jobs.rb | 54 +++++++++++++++++++++ lib/sidekiq/processor.rb | 3 +- lib/sidekiq/retry.rb | 44 +++++++++++++++++ 3 files changed, 100 insertions(+), 1 deletion(-) create mode 100644 lib/sidekiq/middleware/server/retry_jobs.rb create mode 100644 lib/sidekiq/retry.rb diff --git a/lib/sidekiq/middleware/server/retry_jobs.rb b/lib/sidekiq/middleware/server/retry_jobs.rb new file mode 100644 index 00000000..6f25af3d --- /dev/null +++ b/lib/sidekiq/middleware/server/retry_jobs.rb @@ -0,0 +1,54 @@ +require 'sidekiq/retry' + +module Sidekiq + module Middleware + module Server + ## + # Automatically retry jobs that fail in Sidekiq. + # A message looks like: + # + # { 'class' => 'HardWorker', 'args' => [1, 2, 'foo'] } + # + # We'll add a bit more data to the message to support retries: + # + # * 'queue' - the queue to use + # * 'retry_count' - number of times we've retried so far. + # * 'error_message' - the message from the exception + # * 'error_class' - the exception class + # * 'failed_at' - the first time it failed + # * 'retried_at' - the last time it was retried + # + # We don't store the backtrace as that can add a lot of overhead + # to the message and everyone is using Airbrake, right? + class RetryJobs + def call(worker, msg, queue) + yield + rescue => e + msg['queue'] = queue + msg['error_message'] = e.message + msg['error_class'] = e.class.name + count = if msg['retry_count'] + msg['retried_at'] = Time.now.utc + msg['retry_count'] += 1 + else + msg['failed_at'] = Time.now.utc + msg['retry_count'] = 0 + end + + if count <= Sidekiq::Retry::MAX_COUNT + retry_at = Time.now.to_f + Sidekiq::Retry::DELAY.call(count) + payload = MultiJson.encode(msg) + Sidekiq.redis do |conn| + conn.zadd('retry', retry_at, payload) + end + else + # Pour a 40 out for our friend. Goodbye dear message, + # You (re)tried your best, I'm sure. + Sidekiq::Util.logger.info("Dropping message after hitting the retry maximum: #{message}") + end + raise + end + end + end + end +end diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index d780cafc..33206510 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -4,7 +4,7 @@ require 'sidekiq/util' require 'sidekiq/middleware/server/active_record' require 'sidekiq/middleware/server/exception_handler' require 'sidekiq/middleware/server/unique_jobs' -require 'sidekiq/middleware/server/failure_jobs' +require 'sidekiq/middleware/server/retry_jobs' require 'sidekiq/middleware/server/logging' module Sidekiq @@ -21,6 +21,7 @@ module Sidekiq m.add Middleware::Server::ExceptionHandler m.add Middleware::Server::Logging m.add Middleware::Server::UniqueJobs + #m.add Middleware::Server::RetryJobs m.add Middleware::Server::ActiveRecord end end diff --git a/lib/sidekiq/retry.rb b/lib/sidekiq/retry.rb new file mode 100644 index 00000000..20dbee10 --- /dev/null +++ b/lib/sidekiq/retry.rb @@ -0,0 +1,44 @@ +module Sidekiq + ## + # Sidekiq's retry support assumes a typical development lifecycle: + # 0. push some code changes with a bug in it + # 1. bug causes message processing to fail, sidekiq's middleware captures + # the message and pushes it onto a retry queue + # 2. sidekiq retries messages in the retry queue multiple times with + # an exponential delay, the message continues to fail + # 3. after a few days, a developer deploys a fix. the message is + # reprocessed successfully. + # 4. if 3 never happens, sidekiq will eventually give up and throw the + # message away. + module Retry + + # delayed_job uses the same basic formula + MAX_COUNT = 25 + DELAY = proc { |count| (count ** 4) + 15 } + POLL_INTERVAL = 15 + + ## + # The Poller checks Redis every N seconds for messages in the retry + # set have passed their retry timestamp and should be retried. If so, it + # just pops the message back onto its original queue so the + # workers can pick it up like any other message. + class Poller + include Celluloid + + def poll + Sidekiq.redis do |conn| + # A message's "score" in Redis is the time at which it should be retried. + # Just check Redis for the set of messages with a timestamp before now. + messages = conn.zremrangebyscore 'retry', '-inf', Time.now.to_f.to_s + messages.each do |message| + msg = MultiJson.decode(message) + conn.rpush(msg['queue'], message) + end + end + + after(POLL_INTERVAL) { poll } + end + + end + end +end