Move sidekiq-based project authorization refresh out of Projects::CreateService
If the project is in a group, the `group.refresh_members_authorized_projects` is made non-blocking, and we call `current_user.refresh_authorized_projects` directly. Projects in a personal namespace are more difficult. Rather than passing the `blocking:` parameter through the entire `add_master` chain, have the `AuthorizedProjectsWorker` automatically inline authorizations for three IDs or less. Since the maximum number of IDs in this path is 2, that has the same effect.
This commit is contained in:
parent
24244d03b5
commit
8b73df0cf5
|
@ -206,9 +206,9 @@ class Group < Namespace
|
||||||
SystemHooksService.new
|
SystemHooksService.new
|
||||||
end
|
end
|
||||||
|
|
||||||
def refresh_members_authorized_projects
|
def refresh_members_authorized_projects(blocking: true)
|
||||||
UserProjectAccessChangedService.new(user_ids_for_project_authorizations)
|
UserProjectAccessChangedService.new(user_ids_for_project_authorizations)
|
||||||
.execute
|
.execute(blocking: blocking)
|
||||||
end
|
end
|
||||||
|
|
||||||
def user_ids_for_project_authorizations
|
def user_ids_for_project_authorizations
|
||||||
|
|
|
@ -99,12 +99,22 @@ module Projects
|
||||||
event_service.create_project(@project, current_user)
|
event_service.create_project(@project, current_user)
|
||||||
system_hook_service.execute_hooks_for(@project, :create)
|
system_hook_service.execute_hooks_for(@project, :create)
|
||||||
|
|
||||||
unless @project.group || @project.gitlab_project_import?
|
setup_authorizations
|
||||||
|
end
|
||||||
|
|
||||||
|
# Refresh the current user's authorizations inline (so they can access the
|
||||||
|
# project immediately after this request completes), and any other affected
|
||||||
|
# users in the background
|
||||||
|
def setup_authorizations
|
||||||
|
group = @project.group
|
||||||
|
group&.refresh_members_authorized_projects(blocking: false)
|
||||||
|
|
||||||
|
if group || @project.gitlab_project_import?
|
||||||
|
current_user.refresh_authorized_projects
|
||||||
|
else
|
||||||
owners = [current_user, @project.namespace.owner].compact.uniq
|
owners = [current_user, @project.namespace.owner].compact.uniq
|
||||||
@project.add_master(owners, current_user: current_user)
|
@project.add_master(owners, current_user: current_user)
|
||||||
end
|
end
|
||||||
|
|
||||||
@project.group&.refresh_members_authorized_projects
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def skip_wiki?
|
def skip_wiki?
|
||||||
|
|
|
@ -3,7 +3,13 @@ class UserProjectAccessChangedService
|
||||||
@user_ids = Array.wrap(user_ids)
|
@user_ids = Array.wrap(user_ids)
|
||||||
end
|
end
|
||||||
|
|
||||||
def execute
|
def execute(blocking: true)
|
||||||
AuthorizedProjectsWorker.bulk_perform_and_wait(@user_ids.map { |id| [id] })
|
bulk_args = @user_ids.map { |id| [id] }
|
||||||
|
|
||||||
|
if blocking
|
||||||
|
AuthorizedProjectsWorker.bulk_perform_and_wait(bulk_args)
|
||||||
|
else
|
||||||
|
AuthorizedProjectsWorker.bulk_perform_async(bulk_args)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -4,20 +4,40 @@ class AuthorizedProjectsWorker
|
||||||
|
|
||||||
# Schedules multiple jobs and waits for them to be completed.
|
# Schedules multiple jobs and waits for them to be completed.
|
||||||
def self.bulk_perform_and_wait(args_list)
|
def self.bulk_perform_and_wait(args_list)
|
||||||
|
# Short-circuit: it's more efficient to do small numbers of jobs inline
|
||||||
|
return bulk_perform_inline(args_list) if args_list.size <= 3
|
||||||
|
|
||||||
waiter = Gitlab::JobWaiter.new(args_list.size)
|
waiter = Gitlab::JobWaiter.new(args_list.size)
|
||||||
|
|
||||||
# Point all the bulk jobs at the same JobWaiter. Converts, [[1], [2], [3]]
|
# Point all the bulk jobs at the same JobWaiter. Converts, [[1], [2], [3]]
|
||||||
# into [[1, "key"], [2, "key"], [3, "key"]]
|
# into [[1, "key"], [2, "key"], [3, "key"]]
|
||||||
waiting_args_list = args_list.map { |args| args << waiter.key }
|
waiting_args_list = args_list.map { |args| [*args, waiter.key] }
|
||||||
bulk_perform_async(waiting_args_list)
|
bulk_perform_async(waiting_args_list)
|
||||||
|
|
||||||
waiter.wait
|
waiter.wait
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Schedules multiple jobs to run in sidekiq without waiting for completion
|
||||||
def self.bulk_perform_async(args_list)
|
def self.bulk_perform_async(args_list)
|
||||||
Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list)
|
Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Performs multiple jobs directly. Failed jobs will be put into sidekiq so
|
||||||
|
# they can benefit from retries
|
||||||
|
def self.bulk_perform_inline(args_list)
|
||||||
|
failed = []
|
||||||
|
|
||||||
|
args_list.each do |args|
|
||||||
|
begin
|
||||||
|
new.perform(*args)
|
||||||
|
rescue
|
||||||
|
failed << args
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
bulk_perform_async(failed) if failed.present?
|
||||||
|
end
|
||||||
|
|
||||||
def perform(user_id, notify_key = nil)
|
def perform(user_id, notify_key = nil)
|
||||||
user = User.find_by(id: user_id)
|
user = User.find_by(id: user_id)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
---
|
||||||
|
title: Never wait for sidekiq jobs when creating projects
|
||||||
|
merge_request: 13775
|
||||||
|
author:
|
||||||
|
type: other
|
|
@ -8,5 +8,12 @@ describe UserProjectAccessChangedService do
|
||||||
|
|
||||||
described_class.new([1, 2]).execute
|
described_class.new([1, 2]).execute
|
||||||
end
|
end
|
||||||
|
|
||||||
|
it 'permits non-blocking operation' do
|
||||||
|
expect(AuthorizedProjectsWorker).to receive(:bulk_perform_async)
|
||||||
|
.with([[1], [2]])
|
||||||
|
|
||||||
|
described_class.new([1, 2]).execute(blocking: false)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -3,28 +3,75 @@ require 'spec_helper'
|
||||||
describe AuthorizedProjectsWorker do
|
describe AuthorizedProjectsWorker do
|
||||||
let(:project) { create(:project) }
|
let(:project) { create(:project) }
|
||||||
|
|
||||||
|
def build_args_list(*ids, multiply: 1)
|
||||||
|
args_list = ids.map { |id| [id] }
|
||||||
|
args_list * multiply
|
||||||
|
end
|
||||||
|
|
||||||
describe '.bulk_perform_and_wait' do
|
describe '.bulk_perform_and_wait' do
|
||||||
it 'schedules the ids and waits for the jobs to complete' do
|
it 'schedules the ids and waits for the jobs to complete' do
|
||||||
|
args_list = build_args_list(project.owner.id)
|
||||||
|
|
||||||
|
project.owner.project_authorizations.delete_all
|
||||||
|
described_class.bulk_perform_and_wait(args_list)
|
||||||
|
|
||||||
|
expect(project.owner.project_authorizations.count).to eq(1)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'inlines workloads <= 3 jobs' do
|
||||||
|
args_list = build_args_list(project.owner.id, multiply: 3)
|
||||||
|
expect(described_class).to receive(:bulk_perform_inline).with(args_list)
|
||||||
|
|
||||||
|
described_class.bulk_perform_and_wait(args_list)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'runs > 3 jobs using sidekiq' do
|
||||||
project.owner.project_authorizations.delete_all
|
project.owner.project_authorizations.delete_all
|
||||||
|
|
||||||
described_class.bulk_perform_and_wait([[project.owner.id]])
|
expect(described_class).to receive(:bulk_perform_async).and_call_original
|
||||||
|
|
||||||
|
args_list = build_args_list(project.owner.id, multiply: 4)
|
||||||
|
described_class.bulk_perform_and_wait(args_list)
|
||||||
|
|
||||||
expect(project.owner.project_authorizations.count).to eq(1)
|
expect(project.owner.project_authorizations.count).to eq(1)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe '.bulk_perform_inline' do
|
||||||
|
it 'refreshes the authorizations inline' do
|
||||||
|
project.owner.project_authorizations.delete_all
|
||||||
|
|
||||||
|
expect_any_instance_of(described_class).to receive(:perform).and_call_original
|
||||||
|
|
||||||
|
described_class.bulk_perform_inline(build_args_list(project.owner.id))
|
||||||
|
|
||||||
|
expect(project.owner.project_authorizations.count).to eq(1)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'enqueues jobs if an error is raised' do
|
||||||
|
invalid_id = -1
|
||||||
|
args_list = build_args_list(project.owner.id, invalid_id)
|
||||||
|
|
||||||
|
allow_any_instance_of(described_class).to receive(:perform).with(project.owner.id)
|
||||||
|
allow_any_instance_of(described_class).to receive(:perform).with(invalid_id).and_raise(ArgumentError)
|
||||||
|
expect(described_class).to receive(:bulk_perform_async).with(build_args_list(invalid_id))
|
||||||
|
|
||||||
|
described_class.bulk_perform_inline(args_list)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
describe '.bulk_perform_async' do
|
describe '.bulk_perform_async' do
|
||||||
it "uses it's respective sidekiq queue" do
|
it "uses it's respective sidekiq queue" do
|
||||||
args = [[project.owner.id]]
|
args_list = build_args_list(project.owner.id)
|
||||||
push_bulk_args = {
|
push_bulk_args = {
|
||||||
'class' => described_class,
|
'class' => described_class,
|
||||||
'queue' => described_class.sidekiq_options['queue'],
|
'queue' => described_class.sidekiq_options['queue'],
|
||||||
'args' => args
|
'args' => args_list
|
||||||
}
|
}
|
||||||
|
|
||||||
expect(Sidekiq::Client).to receive(:push_bulk).with(push_bulk_args).once
|
expect(Sidekiq::Client).to receive(:push_bulk).with(push_bulk_args).once
|
||||||
|
|
||||||
described_class.bulk_perform_async(args)
|
described_class.bulk_perform_async(args_list)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue