Merge branch 'ee-2628-backport-to-ce' into 'master'
Backports EE mirror stuck handling feature… See merge request !13627
This commit is contained in:
commit
91d3e6d901
2
Gemfile
2
Gemfile
|
@ -144,8 +144,6 @@ end
|
|||
|
||||
# State machine
|
||||
gem 'state_machines-activerecord', '~> 0.4.0'
|
||||
# Run events after state machine commits
|
||||
gem 'after_commit_queue', '~> 1.3.0'
|
||||
|
||||
# Issue tags
|
||||
gem 'acts-as-taggable-on', '~> 4.0'
|
||||
|
|
|
@ -46,8 +46,6 @@ GEM
|
|||
ice_nine (~> 0.11.0)
|
||||
memoizable (~> 0.4.0)
|
||||
addressable (2.3.8)
|
||||
after_commit_queue (1.3.0)
|
||||
activerecord (>= 3.0)
|
||||
akismet (2.0.0)
|
||||
allocations (1.0.5)
|
||||
arel (6.0.4)
|
||||
|
@ -960,7 +958,6 @@ DEPENDENCIES
|
|||
activerecord_sane_schema_dumper (= 0.2)
|
||||
acts-as-taggable-on (~> 4.0)
|
||||
addressable (~> 2.3.8)
|
||||
after_commit_queue (~> 1.3.0)
|
||||
akismet (~> 2.0)
|
||||
allocations (~> 1.0)
|
||||
asana (~> 0.6.0)
|
||||
|
|
|
@ -369,7 +369,10 @@ class Project < ActiveRecord::Base
|
|||
state :failed
|
||||
|
||||
after_transition [:none, :finished, :failed] => :scheduled do |project, _|
|
||||
project.run_after_commit { add_import_job }
|
||||
project.run_after_commit do
|
||||
job_id = add_import_job
|
||||
update(import_jid: job_id) if job_id
|
||||
end
|
||||
end
|
||||
|
||||
after_transition started: :finished do |project, _|
|
||||
|
@ -524,17 +527,26 @@ class Project < ActiveRecord::Base
|
|||
def add_import_job
|
||||
job_id =
|
||||
if forked?
|
||||
RepositoryForkWorker.perform_async(id, forked_from_project.repository_storage_path,
|
||||
forked_from_project.full_path,
|
||||
self.namespace.full_path)
|
||||
RepositoryForkWorker.perform_async(id,
|
||||
forked_from_project.repository_storage_path,
|
||||
forked_from_project.full_path,
|
||||
self.namespace.full_path)
|
||||
else
|
||||
RepositoryImportWorker.perform_async(self.id)
|
||||
end
|
||||
|
||||
log_import_activity(job_id)
|
||||
|
||||
job_id
|
||||
end
|
||||
|
||||
def log_import_activity(job_id, type: :import)
|
||||
job_type = type.to_s.capitalize
|
||||
|
||||
if job_id
|
||||
Rails.logger.info "Import job started for #{full_path} with job ID #{job_id}"
|
||||
Rails.logger.info("#{job_type} job scheduled for #{full_path} with job ID #{job_id}.")
|
||||
else
|
||||
Rails.logger.error "Import job failed to start for #{full_path}"
|
||||
Rails.logger.error("#{job_type} job failed to create for #{full_path}.")
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -543,6 +555,7 @@ class Project < ActiveRecord::Base
|
|||
ProjectCacheWorker.perform_async(self.id)
|
||||
end
|
||||
|
||||
update(import_error: nil)
|
||||
remove_import_data
|
||||
end
|
||||
|
||||
|
|
|
@ -5,14 +5,17 @@ class RepositoryForkWorker
|
|||
include Gitlab::ShellAdapter
|
||||
include DedicatedSidekiqQueue
|
||||
|
||||
sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
|
||||
|
||||
def perform(project_id, forked_from_repository_storage_path, source_path, target_path)
|
||||
project = Project.find(project_id)
|
||||
|
||||
return unless start_fork(project)
|
||||
|
||||
Gitlab::Metrics.add_event(:fork_repository,
|
||||
source_path: source_path,
|
||||
target_path: target_path)
|
||||
|
||||
project = Project.find(project_id)
|
||||
project.import_start
|
||||
|
||||
result = gitlab_shell.fork_repository(forked_from_repository_storage_path, source_path,
|
||||
project.repository_storage_path, target_path)
|
||||
raise ForkError, "Unable to fork project #{project_id} for repository #{source_path} -> #{target_path}" unless result
|
||||
|
@ -33,6 +36,13 @@ class RepositoryForkWorker
|
|||
|
||||
private
|
||||
|
||||
def start_fork(project)
|
||||
return true if project.import_start
|
||||
|
||||
Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while forking.")
|
||||
false
|
||||
end
|
||||
|
||||
def fail_fork(project, message)
|
||||
Rails.logger.error(message)
|
||||
project.mark_import_as_failed(message)
|
||||
|
|
|
@ -4,23 +4,18 @@ class RepositoryImportWorker
|
|||
include Sidekiq::Worker
|
||||
include DedicatedSidekiqQueue
|
||||
|
||||
sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_EXPIRATION
|
||||
|
||||
attr_accessor :project, :current_user
|
||||
sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
|
||||
|
||||
def perform(project_id)
|
||||
@project = Project.find(project_id)
|
||||
@current_user = @project.creator
|
||||
project = Project.find(project_id)
|
||||
|
||||
project.import_start
|
||||
return unless start_import(project)
|
||||
|
||||
Gitlab::Metrics.add_event(:import_repository,
|
||||
import_url: @project.import_url,
|
||||
path: @project.full_path)
|
||||
import_url: project.import_url,
|
||||
path: project.full_path)
|
||||
|
||||
project.update_columns(import_jid: self.jid, import_error: nil)
|
||||
|
||||
result = Projects::ImportService.new(project, current_user).execute
|
||||
result = Projects::ImportService.new(project, project.creator).execute
|
||||
raise ImportError, result[:message] if result[:status] == :error
|
||||
|
||||
project.repository.after_import
|
||||
|
@ -37,6 +32,13 @@ class RepositoryImportWorker
|
|||
|
||||
private
|
||||
|
||||
def start_import(project)
|
||||
return true if project.import_start
|
||||
|
||||
Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while importing.")
|
||||
false
|
||||
end
|
||||
|
||||
def fail_import(project, message)
|
||||
project.mark_import_as_failed(message)
|
||||
end
|
||||
|
|
|
@ -2,36 +2,60 @@ class StuckImportJobsWorker
|
|||
include Sidekiq::Worker
|
||||
include CronjobQueue
|
||||
|
||||
IMPORT_EXPIRATION = 15.hours.to_i
|
||||
IMPORT_JOBS_EXPIRATION = 15.hours.to_i
|
||||
|
||||
def perform
|
||||
stuck_projects.find_in_batches(batch_size: 500) do |group|
|
||||
jids = group.map(&:import_jid)
|
||||
projects_without_jid_count = mark_projects_without_jid_as_failed!
|
||||
projects_with_jid_count = mark_projects_with_jid_as_failed!
|
||||
|
||||
# Find the jobs that aren't currently running or that exceeded the threshold.
|
||||
completed_jids = Gitlab::SidekiqStatus.completed_jids(jids)
|
||||
|
||||
if completed_jids.any?
|
||||
completed_ids = group.select { |project| completed_jids.include?(project.import_jid) }.map(&:id)
|
||||
|
||||
fail_batch!(completed_jids, completed_ids)
|
||||
end
|
||||
end
|
||||
Gitlab::Metrics.add_event(:stuck_import_jobs,
|
||||
projects_without_jid_count: projects_without_jid_count,
|
||||
projects_with_jid_count: projects_with_jid_count)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def stuck_projects
|
||||
Project.select('id, import_jid').with_import_status(:started).where.not(import_jid: nil)
|
||||
def mark_projects_without_jid_as_failed!
|
||||
started_projects_without_jid.each do |project|
|
||||
project.mark_import_as_failed(error_message)
|
||||
end.count
|
||||
end
|
||||
|
||||
def fail_batch!(completed_jids, completed_ids)
|
||||
Project.where(id: completed_ids).update_all(import_status: 'failed', import_error: error_message)
|
||||
def mark_projects_with_jid_as_failed!
|
||||
completed_jids_count = 0
|
||||
|
||||
Rails.logger.info("Marked stuck import jobs as failed. JIDs: #{completed_jids.join(', ')}")
|
||||
started_projects_with_jid.find_in_batches(batch_size: 500) do |group|
|
||||
jids = group.map(&:import_jid)
|
||||
|
||||
# Find the jobs that aren't currently running or that exceeded the threshold.
|
||||
completed_jids = Gitlab::SidekiqStatus.completed_jids(jids).to_set
|
||||
|
||||
if completed_jids.any?
|
||||
completed_jids_count += completed_jids.count
|
||||
group.each do |project|
|
||||
project.mark_import_as_failed(error_message) if completed_jids.include?(project.import_jid)
|
||||
end
|
||||
|
||||
Rails.logger.info("Marked stuck import jobs as failed. JIDs: #{completed_jids.to_a.join(', ')}")
|
||||
end
|
||||
end
|
||||
|
||||
completed_jids_count
|
||||
end
|
||||
|
||||
def started_projects
|
||||
Project.with_import_status(:started)
|
||||
end
|
||||
|
||||
def started_projects_with_jid
|
||||
started_projects.where.not(import_jid: nil)
|
||||
end
|
||||
|
||||
def started_projects_without_jid
|
||||
started_projects.where(import_jid: nil)
|
||||
end
|
||||
|
||||
def error_message
|
||||
"Import timed out. Import took longer than #{IMPORT_EXPIRATION} seconds"
|
||||
"Import timed out. Import took longer than #{IMPORT_JOBS_EXPIRATION} seconds"
|
||||
end
|
||||
end
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
module AfterCommitQueue
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
included do
|
||||
after_commit :_run_after_commit_queue
|
||||
after_rollback :_clear_after_commit_queue
|
||||
end
|
||||
|
||||
def run_after_commit(method = nil, &block)
|
||||
_after_commit_queue << proc { self.send(method) } if method
|
||||
_after_commit_queue << block if block
|
||||
true
|
||||
end
|
||||
|
||||
protected
|
||||
|
||||
def _run_after_commit_queue
|
||||
while action = _after_commit_queue.pop
|
||||
self.instance_eval(&action)
|
||||
end
|
||||
end
|
||||
|
||||
def _after_commit_queue
|
||||
@after_commit_queue ||= []
|
||||
end
|
||||
|
||||
def _clear_after_commit_queue
|
||||
_after_commit_queue.clear
|
||||
end
|
||||
end
|
|
@ -90,9 +90,14 @@ module Gitlab
|
|||
#
|
||||
# Returns an array of completed JIDs
|
||||
def self.completed_jids(job_ids)
|
||||
Sidekiq.redis do |redis|
|
||||
job_ids.reject { |jid| redis.exists(key_for(jid)) }
|
||||
statuses = job_status(job_ids)
|
||||
|
||||
completed = []
|
||||
job_ids.zip(statuses).each do |job_id, status|
|
||||
completed << job_id unless status
|
||||
end
|
||||
|
||||
completed
|
||||
end
|
||||
|
||||
def self.key_for(jid)
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
require 'spec_helper'
|
||||
|
||||
describe AfterCommitQueue do
|
||||
it 'runs after transaction is committed' do
|
||||
called = false
|
||||
test_proc = proc { called = true }
|
||||
|
||||
project = build(:project)
|
||||
project.run_after_commit(&test_proc)
|
||||
|
||||
project.save
|
||||
|
||||
expect(called).to be true
|
||||
end
|
||||
end
|
|
@ -1610,8 +1610,7 @@ describe Project do
|
|||
it 'imports a project' do
|
||||
expect_any_instance_of(RepositoryImportWorker).to receive(:perform).and_call_original
|
||||
|
||||
project.import_schedule
|
||||
|
||||
expect { project.import_schedule }.to change { project.import_jid }
|
||||
expect(project.reload.import_status).to eq('finished')
|
||||
end
|
||||
end
|
||||
|
@ -1624,6 +1623,13 @@ describe Project do
|
|||
allow(Projects::HousekeepingService).to receive(:new) { housekeeping_service }
|
||||
end
|
||||
|
||||
it 'resets project import_error' do
|
||||
error_message = 'Some error'
|
||||
mirror = create(:project_empty_repo, :import_started, import_error: error_message)
|
||||
|
||||
expect { mirror.import_finish }.to change { mirror.import_error }.from(error_message).to(nil)
|
||||
end
|
||||
|
||||
it 'performs housekeeping when an import of a fresh project is completed' do
|
||||
project = create(:project_empty_repo, :import_started, import_type: :github)
|
||||
|
||||
|
@ -1730,17 +1736,21 @@ describe Project do
|
|||
end
|
||||
|
||||
describe '#add_import_job' do
|
||||
let(:import_jid) { '123' }
|
||||
|
||||
context 'forked' do
|
||||
let(:forked_project_link) { create(:forked_project_link, :forked_to_empty_project) }
|
||||
let(:forked_from_project) { forked_project_link.forked_from_project }
|
||||
let(:project) { forked_project_link.forked_to_project }
|
||||
|
||||
it 'schedules a RepositoryForkWorker job' do
|
||||
expect(RepositoryForkWorker).to receive(:perform_async)
|
||||
.with(project.id, forked_from_project.repository_storage_path,
|
||||
forked_from_project.disk_path, project.namespace.full_path)
|
||||
expect(RepositoryForkWorker).to receive(:perform_async).with(
|
||||
project.id,
|
||||
forked_from_project.repository_storage_path,
|
||||
forked_from_project.disk_path,
|
||||
project.namespace.full_path).and_return(import_jid)
|
||||
|
||||
project.add_import_job
|
||||
expect(project.add_import_job).to eq(import_jid)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -1748,9 +1758,8 @@ describe Project do
|
|||
it 'schedules a RepositoryImportWorker job' do
|
||||
project = create(:project, import_url: generate(:url))
|
||||
|
||||
expect(RepositoryImportWorker).to receive(:perform_async).with(project.id)
|
||||
|
||||
project.add_import_job
|
||||
expect(RepositoryImportWorker).to receive(:perform_async).with(project.id).and_return(import_jid)
|
||||
expect(project.add_import_job).to eq(import_jid)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -22,8 +22,8 @@ describe RepositoryImportWorker do
|
|||
it 'hide the credentials that were used in the import URL' do
|
||||
error = %q{remote: Not Found fatal: repository 'https://user:pass@test.com/root/repoC.git/' not found }
|
||||
|
||||
project.update_attributes(import_jid: '123')
|
||||
expect_any_instance_of(Projects::ImportService).to receive(:execute).and_return({ status: :error, message: error })
|
||||
allow(subject).to receive(:jid).and_return('123')
|
||||
|
||||
expect do
|
||||
subject.perform(project.id)
|
||||
|
|
|
@ -8,29 +8,29 @@ describe StuckImportJobsWorker do
|
|||
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain).and_return(exclusive_lease_uuid)
|
||||
end
|
||||
|
||||
describe 'long running import' do
|
||||
let(:project) { create(:project, import_jid: '123', import_status: 'started') }
|
||||
describe 'with started import_status' do
|
||||
let(:project) { create(:project, :import_started, import_jid: '123') }
|
||||
|
||||
before do
|
||||
allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return(['123'])
|
||||
describe 'long running import' do
|
||||
it 'marks the project as failed' do
|
||||
allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return(['123'])
|
||||
|
||||
expect { worker.perform }.to change { project.reload.import_status }.to('failed')
|
||||
end
|
||||
end
|
||||
|
||||
it 'marks the project as failed' do
|
||||
expect { worker.perform }.to change { project.reload.import_status }.to('failed')
|
||||
end
|
||||
end
|
||||
describe 'running import' do
|
||||
it 'does not mark the project as failed' do
|
||||
allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return([])
|
||||
|
||||
describe 'running import' do
|
||||
let(:project) { create(:project, import_jid: '123', import_status: 'started') }
|
||||
expect { worker.perform }.not_to change { project.reload.import_status }
|
||||
end
|
||||
|
||||
before do
|
||||
allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return([])
|
||||
end
|
||||
|
||||
it 'does not mark the project as failed' do
|
||||
worker.perform
|
||||
|
||||
expect(project.reload.import_status).to eq('started')
|
||||
describe 'import without import_jid' do
|
||||
it 'marks the project as failed' do
|
||||
expect { worker.perform }.to change { project.reload.import_status }.to('failed')
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue