# Mirror of https://github.com/mperham/sidekiq.git
# Synced 2022-11-09 13:52:34 -05:00 (1127 lines, 30 KiB, Ruby)
# frozen_string_literal: true
require "sidekiq"

require "base64"
require "json"
require "zlib"
# @api public
module Sidekiq
# Aggregate statistics for the whole Sidekiq installation, read from Redis.
#
# #initialize loads the figures that cost a fixed number of Redis calls
# (#fetch_stats_fast!). The figures that need one call per process or per
# queue (#workers_size, #enqueued) are loaded on first access through #stat,
# which calls #fetch_stats_slow!.
#
# @api private
class Stats
  def initialize
    fetch_stats_fast!
  end

  # @return [Integer] value of the global "stat:processed" counter
  def processed
    stat :processed
  end

  # @return [Integer] value of the global "stat:failed" counter
  def failed
    stat :failed
  end

  # @return [Integer] number of entries in the "schedule" sorted set
  def scheduled_size
    stat :scheduled_size
  end

  # @return [Integer] number of entries in the "retry" sorted set
  def retry_size
    stat :retry_size
  end

  # @return [Integer] number of entries in the "dead" sorted set
  def dead_size
    stat :dead_size
  end

  # @return [Integer] summed length of every known queue (loaded lazily)
  def enqueued
    stat :enqueued
  end

  # @return [Integer] number of members of the "processes" set
  def processes_size
    stat :processes_size
  end

  # @return [Integer] sum of the "busy" field across processes (loaded lazily)
  def workers_size
    stat :workers_size
  end

  # @return [Float, Integer] seconds since the oldest job in the "default"
  #   queue was enqueued, or 0 when that queue is empty
  def default_queue_latency
    stat :default_queue_latency
  end

  # @return [Hash{String => Integer}] queue name => length, longest first
  def queues
    Sidekiq::Stats::Queues.new.lengths
  end

  # O(1) redis calls
  #
  # Reloads the cheap figures with a single pipeline, replacing any stats
  # cached so far (including lazily-loaded ones).
  #
  # @return [Hash{Symbol => Numeric}] the refreshed stats cache
  def fetch_stats_fast!
    pipe1_res = Sidekiq.redis { |conn|
      conn.pipelined do |pipeline|
        pipeline.get("stat:processed")
        pipeline.get("stat:failed")
        pipeline.zcard("schedule")
        pipeline.zcard("retry")
        pipeline.zcard("dead")
        pipeline.scard("processes")
        # the oldest job in the default queue is the last list element
        pipeline.lrange("queue:default", -1, -1)
      end
    }

    default_queue_latency = if (entry = pipe1_res[6].first)
      job = begin
        Sidekiq.load_json(entry)
      rescue
        # an unparseable payload counts as zero latency instead of failing
        {}
      end
      now = Time.now.to_f
      thence = job["enqueued_at"] || now
      now - thence
    else
      0
    end

    @stats = {
      processed: pipe1_res[0].to_i,
      failed: pipe1_res[1].to_i,
      scheduled_size: pipe1_res[2],
      retry_size: pipe1_res[3],
      dead_size: pipe1_res[4],
      processes_size: pipe1_res[5],

      default_queue_latency: default_queue_latency
    }
  end

  # O(number of processes + number of queues) redis calls
  #
  # Adds :workers_size and :enqueued to the cache built by #fetch_stats_fast!.
  #
  # @return [Hash{Symbol => Numeric}] the updated stats cache
  def fetch_stats_slow!
    processes = Sidekiq.redis { |conn|
      conn.sscan_each("processes").to_a
    }

    queues = Sidekiq.redis { |conn|
      conn.sscan_each("queues").to_a
    }

    pipe2_res = Sidekiq.redis { |conn|
      conn.pipelined do |pipeline|
        processes.each { |key| pipeline.hget(key, "busy") }
        queues.each { |queue| pipeline.llen("queue:#{queue}") }
      end
    }

    # the pipeline replies are the busy counts first, then the queue lengths
    s = processes.size
    workers_size = pipe2_res[0...s].sum(&:to_i)
    enqueued = pipe2_res[s..-1].sum(&:to_i)

    @stats[:workers_size] = workers_size
    @stats[:enqueued] = enqueued
    @stats
  end

  # Reloads every stat, cheap and expensive.
  #
  # @return [Hash{Symbol => Numeric}] the full stats cache
  def fetch_stats!
    fetch_stats_fast!
    fetch_stats_slow!
  end

  # Resets the global "stat:failed" / "stat:processed" counters to zero.
  #
  # @param stats [Array<String, Symbol, Array>] names to reset ("failed",
  #   "processed"); nested arrays and nils are tolerated. Unknown names are
  #   ignored. With no arguments both counters are reset.
  # @return [Object, nil] the reply of the Redis MSET, or nil when none of
  #   the requested names is a resettable counter (Redis is not contacted)
  def reset(*stats)
    all = %w[failed processed]
    stats = stats.empty? ? all : all & stats.flatten.compact.map(&:to_s)
    # MSET with no key/value pairs is rejected by Redis, so do nothing when
    # every requested name was filtered away.
    return if stats.empty?

    mset_args = stats.flat_map { |stat| ["stat:#{stat}", 0] }
    Sidekiq.redis do |conn|
      conn.mset(*mset_args)
    end
  end

  private

  # Returns the cached value of +s+, loading the slow stats on a cache miss.
  #
  # @raise [ArgumentError] if +s+ is still unknown after the slow load
  def stat(s)
    fetch_stats_slow! if @stats[s].nil?
    @stats[s] || raise(ArgumentError, "Unknown stat #{s}")
  end

  # Lengths of every queue registered in the "queues" set.
  class Queues
    # @return [Hash{String => Integer}] queue name => length, ordered from
    #   longest to shortest
    def lengths
      Sidekiq.redis do |conn|
        queues = conn.sscan_each("queues").to_a

        lengths = conn.pipelined { |pipeline|
          queues.each do |queue|
            pipeline.llen("queue:#{queue}")
          end
        }

        array_of_arrays = queues.zip(lengths).sort_by { |_, size| -size }
        array_of_arrays.to_h
      end
    end
  end

  # Daily processed/failed counters for a window of days that ends at
  # +start_date+ and reaches +days_previous+ days back (inclusive).
  class History
    # @param days_previous [Integer] number of days to report, 1..(5 * 365)
    # @param start_date [Date, nil] most recent day of the window; defaults to
    #   today in UTC
    # @raise [ArgumentError] if days_previous is outside 1..(5 * 365)
    def initialize(days_previous, start_date = nil)
      # we only store five years of data in Redis
      raise ArgumentError if days_previous < 1 || days_previous > (5 * 365)
      @days_previous = days_previous
      @start_date = start_date || Time.now.utc.to_date
    end

    # @return [Hash{String => Integer}] "YYYY-MM-DD" => processed count
    def processed
      @processed ||= date_stat_hash("processed")
    end

    # @return [Hash{String => Integer}] "YYYY-MM-DD" => failed count
    def failed
      @failed ||= date_stat_hash("failed")
    end

    private

    # Reads "stat:<stat>:<YYYY-MM-DD>" for every day of the window with one
    # MGET, newest day first; days without a key count as 0.
    def date_stat_hash(stat)
      stat_hash = {}
      dates = @start_date.downto(@start_date - @days_previous + 1).map { |date|
        date.strftime("%Y-%m-%d")
      }

      keys = dates.map { |datestr| "stat:#{stat}:#{datestr}" }

      begin
        Sidekiq.redis do |conn|
          conn.mget(keys).each_with_index do |value, idx|
            stat_hash[dates[idx]] = value ? value.to_i : 0
          end
        end
      rescue RedisConnection.adapter::CommandError
        # mget will trigger a CROSSSLOT error when run against a Cluster;
        # in that case an empty hash is returned.
        # TODO Someone want to add Cluster support?
      end

      stat_hash
    end
  end
end
##
# Represents a queue within Sidekiq.
# Allows enumeration of all jobs within the queue
# and deletion of jobs. NB: this queue data is real-time
# and is changing within Redis moment by moment.
#
#   queue = Sidekiq::Queue.new("mailer")
#   queue.each do |job|
#     job.klass # => 'MyWorker'
#     job.args # => [1, 2, 3]
#     job.delete if job.jid == 'abcdef1234567890'
#   end
class Queue
  include Enumerable

  ##
  # Fetch all known queues within Redis.
  #
  # @return [Array<Sidekiq::Queue>] ordered by queue name
  def self.all
    Sidekiq.redis { |c| c.sscan_each("queues").to_a }.sort.map { |q| Sidekiq::Queue.new(q) }
  end

  attr_reader :name

  # @param name [String] the name of the queue
  def initialize(name = "default")
    @name = name.to_s
    # Redis key of the list that backs this queue
    @rname = "queue:#{name}"
  end

  # The current size of the queue within Redis.
  # This value is real-time and can change between calls.
  #
  # @return [Integer] the size
  def size
    Sidekiq.redis { |con| con.llen(@rname) }
  end

  # @return [Boolean] if the queue is currently paused (always false here)
  def paused?
    false
  end

  ##
  # Calculates this queue's latency, the difference in seconds since the oldest
  # job in the queue was enqueued.
  #
  # The payload is parsed with Sidekiq.load_json; parse errors are not
  # rescued here.
  #
  # @return [Float] in seconds
  # @return [Integer] 0 when the queue is empty
  def latency
    # the oldest job is the last element of the list
    entry = Sidekiq.redis { |conn|
      conn.lrange(@rname, -1, -1)
    }.first
    return 0 unless entry
    job = Sidekiq.load_json(entry)
    now = Time.now.to_f
    thence = job["enqueued_at"] || now
    now - thence
  end

  # Yields every job in the queue as a JobRecord, reading 50 entries per
  # Redis round trip.
  #
  # @yieldparam job [Sidekiq::JobRecord]
  # @return [Enumerator] when called without a block
  # @return [nil] when called with a block
  def each
    # same convention as SortedSet#scan: no block, hand back an Enumerator
    return to_enum(:each) unless block_given?

    initial_size = size
    deleted_size = 0
    page = 0
    page_size = 50

    loop do
      range_start = page * page_size - deleted_size
      range_end = range_start + page_size - 1
      entries = Sidekiq.redis { |conn|
        conn.lrange @rname, range_start, range_end
      }
      break if entries.empty?
      page += 1
      entries.each do |entry|
        yield JobRecord.new(entry, @name)
      end
      # shift the next window back by however much the list has shrunk
      # since iteration started (e.g. jobs deleted by the caller)
      deleted_size = initial_size - size
    end
  end

  ##
  # Find the job with the given JID within this queue.
  #
  # This is a *slow, inefficient* operation. Do not use under
  # normal conditions.
  #
  # @param jid [String] the job_id to look for
  # @return [Sidekiq::JobRecord]
  # @return [nil] if not found
  def find_job(jid)
    detect { |j| j.jid == jid }
  end

  # delete all jobs within this queue
  # and unregister the queue from the "queues" set, in one MULTI
  # @return [Boolean] true
  def clear
    Sidekiq.redis do |conn|
      conn.multi do |transaction|
        transaction.unlink(@rname)
        transaction.srem("queues", name)
      end
    end
    true
  end
  alias_method :💣, :clear

  # :nodoc:
  # @api private
  def as_json(options = nil)
    {name: name} # 5336
  end
end
##
# Represents a pending job within a Sidekiq queue.
#
# The job should be considered immutable but may be
# removed from the queue via JobRecord#delete.
class JobRecord
  # the parsed Hash of job data
  # @!attribute [r] Item
  attr_reader :item
  # the raw value this record was built from: normally the JSON String read
  # from Redis, or the Hash itself when a Hash was passed to #initialize
  # @!attribute [r] Value
  attr_reader :value
  # the queue associated with this job
  # @!attribute [r] Queue
  attr_reader :queue

  # :nodoc:
  # @api private
  #
  # @param item [String, Hash] JSON job payload, or an already-parsed Hash
  # @param queue_name [String, nil] queue name; defaults to the payload's "queue"
  def initialize(item, queue_name = nil)
    @args = nil
    @value = item
    @item = item.is_a?(Hash) ? item : parse(item)
    @queue = queue_name || @item["queue"]
  end

  # :nodoc:
  # @api private
  def parse(item)
    Sidekiq.load_json(item)
  rescue JSON::ParserError
    # If the job payload in Redis is invalid JSON, we'll load
    # the item as an empty hash and store the invalid JSON as
    # the job 'args' for display in the Web UI.
    @invalid = true
    @args = [item]
    {}
  end

  # This is the job class which Sidekiq will execute. If using ActiveJob,
  # this class will be the ActiveJob adapter class rather than a specific job.
  def klass
    self["class"]
  end

  # Class name to show in the Web UI: the payload's "display_class" when
  # present, the wrapped target for known wrappers, otherwise #klass.
  # The result is memoized.
  def display_class
    # Unwrap known wrappers so they show up in a human-friendly manner in the Web UI
    @klass ||= self["display_class"] || begin
      case klass
      when /\ASidekiq::Extensions::Delayed/
        safe_load(args[0], klass) do |target, method, _|
          "#{target}.#{method}"
        end
      when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
        job_class = @item["wrapped"] || args[0]
        if job_class == "ActionMailer::DeliveryJob" || job_class == "ActionMailer::MailDeliveryJob"
          # MailerClass#mailer_method
          args[0]["arguments"][0..1].join("#")
        else
          job_class
        end
      else
        klass
      end
    end
  end

  # Arguments to show in the Web UI, unwrapped from known wrappers.
  # For encrypted jobs the last argument is replaced by a placeholder in a
  # copy; #args and the parsed payload are never modified. Memoized.
  def display_args
    # Unwrap known wrappers so they show up in a human-friendly manner in the Web UI
    @display_args ||= case klass
    when /\ASidekiq::Extensions::Delayed/
      safe_load(args[0], args) do |_, _, arg, kwarg|
        if !kwarg || kwarg.empty?
          arg
        else
          [arg, kwarg]
        end
      end
    when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
      job_args = self["wrapped"] ? args[0]["arguments"] : []
      if (self["wrapped"] || args[0]) == "ActionMailer::DeliveryJob"
        # remove MailerClass, mailer_method and 'deliver_now'
        job_args.drop(3)
      elsif (self["wrapped"] || args[0]) == "ActionMailer::MailDeliveryJob"
        # remove MailerClass, mailer_method and 'deliver_now'
        job_args.drop(3).first["args"]
      else
        job_args
      end
    else
      if self["encrypt"]
        # no point in showing 150+ bytes of random garbage. Mask a copy:
        # assigning into #args directly would rewrite @item["args"], the
        # parsed payload of this very record.
        masked = args.dup
        masked[-1] = "[encrypted data]" unless masked.empty?
        masked
      else
        args
      end
    end
  end

  # @return [Array, nil] the job arguments; for an unparseable payload, a
  #   one-element array holding the raw payload
  def args
    @args || @item["args"]
  end

  def jid
    self["jid"]
  end

  # @return [Time, nil] "enqueued_at" as a UTC Time, or nil when absent
  def enqueued_at
    self["enqueued_at"] ? Time.at(self["enqueued_at"]).utc : nil
  end

  # @return [Time] "created_at", falling back to "enqueued_at", then epoch 0
  def created_at
    Time.at(self["created_at"] || self["enqueued_at"] || 0).utc
  end

  def tags
    self["tags"] || []
  end

  # @return [Array, nil] the decoded "error_backtrace", memoized even when nil
  def error_backtrace
    # Cache nil values
    if defined?(@error_backtrace)
      @error_backtrace
    else
      value = self["error_backtrace"]
      @error_backtrace = value && uncompress_backtrace(value)
    end
  end

  # @return [Float] seconds since "enqueued_at" (or "created_at"); 0.0 if neither is set
  def latency
    now = Time.now.to_f
    now - (@item["enqueued_at"] || @item["created_at"] || now)
  end

  # Remove this job from the queue
  # @return [Boolean] true if Redis removed an entry
  def delete
    count = Sidekiq.redis { |conn|
      conn.lrem("queue:#{@queue}", 1, @value)
    }
    count != 0
  end

  # Access arbitrary attributes within the job hash
  def [](name)
    # @item is normally a Hash (#parse turns invalid JSON into {}); the nil
    # guard is a best-effort defense in case it is ever nil.
    # We don't guarantee Sidekiq will work with bad job JSON but we should
    # make a best effort to minimize the damage.
    @item ? @item[name] : nil
  end

  private

  # Yields the splatted YAML document in +content+; returns +default+ when
  # loading or the block raises.
  def safe_load(content, default)
    # NOTE(review): YAML.load runs on a payload read from Redis. Psych 4+
    # makes YAML.load safe by default; on older Psych it can instantiate
    # arbitrary objects — confirm the Psych version before trusting this.
    yield(*YAML.load(content))
  rescue => ex
    # #1761 in dev mode, it's possible to have jobs enqueued which haven't been loaded into
    # memory yet so the YAML can't be loaded.
    # TODO is this still necessary? Zeitwerk reloader should handle?
    Sidekiq.logger.warn "Unable to load YAML: #{ex.message}" unless Sidekiq.config[:environment] == "development"
    default
  end

  # Decodes a backtrace stored as Base64 of zlib-deflated JSON (or, for old
  # jobs, a raw Array or a Marshal dump).
  def uncompress_backtrace(backtrace)
    if backtrace.is_a?(Array)
      # Handle old jobs with raw Array backtrace format
      backtrace
    else
      decoded = Base64.decode64(backtrace)
      uncompressed = Zlib::Inflate.inflate(decoded)
      begin
        Sidekiq.load_json(uncompressed)
      rescue
        # Handle old jobs with marshalled backtrace format
        # NOTE(review): Marshal.load of data read from Redis can instantiate
        # arbitrary objects; only acceptable if Redis is trusted.
        # TODO Remove in 7.x
        Marshal.load(uncompressed)
      end
    end
  end
end
# Represents a job within a Redis sorted set where the score
# represents a timestamp associated with the job. This timestamp
# could be the scheduled time for it to run (e.g. scheduled set),
# or the expiration date after which the entry should be deleted (e.g. dead set).
class SortedEntry < JobRecord
  # @return [Float] the entry's score within the sorted set
  attr_reader :score
  # the set this entry was read from; used via #name, #delete_by_value and
  # #delete_by_jid (presumably a Sidekiq::JobSet — confirm for other callers)
  attr_reader :parent

  # :nodoc:
  # @api private
  def initialize(parent, score, item)
    super(item)
    @score = Float(score)
    @parent = parent
  end

  # The timestamp associated with this entry
  # @return [Time] the score read as epoch seconds, in UTC
  def at
    Time.at(score).utc
  end

  # remove this entry from the sorted set
  def delete
    if @value
      @parent.delete_by_value(@parent.name, @value)
    else
      @parent.delete_by_jid(score, jid)
    end
  end

  # Change the scheduled time for this job.
  #
  # Moves the member's score by (at - current score) with ZINCRBY. The member
  # is addressed by the exact String this entry was read from: ZINCRBY on a
  # member that does not exist *adds* it, so a re-serialized @item that is
  # not byte-identical to the stored JSON would create a duplicate job and
  # leave the original in place.
  #
  # @param at [Time] the new timestamp for this job
  # @return the new score as returned by the Redis client
  def reschedule(at)
    member = @value.is_a?(String) ? @value : Sidekiq.dump_json(@item)
    new_score = Sidekiq.redis do |conn|
      conn.zincrby(@parent.name, at.to_f - @score, member)
    end
    # keep the cached score in sync so a later #retry/#kill/#add_to_queue on
    # this object looks the job up at its new position
    @score = Float(new_score) unless new_score.nil?
    new_score
  end

  # Enqueue this job from the scheduled or dead set so it will
  # be executed at some point in the near future.
  def add_to_queue
    remove_job do |message|
      msg = Sidekiq.load_json(message)
      Sidekiq::Client.push(msg)
    end
  end

  # enqueue this job from the retry set so it will be executed
  # at some point in the near future.
  def retry
    remove_job do |message|
      msg = Sidekiq.load_json(message)
      # pushing through the client again, so undo one retry increment
      msg["retry_count"] -= 1 if msg["retry_count"]
      Sidekiq::Client.push(msg)
    end
  end

  # Move this job from its current set into the Dead set.
  def kill
    remove_job do |message|
      DeadSet.new.kill(message)
    end
  end

  # @return [Boolean] true if the payload records an "error_class"
  def error?
    !!item["error_class"]
  end

  private

  # Atomically reads and removes every member at this entry's score, yields
  # the one belonging to this job (if any), then re-adds the others.
  #
  # When exactly one member sits at the score it is yielded without checking
  # its jid. The re-add runs in a second MULTI, so other members sharing the
  # score are briefly missing from the set.
  def remove_job
    Sidekiq.redis do |conn|
      results = conn.multi { |transaction|
        transaction.zrangebyscore(parent.name, score, score)
        transaction.zremrangebyscore(parent.name, score, score)
      }.first

      if results.size == 1
        yield results.first
      else
        # multiple jobs with the same score
        # find the one with the right JID and push it
        matched, nonmatched = results.partition { |message|
          # cheap substring test before paying for a JSON parse
          if message.index(jid)
            msg = Sidekiq.load_json(message)
            msg["jid"] == jid
          else
            false
          end
        }

        msg = matched.first
        yield msg if msg

        # push the rest back onto the sorted set
        conn.multi do |transaction|
          nonmatched.each do |message|
            transaction.zadd(parent.name, score.to_f.to_s, message)
          end
        end
      end
    end
  end
