Per-job execution metrics (#5384)
* New execution stats data format, #5283
  We store time and counts per-queue, per-class and totals, daily.
* Break metrics into separate jobs and queues hashes
* Tweak
* Move metrics tracking to middleware, start work on Query API
* Add support for labeled points in time
* Add fetch method for deploy marks
* Rejigger metrics file layout
* Fix tests
* Remove per-queue metrics, adds a lot of complexity with little value IMO
* Store per-minute histograms
* Keep basic stats hardwired as is
  The idea being that metrics are optional middleware, as they have significantly more overhead in CPU time and Redis space.
* Implement top N metrics dashboard
* Add topN and job-specific metric pages
* Supply histogram data to job metrics page
* Can't use local time as CI is in a different tz
* Add basic metrics graph, refactor dashboard JS to make Rickshaw reusable
* Prepare for public beta
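To make the new data format concrete, here is a sketch of one per-minute metrics hash as stored in Redis, based on the key and field formats in the tracking code below; the class names and counts are illustrative, not actual output.

# Per-minute bucket, key format "j|YYYYMMDD|H:M" (no leading zeros), 8-hour TTL.
# Fields are "<class>|p" (processed), "<class>|f" (failed) and
# "<class>|ms" (total execution time of successful runs, in milliseconds).
Sidekiq.redis { |c| c.hgetall("j|20220722|22:3") }
# => {"App::SomeJob|p" => "3", "App::SomeJob|f" => "1", "App::SomeJob|ms" => "14",
#     "App::FooJob|p" => "4", "App::FooJob|ms" => "31"}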
Parent: dcd6d32c35
Commit: f220897db9
@@ -34,7 +34,10 @@ module Sidekiq
       startup: [],
       quiet: [],
       shutdown: [],
-      heartbeat: []
+      # triggers when we fire the first heartbeat on startup OR repairing a network partition
+      heartbeat: [],
+      # triggers on EVERY heartbeat call, every 10 seconds
+      beat: []
     },
     dead_max_jobs: 10_000,
     dead_timeout_in_seconds: 180 * 24 * 60 * 60, # 6 months
@@ -3,7 +3,10 @@
 require "sidekiq"

 require "zlib"
+require "set"
 require "base64"
+require "sidekiq/metrics/deploy"
+require "sidekiq/metrics/query"

 module Sidekiq
   # Retrieve runtime statistics from Redis regarding
@@ -426,3 +426,4 @@ module Sidekiq # :nodoc:
 end

 require "sidekiq/systemd"
+require "sidekiq/metrics/tracking"
@@ -47,6 +47,7 @@ module Sidekiq
     end

     def fire_event(event, options = {})
+      oneshot = options.fetch(:oneshot, true)
       reverse = options[:reverse]
       reraise = options[:reraise]
@@ -58,7 +59,7 @@ module Sidekiq
         handle_exception(ex, {context: "Exception during Sidekiq lifecycle event.", event: event})
         raise ex if reraise
       end
-      arr.clear # once we've fired an event, we never fire it again
+      arr.clear if oneshot # once we've fired an event, we never fire it again
     end
   end
 end
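A quick sketch of the new oneshot semantics from a user's point of view, assuming a standard server config block (handler bodies are illustrative): :heartbeat handlers are cleared after their first firing, while :beat handlers survive because the launcher fires that event with oneshot: false.

Sidekiq.configure_server do |config|
  # fires once, on the first heartbeat (or after repairing a network partition)
  config.on(:heartbeat) { Sidekiq.logger.info "first heartbeat" }
  # fires on every heartbeat, roughly every 10 seconds
  config.on(:beat) { Sidekiq.logger.info "beat" }
end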
@@ -79,6 +79,8 @@ module Sidekiq
     end

     def clear_heartbeat
+      flush_stats
+
       # Remove record from Redis since we are shutting down.
       # Note we don't stop the heartbeat thread; if the process
       # doesn't actually exit, it'll reappear in the Web UI.
@@ -98,7 +100,7 @@ module Sidekiq
       ❤
     end

-    def self.flush_stats
+    def flush_stats
       fails = Processor::FAILURE.reset
       procd = Processor::PROCESSED.reset
       return if fails + procd == 0
@@ -122,7 +124,6 @@ module Sidekiq
         Sidekiq.logger.warn("Unable to flush stats: #{ex}")
       end
     end
-    at_exit(&method(:flush_stats))

     def ❤
       key = identity
@@ -179,6 +180,7 @@ module Sidekiq

       # first heartbeat or recovering from an outage and need to reestablish our heartbeat
       fire_event(:heartbeat) unless exists
+      fire_event(:beat, oneshot: false)

       return unless msg
@@ -0,0 +1,47 @@
require "sidekiq"
require "date"

# This file is designed to be required within the user's
# deployment script; it should need a bare minimum of dependencies.
#
#   require "sidekiq/metrics/deploy"
#   gitdesc = `git log -1 --format="%h %s"`.strip
#   d = Sidekiq::Metrics::Deploy.new
#   d.mark(label: gitdesc)
#
# Note that you cannot mark more than once per minute. This is a feature, not a bug.
module Sidekiq
  module Metrics
    class Deploy
      MARK_TTL = 90 * 24 * 60 * 60 # 90 days

      def initialize(pool = Sidekiq.redis_pool)
        @pool = pool
      end

      def mark(at: Time.now, label: "")
        # We need to round the timestamp so that we gracefully
        # handle an expected, common error in marking deploys:
        # having every process mark its deploy, leading
        # to N marks for each deploy. Instead we round the time
        # to the minute so that multiple marks within that minute
        # all naturally roll up into one mark per minute.
        whence = at.utc
        floor = Time.utc(whence.year, whence.month, whence.mday, whence.hour, whence.min, 0)
        datecode = floor.strftime("%Y%m%d")
        key = "#{datecode}-marks"
        @pool.with do |c|
          c.pipelined do |pipe|
            pipe.hsetnx(key, floor.rfc3339, label)
            pipe.expire(key, MARK_TTL)
          end
        end
      end

      def fetch(date = Time.now.utc.to_date)
        datecode = date.strftime("%Y%m%d")
        @pool.with { |c| c.hgetall("#{datecode}-marks") }
      end
    end
  end
end
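A small usage sketch for the new Deploy class, mirroring the file comment above and the tests below; the label and timestamp are illustrative.

d = Sidekiq::Metrics::Deploy.new
d.mark(label: "cafed00d - some git summary line")
d.fetch # marks for today (UTC), keyed by the minute-floored RFC 3339 time
# => {"2022-07-22T22:03:00Z" => "cafed00d - some git summary line"}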
@@ -0,0 +1,124 @@
require "sidekiq"
require "date"
require "set"

require "sidekiq/metrics/shared"

module Sidekiq
  module Metrics
    # Allows caller to query for Sidekiq execution metrics within Redis.
    # Caller sets a set of attributes to act as filters. {#fetch} will call
    # Redis and return a Hash of results.
    #
    # NB: all metrics and times/dates are UTC only. We specifically do not
    # support timezones.
    class Query
      # :hour, :day, :month
      attr_accessor :period

      # a specific job class, e.g. "App::OrderJob"
      attr_accessor :klass

      # the date specific to the period
      # for :day or :hour, something like Date.today or Date.new(2022, 7, 13)
      # for :month, Date.new(2022, 7, 1)
      attr_accessor :date

      # for period = :hour, the specific hour, integer e.g. 1 or 18
      # note that hours and minutes do not have a leading zero so minute-specific
      # keys will look like "j|20220718|7:3" for data at 07:03.
      attr_accessor :hour

      def initialize(pool: Sidekiq.redis_pool, now: Time.now)
        @time = now.utc
        @pool = pool
        @klass = nil
      end

      # Get metric data from the last hour and roll it up
      # into top processed count and execution time based on class.
      def top_jobs
        resultset = {}
        resultset[:date] = @time.to_date
        resultset[:period] = :hour
        resultset[:ends_at] = @time
        time = @time

        results = @pool.with do |conn|
          conn.pipelined do |pipe|
            resultset[:size] = 60
            60.times do |idx|
              key = "j|#{time.strftime("%Y%m%d")}|#{time.hour}:#{time.min}"
              pipe.hgetall key
              time -= 60
            end
            resultset[:starts_at] = time
          end
        end

        t = Hash.new(0)
        klsset = Set.new
        # merge the per-minute data into a totals hash for the hour
        results.each do |hash|
          hash.each { |k, v| t[k] = t[k] + v.to_i }
          klsset.merge(hash.keys.map { |k| k.split("|")[0] })
        end
        resultset[:job_classes] = klsset.delete_if { |item| item.size < 3 }
        resultset[:totals] = t
        top = t.each_with_object({}) do |(k, v), memo|
          (kls, metric) = k.split("|")
          memo[metric] ||= Hash.new(0)
          memo[metric][kls] = v
        end

        sorted = {}
        top.each_pair do |metric, hash|
          sorted[metric] = hash.sort_by { |k, v| v }.reverse.to_h
        end
        resultset[:top_classes] = sorted
        resultset
      end

      def for_job(klass)
        resultset = {}
        resultset[:date] = @time.to_date
        resultset[:period] = :hour
        resultset[:ends_at] = @time
        marks = @pool.with { |c| c.hgetall("#{@time.strftime("%Y%m%d")}-marks") }

        time = @time
        initial = @pool.with do |conn|
          conn.pipelined do |pipe|
            resultset[:size] = 60
            60.times do |idx|
              key = "j|#{time.strftime("%Y%m%d|%-H:%-M")}"
              pipe.hmget key, "#{klass}|ms", "#{klass}|p", "#{klass}|f"
              time -= 60
            end
          end
        end

        time = @time
        hist = Histogram.new(klass)
        results = @pool.with do |conn|
          initial.map do |(ms, p, f)|
            tm = Time.utc(time.year, time.month, time.mday, time.hour, time.min, 0)
            {
              time: tm.rfc3339,
              epoch: tm.to_i,
              ms: ms.to_i, p: p.to_i, f: f.to_i, hist: hist.fetch(conn, time)
            }.tap { |x|
              x[:mark] = marks[x[:time]] if marks[x[:time]]
              time -= 60
            }
          end
        end

        resultset[:marks] = marks
        resultset[:starts_at] = time
        resultset[:data] = results
        resultset
      end
    end
  end
end
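A hedged sketch of the Query API from the caller's side; the class names are illustrative, and the result keys are the ones built in the methods above.

q = Sidekiq::Metrics::Query.new
rs = q.top_jobs
rs[:top_classes]["p"]  # classes sorted by processed count, e.g. {"App::FooJob" => 4, ...}
rs[:top_classes]["ms"] # classes sorted by total execution time
rs[:totals]["App::FooJob|ms"] # hour-wide total for one class/metric pair

rs = q.for_job("App::FooJob")
rs[:data].size  # => 60, one entry per minute
rs[:data].first # => {time:, epoch:, ms:, p:, f:, hist:, mark: (only if a deploy landed that minute)}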
@@ -0,0 +1,94 @@
require "concurrent"

module Sidekiq
  module Metrics
    # TODO Support apps without concurrent-ruby
    Counter = ::Concurrent::AtomicFixnum

    # Implements space-efficient but statistically useful histogram storage.
    # A precise time histogram stores every time. Instead we break times into a set of
    # known buckets and increment counts of the associated time bucket. Even if we call
    # the histogram a million times, we'll still only store 26 buckets.
    # NB: needs to be thread-safe or resilient to races.
    #
    # To store this data, we use Redis' BITFIELD command to store unsigned 16-bit counters
    # per bucket per klass per minute. It's unlikely that most people will be executing more
    # than 1,000 jobs/sec for a full minute of a specific type.
    class Histogram
      include Enumerable

      # This number represents the maximum milliseconds for this bucket.
      # 20 means all job executions up to 20ms, e.g. if a job takes
      # 280ms, it'll increment bucket[7]. Note we can track job executions
      # up to about 5.5 minutes. After that, it's assumed you're probably
      # not too concerned with its performance.
      BUCKET_INTERVALS = [
        20, 30, 45, 65, 100,
        150, 225, 335, 500, 750,
        1100, 1700, 2500, 3800, 5750,
        8500, 13000, 20000, 30000, 45000,
        65000, 100000, 150000, 225000, 335000,
        Float::INFINITY # the "maybe your job is too long" bucket
      ]
      LABELS = [
        "20ms", "30ms", "45ms", "65ms", "100ms",
        "150ms", "225ms", "335ms", "500ms", "750ms",
        "1.1s", "1.7s", "2.5s", "3.8s", "5.75s",
        "8.5s", "13s", "20s", "30s", "45s",
        "65s", "100s", "150s", "225s", "335s",
        "Slow"
      ]

      FETCH = "GET u16 #0 GET u16 #1 GET u16 #2 GET u16 #3 \
        GET u16 #4 GET u16 #5 GET u16 #6 GET u16 #7 \
        GET u16 #8 GET u16 #9 GET u16 #10 GET u16 #11 \
        GET u16 #12 GET u16 #13 GET u16 #14 GET u16 #15 \
        GET u16 #16 GET u16 #17 GET u16 #18 GET u16 #19 \
        GET u16 #20 GET u16 #21 GET u16 #22 GET u16 #23 \
        GET u16 #24 GET u16 #25".split

      def each
        buckets.each { |counter| yield counter.value }
      end

      def label(idx)
        LABELS[idx]
      end

      attr_reader :buckets
      def initialize(klass)
        @klass = klass
        @buckets = Array.new(BUCKET_INTERVALS.size) { Counter.new }
      end

      def record_time(ms)
        index_to_use = BUCKET_INTERVALS.each_index do |idx|
          break idx if ms < BUCKET_INTERVALS[idx]
        end

        @buckets[index_to_use].increment
      end

      def fetch(conn, now = Time.now)
        window = now.utc.strftime("%d-%H:%-M")
        key = "#{@klass}-#{window}"
        conn.bitfield(key, *FETCH)
      end

      def persist(conn, now = Time.now)
        buckets, @buckets = @buckets, []
        window = now.utc.strftime("%d-%H:%-M")
        key = "#{@klass}-#{window}"
        cmd = [key, "OVERFLOW", "SAT"]
        buckets.each_with_index do |counter, idx|
          val = counter.value
          cmd << "INCRBY" << "u16" << "##{idx}" << val.to_s if val > 0
        end

        conn.bitfield(*cmd) if cmd.size > 3
        conn.expire(key, 86400)
        key
      end
    end
  end
end
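A quick sanity check of the bucketing math, matching the comment in the class above; the class name is illustrative.

h = Sidekiq::Metrics::Histogram.new("App::OrderJob")
h.record_time(280)     # 280 < BUCKET_INTERVALS[7] (335) => increments bucket 7
h.label(7)             # => "335ms"
h.record_time(400_000) # past 335s => the Float::INFINITY "Slow" bucket, index 25
Sidekiq.redis { |c| h.persist(c) } # BITFIELD INCRBY on u16 counters, 24-hour TTL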
@@ -0,0 +1,134 @@
require "time"
require "sidekiq"
require "sidekiq/metrics/shared"

# This file contains the components which track execution metrics within Sidekiq.
module Sidekiq
  module Metrics
    class ExecutionTracker
      include Sidekiq::Component

      def initialize(config)
        @config = config
        @jobs = Hash.new(0)
        @totals = Hash.new(0)
        @grams = Hash.new { |hash, key| hash[key] = Histogram.new(key) }
        @lock = Mutex.new
      end

      def track(queue, klass)
        start = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :millisecond)
        time_ms = 0
        begin
          begin
            yield
          ensure
            finish = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :millisecond)
            time_ms = finish - start
          end
          # We don't track time for failed jobs as they can have very unpredictable
          # execution times. It's more important to know the average time for successful
          # jobs so we can better recognize when a perf regression is introduced.
          @lock.synchronize {
            @grams[klass].record_time(time_ms)
            @jobs["#{klass}|ms"] += time_ms
            @totals["ms"] += time_ms
          }
        rescue Exception
          @lock.synchronize {
            @jobs["#{klass}|f"] += 1
            @totals["f"] += 1
          }
          raise
        ensure
          @lock.synchronize {
            @jobs["#{klass}|p"] += 1
            @totals["p"] += 1
          }
        end
      end

      LONG_TERM = 90 * 24 * 60 * 60
      MID_TERM = 7 * 24 * 60 * 60
      SHORT_TERM = 8 * 60 * 60

      def flush(time = Time.now)
        totals, jobs, grams = reset
        procd = totals["p"]
        fails = totals["f"]
        return if procd == 0 && fails == 0

        now = time.utc
        nowdate = now.strftime("%Y%m%d")
        nowhour = now.strftime("%Y%m%d|%-H")
        nowmin = now.strftime("%Y%m%d|%-H:%-M")
        count = 0

        redis do |conn|
          if grams.size > 0
            conn.pipelined do |pipe|
              grams.each do |_, gram|
                gram.persist(pipe, now)
              end
            end
          end

          [
            ["j", jobs, nowdate, LONG_TERM],
            ["j", jobs, nowhour, MID_TERM],
            ["j", jobs, nowmin, SHORT_TERM]
          ].each do |prefix, data, bucket, ttl|
            # Quietly seed the new 7.0 stats format so migration is painless.
            conn.pipelined do |xa|
              stats = "#{prefix}|#{bucket}"
              # logger.debug "Flushing metrics #{stats}"
              data.each_pair do |key, value|
                xa.hincrby stats, key, value
                count += 1
              end
              xa.expire(stats, ttl)
            end
          end
          logger.info "Flushed #{count} elements"
          count
        end
      end

      private

      def reset
        @lock.synchronize {
          array = [@totals, @jobs, @grams]
          @totals = Hash.new(0)
          @jobs = Hash.new(0)
          @grams = Hash.new { |hash, key| hash[key] = Histogram.new(key) }
          array
        }
      end
    end

    class Middleware
      include Sidekiq::ServerMiddleware

      def initialize(options)
        @exec = options
      end

      def call(_instance, hash, queue, &block)
        @exec.track(queue, hash["wrapped"] || hash["class"], &block)
      end
    end
  end
end

if ENV["SIDEKIQ_METRICS_BETA"] == "1"
  Sidekiq.configure_server do |config|
    exec = Sidekiq::Metrics::ExecutionTracker.new(config)
    config.server_middleware do |chain|
      chain.add Sidekiq::Metrics::Middleware, exec
    end
    config.on(:beat) do
      exec.flush
    end
  end
end
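For reference, a sketch of the keys one flush writes for work executed at 2022-07-22 22:03 UTC (timestamps illustrative): the same counters are rolled up into daily, hourly and per-minute hashes with the three TTLs defined above.

# j|20220722      daily rollup,      TTL 90 days (LONG_TERM)
# j|20220722|22   hourly rollup,     TTL 7 days  (MID_TERM)
# j|20220722|22:3 per-minute bucket, TTL 8 hours (SHORT_TERM)
Sidekiq.redis { |c| c.hgetall("j|20220722|22") } # hourly totals for 22:00-22:59 UTC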
@@ -33,6 +33,10 @@ module Sidekiq
       "Dead" => "morgue"
     }

+    if ENV["SIDEKIQ_METRICS_BETA"] == "1"
+      DEFAULT_TABS["Metrics"] = "metrics"
+    end
+
     class << self
       def settings
         self
@@ -60,6 +60,19 @@ module Sidekiq
       erb(:dashboard)
     end

+    get "/metrics" do
+      q = Sidekiq::Metrics::Query.new
+      @resultset = q.top_jobs
+      erb(:metrics)
+    end
+
+    get "/metrics/:name" do
+      @name = route_params[:name]
+      q = Sidekiq::Metrics::Query.new
+      @resultset = q.for_job(@name)
+      erb(:metrics_for_job)
+    end
+
     get "/busy" do
       erb(:busy)
     end
@@ -4,3 +4,12 @@
 require_relative "config/application"

 Rails.application.load_tasks
+
+task :seed_jobs => :environment do
+  # see classes defined in config/initializers/sidekiq.rb
+  [FooJob, BarJob, StoreCardJob, OrderJunkJob, SpamUserJob, FastJob, SlowJob].each do |kls|
+    (kls.name.size * 10).times do
+      kls.perform_in(rand * 300)
+    end
+  end
+end
@@ -48,3 +48,73 @@ require "sidekiq/middleware/current_attributes"
 Sidekiq::CurrentAttributes.persist(Myapp::Current) # Your AS::CurrentAttributes singleton

 # Sidekiq.transactional_push!
+
+# Create a label based on the short hash and subject line of the latest commit in git.
+# WARNING: you only want to run this ONCE! If this runs on boot for 20 different Sidekiq processes,
+# you will get 20 different deploy marks in Redis! Instead this should go into the script
+# that runs your deploy, e.g. your capistrano script.
+Sidekiq.configure_server do |config|
+  label = `git log -1 --format="%h %s"`.strip
+  require "sidekiq/metrics/deploy"
+  Sidekiq::Metrics::Deploy.new.mark(label: label)
+end
+
+# helper jobs for seeding metrics data
+# you will need to restart if you change any of these
+class FooJob
+  include Sidekiq::Job
+  def perform(*)
+    raise "boom" if rand < 0.1
+    sleep(rand)
+  end
+end
+
+class BarJob
+  include Sidekiq::Job
+  def perform(*)
+    raise "boom" if rand < 0.1
+    sleep(rand)
+  end
+end
+
+class StoreCardJob
+  include Sidekiq::Job
+  def perform(*)
+    raise "boom" if rand < 0.1
+    sleep(rand)
+  end
+end
+
+class OrderJunkJob
+  include Sidekiq::Job
+  def perform(*)
+    raise "boom" if rand < 0.1
+    sleep(rand)
+  end
+end
+
+class SpamUserJob
+  include Sidekiq::Job
+  def perform(*)
+    raise "boom" if rand < 0.1
+    sleep(rand)
+  end
+end
+
+class FastJob
+  include Sidekiq::Job
+  def perform(*)
+    raise "boom" if rand < 0.2
+    sleep(rand * 0.1)
+  end
+end
+
+class SlowJob
+  include Sidekiq::Job
+  def perform(*)
+    raise "boom" if rand < 0.3
+    sleep(rand * 10)
+  end
+end
@@ -0,0 +1,141 @@
# frozen_string_literal: true

require_relative "helper"
require "sidekiq/component"
require "sidekiq/metrics/tracking"
require "sidekiq/metrics/query"
require "sidekiq/metrics/deploy"
require "sidekiq/api"

describe Sidekiq::Metrics do
  before do
    Sidekiq.redis { |c| c.flushdb }
  end

  def fixed_time
    @whence ||= Time.utc(2022, 7, 22, 22, 3, 0)
  end

  def create_known_metrics(time = fixed_time)
    smet = Sidekiq::Metrics::ExecutionTracker.new(Sidekiq)
    smet.track("critical", "App::SomeJob") { sleep 0.001 }
    smet.track("critical", "App::FooJob") { sleep 0.001 }
    assert_raises RuntimeError do
      smet.track("critical", "App::SomeJob") do
        raise "boom"
      end
    end
    smet.flush(time)
    smet.track("critical", "App::FooJob") { sleep 0.001 }
    smet.track("critical", "App::FooJob") { sleep 0.025 }
    smet.track("critical", "App::FooJob") { sleep 0.001 }
    smet.track("critical", "App::SomeJob") { sleep 0.001 }
    smet.flush(time - 60)
  end

  it "tracks metrics" do
    count = create_known_metrics
    assert_equal 12, count
  end

  describe "marx" do
    it "owns the means of production" do
      whence = Time.local(2022, 7, 17, 18, 43, 15)
      floor = whence.utc.rfc3339.sub(":15", ":00")

      d = Sidekiq::Metrics::Deploy.new
      d.mark(at: whence, label: "cafed00d - some git summary line")

      q = Sidekiq::Metrics::Query.new(now: whence)
      rs = q.for_job("FooJob")
      refute_nil rs[:marks]
      assert_equal 1, rs[:marks].size
      assert_equal "cafed00d - some git summary line", rs[:marks][floor], rs.inspect

      d = Sidekiq::Metrics::Deploy.new
      rs = d.fetch(whence)
      refute_nil rs
      assert_equal 1, rs.size
      assert_equal "cafed00d - some git summary line", rs[floor]
    end
  end

  describe "histograms" do
    it "buckets a bunch of times" do
      h = Sidekiq::Metrics::Histogram.new("App::FooJob")
      assert_equal 0, h.sum
      h.record_time(10)
      h.record_time(46)
      h.record_time(47)
      h.record_time(48)
      h.record_time(300)
      h.record_time(301)
      h.record_time(302)
      h.record_time(300000000)
      assert_equal 8, h.sum
      key = Sidekiq.redis do |conn|
        h.persist(conn, fixed_time)
      end
      assert_equal 0, h.sum
      refute_nil key
      assert_equal "App::FooJob-22-22:3", key

      h = Sidekiq::Metrics::Histogram.new("App::FooJob")
      data = Sidekiq.redis { |c| h.fetch(c, fixed_time) }
      {0 => 1, 3 => 3, 7 => 3, 25 => 1}.each_pair do |idx, val|
        assert_equal val, data[idx]
      end
    end
  end

  describe "querying" do
    it "handles empty metrics" do
      q = Sidekiq::Metrics::Query.new(now: fixed_time)
      rs = q.top_jobs
      refute_nil rs
      assert_equal 8, rs.size

      q = Sidekiq::Metrics::Query.new(now: fixed_time)
      rs = q.for_job("DoesntExist")
      refute_nil rs
      assert_equal 7, rs.size
    end

    it "fetches top job data" do
      create_known_metrics
      q = Sidekiq::Metrics::Query.new(now: fixed_time)
      rs = q.top_jobs
      assert_equal Date.new(2022, 7, 22), rs[:date]
      assert_equal 2, rs[:job_classes].size
      assert_equal "App::SomeJob", rs[:job_classes].first
      bucket = rs[:totals]
      refute_nil bucket
      assert_equal bucket.keys.sort, ["App::FooJob|ms", "App::FooJob|p", "App::SomeJob|f", "App::SomeJob|ms", "App::SomeJob|p"]
      assert_equal 3, bucket["App::SomeJob|p"]
      assert_equal 4, bucket["App::FooJob|p"]
      assert_equal 1, bucket["App::SomeJob|f"]
    end

    it "fetches job-specific data" do
      create_known_metrics
      d = Sidekiq::Metrics::Deploy.new
      d.mark(at: fixed_time - 300, label: "cafed00d - some git summary line")

      q = Sidekiq::Metrics::Query.new(now: fixed_time)
      rs = q.for_job("App::FooJob")
      assert_equal Date.new(2022, 7, 22), rs[:date]
      assert_equal 60, rs[:data].size
      assert_equal ["2022-07-22T21:58:00Z", "cafed00d - some git summary line"], rs[:marks].first

      data = rs[:data]
      assert_equal({time: "2022-07-22T22:03:00Z", p: 1, f: 0}, data[0].slice(:time, :p, :f))
      assert_equal({time: "2022-07-22T22:02:00Z", p: 3, f: 0}, data[1].slice(:time, :p, :f))
      assert_equal "cafed00d - some git summary line", data[5][:mark]

      # from create_known_metrics
      hist = data[1][:hist]
      assert_equal 2, hist[0]
      assert_equal 1, hist[1]
    end
  end
end
@@ -290,60 +290,15 @@ describe Sidekiq::Processor do
     end
   end

-  describe "stats" do
-    before do
-      Sidekiq.redis { |c| c.flushdb }
-    end
-
-    describe "when successful" do
-      let(:processed_today_key) { "stat:processed:#{Time.now.utc.strftime("%Y-%m-%d")}" }
-
-      def successful_job
-        msg = Sidekiq.dump_json({"class" => MockWorker.to_s, "args" => ["myarg"]})
-        @processor.process(work(msg))
-      end
-
-      it "increments processed stat" do
-        Sidekiq::Processor::PROCESSED.reset
-        successful_job
-        assert_equal 1, Sidekiq::Processor::PROCESSED.reset
-      end
-    end
-
-    describe "custom job logger class" do
-      class CustomJobLogger < Sidekiq::JobLogger
-        def call(item, queue)
-          yield
-        rescue Exception
-          raise
-        end
-      end
-
-      before do
-        opts = {queues: ["default"], job_logger: CustomJobLogger}
-        @processor = ::Sidekiq::Processor.new(opts) { |pr, ex| }
-      end
-    end
-  end
-
+  describe "stats" do
+    before do
+      Sidekiq.redis { |c| c.flushdb }
+    end
+
+    def successful_job
+      msg = Sidekiq.dump_json({"class" => MockWorker.to_s, "args" => ["myarg"]})
+      @processor.process(work(msg))
+    end
+
+    it "increments processed stat" do
+      Sidekiq::Processor::PROCESSED.reset
+      successful_job
+      assert_equal 1, Sidekiq::Processor::PROCESSED.reset
+    end
+  end
+
+  describe "custom job logger class" do
+    class CustomJobLogger < Sidekiq::JobLogger
+      def call(item, queue)
+        yield
+      rescue Exception
+        raise
+      end
+    end
+
+    before do
+      opts = Sidekiq
+      opts[:job_logger] = CustomJobLogger
(Two file diffs suppressed because one or more lines are too long.)
@@ -84,3 +84,6 @@ en: # <---- change this to your locale code
   Latency: Latency
   Pause: Pause
   Unpause: Unpause
+  Metrics: Metrics
+  NoDataFound: No data found
+  ExecutionTime: Execution Time
@@ -1,3 +1,4 @@
+<script type="text/javascript" src="<%= root_path %>javascripts/graph.js"></script>
 <script type="text/javascript" src="<%= root_path %>javascripts/dashboard.js"></script>
 <div class="dashboard clearfix">
   <h3>
@@ -0,0 +1,59 @@

<h1><%= t('Metrics') %></h1>

<h3>Top Jobs by Processed Count</h3>
<% top = @resultset[:top_classes] %>

<% topp = top["p"]&.first(10) %>
<div class="table_container">
  <table class="table table-bordered table-striped table-hover">
    <tbody>
      <tr>
        <th><%= t('Name') %></th>
        <th><%= t('Processed') %></th>
        <th><%= t('ExecutionTime') %></th>
      </tr>
      <% if topp %>
        <% topp.each do |kls, val| %>
          <tr>
            <td><code><a href="<%= root_path %>metrics/<%= kls %>"><%= kls %></a></code></td>
            <td><%= val %></td>
            <td><%= top["ms"][kls] %></td>
          </tr>
        <% end %>
      <% else %>
        <tr><td colspan=3><%= t("NoDataFound") %></td></tr>
      <% end %>
    </tbody>
  </table>
</div>

<h3>Top Jobs by Execution Time</h3>

<% topms = top["ms"]&.first(10) %>
<div class="table_container">
  <table class="table table-bordered table-striped table-hover">
    <tbody>
      <tr>
        <th><%= t('Name') %></th>
        <th><%= t('Processed') %></th>
        <th><%= t('ExecutionTime') %></th>
      </tr>
      <% if topms %>
        <% topms.each do |kls, val| %>
          <tr>
            <td><code><a href="<%= root_path %>metrics/<%= kls %>"><%= kls %></a></code></td>
            <td><%= top["p"][kls] %></td>
            <td><%= val %></td>
          </tr>
        <% end %>
      <% else %>
        <tr><td colspan=3><%= t("NoDataFound") %></td></tr>
      <% end %>
    </tbody>
  </table>
</div>

<p>
  Data from <%= @resultset[:starts_at] %> to <%= @resultset[:ends_at] %>
</p>
@@ -0,0 +1,82 @@

<h2><%= t('Metrics') %> / <%= h @name %></h2>

<div class="row chart">
  <div id="realtime" data-processed-label="<%= t('Processed') %>" data-failed-label="<%= t('Failed') %>"></div>
</div>

<% data = @resultset[:data] %>
<div class="table_container">
  <table class="table table-bordered table-striped table-hover">
    <tbody>
      <tr>
        <th><%= t('Time') %></th>
        <th><%= t('Processed') %></th>
        <th><%= t('ExecutionTime') %></th>
        <th><%= t('Failed') %></th>
        <th><%= t('Deploy') %></th>
        <th><%= t('Histogram') %></th>
      </tr>
      <% data.each do |hash| %>
        <tr>
          <td><%= hash[:time] %></td>
          <td><%= hash[:p] %></td>
          <td><%= hash[:ms] %></td>
          <td><%= hash[:f] %></td>
          <td><%= hash[:mark] %></td>
          <td><%= hash[:hist] %></td>
        </tr>
      <% end %>
    </tbody>
  </table>
</div>

<p>
  Data from <%= @resultset[:starts_at] %> to <%= @resultset[:ends_at] %>
</p>

<% atad = data.reverse %>
<script type="text/javascript" src="<%= root_path %>javascripts/graph.js"></script>
<%# <script type="text/javascript" src="<%= root_path %>javascripts/metrics.js"></script> %>
<script>
  var palette = new Rickshaw.Color.Palette();
  var data = [ {
    name: "Processed",
    color: palette.color(),
    data: [ <% atad.each do |hash| %>
      { x: <%= hash[:epoch] %>, y: <%= hash[:p] %> },
    <% end %> ]
  }, {
    name: "Failed",
    color: palette.color(),
    data: [ <% atad.each do |hash| %>
      { x: <%= hash[:epoch] %>, y: <%= hash[:f] %> },
    <% end %> ]
  }, {
    name: "Execution Time",
    color: palette.color(),
    data: [ <% atad.each do |hash| %>
      { x: <%= hash[:epoch] %>, y: <%= hash[:ms] %> },
    <% end %> ]
  } ];

  var timeInterval = 60000;
  var graphElement = document.getElementById("realtime");

  var graph = new Rickshaw.Graph({
    element: graphElement,
    width: responsiveWidth(),
    renderer: 'line',
    interpolation: 'linear',
    series: data,
  });
  var x_axis = new Rickshaw.Graph.Axis.Time( { graph: graph } );

  var y_axis = new Rickshaw.Graph.Axis.Y( {
    graph: graph,
    tickFormat: Rickshaw.Fixtures.Number.formatKMBT,
    ticksTreatment: 'glow'
  });

  graph.render();
</script>