Integrate two workers into one ArchiveTraceWorker with pipeline_background queue. This queue takes loqer precedence than pipeline_default.
This commit is contained in:
parent
7bbd5f6e31
commit
335bc0fec0
11 changed files with 115 additions and 19 deletions
|
@ -43,9 +43,9 @@
|
|||
- pipeline_cache:expire_pipeline_cache
|
||||
- pipeline_creation:create_pipeline
|
||||
- pipeline_creation:run_pipeline_schedule
|
||||
- pipeline_background:archive_trace
|
||||
- pipeline_default:build_coverage
|
||||
- pipeline_default:build_trace_sections
|
||||
- pipeline_default:create_trace_artifact
|
||||
- pipeline_default:pipeline_metrics
|
||||
- pipeline_default:pipeline_notification
|
||||
- pipeline_default:update_head_pipeline_for_merge_request
|
||||
|
@ -62,8 +62,6 @@
|
|||
- repository_check:repository_check_clear
|
||||
- repository_check:repository_check_single_repository
|
||||
|
||||
- object_storage:archive_legacy_trace
|
||||
|
||||
- default
|
||||
- mailers # ActionMailer::DeliveryJob.queue_name
|
||||
|
||||
|
|
|
@ -1,10 +0,0 @@
|
|||
class ArchiveLegacyTraceWorker
|
||||
include ApplicationWorker
|
||||
include ObjectStorageQueue
|
||||
|
||||
def perform(job_id)
|
||||
Ci::Build.find_by(id: job_id).try do |job|
|
||||
job.trace.archive!
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,6 +1,6 @@
|
|||
class CreateTraceArtifactWorker
|
||||
class ArchiveTraceWorker
|
||||
include ApplicationWorker
|
||||
include PipelineQueue
|
||||
include PipelineBackgroundQueue
|
||||
|
||||
def perform(job_id)
|
||||
Ci::Build.find_by(id: job_id).try do |job|
|
|
@ -12,7 +12,7 @@ class BuildFinishedWorker
|
|||
|
||||
# We execute that async as this are two indepentent operations that can be executed after TraceSections and Coverage
|
||||
BuildHooksWorker.perform_async(build.id)
|
||||
CreateTraceArtifactWorker.perform_async(build.id)
|
||||
ArchiveTraceWorker.perform_async(build.id)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
10
app/workers/concerns/pipeline_background_queue.rb
Normal file
10
app/workers/concerns/pipeline_background_queue.rb
Normal file
|
@ -0,0 +1,10 @@
|
|||
##
|
||||
# Concern for setting Sidekiq settings for the low priority CI pipeline workers.
|
||||
#
|
||||
module PipelineBackgroundQueue
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
included do
|
||||
queue_namespace :pipeline_background
|
||||
end
|
||||
end
|
|
@ -69,4 +69,4 @@
|
|||
- [storage_migrator, 1]
|
||||
- [pages_domain_verification, 1]
|
||||
- [plugin, 1]
|
||||
- [object_storage, 1]
|
||||
- [pipeline_background, 1]
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
class MigrateCreateTraceArtifactSidekiqQueue < ActiveRecord::Migration
|
||||
include Gitlab::Database::MigrationHelpers
|
||||
|
||||
DOWNTIME = false
|
||||
|
||||
def up
|
||||
sidekiq_queue_migrate 'pipeline_default:create_trace_artifact', to: 'pipeline_background:archive_trace'
|
||||
end
|
||||
|
||||
def down
|
||||
sidekiq_queue_migrate 'pipeline_background:archive_trace', to: 'pipeline_default:create_trace_artifact'
|
||||
end
|
||||
end
|
|
@ -11,7 +11,7 @@
|
|||
#
|
||||
# It's strongly recommended that you check this file into your version control system.
|
||||
|
||||
ActiveRecord::Schema.define(version: 20180305144721) do
|
||||
ActiveRecord::Schema.define(version: 20180306074045) do
|
||||
|
||||
# These are extensions that must be enabled in order to support this database
|
||||
enable_extension "plpgsql"
|
||||
|
|
|
@ -15,7 +15,7 @@ namespace :gitlab do
|
|||
.find_in_batches(batch_size: 1000) do |jobs|
|
||||
job_ids = jobs.map { |job| [job.id] }
|
||||
|
||||
ArchiveLegacyTraceWorker.bulk_perform_async(job_ids)
|
||||
ArchiveTraceWorker.bulk_perform_async(job_ids)
|
||||
|
||||
logger.info("Scheduled #{job_ids.count} jobs. From #{job_ids.min} to #{job_ids.max}")
|
||||
end
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
require 'spec_helper'
|
||||
require Rails.root.join('db', 'post_migrate', '20180306074045_migrate_create_trace_artifact_sidekiq_queue.rb')
|
||||
|
||||
describe MigrateCreateTraceArtifactSidekiqQueue, :sidekiq, :redis do
|
||||
include Gitlab::Database::MigrationHelpers
|
||||
|
||||
context 'when there are jobs in the queues' do
|
||||
it 'correctly migrates queue when migrating up' do
|
||||
Sidekiq::Testing.disable! do
|
||||
stubbed_worker(queue: 'pipeline_default:create_trace_artifact').perform_async('Something', [1])
|
||||
stubbed_worker(queue: 'pipeline_background:archive_trace').perform_async('Something', [1])
|
||||
|
||||
described_class.new.up
|
||||
|
||||
expect(sidekiq_queue_length('pipeline_default:create_trace_artifact')).to eq 0
|
||||
expect(sidekiq_queue_length('pipeline_background:archive_trace')).to eq 2
|
||||
end
|
||||
end
|
||||
|
||||
it 'does not affect other queues under the same namespace' do
|
||||
Sidekiq::Testing.disable! do
|
||||
stubbed_worker(queue: 'pipeline_default:build_coverage').perform_async('Something', [1])
|
||||
stubbed_worker(queue: 'pipeline_default:build_trace_sections').perform_async('Something', [1])
|
||||
stubbed_worker(queue: 'pipeline_default:pipeline_metrics').perform_async('Something', [1])
|
||||
stubbed_worker(queue: 'pipeline_default:pipeline_notification').perform_async('Something', [1])
|
||||
stubbed_worker(queue: 'pipeline_default:update_head_pipeline_for_merge_request').perform_async('Something', [1])
|
||||
|
||||
described_class.new.up
|
||||
|
||||
expect(sidekiq_queue_length('pipeline_default:build_coverage')).to eq 1
|
||||
expect(sidekiq_queue_length('pipeline_default:build_trace_sections')).to eq 1
|
||||
expect(sidekiq_queue_length('pipeline_default:pipeline_metrics')).to eq 1
|
||||
expect(sidekiq_queue_length('pipeline_default:pipeline_notification')).to eq 1
|
||||
expect(sidekiq_queue_length('pipeline_default:update_head_pipeline_for_merge_request')).to eq 1
|
||||
end
|
||||
end
|
||||
|
||||
it 'correctly migrates queue when migrating down' do
|
||||
Sidekiq::Testing.disable! do
|
||||
stubbed_worker(queue: 'pipeline_background:archive_trace').perform_async('Something', [1])
|
||||
|
||||
described_class.new.down
|
||||
|
||||
expect(sidekiq_queue_length('pipeline_default:create_trace_artifact')).to eq 1
|
||||
expect(sidekiq_queue_length('pipeline_background:archive_trace')).to eq 0
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context 'when there are no jobs in the queues' do
|
||||
it 'does not raise error when migrating up' do
|
||||
expect { described_class.new.up }.not_to raise_error
|
||||
end
|
||||
|
||||
it 'does not raise error when migrating down' do
|
||||
expect { described_class.new.down }.not_to raise_error
|
||||
end
|
||||
end
|
||||
|
||||
def stubbed_worker(queue:)
|
||||
Class.new do
|
||||
include Sidekiq::Worker
|
||||
sidekiq_options queue: queue
|
||||
end
|
||||
end
|
||||
end
|
19
spec/workers/concerns/pipeline_background_queue_spec.rb
Normal file
19
spec/workers/concerns/pipeline_background_queue_spec.rb
Normal file
|
@ -0,0 +1,19 @@
|
|||
require 'spec_helper'
|
||||
|
||||
describe PipelineBackgroundQueue do
|
||||
let(:worker) do
|
||||
Class.new do
|
||||
def self.name
|
||||
'DummyWorker'
|
||||
end
|
||||
|
||||
include ApplicationWorker
|
||||
include PipelineBackgroundQueue
|
||||
end
|
||||
end
|
||||
|
||||
it 'sets a default object storage queue automatically' do
|
||||
expect(worker.sidekiq_options['queue'])
|
||||
.to eq 'object_storage:dummy'
|
||||
end
|
||||
end
|
Loading…
Reference in a new issue