gitlab-org--gitlab-foss/lib/gitlab/background_migration/migrate_merge_request_diff_...

297 lines
11 KiB
Ruby

# frozen_string_literal: true
module Gitlab
module BackgroundMigration
# Migrates author and committer names and emails from
# merge_request_diff_commits to two columns that point to
# merge_request_diff_commit_users.
#
# rubocop: disable Metrics/ClassLength
class MigrateMergeRequestDiffCommitUsers
# The number of user rows in merge_request_diff_commit_users to get in a
# single query. This is the slice size used by get_user_rows_in_batches,
# and thus the maximum number of SELECT statements combined into one UNION.
USER_ROWS_PER_QUERY = 1_000
# The number of rows in merge_request_diff_commits to get in a single
# query. This is the batch size passed to the keyset iterator in
# MergeRequestDiffCommit.each_row_to_migrate.
COMMIT_ROWS_PER_QUERY = 1_000
# The number of rows in merge_request_diff_commits to update in a single
# query. This is the slice size used by update_commit_rows.
#
# Tests in staging revealed that increasing the number of updates per
# query translates to a longer total runtime for a migration. For example,
# given the same range of rows to migrate, 1000 updates per query required
# a total of roughly 15 seconds. On the other hand, 5000 updates per query
# required a total of roughly 25 seconds. For this reason, we use a value
# of 1000 rows per update.
UPDATES_PER_QUERY = 1_000
# rubocop: disable Style/Documentation
class MergeRequestDiffCommit < ActiveRecord::Base
  include FromUnion
  extend ::SuppressCompositePrimaryKeyWarning

  self.table_name = 'merge_request_diff_commits'

  # Yields, one at a time, every row whose merge_request_diff_id lies in
  # `start_id...stop_id` (the range excludes stop_id), ordered by
  # merge_request_diff_id and then relative_order.
  #
  # Rows are fetched with keyset pagination, COMMIT_ROWS_PER_QUERY rows per
  # query. A range can cover tens or even hundreds of thousands of rows;
  # loading them in one query could time out or need too much memory.
  #
  # `EachBatch` and similar helpers are not an option here, because
  # merge_request_diff_commits has no single monotonically increasing
  # primary key.
  def self.each_row_to_migrate(start_id, stop_id, &block)
    # Both columns are paginated in ascending order and are never NULL.
    columns = %w[merge_request_diff_id relative_order].map do |name|
      Pagination::Keyset::ColumnOrderDefinition.new(
        attribute_name: name,
        order_expression: arel_table[name.to_sym].asc,
        nullable: :not_nullable,
        distinct: false
      )
    end

    scope = MergeRequestDiffCommit
      .where(merge_request_diff_id: start_id...stop_id)
      .order(Pagination::Keyset::Order.build(columns))

    iterator = Pagination::Keyset::Iterator
      .new(scope: scope, use_union_optimization: true)

    iterator.each_batch(of: COMMIT_ROWS_PER_QUERY) do |rows|
      rows.each(&block)
    end
  end
end
# rubocop: enable Style/Documentation
# rubocop: disable Style/Documentation
class MergeRequestDiffCommitUser < ActiveRecord::Base
  self.table_name = 'merge_request_diff_commit_users'

  # Returns a relation that selects from the given SQL queries (Strings),
  # combined with UNION ALL and aliased to this model's table name.
  def self.union(queries)
    combined = queries.join("\nUNION ALL\n")

    from("(#{combined}) #{table_name}")
  end
end
# rubocop: enable Style/Documentation
# Migrates the merge_request_diff_commits rows for the given range of merge
# request diff IDs, then marks the job for these arguments as succeeded.
#
# Nothing is done when a succeeded job already exists for the same
# arguments.
def perform(start_id, stop_id)
  job_arguments = [start_id, stop_id]

  return if already_processed?(*job_arguments)

  # Maps [name, email] pairs to their corresponding rows in
  # merge_request_diff_commit_users.
  users_by_pair = {}
  pairs, commits = get_data_to_update(*job_arguments)

  # Existing users are looked up first, so that only the missing ones are
  # inserted afterwards.
  get_user_rows_in_batches(pairs, users_by_pair)
  create_missing_users(pairs, users_by_pair)
  update_commit_rows(commits, users_by_pair)

  Database::BackgroundMigrationJob.mark_all_as_succeeded(
    'MigrateMergeRequestDiffCommitUsers',
    job_arguments
  )
end
# Returns true if a succeeded background migration job exists for this
# migration with exactly these arguments.
def already_processed?(start_id, stop_id)
  succeeded_jobs = Database::BackgroundMigrationJob
    .for_migration_execution('MigrateMergeRequestDiffCommitUsers', [start_id, stop_id])
    .succeeded

  succeeded_jobs.any?
end
# Collects everything needed to migrate the rows in the given range.
#
# Returns a two-element Array:
#
# 1. A Set of the distinct [name, email] pairs found in the rows. It is
#    used to look up (and create) rows in merge_request_diff_commit_users.
#    Pairs for which both values are blank are left out.
# 2. A Hash mapping the primary key of every row in
#    merge_request_diff_commits, [merge_request_diff_id, relative_order],
#    to the prepared [author, committer] pairs to use for updating the row.
def get_data_to_update(start_id, stop_id)
  pairs = Set.new
  commits = {}

  MergeRequestDiffCommit.each_row_to_migrate(start_id, stop_id) do |commit|
    author = [commit.author_name, commit.author_email]
      .map { |value| prepare(value) }
    committer = [commit.committer_name, commit.committer_email]
      .map { |value| prepare(value) }

    commits[[commit.merge_request_diff_id, commit.relative_order]] =
      [author, committer]

    # A pair is only worth storing if at least one of its values is set.
    [author, committer].each { |pair| pairs << pair if pair.any? }
  end

  [pairs, commits]
end
# Gets any existing rows in merge_request_diff_commit_users in batches, and
# stores them in `user_mapping`.
#
# `users` is a collection of [name, email] pairs, in which a missing value
# is nil. `user_mapping` is a Hash that is updated in place: every row found
# is stored under its [name, email] pair.
#
# This method may end up having to retrieve lots of rows. To reduce the
# overhead, we batch queries into a UNION query. We limit the number of
# queries per UNION to USER_ROWS_PER_QUERY, so we don't end up sending a
# single query containing too many SELECT statements.
def get_user_rows_in_batches(users, user_mapping)
  users.each_slice(USER_ROWS_PER_QUERY) do |pairs|
    queries = pairs.map do |(name, email)|
      MergeRequestDiffCommitUser.where(name: name, email: email).to_sql
    end

    MergeRequestDiffCommitUser.union(queries).each do |row|
      # The key must use the raw column values, so that a NULL name or
      # Email stays nil. All lookups into the mapping use pairs in which a
      # missing value is nil; converting NULL to an empty string here
      # would make existing users with a NULL column impossible to find,
      # causing them to be inserted again.
      user_mapping[[row.name, row.email]] = row
    end
  end
end
# Creates any users for which no row exists in
# merge_request_diff_commit_users, and adds them to `mapping`.
#
# `users` holds the [name, email] pairs needed; `mapping` is the Hash of
# pairs to rows found so far, and is updated in place.
#
# Not all users queried may exist yet, so we need to create any missing
# ones, making sure we handle concurrent creations of the same user.
def create_missing_users(users, mapping)
  missing = users
    .reject { |(name, email)| mapping[[name, email]] }
    .map { |(name, email)| { name: name, email: email } }

  return if missing.empty?

  inserted = MergeRequestDiffCommitUser
    .insert_all(missing, returning: %w[id name email])

  inserted.each do |row|
    user = MergeRequestDiffCommitUser
      .new(id: row['id'], name: row['name'], email: row['email'])

    mapping[[row['name'], row['email']]] = user
  end

  # It's possible for (name, email) pairs to be inserted concurrently,
  # resulting in the above insert not returning anything for them. Here we
  # get any remaining users that were created concurrently.
  still_missing = users.reject { |pair| mapping.key?(pair) }

  get_user_rows_in_batches(still_missing, mapping)
