# frozen_string_literal: true module Gitlab module BackgroundMigration # Migrates author and committer names and emails from # merge_request_diff_commits to two columns that point to # merge_request_diff_commit_users. # # rubocop: disable Metrics/ClassLength class MigrateMergeRequestDiffCommitUsers # The number of user rows in merge_request_diff_commit_users to get in a # single query. USER_ROWS_PER_QUERY = 1_000 # The number of rows in merge_request_diff_commits to get in a single # query. COMMIT_ROWS_PER_QUERY = 1_000 # The number of rows in merge_request_diff_commits to update in a single # query. # # Tests in staging revealed that increasing the number of updates per # query translates to a longer total runtime for a migration. For example, # given the same range of rows to migrate, 1000 updates per query required # a total of roughly 15 seconds. On the other hand, 5000 updates per query # required a total of roughly 25 seconds. For this reason, we use a value # of 1000 rows per update. UPDATES_PER_QUERY = 1_000 # rubocop: disable Style/Documentation class MergeRequestDiffCommit < ActiveRecord::Base include FromUnion extend ::SuppressCompositePrimaryKeyWarning self.table_name = 'merge_request_diff_commits' # Yields each row to migrate in the given range. # # This method uses keyset pagination to ensure we don't retrieve # potentially tens of thousands (or even hundreds of thousands) of rows # in a single query. Such queries could time out, or increase the amount # of memory needed to process the data. # # We can't use `EachBatch` and similar approaches, as # merge_request_diff_commits doesn't have a single monotonically # increasing primary key. def self.each_row_to_migrate(start_id, stop_id, &block) order = Pagination::Keyset::Order.build( %w[merge_request_diff_id relative_order].map do |col| Pagination::Keyset::ColumnOrderDefinition.new( attribute_name: col, order_expression: self.arel_table[col.to_sym].asc, nullable: :not_nullable, distinct: false ) end ) scope = MergeRequestDiffCommit .where(merge_request_diff_id: start_id...stop_id) .order(order) Pagination::Keyset::Iterator .new(scope: scope, use_union_optimization: true) .each_batch(of: COMMIT_ROWS_PER_QUERY) { |rows| rows.each(&block) } end end # rubocop: enable Style/Documentation # rubocop: disable Style/Documentation class MergeRequestDiffCommitUser < ActiveRecord::Base self.table_name = 'merge_request_diff_commit_users' def self.union(queries) from("(#{queries.join("\nUNION ALL\n")}) #{table_name}") end end # rubocop: enable Style/Documentation def perform(start_id, stop_id) return if already_processed?(start_id, stop_id) # This Hash maps user names + emails to their corresponding rows in # merge_request_diff_commit_users. user_mapping = {} user_details, diff_rows_to_update = get_data_to_update(start_id, stop_id) get_user_rows_in_batches(user_details, user_mapping) create_missing_users(user_details, user_mapping) update_commit_rows(diff_rows_to_update, user_mapping) Database::BackgroundMigrationJob.mark_all_as_succeeded( 'MigrateMergeRequestDiffCommitUsers', [start_id, stop_id] ) end def already_processed?(start_id, stop_id) Database::BackgroundMigrationJob .for_migration_execution('MigrateMergeRequestDiffCommitUsers', [start_id, stop_id]) .succeeded .any? end # Returns the data we'll use to determine what merge_request_diff_commits # rows to update, and what data to use for populating their # commit_author_id and committer_id columns. def get_data_to_update(start_id, stop_id) # This Set is used to retrieve users that already exist in # merge_request_diff_commit_users. users = Set.new # This Hash maps the primary key of every row in # merge_request_diff_commits to the (trimmed) author and committer # details to use for updating the row. to_update = {} MergeRequestDiffCommit.each_row_to_migrate(start_id, stop_id) do |row| author = [prepare(row.author_name), prepare(row.author_email)] committer = [prepare(row.committer_name), prepare(row.committer_email)] to_update[[row.merge_request_diff_id, row.relative_order]] = [author, committer] users << author if author[0] || author[1] users << committer if committer[0] || committer[1] end [users, to_update] end # Gets any existing rows in merge_request_diff_commit_users in batches. # # This method may end up having to retrieve lots of rows. To reduce the # overhead, we batch queries into a UNION query. We limit the number of # queries per UNION so we don't end up sending a single query containing # too many SELECT statements. def get_user_rows_in_batches(users, user_mapping) users.each_slice(USER_ROWS_PER_QUERY) do |pairs| queries = pairs.map do |(name, email)| MergeRequestDiffCommitUser.where(name: name, email: email).to_sql end MergeRequestDiffCommitUser.union(queries).each do |row| user_mapping[[row.name.to_s, row.email.to_s]] = row end end end # Creates any users for which no row exists in # merge_request_diff_commit_users. # # Not all users queried may exist yet, so we need to create any missing # ones; making sure we handle concurrent creations of the same user def create_missing_users(users, mapping) create = [] users.each do |(name, email)| create << { name: name, email: email } unless mapping[[name, email]] end return if create.empty? MergeRequestDiffCommitUser .insert_all(create, returning: %w[id name email]) .each do |row| mapping[[row['name'], row['email']]] = MergeRequestDiffCommitUser .new(id: row['id'], name: row['name'], email: row['email']) end # It's possible for (name, email) pairs to be inserted concurrently, # resulting in the above insert not returning anything. Here we get any # remaining users that were created concurrently. get_user_rows_in_batches( users.reject { |pair| mapping.key?(pair) }, mapping ) end # Updates rows in merge_request_diff_commits with their new # commit_author_id and committer_id values. def update_commit_rows(to_update, user_mapping) to_update.each_slice(UPDATES_PER_QUERY) do |slice| updates = {} slice.each do |(diff_id, order), (author, committer)| author_id = user_mapping[author]&.id committer_id = user_mapping[committer]&.id updates[[diff_id, order]] = [author_id, committer_id] end bulk_update_commit_rows(updates) end end # Bulk updates rows in the merge_request_diff_commits table with their new # author and/or committer ID values. # # Updates are batched together to reduce the overhead of having to produce # a single UPDATE for every row, as we may end up having to update # thousands of rows at once. # # The query produced by this method is along the lines of the following: # # UPDATE merge_request_diff_commits # SET commit_author_id = # CASE # WHEN (merge_request_diff_id, relative_order) = (x, y) THEN X # WHEN ... # END, # committer_id = # CASE # WHEN (merge_request_diff_id, relative_order) = (x, y) THEN Y # WHEN ... # END # WHERE (merge_request_diff_id, relative_order) IN ( (x, y), ... ) # # The `mapping` argument is a Hash in the following format: # # { [merge_request_diff_id, relative_order] => [author_id, committer_id] } # # rubocop: disable Metrics/AbcSize def bulk_update_commit_rows(mapping) author_case = Arel::Nodes::Case.new committer_case = Arel::Nodes::Case.new primary_values = [] mapping.each do |diff_id_and_order, (author_id, committer_id)| primary_value = Arel::Nodes::Grouping.new(diff_id_and_order) primary_values << primary_value if author_id author_case.when(primary_key.eq(primary_value)).then(author_id) end if committer_id committer_case.when(primary_key.eq(primary_value)).then(committer_id) end end if author_case.conditions.empty? && committer_case.conditions.empty? return end fields = [] # Statements such as `SET x = CASE END` are not valid SQL statements, so # we omit setting an ID field if there are no values to populate it # with. if author_case.conditions.any? fields << [arel_table[:commit_author_id], author_case] end if committer_case.conditions.any? fields << [arel_table[:committer_id], committer_case] end query = Arel::UpdateManager.new .table(arel_table) .where(primary_key.in(primary_values)) .set(fields) .to_sql MergeRequestDiffCommit.connection.execute(query) end # rubocop: enable Metrics/AbcSize def primary_key Arel::Nodes::Grouping.new( [arel_table[:merge_request_diff_id], arel_table[:relative_order]] ) end def arel_table MergeRequestDiffCommit.arel_table end # Prepares a value to be inserted into a column in the table # `merge_request_diff_commit_users`. Values in this table are limited to # 512 characters. # # We treat empty strings as NULL values, as there's no point in (for # example) storing a row where both the name and Email are an empty # string. In addition, if we treated them differently we could end up with # two rows: one where field X is NULL, and one where field X is an empty # string. This is redundant, so we avoid storing such data. def prepare(value) value.present? ? value[0..511] : nil end end # rubocop: enable Metrics/ClassLength end end