Merge branch '34533-inline-single-authorized-projects' into 'master'
Use an event-based system when waiting for AuthorizedProjectsWorker to complete See merge request !13564
This commit is contained in:
commit
78a0d27e98
|
@ -4,18 +4,25 @@ class AuthorizedProjectsWorker
|
|||
|
||||
# Schedules multiple jobs and waits for them to be completed.
|
||||
def self.bulk_perform_and_wait(args_list)
|
||||
job_ids = bulk_perform_async(args_list)
|
||||
waiter = Gitlab::JobWaiter.new(args_list.size)
|
||||
|
||||
Gitlab::JobWaiter.new(job_ids).wait
|
||||
# Point all the bulk jobs at the same JobWaiter. Converts, [[1], [2], [3]]
|
||||
# into [[1, "key"], [2, "key"], [3, "key"]]
|
||||
waiting_args_list = args_list.map { |args| args << waiter.key }
|
||||
bulk_perform_async(waiting_args_list)
|
||||
|
||||
waiter.wait
|
||||
end
|
||||
|
||||
def self.bulk_perform_async(args_list)
|
||||
Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list)
|
||||
end
|
||||
|
||||
def perform(user_id)
|
||||
def perform(user_id, notify_key = nil)
|
||||
user = User.find_by(id: user_id)
|
||||
|
||||
user&.refresh_authorized_projects
|
||||
ensure
|
||||
Gitlab::JobWaiter.notify(notify_key, jid) if notify_key
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,12 +1,31 @@
|
|||
module Gitlab
|
||||
# JobWaiter can be used to wait for a number of Sidekiq jobs to complete.
|
||||
#
|
||||
# Its use requires the cooperation of the sidekiq jobs themselves. Set up the
|
||||
# waiter, then start the jobs, passing them its `key`. Their `perform` methods
|
||||
# should look like:
|
||||
#
|
||||
# def perform(args, notify_key)
|
||||
# # do work
|
||||
# ensure
|
||||
# ::Gitlab::JobWaiter.notify(notify_key, jid)
|
||||
# end
|
||||
#
|
||||
# The JobWaiter blocks popping items from a Redis array. All the sidekiq jobs
|
||||
# push to that array when done. Once the waiter has popped `count` items, it
|
||||
# knows all the jobs are done.
|
||||
class JobWaiter
|
||||
# The sleep interval between checking keys, in seconds.
|
||||
INTERVAL = 0.1
|
||||
def self.notify(key, jid)
|
||||
Gitlab::Redis::SharedState.with { |redis| redis.lpush(key, jid) }
|
||||
end
|
||||
|
||||
# jobs - The job IDs to wait for.
|
||||
def initialize(jobs)
|
||||
@jobs = jobs
|
||||
attr_reader :key, :jobs_remaining, :finished
|
||||
|
||||
# jobs_remaining - the number of jobs left to wait for
|
||||
def initialize(jobs_remaining)
|
||||
@key = "gitlab:job_waiter:#{SecureRandom.uuid}"
|
||||
@jobs_remaining = jobs_remaining
|
||||
@finished = []
|
||||
end
|
||||
|
||||
# Waits for all the jobs to be completed.
|
||||
|
@ -15,13 +34,33 @@ module Gitlab
|
|||
# ensures we don't indefinitely block a caller in case a job takes
|
||||
# long to process, or is never processed.
|
||||
def wait(timeout = 10)
|
||||
start = Time.current
|
||||
deadline = Time.now.utc + timeout
|
||||
|
||||
while (Time.current - start) <= timeout
|
||||
break if SidekiqStatus.all_completed?(@jobs)
|
||||
Gitlab::Redis::SharedState.with do |redis|
|
||||
# Fallback key expiry: allow a long grace period to reduce the chance of
|
||||
# a job pushing to an expired key and recreating it
|
||||
redis.expire(key, [timeout * 2, 10.minutes.to_i].max)
|
||||
|
||||
sleep(INTERVAL) # to not overload Redis too much.
|
||||
end
|
||||
while jobs_remaining > 0
|
||||
# Redis will not take fractional seconds. Prefer waiting too long over
|
||||
# not waiting long enough
|
||||
seconds_left = (deadline - Time.now.utc).ceil
|
||||
|
||||
# Redis interprets 0 as "wait forever", so skip the final `blpop` call
|
||||
break if seconds_left <= 0
|
||||
|
||||
list, jid = redis.blpop(key, timeout: seconds_left)
|
||||
break unless list && jid # timed out
|
||||
|
||||
@finished << jid
|
||||
@jobs_remaining -= 1
|
||||
end
|
||||
|
||||
# All jobs have finished, so expire the key immediately
|
||||
redis.expire(key, 0) if jobs_remaining == 0
|
||||
end
|
||||
|
||||
finished
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,30 +1,39 @@
|
|||
require 'spec_helper'
|
||||
|
||||
describe Gitlab::JobWaiter do
|
||||
describe '.notify' do
|
||||
it 'pushes the jid to the named queue' do
|
||||
key = 'gitlab:job_waiter:foo'
|
||||
jid = 1
|
||||
|
||||
redis = double('redis')
|
||||
expect(Gitlab::Redis::SharedState).to receive(:with).and_yield(redis)
|
||||
expect(redis).to receive(:lpush).with(key, jid)
|
||||
|
||||
described_class.notify(key, jid)
|
||||
end
|
||||
end
|
||||
|
||||
describe '#wait' do
|
||||
let(:waiter) { described_class.new(%w(a)) }
|
||||
let(:waiter) { described_class.new(2) }
|
||||
|
||||
it 'returns when all jobs have been completed' do
|
||||
expect(Gitlab::SidekiqStatus).to receive(:all_completed?).with(%w(a))
|
||||
.and_return(true)
|
||||
described_class.notify(waiter.key, 'a')
|
||||
described_class.notify(waiter.key, 'b')
|
||||
|
||||
expect(waiter).not_to receive(:sleep)
|
||||
result = nil
|
||||
expect { Timeout.timeout(1) { result = waiter.wait(2) } }.not_to raise_error
|
||||
|
||||
waiter.wait
|
||||
expect(result).to contain_exactly('a', 'b')
|
||||
end
|
||||
|
||||
it 'sleeps between checking the job statuses' do
|
||||
expect(Gitlab::SidekiqStatus).to receive(:all_completed?)
|
||||
.with(%w(a))
|
||||
.and_return(false, true)
|
||||
it 'times out if not all jobs complete' do
|
||||
described_class.notify(waiter.key, 'a')
|
||||
|
||||
expect(waiter).to receive(:sleep).with(described_class::INTERVAL)
|
||||
result = nil
|
||||
expect { Timeout.timeout(2) { result = waiter.wait(1) } }.not_to raise_error
|
||||
|
||||
waiter.wait
|
||||
end
|
||||
|
||||
it 'returns when timing out' do
|
||||
expect(waiter).not_to receive(:sleep)
|
||||
waiter.wait(0)
|
||||
expect(result).to contain_exactly('a')
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -29,21 +29,27 @@ describe AuthorizedProjectsWorker do
|
|||
end
|
||||
|
||||
describe '#perform' do
|
||||
subject { described_class.new }
|
||||
let(:user) { create(:user) }
|
||||
|
||||
subject(:job) { described_class.new }
|
||||
|
||||
it "refreshes user's authorized projects" do
|
||||
user = create(:user)
|
||||
|
||||
expect_any_instance_of(User).to receive(:refresh_authorized_projects)
|
||||
|
||||
subject.perform(user.id)
|
||||
job.perform(user.id)
|
||||
end
|
||||
|
||||
it 'notifies the JobWaiter when done if the key is provided' do
|
||||
expect(Gitlab::JobWaiter).to receive(:notify).with('notify-key', job.jid)
|
||||
|
||||
job.perform(user.id, 'notify-key')
|
||||
end
|
||||
|
||||
context "when the user is not found" do
|
||||
it "does nothing" do
|
||||
expect_any_instance_of(User).not_to receive(:refresh_authorized_projects)
|
||||
|
||||
subject.perform(-1)
|
||||
job.perform(-1)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue