Merge branch 'process-commit-worker-improvements' into 'master'
Pass commit data to ProcessCommitWorker. This changes `ProcessCommitWorker` so that it takes a Hash containing commit data instead of a commit SHA. This means the worker doesn't have to access Git just to process a commit message (and other data it may use). This in turn should solve the problem of ending up with 15,000-something jobs in the `process_commit` queue that take forever to process. See merge request !7744.
This commit is contained in:
commit
ec4e7d9a9f
|
@ -48,6 +48,10 @@ class Commit
|
||||||
max_lines: DIFF_HARD_LIMIT_LINES,
|
max_lines: DIFF_HARD_LIMIT_LINES,
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def from_hash(hash, project)
|
||||||
|
new(Gitlab::Git::Commit.new(hash), project)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
attr_accessor :raw
|
attr_accessor :raw
|
||||||
|
|
|
@ -135,7 +135,7 @@ class GitPushService < BaseService
|
||||||
|
|
||||||
@push_commits.each do |commit|
|
@push_commits.each do |commit|
|
||||||
ProcessCommitWorker.
|
ProcessCommitWorker.
|
||||||
perform_async(project.id, current_user.id, commit.id, default)
|
perform_async(project.id, current_user.id, commit.to_hash, default)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -10,9 +10,10 @@ class ProcessCommitWorker
|
||||||
|
|
||||||
# project_id - The ID of the project this commit belongs to.
|
# project_id - The ID of the project this commit belongs to.
|
||||||
# user_id - The ID of the user that pushed the commit.
|
# user_id - The ID of the user that pushed the commit.
|
||||||
# commit_sha - The SHA1 of the commit to process.
|
# commit_hash - Hash containing commit details to use for constructing a
|
||||||
|
# Commit object without having to use the Git repository.
|
||||||
# default - The data was pushed to the default branch.
|
# default - The data was pushed to the default branch.
|
||||||
def perform(project_id, user_id, commit_sha, default = false)
|
def perform(project_id, user_id, commit_hash, default = false)
|
||||||
project = Project.find_by(id: project_id)
|
project = Project.find_by(id: project_id)
|
||||||
|
|
||||||
return unless project
|
return unless project
|
||||||
|
@ -21,10 +22,7 @@ class ProcessCommitWorker
|
||||||
|
|
||||||
return unless user
|
return unless user
|
||||||
|
|
||||||
commit = find_commit(project, commit_sha)
|
commit = build_commit(project, commit_hash)
|
||||||
|
|
||||||
return unless commit
|
|
||||||
|
|
||||||
author = commit.author || user
|
author = commit.author || user
|
||||||
|
|
||||||
process_commit_message(project, commit, user, author, default)
|
process_commit_message(project, commit, user, author, default)
|
||||||
|
@ -59,9 +57,18 @@ class ProcessCommitWorker
|
||||||
update_all(first_mentioned_in_commit_at: commit.committed_date)
|
update_all(first_mentioned_in_commit_at: commit.committed_date)
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
def build_commit(project, hash)
|
||||||
|
date_suffix = '_date'
|
||||||
|
|
||||||
def find_commit(project, sha)
|
# When processing Sidekiq payloads various timestamps are stored as Strings.
|
||||||
project.commit(sha)
|
# Commit in turn expects Time-like instances upon input, so we have to
|
||||||
|
# manually parse these values.
|
||||||
|
hash.each do |key, value|
|
||||||
|
if key.to_s.end_with?(date_suffix) && value.is_a?(String)
|
||||||
|
hash[key] = Time.parse(value)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
Commit.from_hash(hash, project)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
---
|
||||||
|
title: Pass commit data to ProcessCommitWorker to reduce Git overhead
|
||||||
|
merge_request: 7744
|
||||||
|
author:
|
|
@ -0,0 +1,92 @@
|
||||||
|
# See http://doc.gitlab.com/ce/development/migration_style_guide.html
|
||||||
|
# for more information on how to write migrations for GitLab.
|
||||||
|
|
||||||
|
class MigrateProcessCommitWorkerJobs < ActiveRecord::Migration
|
||||||
|
include Gitlab::Database::MigrationHelpers
|
||||||
|
|
||||||
|
class Project < ActiveRecord::Base
|
||||||
|
def self.find_including_path(id)
|
||||||
|
select("projects.*, CONCAT(namespaces.path, '/', projects.path) AS path_with_namespace").
|
||||||
|
joins('INNER JOIN namespaces ON namespaces.id = projects.namespace_id').
|
||||||
|
find_by(id: id)
|
||||||
|
end
|
||||||
|
|
||||||
|
def repository_storage_path
|
||||||
|
Gitlab.config.repositories.storages[repository_storage]
|
||||||
|
end
|
||||||
|
|
||||||
|
def repository_path
|
||||||
|
File.join(repository_storage_path, read_attribute(:path_with_namespace) + '.git')
|
||||||
|
end
|
||||||
|
|
||||||
|
def repository
|
||||||
|
@repository ||= Rugged::Repository.new(repository_path)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
DOWNTIME = true
|
||||||
|
DOWNTIME_REASON = 'Existing workers will error until they are using a newer version of the code'
|
||||||
|
|
||||||
|
disable_ddl_transaction!
|
||||||
|
|
||||||
|
def up
|
||||||
|
Sidekiq.redis do |redis|
|
||||||
|
new_jobs = []
|
||||||
|
|
||||||
|
while job = redis.lpop('queue:process_commit')
|
||||||
|
payload = JSON.load(job)
|
||||||
|
project = Project.find_including_path(payload['args'][0])
|
||||||
|
|
||||||
|
next unless project
|
||||||
|
|
||||||
|
begin
|
||||||
|
commit = project.repository.lookup(payload['args'][2])
|
||||||
|
rescue Rugged::OdbError
|
||||||
|
next
|
||||||
|
end
|
||||||
|
|
||||||
|
hash = {
|
||||||
|
id: commit.oid,
|
||||||
|
message: commit.message,
|
||||||
|
parent_ids: commit.parent_ids,
|
||||||
|
authored_date: commit.author[:time],
|
||||||
|
author_name: commit.author[:name],
|
||||||
|
author_email: commit.author[:email],
|
||||||
|
committed_date: commit.committer[:time],
|
||||||
|
committer_email: commit.committer[:email],
|
||||||
|
committer_name: commit.committer[:name]
|
||||||
|
}
|
||||||
|
|
||||||
|
payload['args'][2] = hash
|
||||||
|
|
||||||
|
new_jobs << JSON.dump(payload)
|
||||||
|
end
|
||||||
|
|
||||||
|
redis.multi do |multi|
|
||||||
|
new_jobs.each do |j|
|
||||||
|
multi.lpush('queue:process_commit', j)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def down
|
||||||
|
Sidekiq.redis do |redis|
|
||||||
|
new_jobs = []
|
||||||
|
|
||||||
|
while job = redis.lpop('queue:process_commit')
|
||||||
|
payload = JSON.load(job)
|
||||||
|
|
||||||
|
payload['args'][2] = payload['args'][2]['id']
|
||||||
|
|
||||||
|
new_jobs << JSON.dump(payload)
|
||||||
|
end
|
||||||
|
|
||||||
|
redis.multi do |multi|
|
||||||
|
new_jobs.each do |j|
|
||||||
|
multi.lpush('queue:process_commit', j)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -321,6 +321,6 @@ describe Gitlab::CycleAnalytics::Events do
|
||||||
context.update(milestone: milestone)
|
context.update(milestone: milestone)
|
||||||
mr = create_merge_request_closing_issue(context)
|
mr = create_merge_request_closing_issue(context)
|
||||||
|
|
||||||
ProcessCommitWorker.new.perform(project.id, user.id, mr.commits.last.sha)
|
ProcessCommitWorker.new.perform(project.id, user.id, mr.commits.last.to_hash)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -0,0 +1,194 @@
|
||||||
|
require 'spec_helper'
|
||||||
|
require Rails.root.join('db', 'migrate', '20161124141322_migrate_process_commit_worker_jobs.rb')
|
||||||
|
|
||||||
|
describe MigrateProcessCommitWorkerJobs do
|
||||||
|
let(:project) { create(:project) }
|
||||||
|
let(:user) { create(:user) }
|
||||||
|
let(:commit) { project.commit.raw.raw_commit }
|
||||||
|
|
||||||
|
describe 'Project' do
|
||||||
|
describe 'find_including_path' do
|
||||||
|
it 'returns Project instances' do
|
||||||
|
expect(described_class::Project.find_including_path(project.id)).
|
||||||
|
to be_an_instance_of(described_class::Project)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'selects the full path for every Project' do
|
||||||
|
migration_project = described_class::Project.
|
||||||
|
find_including_path(project.id)
|
||||||
|
|
||||||
|
expect(migration_project[:path_with_namespace]).
|
||||||
|
to eq(project.path_with_namespace)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '#repository_storage_path' do
|
||||||
|
it 'returns the storage path for the repository' do
|
||||||
|
migration_project = described_class::Project.
|
||||||
|
find_including_path(project.id)
|
||||||
|
|
||||||
|
expect(File.directory?(migration_project.repository_storage_path)).
|
||||||
|
to eq(true)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '#repository_path' do
|
||||||
|
it 'returns the path to the repository' do
|
||||||
|
migration_project = described_class::Project.
|
||||||
|
find_including_path(project.id)
|
||||||
|
|
||||||
|
expect(File.directory?(migration_project.repository_path)).to eq(true)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '#repository' do
|
||||||
|
it 'returns a Rugged::Repository' do
|
||||||
|
migration_project = described_class::Project.
|
||||||
|
find_including_path(project.id)
|
||||||
|
|
||||||
|
expect(migration_project.repository).
|
||||||
|
to be_an_instance_of(Rugged::Repository)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '#up', :redis do
|
||||||
|
let(:migration) { described_class.new }
|
||||||
|
|
||||||
|
def job_count
|
||||||
|
Sidekiq.redis { |r| r.llen('queue:process_commit') }
|
||||||
|
end
|
||||||
|
|
||||||
|
before do
|
||||||
|
Sidekiq.redis do |redis|
|
||||||
|
job = JSON.dump(args: [project.id, user.id, commit.oid])
|
||||||
|
redis.lpush('queue:process_commit', job)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'skips jobs using a project that no longer exists' do
|
||||||
|
allow(described_class::Project).to receive(:find_including_path).
|
||||||
|
with(project.id).
|
||||||
|
and_return(nil)
|
||||||
|
|
||||||
|
migration.up
|
||||||
|
|
||||||
|
expect(job_count).to eq(0)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'skips jobs using commits that no longer exist' do
|
||||||
|
allow_any_instance_of(Rugged::Repository).to receive(:lookup).
|
||||||
|
with(commit.oid).
|
||||||
|
and_raise(Rugged::OdbError)
|
||||||
|
|
||||||
|
migration.up
|
||||||
|
|
||||||
|
expect(job_count).to eq(0)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'inserts migrated jobs back into the queue' do
|
||||||
|
migration.up
|
||||||
|
|
||||||
|
expect(job_count).to eq(1)
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'a migrated job' do
|
||||||
|
let(:job) do
|
||||||
|
migration.up
|
||||||
|
|
||||||
|
JSON.load(Sidekiq.redis { |r| r.lpop('queue:process_commit') })
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:commit_hash) do
|
||||||
|
job['args'][2]
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'includes the project ID' do
|
||||||
|
expect(job['args'][0]).to eq(project.id)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'includes the user ID' do
|
||||||
|
expect(job['args'][1]).to eq(user.id)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'includes the commit ID' do
|
||||||
|
expect(commit_hash['id']).to eq(commit.oid)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'includes the commit message' do
|
||||||
|
expect(commit_hash['message']).to eq(commit.message)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'includes the parent IDs' do
|
||||||
|
expect(commit_hash['parent_ids']).to eq(commit.parent_ids)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'includes the author date' do
|
||||||
|
expect(commit_hash['authored_date']).to eq(commit.author[:time].to_s)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'includes the author name' do
|
||||||
|
expect(commit_hash['author_name']).to eq(commit.author[:name])
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'includes the author Email' do
|
||||||
|
expect(commit_hash['author_email']).to eq(commit.author[:email])
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'includes the commit date' do
|
||||||
|
expect(commit_hash['committed_date']).to eq(commit.committer[:time].to_s)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'includes the committer name' do
|
||||||
|
expect(commit_hash['committer_name']).to eq(commit.committer[:name])
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'includes the committer Email' do
|
||||||
|
expect(commit_hash['committer_email']).to eq(commit.committer[:email])
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '#down', :redis do
|
||||||
|
let(:migration) { described_class.new }
|
||||||
|
|
||||||
|
def job_count
|
||||||
|
Sidekiq.redis { |r| r.llen('queue:process_commit') }
|
||||||
|
end
|
||||||
|
|
||||||
|
before do
|
||||||
|
Sidekiq.redis do |redis|
|
||||||
|
job = JSON.dump(args: [project.id, user.id, commit.oid])
|
||||||
|
redis.lpush('queue:process_commit', job)
|
||||||
|
|
||||||
|
migration.up
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'pushes migrated jobs back into the queue' do
|
||||||
|
migration.down
|
||||||
|
|
||||||
|
expect(job_count).to eq(1)
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'a migrated job' do
|
||||||
|
let(:job) do
|
||||||
|
migration.down
|
||||||
|
|
||||||
|
JSON.load(Sidekiq.redis { |r| r.lpop('queue:process_commit') })
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'includes the project ID' do
|
||||||
|
expect(job['args'][0]).to eq(project.id)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'includes the user ID' do
|
||||||
|
expect(job['args'][1]).to eq(user.id)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'includes the commit SHA' do
|
||||||
|
expect(job['args'][2]).to eq(commit.oid)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -302,4 +302,21 @@ eos
|
||||||
expect(commit.uri_type('this/path/doesnt/exist')).to be_nil
|
expect(commit.uri_type('this/path/doesnt/exist')).to be_nil
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe '.from_hash' do
|
||||||
|
let(:new_commit) { described_class.from_hash(commit.to_hash, project) }
|
||||||
|
|
||||||
|
it 'returns a Commit' do
|
||||||
|
expect(new_commit).to be_an_instance_of(described_class)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'wraps a Gitlab::Git::Commit' do
|
||||||
|
expect(new_commit.raw).to be_an_instance_of(Gitlab::Git::Commit)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'stores the correct commit fields' do
|
||||||
|
expect(new_commit.id).to eq(commit.id)
|
||||||
|
expect(new_commit.message).to eq(commit.message)
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -135,6 +135,6 @@ describe 'cycle analytics events' do
|
||||||
|
|
||||||
merge_merge_requests_closing_issue(issue)
|
merge_merge_requests_closing_issue(issue)
|
||||||
|
|
||||||
ProcessCommitWorker.new.perform(project.id, user.id, mr.commits.last.sha)
|
ProcessCommitWorker.new.perform(project.id, user.id, mr.commits.last.to_hash)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -263,7 +263,7 @@ describe GitPushService, services: true do
|
||||||
author_email: commit_author.email
|
author_email: commit_author.email
|
||||||
)
|
)
|
||||||
|
|
||||||
allow_any_instance_of(ProcessCommitWorker).to receive(:find_commit).
|
allow_any_instance_of(ProcessCommitWorker).to receive(:build_commit).
|
||||||
and_return(commit)
|
and_return(commit)
|
||||||
|
|
||||||
allow(project.repository).to receive(:commits_between).and_return([commit])
|
allow(project.repository).to receive(:commits_between).and_return([commit])
|
||||||
|
@ -321,7 +321,7 @@ describe GitPushService, services: true do
|
||||||
committed_date: commit_time
|
committed_date: commit_time
|
||||||
)
|
)
|
||||||
|
|
||||||
allow_any_instance_of(ProcessCommitWorker).to receive(:find_commit).
|
allow_any_instance_of(ProcessCommitWorker).to receive(:build_commit).
|
||||||
and_return(commit)
|
and_return(commit)
|
||||||
|
|
||||||
allow(project.repository).to receive(:commits_between).and_return([commit])
|
allow(project.repository).to receive(:commits_between).and_return([commit])
|
||||||
|
@ -360,7 +360,7 @@ describe GitPushService, services: true do
|
||||||
allow(project.repository).to receive(:commits_between).
|
allow(project.repository).to receive(:commits_between).
|
||||||
and_return([closing_commit])
|
and_return([closing_commit])
|
||||||
|
|
||||||
allow_any_instance_of(ProcessCommitWorker).to receive(:find_commit).
|
allow_any_instance_of(ProcessCommitWorker).to receive(:build_commit).
|
||||||
and_return(closing_commit)
|
and_return(closing_commit)
|
||||||
|
|
||||||
project.team << [commit_author, :master]
|
project.team << [commit_author, :master]
|
||||||
|
|
|
@ -55,8 +55,12 @@ RSpec.configure do |config|
|
||||||
|
|
||||||
config.around(:each, :redis) do |example|
|
config.around(:each, :redis) do |example|
|
||||||
Gitlab::Redis.with(&:flushall)
|
Gitlab::Redis.with(&:flushall)
|
||||||
|
Sidekiq.redis(&:flushall)
|
||||||
|
|
||||||
example.run
|
example.run
|
||||||
|
|
||||||
Gitlab::Redis.with(&:flushall)
|
Gitlab::Redis.with(&:flushall)
|
||||||
|
Sidekiq.redis(&:flushall)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -11,31 +11,25 @@ describe ProcessCommitWorker do
|
||||||
it 'does not process the commit when the project does not exist' do
|
it 'does not process the commit when the project does not exist' do
|
||||||
expect(worker).not_to receive(:close_issues)
|
expect(worker).not_to receive(:close_issues)
|
||||||
|
|
||||||
worker.perform(-1, user.id, commit.id)
|
worker.perform(-1, user.id, commit.to_hash)
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'does not process the commit when the user does not exist' do
|
it 'does not process the commit when the user does not exist' do
|
||||||
expect(worker).not_to receive(:close_issues)
|
expect(worker).not_to receive(:close_issues)
|
||||||
|
|
||||||
worker.perform(project.id, -1, commit.id)
|
worker.perform(project.id, -1, commit.to_hash)
|
||||||
end
|
|
||||||
|
|
||||||
it 'does not process the commit when the commit no longer exists' do
|
|
||||||
expect(worker).not_to receive(:close_issues)
|
|
||||||
|
|
||||||
worker.perform(project.id, user.id, 'this-should-does-not-exist')
|
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'processes the commit message' do
|
it 'processes the commit message' do
|
||||||
expect(worker).to receive(:process_commit_message).and_call_original
|
expect(worker).to receive(:process_commit_message).and_call_original
|
||||||
|
|
||||||
worker.perform(project.id, user.id, commit.id)
|
worker.perform(project.id, user.id, commit.to_hash)
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'updates the issue metrics' do
|
it 'updates the issue metrics' do
|
||||||
expect(worker).to receive(:update_issue_metrics).and_call_original
|
expect(worker).to receive(:update_issue_metrics).and_call_original
|
||||||
|
|
||||||
worker.perform(project.id, user.id, commit.id)
|
worker.perform(project.id, user.id, commit.to_hash)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -106,4 +100,19 @@ describe ProcessCommitWorker do
|
||||||
expect(metric.first_mentioned_in_commit_at).to eq(commit.committed_date)
|
expect(metric.first_mentioned_in_commit_at).to eq(commit.committed_date)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe '#build_commit' do
|
||||||
|
it 'returns a Commit' do
|
||||||
|
commit = worker.build_commit(project, id: '123')
|
||||||
|
|
||||||
|
expect(commit).to be_an_instance_of(Commit)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'parses date strings into Time instances' do
|
||||||
|
commit = worker.
|
||||||
|
build_commit(project, id: '123', authored_date: Time.now.to_s)
|
||||||
|
|
||||||
|
expect(commit.authored_date).to be_an_instance_of(Time)
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in New Issue