gitlab-org--gitlab-foss/app/models/concerns/counter_attribute.rb

284 lines
8.3 KiB
Ruby

# frozen_string_literal: true
# Add capabilities to increment a numeric model attribute efficiently by
# using Redis and flushing the increments asynchronously to the database
# after a period of time (10 minutes).
# When an attribute is incremented by a value, the increment is added
# to a Redis key. Then, FlushCounterIncrementsWorker will execute
# `flush_increments_to_database!` which removes increments from Redis for a
# given model attribute and updates the values in the database.
#
# @example:
#
# class ProjectStatistics
# include CounterAttribute
#
# counter_attribute :commit_count
# counter_attribute :storage_size
# end
#
# To increment the counter we can use the method:
# delayed_increment_counter(:commit_count, 3)
#
# It is possible to register callbacks to be executed after increments have
# been flushed to the database. Callbacks are not executed if there are no increments
# to flush.
#
# counter_attribute_after_flush do |statistic|
# Namespaces::ScheduleAggregationWorker.perform_async(statistic.namespace_id)
# end
#
module CounterAttribute
extend ActiveSupport::Concern
extend AfterCommitQueue
include Gitlab::ExclusiveLeaseHelpers
LUA_STEAL_INCREMENT_SCRIPT = <<~EOS
local increment_key, flushed_key = KEYS[1], KEYS[2]
local increment_value = redis.call("get", increment_key) or 0
local flushed_value = redis.call("incrby", flushed_key, increment_value)
if flushed_value == 0 then
redis.call("del", increment_key, flushed_key)
else
redis.call("del", increment_key)
end
return flushed_value
EOS
WORKER_DELAY = 10.minutes
WORKER_LOCK_TTL = 10.minutes
class_methods do
def counter_attribute(attribute)
counter_attributes << attribute
end
def counter_attributes
@counter_attributes ||= Set.new
end
def after_flush_callbacks
@after_flush_callbacks ||= []
end
# perform registered callbacks after increments have been flushed to the database
def counter_attribute_after_flush(&callback)
after_flush_callbacks << callback
end
def counter_attribute_enabled?(attribute)
counter_attributes.include?(attribute)
end
end
# This method must only be called by FlushCounterIncrementsWorker
# because it should run asynchronously and with exclusive lease.
# This will
# 1. temporarily move the pending increment for a given attribute
# to a relative "flushed" Redis key, delete the increment key and return
# the value. If new increments are performed at this point, the increment
# key is recreated as part of `delayed_increment_counter`.
# The "flushed" key is used to ensure that we can keep incrementing
# counters in Redis while flushing existing values.
# 2. then the value is used to update the counter in the database.
# 3. finally the "flushed" key is deleted.
def flush_increments_to_database!(attribute)
lock_key = counter_lock_key(attribute)
with_exclusive_lease(lock_key) do
previous_db_value = read_attribute(attribute)
increment_key = counter_key(attribute)
flushed_key = counter_flushed_key(attribute)
increment_value = steal_increments(increment_key, flushed_key)
new_db_value = nil
next if increment_value == 0
transaction do
update_counters_with_lease({ attribute => increment_value })
redis_state { |redis| redis.del(flushed_key) }
new_db_value = reset.read_attribute(attribute)
end
execute_after_flush_callbacks
log_flush_counter(attribute, increment_value, previous_db_value, new_db_value)
end
end
def delayed_increment_counter(attribute, increment)
raise ArgumentError, "#{attribute} is not a counter attribute" unless counter_attribute_enabled?(attribute)
return if increment == 0
run_after_commit_or_now do
increment_counter(attribute, increment)
FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, self.class.name, self.id, attribute)
end
true
end
def increment_counter(attribute, increment)
if counter_attribute_enabled?(attribute)
new_value = redis_state do |redis|
redis.incrby(counter_key(attribute), increment)
end
log_increment_counter(attribute, increment, new_value)
end
end
def update_counters_with_lease(increments)
detect_race_on_record(log_fields: { caller: __method__, attributes: increments.keys }) do
self.class.update_counters(id, increments)
end
end
def reset_counter!(attribute)
if counter_attribute_enabled?(attribute)
detect_race_on_record(log_fields: { caller: __method__, attributes: attribute }) do
update!(attribute => 0)
clear_counter!(attribute)
end
log_clear_counter(attribute)
end
end
def get_counter_value(attribute)
if counter_attribute_enabled?(attribute)
redis_state do |redis|
redis.get(counter_key(attribute)).to_i
end
end
end
def counter_key(attribute)
"project:{#{project_id}}:counters:#{self.class}:#{id}:#{attribute}"
end
def counter_flushed_key(attribute)
counter_key(attribute) + ':flushed'
end
def counter_lock_key(attribute)
counter_key(attribute) + ':lock'
end
def counter_attribute_enabled?(attribute)
self.class.counter_attribute_enabled?(attribute)
end
private
def database_lock_key
"project:{#{project_id}}:#{self.class}:#{id}"
end
def steal_increments(increment_key, flushed_key)
redis_state do |redis|
redis.eval(LUA_STEAL_INCREMENT_SCRIPT, keys: [increment_key, flushed_key])
end
end
def clear_counter!(attribute)
redis_state do |redis|
redis.del(counter_key(attribute))
end
end
def execute_after_flush_callbacks
self.class.after_flush_callbacks.each do |callback|
callback.call(self)
end
end
def redis_state(&block)
Gitlab::Redis::SharedState.with(&block)
end
def with_exclusive_lease(lock_key)
in_lock(lock_key, ttl: WORKER_LOCK_TTL) do
yield
end
rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
# a worker is already updating the counters
end
# detect_race_on_record uses a lease to monitor access
# to the project statistics row. This is needed to detect
# concurrent attempts to increment columns, which could result in a
# race condition.
#
# As the purpose is to detect and warn concurrent attempts,
# it falls back to direct update on the row if it fails to obtain the lease.
#
# It does not guarantee that there will not be any concurrent updates.
def detect_race_on_record(log_fields: {})
return yield unless Feature.enabled?(:counter_attribute_db_lease_for_update, project)
# Ensure attributes is always an array before we log
log_fields[:attributes] = Array(log_fields[:attributes])
Gitlab::AppLogger.info(
message: 'Acquiring lease for project statistics update',
project_statistics_id: id,
project_id: project.id,
**log_fields,
**Gitlab::ApplicationContext.current
)
in_lock(database_lock_key, retries: 0) do
yield
end
rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
Gitlab::AppLogger.warn(
message: 'Concurrent project statistics update detected',
project_statistics_id: id,
project_id: project.id,
**log_fields,
**Gitlab::ApplicationContext.current
)
yield
end
def log_increment_counter(attribute, increment, new_value)
payload = Gitlab::ApplicationContext.current.merge(
message: 'Increment counter attribute',
attribute: attribute,
project_id: project_id,
increment: increment,
new_counter_value: new_value,
current_db_value: read_attribute(attribute)
)
Gitlab::AppLogger.info(payload)
end
def log_flush_counter(attribute, increment, previous_db_value, new_db_value)
payload = Gitlab::ApplicationContext.current.merge(
message: 'Flush counter attribute to database',
attribute: attribute,
project_id: project_id,
increment: increment,
previous_db_value: previous_db_value,
new_db_value: new_db_value
)
Gitlab::AppLogger.info(payload)
end
def log_clear_counter(attribute)
payload = Gitlab::ApplicationContext.current.merge(
message: 'Clear counter attribute',
attribute: attribute,
project_id: project_id
)
Gitlab::AppLogger.info(payload)
end
end