4dfe26cd8b
Prior to this MR there were two GitHub related importers:

* Github::Import: the main importer used for GitHub projects
* Gitlab::GithubImport: importer that's somewhat confusingly used for importing Gitea projects (apparently they have a compatible API)

This MR renames the Gitea importer to Gitlab::LegacyGithubImport and introduces a new GitHub importer in the Gitlab::GithubImport namespace. This new GitHub importer uses Sidekiq for importing multiple resources in parallel, though it also has the ability to import data sequentially should this be necessary.

The new code is spread across the following directories:

* lib/gitlab/github_import: this directory contains most of the importer code such as the classes used for importing resources.
* app/workers/gitlab/github_import: this directory contains the Sidekiq workers, most of which simply use the code from the directory above.
* app/workers/concerns/gitlab/github_import: this directory provides a few modules that are included in every GitHub importer worker.

== Stages

The import work is divided into separate stages, with each stage importing a specific set of data. Stages will schedule the work that needs to be performed, followed by scheduling a job for the "AdvanceStageWorker" worker. This worker will periodically check if all work is completed and schedule the next stage if this is the case. If work is not yet completed this worker will reschedule itself.

Using this approach we don't have to block threads by calling `sleep()`, as doing so for large projects could block the thread from doing any work for many hours.

== Retrying Work

Workers will reschedule themselves whenever necessary. For example, hitting the GitHub API's rate limit will result in jobs rescheduling themselves. These jobs are not processed until the rate limit has been reset.

== User Lookups

Part of the importing process involves looking up user details in the GitHub API so we can map them to GitLab users. The old importer used an in-memory cache, but this obviously doesn't work when the work is spread across different threads.

The new importer uses a Redis cache and makes sure we only perform API/database calls if absolutely necessary. Frequently used keys are refreshed, and lookup misses are also cached, removing the need for performing API/database calls if we know we don't have the data we're looking for.

== Performance & Models

The new importer in various places uses raw INSERT statements (as generated by `Gitlab::Database.bulk_insert`) instead of using Rails models. This allows us to bypass any validations and callbacks, drastically reducing the number of SQL queries and Gitaly RPC calls necessary to import projects.

To ensure the code produces valid data the corresponding tests check if the produced rows are valid according to the model validation rules.
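The "User Lookups" idea of caching misses as well as hits can be illustrated with a minimal sketch. It is not the importer's code: `LookupCache`, its `MISS` sentinel and the `users` Hash are hypothetical names, and a plain Hash stands in for Redis so the example runs on its own.

```ruby
# Hypothetical stand-in for the Redis-backed cache; a Hash keeps the sketch
# self-contained. None of these names come from the importer itself.
class LookupCache
  MISS = :miss # sentinel stored when a lookup found nothing

  attr_reader :backend_calls

  def initialize
    @store = {}
    @backend_calls = 0
  end

  # Returns the cached value for `key`. The block (the expensive API or
  # database lookup) only runs when the key has never been looked up before.
  def fetch(key)
    if @store.key?(key)
      cached = @store[key]
      return cached == MISS ? nil : cached
    end

    @backend_calls += 1
    value = yield(key)
    @store[key] = value.nil? ? MISS : value
    value
  end
end

users = { 'alice' => 1 } # pretend database: GitHub login => GitLab user ID
cache = LookupCache.new

first  = cache.fetch('alice') { |login| users[login] }
second = cache.fetch('bob')   { |login| users[login] }
third  = cache.fetch('bob')   { |login| users[login] } # served from the cache
```

Because the miss for `bob` is stored, the third call never reaches the block. A real Redis-backed version would also need expiry times on its keys, which this sketch leaves out.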
162 lines
5.4 KiB
Ruby
# frozen_string_literal: true

module Gitlab
  module GithubImport
    module ParallelScheduling
      attr_reader :project, :client, :page_counter, :already_imported_cache_key

      # The base cache key to use for tracking already imported objects.
      ALREADY_IMPORTED_CACHE_KEY =
        'github-importer/already-imported/%{project}/%{collection}'.freeze

      # project - An instance of `Project`.
      # client - An instance of `Gitlab::GithubImport::Client`.
      # parallel - When set to true the objects will be imported in parallel.
      def initialize(project, client, parallel: true)
        @project = project
        @client = client
        @parallel = parallel
        @page_counter = PageCounter.new(project, collection_method)
        @already_imported_cache_key = ALREADY_IMPORTED_CACHE_KEY %
          { project: project.id, collection: collection_method }
      end

      def parallel?
        @parallel
      end

      def execute
        retval =
          if parallel?
            parallel_import
          else
            sequential_import
          end

        # Once we have completed all work we can remove our "already exists"
        # cache so we don't put too much pressure on Redis.
        #
        # We don't immediately remove it since it's technically possible for
        # other instances of this job to still run, instead we set the
        # expiration time to a lower value. This prevents the other jobs from
        # still scheduling duplicates. Since all work has already been
        # completed those jobs will just cycle through any remaining pages while
        # not scheduling anything.
        Caching.expire(already_imported_cache_key, 15.minutes.to_i)

        retval
      end

      # Imports all the objects in sequence in the current thread.
      def sequential_import
        each_object_to_import do |object|
          repr = representation_class.from_api_response(object)

          importer_class.new(repr, project, client).execute
        end
      end

      # Imports all objects in parallel by scheduling a Sidekiq job for every
      # individual object.
      def parallel_import
        waiter = JobWaiter.new

        each_object_to_import do |object|
          repr = representation_class.from_api_response(object)

          sidekiq_worker_class
            .perform_async(project.id, repr.to_hash, waiter.key)

          waiter.jobs_remaining += 1
        end

        waiter
      end

      # The method that will be called for traversing through all the objects to
      # import, yielding them to the supplied block.
      def each_object_to_import
        repo = project.import_source

        # We inject the page number here to make sure that all importers always
        # start where they left off. Simply starting over wouldn't work for
        # repositories with a lot of data (e.g. tens of thousands of comments).
        options = collection_options.merge(page: page_counter.current)

        client.each_page(collection_method, repo, options) do |page|
          # Technically it's possible that the same work is performed multiple
          # times, as Sidekiq doesn't guarantee there will ever only be one
          # instance of a job. In such a scenario it's possible for one job to
          # have a lower page number (e.g. 5) compared to another (e.g. 10). In
          # this case we skip over all the objects until we have caught up,
          # reducing the number of duplicate jobs scheduled by the provided
          # block.
          next unless page_counter.set(page.number)

          page.objects.each do |object|
            next if already_imported?(object)

            yield object

            # We mark the object as imported immediately so we don't end up
            # scheduling it multiple times.
            mark_as_imported(object)
          end
        end
      end

      # Returns true if the given object has already been imported, false
      # otherwise.
      #
      # object - The object to check.
      def already_imported?(object)
        id = id_for_already_imported_cache(object)

        Caching.set_includes?(already_imported_cache_key, id)
      end

      # Marks the given object as "already imported".
      def mark_as_imported(object)
        id = id_for_already_imported_cache(object)

        Caching.set_add(already_imported_cache_key, id)
      end

      # Returns the ID to use for the cache used for checking if an object has
      # already been imported or not.
      #
      # object - The object we may want to import.
      def id_for_already_imported_cache(object)
        raise NotImplementedError
      end

      # The class used for converting API responses to Hashes when performing
      # the import.
      def representation_class
        raise NotImplementedError
      end

      # The class to use for importing objects when importing them sequentially.
      def importer_class
        raise NotImplementedError
      end

      # The Sidekiq worker class used for scheduling the importing of objects in
      # parallel.
      def sidekiq_worker_class
        raise NotImplementedError
      end

      # The name of the method to call to retrieve the data to import.
      def collection_method
        raise NotImplementedError
      end

      # Any options to be passed to the method used for retrieving the data to
      # import.
      def collection_options
        {}
      end
    end
  end
end
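
The duplicate-avoidance logic in `each_object_to_import` can be tried out in isolation with the sketch below. Everything in it is an assumption made for illustration: `FakePageCounter`, `Page` and `each_new_object` are hypothetical in-memory stand-ins for `PageCounter`, the client's pages and the Redis set, and the real `PageCounter#set` may treat an equal page number differently.

```ruby
require 'set'

# Hypothetical in-memory stand-in for PageCounter (assumption: the counter
# only moves forward and rejects pages another job has already passed).
class FakePageCounter
  attr_reader :current

  def initialize
    @current = 1
  end

  def set(page)
    return false if page < @current

    @current = page
    true
  end
end

# Hypothetical page structure: a page number plus the objects on that page.
Page = Struct.new(:number, :objects)

# Mirrors the shape of each_object_to_import: skip stale pages, skip objects
# that were already imported, and mark each object right after yielding it.
def each_new_object(pages, counter, imported)
  pages.each do |page|
    next unless counter.set(page.number)

    page.objects.each do |object|
      next if imported.include?(object[:id])

      yield object

      imported << object[:id]
    end
  end
end

pages = [
  Page.new(1, [{ id: 1 }, { id: 2 }]),
  Page.new(2, [{ id: 2 }, { id: 3 }]) # id 2 appears again on the next page
]

counter = FakePageCounter.new
imported = Set.new

scheduled = []
each_new_object(pages, counter, imported) { |object| scheduled << object[:id] }

# A duplicate run sharing the same counter and set has nothing left to do.
rescheduled = []
each_new_object(pages, counter, imported) { |object| rescheduled << object[:id] }
```

The first run yields each ID once even though ID 2 shows up on two pages, and the duplicate run yields nothing. The module above relies on the same two shared pieces of state, kept in Redis, so that concurrent Sidekiq jobs can see each other's progress.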