1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00
mperham--sidekiq/test/test_processor.rb
Mike Perham 3b5862492a Remove job duplication edge case with Sidekiq Pro, fixes #3388
In #2531, we saw how an IO exception in the logger could cause a job to fail and be deleted before it reached the RetryJobs block, causing job loss. To fix this, we disabled job acknowledgement until job execution starts, but this has the bad side effect of duplicating jobs if the user is running a reliable scheme and the error happens after the RetryJobs middleware but before execution starts.

Instead, we flip the middleware ordering; logging now happens within the retry block. We would lose context-specific logging within retry, so we move the context log setup out of the middleware into the Processor. With these changes, we can properly retry and acknowledge even if there are errors within the initial server middleware and executor calls.

This code path has been reimplemented in Sidekiq 5.0 so this change only applies to 4.x.
2017-03-15 10:03:47 -07:00

266 lines
7.7 KiB
Ruby

# frozen_string_literal: true
require_relative 'helper'
require 'sidekiq/fetch'
require 'sidekiq/cli'
require 'sidekiq/processor'
# Specs for Sidekiq::Processor: running a job, reporting failures to the
# registered error handlers, acknowledging the unit of work, and updating the
# PROCESSED / FAILURE counters.
class TestProcessor < Sidekiq::Test
  TestException = Class.new(StandardError)
  # Single shared instance raised by MockWorker, by the stubbed reloader and by
  # ExceptionRaisingMiddleware.
  TEST_EXCEPTION = TestException.new("kerboom!")

  describe 'processor' do
    before do
      $invokes = 0
      @mgr = Minitest::Mock.new
      # The manager mock is primed for two `options` reads.
      @mgr.expect(:options, {:queues => ['default']})
      @mgr.expect(:options, {:queues => ['default']})
      @processor = ::Sidekiq::Processor.new(@mgr)
    end

    # Worker double. Raises TEST_EXCEPTION when given 'boom'; otherwise bumps
    # the $invokes counter. Array arguments are mutated (last element popped),
    # which the 'does not modify original arguments' spec feeds it.
    class MockWorker
      include Sidekiq::Worker

      def perform(args)
        raise TEST_EXCEPTION if args == 'boom'
        args.pop if args.is_a? Array
        $invokes += 1
      end
    end

    # Wraps the JSON job string +msg+ in a BasicFetch unit of work on +queue+.
    def work(msg, queue='queue:default')
      Sidekiq::BasicFetch::UnitOfWork.new(queue, msg)
    end

    it 'processes as expected' do
      msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
      @processor.process(work(msg))
      assert_equal 1, $invokes
    end

    it 'executes a worker as expected' do
      worker = Minitest::Mock.new
      worker.expect(:perform, nil, [1, 2, 3])
      @processor.execute_job(worker, [1, 2, 3])
      # Without verify the spec would pass even if #perform were never called.
      worker.verify
    end

    it 're-raises exceptions after handling' do
      msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
      assert_raises(TestException, "Expected exception") do
        @processor.process(work(msg))
      end
      assert_equal 0, $invokes
    end

    it 'does not modify original arguments' do
      msg = { 'class' => MockWorker.to_s, 'args' => [['myarg']] }
      msgstr = Sidekiq.dump_json(msg)
      @mgr.expect(:processor_done, nil, [@processor])
      @processor.process(work(msgstr))
      assert_equal [['myarg']], msg['args']
    end

    describe 'exception handling' do
      # Collects every (exception, context) pair passed to the error handler.
      let(:errors) { [] }
      let(:error_handler) do
        proc do |exception, context|
          errors << { exception: exception, context: context }
        end
      end

      before do
        Sidekiq.error_handlers << error_handler
      end

      after do
        # Remove exactly the handler installed above rather than whatever
        # happens to be last in the list.
        Sidekiq.error_handlers.delete(error_handler)
      end

      it 'handles invalid JSON' do
        ds = Sidekiq::DeadSet.new
        ds.clear

        job_hash = { 'class' => MockWorker.to_s, 'args' => ['boom'] }
        msg = Sidekiq.dump_json(job_hash)
        # Dropping the last two characters leaves a truncated, unparseable payload.
        job = work(msg[0...-2])
        assert_equal 0, ds.size

        begin
          @processor.instance_variable_set(:'@job', job)
          @processor.process(job)
        rescue JSON::ParserError
          # A parse error escaping #process is tolerated; the assertion below is
          # about the dead set only.
        end

        assert_equal 1, ds.size
      end

      it 'handles exceptions raised by the job' do
        job_hash = { 'class' => MockWorker.to_s, 'args' => ['boom'], 'jid' => '123987123' }
        msg = Sidekiq.dump_json(job_hash)
        job = work(msg)

        begin
          @processor.instance_variable_set(:'@job', job)
          @processor.process(job)
        rescue TestException
          # The re-raise is covered elsewhere; here we only inspect the handler.
        end

        assert_equal 1, errors.count
        assert_instance_of TestException, errors.first[:exception]
        assert_equal msg, errors.first[:context][:jobstr]
        assert_equal job_hash['jid'], errors.first[:context][:job]['jid']
      end

      it 'handles exceptions raised by the reloader' do
        job_hash = { 'class' => MockWorker.to_s, 'args' => ['boom'] }
        msg = Sidekiq.dump_json(job_hash)
        # Swap in a reloader that fails before the job can run.
        @processor.instance_variable_set(:'@reloader', proc { raise TEST_EXCEPTION })
        job = work(msg)

        begin
          @processor.instance_variable_set(:'@job', job)
          @processor.process(job)
        rescue TestException
          # The re-raise is covered elsewhere; here we only inspect the handler.
        end

        assert_equal 1, errors.count
        assert_instance_of TestException, errors.first[:exception]
        assert_equal msg, errors.first[:context][:jobstr]
        assert_equal job_hash, errors.first[:context][:job]
      end
    end

    describe 'acknowledgement' do
      # Server middleware that can raise TEST_EXCEPTION before or after
      # yielding to the rest of the chain, or skip the yield altogether.
      class ExceptionRaisingMiddleware
        def initialize(raise_before_yield, raise_after_yield, skip)
          @raise_before_yield = raise_before_yield
          @raise_after_yield = raise_after_yield
          @skip = skip
        end

        def call(worker, item, queue)
          raise TEST_EXCEPTION if @raise_before_yield
          yield unless @skip
          raise TEST_EXCEPTION if @raise_after_yield
        end
      end

      # Defaults; the nested describes override one knob each.
      let(:raise_before_yield) { false }
      let(:raise_after_yield) { false }
      let(:skip_job) { false }
      let(:worker_args) { ['myarg'] }
      # Mocked unit of work. Within this describe it replaces the #work helper
      # defined on the outer group.
      let(:work) { Minitest::Mock.new }

      before do
        work.expect(:queue_name, 'queue:default')
        work.expect(:job, Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => worker_args }))
        Sidekiq.server_middleware do |chain|
          chain.prepend ExceptionRaisingMiddleware, raise_before_yield, raise_after_yield, skip_job
        end
      end

      after do
        # Unregister the middleware first so a failing verify cannot leave it
        # installed for later specs.
        Sidekiq.server_middleware do |chain|
          chain.remove ExceptionRaisingMiddleware
        end
        work.verify
      end

      describe 'middleware throws an exception before processing the work' do
        let(:raise_before_yield) { true }

        it 'acks the job' do
          work.expect(:acknowledge, nil)
          assert_raises(TestException, "Expected #process to raise exception") do
            @processor.process(work)
          end
        end
      end

      describe 'middleware throws an exception after processing the work' do
        let(:raise_after_yield) { true }

        it 'acks the job' do
          work.expect(:acknowledge, nil)
          assert_raises(TestException, "Expected #process to raise exception") do
            @processor.process(work)
          end
        end
      end

      describe 'middleware decides to skip work' do
        let(:skip_job) { true }

        it 'acks the job' do
          work.expect(:acknowledge, nil)
          @mgr.expect(:processor_done, nil, [@processor])
          @processor.process(work)
        end
      end

      describe 'worker raises an exception' do
        let(:worker_args) { ['boom'] }

        it 'acks the job' do
          work.expect(:acknowledge, nil)
          assert_raises(TestException, "Expected #process to raise exception") do
            @processor.process(work)
          end
        end
      end

      describe 'everything goes well' do
        it 'acks the job' do
          work.expect(:acknowledge, nil)
          @mgr.expect(:processor_done, nil, [@processor])
          @processor.process(work)
        end
      end
    end

    describe 'stats' do
      before do
        Sidekiq.redis { |c| c.flushdb }
      end

      describe 'when successful' do
        # Runs one job that completes normally.
        def successful_job
          msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
          @mgr.expect(:processor_done, nil, [@processor])
          @processor.process(work(msg))
        end

        it 'increments processed stat' do
          Sidekiq::Processor::PROCESSED.value = 0
          successful_job
          assert_equal 1, Sidekiq::Processor::PROCESSED.value
        end
      end

      describe 'when failed' do
        # Runs one job that raises, swallowing the re-raised TestException so
        # the caller can inspect the counter afterwards.
        def failed_job
          msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
          begin
            @processor.process(work(msg))
          rescue TestException
          end
        end

        it 'increments failed stat' do
          Sidekiq::Processor::FAILURE.value = 0
          failed_job
          assert_equal 1, Sidekiq::Processor::FAILURE.value
        end
      end
    end
  end
end