Merge branch '39246-fork-and-import-jobs-should-only-be-marked-as-failed-when-the-number-of-retries-was-exhausted' into 'master'
Fork and Import jobs only get marked as failed when the number of Sidekiq retries were exhausted Closes #39246 See merge request gitlab-org/gitlab-ce!15844
This commit is contained in:
commit
0969fdffda
|
@ -0,0 +1,23 @@
|
||||||
|
module ProjectImportOptions
|
||||||
|
extend ActiveSupport::Concern
|
||||||
|
|
||||||
|
included do
|
||||||
|
IMPORT_RETRY_COUNT = 5
|
||||||
|
|
||||||
|
sidekiq_options retry: IMPORT_RETRY_COUNT, status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
|
||||||
|
|
||||||
|
# We only want to mark the project as failed once we exhausted all retries
|
||||||
|
sidekiq_retries_exhausted do |job|
|
||||||
|
project = Project.find(job['args'].first)
|
||||||
|
|
||||||
|
action = if project.forked?
|
||||||
|
"fork"
|
||||||
|
else
|
||||||
|
"import"
|
||||||
|
end
|
||||||
|
|
||||||
|
project.mark_import_as_failed("Every #{action} attempt has failed: #{job['error_message']}. Please try again.")
|
||||||
|
Sidekiq.logger.warn "Failed #{job['class']} with #{job['args']}: #{job['error_message']}"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -1,3 +1,4 @@
|
||||||
|
# Used in EE by mirroring
|
||||||
module ProjectStartImport
|
module ProjectStartImport
|
||||||
def start(project)
|
def start(project)
|
||||||
if project.import_started? && project.import_jid == self.jid
|
if project.import_started? && project.import_jid == self.jid
|
||||||
|
|
|
@ -1,11 +1,8 @@
|
||||||
class RepositoryForkWorker
|
class RepositoryForkWorker
|
||||||
ForkError = Class.new(StandardError)
|
|
||||||
|
|
||||||
include ApplicationWorker
|
include ApplicationWorker
|
||||||
include Gitlab::ShellAdapter
|
include Gitlab::ShellAdapter
|
||||||
include ProjectStartImport
|
include ProjectStartImport
|
||||||
|
include ProjectImportOptions
|
||||||
sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
|
|
||||||
|
|
||||||
def perform(project_id, forked_from_repository_storage_path, source_disk_path)
|
def perform(project_id, forked_from_repository_storage_path, source_disk_path)
|
||||||
project = Project.find(project_id)
|
project = Project.find(project_id)
|
||||||
|
@ -18,20 +15,12 @@ class RepositoryForkWorker
|
||||||
|
|
||||||
result = gitlab_shell.fork_repository(forked_from_repository_storage_path, source_disk_path,
|
result = gitlab_shell.fork_repository(forked_from_repository_storage_path, source_disk_path,
|
||||||
project.repository_storage_path, project.disk_path)
|
project.repository_storage_path, project.disk_path)
|
||||||
raise ForkError, "Unable to fork project #{project_id} for repository #{source_disk_path} -> #{project.disk_path}" unless result
|
raise "Unable to fork project #{project_id} for repository #{source_disk_path} -> #{project.disk_path}" unless result
|
||||||
|
|
||||||
project.repository.after_import
|
project.repository.after_import
|
||||||
raise ForkError, "Project #{project_id} had an invalid repository after fork" unless project.valid_repo?
|
raise "Project #{project_id} had an invalid repository after fork" unless project.valid_repo?
|
||||||
|
|
||||||
project.import_finish
|
project.import_finish
|
||||||
rescue ForkError => ex
|
|
||||||
fail_fork(project, ex.message)
|
|
||||||
raise
|
|
||||||
rescue => ex
|
|
||||||
return unless project
|
|
||||||
|
|
||||||
fail_fork(project, ex.message)
|
|
||||||
raise ForkError, "#{ex.class} #{ex.message}"
|
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
@ -42,9 +31,4 @@ class RepositoryForkWorker
|
||||||
Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while forking.")
|
Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while forking.")
|
||||||
false
|
false
|
||||||
end
|
end
|
||||||
|
|
||||||
def fail_fork(project, message)
|
|
||||||
Rails.logger.error(message)
|
|
||||||
project.mark_import_as_failed(message)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
|
@ -1,11 +1,8 @@
|
||||||
class RepositoryImportWorker
|
class RepositoryImportWorker
|
||||||
ImportError = Class.new(StandardError)
|
|
||||||
|
|
||||||
include ApplicationWorker
|
include ApplicationWorker
|
||||||
include ExceptionBacktrace
|
include ExceptionBacktrace
|
||||||
include ProjectStartImport
|
include ProjectStartImport
|
||||||
|
include ProjectImportOptions
|
||||||
sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
|
|
||||||
|
|
||||||
def perform(project_id)
|
def perform(project_id)
|
||||||
project = Project.find(project_id)
|
project = Project.find(project_id)
|
||||||
|
@ -23,17 +20,9 @@ class RepositoryImportWorker
|
||||||
# to those importers to mark the import process as complete.
|
# to those importers to mark the import process as complete.
|
||||||
return if service.async?
|
return if service.async?
|
||||||
|
|
||||||
raise ImportError, result[:message] if result[:status] == :error
|
raise result[:message] if result[:status] == :error
|
||||||
|
|
||||||
project.after_import
|
project.after_import
|
||||||
rescue ImportError => ex
|
|
||||||
fail_import(project, ex.message)
|
|
||||||
raise
|
|
||||||
rescue => ex
|
|
||||||
return unless project
|
|
||||||
|
|
||||||
fail_import(project, ex.message)
|
|
||||||
raise ImportError, "#{ex.class} #{ex.message}"
|
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
@ -44,8 +33,4 @@ class RepositoryImportWorker
|
||||||
Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while importing.")
|
Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while importing.")
|
||||||
false
|
false
|
||||||
end
|
end
|
||||||
|
|
||||||
def fail_import(project, message)
|
|
||||||
project.mark_import_as_failed(message)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
---
|
||||||
|
title: Only mark import and fork jobs as failed once all Sidekiq retries get exhausted
|
||||||
|
merge_request: 15844
|
||||||
|
author:
|
||||||
|
type: changed
|
|
@ -0,0 +1,40 @@
|
||||||
|
require 'spec_helper'
|
||||||
|
|
||||||
|
describe ProjectImportOptions do
|
||||||
|
let(:project) { create(:project, :import_started) }
|
||||||
|
let(:job) { { 'args' => [project.id, nil, nil], 'jid' => '123' } }
|
||||||
|
let(:worker_class) do
|
||||||
|
Class.new do
|
||||||
|
include Sidekiq::Worker
|
||||||
|
include ProjectImportOptions
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'sets default retry limit' do
|
||||||
|
expect(worker_class.sidekiq_options['retry']).to eq(ProjectImportOptions::IMPORT_RETRY_COUNT)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'sets default status expiration' do
|
||||||
|
expect(worker_class.sidekiq_options['status_expiration']).to eq(StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION)
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '.sidekiq_retries_exhausted' do
|
||||||
|
it 'marks fork as failed' do
|
||||||
|
expect { worker_class.sidekiq_retries_exhausted_block.call(job) }.to change { project.reload.import_status }.from("started").to("failed")
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'logs the appropriate error message for forked projects' do
|
||||||
|
allow_any_instance_of(Project).to receive(:forked?).and_return(true)
|
||||||
|
|
||||||
|
worker_class.sidekiq_retries_exhausted_block.call(job)
|
||||||
|
|
||||||
|
expect(project.reload.import_error).to include("fork")
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'logs the appropriate error message for forked projects' do
|
||||||
|
worker_class.sidekiq_retries_exhausted_block.call(job)
|
||||||
|
|
||||||
|
expect(project.reload.import_error).to include("import")
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -1,17 +1,21 @@
|
||||||
require 'spec_helper'
|
require 'spec_helper'
|
||||||
|
|
||||||
describe RepositoryForkWorker do
|
describe RepositoryForkWorker do
|
||||||
let(:project) { create(:project, :repository) }
|
describe 'modules' do
|
||||||
let(:fork_project) { create(:project, :repository, :import_scheduled, forked_from_project: project) }
|
it 'includes ProjectImportOptions' do
|
||||||
let(:shell) { Gitlab::Shell.new }
|
expect(described_class).to include_module(ProjectImportOptions)
|
||||||
|
end
|
||||||
subject { described_class.new }
|
|
||||||
|
|
||||||
before do
|
|
||||||
allow(subject).to receive(:gitlab_shell).and_return(shell)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "#perform" do
|
describe "#perform" do
|
||||||
|
let(:project) { create(:project, :repository) }
|
||||||
|
let(:fork_project) { create(:project, :repository, :import_scheduled, forked_from_project: project) }
|
||||||
|
let(:shell) { Gitlab::Shell.new }
|
||||||
|
|
||||||
|
before do
|
||||||
|
allow(subject).to receive(:gitlab_shell).and_return(shell)
|
||||||
|
end
|
||||||
|
|
||||||
def perform!
|
def perform!
|
||||||
subject.perform(fork_project.id, '/test/path', project.disk_path)
|
subject.perform(fork_project.id, '/test/path', project.disk_path)
|
||||||
end
|
end
|
||||||
|
@ -60,14 +64,7 @@ describe RepositoryForkWorker do
|
||||||
|
|
||||||
expect_fork_repository.and_return(false)
|
expect_fork_repository.and_return(false)
|
||||||
|
|
||||||
expect { perform! }.to raise_error(RepositoryForkWorker::ForkError, error_message)
|
expect { perform! }.to raise_error(StandardError, error_message)
|
||||||
end
|
|
||||||
|
|
||||||
it 'handles unexpected error' do
|
|
||||||
expect_fork_repository.and_raise(RuntimeError)
|
|
||||||
|
|
||||||
expect { perform! }.to raise_error(RepositoryForkWorker::ForkError)
|
|
||||||
expect(fork_project.reload.import_status).to eq('failed')
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -1,11 +1,15 @@
|
||||||
require 'spec_helper'
|
require 'spec_helper'
|
||||||
|
|
||||||
describe RepositoryImportWorker do
|
describe RepositoryImportWorker do
|
||||||
let(:project) { create(:project, :import_scheduled) }
|
describe 'modules' do
|
||||||
|
it 'includes ProjectImportOptions' do
|
||||||
subject { described_class.new }
|
expect(described_class).to include_module(ProjectImportOptions)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
describe '#perform' do
|
describe '#perform' do
|
||||||
|
let(:project) { create(:project, :import_scheduled) }
|
||||||
|
|
||||||
context 'when worker was reset without cleanup' do
|
context 'when worker was reset without cleanup' do
|
||||||
let(:jid) { '12345678' }
|
let(:jid) { '12345678' }
|
||||||
let(:started_project) { create(:project, :import_started, import_jid: jid) }
|
let(:started_project) { create(:project, :import_started, import_jid: jid) }
|
||||||
|
@ -44,22 +48,11 @@ describe RepositoryImportWorker do
|
||||||
|
|
||||||
expect do
|
expect do
|
||||||
subject.perform(project.id)
|
subject.perform(project.id)
|
||||||
end.to raise_error(RepositoryImportWorker::ImportError, error)
|
end.to raise_error(StandardError, error)
|
||||||
expect(project.reload.import_jid).not_to be_nil
|
expect(project.reload.import_jid).not_to be_nil
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
context 'with unexpected error' do
|
|
||||||
it 'marks import as failed' do
|
|
||||||
allow_any_instance_of(Projects::ImportService).to receive(:execute).and_raise(RuntimeError)
|
|
||||||
|
|
||||||
expect do
|
|
||||||
subject.perform(project.id)
|
|
||||||
end.to raise_error(RepositoryImportWorker::ImportError)
|
|
||||||
expect(project.reload.import_status).to eq('failed')
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
context 'when using an asynchronous importer' do
|
context 'when using an asynchronous importer' do
|
||||||
it 'does not mark the import process as finished' do
|
it 'does not mark the import process as finished' do
|
||||||
service = double(:service)
|
service = double(:service)
|
||||||
|
|
Loading…
Reference in New Issue