Add ExclusiveLock in Ci::JobTraceChunk
This commit is contained in:
parent
180267d686
commit
76485cbf8b
2 changed files with 62 additions and 19 deletions
|
@ -8,8 +8,13 @@ module Ci
|
|||
|
||||
default_value_for :data_store, :redis
|
||||
|
||||
WriteError = Class.new(StandardError)
|
||||
|
||||
CHUNK_SIZE = 128.kilobytes
|
||||
CHUNK_REDIS_TTL = 1.week
|
||||
LOCK_RETRY = 100
|
||||
LOCK_SLEEP = 1
|
||||
LOCK_TTL = 5.minutes
|
||||
|
||||
enum data_store: {
|
||||
redis: 1,
|
||||
|
@ -27,18 +32,20 @@ module Ci
|
|||
end
|
||||
|
||||
def set_data(value)
|
||||
raise ArgumentError, 'too much data' if value.bytesize > CHUNK_SIZE
|
||||
in_lock do
|
||||
raise ArgumentError, 'too much data' if value.bytesize > CHUNK_SIZE
|
||||
|
||||
if redis?
|
||||
redis_set_data(value)
|
||||
elsif db?
|
||||
self.raw_data = value
|
||||
else
|
||||
raise 'Unsupported data store'
|
||||
if redis?
|
||||
redis_set_data(value)
|
||||
elsif db?
|
||||
self.raw_data = value
|
||||
else
|
||||
raise 'Unsupported data store'
|
||||
end
|
||||
|
||||
save! if changed?
|
||||
schedule_to_db if fullfilled?
|
||||
end
|
||||
|
||||
save! if changed?
|
||||
schedule_to_db if fullfilled?
|
||||
end
|
||||
|
||||
def truncate(offset = 0)
|
||||
|
@ -70,11 +77,13 @@ module Ci
|
|||
end
|
||||
|
||||
def use_database!
|
||||
return if db?
|
||||
return unless size > 0
|
||||
in_lock do
|
||||
return if db?
|
||||
return unless size > 0
|
||||
|
||||
self.update!(raw_data: data, data_store: :db)
|
||||
redis_delete_data
|
||||
self.update!(raw_data: data, data_store: :db)
|
||||
redis_delete_data
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
@ -91,24 +100,47 @@ module Ci
|
|||
|
||||
def redis_data
|
||||
Gitlab::Redis::SharedState.with do |redis|
|
||||
redis.get(redis_key)
|
||||
redis.get(redis_data_key)
|
||||
end
|
||||
end
|
||||
|
||||
def redis_set_data(data)
|
||||
Gitlab::Redis::SharedState.with do |redis|
|
||||
redis.set(redis_key, data, ex: CHUNK_REDIS_TTL)
|
||||
redis.set(redis_data_key, data, ex: CHUNK_REDIS_TTL)
|
||||
end
|
||||
end
|
||||
|
||||
def redis_delete_data
|
||||
Gitlab::Redis::SharedState.with do |redis|
|
||||
redis.del(redis_key)
|
||||
redis.del(redis_data_key)
|
||||
end
|
||||
end
|
||||
|
||||
def redis_key
|
||||
"gitlab:ci:trace:#{job_id}:chunks:#{chunk_index}"
|
||||
def redis_data_key
|
||||
"gitlab:ci:trace:#{job_id}:chunks:#{chunk_index}:data"
|
||||
end
|
||||
|
||||
def redis_lock_key
|
||||
"gitlab:ci:trace:#{job_id}:chunks:#{chunk_index}:lock"
|
||||
end
|
||||
|
||||
def in_lock
|
||||
lease = Gitlab::ExclusiveLease.new(redis_lock_key, timeout: LOCK_TTL)
|
||||
retry_count = 0
|
||||
|
||||
until uuid = lease.try_obtain
|
||||
# Keep trying until we obtain the lease. To prevent hammering Redis too
|
||||
# much we'll wait for a bit between retries.
|
||||
sleep(LOCK_SLEEP)
|
||||
break if LOCK_RETRY < (retry_count += 1)
|
||||
end
|
||||
|
||||
raise WriteError, 'Failed to obtain write lock' unless uuid
|
||||
|
||||
self.reload if self.persisted?
|
||||
return yield
|
||||
ensure
|
||||
Gitlab::ExclusiveLease.cancel(redis_lock_key, uuid)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -317,6 +317,17 @@ describe Ci::JobTraceChunk, :clean_gitlab_redis_shared_state do
|
|||
end
|
||||
end
|
||||
|
||||
describe 'ExclusiveLock' do
|
||||
before do
|
||||
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { nil }
|
||||
stub_const('Ci::JobTraceChunk::LOCK_RETRY', 1)
|
||||
end
|
||||
|
||||
it 'raise an error' do
|
||||
expect { job_trace_chunk.append('ABC', 0) }.to raise_error('Failed to obtain write lock')
|
||||
end
|
||||
end
|
||||
|
||||
describe 'deletes data in redis after chunk record destroyed' do
|
||||
let(:project) { create(:project) }
|
||||
|
||||
|
|
Loading…
Reference in a new issue