gitlab-org--gitlab-foss/spec/workers/concerns/application_worker_spec.rb

540 lines
18 KiB
Ruby

# frozen_string_literal: true
require 'spec_helper'
RSpec.describe ApplicationWorker do
# We depend on the lazy-load characteristic of rspec. If the worker is loaded
# before setting up, it's likely to go wrong. Consider this catcha:
# before do
# allow(router).to receive(:route).with(worker).and_return('queue_1')
# end
# As worker is triggered, it includes ApplicationWorker, and the router is
# called before it is stubbed. That makes the stubbing useless.
let(:worker) do
Class.new do
def self.name
'Gitlab::Foo::Bar::DummyWorker'
end
include ApplicationWorker
end
end
let(:instance) { worker.new }
let(:router) { double(:router) }
before do
allow(::Gitlab::SidekiqConfig::WorkerRouter).to receive(:global).and_return(router)
allow(router).to receive(:route).and_return('foo_bar_dummy')
end
describe 'Sidekiq attributes' do
it 'sets the queue name based on the output of the router' do
expect(worker.sidekiq_options['queue']).to eq('foo_bar_dummy')
expect(router).to have_received(:route).with(worker).at_least(:once)
end
context 'when a worker attribute is updated' do
before do
counter = 0
allow(router).to receive(:route) do
counter += 1
"queue_#{counter}"
end
end
it 'updates the queue name afterward' do
expect(worker.sidekiq_options['queue']).to eq('queue_1')
worker.feature_category :pages
expect(worker.sidekiq_options['queue']).to eq('queue_2')
worker.feature_category :not_owned
expect(worker.sidekiq_options['queue']).to eq('queue_3')
worker.urgency :high
expect(worker.sidekiq_options['queue']).to eq('queue_4')
worker.worker_has_external_dependencies!
expect(worker.sidekiq_options['queue']).to eq('queue_5')
worker.worker_resource_boundary :cpu
expect(worker.sidekiq_options['queue']).to eq('queue_6')
worker.idempotent!
expect(worker.sidekiq_options['queue']).to eq('queue_7')
worker.weight 3
expect(worker.sidekiq_options['queue']).to eq('queue_8')
worker.tags :hello
expect(worker.sidekiq_options['queue']).to eq('queue_9')
worker.big_payload!
expect(worker.sidekiq_options['queue']).to eq('queue_10')
expect(router).to have_received(:route).with(worker).at_least(10).times
end
end
context 'when the worker is inherited' do
let(:sub_worker) { Class.new(worker) }
before do
allow(router).to receive(:route).and_return('queue_1')
worker # Force loading worker 1 to update its queue
allow(router).to receive(:route).and_return('queue_2')
end
it 'sets the queue name for the inherited worker' do
expect(sub_worker.sidekiq_options['queue']).to eq('queue_2')
expect(router).to have_received(:route).with(sub_worker).at_least(:once)
end
end
end
describe '#logging_extras' do
it 'returns extra data to be logged that was set from #log_extra_metadata_on_done' do
instance.log_extra_metadata_on_done(:key1, "value1")
instance.log_extra_metadata_on_done(:key2, "value2")
expect(instance.logging_extras).to eq({ 'extra.gitlab_foo_bar_dummy_worker.key1' => "value1", 'extra.gitlab_foo_bar_dummy_worker.key2' => "value2" })
end
context 'when nothing is set' do
it 'returns {}' do
expect(instance.logging_extras).to eq({})
end
end
end
describe '#structured_payload' do
let(:payload) { {} }
subject(:result) { instance.structured_payload(payload) }
it 'adds worker related payload' do
instance.jid = 'a jid'
expect(result).to include(
'class' => instance.class.name,
'job_status' => 'running',
'queue' => worker.queue,
'jid' => instance.jid
)
end
it 'adds labkit context' do
user = build_stubbed(:user, username: 'jane-doe')
instance.with_context(user: user) do
expect(result).to include('meta.user' => user.username)
end
end
it 'adds custom payload converting stringified keys' do
payload[:message] = 'some message'
expect(result).to include('message' => payload[:message])
end
it 'does not override predefined context keys with custom payload' do
payload['class'] = 'custom value'
expect(result).to include('class' => instance.class.name)
end
end
describe '.queue_namespace' do
before do
allow(router).to receive(:route).and_return('foo_bar_dummy', 'some_namespace:foo_bar_dummy')
end
it 'updates the queue name from the router again' do
expect(worker.queue).to eq('foo_bar_dummy')
worker.queue_namespace :some_namespace
expect(worker.queue).to eq('some_namespace:foo_bar_dummy')
end
it 'updates the queue_namespace options of the worker' do
worker.queue_namespace :some_namespace
expect(worker.queue_namespace).to eql('some_namespace')
expect(worker.sidekiq_options['queue_namespace']).to be(:some_namespace)
end
end
describe '.queue' do
it 'returns the queue name' do
worker.sidekiq_options queue: :some_queue
expect(worker.queue).to eq('some_queue')
end
end
describe '.data_consistency' do
using RSpec::Parameterized::TableSyntax
where(:data_consistency, :sidekiq_option_retry, :expect_error) do
:delayed | false | true
:delayed | 0 | true
:delayed | 3 | false
:delayed | nil | false
:sticky | false | false
:sticky | 0 | false
:sticky | 3 | false
:sticky | nil | false
:always | false | false
:always | 0 | false
:always | 3 | false
:always | nil | false
end
with_them do
before do
worker.sidekiq_options retry: sidekiq_option_retry unless sidekiq_option_retry.nil?
end
context "when workers data consistency is #{params['data_consistency']}" do
it "#{params['expect_error'] ? '' : 'not to '}raise an exception" do
if expect_error
expect { worker.data_consistency data_consistency }
.to raise_error("Retry support cannot be disabled if data_consistency is set to :delayed")
else
expect { worker.data_consistency data_consistency }
.not_to raise_error
end
end
end
end
end
describe '.retry' do
using RSpec::Parameterized::TableSyntax
where(:data_consistency, :sidekiq_option_retry, :expect_error) do
:delayed | false | true
:delayed | 0 | true
:delayed | 3 | false
:sticky | false | false
:sticky | 0 | false
:sticky | 3 | false
:always | false | false
:always | 0 | false
:always | 3 | false
end
with_them do
before do
worker.data_consistency(data_consistency)
end
context "when retry sidekiq option is #{params['sidekiq_option_retry']}" do
it "#{params['expect_error'] ? '' : 'not to '}raise an exception" do
if expect_error
expect { worker.sidekiq_options retry: sidekiq_option_retry }
.to raise_error("Retry support cannot be disabled if data_consistency is set to :delayed")
else
expect { worker.sidekiq_options retry: sidekiq_option_retry }
.not_to raise_error
end
end
end
end
end
context 'different kinds of push_bulk' do
shared_context 'set safe limit beyond the number of jobs to be enqueued' do
before do
stub_const("#{described_class}::SAFE_PUSH_BULK_LIMIT", args.count + 1)
end
end
shared_context 'set safe limit below the number of jobs to be enqueued' do
before do
stub_const("#{described_class}::SAFE_PUSH_BULK_LIMIT", 2)
end
end
shared_examples_for 'returns job_id of all enqueued jobs' do
let(:job_id_regex) { /[0-9a-f]{12}/ }
it 'returns job_id of all enqueued jobs' do
job_ids = perform_action
expect(job_ids.count).to eq(args.count)
expect(job_ids).to all(match(job_id_regex))
end
end
shared_examples_for 'enqueues the jobs in a batched fashion, with each batch enqueing jobs as per the set safe limit' do
it 'enqueues the jobs in a batched fashion, with each batch enqueing jobs as per the set safe limit' do
expect(Sidekiq::Client).to(
receive(:push_bulk).with(hash_including('args' => [['Foo', [1]], ['Foo', [2]]]))
.ordered
.and_call_original)
expect(Sidekiq::Client).to(
receive(:push_bulk).with(hash_including('args' => [['Foo', [3]], ['Foo', [4]]]))
.ordered
.and_call_original)
expect(Sidekiq::Client).to(
receive(:push_bulk).with(hash_including('args' => [['Foo', [5]]]))
.ordered
.and_call_original)
perform_action
expect(worker.jobs.count).to eq args.count
expect(worker.jobs).to all(include('enqueued_at'))
end
end
shared_examples_for 'enqueues jobs in one go' do
it 'enqueues jobs in one go' do
expect(Sidekiq::Client).to(
receive(:push_bulk).with(hash_including('args' => args)).once.and_call_original)
expect(Sidekiq.logger).not_to receive(:info)
perform_action
expect(worker.jobs.count).to eq args.count
expect(worker.jobs).to all(include('enqueued_at'))
end
end
shared_examples_for 'logs bulk insertions' do
it 'logs arguments and job IDs' do
worker.log_bulk_perform_async!
expect(Sidekiq.logger).to(
receive(:info).with(hash_including('class' => worker.name, 'args_list' => args)).once.and_call_original)
expect(Sidekiq.logger).to(
receive(:info).with(hash_including('class' => worker.name, 'jid_list' => anything)).once.and_call_original)
perform_action
end
end
before do
stub_const(worker.name, worker)
end
let(:args) do
[
['Foo', [1]],
['Foo', [2]],
['Foo', [3]],
['Foo', [4]],
['Foo', [5]]
]
end
describe '.bulk_perform_async' do
shared_examples_for 'does not schedule the jobs for any specific time' do
it 'does not schedule the jobs for any specific time' do
perform_action
expect(worker.jobs).to all(exclude('at'))
end
end
subject(:perform_action) do
worker.bulk_perform_async(args)
end
context 'push_bulk in safe limit batches' do
context 'when the number of jobs to be enqueued does not exceed the safe limit' do
include_context 'set safe limit beyond the number of jobs to be enqueued'
it_behaves_like 'enqueues jobs in one go'
it_behaves_like 'logs bulk insertions'
it_behaves_like 'returns job_id of all enqueued jobs'
it_behaves_like 'does not schedule the jobs for any specific time'
end
context 'when the number of jobs to be enqueued exceeds safe limit' do
include_context 'set safe limit below the number of jobs to be enqueued'
it_behaves_like 'enqueues the jobs in a batched fashion, with each batch enqueing jobs as per the set safe limit'
it_behaves_like 'returns job_id of all enqueued jobs'
it_behaves_like 'does not schedule the jobs for any specific time'
end
end
end
describe '.bulk_perform_in' do
context 'without batches' do
shared_examples_for 'schedules all the jobs at a specific time' do
it 'schedules all the jobs at a specific time' do
perform_action
worker.jobs.each do |job_detail|
expect(job_detail['at']).to be_within(3.seconds).of(expected_scheduled_at_time)
end
end
end
let(:delay) { 3.minutes }
let(:expected_scheduled_at_time) { Time.current.to_i + delay.to_i }
subject(:perform_action) do
worker.bulk_perform_in(delay, args)
end
context 'when the scheduled time falls in the past' do
let(:delay) { -60 }
it 'raises an ArgumentError exception' do
expect { perform_action }
.to raise_error(ArgumentError)
end
end
context 'push_bulk in safe limit batches' do
context 'when the number of jobs to be enqueued does not exceed the safe limit' do
include_context 'set safe limit beyond the number of jobs to be enqueued'
it_behaves_like 'enqueues jobs in one go'
it_behaves_like 'returns job_id of all enqueued jobs'
it_behaves_like 'schedules all the jobs at a specific time'
end
context 'when the number of jobs to be enqueued exceeds safe limit' do
include_context 'set safe limit below the number of jobs to be enqueued'
it_behaves_like 'enqueues the jobs in a batched fashion, with each batch enqueing jobs as per the set safe limit'
it_behaves_like 'returns job_id of all enqueued jobs'
it_behaves_like 'schedules all the jobs at a specific time'
end
end
end
context 'with batches' do
shared_examples_for 'schedules all the jobs at a specific time, per batch' do
it 'schedules all the jobs at a specific time, per batch' do
perform_action
expect(worker.jobs[0]['at']).to eq(worker.jobs[1]['at'])
expect(worker.jobs[2]['at']).to eq(worker.jobs[3]['at'])
expect(worker.jobs[2]['at'] - worker.jobs[1]['at']).to eq(batch_delay)
expect(worker.jobs[4]['at'] - worker.jobs[3]['at']).to eq(batch_delay)
end
end
let(:delay) { 1.minute }
let(:batch_size) { 2 }
let(:batch_delay) { 10.minutes }
subject(:perform_action) do
worker.bulk_perform_in(delay, args, batch_size: batch_size, batch_delay: batch_delay)
end
context 'when the `batch_size` is invalid' do
context 'when `batch_size` is 0' do
let(:batch_size) { 0 }
it 'raises an ArgumentError exception' do
expect { perform_action }
.to raise_error(ArgumentError)
end
end
context 'when `batch_size` is negative' do
let(:batch_size) { -3 }
it 'raises an ArgumentError exception' do
expect { perform_action }
.to raise_error(ArgumentError)
end
end
end
context 'when the `batch_delay` is invalid' do
context 'when `batch_delay` is 0' do
let(:batch_delay) { 0.minutes }
it 'raises an ArgumentError exception' do
expect { perform_action }
.to raise_error(ArgumentError)
end
end
context 'when `batch_delay` is negative' do
let(:batch_delay) { -3.minutes }
it 'raises an ArgumentError exception' do
expect { perform_action }
.to raise_error(ArgumentError)
end
end
end
context 'push_bulk in safe limit batches' do
context 'when the number of jobs to be enqueued does not exceed the safe limit' do
include_context 'set safe limit beyond the number of jobs to be enqueued'
it_behaves_like 'enqueues jobs in one go'
it_behaves_like 'returns job_id of all enqueued jobs'
it_behaves_like 'schedules all the jobs at a specific time, per batch'
end
context 'when the number of jobs to be enqueued exceeds safe limit' do
include_context 'set safe limit below the number of jobs to be enqueued'
it_behaves_like 'enqueues the jobs in a batched fashion, with each batch enqueing jobs as per the set safe limit'
it_behaves_like 'returns job_id of all enqueued jobs'
it_behaves_like 'schedules all the jobs at a specific time, per batch'
end
end
end
end
end
describe '.with_status' do
around do |example|
Sidekiq::Testing.fake!(&example)
end
context 'when the worker does have status_expiration set' do
let(:status_expiration_worker) do
Class.new(worker) do
sidekiq_options status_expiration: 3
end
end
it 'uses status_expiration from the worker' do
status_expiration_worker.with_status.perform_async
expect(Sidekiq::Queues[status_expiration_worker.queue].first).to include('status_expiration' => 3)
expect(Sidekiq::Queues[status_expiration_worker.queue].length).to eq(1)
end
it 'uses status_expiration from the worker without with_status' do
status_expiration_worker.perform_async
expect(Sidekiq::Queues[status_expiration_worker.queue].first).to include('status_expiration' => 3)
expect(Sidekiq::Queues[status_expiration_worker.queue].length).to eq(1)
end
end
context 'when the worker does not have status_expiration set' do
it 'uses the default status_expiration' do
worker.with_status.perform_async
expect(Sidekiq::Queues[worker.queue].first).to include('status_expiration' => Gitlab::SidekiqStatus::DEFAULT_EXPIRATION)
expect(Sidekiq::Queues[worker.queue].length).to eq(1)
end
it 'does not set status_expiration without with_status' do
worker.perform_async
expect(Sidekiq::Queues[worker.queue].first).not_to include('status_expiration')
expect(Sidekiq::Queues[worker.queue].length).to eq(1)
end
end
end
end