mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
67daa7a408
* Prepare for upcoming Sidekiq::Config redesign Adjust the server internals to use a config object rather than refering directly to the Sidekiq module.
362 lines
9.9 KiB
Ruby
362 lines
9.9 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require_relative "helper"
|
|
require "sidekiq/fetch"
|
|
require "sidekiq/cli"
|
|
require "sidekiq/processor"
|
|
|
|
describe Sidekiq::Processor do
|
|
TestProcessorException = Class.new(StandardError)
|
|
TEST_PROC_EXCEPTION = TestProcessorException.new("kerboom!")
|
|
|
|
before do
|
|
$invokes = 0
|
|
@config = Sidekiq
|
|
@config[:fetch] = Sidekiq::BasicFetch.new(@config)
|
|
@processor = ::Sidekiq::Processor.new(@config) { |*args| }
|
|
end
|
|
|
|
class MockWorker
|
|
include Sidekiq::Worker
|
|
def perform(args)
|
|
raise TEST_PROC_EXCEPTION if args.to_s == "boom"
|
|
args.pop if args.is_a? Array
|
|
$invokes += 1
|
|
end
|
|
end
|
|
|
|
def work(msg, queue = "queue:default")
|
|
Sidekiq::BasicFetch::UnitOfWork.new(queue, msg)
|
|
end
|
|
|
|
it "processes as expected" do
|
|
msg = Sidekiq.dump_json({"class" => MockWorker.to_s, "args" => ["myarg"]})
|
|
@processor.process(work(msg))
|
|
assert_equal 1, $invokes
|
|
end
|
|
|
|
it "executes a worker as expected" do
|
|
worker = Minitest::Mock.new
|
|
worker.expect(:perform, nil, [1, 2, 3])
|
|
@processor.execute_job(worker, [1, 2, 3])
|
|
end
|
|
|
|
it "re-raises exceptions after handling" do
|
|
msg = Sidekiq.dump_json({"class" => MockWorker.to_s, "args" => ["boom"]})
|
|
re_raise = false
|
|
|
|
begin
|
|
@processor.process(work(msg))
|
|
flunk "Expected exception"
|
|
rescue TestProcessorException
|
|
re_raise = true
|
|
end
|
|
|
|
assert_equal 0, $invokes
|
|
assert re_raise, "does not re-raise exceptions after handling"
|
|
end
|
|
|
|
it "does not modify original arguments" do
|
|
msg = {"class" => MockWorker.to_s, "args" => [["myarg"]]}
|
|
msgstr = Sidekiq.dump_json(msg)
|
|
@processor.process(work(msgstr))
|
|
assert_equal [["myarg"]], msg["args"]
|
|
end
|
|
|
|
describe "exception handling" do
|
|
let(:errors) { [] }
|
|
let(:error_handler) do
|
|
proc do |exception, context|
|
|
errors << {exception: exception, context: context}
|
|
end
|
|
end
|
|
|
|
before do
|
|
Sidekiq.error_handlers << error_handler
|
|
end
|
|
|
|
after do
|
|
Sidekiq.error_handlers.pop
|
|
end
|
|
|
|
it "handles invalid JSON" do
|
|
ds = Sidekiq::DeadSet.new
|
|
ds.clear
|
|
job_hash = {"class" => MockWorker.to_s, "args" => ["boom"]}
|
|
msg = Sidekiq.dump_json(job_hash)
|
|
job = work(msg[0...-2])
|
|
ds = Sidekiq::DeadSet.new
|
|
assert_equal 0, ds.size
|
|
begin
|
|
@processor.instance_variable_set(:@job, job)
|
|
@processor.process(job)
|
|
rescue JSON::ParserError
|
|
end
|
|
assert_equal 1, ds.size
|
|
end
|
|
|
|
it "handles exceptions raised by the job" do
|
|
job_hash = {"class" => MockWorker.to_s, "args" => ["boom"], "jid" => "123987123"}
|
|
msg = Sidekiq.dump_json(job_hash)
|
|
job = work(msg)
|
|
begin
|
|
@processor.instance_variable_set(:@job, job)
|
|
@processor.process(job)
|
|
rescue TestProcessorException
|
|
end
|
|
assert_equal 1, errors.count
|
|
assert_instance_of TestProcessorException, errors.first[:exception]
|
|
assert_equal msg, errors.first[:context][:jobstr]
|
|
assert_equal job_hash["jid"], errors.first[:context][:job]["jid"]
|
|
end
|
|
|
|
it "handles exceptions raised by the reloader" do
|
|
job_hash = {"class" => MockWorker.to_s, "args" => ["boom"]}
|
|
msg = Sidekiq.dump_json(job_hash)
|
|
@processor.instance_variable_set(:@reloader, proc { raise TEST_PROC_EXCEPTION })
|
|
job = work(msg)
|
|
begin
|
|
@processor.instance_variable_set(:@job, job)
|
|
@processor.process(job)
|
|
rescue TestProcessorException
|
|
end
|
|
assert_equal 1, errors.count
|
|
assert_instance_of TestProcessorException, errors.first[:exception]
|
|
assert_equal msg, errors.first[:context][:jobstr]
|
|
assert_equal job_hash, errors.first[:context][:job]
|
|
end
|
|
|
|
it "handles exceptions raised during fetch" do
|
|
fetch_stub = lambda { raise StandardError, "fetch exception" }
|
|
# swallow logging because actually care about the added exception handler
|
|
capture_logging do
|
|
@processor.instance_variable_get(:@strategy).stub(:retrieve_work, fetch_stub) do
|
|
@processor.process_one
|
|
end
|
|
end
|
|
|
|
assert_instance_of StandardError, errors.last[:exception]
|
|
end
|
|
end
|
|
|
|
describe "acknowledgement" do
|
|
class ExceptionRaisingMiddleware
|
|
def initialize(raise_before_yield, raise_after_yield, skip)
|
|
@raise_before_yield = raise_before_yield
|
|
@raise_after_yield = raise_after_yield
|
|
@skip = skip
|
|
end
|
|
|
|
def call(worker, item, queue)
|
|
raise TEST_PROC_EXCEPTION if @raise_before_yield
|
|
yield unless @skip
|
|
raise TEST_PROC_EXCEPTION if @raise_after_yield
|
|
end
|
|
end
|
|
|
|
let(:raise_before_yield) { false }
|
|
let(:raise_after_yield) { false }
|
|
let(:skip_job) { false }
|
|
let(:worker_args) { ["myarg"] }
|
|
let(:work) { MiniTest::Mock.new }
|
|
|
|
before do
|
|
work.expect(:queue_name, "queue:default")
|
|
work.expect(:job, Sidekiq.dump_json({"class" => MockWorker.to_s, "args" => worker_args}))
|
|
Sidekiq.server_middleware do |chain|
|
|
chain.prepend ExceptionRaisingMiddleware, raise_before_yield, raise_after_yield, skip_job
|
|
end
|
|
end
|
|
|
|
after do
|
|
Sidekiq.server_middleware do |chain|
|
|
chain.remove ExceptionRaisingMiddleware
|
|
end
|
|
work.verify
|
|
end
|
|
|
|
describe "middleware throws an exception before processing the work" do
|
|
let(:raise_before_yield) { true }
|
|
|
|
it "acks the job" do
|
|
work.expect(:acknowledge, nil)
|
|
begin
|
|
@processor.process(work)
|
|
flunk "Expected #process to raise exception"
|
|
rescue TestProcessorException
|
|
end
|
|
end
|
|
end
|
|
|
|
describe "middleware throws an exception after processing the work" do
|
|
let(:raise_after_yield) { true }
|
|
|
|
it "acks the job" do
|
|
work.expect(:acknowledge, nil)
|
|
begin
|
|
@processor.process(work)
|
|
flunk "Expected #process to raise exception"
|
|
rescue TestProcessorException
|
|
end
|
|
end
|
|
end
|
|
|
|
describe "middleware decides to skip work" do
|
|
let(:skip_job) { true }
|
|
|
|
it "acks the job" do
|
|
work.expect(:acknowledge, nil)
|
|
@processor.process(work)
|
|
end
|
|
end
|
|
|
|
describe "worker raises an exception" do
|
|
let(:worker_args) { ["boom"] }
|
|
|
|
it "acks the job" do
|
|
work.expect(:acknowledge, nil)
|
|
begin
|
|
@processor.process(work)
|
|
flunk "Expected #process to raise exception"
|
|
rescue TestProcessorException
|
|
end
|
|
end
|
|
end
|
|
|
|
describe "everything goes well" do
|
|
it "acks the job" do
|
|
work.expect(:acknowledge, nil)
|
|
@processor.process(work)
|
|
end
|
|
end
|
|
end
|
|
|
|
describe "retry" do
|
|
class ArgsMutatingServerMiddleware
|
|
def call(worker, item, queue)
|
|
item["args"] = item["args"].map do |arg|
|
|
arg.to_sym if arg.is_a?(String)
|
|
end
|
|
yield
|
|
end
|
|
end
|
|
|
|
class ArgsMutatingClientMiddleware
|
|
def call(worker, item, queue, redis_pool)
|
|
item["args"] = item["args"].map do |arg|
|
|
arg.to_s if arg.is_a?(Symbol)
|
|
end
|
|
yield
|
|
end
|
|
end
|
|
|
|
before do
|
|
Sidekiq.server_middleware do |chain|
|
|
chain.prepend ArgsMutatingServerMiddleware
|
|
end
|
|
Sidekiq.client_middleware do |chain|
|
|
chain.prepend ArgsMutatingClientMiddleware
|
|
end
|
|
end
|
|
|
|
after do
|
|
Sidekiq.server_middleware do |chain|
|
|
chain.remove ArgsMutatingServerMiddleware
|
|
end
|
|
Sidekiq.client_middleware do |chain|
|
|
chain.remove ArgsMutatingClientMiddleware
|
|
end
|
|
end
|
|
|
|
describe "middleware mutates the job args and then fails" do
|
|
it "requeues with original arguments" do
|
|
job_data = {"class" => MockWorker.to_s, "args" => ["boom"]}
|
|
|
|
retry_stub_called = false
|
|
retry_stub = lambda { |worker, msg, queue, exception|
|
|
retry_stub_called = true
|
|
assert_equal "boom", msg["args"].first
|
|
}
|
|
|
|
@processor.instance_variable_get(:@retrier).stub(:attempt_retry, retry_stub) do
|
|
msg = Sidekiq.dump_json(job_data)
|
|
begin
|
|
@processor.process(work(msg))
|
|
flunk "Expected exception"
|
|
rescue TestProcessorException
|
|
end
|
|
end
|
|
|
|
assert retry_stub_called
|
|
end
|
|
end
|
|
end
|
|
|
|
describe "stats" do
|
|
before do
|
|
Sidekiq.redis { |c| c.flushdb }
|
|
end
|
|
|
|
describe "when successful" do
|
|
let(:processed_today_key) { "stat:processed:#{Time.now.utc.strftime("%Y-%m-%d")}" }
|
|
|
|
def successful_job
|
|
msg = Sidekiq.dump_json({"class" => MockWorker.to_s, "args" => ["myarg"]})
|
|
@processor.process(work(msg))
|
|
end
|
|
|
|
it "increments processed stat" do
|
|
Sidekiq::Processor::PROCESSED.reset
|
|
successful_job
|
|
assert_equal 1, Sidekiq::Processor::PROCESSED.reset
|
|
end
|
|
end
|
|
|
|
describe "custom job logger class" do
|
|
class CustomJobLogger < Sidekiq::JobLogger
|
|
def call(item, queue)
|
|
yield
|
|
rescue Exception
|
|
raise
|
|
end
|
|
end
|
|
|
|
before do
|
|
opts = {queues: ["default"], job_logger: CustomJobLogger}
|
|
@processor = ::Sidekiq::Processor.new(opts) { |pr, ex| }
|
|
end
|
|
end
|
|
end
|
|
|
|
describe "stats" do
|
|
before do
|
|
Sidekiq.redis { |c| c.flushdb }
|
|
end
|
|
|
|
def successful_job
|
|
msg = Sidekiq.dump_json({"class" => MockWorker.to_s, "args" => ["myarg"]})
|
|
@processor.process(work(msg))
|
|
end
|
|
|
|
it "increments processed stat" do
|
|
Sidekiq::Processor::PROCESSED.reset
|
|
successful_job
|
|
assert_equal 1, Sidekiq::Processor::PROCESSED.reset
|
|
end
|
|
end
|
|
|
|
describe "custom job logger class" do
|
|
before do
|
|
opts = Sidekiq
|
|
opts[:job_logger] = CustomJobLogger
|
|
opts[:fetch] = Sidekiq::BasicFetch.new(opts)
|
|
@processor = ::Sidekiq::Processor.new(opts) { |pr, ex| }
|
|
end
|
|
|
|
it "is called instead default Sidekiq::JobLogger" do
|
|
msg = Sidekiq.dump_json({"class" => MockWorker.to_s, "args" => ["myarg"]})
|
|
@processor.process(work(msg))
|
|
assert_equal 1, $invokes
|
|
end
|
|
end
|
|
end
|