mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

merge main

This commit is contained in:
Mike Perham 2022-08-03 10:28:05 -07:00
commit 1d1e166fc0
36 changed files with 2277 additions and 131 deletions

View file

@ -2,6 +2,17 @@
[Sidekiq Changes](https://github.com/mperham/sidekiq/blob/main/Changes.md) | [Sidekiq Pro Changes](https://github.com/mperham/sidekiq/blob/main/Pro-Changes.md) | [Sidekiq Enterprise Changes](https://github.com/mperham/sidekiq/blob/main/Ent-Changes.md)
HEAD
----------
- [Job Metrics](https://github.com/mperham/sidekiq/wiki/Metrics#contributing) **BETA**
- Add `Context` column on queue page which shows any CurrentAttributes [#5450]
- `sidekiq_retry_in` may now return `:discard` or `:kill` to dynamically stop job retries [#5406] (see the sketch below)
- Smarter sorting of processes in /busy Web UI [#5398]
- Fix broken hamburger menu in mobile UI [#5428]
- Require redis-rb 4.5.0. Note that Sidekiq will break if you use the
[`Redis.exists_returns_integer = false`](https://github.com/redis/redis-rb/blob/master/CHANGELOG.md#450) flag. [#5394]
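A minimal sketch of the new `:discard`/`:kill` return values, assuming an illustrative job class (the class name and exception mapping are examples, not part of this commit):

```ruby
class ChargeCardJob
  include Sidekiq::Job

  # Return an Integer to schedule the next retry that many seconds out,
  # :discard to silently drop the job, or :kill to send it straight to the Dead set.
  sidekiq_retry_in do |count, exception|
    case exception
    when KeyError then :discard       # retrying bad input will never help
    when RuntimeError then 10 * (count + 1)
    else :kill                        # keep it in the morgue for inspection
    end
  end

  def perform(order_id)
    # ...
  end
end
```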
6.5.1
----------

View file

@ -4,6 +4,11 @@
Please see [sidekiq.org](https://sidekiq.org/) for more details and how to buy.
5.5.2
---------
- Fix overly aggressive orphan check with large Sidekiq clusters [#5435]
5.5.1
---------

View file

@ -33,7 +33,10 @@ module Sidekiq
startup: [],
quiet: [],
shutdown: [],
heartbeat: []
# triggers when we fire the first heartbeat on startup OR repairing a network partition
heartbeat: [],
# triggers on EVERY heartbeat call, every 10 seconds
beat: []
},
dead_max_jobs: 10_000,
dead_timeout_in_seconds: 180 * 24 * 60 * 60, # 6 months
@ -83,6 +86,11 @@ module Sidekiq
logger.warn(ex.backtrace.join("\n")) unless ex.backtrace.nil?
end
# DEFAULT_ERROR_HANDLER is a constant that allows the default error handler to
# be referenced. It must be defined here, after the default_error_handler
# method is defined.
DEFAULT_ERROR_HANDLER = method(:default_error_handler)
@config = DEFAULTS.dup
def self.options
logger.warn "`config.options[:key] = value` is deprecated, use `config[:key] = value`: #{caller(1..2)}"

View file

@ -3,7 +3,10 @@
require "sidekiq"
require "zlib"
require "set"
require "base64"
require "sidekiq/metrics/deploy"
require "sidekiq/metrics/query"
module Sidekiq
# Retrieve runtime statistics from Redis regarding
@ -844,6 +847,7 @@ module Sidekiq
# :nodoc:
# @api private
def cleanup
return 0 unless Sidekiq.redis { |conn| conn.set("process_cleanup", "1", nx: true, ex: 60) }
count = 0
Sidekiq.redis do |conn|
procs = conn.sscan_each("processes").to_a.sort

View file

@ -425,3 +425,4 @@ module Sidekiq # :nodoc:
end
require "sidekiq/systemd"
require "sidekiq/metrics/tracking"

View file

@ -47,6 +47,7 @@ module Sidekiq
end
def fire_event(event, options = {})
oneshot = options.fetch(:oneshot, true)
reverse = options[:reverse]
reraise = options[:reraise]
@ -58,7 +59,7 @@ module Sidekiq
handle_exception(ex, {context: "Exception during Sidekiq lifecycle event.", event: event})
raise ex if reraise
end
arr.clear # once we've fired an event, we never fire it again
arr.clear if oneshot # once we've fired an event, we never fire it again
end
end
end

View file

@ -91,7 +91,7 @@ module Sidekiq
msg = Sidekiq.load_json(jobstr)
if msg["retry"]
attempt_retry(nil, msg, queue, e)
process_retry(nil, msg, queue, e)
else
Sidekiq.death_handlers.each do |handler|
handler.call(msg, e)
@ -128,7 +128,7 @@ module Sidekiq
end
raise e unless msg["retry"]
attempt_retry(jobinst, msg, queue, e)
process_retry(jobinst, msg, queue, e)
# We've handled this error associated with this job, don't
# need to handle it at the global level
raise Skip
@ -139,7 +139,7 @@ module Sidekiq
# Note that +jobinst+ can be nil here if an error is raised before we can
# instantiate the job instance. All access must be guarded and
# best effort.
def attempt_retry(jobinst, msg, queue, exception)
def process_retry(jobinst, msg, queue, exception)
max_retry_attempts = retry_attempts_from(msg["retry"], @max_retries)
msg["queue"] = (msg["retry_queue"] || queue)
@ -170,19 +170,50 @@ module Sidekiq
msg["error_backtrace"] = compress_backtrace(lines)
end
if count < max_retry_attempts
delay = delay_for(jobinst, count, exception)
# Logging here can break retries if the logging device raises ENOSPC #3979
# logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
retry_at = Time.now.to_f + delay
payload = Sidekiq.dump_json(msg)
redis do |conn|
conn.zadd("retry", retry_at.to_s, payload)
end
else
# Goodbye dear message, you (re)tried your best I'm sure.
retries_exhausted(jobinst, msg, exception)
# Goodbye dear message, you (re)tried your best I'm sure.
return retries_exhausted(jobinst, msg, exception) if count >= max_retry_attempts
strategy, delay = delay_for(jobinst, count, exception)
case strategy
when :discard
return # poof!
when :kill
return retries_exhausted(jobinst, msg, exception)
end
# Logging here can break retries if the logging device raises ENOSPC #3979
# logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
jitter = rand(10) * (count + 1)
retry_at = Time.now.to_f + delay + jitter
payload = Sidekiq.dump_json(msg)
redis do |conn|
conn.zadd("retry", retry_at.to_s, payload)
end
end
# returns (strategy, seconds)
def delay_for(jobinst, count, exception)
rv = begin
# sidekiq_retry_in can return two different things:
# 1. When to retry next, as an integer of seconds
# 2. A symbol which re-routes the job elsewhere, e.g. :discard, :kill, :default
jobinst&.sidekiq_retry_in_block&.call(count, exception)
rescue Exception => e
handle_exception(e, {context: "Failure scheduling retry using the defined `sidekiq_retry_in` in #{jobinst.class.name}, falling back to default"})
nil
end
delay = if Integer === rv && rv > 0
rv
elsif rv == :discard
return [:discard, nil] # do nothing, job goes poof
elsif rv == :kill
return [:kill, nil]
else
(count**4) + 15
end
[:default, delay]
end
def retries_exhausted(jobinst, msg, exception)
@ -216,22 +247,6 @@ module Sidekiq
end
end
def delay_for(jobinst, count, exception)
jitter = rand(10) * (count + 1)
if jobinst&.sidekiq_retry_in_block
custom_retry_in = retry_in(jobinst, count, exception).to_i
return custom_retry_in + jitter if custom_retry_in > 0
end
(count**4) + 15 + jitter
end
def retry_in(jobinst, count, exception)
jobinst.sidekiq_retry_in_block.call(count, exception)
rescue Exception => e
handle_exception(e, {context: "Failure scheduling retry using the defined `sidekiq_retry_in` in #{jobinst.class.name}, falling back to default"})
nil
end
def exception_caused_by_shutdown?(e, checked_causes = [])
return false unless e.cause
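For reference, a small sketch re-deriving the default backoff used above: the base delay is `count**4 + 15` and the jitter added at scheduling time is `rand(10) * (count + 1)`:

```ruby
# Worked example of the default retry schedule (seconds), ignoring jitter:
#   count 0 -> 15, count 1 -> 16, count 2 -> 31, count 5 -> 640, count 24 -> 331_791
(0..5).each do |count|
  base = (count**4) + 15
  max_jitter = 9 * (count + 1) # rand(10) returns 0..9
  puts "retry ##{count}: #{base}s base, up to #{max_jitter}s jitter"
end
```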

View file

@ -79,6 +79,8 @@ module Sidekiq
end
def clear_heartbeat
flush_stats
# Remove record from Redis since we are shutting down.
# Note we don't stop the heartbeat thread; if the process
# doesn't actually exit, it'll reappear in the Web UI.
@ -98,7 +100,7 @@ module Sidekiq
end
def self.flush_stats
def flush_stats
fails = Processor::FAILURE.reset
procd = Processor::PROCESSED.reset
return if fails + procd == 0
@ -122,7 +124,6 @@ module Sidekiq
Sidekiq.logger.warn("Unable to flush stats: #{ex}")
end
end
at_exit(&method(:flush_stats))
def ❤
key = identity
@ -179,6 +180,7 @@ module Sidekiq
# first heartbeat or recovering from an outage and need to reestablish our heartbeat
fire_event(:heartbeat) unless exists
fire_event(:beat, oneshot: false)
return unless msg

View file

@ -0,0 +1,47 @@
require "sidekiq"
require "date"
# This file is designed to be required within the user's
# deployment script; it should need a bare minimum of dependencies.
#
# require "sidekiq/metrics/deploy"
# gitdesc = `git log -1 --format="%h %s"`.strip
# d = Sidekiq::Metrics::Deploy.new
# d.mark(label: gitdesc)
#
# Note that you cannot mark more than once per minute. This is a feature, not a bug.
module Sidekiq
module Metrics
class Deploy
MARK_TTL = 90 * 24 * 60 * 60 # 90 days
def initialize(pool = Sidekiq.redis_pool)
@pool = pool
end
def mark(at: Time.now, label: "")
# we need to round the timestamp so that we gracefully
# handle an expected common error in marking deploys:
# having every process mark its deploy, leading
# to N marks for each deploy. Instead we round the time
# to the minute so that multiple marks within that minute
# will all naturally roll up into one mark per minute.
whence = at.utc
floor = Time.utc(whence.year, whence.month, whence.mday, whence.hour, whence.min, 0)
datecode = floor.strftime("%Y%m%d")
key = "#{datecode}-marks"
@pool.with do |c|
c.pipelined do |pipe|
pipe.hsetnx(key, floor.rfc3339, label)
pipe.expire(key, MARK_TTL)
end
end
end
def fetch(date = Time.now.utc.to_date)
datecode = date.strftime("%Y%m%d")
@pool.with { |c| c.hgetall("#{datecode}-marks") }
end
end
end
end
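A short usage sketch following the header comment above; the label text is invented:

```ruby
require "sidekiq/metrics/deploy"

deploy = Sidekiq::Metrics::Deploy.new
deploy.mark(label: "cafe123 ship new checkout flow") # rounded down to the current minute

# Read back every mark recorded today, keyed by the RFC 3339 minute it was rounded to.
deploy.fetch(Time.now.utc.to_date).each do |minute, label|
  puts "#{minute}: #{label}"
end
```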

View file

@ -0,0 +1,124 @@
require "sidekiq"
require "date"
require "set"
require "sidekiq/metrics/shared"
module Sidekiq
module Metrics
# Allows caller to query for Sidekiq execution metrics within Redis.
# Caller sets a set of attributes to act as filters. {#fetch} will call
# Redis and return a Hash of results.
#
# NB: all metrics and times/dates are UTC only. We specifically do not
# support timezones.
class Query
# :hour, :day, :month
attr_accessor :period
# a specific job class, e.g. "App::OrderJob"
attr_accessor :klass
# the date specific to the period
# for :day or :hour, something like Date.today or Date.new(2022, 7, 13)
# for :month, Date.new(2022, 7, 1)
attr_accessor :date
# for period = :hour, the specific hour, integer e.g. 1 or 18
# note that hours and minutes do not have a leading zero so minute-specific
# keys will look like "j|20220718|7:3" for data at 07:03.
attr_accessor :hour
def initialize(pool: Sidekiq.redis_pool, now: Time.now)
@time = now.utc
@pool = pool
@klass = nil
end
# Get metric data from the last hour and roll it up
# into top processed count and execution time based on class.
def top_jobs
resultset = {}
resultset[:date] = @time.to_date
resultset[:period] = :hour
resultset[:ends_at] = @time
time = @time
results = @pool.with do |conn|
conn.pipelined do |pipe|
resultset[:size] = 60
60.times do |idx|
key = "j|#{time.strftime("%Y%m%d")}|#{time.hour}:#{time.min}"
pipe.hgetall key
time -= 60
end
resultset[:starts_at] = time
end
end
t = Hash.new(0)
klsset = Set.new
# merge the per-minute data into a totals hash for the hour
results.each do |hash|
hash.each { |k, v| t[k] = t[k] + v.to_i }
klsset.merge(hash.keys.map { |k| k.split("|")[0] })
end
resultset[:job_classes] = klsset.delete_if { |item| item.size < 3 }
resultset[:totals] = t
top = t.each_with_object({}) do |(k, v), memo|
(kls, metric) = k.split("|")
memo[metric] ||= Hash.new(0)
memo[metric][kls] = v
end
sorted = {}
top.each_pair do |metric, hash|
sorted[metric] = hash.sort_by { |k, v| v }.reverse.to_h
end
resultset[:top_classes] = sorted
resultset
end
def for_job(klass)
resultset = {}
resultset[:date] = @time.to_date
resultset[:period] = :hour
resultset[:ends_at] = @time
marks = @pool.with { |c| c.hgetall("#{@time.strftime("%Y%m%d")}-marks") }
time = @time
initial = @pool.with do |conn|
conn.pipelined do |pipe|
resultset[:size] = 60
60.times do |idx|
key = "j|#{time.strftime("%Y%m%d|%-H:%-M")}"
pipe.hmget key, "#{klass}|ms", "#{klass}|p", "#{klass}|f"
time -= 60
end
end
end
time = @time
hist = Histogram.new(klass)
results = @pool.with do |conn|
initial.map do |(ms, p, f)|
tm = Time.utc(time.year, time.month, time.mday, time.hour, time.min, 0)
{
time: tm.iso8601,
epoch: tm.to_i,
ms: ms.to_i, p: p.to_i, f: f.to_i, hist: hist.fetch(conn, time)
}.tap { |x|
x[:mark] = marks[x[:time]] if marks[x[:time]]
time -= 60
}
end
end
resultset[:marks] = marks
resultset[:starts_at] = time
resultset[:data] = results
resultset
end
end
end
end
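A hedged sketch of how a caller might use this class, mirroring the new `/metrics` Web UI actions later in this commit; `App::OrderJob` is a placeholder name:

```ruby
require "sidekiq/metrics/query"

q = Sidekiq::Metrics::Query.new
hour = q.top_jobs
hour[:top_classes]["p"]&.first(10)  # top 10 job classes by processed count
hour[:top_classes]["ms"]&.first(10) # top 10 job classes by total execution time

detail = Sidekiq::Metrics::Query.new.for_job("App::OrderJob")
detail[:data].first # => {time:, epoch:, ms:, p:, f:, hist:, ...} for the newest minute
```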

View file

@ -0,0 +1,94 @@
require "concurrent"
module Sidekiq
module Metrics
# TODO Support apps without concurrent-ruby
Counter = ::Concurrent::AtomicFixnum
# Implements space-efficient but statistically useful histogram storage.
# A precise time histogram stores every time. Instead we break times into a set of
# known buckets and increment counts of the associated time bucket. Even if we call
# the histogram a million times, we'll still only store 26 buckets.
# NB: needs to be thread-safe or resilient to races.
#
# To store this data, we use Redis' BITFIELD command to store unsigned 16-bit counters
# per bucket per klass per minute. It's unlikely that most people will be executing more
# than 1000 jobs/sec for a full minute of a specific type.
class Histogram
include Enumerable
# This number represents the maximum milliseconds for this bucket.
# 20 means all job executions up to 20ms, e.g. if a job takes
# 280ms, it'll increment bucket[7]. Note we can track job executions
# up to about 5.5 minutes. After that, it's assumed you're probably
# not too concerned with its performance.
BUCKET_INTERVALS = [
20, 30, 45, 65, 100,
150, 225, 335, 500, 750,
1100, 1700, 2500, 3800, 5750,
8500, 13000, 20000, 30000, 45000,
65000, 100000, 150000, 225000, 335000,
Float::INFINITY # the "maybe your job is too long" bucket
]
LABELS = [
"20ms", "30ms", "45ms", "65ms", "100ms",
"150ms", "225ms", "335ms", "500ms", "750ms",
"1.1s", "1.7s", "2.5s", "3.8s", "5.75s",
"8.5s", "13s", "20s", "30s", "45s",
"65s", "100s", "150s", "225s", "335s",
"Slow"
]
FETCH = "GET u16 #0 GET u16 #1 GET u16 #2 GET u16 #3 \
GET u16 #4 GET u16 #5 GET u16 #6 GET u16 #7 \
GET u16 #8 GET u16 #9 GET u16 #10 GET u16 #11 \
GET u16 #12 GET u16 #13 GET u16 #14 GET u16 #15 \
GET u16 #16 GET u16 #17 GET u16 #18 GET u16 #19 \
GET u16 #20 GET u16 #21 GET u16 #22 GET u16 #23 \
GET u16 #24 GET u16 #25".split
def each
buckets.each { |counter| yield counter.value }
end
def label(idx)
LABELS[idx]
end
attr_reader :buckets
def initialize(klass)
@klass = klass
@buckets = Array.new(BUCKET_INTERVALS.size) { Counter.new }
end
def record_time(ms)
index_to_use = BUCKET_INTERVALS.each_index do |idx|
break idx if ms < BUCKET_INTERVALS[idx]
end
@buckets[index_to_use].increment
end
def fetch(conn, now = Time.now)
window = now.utc.strftime("%d-%H:%-M")
key = "#{@klass}-#{window}"
conn.bitfield(key, *FETCH)
end
def persist(conn, now = Time.now)
buckets, @buckets = @buckets, []
window = now.utc.strftime("%d-%H:%-M")
key = "#{@klass}-#{window}"
cmd = [key, "OVERFLOW", "SAT"]
buckets.each_with_index do |counter, idx|
val = counter.value
cmd << "INCRBY" << "u16" << "##{idx}" << val.to_s if val > 0
end
conn.bitfield(*cmd) if cmd.size > 3
conn.expire(key, 86400)
key
end
end
end
end
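A small sketch of the record/persist/fetch round trip that the tests later in this commit exercise:

```ruby
require "sidekiq"
require "sidekiq/metrics/shared"

hist = Sidekiq::Metrics::Histogram.new("App::FooJob")
hist.record_time(12)  # increments the 20ms bucket (index 0)
hist.record_time(280) # increments the 335ms bucket (index 7)

# Persist flushes the in-memory counters to a per-minute Redis BITFIELD key...
key = Sidekiq.redis { |conn| hist.persist(conn, Time.now) }
# ...and fetch reads the 26 u16 counters back for that minute.
counts = Sidekiq.redis { |conn| hist.fetch(conn, Time.now) }
counts.each_with_index { |n, idx| puts "#{hist.label(idx)}: #{n}" if n > 0 }
```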

View file

@ -0,0 +1,134 @@
require "time"
require "sidekiq"
require "sidekiq/metrics/shared"
# This file contains the components which track execution metrics within Sidekiq.
module Sidekiq
module Metrics
class ExecutionTracker
include Sidekiq::Component
def initialize(config)
@config = config
@jobs = Hash.new(0)
@totals = Hash.new(0)
@grams = Hash.new { |hash, key| hash[key] = Histogram.new(key) }
@lock = Mutex.new
end
def track(queue, klass)
start = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :millisecond)
time_ms = 0
begin
begin
yield
ensure
finish = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :millisecond)
time_ms = finish - start
end
# We don't track time for failed jobs as they can have very unpredictable
# execution times. It's more important to know the average time for successful jobs so we
# can better recognize when a perf regression is introduced.
@lock.synchronize {
@grams[klass].record_time(time_ms)
@jobs["#{klass}|ms"] += time_ms
@totals["ms"] += time_ms
}
rescue Exception
@lock.synchronize {
@jobs["#{klass}|f"] += 1
@totals["f"] += 1
}
raise
ensure
@lock.synchronize {
@jobs["#{klass}|p"] += 1
@totals["p"] += 1
}
end
end
LONG_TERM = 90 * 24 * 60 * 60
MID_TERM = 7 * 24 * 60 * 60
SHORT_TERM = 8 * 60 * 60
def flush(time = Time.now)
totals, jobs, grams = reset
procd = totals["p"]
fails = totals["f"]
return if procd == 0 && fails == 0
now = time.utc
nowdate = now.strftime("%Y%m%d")
nowhour = now.strftime("%Y%m%d|%-H")
nowmin = now.strftime("%Y%m%d|%-H:%-M")
count = 0
redis do |conn|
if grams.size > 0
conn.pipelined do |pipe|
grams.each do |_, gram|
gram.persist(pipe, now)
end
end
end
[
["j", jobs, nowdate, LONG_TERM],
["j", jobs, nowhour, MID_TERM],
["j", jobs, nowmin, SHORT_TERM]
].each do |prefix, data, bucket, ttl|
# Quietly seed the new 7.0 stats format so migration is painless.
conn.pipelined do |xa|
stats = "#{prefix}|#{bucket}"
# logger.debug "Flushing metrics #{stats}"
data.each_pair do |key, value|
xa.hincrby stats, key, value
count += 1
end
xa.expire(stats, ttl)
end
end
logger.info "Flushed #{count} metrics"
count
end
end
private
def reset
@lock.synchronize {
array = [@totals, @jobs, @grams]
@totals = Hash.new(0)
@jobs = Hash.new(0)
@grams = Hash.new { |hash, key| hash[key] = Histogram.new(key) }
array
}
end
end
class Middleware
include Sidekiq::ServerMiddleware
def initialize(options)
@exec = options
end
def call(_instance, hash, queue, &block)
@exec.track(queue, hash["wrapped"] || hash["class"], &block)
end
end
end
end
if ENV["SIDEKIQ_METRICS_BETA"] == "1"
Sidekiq.configure_server do |config|
exec = Sidekiq::Metrics::ExecutionTracker.new(config)
config.server_middleware do |chain|
chain.add Sidekiq::Metrics::Middleware, exec
end
config.on(:beat) do
exec.flush
end
end
end
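To make the key scheme above concrete, a small sketch of the bucket names a flush at 2022-07-22 22:03 UTC would target, derived from the strftime formats in `flush`:

```ruby
require "time"

now = Time.utc(2022, 7, 22, 22, 3)
now.strftime("%Y%m%d")       # => "20220722"      -> key "j|20220722",      ~90 day TTL
now.strftime("%Y%m%d|%-H")   # => "20220722|22"   -> key "j|20220722|22",     7 day TTL
now.strftime("%Y%m%d|%-H:%-M") # => "20220722|22:3" -> key "j|20220722|22:3",  8 hour TTL
# Each "j|..." hash holds fields like "App::FooJob|p", "App::FooJob|f", "App::FooJob|ms";
# histogram counters are persisted separately per class per minute via BITFIELD.
```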

View file

@ -23,10 +23,12 @@ module Sidekiq
def call(_, job, _, _)
attrs = @klass.attributes
if job.has_key?("cattr")
job["cattr"].merge!(attrs)
else
job["cattr"] = attrs
if attrs.any?
if job.has_key?("cattr")
job["cattr"].merge!(attrs)
else
job["cattr"] = attrs
end
end
yield
end

View file

@ -174,7 +174,7 @@ module Sidekiq
# signals that we created a retry successfully. We can acknowledge the job.
ack = true
e = h.cause || h
handle_exception(e, {context: "Job raised exception", job: job_hash, jobstr: jobstr})
handle_exception(e, {context: "Job raised exception", job: job_hash})
raise e
rescue Exception => ex
# Unexpected error! This is very bad and indicates an exception that got past

View file

@ -1,5 +1,5 @@
# frozen_string_literal: true
module Sidekiq
VERSION = "6.5.1"
VERSION = "6.5.2"
end

View file

@ -33,6 +33,10 @@ module Sidekiq
"Dead" => "morgue"
}
if ENV["SIDEKIQ_METRICS_BETA"] == "1"
DEFAULT_TABS["Metrics"] = "metrics"
end
class << self
def settings
self

View file

@ -60,6 +60,19 @@ module Sidekiq
erb(:dashboard)
end
get "/metrics" do
q = Sidekiq::Metrics::Query.new
@resultset = q.top_jobs
erb(:metrics)
end
get "/metrics/:name" do
@name = route_params[:name]
q = Sidekiq::Metrics::Query.new
@resultset = q.for_job(@name)
erb(:metrics_for_job)
end
get "/busy" do
erb(:busy)
end

View file

@ -15,7 +15,7 @@ module Sidekiq
# so extensions can be localized
@strings[lang] ||= settings.locales.each_with_object({}) do |path, global|
find_locale_files(lang).each do |file|
strs = YAML.load(File.open(file))
strs = YAML.safe_load(File.open(file))
global.merge!(strs[lang])
end
end
@ -148,6 +148,29 @@ module Sidekiq
@processes ||= Sidekiq::ProcessSet.new
end
# Sorts processes by hostname following the natural sort order so that
# 'worker.1' < 'worker.2' < 'worker.10' < 'worker.20'
# '2.1.1.1' < '192.168.0.2' < '192.168.0.10'
def sorted_processes
@sorted_processes ||= begin
return processes unless processes.all? { |p| p["hostname"] }
split_characters = /[._-]/
padding = processes.flat_map { |p| p["hostname"].split(split_characters) }.map(&:size).max
processes.to_a.sort_by do |process|
process["hostname"].split(split_characters).map do |substring|
# Left-pad the substring with '0' if it starts with a digit, or with 'a'
# otherwise, so that '25' < '192' < 'a' ('025' < '192' < 'aaa')
padding_char = substring[0].match?(/\d/) ? "0" : "a"
substring.rjust(padding, padding_char)
end
end
end
end
def stats
@stats ||= Sidekiq::Stats.new
end
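An illustrative, self-contained sketch of the same padding trick (hostnames invented):

```ruby
hostnames = ["worker.10", "worker.2", "192.168.0.10", "192.168.0.2"]
split_characters = /[._-]/
width = hostnames.flat_map { |h| h.split(split_characters) }.map(&:size).max

sorted = hostnames.sort_by do |host|
  host.split(split_characters).map do |part|
    # digit-led parts pad with "0", word-led parts pad with "a",
    # so numeric segments compare by value and sort before names
    part.rjust(width, part[0].match?(/\d/) ? "0" : "a")
  end
end
# => ["192.168.0.2", "192.168.0.10", "worker.2", "worker.10"]
```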

View file

@ -4,3 +4,12 @@
require_relative "config/application"
Rails.application.load_tasks
task seed_jobs: :environment do
# see classes defined in config/initializers/sidekiq.rb
[FooJob, BarJob, StoreCardJob, OrderJunkJob, SpamUserJob, FastJob, SlowJob].each do |kls|
(kls.name.size * 10).times do
kls.perform_in(rand * 300)
end
end
end

View file

@ -43,3 +43,71 @@ require "sidekiq/middleware/current_attributes"
Sidekiq::CurrentAttributes.persist(Myapp::Current) # Your AS::CurrentAttributes singleton
# Sidekiq.transactional_push!
# create a label based on the shorthash and subject line of the latest commit in git.
# WARNING: you only want to run this ONCE! If this runs on boot for 20 different Sidekiq processes,
# you will get 20 different deploy marks in Redis! Instead this should go into the script
# that runs your deploy, e.g. your capistrano script.
Sidekiq.configure_server do |config|
label = `git log -1 --format="%h %s"`.strip
require "sidekiq/metrics/deploy"
Sidekiq::Metrics::Deploy.new.mark(label: label)
end
# helper jobs for seeding metrics data
# you will need to restart if you change any of these
class FooJob
include Sidekiq::Job
def perform(*)
raise "boom" if rand < 0.1
sleep(rand)
end
end
class BarJob
include Sidekiq::Job
def perform(*)
raise "boom" if rand < 0.1
sleep(rand)
end
end
class StoreCardJob
include Sidekiq::Job
def perform(*)
raise "boom" if rand < 0.1
sleep(rand)
end
end
class OrderJunkJob
include Sidekiq::Job
def perform(*)
raise "boom" if rand < 0.1
sleep(rand)
end
end
class SpamUserJob
include Sidekiq::Job
def perform(*)
raise "boom" if rand < 0.1
sleep(rand)
end
end
class FastJob
include Sidekiq::Job
def perform(*)
raise "boom" if rand < 0.2
sleep(rand * 0.1)
end
end
class SlowJob
include Sidekiq::Job
def perform(*)
raise "boom" if rand < 0.3
sleep(rand * 10)
end
end

myapp/metrics.html (new file, 1267 lines)

File diff suppressed because it is too large.

View file

@ -634,6 +634,7 @@ describe "API" do
Sidekiq.redis do |conn|
conn.sadd("processes", "bar:987")
conn.sadd("processes", "bar:986")
conn.del("process_cleanup")
end
ps = Sidekiq::ProcessSet.new

test/test_metrics.rb (new file, 141 lines)
View file

@ -0,0 +1,141 @@
# frozen_string_literal: true
require_relative "helper"
require "sidekiq/component"
require "sidekiq/metrics/tracking"
require "sidekiq/metrics/query"
require "sidekiq/metrics/deploy"
require "sidekiq/api"
describe Sidekiq::Metrics do
before do
Sidekiq.redis { |c| c.flushdb }
end
def fixed_time
@whence ||= Time.utc(2022, 7, 22, 22, 3, 0)
end
def create_known_metrics(time = fixed_time)
smet = Sidekiq::Metrics::ExecutionTracker.new(Sidekiq)
smet.track("critical", "App::SomeJob") { sleep 0.001 }
smet.track("critical", "App::FooJob") { sleep 0.001 }
assert_raises RuntimeError do
smet.track("critical", "App::SomeJob") do
raise "boom"
end
end
smet.flush(time)
smet.track("critical", "App::FooJob") { sleep 0.001 }
smet.track("critical", "App::FooJob") { sleep 0.025 }
smet.track("critical", "App::FooJob") { sleep 0.001 }
smet.track("critical", "App::SomeJob") { sleep 0.001 }
smet.flush(time - 60)
end
it "tracks metrics" do
count = create_known_metrics
assert_equal 12, count
end
describe "marx" do
it "owns the means of production" do
whence = Time.local(2022, 7, 17, 18, 43, 15)
floor = whence.utc.rfc3339.sub(":15", ":00")
d = Sidekiq::Metrics::Deploy.new
d.mark(at: whence, label: "cafed00d - some git summary line")
q = Sidekiq::Metrics::Query.new(now: whence)
rs = q.for_job("FooJob")
refute_nil rs[:marks]
assert_equal 1, rs[:marks].size
assert_equal "cafed00d - some git summary line", rs[:marks][floor], rs.inspect
d = Sidekiq::Metrics::Deploy.new
rs = d.fetch(whence)
refute_nil rs
assert_equal 1, rs.size
assert_equal "cafed00d - some git summary line", rs[floor]
end
end
describe "histograms" do
it "buckets a bunch of times" do
h = Sidekiq::Metrics::Histogram.new("App::FooJob")
assert_equal 0, h.sum
h.record_time(10)
h.record_time(46)
h.record_time(47)
h.record_time(48)
h.record_time(300)
h.record_time(301)
h.record_time(302)
h.record_time(300000000)
assert_equal 8, h.sum
key = Sidekiq.redis do |conn|
h.persist(conn, fixed_time)
end
assert_equal 0, h.sum
refute_nil key
assert_equal "App::FooJob-22-22:3", key
h = Sidekiq::Metrics::Histogram.new("App::FooJob")
data = Sidekiq.redis { |c| h.fetch(c, fixed_time) }
{0 => 1, 3 => 3, 7 => 3, 25 => 1}.each_pair do |idx, val|
assert_equal val, data[idx]
end
end
end
describe "querying" do
it "handles empty metrics" do
q = Sidekiq::Metrics::Query.new(now: fixed_time)
rs = q.top_jobs
refute_nil rs
assert_equal 8, rs.size
q = Sidekiq::Metrics::Query.new(now: fixed_time)
rs = q.for_job("DoesntExist")
refute_nil rs
assert_equal 7, rs.size
end
it "fetches top job data" do
create_known_metrics
q = Sidekiq::Metrics::Query.new(now: fixed_time)
rs = q.top_jobs
assert_equal Date.new(2022, 7, 22), rs[:date]
assert_equal 2, rs[:job_classes].size
assert_equal "App::SomeJob", rs[:job_classes].first
bucket = rs[:totals]
refute_nil bucket
assert_equal bucket.keys.sort, ["App::FooJob|ms", "App::FooJob|p", "App::SomeJob|f", "App::SomeJob|ms", "App::SomeJob|p"]
assert_equal 3, bucket["App::SomeJob|p"]
assert_equal 4, bucket["App::FooJob|p"]
assert_equal 1, bucket["App::SomeJob|f"]
end
it "fetches job-specific data" do
create_known_metrics
d = Sidekiq::Metrics::Deploy.new
d.mark(at: fixed_time - 300, label: "cafed00d - some git summary line")
q = Sidekiq::Metrics::Query.new(now: fixed_time)
rs = q.for_job("App::FooJob")
assert_equal Date.new(2022, 7, 22), rs[:date]
assert_equal 60, rs[:data].size
assert_equal ["2022-07-22T21:58:00Z", "cafed00d - some git summary line"], rs[:marks].first
data = rs[:data]
assert_equal({time: "2022-07-22T22:03:00Z", p: 1, f: 0}, data[0].slice(:time, :p, :f))
assert_equal({time: "2022-07-22T22:02:00Z", p: 3, f: 0}, data[1].slice(:time, :p, :f))
assert_equal "cafed00d - some git summary line", data[5][:mark]
# from create_known_data
hist = data[1][:hist]
assert_equal 2, hist[0]
assert_equal 1, hist[1]
end
end
end

View file

@ -106,7 +106,6 @@ describe Sidekiq::Processor do
end
assert_equal 1, errors.count
assert_instance_of TestProcessorException, errors.first[:exception]
assert_equal msg, errors.first[:context][:jobstr]
assert_equal job_hash["jid"], errors.first[:context][:job]["jid"]
end
@ -122,7 +121,6 @@ describe Sidekiq::Processor do
end
assert_equal 1, errors.count
assert_instance_of TestProcessorException, errors.first[:exception]
assert_equal msg, errors.first[:context][:jobstr]
assert_equal job_hash, errors.first[:context][:job]
end
@ -278,7 +276,7 @@ describe Sidekiq::Processor do
assert_equal "boom", msg["args"].first
}
@processor.instance_variable_get(:@retrier).stub(:attempt_retry, retry_stub) do
@processor.instance_variable_get(:@retrier).stub(:process_retry, retry_stub) do
msg = Sidekiq.dump_json(job_data)
begin
@processor.process(work(msg))
@ -292,60 +290,15 @@ describe Sidekiq::Processor do
end
end
describe "stats" do
before do
Sidekiq.redis { |c| c.flushdb }
end
describe "when successful" do
let(:processed_today_key) { "stat:processed:#{Time.now.utc.strftime("%Y-%m-%d")}" }
def successful_job
msg = Sidekiq.dump_json({"class" => MockWorker.to_s, "args" => ["myarg"]})
@processor.process(work(msg))
end
it "increments processed stat" do
Sidekiq::Processor::PROCESSED.reset
successful_job
assert_equal 1, Sidekiq::Processor::PROCESSED.reset
end
end
describe "custom job logger class" do
class CustomJobLogger < Sidekiq::JobLogger
def call(item, queue)
yield
rescue Exception
raise
end
end
before do
opts = {queues: ["default"], job_logger: CustomJobLogger}
@processor = ::Sidekiq::Processor.new(opts) { |pr, ex| }
end
end
end
describe "stats" do
before do
Sidekiq.redis { |c| c.flushdb }
end
def successful_job
msg = Sidekiq.dump_json({"class" => MockWorker.to_s, "args" => ["myarg"]})
@processor.process(work(msg))
end
it "increments processed stat" do
Sidekiq::Processor::PROCESSED.reset
successful_job
assert_equal 1, Sidekiq::Processor::PROCESSED.reset
end
end
describe "custom job logger class" do
class CustomJobLogger < Sidekiq::JobLogger
def call(item, queue)
yield
rescue Exception
raise
end
end
before do
opts = Sidekiq
opts[:job_logger] = CustomJobLogger

View file

@ -269,6 +269,10 @@ describe Sidekiq::JobRetry do
sidekiq_retry_in do |count, exception|
case exception
when RuntimeError
:kill
when Interrupt
:discard
when SpecialError
nil
when ArgumentError
@ -288,33 +292,68 @@ describe Sidekiq::JobRetry do
end
it "retries with a default delay" do
refute_equal 4, handler.__send__(:delay_for, worker, 2, StandardError.new)
strat, count = handler.__send__(:delay_for, worker, 2, StandardError.new)
assert_equal :default, strat
refute_equal 4, count
end
it "retries with a custom delay and exception 1" do
assert_includes 4..35, handler.__send__(:delay_for, CustomWorkerWithException, 2, ArgumentError.new)
strat, count = handler.__send__(:delay_for, CustomWorkerWithException, 2, ArgumentError.new)
assert_equal :default, strat
assert_includes 4..35, count
end
it "supports discard" do
strat, count = handler.__send__(:delay_for, CustomWorkerWithException, 2, Interrupt.new)
assert_equal :discard, strat
assert_nil count
end
it "supports kill" do
strat, count = handler.__send__(:delay_for, CustomWorkerWithException, 2, RuntimeError.new)
assert_equal :kill, strat
assert_nil count
end
it "retries with a custom delay and exception 2" do
assert_includes 4..35, handler.__send__(:delay_for, CustomWorkerWithException, 2, StandardError.new)
strat, count = handler.__send__(:delay_for, CustomWorkerWithException, 2, StandardError.new)
assert_equal :default, strat
assert_includes 4..35, count
end
it "retries with a default delay and exception in case of configured with nil" do
refute_equal 8, handler.__send__(:delay_for, CustomWorkerWithException, 2, SpecialError.new)
refute_equal 4, handler.__send__(:delay_for, CustomWorkerWithException, 2, SpecialError.new)
strat, count = handler.__send__(:delay_for, CustomWorkerWithException, 2, SpecialError.new)
assert_equal :default, strat
refute_equal 8, count
refute_equal 4, count
end
it "retries with a custom delay without exception" do
assert_includes 4..35, handler.__send__(:delay_for, CustomWorkerWithoutException, 2, StandardError.new)
strat, count = handler.__send__(:delay_for, CustomWorkerWithoutException, 2, StandardError.new)
assert_equal :default, strat
assert_includes 4..35, count
end
it "falls back to the default retry on exception" do
output = capture_logging do
refute_equal 4, handler.__send__(:delay_for, ErrorWorker, 2, StandardError.new)
strat, count = handler.__send__(:delay_for, ErrorWorker, 2, StandardError.new)
assert_equal :default, strat
refute_equal 4, count
end
assert_match(/Failure scheduling retry using the defined `sidekiq_retry_in`/,
output, "Log entry missing for sidekiq_retry_in")
end
it "kills when configured on special exceptions" do
ds = Sidekiq::DeadSet.new
assert_equal 0, ds.size
assert_raises Sidekiq::JobRetry::Skip do
handler.local(CustomWorkerWithException, jobstr({"class" => "CustomWorkerWithException"}), "default") do
raise "oops"
end
end
assert_equal 1, ds.size
end
end
describe "handles errors withouth cause" do

View file

@ -4,6 +4,10 @@ require_relative "helper"
require "sidekiq/web"
describe "Web helpers" do
before do
Sidekiq.redis { |c| c.flushdb }
end
class Helpers
include Sidekiq::WebHelpers
@ -100,14 +104,14 @@ describe "Web helpers" do
end
it "tests displaying of illegal args" do
o = Helpers.new
s = o.display_args([1, 2, 3])
obj = Helpers.new
s = obj.display_args([1, 2, 3])
assert_equal "1, 2, 3", s
s = o.display_args(["<html>", 12])
s = obj.display_args(["<html>", 12])
assert_equal "&quot;&lt;html&gt;&quot;, 12", s
s = o.display_args("<html>")
s = obj.display_args("<html>")
assert_equal "Invalid job payload, args must be an Array, not String", s
s = o.display_args(nil)
s = obj.display_args(nil)
assert_equal "Invalid job payload, args is nil", s
end
@ -142,4 +146,21 @@ describe "Web helpers" do
assert_equal "9.5 GB", obj.format_memory(10_000_001)
end
end
it "sorts processes using the natural sort order" do
["a.10.2", "a.2", "busybee-10_1", "a.23", "a.10.1", "a.1", "192.168.0.10", "192.168.0.2", "2.1.1.1", "busybee-2_34"].each do |hostname|
pdata = {"hostname" => hostname, "pid" => "123", "started_at" => Time.now.to_i}
key = "#{hostname}:123"
Sidekiq.redis do |conn|
conn.sadd("processes", key)
conn.hmset(key, "info", Sidekiq.dump_json(pdata), "busy", 0, "beat", Time.now.to_f)
end
end
obj = Helpers.new
assert obj.sorted_processes.all? { |process| assert_instance_of Sidekiq::Process, process }
assert_equal ["2.1.1.1", "192.168.0.2", "192.168.0.10", "a.1", "a.2", "a.10.1", "a.10.2", "a.23", "busybee-2_34", "busybee-10_1"], obj.sorted_processes.map { |process| process["hostname"] }
end
end

View file

@ -63,7 +63,7 @@ function addPollingListeners(_event) {
function addDataToggleListeners(event) {
var source = event.target || event.srcElement;
var targName = source.getAttribute("data-toggle");
var full = document.getElementById(targName + "_full");
var full = document.getElementById(targName);
if (full.style.display == "block") {
full.style.display = 'none';
} else {

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -84,3 +84,7 @@ en:
Worker: Worker
active: active
idle: idle
Metrics: Metrics
NoDataFound: No data found
ExecutionTime: Execution Time
Context: Context

View file

@ -1,7 +1,7 @@
<div class="navbar navbar-default navbar-fixed-top">
<div class="container-fluid">
<div class="navbar-header" data-navbar="static">
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#navbar-menu">
<button type="button" class="navbar-toggle collapsed" data-toggle="navbar-menu" data-target="#navbar-menu">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>

View file

@ -54,7 +54,7 @@
<th>&nbsp;</th>
</thead>
<% lead = processes.leader %>
<% processes.each do |process| %>
<% sorted_processes.each do |process| %>
<tr>
<td class="box">
<%= "#{process['hostname']}:#{process['pid']}" %>

View file

@ -1,3 +1,4 @@
<script type="text/javascript" src="<%= root_path %>javascripts/graph.js"></script>
<script type="text/javascript" src="<%= root_path %>javascripts/dashboard.js"></script>
<div class= "dashboard clearfix">
<h3 >

web/views/metrics.erb (new file, 59 lines)
View file

@ -0,0 +1,59 @@
<h1><%= t('Metrics') %></h1>
<h3>Top Jobs by Processed Count</h3>
<% top = @resultset[:top_classes] %>
<% topp = top["p"]&.first(10) %>
<div class="table_container">
<table class="table table-bordered table-striped table-hover">
<tbody>
<tr>
<th><%= t('Name') %></th>
<th><%= t('Processed') %></th>
<th><%= t('ExecutionTime') %></th>
</tr>
<% if topp %>
<% topp.each do |kls, val| %>
<tr>
<td><code><a href="<%= root_path %>metrics/<%= kls %>"><%= kls %></a></code></td>
<td><%= val %></td>
<td><%= top.dig("ms", kls) %></td>
</tr>
<% end %>
<% else %>
<tr><td colspan=3><%= t("NoDataFound") %></td></tr>
<% end %>
</tbody>
</table>
</div>
<h3>Top Jobs by Execution Time</h3>
<% topms = top["ms"]&.first(10) %>
<div class="table_container">
<table class="table table-bordered table-striped table-hover">
<tbody>
<tr>
<th><%= t('Name') %></th>
<th><%= t('Processed') %></th>
<th><%= t('ExecutionTime') %></th>
</tr>
<% if topms %>
<% topms.each do |kls, val| %>
<tr>
<td><code><a href="<%= root_path %>metrics/<%= kls %>"><%= kls %></a></code></td>
<td><%= top.dig("p", kls) %></td>
<td><%= val %></td>
</tr>
<% end %>
<% else %>
<tr><td colspan=3><%= t("NoDataFound") %></td></tr>
<% end %>
</tbody>
</table>
</div>
<p>
Data from <%= @resultset[:starts_at] %> to <%= @resultset[:ends_at] %>
</p>

View file

@ -0,0 +1,92 @@
<h2><%= t('Metrics') %> / <%= h @name %></h2>
<div class="row chart">
<div id="realtime" data-processed-label="<%= t('Processed') %>" data-failed-label="<%= t('Failed') %>"></div>
</div>
<% data = @resultset[:data] %>
<div class="table_container">
<table class="table table-bordered table-striped table-hover">
<tbody>
<tr>
<th><%= t('Time') %></th>
<th><%= t('Processed') %></th>
<th><%= t('ExecutionTime') %></th>
<th><%= t('Failed') %></th>
<th><%= t('Deploy') %></th>
<th><%= t('Histogram') %></th>
</tr>
<% data.each do |hash| %>
<tr><td><%= hash[:time] %></td><td><%= hash[:p] %></td><td><%= hash[:ms] %></td><td><%= hash[:f] %></td><td><%= hash[:mark] %></td><td><%= hash[:hist] %></td></tr>
<% end %>
</tbody>
</table>
</div>
<p>
Data from <%= @resultset[:starts_at] %> to <%= @resultset[:ends_at] %>
</p>
<% atad = data.reverse %>
<script type="text/javascript" src="<%= root_path %>javascripts/graph.js"></script>
<script>
var palette = new Rickshaw.Color.Palette();
var data = [ {
name: "Processed",
color: palette.color(),
data: [ <% atad.each do |hash| %>
{ x: <%= hash[:epoch] %>, y: <%= hash[:p] %> },
<% end %> ]
}, {
name: "Failed",
color: palette.color(),
data: [ <% atad.each do |hash| %>
{ x: <%= hash[:epoch] %>, y: <%= hash[:f] %> },
<% end %>
]
}, {
name: "Execution Time",
color: palette.color(),
data: [ <% atad.each do |hash| %>
{ x: <%= hash[:epoch] %>, y: <%= hash[:ms] %> },
<% end %>
]
} ];
// TODO What to do with this? Minutely hover detail with a histogram bar chart?
var histogramData = [ <% atad.each do |hash| %>
{ x: <%= hash[:epoch] %>, hist: <%= hash[:hist] %> },
<% end %> ]
var histogramLabels = <%= Sidekiq::Metrics::Histogram::LABELS.inspect %>;
var timeInterval = 60000;
var graphElement = document.getElementById("realtime");
var graph = new Rickshaw.Graph({
element: graphElement,
width: responsiveWidth(),
renderer: 'line',
interpolation: 'linear',
series: data,
});
var x_axis = new Rickshaw.Graph.Axis.Time( { graph: graph } );
var y_axis = new Rickshaw.Graph.Axis.Y( {
graph: graph,
tickFormat: Rickshaw.Fixtures.Number.formatKMBT,
ticksTreatment: 'glow'
});
graph.render();
var hoverDetail = new Rickshaw.Graph.HoverDetail( {
graph: graph,
// formatter: function(series, x, y) {
// var date = '<span class="date">' + new Date(x * 1000).toUTCString() + '</span>';
// var swatch = '<span class="detail_swatch" style="background-color: ' + series.color + '"></span>';
// var content = swatch + series.name + ": " + parseInt(y) + '<br>' + date;
// return content;
// }
} );
</script>

View file

@ -18,6 +18,7 @@
<th><a href="<%= url %>?direction=<%= params[:direction] == 'asc' ? 'desc' : 'asc' %>"># <%= sort_direction_label %></a></th>
<th><%= t('Job') %></th>
<th><%= t('Arguments') %></th>
<th><%= t('Context') %></th>
<th></th>
</thead>
<% @jobs.each_with_index do |job, index| %>
@ -35,12 +36,15 @@
<% a = job.display_args %>
<% if a.inspect.size > 100 %>
<span id="job_<%= index %>"><%= h(a.inspect[0..100]) + "... " %></span>
<button data-toggle="job_<%= index %>" class="btn btn-default btn-xs"><%= t('ShowAll') %></button>
<button data-toggle="job_<%= index %>_full" class="btn btn-default btn-xs"><%= t('ShowAll') %></button>
<div class="toggle" id="job_<%= index %>_full"><%= display_args(a) %></div>
<% else %>
<%= display_args(job.display_args) %>
<% end %>
</td>
<td>
<%= h(job["cattr"].inspect) if job["cattr"]&.any? %>
</td>
<td>
<form action="<%= root_path %>queues/<%= CGI.escape(@name) %>/delete" method="post">
<%= csrf_tag %>