diff --git a/app/models/commit.rb b/app/models/commit.rb index 9e7fde9503d..176c524cf7b 100644 --- a/app/models/commit.rb +++ b/app/models/commit.rb @@ -48,6 +48,10 @@ class Commit max_lines: DIFF_HARD_LIMIT_LINES, } end + + def from_hash(hash, project) + new(Gitlab::Git::Commit.new(hash), project) + end end attr_accessor :raw diff --git a/app/services/git_push_service.rb b/app/services/git_push_service.rb index 647930d555c..185556c12cc 100644 --- a/app/services/git_push_service.rb +++ b/app/services/git_push_service.rb @@ -135,7 +135,7 @@ class GitPushService < BaseService @push_commits.each do |commit| ProcessCommitWorker. - perform_async(project.id, current_user.id, commit.id, default) + perform_async(project.id, current_user.id, commit.to_hash, default) end end diff --git a/app/workers/process_commit_worker.rb b/app/workers/process_commit_worker.rb index 071741fbacd..e9a5bd7f24e 100644 --- a/app/workers/process_commit_worker.rb +++ b/app/workers/process_commit_worker.rb @@ -10,9 +10,10 @@ class ProcessCommitWorker # project_id - The ID of the project this commit belongs to. # user_id - The ID of the user that pushed the commit. - # commit_sha - The SHA1 of the commit to process. + # commit_hash - Hash containing commit details to use for constructing a + # Commit object without having to use the Git repository. # default - The data was pushed to the default branch. - def perform(project_id, user_id, commit_sha, default = false) + def perform(project_id, user_id, commit_hash, default = false) project = Project.find_by(id: project_id) return unless project @@ -21,10 +22,7 @@ class ProcessCommitWorker return unless user - commit = find_commit(project, commit_sha) - - return unless commit - + commit = build_commit(project, commit_hash) author = commit.author || user process_commit_message(project, commit, user, author, default) @@ -59,9 +57,18 @@ class ProcessCommitWorker update_all(first_mentioned_in_commit_at: commit.committed_date) end - private + def build_commit(project, hash) + date_suffix = '_date' - def find_commit(project, sha) - project.commit(sha) + # When processing Sidekiq payloads various timestamps are stored as Strings. + # Commit in turn expects Time-like instances upon input, so we have to + # manually parse these values. + hash.each do |key, value| + if key.to_s.end_with?(date_suffix) && value.is_a?(String) + hash[key] = Time.parse(value) + end + end + + Commit.from_hash(hash, project) end end diff --git a/changelogs/unreleased/process-commit-worker-improvements.yml b/changelogs/unreleased/process-commit-worker-improvements.yml new file mode 100644 index 00000000000..0038c6e34e6 --- /dev/null +++ b/changelogs/unreleased/process-commit-worker-improvements.yml @@ -0,0 +1,4 @@ +--- +title: Pass commit data to ProcessCommitWorker to reduce Git overhead +merge_request: 7744 +author: diff --git a/db/migrate/20161124141322_migrate_process_commit_worker_jobs.rb b/db/migrate/20161124141322_migrate_process_commit_worker_jobs.rb new file mode 100644 index 00000000000..453a44e271a --- /dev/null +++ b/db/migrate/20161124141322_migrate_process_commit_worker_jobs.rb @@ -0,0 +1,92 @@ +# See http://doc.gitlab.com/ce/development/migration_style_guide.html +# for more information on how to write migrations for GitLab. + +class MigrateProcessCommitWorkerJobs < ActiveRecord::Migration + include Gitlab::Database::MigrationHelpers + + class Project < ActiveRecord::Base + def self.find_including_path(id) + select("projects.*, CONCAT(namespaces.path, '/', projects.path) AS path_with_namespace"). + joins('INNER JOIN namespaces ON namespaces.id = projects.namespace_id'). + find_by(id: id) + end + + def repository_storage_path + Gitlab.config.repositories.storages[repository_storage] + end + + def repository_path + File.join(repository_storage_path, read_attribute(:path_with_namespace) + '.git') + end + + def repository + @repository ||= Rugged::Repository.new(repository_path) + end + end + + DOWNTIME = true + DOWNTIME_REASON = 'Existing workers will error until they are using a newer version of the code' + + disable_ddl_transaction! + + def up + Sidekiq.redis do |redis| + new_jobs = [] + + while job = redis.lpop('queue:process_commit') + payload = JSON.load(job) + project = Project.find_including_path(payload['args'][0]) + + next unless project + + begin + commit = project.repository.lookup(payload['args'][2]) + rescue Rugged::OdbError + next + end + + hash = { + id: commit.oid, + message: commit.message, + parent_ids: commit.parent_ids, + authored_date: commit.author[:time], + author_name: commit.author[:name], + author_email: commit.author[:email], + committed_date: commit.committer[:time], + committer_email: commit.committer[:email], + committer_name: commit.committer[:name] + } + + payload['args'][2] = hash + + new_jobs << JSON.dump(payload) + end + + redis.multi do |multi| + new_jobs.each do |j| + multi.lpush('queue:process_commit', j) + end + end + end + end + + def down + Sidekiq.redis do |redis| + new_jobs = [] + + while job = redis.lpop('queue:process_commit') + payload = JSON.load(job) + + payload['args'][2] = payload['args'][2]['id'] + + new_jobs << JSON.dump(payload) + end + + redis.multi do |multi| + new_jobs.each do |j| + multi.lpush('queue:process_commit', j) + end + end + end + end +end diff --git a/spec/lib/gitlab/cycle_analytics/events_spec.rb b/spec/lib/gitlab/cycle_analytics/events_spec.rb index 9aeaa6b3ee8..6062e7af4f5 100644 --- a/spec/lib/gitlab/cycle_analytics/events_spec.rb +++ b/spec/lib/gitlab/cycle_analytics/events_spec.rb @@ -321,6 +321,6 @@ describe Gitlab::CycleAnalytics::Events do context.update(milestone: milestone) mr = create_merge_request_closing_issue(context) - ProcessCommitWorker.new.perform(project.id, user.id, mr.commits.last.sha) + ProcessCommitWorker.new.perform(project.id, user.id, mr.commits.last.to_hash) end end diff --git a/spec/migrations/migrate_process_commit_worker_jobs_spec.rb b/spec/migrations/migrate_process_commit_worker_jobs_spec.rb new file mode 100644 index 00000000000..52428547a9f --- /dev/null +++ b/spec/migrations/migrate_process_commit_worker_jobs_spec.rb @@ -0,0 +1,194 @@ +require 'spec_helper' +require Rails.root.join('db', 'migrate', '20161124141322_migrate_process_commit_worker_jobs.rb') + +describe MigrateProcessCommitWorkerJobs do + let(:project) { create(:project) } + let(:user) { create(:user) } + let(:commit) { project.commit.raw.raw_commit } + + describe 'Project' do + describe 'find_including_path' do + it 'returns Project instances' do + expect(described_class::Project.find_including_path(project.id)). + to be_an_instance_of(described_class::Project) + end + + it 'selects the full path for every Project' do + migration_project = described_class::Project. + find_including_path(project.id) + + expect(migration_project[:path_with_namespace]). + to eq(project.path_with_namespace) + end + end + + describe '#repository_storage_path' do + it 'returns the storage path for the repository' do + migration_project = described_class::Project. + find_including_path(project.id) + + expect(File.directory?(migration_project.repository_storage_path)). + to eq(true) + end + end + + describe '#repository_path' do + it 'returns the path to the repository' do + migration_project = described_class::Project. + find_including_path(project.id) + + expect(File.directory?(migration_project.repository_path)).to eq(true) + end + end + + describe '#repository' do + it 'returns a Rugged::Repository' do + migration_project = described_class::Project. + find_including_path(project.id) + + expect(migration_project.repository). + to be_an_instance_of(Rugged::Repository) + end + end + end + + describe '#up', :redis do + let(:migration) { described_class.new } + + def job_count + Sidekiq.redis { |r| r.llen('queue:process_commit') } + end + + before do + Sidekiq.redis do |redis| + job = JSON.dump(args: [project.id, user.id, commit.oid]) + redis.lpush('queue:process_commit', job) + end + end + + it 'skips jobs using a project that no longer exists' do + allow(described_class::Project).to receive(:find_including_path). + with(project.id). + and_return(nil) + + migration.up + + expect(job_count).to eq(0) + end + + it 'skips jobs using commits that no longer exist' do + allow_any_instance_of(Rugged::Repository).to receive(:lookup). + with(commit.oid). + and_raise(Rugged::OdbError) + + migration.up + + expect(job_count).to eq(0) + end + + it 'inserts migrated jobs back into the queue' do + migration.up + + expect(job_count).to eq(1) + end + + context 'a migrated job' do + let(:job) do + migration.up + + JSON.load(Sidekiq.redis { |r| r.lpop('queue:process_commit') }) + end + + let(:commit_hash) do + job['args'][2] + end + + it 'includes the project ID' do + expect(job['args'][0]).to eq(project.id) + end + + it 'includes the user ID' do + expect(job['args'][1]).to eq(user.id) + end + + it 'includes the commit ID' do + expect(commit_hash['id']).to eq(commit.oid) + end + + it 'includes the commit message' do + expect(commit_hash['message']).to eq(commit.message) + end + + it 'includes the parent IDs' do + expect(commit_hash['parent_ids']).to eq(commit.parent_ids) + end + + it 'includes the author date' do + expect(commit_hash['authored_date']).to eq(commit.author[:time].to_s) + end + + it 'includes the author name' do + expect(commit_hash['author_name']).to eq(commit.author[:name]) + end + + it 'includes the author Email' do + expect(commit_hash['author_email']).to eq(commit.author[:email]) + end + + it 'includes the commit date' do + expect(commit_hash['committed_date']).to eq(commit.committer[:time].to_s) + end + + it 'includes the committer name' do + expect(commit_hash['committer_name']).to eq(commit.committer[:name]) + end + + it 'includes the committer Email' do + expect(commit_hash['committer_email']).to eq(commit.committer[:email]) + end + end + end + + describe '#down', :redis do + let(:migration) { described_class.new } + + def job_count + Sidekiq.redis { |r| r.llen('queue:process_commit') } + end + + before do + Sidekiq.redis do |redis| + job = JSON.dump(args: [project.id, user.id, commit.oid]) + redis.lpush('queue:process_commit', job) + + migration.up + end + end + + it 'pushes migrated jobs back into the queue' do + migration.down + + expect(job_count).to eq(1) + end + + context 'a migrated job' do + let(:job) do + migration.down + + JSON.load(Sidekiq.redis { |r| r.lpop('queue:process_commit') }) + end + + it 'includes the project ID' do + expect(job['args'][0]).to eq(project.id) + end + + it 'includes the user ID' do + expect(job['args'][1]).to eq(user.id) + end + + it 'includes the commit SHA' do + expect(job['args'][2]).to eq(commit.oid) + end + end + end +end diff --git a/spec/models/commit_spec.rb b/spec/models/commit_spec.rb index e3bb3482d67..7194c20d3bf 100644 --- a/spec/models/commit_spec.rb +++ b/spec/models/commit_spec.rb @@ -302,4 +302,21 @@ eos expect(commit.uri_type('this/path/doesnt/exist')).to be_nil end end + + describe '.from_hash' do + let(:new_commit) { described_class.from_hash(commit.to_hash, project) } + + it 'returns a Commit' do + expect(new_commit).to be_an_instance_of(described_class) + end + + it 'wraps a Gitlab::Git::Commit' do + expect(new_commit.raw).to be_an_instance_of(Gitlab::Git::Commit) + end + + it 'stores the correct commit fields' do + expect(new_commit.id).to eq(commit.id) + expect(new_commit.message).to eq(commit.message) + end + end end diff --git a/spec/requests/projects/cycle_analytics_events_spec.rb b/spec/requests/projects/cycle_analytics_events_spec.rb index 5c90fd9bad9..f5e0fdcda2d 100644 --- a/spec/requests/projects/cycle_analytics_events_spec.rb +++ b/spec/requests/projects/cycle_analytics_events_spec.rb @@ -135,6 +135,6 @@ describe 'cycle analytics events' do merge_merge_requests_closing_issue(issue) - ProcessCommitWorker.new.perform(project.id, user.id, mr.commits.last.sha) + ProcessCommitWorker.new.perform(project.id, user.id, mr.commits.last.to_hash) end end diff --git a/spec/services/git_push_service_spec.rb b/spec/services/git_push_service_spec.rb index 9d7702f5c96..e7624e70725 100644 --- a/spec/services/git_push_service_spec.rb +++ b/spec/services/git_push_service_spec.rb @@ -263,7 +263,7 @@ describe GitPushService, services: true do author_email: commit_author.email ) - allow_any_instance_of(ProcessCommitWorker).to receive(:find_commit). + allow_any_instance_of(ProcessCommitWorker).to receive(:build_commit). and_return(commit) allow(project.repository).to receive(:commits_between).and_return([commit]) @@ -321,7 +321,7 @@ describe GitPushService, services: true do committed_date: commit_time ) - allow_any_instance_of(ProcessCommitWorker).to receive(:find_commit). + allow_any_instance_of(ProcessCommitWorker).to receive(:build_commit). and_return(commit) allow(project.repository).to receive(:commits_between).and_return([commit]) @@ -360,7 +360,7 @@ describe GitPushService, services: true do allow(project.repository).to receive(:commits_between). and_return([closing_commit]) - allow_any_instance_of(ProcessCommitWorker).to receive(:find_commit). + allow_any_instance_of(ProcessCommitWorker).to receive(:build_commit). and_return(closing_commit) project.team << [commit_author, :master] diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index ef33c473d38..6ee3307512d 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -55,8 +55,12 @@ RSpec.configure do |config| config.around(:each, :redis) do |example| Gitlab::Redis.with(&:flushall) + Sidekiq.redis(&:flushall) + example.run + Gitlab::Redis.with(&:flushall) + Sidekiq.redis(&:flushall) end end diff --git a/spec/workers/process_commit_worker_spec.rb b/spec/workers/process_commit_worker_spec.rb index 3e4fee42240..75c7fc1efd2 100644 --- a/spec/workers/process_commit_worker_spec.rb +++ b/spec/workers/process_commit_worker_spec.rb @@ -11,31 +11,25 @@ describe ProcessCommitWorker do it 'does not process the commit when the project does not exist' do expect(worker).not_to receive(:close_issues) - worker.perform(-1, user.id, commit.id) + worker.perform(-1, user.id, commit.to_hash) end it 'does not process the commit when the user does not exist' do expect(worker).not_to receive(:close_issues) - worker.perform(project.id, -1, commit.id) - end - - it 'does not process the commit when the commit no longer exists' do - expect(worker).not_to receive(:close_issues) - - worker.perform(project.id, user.id, 'this-should-does-not-exist') + worker.perform(project.id, -1, commit.to_hash) end it 'processes the commit message' do expect(worker).to receive(:process_commit_message).and_call_original - worker.perform(project.id, user.id, commit.id) + worker.perform(project.id, user.id, commit.to_hash) end it 'updates the issue metrics' do expect(worker).to receive(:update_issue_metrics).and_call_original - worker.perform(project.id, user.id, commit.id) + worker.perform(project.id, user.id, commit.to_hash) end end @@ -106,4 +100,19 @@ describe ProcessCommitWorker do expect(metric.first_mentioned_in_commit_at).to eq(commit.committed_date) end end + + describe '#build_commit' do + it 'returns a Commit' do + commit = worker.build_commit(project, id: '123') + + expect(commit).to be_an_instance_of(Commit) + end + + it 'parses date strings into Time instances' do + commit = worker. + build_commit(project, id: '123', authored_date: Time.now.to_s) + + expect(commit.authored_date).to be_an_instance_of(Time) + end + end end