304 lines
11 KiB
Ruby
304 lines
11 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
module Gitlab
|
|
class UsageData
|
|
class Topology
|
|
include Gitlab::Utils::UsageData
|
|
|
|
JOB_TO_SERVICE_NAME = {
|
|
'gitlab-rails' => 'web',
|
|
'gitlab-sidekiq' => 'sidekiq',
|
|
'gitlab-workhorse' => 'workhorse',
|
|
'redis' => 'redis',
|
|
'postgres' => 'postgres',
|
|
'gitaly' => 'gitaly',
|
|
'prometheus' => 'prometheus',
|
|
'node' => 'node-exporter',
|
|
'registry' => 'registry'
|
|
}.freeze
|
|
|
|
# If these errors occur, all subsequent queries are likely to fail for the same error
|
|
TIMEOUT_ERRORS = [Errno::ETIMEDOUT, Net::OpenTimeout, Net::ReadTimeout].freeze
|
|
|
|
CollectionFailure = Struct.new(:query, :error) do
|
|
def to_h
|
|
{ query => error }
|
|
end
|
|
end
|
|
|
|
def topology_usage_data
|
|
@failures = []
|
|
@instances = Set[]
|
|
topology_data, duration = measure_duration { topology_fetch_all_data }
|
|
{
|
|
topology: topology_data
|
|
.merge(duration_s: duration)
|
|
.merge(failures: @failures.map(&:to_h))
|
|
}
|
|
end
|
|
|
|
private
|
|
|
|
def topology_fetch_all_data
|
|
with_prometheus_client(fallback: {}, verify: false) do |client|
|
|
{
|
|
application_requests_per_hour: topology_app_requests_per_hour(client),
|
|
query_apdex_weekly_average: topology_query_apdex_weekly_average(client),
|
|
nodes: topology_node_data(client)
|
|
}.compact
|
|
end
|
|
rescue StandardError => e
|
|
@failures << CollectionFailure.new('other', e.class.to_s)
|
|
|
|
{}
|
|
end
|
|
|
|
def topology_app_requests_per_hour(client)
|
|
result = query_safely('gitlab_usage_ping:ops:rate5m', 'app_requests', fallback: nil) do |query|
|
|
client.query(aggregate_one_week(query)).first
|
|
end
|
|
|
|
return unless result
|
|
|
|
# the metric is recorded as a per-second rate
|
|
(result['value'].last.to_f * 1.hour).to_i
|
|
end
|
|
|
|
def topology_query_apdex_weekly_average(client)
|
|
result = query_safely('gitlab_usage_ping:sql_duration_apdex:ratio_rate5m', 'query_apdex', fallback: nil) do |query|
|
|
client.query(aggregate_one_week(query)).first
|
|
end
|
|
|
|
return unless result
|
|
|
|
result['value'].last.to_f
|
|
end
|
|
|
|
def topology_node_data(client)
|
|
# node-level data
|
|
by_instance_mem = topology_node_memory(client)
|
|
by_instance_mem_utilization = topology_node_memory_utilization(client)
|
|
by_instance_cpus = topology_node_cpus(client)
|
|
by_instance_cpu_utilization = topology_node_cpu_utilization(client)
|
|
by_instance_uname_info = topology_node_uname_info(client)
|
|
# service-level data
|
|
by_instance_by_job_by_type_memory = topology_all_service_memory(client)
|
|
by_instance_by_job_process_count = topology_all_service_process_count(client)
|
|
by_instance_by_job_server_types = topology_all_service_server_types(client)
|
|
|
|
@instances.map do |instance|
|
|
{
|
|
node_memory_total_bytes: by_instance_mem[instance],
|
|
node_memory_utilization: by_instance_mem_utilization[instance],
|
|
node_cpus: by_instance_cpus[instance],
|
|
node_cpu_utilization: by_instance_cpu_utilization[instance],
|
|
node_uname_info: by_instance_uname_info[instance],
|
|
node_services:
|
|
topology_node_services(
|
|
instance, by_instance_by_job_process_count, by_instance_by_job_by_type_memory, by_instance_by_job_server_types
|
|
)
|
|
}.compact
|
|
end
|
|
end
|
|
|
|
def topology_node_memory(client)
|
|
query_safely('gitlab_usage_ping:node_memory_total_bytes:max', 'node_memory', fallback: {}) do |query|
|
|
aggregate_by_instance(client, aggregate_one_week(query, aggregation: :max))
|
|
end
|
|
end
|
|
|
|
def topology_node_memory_utilization(client)
|
|
query_safely('gitlab_usage_ping:node_memory_utilization:avg', 'node_memory_utilization', fallback: {}) do |query|
|
|
aggregate_by_instance(client, aggregate_one_week(query), transform_value: :to_f)
|
|
end
|
|
end
|
|
|
|
def topology_node_cpus(client)
|
|
query_safely('gitlab_usage_ping:node_cpus:count', 'node_cpus', fallback: {}) do |query|
|
|
aggregate_by_instance(client, aggregate_one_week(query, aggregation: :max))
|
|
end
|
|
end
|
|
|
|
def topology_node_cpu_utilization(client)
|
|
query_safely('gitlab_usage_ping:node_cpu_utilization:avg', 'node_cpu_utilization', fallback: {}) do |query|
|
|
aggregate_by_instance(client, aggregate_one_week(query), transform_value: :to_f)
|
|
end
|
|
end
|
|
|
|
def topology_node_uname_info(client)
|
|
node_uname_info = query_safely('node_uname_info', 'node_uname_info', fallback: []) do |query|
|
|
client.query(query)
|
|
end
|
|
|
|
map_instance_labels(node_uname_info, %w(machine sysname release))
|
|
end
|
|
|
|
def topology_all_service_memory(client)
|
|
{
|
|
rss: topology_service_memory_rss(client),
|
|
uss: topology_service_memory_uss(client),
|
|
pss: topology_service_memory_pss(client)
|
|
}
|
|
end
|
|
|
|
def topology_service_memory_rss(client)
|
|
query_safely(
|
|
'gitlab_usage_ping:node_service_process_resident_memory_bytes:avg', 'service_rss', fallback: {}
|
|
) { |query| aggregate_by_labels(client, aggregate_one_week(query)) }
|
|
end
|
|
|
|
def topology_service_memory_uss(client)
|
|
query_safely(
|
|
'gitlab_usage_ping:node_service_process_unique_memory_bytes:avg', 'service_uss', fallback: {}
|
|
) { |query| aggregate_by_labels(client, aggregate_one_week(query)) }
|
|
end
|
|
|
|
def topology_service_memory_pss(client)
|
|
query_safely(
|
|
'gitlab_usage_ping:node_service_process_proportional_memory_bytes:avg', 'service_pss', fallback: {}
|
|
) { |query| aggregate_by_labels(client, aggregate_one_week(query)) }
|
|
end
|
|
|
|
def topology_all_service_process_count(client)
|
|
query_safely(
|
|
'gitlab_usage_ping:node_service_process:count', 'service_process_count', fallback: {}
|
|
) { |query| aggregate_by_labels(client, aggregate_one_week(query)) }
|
|
end
|
|
|
|
def topology_all_service_server_types(client)
|
|
query_safely(
|
|
'gitlab_usage_ping:node_service_app_server_workers:sum', 'service_workers', fallback: {}
|
|
) { |query| aggregate_by_labels(client, query) }
|
|
end
|
|
|
|
def query_safely(query, query_name, fallback:)
|
|
if timeout_error_exists?
|
|
@failures << CollectionFailure.new(query_name, 'timeout_cancellation')
|
|
return fallback
|
|
end
|
|
|
|
result = yield query
|
|
|
|
return result if result.present?
|
|
|
|
@failures << CollectionFailure.new(query_name, 'empty_result')
|
|
fallback
|
|
rescue StandardError => e
|
|
@failures << CollectionFailure.new(query_name, e.class.to_s)
|
|
fallback
|
|
end
|
|
|
|
def timeout_error_exists?
|
|
timeout_error_names = TIMEOUT_ERRORS.map(&:to_s).to_set
|
|
|
|
@failures.any? do |failure|
|
|
timeout_error_names.include?(failure.error)
|
|
end
|
|
end
|
|
|
|
def topology_node_services(instance, all_process_counts, all_process_memory, all_server_types)
|
|
# returns all node service data grouped by service name as the key
|
|
instance_service_data =
|
|
topology_instance_service_process_count(instance, all_process_counts)
|
|
.deep_merge(topology_instance_service_memory(instance, all_process_memory))
|
|
.deep_merge(topology_instance_service_server_types(instance, all_server_types))
|
|
|
|
# map to list of hashes where service names become values instead, and skip
|
|
# unknown services, since they might not be ours
|
|
instance_service_data.each_with_object([]) do |entry, list|
|
|
service, service_metrics = entry
|
|
service_name = service.to_s.strip
|
|
|
|
if gitlab_service = JOB_TO_SERVICE_NAME[service_name]
|
|
list << { name: gitlab_service }.merge(service_metrics)
|
|
else
|
|
@failures << CollectionFailure.new('service_unknown', service_name)
|
|
end
|
|
end
|
|
end
|
|
|
|
def topology_instance_service_process_count(instance, all_instance_data)
|
|
topology_data_for_instance(instance, all_instance_data).to_h do |metric, count|
|
|
[metric['job'], { process_count: count }]
|
|
end
|
|
end
|
|
|
|
# Given a hash mapping memory set types to Prometheus response data, returns a hash
|
|
# mapping instance/node names to services and their respective memory use in bytes
|
|
def topology_instance_service_memory(instance, instance_data_by_type)
|
|
result = {}
|
|
instance_data_by_type.each do |memory_type, instance_data|
|
|
topology_data_for_instance(instance, instance_data).each do |metric, memory_bytes|
|
|
job = metric['job']
|
|
key = "process_memory_#{memory_type}".to_sym
|
|
|
|
result[job] ||= {}
|
|
result[job][key] ||= memory_bytes
|
|
end
|
|
end
|
|
|
|
result
|
|
end
|
|
|
|
def topology_instance_service_server_types(instance, all_instance_data)
|
|
topology_data_for_instance(instance, all_instance_data).to_h do |metric, _value|
|
|
[metric['job'], { server: metric['server'] }]
|
|
end
|
|
end
|
|
|
|
def topology_data_for_instance(instance, all_instance_data)
|
|
all_instance_data.filter { |metric, _value| metric['instance'] == instance }
|
|
end
|
|
|
|
def normalize_instance_label(instance)
|
|
normalize_localhost_address(drop_port_number(instance))
|
|
end
|
|
|
|
def normalize_localhost_address(instance)
|
|
ip_addr = IPAddr.new(instance)
|
|
is_local_ip = ip_addr.loopback? || ip_addr.to_i == 0
|
|
|
|
is_local_ip ? 'localhost' : instance
|
|
rescue IPAddr::InvalidAddressError
|
|
# This most likely means it was a host name, not an IP address
|
|
instance
|
|
end
|
|
|
|
def drop_port_number(instance)
|
|
instance.gsub(/:\d+$/, '')
|
|
end
|
|
|
|
def normalize_and_track_instance(instance)
|
|
normalize_instance_label(instance).tap do |normalized_instance|
|
|
@instances << normalized_instance
|
|
end
|
|
end
|
|
|
|
def aggregate_one_week(query, aggregation: :avg)
|
|
"#{aggregation}_over_time (#{query}[1w])"
|
|
end
|
|
|
|
def aggregate_by_instance(client, query, transform_value: :to_i)
|
|
client.aggregate(query, transform_value: transform_value) { |metric| normalize_and_track_instance(metric['instance']) }
|
|
end
|
|
|
|
# Will retain a composite key that values are mapped to
|
|
def aggregate_by_labels(client, query, transform_value: :to_i)
|
|
client.aggregate(query, transform_value: transform_value) do |metric|
|
|
metric['instance'] = normalize_and_track_instance(metric['instance'])
|
|
metric
|
|
end
|
|
end
|
|
|
|
# Given query result vector, map instance to a hash of target labels key/value.
|
|
# @return [Hash] mapping instance to a hash of target labels key/value, or the empty hash if input empty vector
|
|
def map_instance_labels(query_result_vector, target_labels)
|
|
query_result_vector.to_h do |result|
|
|
key = normalize_and_track_instance(result['metric']['instance'])
|
|
value = result['metric'].slice(*target_labels).symbolize_keys
|
|
[key, value]
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end
|