324 lines
11 KiB
Ruby
324 lines
11 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
module Gitlab
|
|
module Database
|
|
module LoadBalancing
|
|
# Load balancing for ActiveRecord connections.
|
|
#
|
|
# Each host in the load balancer uses the same credentials as the primary
|
|
# database.
|
|
class LoadBalancer
|
|
CACHE_KEY = :gitlab_load_balancer_host
|
|
|
|
REPLICA_SUFFIX = '_replica'
|
|
|
|
attr_reader :host_list, :configuration
|
|
|
|
# configuration - An instance of `LoadBalancing::Configuration` that
|
|
# contains the configuration details (such as the hosts)
|
|
# for this load balancer.
|
|
def initialize(configuration)
|
|
@configuration = configuration
|
|
@primary_only = !configuration.load_balancing_enabled?
|
|
@host_list =
|
|
if @primary_only
|
|
HostList.new([PrimaryHost.new(self)])
|
|
else
|
|
HostList.new(configuration.hosts.map { |addr| Host.new(addr, self) })
|
|
end
|
|
end
|
|
|
|
def name
|
|
@configuration.db_config_name
|
|
end
|
|
|
|
def primary_only?
|
|
@primary_only
|
|
end
|
|
|
|
def disconnect!(timeout: 120)
|
|
host_list.hosts.each { |host| host.disconnect!(timeout: timeout) }
|
|
end
|
|
|
|
# Yields a connection that can be used for reads.
|
|
#
|
|
# If no secondaries were available this method will use the primary
|
|
# instead.
|
|
def read(&block)
|
|
conflict_retried = 0
|
|
|
|
while host
|
|
ensure_caching!
|
|
|
|
begin
|
|
connection = host.connection
|
|
return yield connection
|
|
rescue StandardError => error
|
|
if primary_only?
|
|
# If we only have primary configured, retrying is pointless
|
|
raise error
|
|
elsif serialization_failure?(error)
|
|
# This error can occur when a query conflicts. See
|
|
# https://www.postgresql.org/docs/current/static/hot-standby.html#HOT-STANDBY-CONFLICT
|
|
# for more information.
|
|
#
|
|
# In this event we'll cycle through the secondaries at most 3
|
|
# times before using the primary instead.
|
|
will_retry = conflict_retried < @host_list.length * 3
|
|
|
|
::Gitlab::Database::LoadBalancing::Logger.warn(
|
|
event: :host_query_conflict,
|
|
message: 'Query conflict on host',
|
|
conflict_retried: conflict_retried,
|
|
will_retry: will_retry,
|
|
db_host: host.host,
|
|
db_port: host.port,
|
|
host_list_length: @host_list.length
|
|
)
|
|
|
|
if will_retry
|
|
conflict_retried += 1
|
|
release_host
|
|
else
|
|
break
|
|
end
|
|
elsif connection_error?(error)
|
|
host.offline!
|
|
release_host
|
|
else
|
|
raise error
|
|
end
|
|
end
|
|
end
|
|
|
|
::Gitlab::Database::LoadBalancing::Logger.warn(
|
|
event: :no_secondaries_available,
|
|
message: 'No secondaries were available, using primary instead',
|
|
conflict_retried: conflict_retried,
|
|
host_list_length: @host_list.length
|
|
)
|
|
|
|
read_write(&block)
|
|
end
|
|
|
|
# Yields a connection that can be used for both reads and writes.
|
|
def read_write
|
|
connection = nil
|
|
# In the event of a failover the primary may be briefly unavailable.
|
|
# Instead of immediately grinding to a halt we'll retry the operation
|
|
# a few times.
|
|
retry_with_backoff do
|
|
connection = pool.connection
|
|
yield connection
|
|
end
|
|
end
|
|
|
|
# Returns a host to use for queries.
|
|
#
|
|
# Hosts are scoped per thread so that multiple threads don't
|
|
# accidentally re-use the same host + connection.
|
|
def host
|
|
request_cache[CACHE_KEY] ||= @host_list.next
|
|
end
|
|
|
|
# Releases the host and connection for the current thread.
|
|
def release_host
|
|
if host = request_cache[CACHE_KEY]
|
|
host.disable_query_cache!
|
|
host.release_connection
|
|
end
|
|
|
|
request_cache.delete(CACHE_KEY)
|
|
end
|
|
|
|
def release_primary_connection
|
|
pool.release_connection
|
|
end
|
|
|
|
# Returns the transaction write location of the primary.
|
|
def primary_write_location
|
|
location = read_write do |connection|
|
|
get_write_location(connection)
|
|
end
|
|
|
|
return location if location
|
|
|
|
raise 'Failed to determine the write location of the primary database'
|
|
end
|
|
|
|
# Returns true if there was at least one host that has caught up with the given transaction.
|
|
def select_up_to_date_host(location)
|
|
all_hosts = @host_list.hosts.shuffle
|
|
host = all_hosts.find { |host| host.caught_up?(location) }
|
|
|
|
return false unless host
|
|
|
|
request_cache[CACHE_KEY] = host
|
|
|
|
true
|
|
end
|
|
|
|
# Yields a block, retrying it upon error using an exponential backoff.
|
|
def retry_with_backoff(retries = 3, time = 2)
|
|
# In CI we only use the primary, but databases may not always be
|
|
# available (or take a few seconds to become available). Retrying in
|
|
# this case can slow down CI jobs. In addition, retrying with _only_
|
|
# a primary being present isn't all that helpful.
|
|
#
|
|
# To prevent this from happening, we don't make any attempt at
|
|
# retrying unless one or more replicas are used. This matches the
|
|
# behaviour from before we enabled load balancing code even if no
|
|
# replicas were configured.
|
|
return yield if primary_only?
|
|
|
|
retried = 0
|
|
last_error = nil
|
|
|
|
while retried < retries
|
|
begin
|
|
return yield
|
|
rescue StandardError => error
|
|
raise error unless connection_error?(error)
|
|
|
|
# We need to release the primary connection as otherwise Rails
|
|
# will keep raising errors when using the connection.
|
|
release_primary_connection
|
|
|
|
last_error = error
|
|
sleep(time)
|
|
retried += 1
|
|
time **= 2
|
|
end
|
|
end
|
|
|
|
raise last_error
|
|
end
|
|
|
|
def connection_error?(error)
|
|
case error
|
|
when ActiveRecord::NoDatabaseError
|
|
# Retrying this error isn't going to magically make the database
|
|
# appear. It also slows down CI jobs that are meant to create the
|
|
# database in the first place.
|
|
false
|
|
when ActiveRecord::StatementInvalid, ActionView::Template::Error
|
|
# After connecting to the DB Rails will wrap query errors using this
|
|
# class.
|
|
if (cause = error.cause)
|
|
connection_error?(cause)
|
|
else
|
|
false
|
|
end
|
|
when *CONNECTION_ERRORS
|
|
true
|
|
else
|
|
# When PG tries to set the client encoding but fails due to a
|
|
# connection error it will raise a PG::Error instance. Catching that
|
|
# would catch all errors (even those we don't want), so instead we
|
|
# check for the message of the error.
|
|
error.message.start_with?('invalid encoding name:')
|
|
end
|
|
end
|
|
|
|
def serialization_failure?(error)
|
|
if error.cause
|
|
serialization_failure?(error.cause)
|
|
else
|
|
error.is_a?(PG::TRSerializationFailure)
|
|
end
|
|
end
|
|
|
|
# pool_size - The size of the DB pool.
|
|
# host - An optional host name to use instead of the default one.
|
|
# port - An optional port to connect to.
|
|
def create_replica_connection_pool(pool_size, host = nil, port = nil)
|
|
db_config = @configuration.replica_db_config
|
|
|
|
env_config = db_config.configuration_hash.dup
|
|
env_config[:pool] = pool_size
|
|
env_config[:host] = host if host
|
|
env_config[:port] = port if port
|
|
|
|
replica_db_config = ActiveRecord::DatabaseConfigurations::HashConfig.new(
|
|
db_config.env_name,
|
|
db_config.name + REPLICA_SUFFIX,
|
|
env_config
|
|
)
|
|
|
|
# We cannot use ActiveRecord::Base.connection_handler.establish_connection
|
|
# as it will rewrite ActiveRecord::Base.connection
|
|
ActiveRecord::ConnectionAdapters::ConnectionHandler
|
|
.new
|
|
.establish_connection(replica_db_config)
|
|
end
|
|
|
|
# ActiveRecord::ConnectionAdapters::ConnectionHandler handles fetching,
|
|
# and caching for connections pools for each "connection", so we
|
|
# leverage that.
|
|
def pool
|
|
ActiveRecord::Base.connection_handler.retrieve_connection_pool(
|
|
@configuration.primary_connection_specification_name,
|
|
role: ActiveRecord::Base.writing_role,
|
|
shard: ActiveRecord::Base.default_shard
|
|
) || raise(::ActiveRecord::ConnectionNotEstablished)
|
|
end
|
|
|
|
def wal_diff(location1, location2)
|
|
read_write do |connection|
|
|
lsn1 = connection.quote(location1)
|
|
lsn2 = connection.quote(location2)
|
|
|
|
query = <<-SQL.squish
|
|
SELECT pg_wal_lsn_diff(#{lsn1}, #{lsn2})
|
|
AS result
|
|
SQL
|
|
|
|
row = connection.select_all(query).first
|
|
row['result'] if row
|
|
end
|
|
end
|
|
|
|
private
|
|
|
|
def ensure_caching!
|
|
return unless Rails.application.executor.active?
|
|
return if host.query_cache_enabled
|
|
|
|
host.enable_query_cache!
|
|
end
|
|
|
|
def request_cache
|
|
base = SafeRequestStore[:gitlab_load_balancer] ||= {}
|
|
base[self] ||= {}
|
|
end
|
|
|
|
# @param [ActiveRecord::Connection] ar_connection
|
|
# @return [String]
|
|
def get_write_location(ar_connection)
|
|
use_new_load_balancer_query = Gitlab::Utils
|
|
.to_boolean(ENV['USE_NEW_LOAD_BALANCER_QUERY'], default: true)
|
|
|
|
sql =
|
|
if use_new_load_balancer_query
|
|
<<~NEWSQL
|
|
SELECT CASE
|
|
WHEN pg_is_in_recovery() = true AND EXISTS (SELECT 1 FROM pg_stat_get_wal_senders())
|
|
THEN pg_last_wal_replay_lsn()::text
|
|
WHEN pg_is_in_recovery() = false
|
|
THEN pg_current_wal_insert_lsn()::text
|
|
ELSE NULL
|
|
END AS location;
|
|
NEWSQL
|
|
else
|
|
<<~SQL
|
|
SELECT pg_current_wal_insert_lsn()::text AS location
|
|
SQL
|
|
end
|
|
|
|
row = ar_connection.select_all(sql).first
|
|
row['location'] if row
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end
|