Merge branch 'lock-trace-writes' into 'master'

Lock writes to trace stream

Closes #51502

See merge request gitlab-org/gitlab-ce!23332
Stan Hu 2018-11-27 20:56:40 +00:00
commit 6c83c2d8b9
11 changed files with 104 additions and 63 deletions

View file

@@ -76,7 +76,7 @@ module Ci
raise ArgumentError, 'Offset is out of range' if offset > size || offset < 0
raise ArgumentError, 'Chunk size overflow' if CHUNK_SIZE < (offset + new_data.bytesize)
in_lock(*lock_params) do # Write opetation is atomic
in_lock(*lock_params) do # Write operation is atomic
unsafe_set_data!(data.byteslice(0, offset) + new_data)
end
@@ -100,7 +100,7 @@ module Ci
end
def persist_data!
in_lock(*lock_params) do # Write opetation is atomic
in_lock(*lock_params) do # Write operation is atomic
unsafe_persist_to!(self.class.persistable_store)
end
end
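
The hunk above keeps the raw mutation in a private unsafe_set_data! method and lets the public entry point take the lock, so every write to a chunk is atomic. A minimal, self-contained sketch of that guarded-write pattern, assuming a plain Mutex in place of the Redis-backed lease and using LockedBuffer as a purely illustrative class:

```ruby
# Sketch only: a Mutex stands in for the exclusive lease used by the real model.
class LockedBuffer
  CHUNK_SIZE = 128 # illustrative size; the real chunk model defines its own limit

  def initialize
    @lock = Mutex.new
    @data = (+'').force_encoding(Encoding::BINARY)
  end

  def append(new_data, offset)
    raise ArgumentError, 'Offset is out of range' if offset > @data.bytesize || offset < 0
    raise ArgumentError, 'Chunk size overflow' if CHUNK_SIZE < (offset + new_data.bytesize)

    # Write operation is atomic: only the lock holder may replace the data.
    @lock.synchronize { unsafe_set_data!(@data.byteslice(0, offset) + new_data) }
  end

  def data
    @data.dup
  end

  private

  def unsafe_set_data!(data)
    @data = data
  end
end

buffer = LockedBuffer.new
buffer.append(+'hello', 0)
buffer.append(+' world', 5)
puts buffer.data # => "hello world"
```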

View file

@@ -0,0 +1,5 @@
---
title: Lock writes to trace stream
merge_request:
author:
type: fixed

View file

@@ -50,6 +50,10 @@ module API
rack_response({ 'message' => '404 Not found' }.to_json, 404)
end
rescue_from ::Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError do
rack_response({ 'message' => '409 Conflict: Resource lock' }.to_json, 409)
end
rescue_from UploadedFile::InvalidPathError do |e|
rack_response({ 'message' => e.message }.to_json, 400)
end
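
The Grape rescue_from block added here maps the helper's FailedToObtainLockError to a 409 Conflict, so a request that loses the race for the trace write lock fails cleanly instead of surfacing as a 500. A rough, framework-free sketch of that mapping (handle_trace_update and the error class below are illustrative stand-ins, not the real API code):

```ruby
require 'json'

# Illustrative stand-in for the helper's error class.
FailedToObtainLockError = Class.new(StandardError)

# Returns a Rack-style [status, headers, body] triple for a trace update.
def handle_trace_update
  yield
  [200, { 'Content-Type' => 'application/json' }, [{ 'message' => 'ok' }.to_json]]
rescue FailedToObtainLockError
  [409, { 'Content-Type' => 'application/json' },
   [{ 'message' => '409 Conflict: Resource lock' }.to_json]]
end

status, _headers, body = handle_trace_update { raise FailedToObtainLockError }
puts status     # => 409
puts body.first # => {"message":"409 Conflict: Resource lock"}
```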

View file

@@ -3,9 +3,11 @@
module Gitlab
module Ci
class Trace
include ExclusiveLeaseGuard
include ::Gitlab::ExclusiveLeaseHelpers
LEASE_TIMEOUT = 1.hour
LOCK_TTL = 1.minute
LOCK_RETRIES = 2
LOCK_SLEEP = 0.001.seconds
ArchiveError = Class.new(StandardError)
AlreadyArchivedError = Class.new(StandardError)
@@ -82,7 +84,35 @@ module Gitlab
stream&.close
end
def write(mode)
def write(mode, &blk)
in_write_lock do
unsafe_write!(mode, &blk)
end
end
def erase!
##
# Erase the archived trace
trace_artifact&.destroy!
##
# Erase the live trace
job.trace_chunks.fast_destroy_all # Destroy chunks of a live trace
FileUtils.rm_f(current_path) if current_path # Remove a trace file of a live trace
job.erase_old_trace! if job.has_old_trace? # Remove a trace in database of a live trace
ensure
@current_path = nil
end
def archive!
in_write_lock do
unsafe_archive!
end
end
private
def unsafe_write!(mode, &blk)
stream = Gitlab::Ci::Trace::Stream.new do
if trace_artifact
raise AlreadyArchivedError, 'Could not write to the archived trace'
@@ -102,28 +132,6 @@ module Gitlab
stream&.close
end
def erase!
##
# Erase the archived trace
trace_artifact&.destroy!
##
# Erase the live trace
job.trace_chunks.fast_destroy_all # Destroy chunks of a live trace
FileUtils.rm_f(current_path) if current_path # Remove a trace file of a live trace
job.erase_old_trace! if job.has_old_trace? # Remove a trace in database of a live trace
ensure
@current_path = nil
end
def archive!
try_obtain_lease do
unsafe_archive!
end
end
private
def unsafe_archive!
raise AlreadyArchivedError, 'Could not archive again' if trace_artifact
raise ArchiveError, 'Job is not finished yet' unless job.complete?
@@ -146,6 +154,11 @@ module Gitlab
end
end
def in_write_lock(&blk)
lock_key = "trace:write:lock:#{job.id}"
in_lock(lock_key, ttl: LOCK_TTL, retries: LOCK_RETRIES, sleep_sec: LOCK_SLEEP, &blk)
end
def archive_stream!(stream)
clone_file!(stream, JobArtifactUploader.workhorse_upload_path) do |clone_path|
create_build_trace!(job, clone_path)
@@ -226,16 +239,6 @@ module Gitlab
def trace_artifact
job.job_artifacts_trace
end
# For ExclusiveLeaseGuard concern
def lease_key
@lease_key ||= "trace:archive:#{job.id}"
end
# For ExclusiveLeaseGuard concern
def lease_timeout
LEASE_TIMEOUT
end
end
end
end
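
Net effect of this file: Gitlab::Ci::Trace drops the ExclusiveLeaseGuard concern and funnels both write and archive! through a single per-job key, "trace:write:lock:#{job.id}", so a live append and an archival can no longer interleave. A condensed sketch of that shape, assuming a per-key Mutex in place of the Redis lease and purely illustrative TraceSketch/Job names:

```ruby
require 'stringio'

class TraceSketch
  LOCK_TTL     = 60    # 1.minute in the real class
  LOCK_RETRIES = 2
  LOCK_SLEEP   = 0.001

  LOCKS = Hash.new { |hash, key| hash[key] = Mutex.new } # stand-in for Redis leases

  Job = Struct.new(:id)

  def initialize(job)
    @job = job
  end

  def write(mode, &blk)
    in_write_lock { unsafe_write!(mode, &blk) }
  end

  def archive!
    in_write_lock { unsafe_archive! }
  end

  private

  # Both write paths contend on the same per-job key.
  def in_write_lock(&blk)
    lock_key = "trace:write:lock:#{@job.id}"
    LOCKS[lock_key].synchronize(&blk)
  end

  def unsafe_write!(mode)
    yield StringIO.new(+'', mode) # the real method builds a Trace::Stream
  end

  def unsafe_archive!
    # moving the live trace into a job artifact is elided in this sketch
  end
end

trace = TraceSketch.new(TraceSketch::Job.new(42))
trace.write('w+b') { |stream| stream.write('BUILD TRACE') }
```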

View file

@@ -43,19 +43,14 @@ module Gitlab
def append(data, offset)
data = data.force_encoding(Encoding::BINARY)
stream.truncate(offset)
stream.seek(0, IO::SEEK_END)
stream.seek(offset, IO::SEEK_SET)
stream.write(data)
stream.truncate(offset + data.bytesize)
stream.flush()
end
def set(data)
data = data.force_encoding(Encoding::BINARY)
stream.seek(0, IO::SEEK_SET)
stream.write(data)
stream.truncate(data.bytesize)
stream.flush()
append(data, 0)
end
def raw(last_lines: nil)
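
The rewritten append seeks directly to the offset, truncates the stream to offset + data.bytesize and flushes, and set is reduced to append(data, 0); stale bytes past the overwrite point are discarded rather than left behind. A self-contained illustration against a plain StringIO (the real stream may equally be backed by a ChunkedIO):

```ruby
require 'stringio'

def append(stream, data, offset)
  data = data.force_encoding(Encoding::BINARY)

  stream.seek(offset, IO::SEEK_SET)
  stream.write(data)
  stream.truncate(offset + data.bytesize) # drop any stale tail
  stream.flush
end

def set(stream, data)
  append(stream, data, 0)
end

io = StringIO.new(+'1234old-tail', 'r+')
append(io, +'5678', 4)
puts io.string # => "12345678"
```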

View file

@@ -12,6 +12,8 @@ module Gitlab
# because it holds the connection until all `retries` is consumed.
# This could potentially eat up all connection pools.
def in_lock(key, ttl: 1.minute, retries: 10, sleep_sec: 0.01.seconds)
raise ArgumentError, 'Key needs to be specified' unless key
lease = Gitlab::ExclusiveLease.new(key, timeout: ttl)
until uuid = lease.try_obtain
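
The helper now fails fast on a missing key and, as before, keeps retrying the lease until the retry budget is spent, at which point FailedToObtainLockError reaches callers such as the API handler above. A standalone sketch of that loop, assuming an in-memory hash of expiry times in place of Gitlab::ExclusiveLease:

```ruby
FailedToObtainLockError = Class.new(StandardError)

LEASES = {} # key => expiry time; stand-in for the Redis-backed lease

def try_obtain(key, ttl)
  return nil if LEASES[key] && LEASES[key] > Time.now # lease already held

  LEASES[key] = Time.now + ttl
  'uuid'
end

def in_lock(key, ttl: 60, retries: 10, sleep_sec: 0.01)
  raise ArgumentError, 'Key needs to be specified' unless key

  attempts = 0
  until try_obtain(key, ttl)
    raise FailedToObtainLockError, 'Failed to obtain a lock' if (attempts += 1) > retries

    sleep(sleep_sec)
  end

  begin
    yield
  ensure
    LEASES.delete(key) # release once the critical section is done
  end
end

in_lock("trace:write:lock:42") { puts 'holding the lock' }

begin
  in_lock(nil) { }
rescue ArgumentError => e
  puts e.message # => Key needs to be specified
end
```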

View file

@@ -257,7 +257,8 @@ describe Gitlab::Ci::Trace::Stream, :clean_gitlab_redis_cache do
let!(:last_result) { stream.html_with_state }
before do
stream.append("5678", 4)
data_stream.seek(4, IO::SEEK_SET)
data_stream.write("5678")
stream.seek(0)
end
@@ -271,23 +272,27 @@ describe Gitlab::Ci::Trace::Stream, :clean_gitlab_redis_cache do
end
context 'when stream is StringIO' do
let(:stream) do
described_class.new do
let(:data_stream) do
StringIO.new("1234")
end
let(:stream) do
described_class.new { data_stream }
end
it_behaves_like 'html_with_states'
end
context 'when stream is ChunkedIO' do
let(:stream) do
described_class.new do
let(:data_stream) do
Gitlab::Ci::Trace::ChunkedIO.new(build).tap do |chunked_io|
chunked_io.write("1234")
chunked_io.seek(0, IO::SEEK_SET)
end
end
let(:stream) do
described_class.new { data_stream }
end
it_behaves_like 'html_with_states'

View file

@@ -11,6 +11,14 @@ describe Gitlab::ExclusiveLeaseHelpers, :clean_gitlab_redis_shared_state do
let(:options) { {} }
context 'when unique key is not set' do
let(:unique_key) { }
it 'raises an error' do
expect { subject }.to raise_error ArgumentError
end
end
context 'when the lease is not obtained yet' do
before do
stub_exclusive_lease(unique_key, 'uuid')

View file

@@ -830,6 +830,18 @@ describe API::Runner, :clean_gitlab_redis_shared_state do
expect(job.trace.raw).to eq 'BUILD TRACE UPDATED'
expect(job.job_artifacts_trace.open.read).to eq 'BUILD TRACE UPDATED'
end
context 'when concurrent update of trace is happening' do
before do
job.trace.write('wb') do
update_job(state: 'success', trace: 'BUILD TRACE UPDATED')
end
end
it 'returns that operation conflicts' do
expect(response.status).to eq(409)
end
end
end
context 'when no trace is given' do
@@ -1022,6 +1034,18 @@ describe API::Runner, :clean_gitlab_redis_shared_state do
end
end
context 'when concurrent update of trace is happening' do
before do
job.trace.write('wb') do
patch_the_trace
end
end
it 'returns that operation conflicts' do
expect(response.status).to eq(409)
end
end
context 'when the job is canceled' do
before do
job.cancel

View file

@@ -272,16 +272,11 @@ shared_examples_for 'common trace features' do
include ExclusiveLeaseHelpers
before do
stub_exclusive_lease_taken("trace:archive:#{trace.job.id}", timeout: 1.hour)
stub_exclusive_lease_taken("trace:write:lock:#{trace.job.id}", timeout: 1.minute)
end
it 'blocks concurrent archiving' do
expect(Rails.logger).to receive(:error).with('Cannot obtain an exclusive lease. There must be another instance already in execution.')
subject
build.reload
expect(build.job_artifacts_trace).to be_nil
expect { subject }.to raise_error(::Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError)
end
end
end

View file

@@ -5,7 +5,7 @@ describe StuckCiJobsWorker do
let!(:runner) { create :ci_runner }
let!(:job) { create :ci_build, runner: runner }
let(:trace_lease_key) { "trace:archive:#{job.id}" }
let(:trace_lease_key) { "trace:write:lock:#{job.id}" }
let(:trace_lease_uuid) { SecureRandom.uuid }
let(:worker_lease_key) { StuckCiJobsWorker::EXCLUSIVE_LEASE_KEY }
let(:worker_lease_uuid) { SecureRandom.uuid }