Add Sidekiq migration helpers for migrating queues
This commit is contained in:
parent
73187801df
commit
193b199672
2 changed files with 62 additions and 3 deletions
|
@ -611,6 +611,20 @@ module Gitlab
|
|||
remove_foreign_key(*args)
|
||||
rescue ArgumentError
|
||||
end
|
||||
|
||||
def sidekiq_queue_migrate(queue_from, to: queue_to)
|
||||
while sidekiq_queue_length(queue_from) > 0
|
||||
Sidekiq.redis do |conn|
|
||||
conn.rpoplpush "queue:#{queue_from}", "queue:#{to}"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def sidekiq_queue_length(queue_name)
|
||||
Sidekiq.redis do |conn|
|
||||
conn.llen("queue:#{queue_name}")
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -2,9 +2,7 @@ require 'spec_helper'
|
|||
|
||||
describe Gitlab::Database::MigrationHelpers do
|
||||
let(:model) do
|
||||
ActiveRecord::Migration.new.extend(
|
||||
described_class
|
||||
)
|
||||
ActiveRecord::Migration.new.extend(described_class)
|
||||
end
|
||||
|
||||
before do
|
||||
|
@ -845,4 +843,51 @@ describe Gitlab::Database::MigrationHelpers do
|
|||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe 'sidekiq migration helpers', :sidekiq, :redis do
|
||||
let(:worker) do
|
||||
Class.new do
|
||||
include Sidekiq::Worker
|
||||
sidekiq_options queue: 'test'
|
||||
end
|
||||
end
|
||||
|
||||
describe '#sidekiq_queue_length' do
|
||||
context 'when queue is empty' do
|
||||
it 'returns zero' do
|
||||
Sidekiq::Testing.disable! do
|
||||
expect(model.sidekiq_queue_length('test')).to eq 0
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context 'when queue contains jobs' do
|
||||
it 'returns correct size of the queue' do
|
||||
Sidekiq::Testing.disable! do
|
||||
worker.perform_async('Something', [1])
|
||||
worker.perform_async('Something', [2])
|
||||
|
||||
expect(model.sidekiq_queue_length('test')).to eq 2
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe '#migrate_sidekiq_queue' do
|
||||
it 'migrates jobs from one sidekiq queue to another' do
|
||||
Sidekiq::Testing.disable! do
|
||||
worker.perform_async('Something', [1])
|
||||
worker.perform_async('Something', [2])
|
||||
|
||||
expect(model.sidekiq_queue_length('test')).to eq 2
|
||||
expect(model.sidekiq_queue_length('new_test')).to eq 0
|
||||
|
||||
model.sidekiq_queue_migrate('test', to: 'new_test')
|
||||
|
||||
expect(model.sidekiq_queue_length('test')).to eq 0
|
||||
expect(model.sidekiq_queue_length('new_test')).to eq 2
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue