removed EE specific code from the port
This commit is contained in:
parent
508938873c
commit
0e732fa466
15 changed files with 0 additions and 948 deletions
|
@ -1,32 +0,0 @@
|
|||
module EE
|
||||
module Projects
|
||||
module JobsController
|
||||
extend ActiveSupport::Concern
|
||||
include SendFileUpload
|
||||
|
||||
def raw
|
||||
if trace_artifact_file
|
||||
send_upload(trace_artifact_file,
|
||||
send_params: raw_send_params,
|
||||
redirect_params: raw_redirect_params)
|
||||
else
|
||||
super
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def raw_send_params
|
||||
{ type: 'text/plain; charset=utf-8', disposition: 'inline' }
|
||||
end
|
||||
|
||||
def raw_redirect_params
|
||||
{ query: { 'response-content-type' => 'text/plain; charset=utf-8', 'response-content-disposition' => 'inline' } }
|
||||
end
|
||||
|
||||
def trace_artifact_file
|
||||
@trace_artifact_file ||= build.job_artifacts_trace&.file
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,25 +0,0 @@
|
|||
module EE
|
||||
# CI::JobArtifact EE mixin
|
||||
#
|
||||
# This module is intended to encapsulate EE-specific model logic
|
||||
# and be prepended in the `Ci::JobArtifact` model
|
||||
module Ci::JobArtifact
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
prepended do
|
||||
after_destroy :log_geo_event
|
||||
|
||||
scope :with_files_stored_locally, -> { where(file_store: [nil, ::JobArtifactUploader::Store::LOCAL]) }
|
||||
end
|
||||
|
||||
def local_store?
|
||||
[nil, ::JobArtifactUploader::Store::LOCAL].include?(self.file_store)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def log_geo_event
|
||||
::Geo::JobArtifactDeletedEventStore.new(self).create
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,23 +0,0 @@
|
|||
module EE
|
||||
# LFS Object EE mixin
|
||||
#
|
||||
# This module is intended to encapsulate EE-specific model logic
|
||||
# and be prepended in the `LfsObject` model
|
||||
module LfsObject
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
prepended do
|
||||
after_destroy :log_geo_event
|
||||
end
|
||||
|
||||
def local_store?
|
||||
[nil, LfsObjectUploader::Store::LOCAL].include?(self.file_store)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def log_geo_event
|
||||
::Geo::LfsObjectDeletedEventStore.new(self).create
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,11 +0,0 @@
|
|||
module Geo
|
||||
module Fdw
|
||||
module Ci
|
||||
class JobArtifact < ::Geo::BaseFdw
|
||||
self.table_name = Gitlab::Geo.fdw_table('ci_job_artifacts')
|
||||
|
||||
scope :with_files_stored_locally, -> { where(file_store: [nil, JobArtifactUploader::Store::LOCAL]) }
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,9 +0,0 @@
|
|||
module Geo
|
||||
module Fdw
|
||||
class LfsObject < ::Geo::BaseFdw
|
||||
self.table_name = Gitlab::Geo.fdw_table('lfs_objects')
|
||||
|
||||
scope :with_files_stored_locally, -> { where(file_store: [nil, LfsObjectUploader::Store::LOCAL]) }
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,77 +0,0 @@
|
|||
module Geo
|
||||
class FilesExpireService
|
||||
include ::Gitlab::Geo::LogHelpers
|
||||
|
||||
BATCH_SIZE = 500
|
||||
|
||||
attr_reader :project, :old_full_path
|
||||
|
||||
def initialize(project, old_full_path)
|
||||
@project = project
|
||||
@old_full_path = old_full_path
|
||||
end
|
||||
|
||||
# Expire already replicated uploads
|
||||
#
|
||||
# This is a fallback solution to support projects that haven't rolled out to hashed-storage yet.
|
||||
#
|
||||
# Note: Unless we add some locking mechanism, this will be best effort only
|
||||
# as if there are files that are being replicated during this execution, they will not
|
||||
# be expired.
|
||||
#
|
||||
# The long-term solution is to use hashed storage.
|
||||
def execute
|
||||
return unless Gitlab::Geo.secondary?
|
||||
|
||||
uploads = finder.find_project_uploads(project)
|
||||
log_info("Expiring replicated attachments after project rename", count: uploads.count)
|
||||
|
||||
schedule_file_removal(uploads)
|
||||
mark_for_resync!
|
||||
end
|
||||
|
||||
# Project's base directory for attachments storage
|
||||
#
|
||||
# @return base directory where all uploads for the project are stored
|
||||
def base_dir
|
||||
@base_dir ||= File.join(FileUploader.root, old_full_path)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def schedule_file_removal(uploads)
|
||||
paths_to_remove = uploads.find_each(batch_size: BATCH_SIZE).each_with_object([]) do |upload, to_remove|
|
||||
file_path = File.join(base_dir, upload.path)
|
||||
|
||||
if File.exist?(file_path)
|
||||
to_remove << [file_path]
|
||||
|
||||
log_info("Scheduled to remove file", file_path: file_path)
|
||||
end
|
||||
end
|
||||
|
||||
Geo::FileRemovalWorker.bulk_perform_async(paths_to_remove)
|
||||
end
|
||||
|
||||
def mark_for_resync!
|
||||
finder.find_file_registries_uploads(project).delete_all
|
||||
end
|
||||
|
||||
def finder
|
||||
@finder ||= ::Geo::ExpireUploadsFinder.new
|
||||
end
|
||||
|
||||
# This is called by LogHelpers to build json log with context info
|
||||
#
|
||||
# @see ::Gitlab::Geo::LogHelpers
|
||||
def base_log_data(message)
|
||||
{
|
||||
class: self.class.name,
|
||||
project_id: project.id,
|
||||
project_path: project.full_path,
|
||||
project_old_path: old_full_path,
|
||||
message: message
|
||||
}
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,55 +0,0 @@
|
|||
module Geo
|
||||
AttachmentMigrationError = Class.new(StandardError)
|
||||
|
||||
class HashedStorageAttachmentsMigrationService
|
||||
include ::Gitlab::Geo::LogHelpers
|
||||
|
||||
attr_reader :project_id, :old_attachments_path, :new_attachments_path
|
||||
|
||||
def initialize(project_id, old_attachments_path:, new_attachments_path:)
|
||||
@project_id = project_id
|
||||
@old_attachments_path = old_attachments_path
|
||||
@new_attachments_path = new_attachments_path
|
||||
end
|
||||
|
||||
def async_execute
|
||||
Geo::HashedStorageAttachmentsMigrationWorker.perform_async(
|
||||
project_id,
|
||||
old_attachments_path,
|
||||
new_attachments_path
|
||||
)
|
||||
end
|
||||
|
||||
def execute
|
||||
origin = File.join(FileUploader.root, old_attachments_path)
|
||||
target = File.join(FileUploader.root, new_attachments_path)
|
||||
move_folder!(origin, target)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def project
|
||||
@project ||= Project.find(project_id)
|
||||
end
|
||||
|
||||
def move_folder!(old_path, new_path)
|
||||
unless File.directory?(old_path)
|
||||
log_info("Skipped attachments migration to Hashed Storage, source path doesn't exist or is not a directory", project_id: project.id, source: old_path, target: new_path)
|
||||
return
|
||||
end
|
||||
|
||||
if File.exist?(new_path)
|
||||
log_error("Cannot migrate attachments to Hashed Storage, target path already exist", project_id: project.id, source: old_path, target: new_path)
|
||||
raise AttachmentMigrationError, "Target path '#{new_path}' already exist"
|
||||
end
|
||||
|
||||
# Create hashed storage base path folder
|
||||
FileUtils.mkdir_p(File.dirname(new_path))
|
||||
|
||||
FileUtils.mv(old_path, new_path)
|
||||
log_info("Migrated project attachments to Hashed Storage", project_id: project.id, source: old_path, target: new_path)
|
||||
|
||||
true
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,48 +0,0 @@
|
|||
module Geo
|
||||
class JobArtifactDeletedEventStore < EventStore
|
||||
self.event_type = :job_artifact_deleted_event
|
||||
|
||||
attr_reader :job_artifact
|
||||
|
||||
def initialize(job_artifact)
|
||||
@job_artifact = job_artifact
|
||||
end
|
||||
|
||||
def create
|
||||
return unless job_artifact.local_store?
|
||||
|
||||
super
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def build_event
|
||||
Geo::JobArtifactDeletedEvent.new(
|
||||
job_artifact: job_artifact,
|
||||
file_path: relative_file_path
|
||||
)
|
||||
end
|
||||
|
||||
def local_store_path
|
||||
Pathname.new(JobArtifactUploader.root)
|
||||
end
|
||||
|
||||
def relative_file_path
|
||||
return unless job_artifact.file.present?
|
||||
|
||||
Pathname.new(job_artifact.file.path).relative_path_from(local_store_path)
|
||||
end
|
||||
|
||||
# This is called by ProjectLogHelpers to build json log with context info
|
||||
#
|
||||
# @see ::Gitlab::Geo::ProjectLogHelpers
|
||||
def base_log_data(message)
|
||||
{
|
||||
class: self.class.name,
|
||||
job_artifact_id: job_artifact.id,
|
||||
file_path: job_artifact.file.path,
|
||||
message: message
|
||||
}
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,49 +0,0 @@
|
|||
module Geo
|
||||
class LfsObjectDeletedEventStore < EventStore
|
||||
self.event_type = :lfs_object_deleted_event
|
||||
|
||||
attr_reader :lfs_object
|
||||
|
||||
def initialize(lfs_object)
|
||||
@lfs_object = lfs_object
|
||||
end
|
||||
|
||||
def create
|
||||
return unless lfs_object.local_store?
|
||||
|
||||
super
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def build_event
|
||||
Geo::LfsObjectDeletedEvent.new(
|
||||
lfs_object: lfs_object,
|
||||
oid: lfs_object.oid,
|
||||
file_path: relative_file_path
|
||||
)
|
||||
end
|
||||
|
||||
def local_store_path
|
||||
Pathname.new(LfsObjectUploader.root)
|
||||
end
|
||||
|
||||
def relative_file_path
|
||||
return unless lfs_object.file.present?
|
||||
|
||||
Pathname.new(lfs_object.file.path).relative_path_from(local_store_path)
|
||||
end
|
||||
|
||||
# This is called by ProjectLogHelpers to build json log with context info
|
||||
#
|
||||
# @see ::Gitlab::Geo::ProjectLogHelpers
|
||||
def base_log_data(message)
|
||||
{
|
||||
class: self.class.name,
|
||||
lfs_object_id: lfs_object.id,
|
||||
file_path: lfs_object.file.path,
|
||||
message: message
|
||||
}
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,13 +0,0 @@
|
|||
module EE
|
||||
module JobArtifactUploader
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
def open
|
||||
if file_storage?
|
||||
super
|
||||
else
|
||||
::Gitlab::Ci::Trace::HttpIO.new(url, size) if url
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,316 +0,0 @@
|
|||
require 'fog/aws'
|
||||
require 'carrierwave/storage/fog'
|
||||
|
||||
#
|
||||
# This concern should add object storage support
|
||||
# to the GitlabUploader class
|
||||
#
|
||||
module ObjectStorage
|
||||
RemoteStoreError = Class.new(StandardError)
|
||||
UnknownStoreError = Class.new(StandardError)
|
||||
ObjectStoreUnavailable = Class.new(StandardError)
|
||||
|
||||
module Store
|
||||
LOCAL = 1
|
||||
REMOTE = 2
|
||||
end
|
||||
|
||||
module Extension
|
||||
# this extension is the glue between the ObjectStorage::Concern and RecordsUploads::Concern
|
||||
module RecordsUploads
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
prepended do |base|
|
||||
raise ObjectStoreUnavailable, "#{base} must include ObjectStorage::Concern to use extensions." unless base < Concern
|
||||
|
||||
base.include(::RecordsUploads::Concern)
|
||||
end
|
||||
|
||||
def retrieve_from_store!(identifier)
|
||||
paths = store_dirs.map { |store, path| File.join(path, identifier) }
|
||||
|
||||
unless current_upload_satisfies?(paths, model)
|
||||
# the upload we already have isn't right, find the correct one
|
||||
self.upload = uploads.find_by(model: model, path: paths)
|
||||
end
|
||||
|
||||
super
|
||||
end
|
||||
|
||||
def build_upload_from_uploader(uploader)
|
||||
super.tap { |upload| upload.store = object_store }
|
||||
end
|
||||
|
||||
def upload=(upload)
|
||||
return unless upload
|
||||
|
||||
self.object_store = upload.store
|
||||
super
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def current_upload_satisfies?(paths, model)
|
||||
return false unless upload
|
||||
return false unless model
|
||||
|
||||
paths.include?(upload.path) &&
|
||||
upload.model_id == model.id &&
|
||||
upload.model_type == model.class.base_class.sti_name
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Add support for automatic background uploading after the file is stored.
|
||||
#
|
||||
module BackgroundMove
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
def background_upload(mount_points = [])
|
||||
return unless mount_points.any?
|
||||
|
||||
run_after_commit do
|
||||
mount_points.each { |mount| send(mount).schedule_background_upload } # rubocop:disable GitlabSecurity/PublicSend
|
||||
end
|
||||
end
|
||||
|
||||
def changed_mounts
|
||||
self.class.uploaders.select do |mount, uploader_class|
|
||||
mounted_as = uploader_class.serialization_column(self.class, mount)
|
||||
uploader = send(:"#{mounted_as}") # rubocop:disable GitlabSecurity/PublicSend
|
||||
|
||||
next unless uploader
|
||||
next unless uploader.exists?
|
||||
next unless send(:"#{mounted_as}_changed?") # rubocop:disable GitlabSecurity/PublicSend
|
||||
|
||||
mount
|
||||
end.keys
|
||||
end
|
||||
|
||||
included do
|
||||
after_save on: [:create, :update] do
|
||||
background_upload(changed_mounts)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
module Concern
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
included do |base|
|
||||
base.include(ObjectStorage)
|
||||
|
||||
before :store, :verify_license!
|
||||
after :migrate, :delete_migrated_file
|
||||
end
|
||||
|
||||
class_methods do
|
||||
def object_store_options
|
||||
options.object_store
|
||||
end
|
||||
|
||||
def object_store_enabled?
|
||||
object_store_options.enabled
|
||||
end
|
||||
|
||||
def background_upload_enabled?
|
||||
object_store_options.background_upload
|
||||
end
|
||||
|
||||
def object_store_credentials
|
||||
object_store_options.connection.to_hash.deep_symbolize_keys
|
||||
end
|
||||
|
||||
def remote_store_path
|
||||
object_store_options.remote_directory
|
||||
end
|
||||
|
||||
def licensed?
|
||||
License.feature_available?(:object_storage)
|
||||
end
|
||||
end
|
||||
|
||||
def file_storage?
|
||||
storage.is_a?(CarrierWave::Storage::File)
|
||||
end
|
||||
|
||||
def file_cache_storage?
|
||||
cache_storage.is_a?(CarrierWave::Storage::File)
|
||||
end
|
||||
|
||||
def object_store
|
||||
@object_store ||= model.try(store_serialization_column) || Store::LOCAL
|
||||
end
|
||||
|
||||
# rubocop:disable Gitlab/ModuleWithInstanceVariables
|
||||
def object_store=(value)
|
||||
@object_store = value || Store::LOCAL
|
||||
@storage = storage_for(object_store)
|
||||
end
|
||||
# rubocop:enable Gitlab/ModuleWithInstanceVariables
|
||||
|
||||
# Return true if the current file is part or the model (i.e. is mounted in the model)
|
||||
#
|
||||
def persist_object_store?
|
||||
model.respond_to?(:"#{store_serialization_column}=")
|
||||
end
|
||||
|
||||
# Save the current @object_store to the model <mounted_as>_store column
|
||||
def persist_object_store!
|
||||
return unless persist_object_store?
|
||||
|
||||
updated = model.update_column(store_serialization_column, object_store)
|
||||
raise 'Failed to update object store' unless updated
|
||||
end
|
||||
|
||||
def use_file
|
||||
if file_storage?
|
||||
return yield path
|
||||
end
|
||||
|
||||
begin
|
||||
cache_stored_file!
|
||||
yield cache_path
|
||||
ensure
|
||||
cache_storage.delete_dir!(cache_path(nil))
|
||||
end
|
||||
end
|
||||
|
||||
def filename
|
||||
super || file&.filename
|
||||
end
|
||||
|
||||
#
|
||||
# Move the file to another store
|
||||
#
|
||||
# new_store: Enum (Store::LOCAL, Store::REMOTE)
|
||||
#
|
||||
def migrate!(new_store)
|
||||
uuid = Gitlab::ExclusiveLease.new(exclusive_lease_key, timeout: 1.hour.to_i).try_obtain
|
||||
raise 'Already running' unless uuid
|
||||
|
||||
unsafe_migrate!(new_store)
|
||||
ensure
|
||||
Gitlab::ExclusiveLease.cancel(exclusive_lease_key, uuid)
|
||||
end
|
||||
|
||||
def schedule_migration_to_object_storage(*args)
|
||||
return unless self.class.object_store_enabled?
|
||||
return unless self.class.background_upload_enabled?
|
||||
return unless self.class.licensed?
|
||||
return unless self.file_storage?
|
||||
|
||||
ObjectStorageUploadWorker.perform_async(self.class.name, model.class.name, mounted_as, model.id)
|
||||
end
|
||||
|
||||
def fog_directory
|
||||
self.class.remote_store_path
|
||||
end
|
||||
|
||||
def fog_credentials
|
||||
self.class.object_store_credentials
|
||||
end
|
||||
|
||||
def fog_public
|
||||
false
|
||||
end
|
||||
|
||||
def delete_migrated_file(migrated_file)
|
||||
migrated_file.delete if exists?
|
||||
end
|
||||
|
||||
def verify_license!(_file)
|
||||
return if file_storage?
|
||||
|
||||
raise 'Object Storage feature is missing' unless self.class.licensed?
|
||||
end
|
||||
|
||||
def exists?
|
||||
file.present?
|
||||
end
|
||||
|
||||
def store_dir(store = nil)
|
||||
store_dirs[store || object_store]
|
||||
end
|
||||
|
||||
def store_dirs
|
||||
{
|
||||
Store::LOCAL => File.join(base_dir, dynamic_segment),
|
||||
Store::REMOTE => File.join(dynamic_segment)
|
||||
}
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
# this is a hack around CarrierWave. The #migrate method needs to be
|
||||
# able to force the current file to the migrated file upon success.
|
||||
def file=(file)
|
||||
@file = file # rubocop:disable Gitlab/ModuleWithInstanceVariables
|
||||
end
|
||||
|
||||
def serialization_column
|
||||
model.class.uploader_options.dig(mounted_as, :mount_on) || mounted_as
|
||||
end
|
||||
|
||||
# Returns the column where the 'store' is saved
|
||||
# defaults to 'store'
|
||||
def store_serialization_column
|
||||
[serialization_column, 'store'].compact.join('_').to_sym
|
||||
end
|
||||
|
||||
def storage
|
||||
@storage ||= storage_for(object_store)
|
||||
end
|
||||
|
||||
def storage_for(store)
|
||||
case store
|
||||
when Store::REMOTE
|
||||
raise 'Object Storage is not enabled' unless self.class.object_store_enabled?
|
||||
|
||||
CarrierWave::Storage::Fog.new(self)
|
||||
when Store::LOCAL
|
||||
CarrierWave::Storage::File.new(self)
|
||||
else
|
||||
raise UnknownStoreError
|
||||
end
|
||||
end
|
||||
|
||||
def exclusive_lease_key
|
||||
"object_storage_migrate:#{model.class}:#{model.id}"
|
||||
end
|
||||
|
||||
#
|
||||
# Move the file to another store
|
||||
#
|
||||
# new_store: Enum (Store::LOCAL, Store::REMOTE)
|
||||
#
|
||||
def unsafe_migrate!(new_store)
|
||||
return unless object_store != new_store
|
||||
return unless file
|
||||
|
||||
new_file = nil
|
||||
file_to_delete = file
|
||||
from_object_store = object_store
|
||||
self.object_store = new_store # changes the storage and file
|
||||
|
||||
cache_stored_file! if file_storage?
|
||||
|
||||
with_callbacks(:migrate, file_to_delete) do
|
||||
with_callbacks(:store, file_to_delete) do # for #store_versions!
|
||||
new_file = storage.store!(file)
|
||||
persist_object_store!
|
||||
self.file = new_file
|
||||
end
|
||||
end
|
||||
|
||||
file
|
||||
rescue => e
|
||||
# in case of failure delete new file
|
||||
new_file.delete unless new_file.nil?
|
||||
# revert back to the old file
|
||||
self.object_store = from_object_store
|
||||
self.file = file_to_delete
|
||||
raise e
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,24 +0,0 @@
|
|||
module Gitlab
|
||||
module Geo
|
||||
class FileTransfer < Transfer
|
||||
def initialize(file_type, upload)
|
||||
@file_type = file_type
|
||||
@file_id = upload.id
|
||||
@filename = upload.absolute_path
|
||||
@request_data = build_request_data(upload)
|
||||
rescue ObjectStorage::RemoteStoreError
|
||||
Rails.logger.warn "Cannot transfer a remote object."
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def build_request_data(upload)
|
||||
{
|
||||
id: upload.model_id,
|
||||
type: upload.model_type,
|
||||
checksum: upload.checksum
|
||||
}
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,266 +0,0 @@
|
|||
module Gitlab
|
||||
module Geo
|
||||
module LogCursor
|
||||
class Daemon
|
||||
VERSION = '0.2.0'.freeze
|
||||
BATCH_SIZE = 250
|
||||
SECONDARY_CHECK_INTERVAL = 1.minute
|
||||
|
||||
attr_reader :options
|
||||
|
||||
def initialize(options = {})
|
||||
@options = options
|
||||
@exit = false
|
||||
logger.geo_logger.build.level = options[:debug] ? :debug : Rails.logger.level
|
||||
end
|
||||
|
||||
def run!
|
||||
trap_signals
|
||||
|
||||
until exit?
|
||||
# Prevent the node from processing events unless it's a secondary
|
||||
unless Geo.secondary?
|
||||
sleep(SECONDARY_CHECK_INTERVAL)
|
||||
next
|
||||
end
|
||||
|
||||
lease = Lease.try_obtain_with_ttl { run_once! }
|
||||
|
||||
return if exit?
|
||||
|
||||
# When no new event is found sleep for a few moments
|
||||
arbitrary_sleep(lease[:ttl])
|
||||
end
|
||||
end
|
||||
|
||||
def run_once!
|
||||
LogCursor::Events.fetch_in_batches { |batch| handle_events(batch) }
|
||||
end
|
||||
|
||||
def handle_events(batch)
|
||||
batch.each do |event_log|
|
||||
next unless can_replay?(event_log)
|
||||
|
||||
begin
|
||||
event = event_log.event
|
||||
handler = "handle_#{event.class.name.demodulize.underscore}"
|
||||
|
||||
__send__(handler, event, event_log.created_at) # rubocop:disable GitlabSecurity/PublicSend
|
||||
rescue NoMethodError => e
|
||||
logger.error(e.message)
|
||||
raise e
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def trap_signals
|
||||
trap(:TERM) do
|
||||
quit!
|
||||
end
|
||||
trap(:INT) do
|
||||
quit!
|
||||
end
|
||||
end
|
||||
|
||||
# Safe shutdown
|
||||
def quit!
|
||||
$stdout.puts 'Exiting...'
|
||||
|
||||
@exit = true
|
||||
end
|
||||
|
||||
def exit?
|
||||
@exit
|
||||
end
|
||||
|
||||
def can_replay?(event_log)
|
||||
return true if event_log.project_id.nil?
|
||||
|
||||
Gitlab::Geo.current_node&.projects_include?(event_log.project_id)
|
||||
end
|
||||
|
||||
def handle_repository_created_event(event, created_at)
|
||||
registry = find_or_initialize_registry(event.project_id, resync_repository: true, resync_wiki: event.wiki_path.present?)
|
||||
|
||||
logger.event_info(
|
||||
created_at,
|
||||
message: 'Repository created',
|
||||
project_id: event.project_id,
|
||||
repo_path: event.repo_path,
|
||||
wiki_path: event.wiki_path,
|
||||
resync_repository: registry.resync_repository,
|
||||
resync_wiki: registry.resync_wiki)
|
||||
|
||||
registry.save!
|
||||
|
||||
::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
|
||||
end
|
||||
|
||||
def handle_repository_updated_event(event, created_at)
|
||||
registry = find_or_initialize_registry(event.project_id, "resync_#{event.source}" => true)
|
||||
|
||||
logger.event_info(
|
||||
created_at,
|
||||
message: 'Repository update',
|
||||
project_id: event.project_id,
|
||||
source: event.source,
|
||||
resync_repository: registry.resync_repository,
|
||||
resync_wiki: registry.resync_wiki)
|
||||
|
||||
registry.save!
|
||||
|
||||
::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
|
||||
end
|
||||
|
||||
def handle_repository_deleted_event(event, created_at)
|
||||
job_id = ::Geo::RepositoryDestroyService
|
||||
.new(event.project_id, event.deleted_project_name, event.deleted_path, event.repository_storage_name)
|
||||
.async_execute
|
||||
|
||||
logger.event_info(
|
||||
created_at,
|
||||
message: 'Deleted project',
|
||||
project_id: event.project_id,
|
||||
repository_storage_name: event.repository_storage_name,
|
||||
disk_path: event.deleted_path,
|
||||
job_id: job_id)
|
||||
|
||||
# No need to create a project entry if it doesn't exist
|
||||
::Geo::ProjectRegistry.where(project_id: event.project_id).delete_all
|
||||
end
|
||||
|
||||
def handle_repositories_changed_event(event, created_at)
|
||||
return unless Gitlab::Geo.current_node.id == event.geo_node_id
|
||||
|
||||
job_id = ::Geo::RepositoriesCleanUpWorker.perform_in(1.hour, event.geo_node_id)
|
||||
|
||||
if job_id
|
||||
logger.info('Scheduled repositories clean up for Geo node', geo_node_id: event.geo_node_id, job_id: job_id)
|
||||
else
|
||||
logger.error('Could not schedule repositories clean up for Geo node', geo_node_id: event.geo_node_id)
|
||||
end
|
||||
end
|
||||
|
||||
def handle_repository_renamed_event(event, created_at)
|
||||
return unless event.project_id
|
||||
|
||||
old_path = event.old_path_with_namespace
|
||||
new_path = event.new_path_with_namespace
|
||||
|
||||
job_id = ::Geo::RenameRepositoryService
|
||||
.new(event.project_id, old_path, new_path)
|
||||
.async_execute
|
||||
|
||||
logger.event_info(
|
||||
created_at,
|
||||
message: 'Renaming project',
|
||||
project_id: event.project_id,
|
||||
old_path: old_path,
|
||||
new_path: new_path,
|
||||
job_id: job_id)
|
||||
end
|
||||
|
||||
def handle_hashed_storage_migrated_event(event, created_at)
|
||||
return unless event.project_id
|
||||
|
||||
job_id = ::Geo::HashedStorageMigrationService.new(
|
||||
event.project_id,
|
||||
old_disk_path: event.old_disk_path,
|
||||
new_disk_path: event.new_disk_path,
|
||||
old_storage_version: event.old_storage_version
|
||||
).async_execute
|
||||
|
||||
logger.event_info(
|
||||
created_at,
|
||||
message: 'Migrating project to hashed storage',
|
||||
project_id: event.project_id,
|
||||
old_storage_version: event.old_storage_version,
|
||||
new_storage_version: event.new_storage_version,
|
||||
old_disk_path: event.old_disk_path,
|
||||
new_disk_path: event.new_disk_path,
|
||||
job_id: job_id)
|
||||
end
|
||||
|
||||
def handle_hashed_storage_attachments_event(event, created_at)
|
||||
job_id = ::Geo::HashedStorageAttachmentsMigrationService.new(
|
||||
event.project_id,
|
||||
old_attachments_path: event.old_attachments_path,
|
||||
new_attachments_path: event.new_attachments_path
|
||||
).async_execute
|
||||
|
||||
logger.event_info(
|
||||
created_at,
|
||||
message: 'Migrating attachments to hashed storage',
|
||||
project_id: event.project_id,
|
||||
old_attachments_path: event.old_attachments_path,
|
||||
new_attachments_path: event.new_attachments_path,
|
||||
job_id: job_id
|
||||
)
|
||||
end
|
||||
|
||||
def handle_lfs_object_deleted_event(event, created_at)
|
||||
file_path = File.join(LfsObjectUploader.root, event.file_path)
|
||||
|
||||
job_id = ::Geo::FileRemovalWorker.perform_async(file_path)
|
||||
|
||||
logger.event_info(
|
||||
created_at,
|
||||
message: 'Deleted LFS object',
|
||||
oid: event.oid,
|
||||
file_id: event.lfs_object_id,
|
||||
file_path: file_path,
|
||||
job_id: job_id)
|
||||
|
||||
::Geo::FileRegistry.lfs_objects.where(file_id: event.lfs_object_id).delete_all
|
||||
end
|
||||
|
||||
def handle_job_artifact_deleted_event(event, created_at)
|
||||
file_registry_job_artifacts = ::Geo::FileRegistry.job_artifacts.where(file_id: event.job_artifact_id)
|
||||
return unless file_registry_job_artifacts.any? # avoid race condition
|
||||
|
||||
file_path = File.join(::JobArtifactUploader.root, event.file_path)
|
||||
|
||||
if File.file?(file_path)
|
||||
deleted = delete_file(file_path) # delete synchronously to ensure consistency
|
||||
return unless deleted # do not delete file from registry if deletion failed
|
||||
end
|
||||
|
||||
logger.event_info(
|
||||
created_at,
|
||||
message: 'Deleted job artifact',
|
||||
file_id: event.job_artifact_id,
|
||||
file_path: file_path)
|
||||
|
||||
file_registry_job_artifacts.delete_all
|
||||
end
|
||||
|
||||
def find_or_initialize_registry(project_id, attrs)
|
||||
registry = ::Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id)
|
||||
registry.assign_attributes(attrs)
|
||||
registry
|
||||
end
|
||||
|
||||
def delete_file(path)
|
||||
File.delete(path)
|
||||
rescue => ex
|
||||
logger.error("Failed to remove file", exception: ex.class.name, details: ex.message, filename: path)
|
||||
false
|
||||
end
|
||||
|
||||
# Sleeps for the expired TTL that remains on the lease plus some random seconds.
|
||||
#
|
||||
# This allows multiple GeoLogCursors to randomly process a batch of events,
|
||||
# without favouring the shortest path (or latency).
|
||||
def arbitrary_sleep(delay)
|
||||
sleep(delay + rand(1..20) * 0.1)
|
||||
end
|
||||
|
||||
def logger
|
||||
Gitlab::Geo::LogCursor::Logger
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in a new issue