Add RepositoryCheck::DispatchWorker to start worker per shard
The RepositoryCheck::DispatchWorker will start a RepositoryCheck::BatchWorker for each healthy shard. Closes gitlab-org/gitlab-ce#48042
This commit is contained in:
parent
48901bdecf
commit
95ac8b0e1d
7 changed files with 96 additions and 11 deletions
|
@ -11,7 +11,7 @@
|
|||
- cronjob:remove_old_web_hook_logs
|
||||
- cronjob:remove_unreferenced_lfs_objects
|
||||
- cronjob:repository_archive_cache
|
||||
- cronjob:repository_check_batch
|
||||
- cronjob:repository_check_dispatch
|
||||
- cronjob:requests_profiles
|
||||
- cronjob:schedule_update_user_activity
|
||||
- cronjob:stuck_ci_jobs
|
||||
|
@ -71,6 +71,7 @@
|
|||
- pipeline_processing:update_head_pipeline_for_merge_request
|
||||
|
||||
- repository_check:repository_check_clear
|
||||
- repository_check:repository_check_batch
|
||||
- repository_check:repository_check_single_repository
|
||||
|
||||
- default
|
||||
|
|
|
@ -3,13 +3,18 @@
|
|||
module RepositoryCheck
|
||||
class BatchWorker
|
||||
include ApplicationWorker
|
||||
include CronjobQueue
|
||||
include RepositoryCheckQueue
|
||||
|
||||
RUN_TIME = 3600
|
||||
BATCH_SIZE = 10_000
|
||||
|
||||
def perform
|
||||
attr_reader :shard_name
|
||||
|
||||
def perform(shard_name)
|
||||
@shard_name = shard_name
|
||||
|
||||
return unless Gitlab::CurrentSettings.repository_checks_enabled
|
||||
return unless Gitlab::ShardHealthCache.healthy_shard?(shard_name)
|
||||
|
||||
start = Time.now
|
||||
|
||||
|
@ -39,18 +44,22 @@ module RepositoryCheck
|
|||
end
|
||||
|
||||
def never_checked_project_ids(batch_size)
|
||||
Project.where(last_repository_check_at: nil)
|
||||
projects_on_shard.where(last_repository_check_at: nil)
|
||||
.where('created_at < ?', 24.hours.ago)
|
||||
.limit(batch_size).pluck(:id)
|
||||
end
|
||||
|
||||
def old_checked_project_ids(batch_size)
|
||||
Project.where.not(last_repository_check_at: nil)
|
||||
projects_on_shard.where.not(last_repository_check_at: nil)
|
||||
.where('last_repository_check_at < ?', 1.month.ago)
|
||||
.reorder(last_repository_check_at: :asc)
|
||||
.limit(batch_size).pluck(:id)
|
||||
end
|
||||
|
||||
def projects_on_shard
|
||||
Project.where(repository_storage: shard_name)
|
||||
end
|
||||
|
||||
def try_obtain_lease(id)
|
||||
# Use a 24-hour timeout because on servers/projects where 'git fsck' is
|
||||
# super slow we definitely do not want to run it twice in parallel.
|
||||
|
|
15
app/workers/repository_check/dispatch_worker.rb
Normal file
15
app/workers/repository_check/dispatch_worker.rb
Normal file
|
@ -0,0 +1,15 @@
|
|||
module RepositoryCheck
|
||||
class DispatchWorker
|
||||
include ApplicationWorker
|
||||
include CronjobQueue
|
||||
include ::EachShardWorker
|
||||
|
||||
def perform
|
||||
return unless Gitlab::CurrentSettings.repository_checks_enabled
|
||||
|
||||
each_shard do |shard_name|
|
||||
RepositoryCheck::BatchWorker.perform_async(shard_name)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
5
changelogs/unreleased/tc-repo-check-per-shard.yml
Normal file
5
changelogs/unreleased/tc-repo-check-per-shard.yml
Normal file
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
title: Run repository checks in parallel for each shard
|
||||
merge_request: 20179
|
||||
author:
|
||||
type: added
|
|
@ -279,7 +279,7 @@ Settings.cron_jobs['expire_build_artifacts_worker']['cron'] ||= '50 * * * *'
|
|||
Settings.cron_jobs['expire_build_artifacts_worker']['job_class'] = 'ExpireBuildArtifactsWorker'
|
||||
Settings.cron_jobs['repository_check_worker'] ||= Settingslogic.new({})
|
||||
Settings.cron_jobs['repository_check_worker']['cron'] ||= '20 * * * *'
|
||||
Settings.cron_jobs['repository_check_worker']['job_class'] = 'RepositoryCheck::BatchWorker'
|
||||
Settings.cron_jobs['repository_check_worker']['job_class'] = 'RepositoryCheck::DispatchWorker'
|
||||
Settings.cron_jobs['admin_email_worker'] ||= Settingslogic.new({})
|
||||
Settings.cron_jobs['admin_email_worker']['cron'] ||= '0 0 * * 0'
|
||||
Settings.cron_jobs['admin_email_worker']['job_class'] = 'AdminEmailWorker'
|
||||
|
|
|
@ -1,14 +1,19 @@
|
|||
require 'spec_helper'
|
||||
|
||||
describe RepositoryCheck::BatchWorker do
|
||||
let(:shard_name) { 'default' }
|
||||
subject { described_class.new }
|
||||
|
||||
before do
|
||||
Gitlab::ShardHealthCache.update([shard_name])
|
||||
end
|
||||
|
||||
it 'prefers projects that have never been checked' do
|
||||
projects = create_list(:project, 3, created_at: 1.week.ago)
|
||||
projects[0].update_column(:last_repository_check_at, 4.months.ago)
|
||||
projects[2].update_column(:last_repository_check_at, 3.months.ago)
|
||||
|
||||
expect(subject.perform).to eq(projects.values_at(1, 0, 2).map(&:id))
|
||||
expect(subject.perform(shard_name)).to eq(projects.values_at(1, 0, 2).map(&:id))
|
||||
end
|
||||
|
||||
it 'sorts projects by last_repository_check_at' do
|
||||
|
@ -17,7 +22,7 @@ describe RepositoryCheck::BatchWorker do
|
|||
projects[1].update_column(:last_repository_check_at, 4.months.ago)
|
||||
projects[2].update_column(:last_repository_check_at, 3.months.ago)
|
||||
|
||||
expect(subject.perform).to eq(projects.values_at(1, 2, 0).map(&:id))
|
||||
expect(subject.perform(shard_name)).to eq(projects.values_at(1, 2, 0).map(&:id))
|
||||
end
|
||||
|
||||
it 'excludes projects that were checked recently' do
|
||||
|
@ -26,7 +31,14 @@ describe RepositoryCheck::BatchWorker do
|
|||
projects[1].update_column(:last_repository_check_at, 2.months.ago)
|
||||
projects[2].update_column(:last_repository_check_at, 3.days.ago)
|
||||
|
||||
expect(subject.perform).to eq([projects[1].id])
|
||||
expect(subject.perform(shard_name)).to eq([projects[1].id])
|
||||
end
|
||||
|
||||
it 'excludes projects on another shard' do
|
||||
projects = create_list(:project, 2, created_at: 1.week.ago)
|
||||
projects[0].update_column(:repository_storage, 'other')
|
||||
|
||||
expect(subject.perform(shard_name)).to eq([projects[1].id])
|
||||
end
|
||||
|
||||
it 'does nothing when repository checks are disabled' do
|
||||
|
@ -34,13 +46,20 @@ describe RepositoryCheck::BatchWorker do
|
|||
|
||||
stub_application_setting(repository_checks_enabled: false)
|
||||
|
||||
expect(subject.perform).to eq(nil)
|
||||
expect(subject.perform(shard_name)).to eq(nil)
|
||||
end
|
||||
|
||||
it 'does nothing when shard is unhealthy' do
|
||||
shard_name = 'broken'
|
||||
create(:project, created_at: 1.week.ago, repository_storage: shard_name)
|
||||
|
||||
expect(subject.perform(shard_name)).to eq(nil)
|
||||
end
|
||||
|
||||
it 'skips projects created less than 24 hours ago' do
|
||||
project = create(:project)
|
||||
project.update_column(:created_at, 23.hours.ago)
|
||||
|
||||
expect(subject.perform).to eq([])
|
||||
expect(subject.perform(shard_name)).to eq([])
|
||||
end
|
||||
end
|
||||
|
|
36
spec/workers/repository_check/dispatch_worker_spec.rb
Normal file
36
spec/workers/repository_check/dispatch_worker_spec.rb
Normal file
|
@ -0,0 +1,36 @@
|
|||
require 'spec_helper'
|
||||
|
||||
describe RepositoryCheck::DispatchWorker do
|
||||
subject { described_class.new }
|
||||
|
||||
it 'does nothing when repository checks are disabled' do
|
||||
stub_application_setting(repository_checks_enabled: false)
|
||||
|
||||
expect(RepositoryCheck::BatchWorker).not_to receive(:perform_async)
|
||||
|
||||
subject.perform
|
||||
end
|
||||
|
||||
it 'dispatches work to RepositoryCheck::BatchWorker' do
|
||||
expect(RepositoryCheck::BatchWorker).to receive(:perform_async).at_least(:once)
|
||||
|
||||
subject.perform
|
||||
end
|
||||
|
||||
context 'with unhealthy shard' do
|
||||
let(:default_shard_name) { 'default' }
|
||||
let(:unhealthy_shard_name) { 'unhealthy' }
|
||||
let(:default_shard) { Gitlab::HealthChecks::Result.new(true, nil, shard: default_shard_name) }
|
||||
let(:unhealthy_shard) { Gitlab::HealthChecks::Result.new(false, '14:Connect Failed', shard: unhealthy_shard_name) }
|
||||
|
||||
before do
|
||||
allow(Gitlab::HealthChecks::GitalyCheck).to receive(:readiness).and_return([default_shard, unhealthy_shard])
|
||||
end
|
||||
|
||||
it 'only triggers RepositoryCheck::BatchWorker for healthy shards' do
|
||||
expect(RepositoryCheck::BatchWorker).to receive(:perform_async).with('default')
|
||||
|
||||
subject.perform
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in a new issue