end
# Base class for all sorted sets within Sidekiq.
class SortedSet
  include Enumerable

  # Redis key of the set
  # @!attribute [r] Name
  attr_reader :name

  # :nodoc:
  # @api private
  #
  # Stores the key and snapshots the set's cardinality into @_size.
  def initialize(name)
    @name = name
    @_size = size
  end

  # real-time size of the set, will change
  # @return [Integer] ZCARD of the set
  def size
    Sidekiq.redis do |conn|
      conn.zcard(name)
    end
  end

  # Scan through each element of the sorted set, yielding each to the supplied block.
  # Please see Redis's <a href="https://redis.io/commands/scan/">SCAN documentation</a> for implementation details.
  #
  # A +match+ without any "*" is wrapped as "*match*" so it matches anywhere.
  #
  # @param match [String] a snippet or regexp to filter matches.
  # @param count [Integer] number of elements to retrieve at a time, default 100
  # @yieldparam [Sidekiq::SortedEntry] each entry
  # @return [Enumerator] when no block is given
  def scan(match, count = 100)
    unless block_given?
      return to_enum(:scan, match, count)
    end

    pattern = match.include?("*") ? match : "*#{match}*"
    Sidekiq.redis do |conn|
      conn.zscan_each(name, match: pattern, count: count) do |member, member_score|
        yield SortedEntry.new(self, member_score, member)
      end
    end
  end

  # Deletes the whole sorted set with UNLINK.
  # @return [Boolean] always true
  def clear
    Sidekiq.redis { |conn| conn.unlink(name) }
    true
  end
  alias_method :💣, :clear

  # :nodoc:
  # @api private
  #
  # Serializes only the set's name (ref #5336).
  def as_json(options = nil)
    {name: name}
  end
