Merge branch '36792-inline-user-refresh-when-creating-project' into 'master'
Don't wait for project authorization sidekiq jobs when creating projects Closes #36792 See merge request !13775
This commit is contained in:
commit
0b73771205
8 changed files with 107 additions and 15 deletions
|
@ -206,9 +206,9 @@ class Group < Namespace
|
|||
SystemHooksService.new
|
||||
end
|
||||
|
||||
def refresh_members_authorized_projects
|
||||
def refresh_members_authorized_projects(blocking: true)
|
||||
UserProjectAccessChangedService.new(user_ids_for_project_authorizations)
|
||||
.execute
|
||||
.execute(blocking: blocking)
|
||||
end
|
||||
|
||||
def user_ids_for_project_authorizations
|
||||
|
|
|
@ -99,12 +99,19 @@ module Projects
|
|||
event_service.create_project(@project, current_user)
|
||||
system_hook_service.execute_hooks_for(@project, :create)
|
||||
|
||||
unless @project.group || @project.gitlab_project_import?
|
||||
owners = [current_user, @project.namespace.owner].compact.uniq
|
||||
@project.add_master(owners, current_user: current_user)
|
||||
end
|
||||
setup_authorizations
|
||||
end
|
||||
|
||||
@project.group&.refresh_members_authorized_projects
|
||||
# Refresh the current user's authorizations inline (so they can access the
|
||||
# project immediately after this request completes), and any other affected
|
||||
# users in the background
|
||||
def setup_authorizations
|
||||
if @project.group
|
||||
@project.group.refresh_members_authorized_projects(blocking: false)
|
||||
current_user.refresh_authorized_projects
|
||||
else
|
||||
@project.add_master(@project.namespace.owner, current_user: current_user)
|
||||
end
|
||||
end
|
||||
|
||||
def skip_wiki?
|
||||
|
|
|
@ -3,7 +3,13 @@ class UserProjectAccessChangedService
|
|||
@user_ids = Array.wrap(user_ids)
|
||||
end
|
||||
|
||||
def execute
|
||||
AuthorizedProjectsWorker.bulk_perform_and_wait(@user_ids.map { |id| [id] })
|
||||
def execute(blocking: true)
|
||||
bulk_args = @user_ids.map { |id| [id] }
|
||||
|
||||
if blocking
|
||||
AuthorizedProjectsWorker.bulk_perform_and_wait(bulk_args)
|
||||
else
|
||||
AuthorizedProjectsWorker.bulk_perform_async(bulk_args)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -4,20 +4,40 @@ class AuthorizedProjectsWorker
|
|||
|
||||
# Schedules multiple jobs and waits for them to be completed.
|
||||
def self.bulk_perform_and_wait(args_list)
|
||||
# Short-circuit: it's more efficient to do small numbers of jobs inline
|
||||
return bulk_perform_inline(args_list) if args_list.size <= 3
|
||||
|
||||
waiter = Gitlab::JobWaiter.new(args_list.size)
|
||||
|
||||
# Point all the bulk jobs at the same JobWaiter. Converts, [[1], [2], [3]]
|
||||
# into [[1, "key"], [2, "key"], [3, "key"]]
|
||||
waiting_args_list = args_list.map { |args| args << waiter.key }
|
||||
waiting_args_list = args_list.map { |args| [*args, waiter.key] }
|
||||
bulk_perform_async(waiting_args_list)
|
||||
|
||||
waiter.wait
|
||||
end
|
||||
|
||||
# Schedules multiple jobs to run in sidekiq without waiting for completion
|
||||
def self.bulk_perform_async(args_list)
|
||||
Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list)
|
||||
end
|
||||
|
||||
# Performs multiple jobs directly. Failed jobs will be put into sidekiq so
|
||||
# they can benefit from retries
|
||||
def self.bulk_perform_inline(args_list)
|
||||
failed = []
|
||||
|
||||
args_list.each do |args|
|
||||
begin
|
||||
new.perform(*args)
|
||||
rescue
|
||||
failed << args
|
||||
end
|
||||
end
|
||||
|
||||
bulk_perform_async(failed) if failed.present?
|
||||
end
|
||||
|
||||
def perform(user_id, notify_key = nil)
|
||||
user = User.find_by(id: user_id)
|
||||
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
title: Never wait for sidekiq jobs when creating projects
|
||||
merge_request: 13775
|
||||
author:
|
||||
type: other
|
|
@ -38,7 +38,7 @@ describe Projects::CreateService, '#execute' do
|
|||
|
||||
expect(project).to be_persisted
|
||||
expect(project.owner).to eq(user)
|
||||
expect(project.team.masters).to include(user, admin)
|
||||
expect(project.team.masters).to contain_exactly(user)
|
||||
expect(project.namespace).to eq(user.namespace)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -8,5 +8,12 @@ describe UserProjectAccessChangedService do
|
|||
|
||||
described_class.new([1, 2]).execute
|
||||
end
|
||||
|
||||
it 'permits non-blocking operation' do
|
||||
expect(AuthorizedProjectsWorker).to receive(:bulk_perform_async)
|
||||
.with([[1], [2]])
|
||||
|
||||
described_class.new([1, 2]).execute(blocking: false)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -3,28 +3,75 @@ require 'spec_helper'
|
|||
describe AuthorizedProjectsWorker do
|
||||
let(:project) { create(:project) }
|
||||
|
||||
def build_args_list(*ids, multiply: 1)
|
||||
args_list = ids.map { |id| [id] }
|
||||
args_list * multiply
|
||||
end
|
||||
|
||||
describe '.bulk_perform_and_wait' do
|
||||
it 'schedules the ids and waits for the jobs to complete' do
|
||||
args_list = build_args_list(project.owner.id)
|
||||
|
||||
project.owner.project_authorizations.delete_all
|
||||
described_class.bulk_perform_and_wait(args_list)
|
||||
|
||||
expect(project.owner.project_authorizations.count).to eq(1)
|
||||
end
|
||||
|
||||
it 'inlines workloads <= 3 jobs' do
|
||||
args_list = build_args_list(project.owner.id, multiply: 3)
|
||||
expect(described_class).to receive(:bulk_perform_inline).with(args_list)
|
||||
|
||||
described_class.bulk_perform_and_wait(args_list)
|
||||
end
|
||||
|
||||
it 'runs > 3 jobs using sidekiq' do
|
||||
project.owner.project_authorizations.delete_all
|
||||
|
||||
described_class.bulk_perform_and_wait([[project.owner.id]])
|
||||
expect(described_class).to receive(:bulk_perform_async).and_call_original
|
||||
|
||||
args_list = build_args_list(project.owner.id, multiply: 4)
|
||||
described_class.bulk_perform_and_wait(args_list)
|
||||
|
||||
expect(project.owner.project_authorizations.count).to eq(1)
|
||||
end
|
||||
end
|
||||
|
||||
describe '.bulk_perform_inline' do
|
||||
it 'refreshes the authorizations inline' do
|
||||
project.owner.project_authorizations.delete_all
|
||||
|
||||
expect_any_instance_of(described_class).to receive(:perform).and_call_original
|
||||
|
||||
described_class.bulk_perform_inline(build_args_list(project.owner.id))
|
||||
|
||||
expect(project.owner.project_authorizations.count).to eq(1)
|
||||
end
|
||||
|
||||
it 'enqueues jobs if an error is raised' do
|
||||
invalid_id = -1
|
||||
args_list = build_args_list(project.owner.id, invalid_id)
|
||||
|
||||
allow_any_instance_of(described_class).to receive(:perform).with(project.owner.id)
|
||||
allow_any_instance_of(described_class).to receive(:perform).with(invalid_id).and_raise(ArgumentError)
|
||||
expect(described_class).to receive(:bulk_perform_async).with(build_args_list(invalid_id))
|
||||
|
||||
described_class.bulk_perform_inline(args_list)
|
||||
end
|
||||
end
|
||||
|
||||
describe '.bulk_perform_async' do
|
||||
it "uses it's respective sidekiq queue" do
|
||||
args = [[project.owner.id]]
|
||||
args_list = build_args_list(project.owner.id)
|
||||
push_bulk_args = {
|
||||
'class' => described_class,
|
||||
'queue' => described_class.sidekiq_options['queue'],
|
||||
'args' => args
|
||||
'args' => args_list
|
||||
}
|
||||
|
||||
expect(Sidekiq::Client).to receive(:push_bulk).with(push_bulk_args).once
|
||||
|
||||
described_class.bulk_perform_async(args)
|
||||
described_class.bulk_perform_async(args_list)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
Loading…
Reference in a new issue