mirror of
				https://github.com/mperham/sidekiq.git
				synced 2022-11-09 13:52:34 -05:00 
			
		
		
		
	Auto failure retry now working!
This commit is contained in:
		
							parent
							
								
									b782c05e1b
								
							
						
					
					
						commit
						e38a3d785a
					
				
					 12 changed files with 75 additions and 27 deletions
				
			
		| 
						 | 
				
			
			@ -1,3 +1,11 @@
 | 
			
		|||
0.10.0
 | 
			
		||||
-----------
 | 
			
		||||
 | 
			
		||||
- Automatic failure retry!  Sidekiq will now save failed messages
 | 
			
		||||
  and retry them, with an exponential backoff, over about 20 days.
 | 
			
		||||
  Did a message fail to process?  Just deploy a bug fix in the next
 | 
			
		||||
  few days and Sidekiq will retry the message eventually.
 | 
			
		||||
 | 
			
		||||
0.9.1
 | 
			
		||||
-----------
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -28,7 +28,7 @@ module Sidekiq
 | 
			
		|||
    include Singleton
 | 
			
		||||
 | 
			
		||||
    # Used for CLI testing
 | 
			
		||||
    attr_accessor :code, :manager
 | 
			
		||||
    attr_accessor :code
 | 
			
		||||
 | 
			
		||||
    def initialize
 | 
			
		||||
      @code = nil
 | 
			
		||||
| 
						 | 
				
			
			@ -44,6 +44,7 @@ module Sidekiq
 | 
			
		|||
      options.merge!(config.merge(cli))
 | 
			
		||||
 | 
			
		||||
      Sidekiq::Util.logger.level = Logger::DEBUG if options[:verbose]
 | 
			
		||||
      Celluloid.logger = nil
 | 
			
		||||
 | 
			
		||||
      validate!
 | 
			
		||||
      write_pid
 | 
			
		||||
| 
						 | 
				
			
			@ -51,13 +52,16 @@ module Sidekiq
 | 
			
		|||
    end
 | 
			
		||||
 | 
			
		||||
    def run
 | 
			
		||||
      @manager = Sidekiq::Manager.new(options)
 | 
			
		||||
      manager = Sidekiq::Manager.new(options)
 | 
			
		||||
      poller = Sidekiq::Retry::Poller.new
 | 
			
		||||
      begin
 | 
			
		||||
        logger.info 'Starting processing, hit Ctrl-C to stop'
 | 
			
		||||
        manager.start!
 | 
			
		||||
        poller.poll!
 | 
			
		||||
        sleep
 | 
			
		||||
      rescue Interrupt
 | 
			
		||||
        logger.info 'Shutting down'
 | 
			
		||||
        poller.terminate
 | 
			
		||||
        manager.stop!(:shutdown => true, :timeout => options[:timeout])
 | 
			
		||||
        manager.wait(:shutdown)
 | 
			
		||||
      end
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -122,7 +122,7 @@ module Sidekiq
 | 
			
		|||
          @ready.size.times do
 | 
			
		||||
            found ||= find_work(@queues.sample)
 | 
			
		||||
          end
 | 
			
		||||
          break logger.debug('nothing to process') unless found
 | 
			
		||||
          break unless found
 | 
			
		||||
        end
 | 
			
		||||
 | 
			
		||||
        # This is the polling loop that ensures we check Redis every
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -21,6 +21,9 @@ module Sidekiq
 | 
			
		|||
      # We don't store the backtrace as that can add a lot of overhead
 | 
			
		||||
      # to the message and everyone is using Airbrake, right?
 | 
			
		||||
      class RetryJobs
 | 
			
		||||
        include Sidekiq::Util
 | 
			
		||||
        include Sidekiq::Retry
 | 
			
		||||
 | 
			
		||||
        def call(worker, msg, queue)
 | 
			
		||||
          yield
 | 
			
		||||
        rescue => e
 | 
			
		||||
