Merge branch 'use-optimistic-locking' into 'master'
Use optimistic locking ## What does this MR do? Removes the usage of pessimistic locking in favor of optimistic which is way cheaper and doesn't block database operation. Since this is very simple change it should be safe. If we receive `StaleObjectError` message we will reload object a retry operations in lock. However, I still believe that we need this one: https://gitlab.com/gitlab-org/gitlab-ce/merge_requests/7005 as this will reduce a load on Database and FS. This changes a behavior from: ### Pesimistic locking (previous behavior) #### For updating 1. SELECT * FOR UPDATE (other updates wait on this) 2. we update ci_pipeline 3. latest_build_status 4. enqueue: (use: transition :created -> :pending) 5. [state_machine] we are in state created, we can go to pending 6. [state_machine] ci_pipeline.status = created 7. [state_machine] ci_pipeline.save 8. [state_machine] after_transition: (if for success): PipelineSuccessWorker on Sidekiq 9. release DB lock #### If no update is required 1. SELECT * FOR UPDATE (other updates wait on this) 2. we update ci_pipeline 3. latest_build_status 4. we are in pending, we can't transition to pending, because it's forbidden 5. release DB lock ### Optimistic locking (implemented by this MR) #### For updating 1. latest_build_status 2. enqueue: (use `transition :created -> :pending`) 3. [state_machine] we are in state created, we can go to pending 4. [state_machine] ci_pipeline.status = created 5. [state_machine] ci_pipeline.save 6. [state_machine] [save] where(lock_version: ci_pipeline.lock_version).update_all(status: :created, updated_at: Time.now) 7. [state_machine] [save] unless we_updated_row then raise ObjectInconsistentError #### If no update is required 1. we update ci_pipeline 2. latest_build_status 3. we are in pending, we can't transition to pending, because it's forbidden ## Why was this MR needed? We have been seeing a number of problems when we migrated Pipeline/Build processing to Sidekiq. Especially we started seeing a lot of blocking queries. We used a pessimistic locking which doesn't seem to be required. This effectively allows us to fix our issues with blocked queries by using more efficient method of operation. ## What are the relevant issue numbers? Issues: https://gitlab.com/gitlab-com/infrastructure/issues/623 and https://gitlab.com/gitlab-com/infrastructure/issues/584, but also there's a bunch of Merge Requests that try to improve behavior of scheduled jobs. cc @pcarranza @yorickpeterse @stanhu See merge request !7040
This commit is contained in:
commit
d306b0d7c2
14 changed files with 122 additions and 49 deletions
|
@ -91,6 +91,7 @@ Please view this file on the master branch, on stable branches it's out of date.
|
|||
- Updating verbiage on git basics to be more intuitive
|
||||
- Fix project_feature record not generated on project creation
|
||||
- Clarify documentation for Runners API (Gennady Trafimenkov)
|
||||
- Use optimistic locking for pipelines and builds
|
||||
- The instrumentation for Banzai::Renderer has been restored
|
||||
- Change user & group landing page routing from /u/:username to /:username
|
||||
- Added documentation for .gitattributes files
|
||||
|
|
|
@ -30,23 +30,23 @@ module Ci
|
|||
end
|
||||
|
||||
event :run do
|
||||
transition any => :running
|
||||
transition any - [:running] => :running
|
||||
end
|
||||
|
||||
event :skip do
|
||||
transition any => :skipped
|
||||
transition any - [:skipped] => :skipped
|
||||
end
|
||||
|
||||
event :drop do
|
||||
transition any => :failed
|
||||
transition any - [:failed] => :failed
|
||||
end
|
||||
|
||||
event :succeed do
|
||||
transition any => :success
|
||||
transition any - [:success] => :success
|
||||
end
|
||||
|
||||
event :cancel do
|
||||
transition any => :canceled
|
||||
transition any - [:canceled] => :canceled
|
||||
end
|
||||
|
||||
# IMPORTANT
|
||||
|
@ -260,7 +260,7 @@ module Ci
|
|||
end
|
||||
|
||||
def update_status
|
||||
with_lock do
|
||||
Gitlab::OptimisticLocking.retry_lock(self) do
|
||||
case latest_builds_status
|
||||
when 'pending' then enqueue
|
||||
when 'running' then run
|
||||
|
|
|
@ -73,16 +73,16 @@ class CommitStatus < ActiveRecord::Base
|
|||
transition [:created, :pending, :running] => :canceled
|
||||
end
|
||||
|
||||
after_transition created: [:pending, :running] do |commit_status|
|
||||
commit_status.update_attributes queued_at: Time.now
|
||||
before_transition created: [:pending, :running] do |commit_status|
|
||||
commit_status.queued_at = Time.now
|
||||
end
|
||||
|
||||
after_transition [:created, :pending] => :running do |commit_status|
|
||||
commit_status.update_attributes started_at: Time.now
|
||||
before_transition [:created, :pending] => :running do |commit_status|
|
||||
commit_status.started_at = Time.now
|
||||
end
|
||||
|
||||
after_transition any => [:success, :failed, :canceled] do |commit_status|
|
||||
commit_status.update_attributes finished_at: Time.now
|
||||
before_transition any => [:success, :failed, :canceled] do |commit_status|
|
||||
commit_status.finished_at = Time.now
|
||||
end
|
||||
|
||||
after_transition do |commit_status, transition|
|
||||
|
|
|
@ -10,17 +10,14 @@ module Ci
|
|||
create_builds!
|
||||
end
|
||||
|
||||
@pipeline.with_lock do
|
||||
new_builds =
|
||||
stage_indexes_of_created_builds.map do |index|
|
||||
process_stage(index)
|
||||
end
|
||||
new_builds =
|
||||
stage_indexes_of_created_builds.map do |index|
|
||||
process_stage(index)
|
||||
end
|
||||
|
||||
@pipeline.update_status
|
||||
@pipeline.update_status
|
||||
|
||||
# Return a flag if a when builds got enqueued
|
||||
new_builds.flatten.any?
|
||||
end
|
||||
new_builds.flatten.any?
|
||||
end
|
||||
|
||||
private
|
||||
|
@ -32,9 +29,11 @@ module Ci
|
|||
def process_stage(index)
|
||||
current_status = status_for_prior_stages(index)
|
||||
|
||||
created_builds_in_stage(index).select do |build|
|
||||
if HasStatus::COMPLETED_STATUSES.include?(current_status)
|
||||
process_build(build, current_status)
|
||||
if HasStatus::COMPLETED_STATUSES.include?(current_status)
|
||||
created_builds_in_stage(index).select do |build|
|
||||
Gitlab::OptimisticLocking.retry_lock(build) do |subject|
|
||||
process_build(subject, current_status)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -28,17 +28,14 @@ module Ci
|
|||
|
||||
if build
|
||||
# In case when 2 runners try to assign the same build, second runner will be declined
|
||||
# with StateMachines::InvalidTransition in run! method.
|
||||
build.with_lock do
|
||||
build.runner_id = current_runner.id
|
||||
build.save!
|
||||
build.run!
|
||||
end
|
||||
# with StateMachines::InvalidTransition or StaleObjectError when doing run! or save method.
|
||||
build.runner_id = current_runner.id
|
||||
build.run!
|
||||
end
|
||||
|
||||
build
|
||||
|
||||
rescue StateMachines::InvalidTransition
|
||||
rescue StateMachines::InvalidTransition, ActiveRecord::StaleObjectError
|
||||
nil
|
||||
end
|
||||
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
# See http://doc.gitlab.com/ce/development/migration_style_guide.html
|
||||
# for more information on how to write migrations for GitLab.
|
||||
|
||||
class AddLockVersionToBuildAndPipelines < ActiveRecord::Migration
|
||||
include Gitlab::Database::MigrationHelpers
|
||||
|
||||
# Set this constant to true if this migration requires downtime.
|
||||
DOWNTIME = false
|
||||
|
||||
def change
|
||||
add_column :ci_builds, :lock_version, :integer
|
||||
add_column :ci_commits, :lock_version, :integer
|
||||
end
|
||||
end
|
|
@ -189,6 +189,7 @@ ActiveRecord::Schema.define(version: 20161025231710) do
|
|||
t.text "yaml_variables"
|
||||
t.datetime "queued_at"
|
||||
t.string "token"
|
||||
t.integer "lock_version"
|
||||
end
|
||||
|
||||
add_index "ci_builds", ["commit_id", "stage_idx", "created_at"], name: "index_ci_builds_on_commit_id_and_stage_idx_and_created_at", using: :btree
|
||||
|
@ -219,6 +220,7 @@ ActiveRecord::Schema.define(version: 20161025231710) do
|
|||
t.datetime "finished_at"
|
||||
t.integer "duration"
|
||||
t.integer "user_id"
|
||||
t.integer "lock_version"
|
||||
end
|
||||
|
||||
add_index "ci_commits", ["gl_project_id", "sha"], name: "index_ci_commits_on_gl_project_id_and_sha", using: :btree
|
||||
|
|
19
lib/gitlab/optimistic_locking.rb
Normal file
19
lib/gitlab/optimistic_locking.rb
Normal file
|
@ -0,0 +1,19 @@
|
|||
module Gitlab
|
||||
module OptimisticLocking
|
||||
extend self
|
||||
|
||||
def retry_lock(subject, retries = 100, &block)
|
||||
loop do
|
||||
begin
|
||||
ActiveRecord::Base.transaction do
|
||||
return block.call(subject)
|
||||
end
|
||||
rescue ActiveRecord::StaleObjectError
|
||||
retries -= 1
|
||||
raise unless retries >= 0
|
||||
subject.reload
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -178,6 +178,7 @@ Ci::Pipeline:
|
|||
- finished_at
|
||||
- duration
|
||||
- user_id
|
||||
- lock_version
|
||||
CommitStatus:
|
||||
- id
|
||||
- project_id
|
||||
|
@ -217,6 +218,7 @@ CommitStatus:
|
|||
- yaml_variables
|
||||
- queued_at
|
||||
- token
|
||||
- lock_version
|
||||
Ci::Variable:
|
||||
- id
|
||||
- project_id
|
||||
|
|
39
spec/lib/gitlab/optimistic_locking_spec.rb
Normal file
39
spec/lib/gitlab/optimistic_locking_spec.rb
Normal file
|
@ -0,0 +1,39 @@
|
|||
require 'spec_helper'
|
||||
|
||||
describe Gitlab::OptimisticLocking, lib: true do
|
||||
describe '#retry_lock' do
|
||||
let!(:pipeline) { create(:ci_pipeline) }
|
||||
let!(:pipeline2) { Ci::Pipeline.find(pipeline.id) }
|
||||
|
||||
it 'does not reload object if state changes' do
|
||||
expect(pipeline).not_to receive(:reload)
|
||||
expect(pipeline).to receive(:succeed).and_call_original
|
||||
|
||||
described_class.retry_lock(pipeline) do |subject|
|
||||
subject.succeed
|
||||
end
|
||||
end
|
||||
|
||||
it 'retries action if exception is raised' do
|
||||
pipeline.succeed
|
||||
|
||||
expect(pipeline2).to receive(:reload).and_call_original
|
||||
expect(pipeline2).to receive(:drop).twice.and_call_original
|
||||
|
||||
described_class.retry_lock(pipeline2) do |subject|
|
||||
subject.drop
|
||||
end
|
||||
end
|
||||
|
||||
it 'raises exception when too many retries' do
|
||||
expect(pipeline).to receive(:drop).twice.and_call_original
|
||||
|
||||
expect do
|
||||
described_class.retry_lock(pipeline, 1) do |subject|
|
||||
subject.lock_version = 100
|
||||
subject.drop
|
||||
end
|
||||
end.to raise_error(ActiveRecord::StaleObjectError)
|
||||
end
|
||||
end
|
||||
end
|
|
@ -138,32 +138,26 @@ describe Ci::Pipeline, models: true do
|
|||
|
||||
describe 'state machine' do
|
||||
let(:current) { Time.now.change(usec: 0) }
|
||||
let(:build) { create_build('build1', current, 10) }
|
||||
let(:build_b) { create_build('build2', current, 20) }
|
||||
let(:build_c) { create_build('build3', current + 50, 10) }
|
||||
let(:build) { create_build('build1', 0) }
|
||||
let(:build_b) { create_build('build2', 0) }
|
||||
let(:build_c) { create_build('build3', 0) }
|
||||
|
||||
describe '#duration' do
|
||||
before do
|
||||
pipeline.update(created_at: current)
|
||||
|
||||
travel_to(current + 5) do
|
||||
pipeline.run
|
||||
pipeline.save
|
||||
end
|
||||
|
||||
travel_to(current + 30) do
|
||||
build.success
|
||||
build.run!
|
||||
build.success!
|
||||
build_b.run!
|
||||
build_c.run!
|
||||
end
|
||||
|
||||
travel_to(current + 40) do
|
||||
build_b.drop
|
||||
build_b.drop!
|
||||
end
|
||||
|
||||
travel_to(current + 70) do
|
||||
build_c.success
|
||||
build_c.success!
|
||||
end
|
||||
|
||||
pipeline.drop
|
||||
end
|
||||
|
||||
it 'matches sum of builds duration' do
|
||||
|
@ -455,7 +449,9 @@ describe Ci::Pipeline, models: true do
|
|||
context 'when all builds succeed' do
|
||||
before do
|
||||
build_a.success
|
||||
build_b.success
|
||||
|
||||
# We have to reload build_b as this is in next stage and it gets triggered by PipelineProcessWorker
|
||||
build_b.reload.success
|
||||
end
|
||||
|
||||
it 'receives a success event once' do
|
||||
|
|
|
@ -277,6 +277,7 @@ describe API::API, api: true do
|
|||
|
||||
context 'with regular branch' do
|
||||
before do
|
||||
pipeline.reload
|
||||
pipeline.update(ref: 'master',
|
||||
sha: project.commit('master').sha)
|
||||
|
||||
|
@ -288,6 +289,7 @@ describe API::API, api: true do
|
|||
|
||||
context 'with branch name containing slash' do
|
||||
before do
|
||||
pipeline.reload
|
||||
pipeline.update(ref: 'improve/awesome',
|
||||
sha: project.commit('improve/awesome').sha)
|
||||
end
|
||||
|
|
|
@ -101,11 +101,11 @@ module Ci
|
|||
it 'equalises number of running builds' do
|
||||
# after finishing the first build for project 1, get a second build from the same project
|
||||
expect(service.execute(shared_runner)).to eq(build1_project1)
|
||||
build1_project1.success
|
||||
build1_project1.reload.success
|
||||
expect(service.execute(shared_runner)).to eq(build2_project1)
|
||||
|
||||
expect(service.execute(shared_runner)).to eq(build1_project2)
|
||||
build1_project2.success
|
||||
build1_project2.reload.success
|
||||
expect(service.execute(shared_runner)).to eq(build2_project2)
|
||||
expect(service.execute(shared_runner)).to eq(build1_project3)
|
||||
expect(service.execute(shared_runner)).to eq(build3_project1)
|
||||
|
|
|
@ -147,6 +147,7 @@ describe MergeRequests::MergeWhenBuildSucceedsService do
|
|||
expect(MergeWorker).not_to receive(:perform_async)
|
||||
|
||||
build.success
|
||||
test.reload
|
||||
test.drop
|
||||
end
|
||||
|
||||
|
@ -154,6 +155,7 @@ describe MergeRequests::MergeWhenBuildSucceedsService do
|
|||
expect(MergeWorker).to receive(:perform_async)
|
||||
|
||||
build.success
|
||||
test.reload
|
||||
test.success
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue