mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
d4b012bd8c
When I moved the reloader inside the block so that any errors it raised would be handled properly, the `job` local variable was pushed into a nested scope, which meant it wasn't accessible from the rescue block any more. This changed the meaning of `job` in that rescue block from the local variable to the `attr_reader` with the same name. We don't need to reload the application before parsing the job payload, so we can move this work outside the reloader block so that the job hash is accessible in the rescue block again.
249 lines
7.2 KiB
Ruby
249 lines
7.2 KiB
Ruby
# frozen_string_literal: true
|
|
require_relative 'helper'
|
|
require 'sidekiq/fetch'
|
|
require 'sidekiq/cli'
|
|
require 'sidekiq/processor'
|
|
|
|
class TestProcessor < Sidekiq::Test
|
|
# Error class raised on purpose by the test doubles in this file, so examples
# can rescue it without masking unrelated failures.
TestException = Class.new(StandardError)

# Single shared TestException instance; MockWorker and
# ExceptionRaisingMiddleware raise this exact object.
TEST_EXCEPTION = TestException.new("kerboom!")
|
|
|
|
describe 'processor' do
|
|
# Runs before every example: resets the global invocation counter and builds
# a fresh processor around a mock manager.
before do
  $invokes = 0
  @mgr = Minitest::Mock.new
  # The mock is primed for two `options` calls.
  # NOTE(review): presumably Processor.new reads the manager options twice - confirm.
  2.times { @mgr.expect(:options, { queues: ['default'] }) }
  @processor = ::Sidekiq::Processor.new(@mgr)
end
|
|
|
|
# Minimal Sidekiq worker used as the job class throughout these examples.
class MockWorker
  include Sidekiq::Worker

  # Raises TEST_EXCEPTION when called with 'boom'. Otherwise removes the last
  # element of an Array argument (mutating it in place) and increments the
  # global $invokes counter.
  def perform(args)
    raise TEST_EXCEPTION if args == 'boom'

    args.pop if args.is_a?(Array)
    $invokes += 1
  end
end
|
|
|
|
# Wraps a serialized job payload in a BasicFetch unit of work.
#
# msg   - the job payload string.
# queue - the queue name to attach (defaults to 'queue:default').
def work(msg, queue = 'queue:default')
  Sidekiq::BasicFetch::UnitOfWork.new(queue, msg)
end
|
|
|
|
# A plain MockWorker job should run exactly once.
it 'processes as expected' do
  msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
  @processor.process(work(msg))
  assert_equal 1, $invokes
end
|
|
|
|
# execute_job should call #perform on the worker with the given arguments.
it 'executes a worker as expected' do
  worker = Minitest::Mock.new
  worker.expect(:perform, nil, [1, 2, 3])
  @processor.execute_job(worker, [1, 2, 3])
  # Without verify, a mock only complains about unexpected calls; verifying
  # makes the example fail when #perform is never invoked at all.
  worker.verify
end
|
|
|
|
# A job whose worker raises must surface the exception to the caller, and
# the invocation counter must stay untouched.
it 're-raises exceptions after handling' do
  msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })

  assert_raises(TestException, 'does not re-raise exceptions after handling') do
    @processor.process(work(msg))
  end

  assert_equal 0, $invokes
end
|
|
|
|
# MockWorker#perform pops from Array arguments, so a nested-array argument
# exercises the mutation path; the original hash must be unchanged afterwards.
it 'does not modify original arguments' do
  msg = { 'class' => MockWorker.to_s, 'args' => [['myarg']] }
  msgstr = Sidekiq.dump_json(msg)
  @mgr.expect(:processor_done, nil, [@processor])
  @processor.process(work(msgstr))
  assert_equal [['myarg']], msg['args']
end
|
|
|
|
describe 'exception handling' do
|
|
# Accumulates one entry per error reported to the handler below.
let(:errors) { [] }

# Error handler that records the exception and its context hash in `errors`.
let(:error_handler) do
  proc do |exception, context|
    errors << { exception: exception, context: context }
  end
end

# Register the recording handler for the duration of each example.
before do
  Sidekiq.error_handlers << error_handler
end

# Remove exactly the handler registered above rather than whichever handler
# happens to be last, so handlers added by other code are left alone.
after do
  Sidekiq.error_handlers.delete(error_handler)
end
|
|
|
|
# When the worker raises, the error handler must be called once with the
# exception, the raw payload string (:jobstr) and the parsed hash (:job).
it 'handles exceptions raised by the job' do
  job_hash = { 'class' => MockWorker.to_s, 'args' => ['boom'] }
  msg = Sidekiq.dump_json(job_hash)
  job = work(msg)
  begin
    # NOTE(review): @job is seeded directly on the processor before the call;
    # confirm whether the processor's error path still depends on it.
    @processor.instance_variable_set(:@job, job)
    @processor.process(job)
  rescue TestException
    # Expected: only the handler's recorded context is asserted below.
  end
  assert_equal 1, errors.count
  assert_instance_of TestException, errors.first[:exception]
  assert_equal msg, errors.first[:context][:jobstr]
  assert_equal job_hash, errors.first[:context][:job]
end
|
|
|
|
# Same expectations as the job-raises case, but the failure comes from the
# processor's @reloader, which is replaced with a proc that always raises.
it 'handles exceptions raised by the reloader' do
  job_hash = { 'class' => MockWorker.to_s, 'args' => ['boom'] }
  msg = Sidekiq.dump_json(job_hash)
  @processor.instance_variable_set(:@reloader, proc { raise TEST_EXCEPTION })
  job = work(msg)
  begin
    # NOTE(review): @job is seeded directly on the processor before the call;
    # confirm whether the processor's error path still depends on it.
    @processor.instance_variable_set(:@job, job)
    @processor.process(job)
  rescue TestException
    # Expected: only the handler's recorded context is asserted below.
  end
  assert_equal 1, errors.count
  assert_instance_of TestException, errors.first[:exception]
  assert_equal msg, errors.first[:context][:jobstr]
  assert_equal job_hash, errors.first[:context][:job]
end
|
|
end
|
|
|
|
describe 'acknowledgement' do
|
|
# Server middleware test double whose behaviour is fixed at construction:
# it can raise before yielding, raise after yielding, or skip yielding.
class ExceptionRaisingMiddleware
  # raise_before_yield - truthy to raise TEST_EXCEPTION before the block runs.
  # raise_after_yield  - truthy to raise TEST_EXCEPTION after the block runs.
  # skip               - truthy to never yield to the block.
  def initialize(raise_before_yield, raise_after_yield, skip)
    @raise_before_yield = raise_before_yield
    @raise_after_yield = raise_after_yield
    @skip = skip
  end

  # Middleware entry point. The worker, item and queue arguments are accepted
  # but not used; only the flags given to #initialize drive the behaviour.
  def call(worker, item, queue)
    raise TEST_EXCEPTION if @raise_before_yield

    yield unless @skip
    raise TEST_EXCEPTION if @raise_after_yield
  end
end
|
|
|
|
# Defaults for the middleware flags and worker arguments; nested describe
# blocks override individual values.
let(:raise_before_yield) { false }
let(:raise_after_yield) { false }
let(:skip_job) { false }
let(:worker_args) { ['myarg'] }
# Within this describe block, `work` is a mock unit of work and shadows the
# `work(msg, queue)` helper defined on the enclosing describe.
let(:work) { Minitest::Mock.new }

# Prime the mock unit of work and install the configurable middleware at the
# front of the server middleware chain.
before do
  work.expect(:queue_name, 'queue:default')
  work.expect(:job, Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => worker_args }))
  Sidekiq.server_middleware do |chain|
    chain.prepend ExceptionRaisingMiddleware, raise_before_yield, raise_after_yield, skip_job
  end
end

# Uninstall the middleware, then verify every expectation set on the mock
# (including any :acknowledge expectation added by an example) was met.
after do
  Sidekiq.server_middleware do |chain|
    chain.remove ExceptionRaisingMiddleware
  end
  work.verify
end
|
|
|
|
describe 'middleware throws an exception before processing the work' do
  let(:raise_before_yield) { true }

  # No :acknowledge expectation is set on the work mock in this example.
  it 'does not ack' do
    assert_raises(TestException, 'Expected #process to raise exception') do
      @processor.process(work)
    end
  end
end
|
|
|
|
describe 'middleware throws an exception after processing the work' do
  let(:raise_after_yield) { true }

  # The mock expects one :acknowledge call even though #process raises.
  it 'acks the job' do
    work.expect(:acknowledge, nil)
    assert_raises(TestException, 'Expected #process to raise exception') do
      @processor.process(work)
    end
  end
end
|
|
|
|
describe 'middleware decides to skip work' do
  let(:skip_job) { true }

  # The middleware never yields, yet the job is still expected to be
  # acknowledged and the manager told the processor is done.
  it 'acks the job' do
    work.expect(:acknowledge, nil)
    @mgr.expect(:processor_done, nil, [@processor])
    @processor.process(work)
  end
end
|
|
|
|
describe 'worker raises an exception' do
  # 'boom' makes MockWorker#perform raise TEST_EXCEPTION.
  let(:worker_args) { ['boom'] }

  # The mock expects one :acknowledge call even though #process raises.
  it 'acks the job' do
    work.expect(:acknowledge, nil)
    assert_raises(TestException, 'Expected #process to raise exception') do
      @processor.process(work)
    end
  end
end
|
|
|
|
describe 'everything goes well' do
  # Happy path: the job is acknowledged and the manager is notified.
  it 'acks the job' do
    work.expect(:acknowledge, nil)
    @mgr.expect(:processor_done, nil, [@processor])
    @processor.process(work)
  end
end
|
|
end
|
|
|
|
describe 'stats' do
|
|
# Flush the Redis database before each stats example.
before do
  Sidekiq.redis { |c| c.flushdb }
end
|
|
|
|
describe 'when successful' do
  # Key for today's processed counter (UTC date).
  # NOTE(review): not referenced by any example in this describe block.
  let(:processed_today_key) { "stat:processed:#{Time.now.utc.strftime("%Y-%m-%d")}" }

  # Runs one MockWorker job that completes without raising.
  def successful_job
    msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
    @mgr.expect(:processor_done, nil, [@processor])
    @processor.process(work(msg))
  end

  it 'increments processed stat' do
    # Reset the counter so the assertion is independent of earlier examples.
    Sidekiq::Processor::PROCESSED.value = 0
    successful_job
    assert_equal 1, Sidekiq::Processor::PROCESSED.value
  end
end
|
|
|
|
describe 'when failed' do
  # Key for today's failed counter (UTC date).
  # NOTE(review): not referenced by any example in this describe block.
  let(:failed_today_key) { "stat:failed:#{Time.now.utc.strftime("%Y-%m-%d")}" }

  # Runs one MockWorker job that raises, swallowing the TestException so the
  # caller can inspect the stats afterwards.
  def failed_job
    msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
    begin
      @processor.process(work(msg))
    rescue TestException
      # Expected: the example only cares about the failure counter.
    end
  end

  it 'increments failed stat' do
    # Reset the counter so the assertion is independent of earlier examples.
    Sidekiq::Processor::FAILURE.value = 0
    failed_job
    assert_equal 1, Sidekiq::Processor::FAILURE.value
  end
end
|
|
end
|
|
end
|
|
end
|