Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00
This commit is contained in:
Mike Perham 2022-06-13 10:43:52 -07:00
parent 9102c38735
commit 4508477bf5

View file

@ -591,8 +591,7 @@ module Sidekiq
# Base class for all sorted sets within Sidekiq, e.g. scheduled, retry and dead.
# Sidekiq Pro and Enterprise add additional sorted sets for Batches, etc.
# Base class for all sorted sets within Sidekiq.
class SortedSet
include Enumerable
@ -612,9 +611,9 @@ module Sidekiq
# Scan through each element of the sorted set, yielding each to the supplied block.
# Please see Redis's <a href="https://redis.io/commands/scan/">SCAN documentation</a> for implementation details.
# @param match [String] a snippet or regexp to filter matches
# @param match [String] a snippet or regexp to filter matches.
# @param count [Integer] number of elements to retrieve at a time, default 100
# @yieldparam [SortedEntry] each entry
# @yieldparam [Sidekiq::SortedEntry] each entry
def scan(match, count = 100)
return to_enum(:scan, match, count) unless block_given?
@ -626,10 +625,12 @@ module Sidekiq
# @return [Boolean] always true
def clear
Sidekiq.redis do |conn|
alias_method :💣, :clear
@ -638,10 +639,17 @@ module Sidekiq
# Base class for all sorted sets which contain jobs, e.g. scheduled, retry and dead.
# Sidekiq Pro and Enterprise add additional sorted sets which do not contain job data,
# e.g. Batches.
class JobSet < SortedSet
def schedule(timestamp, message)
# Add a job with the associated timestamp to this set.
# @param timestamp [Time] the score for the job
# @param job [Hash] the job data
def schedule(timestamp, job)
Sidekiq.redis do |conn|
conn.zadd(name, timestamp.to_f.to_s, Sidekiq.dump_json(message))
conn.zadd(name, timestamp.to_f.to_s, Sidekiq.dump_json(job))
@ -669,6 +677,11 @@ module Sidekiq
# Fetch jobs that match a given time or Range. Job ID is an
# optional second argument.
# @param score [Time] a specific timestamp
# @param score [Range] a timestamp range
# @param jid [String, optional] find a specific JID within the score
# @return [Array<SortedEntry>] any results found, can be empty
def fetch(score, jid = nil)
begin_score, end_score =
if score.is_a?(Range)
@ -690,7 +703,10 @@ module Sidekiq
# Find the job with the given JID within this sorted set.
# This is a slower O(n) operation. Do not use for app logic.
# *This is a slow O(n) operation*. Do not use for app logic.
# @param jid [String] the job identifier
# @returns [SortedEntry] the record or nil
def find_job(jid)
Sidekiq.redis do |conn|
conn.zscan_each(name, match: "*#{jid}*", count: 100) do |entry, score|
@ -702,7 +718,7 @@ module Sidekiq
def delete_by_value(name, value)
def delete_by_value(name, value) # :nodoc:
Sidekiq.redis do |conn|
ret = conn.zrem(name, value)
@_size -= 1 if ret
@ -710,7 +726,7 @@ module Sidekiq
def delete_by_jid(score, jid)
def delete_by_jid(score, jid) # :nodoc:
Sidekiq.redis do |conn|
elements = conn.zrangebyscore(name, score, score)
elements.each do |element|
@ -730,10 +746,10 @@ module Sidekiq
# Allows enumeration of scheduled jobs within Sidekiq.
# The set of scheduled jobs within Sidekiq.
# Based on this, you can search/filter for jobs. Here's an
# example where I'm selecting all jobs of a certain type
# and deleting them from the schedule queue.
# example where I'm selecting jobs based on some complex logic
# and deleting them from the scheduled set.
# r = Sidekiq::ScheduledSet.new
# r.select do |scheduled|
@ -748,7 +764,7 @@ module Sidekiq
# Allows enumeration of retries within Sidekiq.
# The set of retries within Sidekiq.
# Based on this, you can search/filter for jobs. Here's an
# example where I'm selecting all jobs of a certain type
# and deleting them from the retry queue.
@ -764,23 +780,29 @@ module Sidekiq
super "retry"
# Enqueues all jobs pending within the retry set.
def retry_all
each(&:retry) while size > 0
# Kills all jobs pending within the retry set.
def kill_all
each(&:kill) while size > 0
# Allows enumeration of dead jobs within Sidekiq.
# The set of dead jobs within Sidekiq. Dead jobs have failed all of
# their retries and are helding in this set pending some sort of manual
# fix. They will be removed after 6 months (dead_timeout) if not.
class DeadSet < JobSet
def initialize
super "dead"
# Add the given job to the Dead set.
# @param message [String] the job data as JSON
def kill(message, opts = {})
now = Time.now.to_f
Sidekiq.redis do |conn|
@ -802,14 +824,19 @@ module Sidekiq
# Enqueue all dead jobs
def retry_all
each(&:retry) while size > 0
# The maximum size of the Dead set. Older entries will be trimmed
# to stay within this limit. Default value is 10,000.
def self.max_jobs
# The time limit for entries within the Dead set. Older entries will be thrown away.
# Default value is six months.
def self.timeout
@ -820,18 +847,18 @@ module Sidekiq
# right now. Each process sends a heartbeat to Redis every 5 seconds
# so this set should be relatively accurate, barring network partitions.
# Yields a Sidekiq::Process.
# @yieldparam [Sidekiq::Process]
class ProcessSet
include Enumerable
def initialize(clean_plz = true)
def initialize(clean_plz = true) # :nodoc:
cleanup if clean_plz
# Cleans up dead processes recorded in Redis.
# Returns the number of processes cleaned.
def cleanup
def cleanup # :nodoc:
count = 0
Sidekiq.redis do |conn|
procs = conn.sscan_each("processes").to_a.sort
@ -885,6 +912,7 @@ module Sidekiq
# based on current heartbeat. #each does that and ensures the set only
# contains Sidekiq processes which have sent a heartbeat within the last
# 60 seconds.
# @return [Integer] current number of registered Sidekiq processes
def size
Sidekiq.redis { |conn| conn.scard("processes") }
@ -892,10 +920,12 @@ module Sidekiq
# Total number of threads available to execute jobs.
# For Sidekiq Enterprise customers this number (in production) must be
# less than or equal to your licensed concurrency.
# @return [Integer] the sum of process concurrency
def total_concurrency
sum { |x| x["concurrency"].to_i }
# @return [Integer] total amount of RSS memory consumed by Sidekiq processes
def total_rss_in_kb
sum { |x| x["rss"].to_i }
@ -930,7 +960,7 @@ module Sidekiq
# 'identity' => <unique string identifying the process>,
# }
class Process
def initialize(hash)
def initialize(hash) # :nodoc:
@attribs = hash
@ -954,18 +984,31 @@ module Sidekiq
# Signal this process to stop processing new jobs.
# It will continue to execute jobs it has already fetched.
# This method is *asynchronous* and it can take 5-10
# seconds for the process to quiet.
def quiet!
# Signal this process to shutdown.
# It will shutdown within its configured :timeout value, default 25 seconds.
# This method is *asynchronous* and it can take 5-10
# seconds for the process to start shutting down.
def stop!
# Signal this process to log backtraces for all threads.
# Useful if you have a frozen or deadlocked process which is
# still sending a heartbeat.
# This method is *asynchronous* and it can take 5-10 seconds.
def dump_threads
# @return [Boolean] true if this process is quiet or shutting down
def stopping?
self["quiet"] == "true"