gitlab-org--gitlab-foss/db/migrate/20161019190736_migrate_sidekiq_queues_from_default.rb
Yorick Peterse 97731760d7
Re-organize queues to use for Sidekiq
Dumping too many jobs in the same queue (e.g. the "default" queue) is a
dangerous setup. Jobs that take a long time to process can effectively
block any other work from being performed given there are enough of
these jobs.

Furthermore it becomes harder to monitor the jobs as a single queue
could contain jobs for different workers. In such a setup the only
reliable way of getting counts per job is to iterate over all jobs in a
queue, which is a rather time consuming process.

By using separate queues for various workers we have better control over
throughput, we can add weight to queues, and we can monitor queues
better. Some workers still use the same queue whenever their work is
related. For example, the various CI pipeline workers use the same
"pipeline" queue.

This commit includes a Rails migration that moves Sidekiq jobs from the
old queues to the new ones. This migration also takes care of doing the
inverse if ever needed. This does require downtime as otherwise new jobs
could be scheduled in the old queues after this migration completes.

This commit also includes an RSpec test that blacklists the use of the
"default" queue and ensures cron workers use the "cronjob" queue.

Fixes gitlab-org/gitlab-ce#23370
2016-10-21 18:17:07 +02:00

109 lines
4.1 KiB
Ruby

require 'json'
# See http://doc.gitlab.com/ce/development/migration_style_guide.html
# for more information on how to write migrations for GitLab.
class MigrateSidekiqQueuesFromDefault < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = true
DOWNTIME_REASON = <<-EOF
Moving Sidekiq jobs from queues requires Sidekiq to be stopped. Not stopping
Sidekiq will result in the loss of jobs that are scheduled after this
migration completes.
EOF
disable_ddl_transaction!
# Jobs for which the queue names have been changed (e.g. multiple workers
# using the same non-default queue).
#
# The keys are the old queue names, the values the jobs to move and their new
# queue names.
RENAMED_QUEUES = {
gitlab_shell: {
'GitGarbageCollectorWorker' => :git_garbage_collector,
'ProjectExportWorker' => :project_export,
'RepositoryForkWorker' => :repository_fork,
'RepositoryImportWorker' => :repository_import
},
project_web_hook: {
'ProjectServiceWorker' => :project_service
},
incoming_email: {
'EmailReceiverWorker' => :email_receiver
},
mailers: {
'EmailsOnPushWorker' => :emails_on_push
},
default: {
'AdminEmailWorker' => :cronjob,
'BuildCoverageWorker' => :build,
'BuildEmailWorker' => :build,
'BuildFinishedWorker' => :build,
'BuildHooksWorker' => :build,
'BuildSuccessWorker' => :build,
'ClearDatabaseCacheWorker' => :clear_database_cache,
'DeleteUserWorker' => :delete_user,
'ExpireBuildArtifactsWorker' => :cronjob,
'ExpireBuildInstanceArtifactsWorker' => :expire_build_instance_artifacts,
'GroupDestroyWorker' => :group_destroy,
'ImportExportProjectCleanupWorker' => :cronjob,
'IrkerWorker' => :irker,
'MergeWorker' => :merge,
'NewNoteWorker' => :new_note,
'PipelineHooksWorker' => :pipeline,
'PipelineMetricsWorker' => :pipeline,
'PipelineProcessWorker' => :pipeline,
'PipelineSuccessWorker' => :pipeline,
'PipelineUpdateWorker' => :pipeline,
'ProjectCacheWorker' => :project_cache,
'ProjectDestroyWorker' => :project_destroy,
'PruneOldEventsWorker' => :cronjob,
'RemoveExpiredGroupLinksWorker' => :cronjob,
'RemoveExpiredMembersWorker' => :cronjob,
'RepositoryArchiveCacheWorker' => :cronjob,
'RepositoryCheck::BatchWorker' => :cronjob,
'RepositoryCheck::ClearWorker' => :repository_check,
'RepositoryCheck::SingleRepositoryWorker' => :repository_check,
'RequestsProfilesWorker' => :cronjob,
'StuckCiBuildsWorker' => :cronjob,
'UpdateMergeRequestsWorker' => :update_merge_requests
}
}
def up
Sidekiq.redis do |redis|
RENAMED_QUEUES.each do |queue, jobs|
migrate_from_queue(redis, queue, jobs)
end
end
end
def down
Sidekiq.redis do |redis|
RENAMED_QUEUES.each do |dest_queue, jobs|
jobs.each do |worker, from_queue|
migrate_from_queue(redis, from_queue, worker => dest_queue)
end
end
end
end
def migrate_from_queue(redis, queue, job_mapping)
while job = redis.lpop("queue:#{queue}")
payload = JSON.load(job)
new_queue = job_mapping[payload['class']]
# If we have no target queue to migrate to we're probably dealing with
# some ancient job for which the worker no longer exists. In that case
# there's no sane option we can take, other than just dropping the job.
next unless new_queue
payload['queue'] = new_queue
redis.lpush("queue:#{new_queue}", JSON.dump(payload))
end
end
end