mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
2c9f7662fc
Tests pass and smoke test works perfectly.
201 lines
5.7 KiB
Ruby
201 lines
5.7 KiB
Ruby
# frozen_string_literal: true
|
|
require_relative 'helper'
|
|
require 'sidekiq/fetch'
|
|
require 'sidekiq/cli'
|
|
require 'sidekiq/processor'
|
|
|
|
class TestProcessor < Sidekiq::Test
|
|
TestException = Class.new(StandardError)
|
|
TEST_EXCEPTION = TestException.new("kerboom!")
|
|
|
|
describe 'processor' do
|
|
before do
|
|
$invokes = 0
|
|
@mgr = Minitest::Mock.new
|
|
@mgr.expect(:options, {:queues => ['default']})
|
|
@mgr.expect(:options, {:queues => ['default']})
|
|
@processor = ::Sidekiq::Processor.new(@mgr)
|
|
end
|
|
|
|
class MockWorker
|
|
include Sidekiq::Worker
|
|
def perform(args)
|
|
raise TEST_EXCEPTION if args == 'boom'
|
|
args.pop if args.is_a? Array
|
|
$invokes += 1
|
|
end
|
|
end
|
|
|
|
def work(msg, queue='queue:default')
|
|
Sidekiq::BasicFetch::UnitOfWork.new(queue, msg)
|
|
end
|
|
|
|
it 'processes as expected' do
|
|
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
|
|
@processor.process(work(msg))
|
|
assert_equal 1, $invokes
|
|
end
|
|
|
|
it 'executes a worker as expected' do
|
|
worker = Minitest::Mock.new
|
|
worker.expect(:perform, nil, [1, 2, 3])
|
|
@processor.execute_job(worker, [1, 2, 3])
|
|
end
|
|
|
|
it 're-raises exceptions after handling' do
|
|
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
|
|
re_raise = false
|
|
|
|
begin
|
|
@processor.process(work(msg))
|
|
flunk "Expected exception"
|
|
rescue TestException
|
|
re_raise = true
|
|
end
|
|
|
|
assert_equal 0, $invokes
|
|
assert re_raise, "does not re-raise exceptions after handling"
|
|
end
|
|
|
|
it 'does not modify original arguments' do
|
|
msg = { 'class' => MockWorker.to_s, 'args' => [['myarg']] }
|
|
msgstr = Sidekiq.dump_json(msg)
|
|
@mgr.expect(:processor_done, nil, [@processor])
|
|
@processor.process(work(msgstr))
|
|
assert_equal [['myarg']], msg['args']
|
|
end
|
|
|
|
describe 'acknowledgement' do
|
|
class ExceptionRaisingMiddleware
|
|
def initialize(raise_before_yield, raise_after_yield, skip)
|
|
@raise_before_yield = raise_before_yield
|
|
@raise_after_yield = raise_after_yield
|
|
@skip = skip
|
|
end
|
|
|
|
def call(worker, item, queue)
|
|
raise TEST_EXCEPTION if @raise_before_yield
|
|
yield unless @skip
|
|
raise TEST_EXCEPTION if @raise_after_yield
|
|
end
|
|
end
|
|
|
|
let(:raise_before_yield) { false }
|
|
let(:raise_after_yield) { false }
|
|
let(:skip_job) { false }
|
|
let(:worker_args) { ['myarg'] }
|
|
let(:work) { MiniTest::Mock.new }
|
|
|
|
before do
|
|
work.expect(:queue_name, 'queue:default')
|
|
work.expect(:job, Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => worker_args }))
|
|
Sidekiq.server_middleware do |chain|
|
|
chain.prepend ExceptionRaisingMiddleware, raise_before_yield, raise_after_yield, skip_job
|
|
end
|
|
end
|
|
|
|
after do
|
|
Sidekiq.server_middleware do |chain|
|
|
chain.remove ExceptionRaisingMiddleware
|
|
end
|
|
work.verify
|
|
end
|
|
|
|
describe 'middleware throws an exception before processing the work' do
|
|
let(:raise_before_yield) { true }
|
|
|
|
it 'does not ack' do
|
|
begin
|
|
@processor.process(work)
|
|
flunk "Expected #process to raise exception"
|
|
rescue TestException
|
|
end
|
|
end
|
|
end
|
|
|
|
describe 'middleware throws an exception after processing the work' do
|
|
let(:raise_after_yield) { true }
|
|
|
|
it 'acks the job' do
|
|
work.expect(:acknowledge, nil)
|
|
begin
|
|
@processor.process(work)
|
|
flunk "Expected #process to raise exception"
|
|
rescue TestException
|
|
end
|
|
end
|
|
end
|
|
|
|
describe 'middleware decides to skip work' do
|
|
let(:skip_job) { true }
|
|
|
|
it 'acks the job' do
|
|
work.expect(:acknowledge, nil)
|
|
@mgr.expect(:processor_done, nil, [@processor])
|
|
@processor.process(work)
|
|
end
|
|
end
|
|
|
|
describe 'worker raises an exception' do
|
|
let(:worker_args) { ['boom'] }
|
|
|
|
it 'acks the job' do
|
|
work.expect(:acknowledge, nil)
|
|
begin
|
|
@processor.process(work)
|
|
flunk "Expected #process to raise exception"
|
|
rescue TestException
|
|
end
|
|
end
|
|
end
|
|
|
|
describe 'everything goes well' do
|
|
it 'acks the job' do
|
|
work.expect(:acknowledge, nil)
|
|
@mgr.expect(:processor_done, nil, [@processor])
|
|
@processor.process(work)
|
|
end
|
|
end
|
|
end
|
|
|
|
describe 'stats' do
|
|
before do
|
|
Sidekiq.redis {|c| c.flushdb }
|
|
end
|
|
|
|
describe 'when successful' do
|
|
let(:processed_today_key) { "stat:processed:#{Time.now.utc.strftime("%Y-%m-%d")}" }
|
|
|
|
def successful_job
|
|
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
|
|
@mgr.expect(:processor_done, nil, [@processor])
|
|
@processor.process(work(msg))
|
|
end
|
|
|
|
it 'increments processed stat' do
|
|
Sidekiq::Processor::PROCESSED.value = 0
|
|
successful_job
|
|
assert_equal 1, Sidekiq::Processor::PROCESSED.value
|
|
end
|
|
end
|
|
|
|
describe 'when failed' do
|
|
let(:failed_today_key) { "stat:failed:#{Time.now.utc.strftime("%Y-%m-%d")}" }
|
|
|
|
def failed_job
|
|
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
|
|
begin
|
|
@processor.process(work(msg))
|
|
rescue TestException
|
|
end
|
|
end
|
|
|
|
it 'increments failed stat' do
|
|
Sidekiq::Processor::FAILURE.value = 0
|
|
failed_job
|
|
assert_equal 1, Sidekiq::Processor::FAILURE.value
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end
|