another round of fixes
This commit is contained in:
parent
0f1d348d68
commit
a8df653fae
10 changed files with 360 additions and 56 deletions
|
@ -20,10 +20,10 @@ module ObjectStorage
|
|||
module RecordsUploads
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
prepended do |base|
|
||||
def prepended(base)
|
||||
raise "#{base} must include ObjectStorage::Concern to use extensions." unless base < Concern
|
||||
|
||||
base.include(::RecordsUploads::Concern)
|
||||
base.include(RecordsUploads::Concern)
|
||||
end
|
||||
|
||||
def retrieve_from_store!(identifier)
|
||||
|
|
8
app/workers/concerns/object_storage_queue.rb
Normal file
8
app/workers/concerns/object_storage_queue.rb
Normal file
|
@ -0,0 +1,8 @@
|
|||
# Concern for setting Sidekiq settings for the various GitLab ObjectStorage workers.
|
||||
module ObjectStorageQueue
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
included do
|
||||
queue_namespace :object_storage
|
||||
end
|
||||
end
|
30
app/workers/object_storage/background_move_worker.rb
Normal file
30
app/workers/object_storage/background_move_worker.rb
Normal file
|
@ -0,0 +1,30 @@
|
|||
module ObjectStorage
|
||||
class BackgroundMoveWorker
|
||||
include ApplicationWorker
|
||||
include ObjectStorageQueue
|
||||
|
||||
sidekiq_options retry: 5
|
||||
|
||||
def perform(uploader_class_name, subject_class_name, file_field, subject_id)
|
||||
uploader_class = uploader_class_name.constantize
|
||||
subject_class = subject_class_name.constantize
|
||||
|
||||
return unless uploader_class < ObjectStorage::Concern
|
||||
return unless uploader_class.object_store_enabled?
|
||||
return unless uploader_class.licensed?
|
||||
return unless uploader_class.background_upload_enabled?
|
||||
|
||||
subject = subject_class.find(subject_id)
|
||||
uploader = build_uploader(subject, file_field&.to_sym)
|
||||
uploader.migrate!(ObjectStorage::Store::REMOTE)
|
||||
end
|
||||
|
||||
def build_uploader(subject, mount_point)
|
||||
case subject
|
||||
when Upload then subject.build_uploader(mount_point)
|
||||
else
|
||||
subject.send(mount_point) # rubocop:disable GitlabSecurity/PublicSend
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
202
app/workers/object_storage/migrate_uploads_worker.rb
Normal file
202
app/workers/object_storage/migrate_uploads_worker.rb
Normal file
|
@ -0,0 +1,202 @@
|
|||
# frozen_string_literal: true
|
||||
# rubocop:disable Metrics/LineLength
|
||||
# rubocop:disable Style/Documentation
|
||||
|
||||
module ObjectStorage
|
||||
class MigrateUploadsWorker
|
||||
include ApplicationWorker
|
||||
include ObjectStorageQueue
|
||||
|
||||
SanityCheckError = Class.new(StandardError)
|
||||
|
||||
class Upload < ActiveRecord::Base
|
||||
# Upper limit for foreground checksum processing
|
||||
CHECKSUM_THRESHOLD = 100.megabytes
|
||||
|
||||
belongs_to :model, polymorphic: true # rubocop:disable Cop/PolymorphicAssociations
|
||||
|
||||
validates :size, presence: true
|
||||
validates :path, presence: true
|
||||
validates :model, presence: true
|
||||
validates :uploader, presence: true
|
||||
|
||||
before_save :calculate_checksum!, if: :foreground_checksummable?
|
||||
after_commit :schedule_checksum, if: :checksummable?
|
||||
|
||||
scope :stored_locally, -> { where(store: [nil, ObjectStorage::Store::LOCAL]) }
|
||||
scope :stored_remotely, -> { where(store: ObjectStorage::Store::REMOTE) }
|
||||
|
||||
def self.hexdigest(path)
|
||||
Digest::SHA256.file(path).hexdigest
|
||||
end
|
||||
|
||||
def absolute_path
|
||||
raise ObjectStorage::RemoteStoreError, "Remote object has no absolute path." unless local?
|
||||
return path unless relative_path?
|
||||
|
||||
uploader_class.absolute_path(self)
|
||||
end
|
||||
|
||||
def calculate_checksum!
|
||||
self.checksum = nil
|
||||
return unless checksummable?
|
||||
|
||||
self.checksum = self.class.hexdigest(absolute_path)
|
||||
end
|
||||
|
||||
def build_uploader(mounted_as = nil)
|
||||
uploader_class.new(model, mounted_as).tap do |uploader|
|
||||
uploader.upload = self
|
||||
uploader.retrieve_from_store!(identifier)
|
||||
end
|
||||
end
|
||||
|
||||
def exist?
|
||||
File.exist?(absolute_path)
|
||||
end
|
||||
|
||||
def local?
|
||||
return true if store.nil?
|
||||
|
||||
store == ObjectStorage::Store::LOCAL
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def checksummable?
|
||||
checksum.nil? && local? && exist?
|
||||
end
|
||||
|
||||
def foreground_checksummable?
|
||||
checksummable? && size <= CHECKSUM_THRESHOLD
|
||||
end
|
||||
|
||||
def schedule_checksum
|
||||
UploadChecksumWorker.perform_async(id)
|
||||
end
|
||||
|
||||
def relative_path?
|
||||
!path.start_with?('/')
|
||||
end
|
||||
|
||||
def identifier
|
||||
File.basename(path)
|
||||
end
|
||||
|
||||
def uploader_class
|
||||
Object.const_get(uploader)
|
||||
end
|
||||
end
|
||||
|
||||
class MigrationResult
|
||||
attr_reader :upload
|
||||
attr_accessor :error
|
||||
|
||||
def initialize(upload, error = nil)
|
||||
@upload, @error = upload, error
|
||||
end
|
||||
|
||||
def success?
|
||||
error.nil?
|
||||
end
|
||||
|
||||
def to_s
|
||||
success? ? "Migration successful." : "Error while migrating #{upload.id}: #{error.message}"
|
||||
end
|
||||
end
|
||||
|
||||
module Report
|
||||
class MigrationFailures < StandardError
|
||||
attr_reader :errors
|
||||
|
||||
def initialize(errors)
|
||||
@errors = errors
|
||||
end
|
||||
|
||||
def message
|
||||
errors.map(&:message).join("\n")
|
||||
end
|
||||
end
|
||||
|
||||
def report!(results)
|
||||
success, failures = results.partition(&:success?)
|
||||
|
||||
Rails.logger.info header(success, failures)
|
||||
Rails.logger.warn failures(failures)
|
||||
|
||||
raise MigrationFailures.new(failures.map(&:error)) if failures.any?
|
||||
end
|
||||
|
||||
def header(success, failures)
|
||||
"Migrated #{success.count}/#{success.count + failures.count} files."
|
||||
end
|
||||
|
||||
def failures(failures)
|
||||
failures.map { |f| "\t#{f}" }.join('\n')
|
||||
end
|
||||
end
|
||||
|
||||
include Report
|
||||
|
||||
def self.enqueue!(uploads, mounted_as, to_store)
|
||||
sanity_check!(uploads, mounted_as)
|
||||
|
||||
perform_async(uploads.ids, mounted_as, to_store)
|
||||
end
|
||||
|
||||
# We need to be sure all the uploads are for the same uploader and model type
|
||||
# and that the mount point exists if provided.
|
||||
#
|
||||
def self.sanity_check!(uploads, mounted_as)
|
||||
upload = uploads.first
|
||||
|
||||
uploader_class = upload.uploader.constantize
|
||||
model_class = uploads.first.model_type.constantize
|
||||
|
||||
uploader_types = uploads.map(&:uploader).uniq
|
||||
model_types = uploads.map(&:model_type).uniq
|
||||
model_has_mount = mounted_as.nil? || model_class.uploaders[mounted_as] == uploader_class
|
||||
|
||||
raise(SanityCheckError, "Multiple uploaders found: #{uploader_types}") unless uploader_types.count == 1
|
||||
raise(SanityCheckError, "Multiple model types found: #{model_types}") unless model_types.count == 1
|
||||
raise(SanityCheckError, "Mount point #{mounted_as} not found in #{model_class}.") unless model_has_mount
|
||||
end
|
||||
|
||||
def perform(ids, mounted_as, to_store)
|
||||
@mounted_as = mounted_as&.to_sym
|
||||
@to_store = to_store
|
||||
|
||||
uploads = Upload.preload(:model).where(id: ids)
|
||||
|
||||
sanity_check!(uploads)
|
||||
results = migrate(uploads)
|
||||
|
||||
report!(results)
|
||||
rescue SanityCheckError => e
|
||||
# do not retry: the job is insane
|
||||
Rails.logger.warn "#{self.class}: Sanity check error (#{e.message})"
|
||||
end
|
||||
|
||||
def sanity_check!(uploads)
|
||||
self.class.sanity_check!(uploads, @mounted_as)
|
||||
end
|
||||
|
||||
def build_uploaders(uploads)
|
||||
uploads.map { |upload| upload.build_uploader(@mounted_as) }
|
||||
end
|
||||
|
||||
def migrate(uploads)
|
||||
build_uploaders(uploads).map(&method(:process_uploader))
|
||||
end
|
||||
|
||||
def process_uploader(uploader)
|
||||
MigrationResult.new(uploader.upload).tap do |result|
|
||||
begin
|
||||
uploader.migrate!(@to_store)
|
||||
rescue => e
|
||||
result.error = e
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,6 +1,8 @@
|
|||
# @Deprecated - remove once the `object_storage_upload` queue is empty
|
||||
# The queue has been renamed `object_storage:object_storage_background_upload`
|
||||
#
|
||||
class ObjectStorageUploadWorker
|
||||
include Sidekiq::Worker
|
||||
include DedicatedSidekiqQueue
|
||||
include ApplicationWorker
|
||||
|
||||
sidekiq_options retry: 5
|
||||
|
||||
|
@ -16,8 +18,5 @@ class ObjectStorageUploadWorker
|
|||
subject = subject_class.find(subject_id)
|
||||
uploader = subject.public_send(file_field) # rubocop:disable GitlabSecurity/PublicSend
|
||||
uploader.migrate!(ObjectStorage::Store::REMOTE)
|
||||
rescue RecordNotFound
|
||||
# does not retry when the record do not exists
|
||||
Rails.logger.warn("Cannot find subject #{subject_class} with id=#{subject_id}.")
|
||||
end
|
||||
end
|
||||
|
|
30
lib/tasks/gitlab/uploads/check.rake
Normal file
30
lib/tasks/gitlab/uploads/check.rake
Normal file
|
@ -0,0 +1,30 @@
|
|||
require_relative 'helpers.rb'
|
||||
|
||||
namespace :gitlab do
|
||||
namespace :uploads do
|
||||
desc 'GitLab | Uploads | Check integrity of uploaded files'
|
||||
task check: :environment do
|
||||
include UploadTaskHelpers
|
||||
|
||||
puts 'Checking integrity of uploaded files'
|
||||
|
||||
uploads_batches do |batch|
|
||||
batch.each do |upload|
|
||||
begin
|
||||
puts "- Checking file (#{upload.id}): #{upload.absolute_path}".color(:green)
|
||||
|
||||
if upload.exist?
|
||||
check_checksum(upload)
|
||||
else
|
||||
puts " * File does not exist on the file system".color(:red)
|
||||
end
|
||||
rescue ObjectStorage::RemoteStoreError
|
||||
puts "- File (#{upload.id}): File is stored remotely, skipping".color(:yellow)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
puts 'Done!'
|
||||
end
|
||||
end
|
||||
end
|
23
lib/tasks/gitlab/uploads/helpers.rb
Normal file
23
lib/tasks/gitlab/uploads/helpers.rb
Normal file
|
@ -0,0 +1,23 @@
|
|||
module UploadTaskHelpers
|
||||
def batch_size
|
||||
ENV.fetch('BATCH', 200).to_i
|
||||
end
|
||||
|
||||
def calculate_checksum(absolute_path)
|
||||
Digest::SHA256.file(absolute_path).hexdigest
|
||||
end
|
||||
|
||||
def check_checksum(upload)
|
||||
checksum = calculate_checksum(upload.absolute_path)
|
||||
|
||||
if checksum != upload.checksum
|
||||
puts " * File checksum (#{checksum}) does not match the one in the database (#{upload.checksum})".color(:red)
|
||||
end
|
||||
end
|
||||
|
||||
def uploads_batches(&block)
|
||||
Upload.all.in_batches(of: batch_size, start: ENV['ID_FROM'], finish: ENV['ID_TO']) do |relation| # rubocop: disable Cop/InBatches
|
||||
yield relation
|
||||
end
|
||||
end
|
||||
end
|
33
lib/tasks/gitlab/uploads/migrate.rake
Normal file
33
lib/tasks/gitlab/uploads/migrate.rake
Normal file
|
@ -0,0 +1,33 @@
|
|||
namespace :gitlab do
|
||||
namespace :uploads do
|
||||
desc 'GitLab | Uploads | Migrate the uploaded files to object storage'
|
||||
task :migrate, [:uploader_class, :model_class, :mounted_as] => :environment do |task, args|
|
||||
batch_size = ENV.fetch('BATCH', 200).to_i
|
||||
@to_store = ObjectStorage::Store::REMOTE
|
||||
@mounted_as = args.mounted_as&.gsub(':', '')&.to_sym
|
||||
@uploader_class = args.uploader_class.constantize
|
||||
@model_class = args.model_class.constantize
|
||||
|
||||
uploads.each_batch(of: batch_size, &method(:enqueue_batch)) # rubocop: disable Cop/InBatches
|
||||
end
|
||||
|
||||
def enqueue_batch(batch, index)
|
||||
job = ObjectStorage::MigrateUploadsWorker.enqueue!(batch,
|
||||
@mounted_as,
|
||||
@to_store)
|
||||
puts "Enqueued job ##{index}: #{job}"
|
||||
rescue ObjectStorage::MigrateUploadsWorker::SanityCheckError => e
|
||||
# continue for the next batch
|
||||
puts "Could not enqueue batch (#{batch.ids}) #{e.message}".color(:red)
|
||||
end
|
||||
|
||||
def uploads
|
||||
Upload.class_eval { include EachBatch } unless Upload < EachBatch
|
||||
|
||||
Upload
|
||||
.where.not(store: @to_store)
|
||||
.where(uploader: @uploader_class.to_s,
|
||||
model_type: @model_class.base_class.sti_name)
|
||||
end
|
||||
end
|
||||
end
|
|
@ -124,52 +124,3 @@ shared_examples "migrates" do |to_store:, from_store: nil|
|
|||
end
|
||||
end
|
||||
end
|
||||
|
||||
shared_examples "matches the method pattern" do |method|
|
||||
let(:target) { subject }
|
||||
let(:args) { nil }
|
||||
let(:pattern) { patterns[method] }
|
||||
|
||||
it do
|
||||
return skip "No pattern provided, skipping." unless pattern
|
||||
|
||||
expect(target.method(method).call(*args)).to match(pattern)
|
||||
end
|
||||
end
|
||||
|
||||
shared_examples "builds correct paths" do |**patterns|
|
||||
let(:patterns) { patterns }
|
||||
|
||||
before do
|
||||
allow(subject).to receive(:filename).and_return('<filename>')
|
||||
end
|
||||
|
||||
describe "#store_dir" do
|
||||
it_behaves_like "matches the method pattern", :store_dir
|
||||
end
|
||||
|
||||
describe "#cache_dir" do
|
||||
it_behaves_like "matches the method pattern", :cache_dir
|
||||
end
|
||||
|
||||
describe "#work_dir" do
|
||||
it_behaves_like "matches the method pattern", :work_dir
|
||||
end
|
||||
|
||||
describe "#upload_path" do
|
||||
it_behaves_like "matches the method pattern", :upload_path
|
||||
end
|
||||
|
||||
describe ".absolute_path" do
|
||||
it_behaves_like "matches the method pattern", :absolute_path do
|
||||
let(:target) { subject.class }
|
||||
let(:args) { [upload] }
|
||||
end
|
||||
end
|
||||
|
||||
describe ".base_dir" do
|
||||
it_behaves_like "matches the method pattern", :base_dir do
|
||||
let(:target) { subject.class }
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
28
spec/tasks/gitlab/uploads/migrate_rake_spec.rb
Normal file
28
spec/tasks/gitlab/uploads/migrate_rake_spec.rb
Normal file
|
@ -0,0 +1,28 @@
|
|||
require 'rake_helper'
|
||||
|
||||
describe 'gitlab:uploads:migrate rake tasks' do
|
||||
let!(:projects) { create_list(:project, 10, :with_avatar) }
|
||||
let(:model_class) { Project }
|
||||
let(:uploader_class) { AvatarUploader }
|
||||
let(:mounted_as) { :avatar }
|
||||
let(:batch_size) { 3 }
|
||||
|
||||
before do
|
||||
stub_env('BATCH', batch_size.to_s)
|
||||
stub_uploads_object_storage(uploader_class)
|
||||
Rake.application.rake_require 'tasks/gitlab/uploads/migrate'
|
||||
|
||||
allow(ObjectStorage::MigrateUploadsWorker).to receive(:perform_async)
|
||||
end
|
||||
|
||||
def run
|
||||
args = [uploader_class.to_s, model_class.to_s, mounted_as].compact
|
||||
run_rake_task("gitlab:uploads:migrate", *args)
|
||||
end
|
||||
|
||||
it 'enqueue jobs in batch' do
|
||||
expect(ObjectStorage::MigrateUploadsWorker).to receive(:enqueue!).exactly(4).times
|
||||
|
||||
run
|
||||
end
|
||||
end
|
Loading…
Reference in a new issue