mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Fix retries_exhausted + dead job handling.
retries_exhausted should always be called and then the job pushed to the DJQ.
This commit is contained in:
parent
7a74a8ee22
commit
c1c3c349af
2 changed files with 32 additions and 62 deletions
|
@ -113,21 +113,28 @@ module Sidekiq
|
|||
|
||||
def retries_exhausted(worker, msg)
|
||||
logger.debug { "Dropping message after hitting the retry maximum: #{msg}" }
|
||||
if worker.sidekiq_retries_exhausted_block?
|
||||
worker.sidekiq_retries_exhausted_block.call(msg)
|
||||
else
|
||||
Sidekiq.logger.info { "Adding a dead #{msg['class']} job" }
|
||||
payload = Sidekiq.dump_json(msg)
|
||||
Sidekiq.redis do |conn|
|
||||
conn.multi do
|
||||
conn.zadd('dead', Time.now.to_f, payload)
|
||||
conn.zremrangebyscore('dead', '-inf', (Time.now.to_i - DEAD_JOB_TIMEOUT).to_f)
|
||||
conn.zremrangebyrank('dead', 0, -MAX_JOBS)
|
||||
end
|
||||
begin
|
||||
if worker.sidekiq_retries_exhausted_block?
|
||||
worker.sidekiq_retries_exhausted_block.call(msg)
|
||||
end
|
||||
rescue => e
|
||||
handle_exception(e, { :context => "Error calling retries_exhausted" })
|
||||
end
|
||||
|
||||
send_to_morgue(msg)
|
||||
end
|
||||
|
||||
def send_to_morgue(msg)
|
||||
Sidekiq.logger.info { "Adding dead #{msg['class']} job #{msg['jid']}" }
|
||||
payload = Sidekiq.dump_json(msg)
|
||||
now = Time.now.to_f
|
||||
Sidekiq.redis do |conn|
|
||||
conn.multi do
|
||||
conn.zadd('dead', now, payload)
|
||||
conn.zremrangebyscore('dead', '-inf', now - DEAD_JOB_TIMEOUT)
|
||||
conn.zremrangebyrank('dead', 0, -MAX_JOBS)
|
||||
end
|
||||
end
|
||||
rescue Exception => e
|
||||
handle_exception(e, { :context => "Error calling retries_exhausted" })
|
||||
end
|
||||
|
||||
def retry_attempts_from(msg_retry, default)
|
||||
|
|
|
@ -10,6 +10,7 @@ class TestRetry < Sidekiq::Test
|
|||
Sidekiq.instance_variable_set(:@redis, @redis)
|
||||
|
||||
def @redis.with; yield self; end
|
||||
def @redis.multi; yield self; end
|
||||
end
|
||||
|
||||
let(:worker) do
|
||||
|
@ -50,6 +51,9 @@ class TestRetry < Sidekiq::Test
|
|||
1.upto(max_retries) do
|
||||
@redis.expect :zadd, 1, ['retry', String, String]
|
||||
end
|
||||
@redis.expect :zadd, 1, ['dead', Float, String]
|
||||
@redis.expect :zremrangebyscore, 0, ['dead', String, Float]
|
||||
@redis.expect :zremrangebyrank, 0, ['dead', Numeric, Numeric]
|
||||
msg = { 'class' => 'Bob', 'args' => [1,2,'foo'], 'retry' => true }
|
||||
handler = Sidekiq::Middleware::Server::RetryJobs.new({:max_retries => max_retries})
|
||||
1.upto(max_retries + 1) do
|
||||
|
@ -166,72 +170,31 @@ class TestRetry < Sidekiq::Test
|
|||
it 'throws away old messages after too many retries (using the default)' do
|
||||
now = Time.now.to_f
|
||||
msg = {"class"=>"Bob", "args"=>[1, 2, "foo"], "queue"=>"default", "error_message"=>"kerblammo!", "error_class"=>"RuntimeError", "failed_at"=>now, "retry"=>true, "retry_count"=>25}
|
||||
@redis.expect :zadd, 1, [ 'retry', String, String ]
|
||||
@redis.expect :zadd, 1, ['dead', Float, String]
|
||||
@redis.expect :zremrangebyscore, 0, ['dead', String, Float]
|
||||
@redis.expect :zremrangebyrank, 0, ['dead', Numeric, Numeric]
|
||||
handler = Sidekiq::Middleware::Server::RetryJobs.new
|
||||
assert_raises RuntimeError do
|
||||
handler.call(worker, msg, 'default') do
|
||||
raise "kerblammo!"
|
||||
end
|
||||
end
|
||||
# Minitest can't assert that a method call did NOT happen!?
|
||||
assert_raises(MockExpectationError) { @redis.verify }
|
||||
@redis.verify
|
||||
end
|
||||
|
||||
it 'throws away old messages after too many retries (using user-specified max)' do
|
||||
now = Time.now.to_f
|
||||
msg = {"class"=>"Bob", "args"=>[1, 2, "foo"], "queue"=>"default", "error_message"=>"kerblammo!", "error_class"=>"RuntimeError", "failed_at"=>now, "retry"=>3, "retry_count"=>3}
|
||||
@redis.expect :zadd, 1, [ 'retry', String, String ]
|
||||
@redis.expect :zadd, 1, ['dead', Float, String]
|
||||
@redis.expect :zremrangebyscore, 0, ['dead', String, Float]
|
||||
@redis.expect :zremrangebyrank, 0, ['dead', Numeric, Numeric]
|
||||
handler = Sidekiq::Middleware::Server::RetryJobs.new
|
||||
assert_raises RuntimeError do
|
||||
handler.call(worker, msg, 'default') do
|
||||
raise "kerblammo!"
|
||||
end
|
||||
end
|
||||
# Minitest can't assert that a method call did NOT happen!?
|
||||
assert_raises(MockExpectationError) { @redis.verify }
|
||||
end
|
||||
|
||||
describe "retry exhaustion" do
|
||||
let(:handler){ Sidekiq::Middleware::Server::RetryJobs.new }
|
||||
let(:worker) { Minitest::Mock.new }
|
||||
let(:msg){ {"class"=>"Bob", "args"=>[1, 2, "foo"], "queue"=>"default", "error_message"=>"kerblammo!", "error_class"=>"RuntimeError", "failed_at"=>Time.now.to_f, "retry"=>3, "retry_count"=>3} }
|
||||
|
||||
describe "worker block" do
|
||||
let(:worker) do
|
||||
Class.new do
|
||||
include Sidekiq::Worker
|
||||
|
||||
sidekiq_retries_exhausted do |msg|
|
||||
msg.tap {|m| m['called_by_callback'] = true }
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
it 'calls worker sidekiq_retries_exhausted_block after too many retries' do
|
||||
new_msg = handler.__send__(:retries_exhausted, worker.new, msg)
|
||||
expected_msg = msg.merge('called_by_callback' => true)
|
||||
|
||||
assert_equal expected_msg, new_msg, "sidekiq_retries_exhausted block not called"
|
||||
end
|
||||
end
|
||||
|
||||
it 'handles and logs retries_exhausted failures gracefully (drops them)' do
|
||||
def worker.retries_exhausted(*args)
|
||||
raise 'bam!'
|
||||
end
|
||||
|
||||
e = task_misbehaving_worker
|
||||
assert_equal e.message, "kerblammo!"
|
||||
worker.verify
|
||||
end
|
||||
|
||||
def task_misbehaving_worker
|
||||
assert_raises RuntimeError do
|
||||
handler.call(worker, msg, 'default') do
|
||||
raise 'kerblammo!'
|
||||
end
|
||||
end
|
||||
end
|
||||
@redis.verify
|
||||
end
|
||||
|
||||
describe "custom retry delay" do
|
||||
|
|
Loading…
Reference in a new issue