end
# Updates rows in merge_request_diff_commits with their new
# commit_author_id and committer_id values.
#
# `to_update` maps [merge_request_diff_id, relative_order] to the
# [author, committer] pairs of a row; `user_mapping` maps such pairs to user
# rows. The rows are updated in slices of UPDATES_PER_QUERY rows. A pair
# without an entry in `user_mapping` produces a nil ID.
def update_commit_rows(to_update, user_mapping)
  to_update.each_slice(UPDATES_PER_QUERY) do |batch|
    ids_by_key = batch.each_with_object({}) do |((diff_id, order), (author, committer)), ids|
      ids[[diff_id, order]] = [
        user_mapping[author]&.id,
        user_mapping[committer]&.id
      ]
    end

    bulk_update_commit_rows(ids_by_key)
  end
end
# Bulk updates rows in the merge_request_diff_commits table with their new
# author and/or committer ID values.
#
# A single UPDATE is produced for all the given rows, instead of one UPDATE
# per row, as we may end up having to update thousands of rows at once.
#
# The query produced by this method is along the lines of the following:
#
#     UPDATE merge_request_diff_commits
#     SET commit_author_id =
#       CASE
#       WHEN (merge_request_diff_id, relative_order) = (x, y) THEN X
#       WHEN ...
#       END,
#       committer_id =
#       CASE
#       WHEN (merge_request_diff_id, relative_order) = (x, y) THEN Y
#       WHEN ...
#       END
#     WHERE (merge_request_diff_id, relative_order) IN ( (x, y), ... )
#
# The `mapping` argument is a Hash in the following format:
#
#     { [merge_request_diff_id, relative_order] => [author_id, committer_id] }
#
# An ID that is nil gets no WHEN branch. No query is executed (and nil is
# returned) when none of the rows has an ID to set.
#
# rubocop: disable Metrics/AbcSize
def bulk_update_commit_rows(mapping)
  author_case = Arel::Nodes::Case.new
  committer_case = Arel::Nodes::Case.new

  # Every row ends up in the WHERE clause, including rows that only have
  # one of the two IDs.
  primary_values = mapping.map do |diff_id_and_order, (author_id, committer_id)|
    primary_value = Arel::Nodes::Grouping.new(diff_id_and_order)

    author_case.when(primary_key.eq(primary_value)).then(author_id) if author_id
    committer_case.when(primary_key.eq(primary_value)).then(committer_id) if committer_id

    primary_value
  end

  # Statements such as `SET x = CASE END` are not valid SQL statements, so
  # an ID field is only set if there are values to populate it with.
  fields = []
  fields << [arel_table[:commit_author_id], author_case] unless author_case.conditions.empty?
  fields << [arel_table[:committer_id], committer_case] unless committer_case.conditions.empty?

  return if fields.empty?

  query = Arel::UpdateManager.new
    .table(arel_table)
    .where(primary_key.in(primary_values))
    .set(fields)
    .to_sql

  MergeRequestDiffCommit.connection.execute(query)
end
# rubocop: enable Metrics/AbcSize
# Returns an Arel node for the grouped columns
# (merge_request_diff_id, relative_order) of merge_request_diff_commits,
# used to compare rows against (diff ID, order) value pairs.
def primary_key
  table = arel_table
  columns = [table[:merge_request_diff_id], table[:relative_order]]

  Arel::Nodes::Grouping.new(columns)
end
# Returns the Arel table of the MergeRequestDiffCommit model, used for
# building the bulk UPDATE query.
def arel_table
MergeRequestDiffCommit.arel_table
end
# Prepares a value to be inserted into a column in the table
# `merge_request_diff_commit_users`. Values in this table are limited to
# 512 characters, so longer values are truncated.
#
# Blank values (such as nil and empty strings) are turned into nil, as
# there's no point in (for example) storing a row where both the name and
# email are an empty string. In addition, if we treated them differently we
# could end up with two rows: one where field X is NULL, and one where
# field X is an empty string. This is redundant, so we avoid storing such
# data.
def prepare(value)
  return unless value.present?

  value[0, 512]
end
end
# rubocop: enable Metrics/ClassLength
end
end