| 
						 | 
				
			
			@ -35,19 +38,21 @@ module Sidekiq
 | 
			
		|||
            msg['retry_count'] = 0
 | 
			
		||||
          end
 | 
			
		||||
 | 
			
		||||
          if count <= Sidekiq::Retry::MAX_COUNT
 | 
			
		||||
            retry_at = Time.now.to_f + Sidekiq::Retry::DELAY.call(count)
 | 
			
		||||
          if count <= MAX_COUNT
 | 
			
		||||
            delay = DELAY.call(count)
 | 
			
		||||
            logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
 | 
			
		||||
            retry_at = Time.now.to_f + delay
 | 
			
		||||
            payload = MultiJson.encode(msg)
 | 
			
		||||
            Sidekiq.redis do |conn|
 | 
			
		||||
              conn.zadd('retry', retry_at, payload)
 | 
			
		||||
              conn.zadd('retry', retry_at.to_s, payload)
 | 
			
		||||
            end
 | 
			
		||||
          else
 | 
			
		||||
            # Pour a 40 out for our friend.  Goodbye dear message,
 | 
			
		||||
            # You (re)tried your best, I'm sure.
 | 
			
		||||
            Sidekiq::Util.logger.info("Dropping message after hitting the retry maximum: #{message}")
 | 
			
		||||
            # Goodbye dear message, you (re)tried your best I'm sure.
 | 
			
		||||
            logger.debug { "Dropping message after hitting the retry maximum: #{msg}" }
 | 
			
		||||
          end
 | 
			
		||||
          raise
 | 
			
		||||
        end
 | 
			
		||||
 | 
			
		||||
      end
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -21,7 +21,7 @@ module Sidekiq
 | 
			
		|||
        m.add Middleware::Server::ExceptionHandler
 | 
			
		||||
        m.add Middleware::Server::Logging
 | 
			
		||||
        m.add Middleware::Server::UniqueJobs
 | 
			
		||||
        #m.add Middleware::Server::RetryJobs
 | 
			
		||||
        m.add Middleware::Server::RetryJobs
 | 
			
		||||
        m.add Middleware::Server::ActiveRecord
 | 
			
		||||
      end
 | 
			
		||||
    end
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -27,21 +27,31 @@ module Sidekiq
 | 
			
		|||
    # workers can pick it up like any other message.
 | 
			
		||||
    class Poller
 | 
			
		||||
      include Celluloid
 | 
			
		||||
      include Sidekiq::Util
 | 
			
		||||
 | 
			
		||||
      def poll
 | 
			
		||||
        Sidekiq.redis do |conn|
 | 
			
		||||
          # A message's "score" in Redis is the time at which it should be retried.
 | 
			
		||||
          # Just check Redis for the set of messages with a timestamp before now.
 | 
			
		||||
          messages = conn.zremrangebyscore 'retry', '-inf', Time.now.to_f.to_s
 | 
			
		||||
          messages.each do |message|
 | 
			
		||||
            msg = MultiJson.decode(message)
 | 
			
		||||
            conn.rpush(msg['queue'], message)
 | 
			
		||||
        watchdog('retry poller thread died!') do
 | 
			
		||||
 | 
			
		||||
          Sidekiq.redis do |conn|
 | 
			
		||||
            # A message's "score" in Redis is the time at which it should be retried.
 | 
			
		||||
            # Just check Redis for the set of messages with a timestamp before now.
 | 
			
		||||
            messages = nil
 | 
			
		||||
            now = Time.now.to_f.to_s
 | 
			
		||||
            (messages, _) = conn.multi do
 | 
			
		||||
              conn.zrangebyscore('retry', '-inf', now)
 | 
			
		||||
              conn.zremrangebyscore('retry', '-inf', now)
 | 
			
		||||
            end
 | 
			
		||||
 | 
			
		||||
            messages.each do |message|
 | 
			
		||||
              logger.debug { "Retrying #{message}" }
 | 
			
		||||
              msg = MultiJson.decode(message)
 | 
			
		||||
              conn.rpush("queue:#{msg['queue']}", message)
 | 
			
		||||
            end
 | 
			
		||||
          end
 | 
			
		||||
 | 
			
		||||
          after(POLL_INTERVAL) { poll }
 | 
			
		||||
        end
 | 
			
		||||
 | 
			
		||||
        after(POLL_INTERVAL) { poll }
 | 
			
		||||
      end
 | 
			
		||||
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,3 +1,3 @@
 | 
			
		|||
module Sidekiq
 | 
			
		||||
  VERSION = "0.9.1"
 | 
			
		||||
  VERSION = "0.10.0"
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,7 +1,7 @@
 | 
			
		|||
PATH
 | 
			
		||||
  remote: ..
 | 
			
		||||
  specs:
 | 
			
		||||
    sidekiq (0.9.1)
 | 
			
		||||
    sidekiq (0.10.0)
 | 
			
		||||
      celluloid
 | 
			
		||||
      connection_pool (>= 0.9.0)
 | 
			
		||||
      multi_json
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -19,6 +19,11 @@ class WorkController < ApplicationController
 | 
			
		|||
    render :text => 'enqueued'
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def crash
 | 
			
		||||
    HardWorker.perform_async('crash', 1, Time.now.to_f)
 | 
			
		||||
    render :text => 'enqueued'
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def delayed_post
 | 
			
		||||
    p = Post.first
 | 
			
		||||
    unless p
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,6 +2,7 @@ class HardWorker
 | 
			
		|||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  def perform(name, count, salt)
 | 
			
		||||
    raise name if name == 'crash'
 | 
			
		||||
    print "#{Time.now}\n"
 | 
			
		||||
    sleep count
 | 
			
		||||
  end
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -6,4 +6,5 @@ Myapp::Application.routes.draw do
 | 
			
		|||
  get "work/email" => "work#email"
 | 
			
		||||
  get "work/post" => "work#delayed_post"
 | 
			
		||||
  get "work/long" => "work#long"
 | 
			
		||||
  get "work/crash" => "work#crash"
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -10,10 +10,10 @@ class TestRetry < MiniTest::Unit::TestCase
 | 
			
		|||
      Sidekiq.instance_variable_set(:@redis, @redis)
 | 
			
		||||
 | 
			
		||||
      def @redis.with; yield self; end
 | 
			
		||||
      @redis.expect :zadd, 1, ['retry', Float, String]
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    it 'handles a new failed message' do
 | 
			
		||||
      @redis.expect :zadd, 1, ['retry', String, String]
 | 
			
		||||
      msg = { 'class' => 'Bob', 'args' => [1,2,'foo'] }
 | 
			
		||||
      handler = Sidekiq::Middleware::Server::RetryJobs.new
 | 
			
		||||
      assert_raises RuntimeError do
 | 
			
		||||
| 
						 | 
				
			
			@ -30,6 +30,7 @@ class TestRetry < MiniTest::Unit::TestCase
 | 
			
		|||
    end
 | 
			
		||||
 | 
			
		||||
    it 'handles a recurring failed message' do
 | 
			
		||||
      @redis.expect :zadd, 1, ['retry', String, String]
 | 
			
		||||
      now = Time.now.utc
 | 
			
		||||
      msg = {"class"=>"Bob", "args"=>[1, 2, "foo"], "queue"=>"default", "error_message"=>"kerblammo!", "error_class"=>"RuntimeError", "failed_at"=>now, "retry_count"=>10}
 | 
			
		||||
      handler = Sidekiq::Middleware::Server::RetryJobs.new
 | 
			
		||||
| 
						 | 
				
			
			@ -45,6 +46,18 @@ class TestRetry < MiniTest::Unit::TestCase
 | 
			
		|||
      assert msg["failed_at"]
 | 
			
		||||
      @redis.verify
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    it 'throws away old messages after too many retries' do
 | 
			
		||||
      now = Time.now.utc
 | 
			
		||||
      msg = {"class"=>"Bob", "args"=>[1, 2, "foo"], "queue"=>"default", "error_message"=>"kerblammo!", "error_class"=>"RuntimeError", "failed_at"=>now, "retry_count"=>25}
 | 
			
		||||
      handler = Sidekiq::Middleware::Server::RetryJobs.new
 | 
			
		||||
      assert_raises RuntimeError do
 | 
			
		||||
        handler.call('', msg, 'default') do
 | 
			
		||||
          raise "kerblammo!"
 | 
			
		||||
        end
 | 
			
		||||
      end
 | 
			
		||||
      @redis.verify
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  describe 'poller' do
 | 
			
		||||
| 
						 | 
				
			
			@ -52,16 +65,17 @@ class TestRetry < MiniTest::Unit::TestCase
 | 
			
		|||
      @redis = MiniTest::Mock.new
 | 
			
		||||
      Sidekiq.instance_variable_set(:@redis, @redis)
 | 
			
		||||
 | 
			
		||||
      fake_msg = MultiJson.encode({ 'class' => 'Bob', 'args' => [1,2], 'queue' => 'someq' })
 | 
			
		||||
 | 
			
		||||
      def @redis.with; yield self; end
 | 
			
		||||
      @redis.expect :zremrangebyscore, [fake_msg], ['retry', '-inf', String]
 | 
			
		||||
      @redis.expect :rpush, 1, ['someq', fake_msg]
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    it 'should poll like a bad mother...SHUT YO MOUTH' do
 | 
			
		||||
      fake_msg = MultiJson.encode({ 'class' => 'Bob', 'args' => [1,2], 'queue' => 'someq' })
 | 
			
		||||
      @redis.expect :multi, [[fake_msg], 1], []
 | 
			
		||||
      @redis.expect :rpush, 1, ['queue:someq', fake_msg]
 | 
			
		||||
 | 
			
		||||
      inst = Sidekiq::Retry::Poller.new
 | 
			
		||||
      inst.poll
 | 
			
		||||
 | 
			
		||||
      @redis.verify
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue