Use ActiveRecord 5 batching to schedule bg migration
This commit is contained in:
parent
af2f2dc5ed
commit
69736f3927
|
@ -14,7 +14,7 @@ class BackgroundMigrationWorker
|
|||
'args' => jobs)
|
||||
end
|
||||
|
||||
# Schedules a number of jobs in bulk, with a delay.
|
||||
# Schedules multiple jobs in bulk, with a delay.
|
||||
#
|
||||
def self.perform_bulk_in(delay, jobs)
|
||||
now = Time.now.to_f
|
||||
|
|
|
@ -15,7 +15,7 @@ module ActiveRecord
|
|||
relation = relation.where(arel_table[primary_key].lteq(finish)) if finish
|
||||
batch_relation = relation
|
||||
|
||||
loop do
|
||||
1.step do |index|
|
||||
if load
|
||||
records = batch_relation.records
|
||||
ids = records.map(&:id)
|
||||
|
@ -31,7 +31,7 @@ module ActiveRecord
|
|||
primary_key_offset = ids.last
|
||||
raise ArgumentError.new("Primary key not included in the custom select clause") unless primary_key_offset
|
||||
|
||||
yield yielded_relation
|
||||
yield yielded_relation, index
|
||||
|
||||
break if ids.length < of
|
||||
batch_relation = relation.where(arel_table[primary_key].gt(primary_key_offset))
|
||||
|
|
|
@ -13,10 +13,9 @@ class MigrateStageIdReferenceInBackground < ActiveRecord::Migration
|
|||
|
||||
def up
|
||||
Build.where(stage_id: nil)
|
||||
.find_in_batches(batch_size: BATCH_SIZE)
|
||||
.with_index do |builds, batch|
|
||||
builds.each do |build|
|
||||
schedule = (batch - 1) * 5.minutes
|
||||
.in_batches(of: BATCH_SIZE) do |relation, index|
|
||||
schedule = index * 5.minutes
|
||||
relation.each do |build|
|
||||
BackgroundMigrationWorker.perform_at(schedule, MIGRATION, [build.id])
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,44 +1,39 @@
|
|||
require 'spec_helper'
|
||||
require Rails.root.join('db', 'post_migrate', '20170628080858_migrate_stage_id_reference_in_background')
|
||||
|
||||
RSpec::Matchers.define :have_migrated do |*expected|
|
||||
match do |migration|
|
||||
BackgroundMigrationWorker.jobs.any? do |job|
|
||||
job['enqueued_at'].present? && job['args'] == [migration, expected]
|
||||
describe MigrateStageIdReferenceInBackground, :migration, :sidekiq do
|
||||
matcher :have_migrated do |*expected|
|
||||
match do |migration|
|
||||
BackgroundMigrationWorker.jobs.any? do |job|
|
||||
job['enqueued_at'].present? && job['args'] == [migration, expected]
|
||||
end
|
||||
end
|
||||
|
||||
failure_message do |migration|
|
||||
"Migration `#{migration}` with args `#{expected.inspect}` not executed!"
|
||||
end
|
||||
end
|
||||
|
||||
failure_message do |migration|
|
||||
<<-EOS
|
||||
Background migration `#{migration}` with args `#{expected.inspect}`
|
||||
not migrated!
|
||||
EOS
|
||||
end
|
||||
end
|
||||
matcher :have_scheduled_migration do |delay, *expected|
|
||||
match do |migration|
|
||||
BackgroundMigrationWorker.jobs.any? do |job|
|
||||
job['args'] == [migration, expected] &&
|
||||
job['at'].to_f == (delay.to_f + Time.now.to_f)
|
||||
end
|
||||
end
|
||||
|
||||
RSpec::Matchers.define :have_scheduled_migration do |time, *expected|
|
||||
match do |migration|
|
||||
BackgroundMigrationWorker.jobs.any? do |job|
|
||||
job['args'] == [migration, expected] && job['at'] >= time
|
||||
failure_message do |migration|
|
||||
"Migration `#{migration}` with args `#{expected.inspect}` not scheduled!"
|
||||
end
|
||||
end
|
||||
|
||||
failure_message do |migration|
|
||||
<<-EOS
|
||||
Background migration `#{migration}` with args `#{expected.inspect}`
|
||||
not scheduled!
|
||||
EOS
|
||||
end
|
||||
end
|
||||
|
||||
describe MigrateStageIdReferenceInBackground, :migration do
|
||||
let(:jobs) { table(:ci_builds) }
|
||||
let(:stages) { table(:ci_stages) }
|
||||
let(:pipelines) { table(:ci_pipelines) }
|
||||
let(:projects) { table(:projects) }
|
||||
|
||||
before do
|
||||
stub_const('MigrateStageIdReferenceInBackground::BATCH_SIZE', 1)
|
||||
stub_const('MigrateStageIdReferenceInBackground::BATCH_SIZE', 2)
|
||||
|
||||
projects.create!(id: 123, name: 'gitlab1', path: 'gitlab1')
|
||||
pipelines.create!(id: 1, project_id: 123, ref: 'master', sha: 'adf43c3a')
|
||||
|
@ -55,12 +50,14 @@ describe MigrateStageIdReferenceInBackground, :migration do
|
|||
|
||||
it 'correctly schedules background migrations' do
|
||||
Sidekiq::Testing.fake! do
|
||||
migrate!
|
||||
Timecop.freeze do
|
||||
migrate!
|
||||
|
||||
expect(described_class::MIGRATION).to have_migrated(1)
|
||||
expect(described_class::MIGRATION).to have_migrated(2)
|
||||
expect(described_class::MIGRATION).to have_scheduled_migration(5.minutes, 3)
|
||||
expect(described_class::MIGRATION).to have_scheduled_migration(5.minutes, 4)
|
||||
expect(described_class::MIGRATION).to have_scheduled_migration(5.minutes, 1)
|
||||
expect(described_class::MIGRATION).to have_scheduled_migration(5.minutes, 2)
|
||||
expect(described_class::MIGRATION).to have_scheduled_migration(10.minutes, 3)
|
||||
expect(described_class::MIGRATION).to have_scheduled_migration(10.minutes, 4)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
|
Loading…
Reference in New Issue