From 6e8d0b78ebbde2eada151649fc7d1040b902e28f Mon Sep 17 00:00:00 2001 From: Nick Thomas Date: Tue, 15 Aug 2017 13:56:04 +0100 Subject: [PATCH] Use event-based waiting in Gitlab::JobWaiter --- app/workers/authorized_projects_worker.rb | 13 ++++- lib/gitlab/job_waiter.rb | 57 ++++++++++++++++--- spec/lib/gitlab/job_waiter_spec.rb | 41 +++++++------ .../authorized_projects_worker_spec.rb | 16 ++++-- 4 files changed, 94 insertions(+), 33 deletions(-) diff --git a/app/workers/authorized_projects_worker.rb b/app/workers/authorized_projects_worker.rb index 13207a8bc71..be4c77503bb 100644 --- a/app/workers/authorized_projects_worker.rb +++ b/app/workers/authorized_projects_worker.rb @@ -4,18 +4,25 @@ class AuthorizedProjectsWorker # Schedules multiple jobs and waits for them to be completed. def self.bulk_perform_and_wait(args_list) - job_ids = bulk_perform_async(args_list) + waiter = Gitlab::JobWaiter.new(args_list.size) - Gitlab::JobWaiter.new(job_ids).wait + # Point all the bulk jobs at the same JobWaiter. Converts, [[1], [2], [3]] + # into [[1, "key"], [2, "key"], [3, "key"]] + waiting_args_list = args_list.map { |args| args << waiter.key } + bulk_perform_async(waiting_args_list) + + waiter.wait end def self.bulk_perform_async(args_list) Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list) end - def perform(user_id) + def perform(user_id, notify_key = nil) user = User.find_by(id: user_id) user&.refresh_authorized_projects + ensure + Gitlab::JobWaiter.notify(notify_key, jid) if notify_key end end diff --git a/lib/gitlab/job_waiter.rb b/lib/gitlab/job_waiter.rb index 208f0e1bbea..4d6bbda15f3 100644 --- a/lib/gitlab/job_waiter.rb +++ b/lib/gitlab/job_waiter.rb @@ -1,12 +1,31 @@ module Gitlab # JobWaiter can be used to wait for a number of Sidekiq jobs to complete. + # + # Its use requires the cooperation of the sidekiq jobs themselves. Set up the + # waiter, then start the jobs, passing them its `key`. Their `perform` methods + # should look like: + # + # def perform(args, notify_key) + # # do work + # ensure + # ::Gitlab::JobWaiter.notify(notify_key, jid) + # end + # + # The JobWaiter blocks popping items from a Redis array. All the sidekiq jobs + # push to that array when done. Once the waiter has popped `count` items, it + # knows all the jobs are done. class JobWaiter - # The sleep interval between checking keys, in seconds. - INTERVAL = 0.1 + def self.notify(key, jid) + Gitlab::Redis::SharedState.with { |redis| redis.lpush(key, jid) } + end - # jobs - The job IDs to wait for. - def initialize(jobs) - @jobs = jobs + attr_reader :key, :jobs_remaining, :finished + + # jobs_remaining - the number of jobs left to wait for + def initialize(jobs_remaining) + @key = "gitlab:job_waiter:#{SecureRandom.uuid}" + @jobs_remaining = jobs_remaining + @finished = [] end # Waits for all the jobs to be completed. @@ -15,13 +34,33 @@ module Gitlab # ensures we don't indefinitely block a caller in case a job takes # long to process, or is never processed. def wait(timeout = 10) - start = Time.current + deadline = Time.now.utc + timeout - while (Time.current - start) <= timeout - break if SidekiqStatus.all_completed?(@jobs) + Gitlab::Redis::SharedState.with do |redis| + # Fallback key expiry: allow a long grace period to reduce the chance of + # a job pushing to an expired key and recreating it + redis.expire(key, [timeout * 2, 10.minutes.to_i].max) - sleep(INTERVAL) # to not overload Redis too much. + while jobs_remaining > 0 + # Redis will not take fractional seconds. Prefer waiting too long over + # not waiting long enough + seconds_left = (deadline - Time.now.utc).ceil + + # Redis interprets 0 as "wait forever", so skip the final `blpop` call + break if seconds_left <= 0 + + list, jid = redis.blpop(key, timeout: seconds_left) + break unless list && jid # timed out + + @finished << jid + @jobs_remaining -= 1 + end + + # All jobs have finished, so expire the key immediately + redis.expire(key, 0) if jobs_remaining == 0 end + + finished end end end diff --git a/spec/lib/gitlab/job_waiter_spec.rb b/spec/lib/gitlab/job_waiter_spec.rb index 6186cec2689..b0b4fdc09bc 100644 --- a/spec/lib/gitlab/job_waiter_spec.rb +++ b/spec/lib/gitlab/job_waiter_spec.rb @@ -1,30 +1,39 @@ require 'spec_helper' describe Gitlab::JobWaiter do + describe '.notify' do + it 'pushes the jid to the named queue' do + key = 'gitlab:job_waiter:foo' + jid = 1 + + redis = double('redis') + expect(Gitlab::Redis::SharedState).to receive(:with).and_yield(redis) + expect(redis).to receive(:lpush).with(key, jid) + + described_class.notify(key, jid) + end + end + describe '#wait' do - let(:waiter) { described_class.new(%w(a)) } + let(:waiter) { described_class.new(2) } + it 'returns when all jobs have been completed' do - expect(Gitlab::SidekiqStatus).to receive(:all_completed?).with(%w(a)) - .and_return(true) + described_class.notify(waiter.key, 'a') + described_class.notify(waiter.key, 'b') - expect(waiter).not_to receive(:sleep) + result = nil + expect { Timeout.timeout(1) { result = waiter.wait(2) } }.not_to raise_error - waiter.wait + expect(result).to contain_exactly('a', 'b') end - it 'sleeps between checking the job statuses' do - expect(Gitlab::SidekiqStatus).to receive(:all_completed?) - .with(%w(a)) - .and_return(false, true) + it 'times out if not all jobs complete' do + described_class.notify(waiter.key, 'a') - expect(waiter).to receive(:sleep).with(described_class::INTERVAL) + result = nil + expect { Timeout.timeout(2) { result = waiter.wait(1) } }.not_to raise_error - waiter.wait - end - - it 'returns when timing out' do - expect(waiter).not_to receive(:sleep) - waiter.wait(0) + expect(result).to contain_exactly('a') end end end diff --git a/spec/workers/authorized_projects_worker_spec.rb b/spec/workers/authorized_projects_worker_spec.rb index 03b9b99e263..f8385ae7c72 100644 --- a/spec/workers/authorized_projects_worker_spec.rb +++ b/spec/workers/authorized_projects_worker_spec.rb @@ -29,21 +29,27 @@ describe AuthorizedProjectsWorker do end describe '#perform' do - subject { described_class.new } + let(:user) { create(:user) } + + subject(:job) { described_class.new } it "refreshes user's authorized projects" do - user = create(:user) - expect_any_instance_of(User).to receive(:refresh_authorized_projects) - subject.perform(user.id) + job.perform(user.id) + end + + it 'notifies the JobWaiter when done if the key is provided' do + expect(Gitlab::JobWaiter).to receive(:notify).with('notify-key', job.jid) + + job.perform(user.id, 'notify-key') end context "when the user is not found" do it "does nothing" do expect_any_instance_of(User).not_to receive(:refresh_authorized_projects) - subject.perform(-1) + job.perform(-1) end end end