Merge branch 'better-merge-train-exlusive-lock-ce' into 'master'
CE Port: Fix race condition on merge train with better exclusive lock See merge request gitlab-org/gitlab-ce!30352
This commit is contained in:
commit
debaf20c09
2 changed files with 259 additions and 0 deletions
112
lib/gitlab/batch_pop_queueing.rb
Normal file
112
lib/gitlab/batch_pop_queueing.rb
Normal file
|
@ -0,0 +1,112 @@
|
|||
# frozen_string_literal: true

module Gitlab
  ##
  # This class is a queueing system for processing expensive tasks in an atomic manner
  # with batch popping to let you optimize the total processing time.
  #
  # In a usual queueing system, the first item starts being processed immediately
  # and the following items wait until the next items have been popped from the queue.
  # In this queueing system, on the other hand, the former part is the same, however,
  # it pops the enqueued items as a batch. This is especially useful when you want to
  # drop redundant items from the queue in order to process important items only,
  # thus it's more efficient than the traditional queueing system.
  #
  # Caveats:
  # - The order of the items is not guaranteed because of `sadd` (Redis Sets).
  #
  # Example:
  # ```
  # class TheWorker
  #   def perform
  #     result = Gitlab::BatchPopQueueing.new('feature', 'queue').safe_execute([item]) do |items_in_queue|
  #       item = extract_the_most_important_item_from(items_in_queue)
  #       expensive_process(item)
  #     end
  #
  #     if result[:status] == :finished && result[:new_items].present?
  #       item = extract_the_most_important_item_from(items_in_queue)
  #       TheWorker.perform_async(item.id)
  #     end
  #   end
  # end
  # ```
  #
  class BatchPopQueueing
    attr_reader :namespace, :queue_id

    # Grace period added to the queue key's TTL on top of the lock timeout,
    # so enqueued items outlive the lock holder long enough to be re-processed.
    EXTRA_QUEUE_EXPIRE_WINDOW = 1.hour
    MAX_COUNTS_OF_POP_ALL = 1000

    # Initialize queue
    #
    # @param [String] namespace The namespace of the exclusive lock and queue key. Typically, it's a feature name.
    # @param [String] queue_id The identifier of the queue.
    # @raise [ArgumentError] if namespace or queue_id is empty.
    def initialize(namespace, queue_id)
      raise ArgumentError if namespace.empty? || queue_id.empty?

      @namespace, @queue_id = namespace, queue_id
    end

    ##
    # Execute the given block in an exclusive lock.
    # If another thread is already working on the block,
    # it enqueues the items without processing the block.
    #
    # @param [Array<String>] new_items New items to be added to the queue.
    # @param [Time] lock_timeout The timeout of the exclusive lock. Generally, this value should be longer than the maximum process timing of the given block.
    # @return [Hash]
    #   - status => One of the `:enqueued` or `:finished`.
    #   - new_items => Newly enqueued items while the given block was being processed.
    #
    # NOTE: If an exception is raised in the block, the popped items will not be recovered.
    # We should NOT re-enqueue the items in this case because it could end up in an infinite loop.
    def safe_execute(new_items, lock_timeout: 10.minutes, &block)
      enqueue(new_items, lock_timeout + EXTRA_QUEUE_EXPIRE_WINDOW)

      lease = Gitlab::ExclusiveLease.new(lock_key, timeout: lock_timeout)

      return { status: :enqueued } unless uuid = lease.try_obtain

      begin
        all_args = pop_all

        yield all_args if block_given?

        # Items enqueued by other threads while the block ran are still in the
        # queue; report them so the caller can schedule a follow-up run.
        { status: :finished, new_items: peek_all }
      ensure
        Gitlab::ExclusiveLease.cancel(lock_key, uuid)
      end
    end

    private

    def lock_key
      @lock_key ||= "batch_pop_queueing:lock:#{namespace}:#{queue_id}"
    end

    def queue_key
      @queue_key ||= "batch_pop_queueing:queue:#{namespace}:#{queue_id}"
    end

    def enqueue(items, expire_time)
      Gitlab::Redis::Queues.with do |redis|
        redis.sadd(queue_key, items)
        redis.expire(queue_key, expire_time.to_i)
      end
    end

    def pop_all
      Gitlab::Redis::Queues.with do |redis|
        redis.spop(queue_key, MAX_COUNTS_OF_POP_ALL)
      end
    end

    def peek_all
      Gitlab::Redis::Queues.with do |redis|
        redis.smembers(queue_key)
      end
    end
  end
end
|
147
spec/lib/gitlab/batch_pop_queueing_spec.rb
Normal file
147
spec/lib/gitlab/batch_pop_queueing_spec.rb
Normal file
|
@ -0,0 +1,147 @@
|
|||
# frozen_string_literal: true

require 'spec_helper'

describe Gitlab::BatchPopQueueing do
  include ExclusiveLeaseHelpers
  using RSpec::Parameterized::TableSyntax

  describe '#initialize' do
    where(:namespace, :queue_id, :expect_error, :error_type) do
      'feature' | '1' | false | nil
      :feature  | '1' | false | nil
      nil       | '1' | true  | NoMethodError
      'feature' | nil | true  | NoMethodError
      ''        | '1' | true  | ArgumentError
      'feature' | ''  | true  | ArgumentError
      'feature' | 1   | true  | NoMethodError
    end

    with_them do
      it do
        if expect_error
          expect { described_class.new(namespace, queue_id) }.to raise_error(error_type)
        else
          expect { described_class.new(namespace, queue_id) }.not_to raise_error
        end
      end
    end
  end

  describe '#safe_execute', :clean_gitlab_redis_queues do
    subject { queue.safe_execute(new_items, lock_timeout: lock_timeout) }

    let(:queue) { described_class.new(namespace, queue_id) }
    let(:namespace) { 'feature' }
    let(:queue_id) { '1' }
    let(:lock_timeout) { 10.minutes }
    let(:new_items) { %w[A B] }
    let(:lock_key) { queue.send(:lock_key) }
    let(:queue_key) { queue.send(:queue_key) }

    it 'enqueues new items always' do
      Gitlab::Redis::Queues.with do |redis|
        expect(redis).to receive(:sadd).with(queue_key, new_items)
        expect(redis).to receive(:expire).with(queue_key, (lock_timeout + described_class::EXTRA_QUEUE_EXPIRE_WINDOW).to_i)
      end

      subject
    end

    it 'yields the new items with exclusive lease' do
      uuid = 'test'
      expect_to_obtain_exclusive_lease(lock_key, uuid, timeout: lock_timeout)
      expect_to_cancel_exclusive_lease(lock_key, uuid)

      expect { |b| queue.safe_execute(new_items, lock_timeout: lock_timeout, &b) }
        .to yield_with_args(match_array(new_items))
    end

    it 'returns the result and no items in the queue' do
      expect(subject[:status]).to eq(:finished)
      expect(subject[:new_items]).to be_empty

      Gitlab::Redis::Queues.with do |redis|
        # The queue key is a Redis Set (written with `sadd`), so `scard` is the
        # correct cardinality command; `llen` would raise WRONGTYPE on a
        # non-empty queue.
        expect(redis.scard(queue_key)).to be(0)
      end
    end

    context 'when new items are enqueued during the process' do
      it 'returns the result with newly added items' do
        result = queue.safe_execute(new_items) do
          queue.safe_execute(['C'])
        end

        expect(result[:status]).to eq(:finished)
        expect(result[:new_items]).to eq(['C'])

        Gitlab::Redis::Queues.with do |redis|
          expect(redis.scard(queue_key)).to be(1)
        end
      end
    end

    context 'when integer items are enqueued' do
      let(:new_items) { [1, 2, 3] }

      it 'yields as String values' do
        expect { |b| queue.safe_execute(new_items, lock_timeout: lock_timeout, &b) }
          .to yield_with_args(%w[1 2 3])
      end
    end

    context 'when the queue key does not exist in Redis' do
      before do
        allow(queue).to receive(:enqueue) { }
      end

      it 'yields empty array' do
        expect { |b| queue.safe_execute(new_items, lock_timeout: lock_timeout, &b) }
          .to yield_with_args([])
      end
    end

    context 'when the other process has already been working on the queue' do
      before do
        stub_exclusive_lease_taken(lock_key, timeout: lock_timeout)
      end

      it 'does not yield the block' do
        expect { |b| queue.safe_execute(new_items, lock_timeout: lock_timeout, &b) }
          .not_to yield_control
      end

      it 'returns the result' do
        expect(subject[:status]).to eq(:enqueued)
      end
    end

    context 'when a duplicate item is enqueued' do
      it 'yields the deduplicated items' do
        # Redis Sets deduplicate on `sadd`, so duplicates never reach the block.
        expect { |b| queue.safe_execute(%w[1 1 2 2], &b) }
          .to yield_with_args(match_array(%w[1 2]))
      end
    end

    context 'when there are two queues' do
      it 'enqueues items to each queue' do
        queue_1 = described_class.new(namespace, '1')
        queue_2 = described_class.new(namespace, '2')

        result_2 = nil

        result_1 = queue_1.safe_execute(['A']) do |_|
          result_2 = queue_2.safe_execute(['B']) do |_|
            queue_1.safe_execute(['C'])
            queue_2.safe_execute(['D'])
          end
        end

        expect(result_1[:status]).to eq(:finished)
        expect(result_1[:new_items]).to eq(['C'])
        expect(result_2[:status]).to eq(:finished)
        expect(result_2[:new_items]).to eq(['D'])
      end
    end
  end
end
|
Loading…
Reference in a new issue