Fork and Import jobs only get marked as failed when the number of Sidekiq retries were exhausted
This commit is contained in:
parent
fb47f2a745
commit
558c971e31
8 changed files with 95 additions and 67 deletions
23
app/workers/concerns/project_import_options.rb
Normal file
23
app/workers/concerns/project_import_options.rb
Normal file
|
@ -0,0 +1,23 @@
|
|||
module ProjectImportOptions
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
included do
|
||||
IMPORT_RETRY_COUNT = 5
|
||||
|
||||
sidekiq_options retry: IMPORT_RETRY_COUNT, status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
|
||||
|
||||
# We only want to mark the project as failed once we exhausted all retries
|
||||
sidekiq_retries_exhausted do |job|
|
||||
project = Project.find(job['args'].first)
|
||||
|
||||
action = if project.forked?
|
||||
"fork"
|
||||
else
|
||||
"import"
|
||||
end
|
||||
|
||||
project.mark_import_as_failed("Every #{action} attempt has failed: #{job['error_message']}. Please try again.")
|
||||
Sidekiq.logger.warn "Failed #{job['class']} with #{job['args']}: #{job['error_message']}"
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,3 +1,4 @@
|
|||
# Used in EE by mirroring
|
||||
module ProjectStartImport
|
||||
def start(project)
|
||||
if project.import_started? && project.import_jid == self.jid
|
||||
|
|
|
@ -1,11 +1,8 @@
|
|||
class RepositoryForkWorker
|
||||
ForkError = Class.new(StandardError)
|
||||
|
||||
include ApplicationWorker
|
||||
include Gitlab::ShellAdapter
|
||||
include ProjectStartImport
|
||||
|
||||
sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
|
||||
include ProjectImportOptions
|
||||
|
||||
def perform(project_id, forked_from_repository_storage_path, source_disk_path)
|
||||
project = Project.find(project_id)
|
||||
|
@ -18,20 +15,12 @@ class RepositoryForkWorker
|
|||
|
||||
result = gitlab_shell.fork_repository(forked_from_repository_storage_path, source_disk_path,
|
||||
project.repository_storage_path, project.disk_path)
|
||||
raise ForkError, "Unable to fork project #{project_id} for repository #{source_disk_path} -> #{project.disk_path}" unless result
|
||||
raise "Unable to fork project #{project_id} for repository #{source_disk_path} -> #{project.disk_path}" unless result
|
||||
|
||||
project.repository.after_import
|
||||
raise ForkError, "Project #{project_id} had an invalid repository after fork" unless project.valid_repo?
|
||||
raise "Project #{project_id} had an invalid repository after fork" unless project.valid_repo?
|
||||
|
||||
project.import_finish
|
||||
rescue ForkError => ex
|
||||
fail_fork(project, ex.message)
|
||||
raise
|
||||
rescue => ex
|
||||
return unless project
|
||||
|
||||
fail_fork(project, ex.message)
|
||||
raise ForkError, "#{ex.class} #{ex.message}"
|
||||
end
|
||||
|
||||
private
|
||||
|
@ -42,9 +31,4 @@ class RepositoryForkWorker
|
|||
Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while forking.")
|
||||
false
|
||||
end
|
||||
|
||||
def fail_fork(project, message)
|
||||
Rails.logger.error(message)
|
||||
project.mark_import_as_failed(message)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,11 +1,8 @@
|
|||
class RepositoryImportWorker
|
||||
ImportError = Class.new(StandardError)
|
||||
|
||||
include ApplicationWorker
|
||||
include ExceptionBacktrace
|
||||
include ProjectStartImport
|
||||
|
||||
sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
|
||||
include ProjectImportOptions
|
||||
|
||||
def perform(project_id)
|
||||
project = Project.find(project_id)
|
||||
|
@ -23,17 +20,9 @@ class RepositoryImportWorker
|
|||
# to those importers to mark the import process as complete.
|
||||
return if service.async?
|
||||
|
||||
raise ImportError, result[:message] if result[:status] == :error
|
||||
raise result[:message] if result[:status] == :error
|
||||
|
||||
project.after_import
|
||||
rescue ImportError => ex
|
||||
fail_import(project, ex.message)
|
||||
raise
|
||||
rescue => ex
|
||||
return unless project
|
||||
|
||||
fail_import(project, ex.message)
|
||||
raise ImportError, "#{ex.class} #{ex.message}"
|
||||
end
|
||||
|
||||
private
|
||||
|
@ -44,8 +33,4 @@ class RepositoryImportWorker
|
|||
Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while importing.")
|
||||
false
|
||||
end
|
||||
|
||||
def fail_import(project, message)
|
||||
project.mark_import_as_failed(message)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
title: Only mark import and fork jobs as failed once all Sidekiq retries get exhausted
|
||||
merge_request: 15844
|
||||
author:
|
||||
type: changed
|
40
spec/workers/concerns/project_import_options_spec.rb
Normal file
40
spec/workers/concerns/project_import_options_spec.rb
Normal file
|
@ -0,0 +1,40 @@
|
|||
require 'spec_helper'
|
||||
|
||||
describe ProjectImportOptions do
|
||||
let(:project) { create(:project, :import_started) }
|
||||
let(:job) { { 'args' => [project.id, nil, nil], 'jid' => '123' } }
|
||||
let(:worker_class) do
|
||||
Class.new do
|
||||
include Sidekiq::Worker
|
||||
include ProjectImportOptions
|
||||
end
|
||||
end
|
||||
|
||||
it 'sets default retry limit' do
|
||||
expect(worker_class.sidekiq_options['retry']).to eq(ProjectImportOptions::IMPORT_RETRY_COUNT)
|
||||
end
|
||||
|
||||
it 'sets default status expiration' do
|
||||
expect(worker_class.sidekiq_options['status_expiration']).to eq(StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION)
|
||||
end
|
||||
|
||||
describe '.sidekiq_retries_exhausted' do
|
||||
it 'marks fork as failed' do
|
||||
expect { worker_class.sidekiq_retries_exhausted_block.call(job) }.to change { project.reload.import_status }.from("started").to("failed")
|
||||
end
|
||||
|
||||
it 'logs the appropriate error message for forked projects' do
|
||||
allow_any_instance_of(Project).to receive(:forked?).and_return(true)
|
||||
|
||||
worker_class.sidekiq_retries_exhausted_block.call(job)
|
||||
|
||||
expect(project.reload.import_error).to include("fork")
|
||||
end
|
||||
|
||||
it 'logs the appropriate error message for forked projects' do
|
||||
worker_class.sidekiq_retries_exhausted_block.call(job)
|
||||
|
||||
expect(project.reload.import_error).to include("import")
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,17 +1,21 @@
|
|||
require 'spec_helper'
|
||||
|
||||
describe RepositoryForkWorker do
|
||||
let(:project) { create(:project, :repository) }
|
||||
let(:fork_project) { create(:project, :repository, :import_scheduled, forked_from_project: project) }
|
||||
let(:shell) { Gitlab::Shell.new }
|
||||
|
||||
subject { described_class.new }
|
||||
|
||||
before do
|
||||
allow(subject).to receive(:gitlab_shell).and_return(shell)
|
||||
describe 'modules' do
|
||||
it 'includes ProjectImportOptions' do
|
||||
expect(described_class).to include_module(ProjectImportOptions)
|
||||
end
|
||||
end
|
||||
|
||||
describe "#perform" do
|
||||
let(:project) { create(:project, :repository) }
|
||||
let(:fork_project) { create(:project, :repository, :import_scheduled, forked_from_project: project) }
|
||||
let(:shell) { Gitlab::Shell.new }
|
||||
|
||||
before do
|
||||
allow(subject).to receive(:gitlab_shell).and_return(shell)
|
||||
end
|
||||
|
||||
def perform!
|
||||
subject.perform(fork_project.id, '/test/path', project.disk_path)
|
||||
end
|
||||
|
@ -60,14 +64,7 @@ describe RepositoryForkWorker do
|
|||
|
||||
expect_fork_repository.and_return(false)
|
||||
|
||||
expect { perform! }.to raise_error(RepositoryForkWorker::ForkError, error_message)
|
||||
end
|
||||
|
||||
it 'handles unexpected error' do
|
||||
expect_fork_repository.and_raise(RuntimeError)
|
||||
|
||||
expect { perform! }.to raise_error(RepositoryForkWorker::ForkError)
|
||||
expect(fork_project.reload.import_status).to eq('failed')
|
||||
expect { perform! }.to raise_error(StandardError, error_message)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,11 +1,15 @@
|
|||
require 'spec_helper'
|
||||
|
||||
describe RepositoryImportWorker do
|
||||
let(:project) { create(:project, :import_scheduled) }
|
||||
|
||||
subject { described_class.new }
|
||||
describe 'modules' do
|
||||
it 'includes ProjectImportOptions' do
|
||||
expect(described_class).to include_module(ProjectImportOptions)
|
||||
end
|
||||
end
|
||||
|
||||
describe '#perform' do
|
||||
let(:project) { create(:project, :import_scheduled) }
|
||||
|
||||
context 'when worker was reset without cleanup' do
|
||||
let(:jid) { '12345678' }
|
||||
let(:started_project) { create(:project, :import_started, import_jid: jid) }
|
||||
|
@ -44,22 +48,11 @@ describe RepositoryImportWorker do
|
|||
|
||||
expect do
|
||||
subject.perform(project.id)
|
||||
end.to raise_error(RepositoryImportWorker::ImportError, error)
|
||||
end.to raise_error(StandardError, error)
|
||||
expect(project.reload.import_jid).not_to be_nil
|
||||
end
|
||||
end
|
||||
|
||||
context 'with unexpected error' do
|
||||
it 'marks import as failed' do
|
||||
allow_any_instance_of(Projects::ImportService).to receive(:execute).and_raise(RuntimeError)
|
||||
|
||||
expect do
|
||||
subject.perform(project.id)
|
||||
end.to raise_error(RepositoryImportWorker::ImportError)
|
||||
expect(project.reload.import_status).to eq('failed')
|
||||
end
|
||||
end
|
||||
|
||||
context 'when using an asynchronous importer' do
|
||||
it 'does not mark the import process as finished' do
|
||||
service = double(:service)
|
||||
|
|
Loading…
Reference in a new issue