end
# Base class for all sorted sets which contain jobs, e.g. scheduled, retry and dead.
# Sidekiq Pro and Enterprise add additional sorted sets which do not contain job data,
# e.g. Batches.
class JobSet < SortedSet
  # Add a job with the associated timestamp to this set.
  # @param timestamp [Time] the score for the job
  # @param job [Hash] the job data
  def schedule(timestamp, job)
    Sidekiq.redis do |conn|
      conn.zadd(name, timestamp.to_f.to_s, Sidekiq.dump_json(job))
    end
  end

  # Yields every job in the set as a SortedEntry, highest score first,
  # reading 50 members per Redis round trip from the end of the set.
  #
  # @yieldparam entry [Sidekiq::SortedEntry]
  # @return [Enumerator] when called without a block
  def each
    return to_enum(:each) unless block_given?

    initial_size = @_size
    offset_size = 0
    page = -1
    page_size = 50

    loop do
      range_start = page * page_size + offset_size
      range_end = range_start + page_size - 1
      elements = Sidekiq.redis { |conn|
        conn.zrange name, range_start, range_end, withscores: true
      }
      break if elements.empty?
      page -= 1
      elements.reverse_each do |element, score|
        yield SortedEntry.new(self, score, element)
      end
      # entries deleted through #delete_by_value/#delete_by_jid shrink @_size;
      # shift the window toward the end so no member is skipped
      offset_size = initial_size - @_size
    end
  end

  ##
  # Fetch jobs that match a given time or Range. Job ID is an
  # optional second argument.
  #
  # @param score [Time,Range] a specific timestamp or range
  # @param jid [String, optional] find a specific JID within the score
  # @return [Array<SortedEntry>] any results found, can be empty
  def fetch(score, jid = nil)
    begin_score, end_score =
      if score.is_a?(Range)
        [score.first, score.last]
      else
        [score, score]
      end

    elements = Sidekiq.redis { |conn|
      conn.zrangebyscore(name, begin_score, end_score, withscores: true)
    }

    elements.each_with_object([]) do |element, result|
      data, job_score = element
      entry = SortedEntry.new(self, job_score, data)
      result << entry if jid.nil? || entry.jid == jid
    end
  end

  ##
  # Find the job with the given JID within this sorted set.
  # *This is a slow O(n) operation*. Do not use for app logic.
  #
  # Members that contain the JID text but are not valid JSON are skipped.
  #
  # @param jid [String] the job identifier
  # @return [SortedEntry] the record or nil
  def find_job(jid)
    Sidekiq.redis do |conn|
      conn.zscan_each(name, match: "*#{jid}*", count: 100) do |entry, score|
        job = begin
          Sidekiq.load_json(entry)
        rescue JSON::ParserError
          # a corrupt member must not abort the whole search
          next
        end
        return SortedEntry.new(self, score, entry) if job["jid"] == jid
      end
    end
    nil
  end

  # :nodoc:
  # @api private
  #
  # Removes +value+ from the sorted set +name+.
  # @return the Redis client's ZREM reply
  def delete_by_value(name, value)
    Sidekiq.redis do |conn|
      ret = conn.zrem(name, value)
      @_size -= 1 if zrem_removed?(ret)
      ret
    end
  end

  # :nodoc:
  # @api private
  #
  # Removes the member with +jid+ among those stored at exactly +score+.
  # @return the Redis client's ZREM reply, or nil when no member matched
  def delete_by_jid(score, jid)
    Sidekiq.redis do |conn|
      target = conn.zrangebyscore(name, score, score).find { |element|
        # cheap substring test before paying for a JSON parse
        element.index(jid) && Sidekiq.load_json(element)["jid"] == jid
      }
      if target
        ret = conn.zrem(name, target)
        @_size -= 1 if zrem_removed?(ret)
        ret
      end
    end
  end

  alias_method :delete, :delete_by_jid

  private

  # Interprets a single-member ZREM reply. redis-rb returns a Boolean here,
  # but a client returning the raw Integer count would make a plain `if ret`
  # succeed for 0, because 0 is truthy in Ruby.
  def zrem_removed?(reply)
    reply.is_a?(Integer) ? reply.positive? : !!reply
  end
end
##
# The set of scheduled jobs within Sidekiq.
# Based on this, you can search/filter for jobs. Here's an
# example where I'm selecting jobs based on some complex logic
# and deleting them from the scheduled set.
#
#   r = Sidekiq::ScheduledSet.new
#   r.select do |scheduled|
#     scheduled.klass == 'Sidekiq::Extensions::DelayedClass' &&
#       scheduled.args[0] == 'User' &&
#       scheduled.args[1] == 'setup_new_subscriber'
#   end.map(&:delete)
class ScheduledSet < JobSet
  # Binds this set to the "schedule" sorted set in Redis.
  def initialize
    super("schedule")
  end
end
##
# The set of retries within Sidekiq.
# Based on this, you can search/filter for jobs. Here's an
# example where I'm selecting all jobs of a certain type
# and deleting them from the retry queue.
#
#   r = Sidekiq::RetrySet.new
#   r.select do |retri|
#     retri.klass == 'Sidekiq::Extensions::DelayedClass' &&
#       retri.args[0] == 'User' &&
#       retri.args[1] == 'setup_new_subscriber'
#   end.map(&:delete)
class RetrySet < JobSet
  # Binds this set to the "retry" sorted set in Redis.
  def initialize
    super("retry")
  end

  # Enqueues all jobs pending within the retry set.
  # Sweeps repeatedly until the set is empty (presumably because a single
  # #each pass can miss entries removed while it runs).
  def retry_all
    until size.zero?
      each(&:retry)
    end
  end

  # Kills all jobs pending within the retry set, sweeping until it is empty.
  def kill_all
    until size.zero?
      each(&:kill)
    end
  end
