163 lines
5.4 KiB
Ruby
163 lines
5.4 KiB
Ruby
|
# frozen_string_literal: true
|
||
|
|
||
|
module Gitlab
|
||
|
module GithubImport
|
||
|
module ParallelScheduling
|
||
|
attr_reader :project, :client, :page_counter, :already_imported_cache_key
|
||
|
|
||
|
# The base cache key to use for tracking already imported objects.
|
||
|
ALREADY_IMPORTED_CACHE_KEY =
|
||
|
'github-importer/already-imported/%{project}/%{collection}'.freeze
|
||
|
|
||
|
# project - An instance of `Project`.
|
||
|
# client - An instance of `Gitlab::GithubImport::Client`.
|
||
|
# parallel - When set to true the objects will be imported in parallel.
|
||
|
def initialize(project, client, parallel: true)
|
||
|
@project = project
|
||
|
@client = client
|
||
|
@parallel = parallel
|
||
|
@page_counter = PageCounter.new(project, collection_method)
|
||
|
@already_imported_cache_key = ALREADY_IMPORTED_CACHE_KEY %
|
||
|
{ project: project.id, collection: collection_method }
|
||
|
end
|
||
|
|
||
|
def parallel?
|
||
|
@parallel
|
||
|
end
|
||
|
|
||
|
def execute
|
||
|
retval =
|
||
|
if parallel?
|
||
|
parallel_import
|
||
|
else
|
||
|
sequential_import
|
||
|
end
|
||
|
|
||
|
# Once we have completed all work we can remove our "already exists"
|
||
|
# cache so we don't put too much pressure on Redis.
|
||
|
#
|
||
|
# We don't immediately remove it since it's technically possible for
|
||
|
# other instances of this job to still run, instead we set the
|
||
|
# expiration time to a lower value. This prevents the other jobs from
|
||
|
# still scheduling duplicates while. Since all work has already been
|
||
|
# completed those jobs will just cycle through any remaining pages while
|
||
|
# not scheduling anything.
|
||
|
Caching.expire(already_imported_cache_key, 15.minutes.to_i)
|
||
|
|
||
|
retval
|
||
|
end
|
||
|
|
||
|
# Imports all the objects in sequence in the current thread.
|
||
|
def sequential_import
|
||
|
each_object_to_import do |object|
|
||
|
repr = representation_class.from_api_response(object)
|
||
|
|
||
|
importer_class.new(repr, project, client).execute
|
||
|
end
|
||
|
end
|
||
|
|
||
|
# Imports all objects in parallel by scheduling a Sidekiq job for every
|
||
|
# individual object.
|
||
|
def parallel_import
|
||
|
waiter = JobWaiter.new
|
||
|
|
||
|
each_object_to_import do |object|
|
||
|
repr = representation_class.from_api_response(object)
|
||
|
|
||
|
sidekiq_worker_class
|
||
|
.perform_async(project.id, repr.to_hash, waiter.key)
|
||
|
|
||
|
waiter.jobs_remaining += 1
|
||
|
end
|
||
|
|
||
|
waiter
|
||
|
end
|
||
|
|
||
|
# The method that will be called for traversing through all the objects to
|
||
|
# import, yielding them to the supplied block.
|
||
|
def each_object_to_import
|
||
|
repo = project.import_source
|
||
|
|
||
|
# We inject the page number here to make sure that all importers always
|
||
|
# start where they left off. Simply starting over wouldn't work for
|
||
|
# repositories with a lot of data (e.g. tens of thousands of comments).
|
||
|
options = collection_options.merge(page: page_counter.current)
|
||
|
|
||
|
client.each_page(collection_method, repo, options) do |page|
|
||
|
# Technically it's possible that the same work is performed multiple
|
||
|
# times, as Sidekiq doesn't guarantee there will ever only be one
|
||
|
# instance of a job. In such a scenario it's possible for one job to
|
||
|
# have a lower page number (e.g. 5) compared to another (e.g. 10). In
|
||
|
# this case we skip over all the objects until we have caught up,
|
||
|
# reducing the number of duplicate jobs scheduled by the provided
|
||
|
# block.
|
||
|
next unless page_counter.set(page.number)
|
||
|
|
||
|
page.objects.each do |object|
|
||
|
next if already_imported?(object)
|
||
|
|
||
|
yield object
|
||
|
|
||
|
# We mark the object as imported immediately so we don't end up
|
||
|
# scheduling it multiple times.
|
||
|
mark_as_imported(object)
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
|
||
|
# Returns true if the given object has already been imported, false
|
||
|
# otherwise.
|
||
|
#
|
||
|
# object - The object to check.
|
||
|
def already_imported?(object)
|
||
|
id = id_for_already_imported_cache(object)
|
||
|
|
||
|
Caching.set_includes?(already_imported_cache_key, id)
|
||
|
end
|
||
|
|
||
|
# Marks the given object as "already imported".
|
||
|
def mark_as_imported(object)
|
||
|
id = id_for_already_imported_cache(object)
|
||
|
|
||
|
Caching.set_add(already_imported_cache_key, id)
|
||
|
end
|
||
|
|
||
|
# Returns the ID to use for the cache used for checking if an object has
|
||
|
# already been imported or not.
|
||
|
#
|
||
|
# object - The object we may want to import.
|
||
|
def id_for_already_imported_cache(object)
|
||
|
raise NotImplementedError
|
||
|
end
|
||
|
|
||
|
# The class used for converting API responses to Hashes when performing
|
||
|
# the import.
|
||
|
def representation_class
|
||
|
raise NotImplementedError
|
||
|
end
|
||
|
|
||
|
# The class to use for importing objects when importing them sequentially.
|
||
|
def importer_class
|
||
|
raise NotImplementedError
|
||
|
end
|
||
|
|
||
|
# The Sidekiq worker class used for scheduling the importing of objects in
|
||
|
# parallel.
|
||
|
def sidekiq_worker_class
|
||
|
raise NotImplementedError
|
||
|
end
|
||
|
|
||
|
# The name of the method to call to retrieve the data to import.
|
||
|
def collection_method
|
||
|
raise NotImplementedError
|
||
|
end
|
||
|
|
||
|
# Any options to be passed to the method used for retrieving the data to
|
||
|
# import.
|
||
|
def collection_options
|
||
|
{}
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
end
|