diff --git a/lib/gitlab/batch_pop_queueing.rb b/lib/gitlab/batch_pop_queueing.rb new file mode 100644 index 00000000000..61011abddf5 --- /dev/null +++ b/lib/gitlab/batch_pop_queueing.rb @@ -0,0 +1,112 @@ +# frozen_string_literal: true + +module Gitlab + ## + # This class is a queuing system for processing expensive tasks in an atomic manner + # with batch poping to let you optimize the total processing time. + # + # In usual queuing system, the first item started being processed immediately + # and the following items wait until the next items have been popped from the queue. + # On the other hand, this queueing system, the former part is same, however, + # it pops the enqueued items as batch. This is especially useful when you want to + # drop redandant items from the queue in order to process important items only, + # thus it's more efficient than the traditional queueing system. + # + # Caveats: + # - The order of the items are not guaranteed because of `sadd` (Redis Sets). + # + # Example: + # ``` + # class TheWorker + # def perform + # result = Gitlab::BatchPopQueueing.new('feature', 'queue').safe_execute([item]) do |items_in_queue| + # item = extract_the_most_important_item_from(items_in_queue) + # expensive_process(item) + # end + # + # if result[:status] == :finished && result[:new_items].present? + # item = extract_the_most_important_item_from(items_in_queue) + # TheWorker.perform_async(item.id) + # end + # end + # end + # ``` + # + class BatchPopQueueing + attr_reader :namespace, :queue_id + + EXTRA_QUEUE_EXPIRE_WINDOW = 1.hour + MAX_COUNTS_OF_POP_ALL = 1000 + + # Initialize queue + # + # @param [String] namespace The namespace of the exclusive lock and queue key. Typically, it's a feature name. + # @param [String] queue_id The identifier of the queue. + # @return [Boolean] + def initialize(namespace, queue_id) + raise ArgumentError if namespace.empty? || queue_id.empty? + + @namespace, @queue_id = namespace, queue_id + end + + ## + # Execute the given block in an exclusive lock. + # If there is the other thread has already working on the block, + # it enqueues the items without processing the block. + # + # @param [Array] new_items New items to be added to the queue. + # @param [Time] lock_timeout The timeout of the exclusive lock. Generally, this value should be longer than the maximum prosess timing of the given block. + # @return [Hash] + # - status => One of the `:enqueued` or `:finished`. + # - new_items => Newly enqueued items during the given block had been processed. + # + # NOTE: If an exception is raised in the block, the poppped items will not be recovered. + # We should NOT re-enqueue the items in this case because it could end up in an infinite loop. + def safe_execute(new_items, lock_timeout: 10.minutes, &block) + enqueue(new_items, lock_timeout + EXTRA_QUEUE_EXPIRE_WINDOW) + + lease = Gitlab::ExclusiveLease.new(lock_key, timeout: lock_timeout) + + return { status: :enqueued } unless uuid = lease.try_obtain + + begin + all_args = pop_all + + yield all_args if block_given? + + { status: :finished, new_items: peek_all } + ensure + Gitlab::ExclusiveLease.cancel(lock_key, uuid) + end + end + + private + + def lock_key + @lock_key ||= "batch_pop_queueing:lock:#{namespace}:#{queue_id}" + end + + def queue_key + @queue_key ||= "batch_pop_queueing:queue:#{namespace}:#{queue_id}" + end + + def enqueue(items, expire_time) + Gitlab::Redis::Queues.with do |redis| + redis.sadd(queue_key, items) + redis.expire(queue_key, expire_time.to_i) + end + end + + def pop_all + Gitlab::Redis::Queues.with do |redis| + redis.spop(queue_key, MAX_COUNTS_OF_POP_ALL) + end + end + + def peek_all + Gitlab::Redis::Queues.with do |redis| + redis.smembers(queue_key) + end + end + end +end diff --git a/spec/lib/gitlab/batch_pop_queueing_spec.rb b/spec/lib/gitlab/batch_pop_queueing_spec.rb new file mode 100644 index 00000000000..28984d52024 --- /dev/null +++ b/spec/lib/gitlab/batch_pop_queueing_spec.rb @@ -0,0 +1,147 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe Gitlab::BatchPopQueueing do + include ExclusiveLeaseHelpers + using RSpec::Parameterized::TableSyntax + + describe '#initialize' do + where(:namespace, :queue_id, :expect_error, :error_type) do + 'feature' | '1' | false | nil + :feature | '1' | false | nil + nil | '1' | true | NoMethodError + 'feature' | nil | true | NoMethodError + '' | '1' | true | ArgumentError + 'feature' | '' | true | ArgumentError + 'feature' | 1 | true | NoMethodError + end + + with_them do + it do + if expect_error + expect { described_class.new(namespace, queue_id) }.to raise_error(error_type) + else + expect { described_class.new(namespace, queue_id) }.not_to raise_error + end + end + end + end + + describe '#safe_execute', :clean_gitlab_redis_queues do + subject { queue.safe_execute(new_items, lock_timeout: lock_timeout) } + + let(:queue) { described_class.new(namespace, queue_id) } + let(:namespace) { 'feature' } + let(:queue_id) { '1' } + let(:lock_timeout) { 10.minutes } + let(:new_items) { %w[A B] } + let(:lock_key) { queue.send(:lock_key) } + let(:queue_key) { queue.send(:queue_key) } + + it 'enqueues new items always' do + Gitlab::Redis::Queues.with do |redis| + expect(redis).to receive(:sadd).with(queue_key, new_items) + expect(redis).to receive(:expire).with(queue_key, (lock_timeout + described_class::EXTRA_QUEUE_EXPIRE_WINDOW).to_i) + end + + subject + end + + it 'yields the new items with exclusive lease' do + uuid = 'test' + expect_to_obtain_exclusive_lease(lock_key, uuid, timeout: lock_timeout) + expect_to_cancel_exclusive_lease(lock_key, uuid) + + expect { |b| queue.safe_execute(new_items, lock_timeout: lock_timeout, &b) } + .to yield_with_args(match_array(new_items)) + end + + it 'returns the result and no items in the queue' do + expect(subject[:status]).to eq(:finished) + expect(subject[:new_items]).to be_empty + + Gitlab::Redis::Queues.with do |redis| + expect(redis.llen(queue_key)).to be(0) + end + end + + context 'when new items are enqueued during the process' do + it 'returns the result with newly added items' do + result = queue.safe_execute(new_items) do + queue.safe_execute(['C']) + end + + expect(result[:status]).to eq(:finished) + expect(result[:new_items]).to eq(['C']) + + Gitlab::Redis::Queues.with do |redis| + expect(redis.scard(queue_key)).to be(1) + end + end + end + + context 'when interger items are enqueued' do + let(:new_items) { [1, 2, 3] } + + it 'yields as String values' do + expect { |b| queue.safe_execute(new_items, lock_timeout: lock_timeout, &b) } + .to yield_with_args(%w[1 2 3]) + end + end + + context 'when the queue key does not exist in Redis' do + before do + allow(queue).to receive(:enqueue) { } + end + + it 'yields empty array' do + expect { |b| queue.safe_execute(new_items, lock_timeout: lock_timeout, &b) } + .to yield_with_args([]) + end + end + + context 'when the other process has already been working on the queue' do + before do + stub_exclusive_lease_taken(lock_key, timeout: lock_timeout) + end + + it 'does not yield the block' do + expect { |b| queue.safe_execute(new_items, lock_timeout: lock_timeout, &b) } + .not_to yield_control + end + + it 'returns the result' do + expect(subject[:status]).to eq(:enqueued) + end + end + + context 'when a duplicate item is enqueued' do + it 'returns the poped items to the queue and raise an error' do + expect { |b| queue.safe_execute(%w[1 1 2 2], &b) } + .to yield_with_args(match_array(%w[1 2])) + end + end + + context 'when there are two queues' do + it 'enqueues items to each queue' do + queue_1 = described_class.new(namespace, '1') + queue_2 = described_class.new(namespace, '2') + + result_2 = nil + + result_1 = queue_1.safe_execute(['A']) do |_| + result_2 = queue_2.safe_execute(['B']) do |_| + queue_1.safe_execute(['C']) + queue_2.safe_execute(['D']) + end + end + + expect(result_1[:status]).to eq(:finished) + expect(result_1[:new_items]).to eq(['C']) + expect(result_2[:status]).to eq(:finished) + expect(result_2[:new_items]).to eq(['D']) + end + end + end +end