end
##
# The set of dead jobs within Sidekiq. Dead jobs have failed all of
# their retries and are held in this set pending some sort of manual
# fix. They will be removed after 6 months (dead_timeout) if not.
#
class DeadSet < JobSet
  # Binds this set to the "dead" sorted set in Redis.
  def initialize
    super "dead"
  end

  # Add the given job to the Dead set, scored with the current time.
  #
  # In the same MULTI it drops entries older than .timeout seconds and trims
  # the set to at most .max_jobs entries, keeping the newest. Unless
  # +opts[:notify_failure]+ is false, every Sidekiq.death_handlers entry is
  # then called with the parsed job and a RuntimeError.
  #
  # @param message [String] the job data as JSON
  # @param opts [Hash] options; :notify_failure => false skips the handlers
  # @return [Boolean] true
  def kill(message, opts = {})
    now = Time.now.to_f
    Sidekiq.redis do |conn|
      conn.multi do |transaction|
        transaction.zadd(name, now.to_s, message)
        transaction.zremrangebyscore(name, "-inf", now - self.class.timeout)
        # Ranks run from 0 (oldest score) to -1 (newest). Removing ranks
        # 0..-(max + 1) leaves exactly the `max` newest entries; a stop index
        # of -max would also remove one of them and cap the set at max - 1.
        transaction.zremrangebyrank(name, 0, -(self.class.max_jobs + 1))
      end
    end

    if opts[:notify_failure] != false
      job = Sidekiq.load_json(message)
      r = RuntimeError.new("Job killed by API")
      r.set_backtrace(caller)
      Sidekiq.death_handlers.each do |handle|
        handle.call(job, r)
      end
    end
    true
  end

  # Enqueue all dead jobs, sweeping until the set is empty.
  def retry_all
    each(&:retry) while size > 0
  end

  # The maximum size of the Dead set. Older entries will be trimmed
  # to stay within this limit. Default value is 10,000.
  def self.max_jobs
    Sidekiq[:dead_max_jobs]
  end

  # The time limit for entries within the Dead set. Older entries will be thrown away.
  # Default value is six months.
  def self.timeout
    Sidekiq[:dead_timeout_in_seconds]
  end
end
##
# Enumerates the set of Sidekiq processes which are actively working
# right now. Each process sends a heartbeat to Redis every 5 seconds
# so this set should be relatively accurate, barring network partitions.
#
# @yieldparam [Sidekiq::Process]
#
class ProcessSet
  include Enumerable

  # :nodoc:
  # @api private
  # @param clean_plz [Boolean] prune dead processes from Redis first
  def initialize(clean_plz = true)
    cleanup if clean_plz
  end

  # Cleans up dead processes recorded in Redis.
  # Returns the number of processes cleaned.
  # :nodoc:
  # @api private
  def cleanup
    count = 0
    Sidekiq.redis do |conn|
      procs = conn.sscan_each("processes").to_a.sort
      heartbeats = conn.pipelined { |pipeline|
        procs.each do |key|
          pipeline.hget(key, "info")
        end
      }

      # the hash named key has an expiry of 60 seconds.
      # if it's not found, that means the process has not reported
      # in to Redis and probably died.
      to_prune = procs.select.with_index { |_key, i| heartbeats[i].nil? }
      count = conn.srem("processes", to_prune) unless to_prune.empty?
    end
    count
  end

  # Yields a Sidekiq::Process for each registered process whose "info"
  # field is still present, merged with its live busy/beat/quiet/rss/rtt_us.
  #
  # @yieldparam process [Sidekiq::Process]
  # @return [Enumerator] when called without a block
  def each
    return to_enum(:each) unless block_given?

    result = Sidekiq.redis { |conn|
      procs = conn.sscan_each("processes").to_a.sort

      # We're making a tradeoff here between consuming more memory instead of
      # making more roundtrips to Redis, but if you have hundreds or thousands of workers,
      # you'll be happier this way
      conn.pipelined do |pipeline|
        procs.each do |key|
          pipeline.hmget(key, "info", "busy", "beat", "quiet", "rss", "rtt_us")
        end
      end
    }

    result.each do |info, busy, at_s, quiet, rss, rtt|
      # If a process is stopped between when we query Redis for `procs` and
      # when we query for `result`, we will have an item in `result` that is
      # composed of `nil` values.
      next if info.nil?

      hash = Sidekiq.load_json(info)
      yield Process.new(hash.merge("busy" => busy.to_i,
        "beat" => at_s.to_f,
        "quiet" => quiet,
        "rss" => rss.to_i,
        "rtt_us" => rtt.to_i))
    end
  end

  # This method is not guaranteed accurate since it does not prune the set
  # based on current heartbeat. #each does that and ensures the set only
  # contains Sidekiq processes which have sent a heartbeat within the last
  # 60 seconds.
  # @return [Integer] current number of registered Sidekiq processes
  def size
    Sidekiq.redis { |conn| conn.scard("processes") }
  end

  # Total number of threads available to execute jobs.
  # For Sidekiq Enterprise customers this number (in production) must be
  # less than or equal to your licensed concurrency.
  # @return [Integer] the sum of process concurrency
  def total_concurrency
    sum { |x| x["concurrency"].to_i }
  end

  # @return [Integer] total amount of RSS memory consumed by Sidekiq processes
  def total_rss_in_kb
    sum { |x| x["rss"].to_i }
  end
  alias_method :total_rss, :total_rss_in_kb

  # Returns the identity of the current cluster leader or "" if no leader.
  # This is a Sidekiq Enterprise feature, will always return "" in Sidekiq
  # or Sidekiq Pro.
  # @return [String] Identity of cluster leader
  # @return [String] empty string if no leader
  def leader
    @leader ||= begin
      x = Sidekiq.redis { |c| c.get("dear-leader") }
      # need a non-falsy value so we can memoize
      x ||= ""
      x
    end
  end
end
#
# Sidekiq::Process represents an active Sidekiq process talking with Redis.
# Each process has a set of attributes which look like this:
#
# {
#   'hostname' => 'app-1.example.com',
#   'started_at' => <process start time>,
#   'pid' => 12345,
#   'tag' => 'myapp',
#   'concurrency' => 25,
#   'queues' => ['default', 'low'],
#   'busy' => 10,
#   'beat' => <last heartbeat>,
#   'identity' => <unique string identifying the process>,
# }
class Process
  # :nodoc:
  # @api private
  # @param hash [Hash] the process attributes (see the class comment)
  def initialize(hash)
    @attribs = hash
  end

  # Reads one raw attribute of the process.
  # @param key [String] attribute name, e.g. "hostname"
  def [](key)
    @attribs[key]
  end

  # @return the "tag" attribute
  def tag
    self["tag"]
  end

  # @return [Array] the "labels" attribute, coerced with Array()
  def labels
    Array(self["labels"])
  end

  # @return the "identity" attribute
  def identity
    self["identity"]
  end

  # @return the "queues" attribute
  def queues
    self["queues"]
  end

  # @return [Boolean] true if this process is quiet or shutting down
  def stopping?
    self["quiet"] == "true"
  end

  # Signal this process to stop processing new jobs.
  # It will continue to execute jobs it has already fetched.
  # This method is *asynchronous* and it can take 5-10
  # seconds for the process to quiet.
  def quiet!
    signal("TSTP")
  end

  # Signal this process to shutdown.
  # It will shutdown within its configured :timeout value, default 25 seconds.
  # This method is *asynchronous* and it can take 5-10
  # seconds for the process to start shutting down.
  def stop!
    signal("TERM")
  end

  # Signal this process to log backtraces for all threads.
  # Useful if you have a frozen or deadlocked process which is
  # still sending a heartbeat.
  # This method is *asynchronous* and it can take 5-10 seconds.
  def dump_threads
    signal("TTIN")
  end

  private

  # Pushes +sig+ onto the "<identity>-signals" list in Redis and gives the
  # list a 60 second expiry, both in one MULTI. (Presumably the process
  # polls this list; the consumer is not in this file.)
  def signal(sig)
    signals_key = "#{identity}-signals"
    Sidekiq.redis do |conn|
      conn.multi do |tx|
        tx.lpush(signals_key, sig)
        tx.expire(signals_key, 60)
      end
    end
  end
end
##
# The WorkSet stores the work being done by this Sidekiq cluster.
# It tracks the process and thread working on each job.
#
# WARNING WARNING WARNING
#
# This is live data that can change every millisecond.
# If you call #size => 5 and then expect #each to be
# called 5 times, you're going to have a bad time.
#
#   works = Sidekiq::WorkSet.new
#   works.size => 2
#   works.each do |process_id, thread_id, work|
#     # process_id is a unique identifier per Sidekiq process
#     # thread_id is a unique identifier per thread
#     # work is a Hash which looks like:
#     # { 'queue' => name, 'run_at' => timestamp, 'payload' => job_hash }
#     # run_at is an epoch Integer.
#   end
#
class WorkSet
  include Enumerable

  # Collects [process_key, thread_id, work_hash] for every thread of every
  # process whose key still exists, then yields them ordered by "run_at".
  def each(&block)
    rows = []
    Sidekiq.redis do |conn|
      conn.sscan_each("processes").to_a.sort.each do |process_key|
        alive, work = conn.pipelined { |pipeline|
          pipeline.exists?(process_key)
          pipeline.hgetall("#{process_key}:work")
        }
        next unless alive

        work.each_pair do |thread_id, raw|
          record = Sidekiq.load_json(raw)
          payload = record["payload"]
          # avoid breaking API, this is a side effect of the JSON optimization in #4316
          record["payload"] = Sidekiq.load_json(payload) if payload.is_a?(String)
          rows << [process_key, thread_id, record]
        end
      end
    end

    rows.sort_by { |(_, _, record)| record["run_at"] }.each(&block)
  end

  # Note that #size is only as accurate as Sidekiq's heartbeat,
  # which happens every 5 seconds. It is NOT real-time.
  #
  # Not very efficient if you have lots of Sidekiq
  # processes but the alternative is a global counter
  # which can easily get out of sync with crashy processes.
  #
  # @return [Integer] sum of the "busy" field over registered processes
  def size
    Sidekiq.redis do |conn|
      keys = conn.sscan_each("processes").to_a
      next 0 if keys.empty?

      conn.pipelined { |pipeline|
        keys.each { |key| pipeline.hget(key, "busy") }
      }.sum(&:to_i)
    end
  end
end
# Since "worker" is a nebulous term, we've deprecated the use of this class name.
# Is "worker" a process, a type of job, a thread? Undefined!
# WorkSet better describes the data.
# @deprecated Use {WorkSet} instead; this constant is an alias of it.
Workers = WorkSet
end