1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

merge main

This commit is contained in:
Mike Perham 2022-07-12 13:37:45 -07:00
commit 1b4ffdb06f
17 changed files with 262 additions and 79 deletions

View file

@ -66,7 +66,7 @@ bundle install
redis-server
```
#### 7. Navivate to myapp (small Rails app inside Sidekiq repository used for development)
#### 7. Navigate to myapp (small Rails app inside Sidekiq repository used for development)
```
cd myapp/

View file

@ -11,4 +11,4 @@ jobs:
- name: 'Checkout Repository'
uses: actions/checkout@v3
- name: 'Dependency Review'
uses: actions/dependency-review-action@v1
uses: actions/dependency-review-action@v2

2
.gitignore vendored
View file

@ -17,3 +17,5 @@ development.log
/docker-compose.yml
Gemfile.lock
*.DS_Store
doc/
.yardoc/

4
.yardopts Normal file
View file

@ -0,0 +1,4 @@
--title "Sidekiq"
--readme docs/menu.md
--hide-api private
lib/sidekiq/api.rb lib/sidekiq/middleware/chain.rb - Changes.md

View file

@ -2,6 +2,11 @@
[Sidekiq Changes](https://github.com/mperham/sidekiq/blob/main/Changes.md) | [Sidekiq Pro Changes](https://github.com/mperham/sidekiq/blob/main/Pro-Changes.md) | [Sidekiq Enterprise Changes](https://github.com/mperham/sidekiq/blob/main/Ent-Changes.md)
6.5.1
----------
- Fix `push_bulk` breakage [#5387]
6.5.0
---------

View file

@ -4,6 +4,11 @@
Please see [sidekiq.org](https://sidekiq.org) for more details and how to buy.
2.5.1
-------------
- Fix crash with empty periodic data [#5374]
2.5.0
-------------

View file

@ -4,6 +4,11 @@
Please see [sidekiq.org](https://sidekiq.org/) for more details and how to buy.
5.5.1
---------
- Unbreak queue pausing [#5382]
5.5.0
---------

View file

@ -1,16 +1,12 @@
require "bundler/gem_tasks"
require "rake/testtask"
require "standard/rake"
require "rdoc/task"
RDoc::Task.new do |rdoc|
rdoc.main = "docs/rdoc.rdoc"
rdoc.rdoc_files.include("docs/rdoc.rdoc",
"lib/sidekiq/api.rb",
"lib/sidekiq/client.rb",
"lib/sidekiq/worker.rb",
"lib/sidekiq/job.rb")
end
# If you want to generate API docs:
# gem install yard && yard && open doc/index.html
# YARD readme: https://rubydoc.info/gems/yard/file/README.md
# YARD tags: https://www.rubydoc.info/gems/yard/file/docs/Tags.md
# YARD cheatsheet: https://gist.github.com/phansch/db18a595d2f5f1ef16646af72fe1fb0e
Rake::TestTask.new(:test) do |test|
test.warning = true

29
docs/menu.md Normal file
View file

@ -0,0 +1,29 @@
# Sidekiq public API documentation
Sidekiq provides a number of public APIs for various functionality.
1. Middleware
2. Lifecycle Events
3. Data API
4. Components
## Middleware
Middleware run around the client-side push and the server-side execution of jobs. This allows plugins which mutate job data or provide additional functionality during the execution of specific jobs.
## Lifecycle Events
With lifecycle events, Sidekiq plugins can register a callback upon `startup`, `quiet` or `shutdown`.
This is useful for starting and stopping your own Threads or services within the Sidekiq process.
## Data API
The code in `sidekiq/api` provides a Ruby facade on top of Sidekiq's persistent data within Redis.
It contains many classes and methods for discovering, searching and iterating through the real-time job data within the queues and sets inside Redis.
This API powers the Sidekiq::Web UI.
## Components (ALPHA)
Coming in Sidekiq 7.0, Components are elements of code which run inside each Sidekiq process.
They are passed a handle to the Sidekiq container which gives them direct access
to resources like the Logger, Redis connection pool and other registered resources.

BIN
lib/sidekiq/.DS_Store vendored

Binary file not shown.

View file

@ -6,6 +6,11 @@ require "zlib"
require "base64"
module Sidekiq
# Retrieve runtime statistics from Redis regarding
# this Sidekiq cluster.
#
# stat = Sidekiq::Stats.new
# stat.processed
class Stats
def initialize
fetch_stats_fast!
@ -52,6 +57,7 @@ module Sidekiq
end
# O(1) redis calls
# @api private
def fetch_stats_fast!
pipe1_res = Sidekiq.redis { |conn|
conn.pipelined do |pipeline|
@ -91,6 +97,7 @@ module Sidekiq
end
# O(number of processes + number of queues) redis calls
# @api private
def fetch_stats_slow!
processes = Sidekiq.redis { |conn|
conn.sscan_each("processes").to_a
@ -116,11 +123,13 @@ module Sidekiq
@stats
end
# @api private
def fetch_stats!
fetch_stats_fast!
fetch_stats_slow!
end
# @api private
def reset(*stats)
all = %w[failed processed]
stats = stats.empty? ? all : all & stats.flatten.compact.map(&:to_s)
@ -202,9 +211,10 @@ module Sidekiq
end
##
# Encapsulates a queue within Sidekiq.
# Represents a queue within Sidekiq.
# Allows enumeration of all jobs within the queue
# and deletion of jobs.
# and deletion of jobs. NB: this queue data is real-time
# and is changing within Redis moment by moment.
#
# queue = Sidekiq::Queue.new("mailer")
# queue.each do |job|
@ -212,7 +222,6 @@ module Sidekiq
# job.args # => [1, 2, 3]
# job.delete if job.jid == 'abcdef1234567890'
# end
#
class Queue
include Enumerable
@ -296,6 +305,7 @@ module Sidekiq
end
# delete all jobs within this queue
# @return [Boolean] true
def clear
Sidekiq.redis do |conn|
conn.multi do |transaction|
@ -303,34 +313,45 @@ module Sidekiq
transaction.srem("queues", name)
end
end
true
end
alias_method :💣, :clear
def as_json(options = nil) # :nodoc:
# :nodoc:
# @api private
def as_json(options = nil)
{name: name} # 5336
end
end
##
# Encapsulates a pending job within a Sidekiq queue or
# sorted set.
# Represents a pending job within a Sidekiq queue.
#
# The job should be considered immutable but may be
# removed from the queue via JobRecord#delete.
#
class JobRecord
# the parsed Hash of job data
# @!attribute [r] Item
attr_reader :item
# the underlying String in Redis
# @!attribute [r] Value
attr_reader :value
# the queue associated with this job
# @!attribute [r] Queue
attr_reader :queue
def initialize(item, queue_name = nil) # :nodoc:
# :nodoc:
# @api private
def initialize(item, queue_name = nil)
@args = nil
@value = item
@item = item.is_a?(Hash) ? item : parse(item)
@queue = queue_name || @item["queue"]
end
def parse(item) # :nodoc:
# :nodoc:
# @api private
def parse(item)
Sidekiq.load_json(item)
rescue JSON::ParserError
# If the job payload in Redis is invalid JSON, we'll load
@ -341,6 +362,8 @@ module Sidekiq
{}
end
# This is the job class which Sidekiq will execute. If using ActiveJob,
# this class will be the ActiveJob adapter class rather than a specific job.
def klass
self["class"]
end
@ -445,21 +468,27 @@ module Sidekiq
end
# Represents a job within a Redis sorted set where the score
# represents a timestamp for the job.
# represents a timestamp associated with the job. This timestamp
# could be the scheduled time for it to run (e.g. scheduled set),
# or the expiration date after which the entry should be deleted (e.g. dead set).
class SortedEntry < JobRecord
attr_reader :score
attr_reader :parent
def initialize(parent, score, item) # :nodoc:
# :nodoc:
# @api private
def initialize(parent, score, item)
super(item)
@score = Float(score)
@parent = parent
end
# The timestamp associated with this entry
def at
Time.at(score).utc
end
# remove this entry from the sorted set
def delete
if @value
@parent.delete_by_value(@parent.name, @value)
@ -470,7 +499,7 @@ module Sidekiq
# Change the scheduled time for this job.
#
# @param [Time] the new timestamp when this job will be enqueued.
# @param at [Time] the new timestamp for this job
def reschedule(at)
Sidekiq.redis do |conn|
conn.zincrby(@parent.name, at.to_f - @score, Sidekiq.dump_json(@item))
@ -544,20 +573,32 @@ module Sidekiq
end
end
# Base class for all sorted sets within Sidekiq.
class SortedSet
include Enumerable
# Redis key of the set
# @!attribute [r] Name
attr_reader :name
# :nodoc:
# @api private
def initialize(name)
@name = name
@_size = size
end
# real-time size of the set, will change
def size
Sidekiq.redis { |c| c.zcard(name) }
end
# Scan through each element of the sorted set, yielding each to the supplied block.
# Please see Redis's <a href="https://redis.io/commands/scan/">SCAN documentation</a> for implementation details.
#
# @param match [String] a snippet or regexp to filter matches.
# @param count [Integer] number of elements to retrieve at a time, default 100
# @yieldparam [Sidekiq::SortedEntry] each entry
def scan(match, count = 100)
return to_enum(:scan, match, count) unless block_given?
@ -569,22 +610,32 @@ module Sidekiq
end
end
# @return [Boolean] always true
def clear
Sidekiq.redis do |conn|
conn.unlink(name)
end
true
end
alias_method :💣, :clear
def as_json(options = nil) # :nodoc:
# :nodoc:
# @api private
def as_json(options = nil)
{name: name} # 5336
end
end
# Base class for all sorted sets which contain jobs, e.g. scheduled, retry and dead.
# Sidekiq Pro and Enterprise add additional sorted sets which do not contain job data,
# e.g. Batches.
class JobSet < SortedSet
def schedule(timestamp, message)
# Add a job with the associated timestamp to this set.
# @param timestamp [Time] the score for the job
# @param job [Hash] the job data
def schedule(timestamp, job)
Sidekiq.redis do |conn|
conn.zadd(name, timestamp.to_f.to_s, Sidekiq.dump_json(message))
conn.zadd(name, timestamp.to_f.to_s, Sidekiq.dump_json(job))
end
end
@ -612,6 +663,10 @@ module Sidekiq
##
# Fetch jobs that match a given time or Range. Job ID is an
# optional second argument.
#
# @param score [Time,Range] a specific timestamp or range
# @param jid [String, optional] find a specific JID within the score
# @return [Array<SortedEntry>] any results found, can be empty
def fetch(score, jid = nil)
begin_score, end_score =
if score.is_a?(Range)
@ -633,7 +688,10 @@ module Sidekiq
##
# Find the job with the given JID within this sorted set.
# This is a slower O(n) operation. Do not use for app logic.
# *This is a slow O(n) operation*. Do not use for app logic.
#
# @param jid [String] the job identifier
# @return [SortedEntry] the record or nil
def find_job(jid)
Sidekiq.redis do |conn|
conn.zscan_each(name, match: "*#{jid}*", count: 100) do |entry, score|
@ -645,6 +703,8 @@ module Sidekiq
nil
end
# :nodoc:
# @api private
def delete_by_value(name, value)
Sidekiq.redis do |conn|
ret = conn.zrem(name, value)
@ -653,6 +713,8 @@ module Sidekiq
end
end
# :nodoc:
# @api private
def delete_by_jid(score, jid)
Sidekiq.redis do |conn|
elements = conn.zrangebyscore(name, score, score)
@ -673,10 +735,10 @@ module Sidekiq
end
##
# Allows enumeration of scheduled jobs within Sidekiq.
# The set of scheduled jobs within Sidekiq.
# Based on this, you can search/filter for jobs. Here's an
# example where I'm selecting all jobs of a certain type
# and deleting them from the schedule queue.
# example where I'm selecting jobs based on some complex logic
# and deleting them from the scheduled set.
#
# See the API wiki page for usage notes and examples.
#
@ -687,7 +749,7 @@ module Sidekiq
end
##
# Allows enumeration of retries within Sidekiq.
# The set of retries within Sidekiq.
# Based on this, you can search/filter for jobs. Here's an
# example where I'm selecting all jobs of a certain type
# and deleting them from the retry queue.
@ -699,23 +761,29 @@ module Sidekiq
super "retry"
end
# Enqueues all jobs pending within the retry set.
def retry_all
each(&:retry) while size > 0
end
# Kills all jobs pending within the retry set.
def kill_all
each(&:kill) while size > 0
end
end
##
# Allows enumeration of dead jobs within Sidekiq.
# The set of dead jobs within Sidekiq. Dead jobs have failed all of
# their retries and are held in this set pending some sort of manual
# fix. They will be removed after 6 months (dead_timeout) if not.
#
class DeadSet < JobSet
def initialize
super "dead"
end
# Add the given job to the Dead set.
# @param message [String] the job data as JSON
def kill(message, opts = {})
now = Time.now.to_f
Sidekiq.redis do |conn|
@ -737,14 +805,19 @@ module Sidekiq
true
end
# Enqueue all dead jobs
def retry_all
each(&:retry) while size > 0
end
# The maximum size of the Dead set. Older entries will be trimmed
# to stay within this limit. Default value is 10,000.
def self.max_jobs
Sidekiq[:dead_max_jobs]
end
# The time limit for entries within the Dead set. Older entries will be thrown away.
# Default value is six months.
def self.timeout
Sidekiq[:dead_timeout_in_seconds]
end
@ -755,17 +828,21 @@ module Sidekiq
# right now. Each process sends a heartbeat to Redis every 5 seconds
# so this set should be relatively accurate, barring network partitions.
#
# Yields a Sidekiq::Process.
# @yieldparam [Sidekiq::Process]
#
class ProcessSet
include Enumerable
# :nodoc:
# @api private
def initialize(clean_plz = true)
cleanup if clean_plz
end
# Cleans up dead processes recorded in Redis.
# Returns the number of processes cleaned.
# :nodoc:
# @api private
def cleanup
count = 0
Sidekiq.redis do |conn|
@ -820,6 +897,7 @@ module Sidekiq
# based on current heartbeat. #each does that and ensures the set only
# contains Sidekiq processes which have sent a heartbeat within the last
# 60 seconds.
# @return [Integer] current number of registered Sidekiq processes
def size
Sidekiq.redis { |conn| conn.scard("processes") }
end
@ -827,10 +905,12 @@ module Sidekiq
# Total number of threads available to execute jobs.
# For Sidekiq Enterprise customers this number (in production) must be
# less than or equal to your licensed concurrency.
# @return [Integer] the sum of process concurrency
def total_concurrency
sum { |x| x["concurrency"].to_i }
end
# @return [Integer] total amount of RSS memory consumed by Sidekiq processes
def total_rss_in_kb
sum { |x| x["rss"].to_i }
end
@ -839,6 +919,8 @@ module Sidekiq
# Returns the identity of the current cluster leader or "" if no leader.
# This is a Sidekiq Enterprise feature, will always return "" in Sidekiq
# or Sidekiq Pro.
# @return [String] Identity of cluster leader
# @return [String] empty string if no leader
def leader
@leader ||= begin
x = Sidekiq.redis { |c| c.get("dear-leader") }
@ -865,6 +947,8 @@ module Sidekiq
# 'identity' => <unique string identifying the process>,
# }
class Process
# :nodoc:
# @api private
def initialize(hash)
@attribs = hash
end
@ -889,18 +973,31 @@ module Sidekiq
self["queues"]
end
# Signal this process to stop processing new jobs.
# It will continue to execute jobs it has already fetched.
# This method is *asynchronous* and it can take 5-10
# seconds for the process to quiet.
def quiet!
signal("TSTP")
end
# Signal this process to shutdown.
# It will shutdown within its configured :timeout value, default 25 seconds.
# This method is *asynchronous* and it can take 5-10
# seconds for the process to start shutting down.
def stop!
signal("TERM")
end
# Signal this process to log backtraces for all threads.
# Useful if you have a frozen or deadlocked process which is
# still sending a heartbeat.
# This method is *asynchronous* and it can take 5-10 seconds.
def dump_threads
signal("TTIN")
end
# @return [Boolean] true if this process is quiet or shutting down
def stopping?
self["quiet"] == "true"
end

View file

@ -220,7 +220,7 @@ module Sidekiq
def atomic_push(conn, payloads)
if payloads.first.key?("at")
conn.zadd("schedule", *payloads.map { |hash|
conn.zadd("schedule", payloads.flat_map { |hash|
at = hash.delete("at").to_s
[at, Sidekiq.dump_json(hash)]
})

View file

@ -176,7 +176,7 @@ module Sidekiq
# logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
retry_at = Time.now.to_f + delay
payload = Sidekiq.dump_json(msg)
Sidekiq.redis do |conn|
redis do |conn|
conn.zadd("retry", retry_at.to_s, payload)
end
else
@ -195,7 +195,7 @@ module Sidekiq
send_to_morgue(msg) unless msg["dead"] == false
Sidekiq.death_handlers.each do |handler|
config.death_handlers.each do |handler|
handler.call(msg, exception)
rescue => e
handle_exception(e, {context: "Error calling death handler", job: msg})

View file

@ -4,84 +4,98 @@ require "sidekiq/middleware/modules"
module Sidekiq
# Middleware is code configured to run before/after
# a message is processed. It is patterned after Rack
# a job is processed. It is patterned after Rack
# middleware. Middleware exists for the client side
# (pushing jobs onto the queue) as well as the server
# side (when jobs are actually processed).
#
# Callers will register middleware Classes and Sidekiq will
# create new instances of the middleware for every job. This
# is important so that instance state is not shared accidentally
# between job executions.
#
# To add middleware for the client:
#
# Sidekiq.configure_client do |config|
# config.client_middleware do |chain|
# chain.add MyClientHook
# Sidekiq.configure_client do |config|
# config.client_middleware do |chain|
# chain.add MyClientHook
# end
# end
# end
#
# To modify middleware for the server, just call
# with another block:
#
# Sidekiq.configure_server do |config|
# config.server_middleware do |chain|
# chain.add MyServerHook
# chain.remove ActiveRecord
# Sidekiq.configure_server do |config|
# config.server_middleware do |chain|
# chain.add MyServerHook
# chain.remove ActiveRecord
# end
# end
# end
#
# To insert immediately preceding another entry:
#
# Sidekiq.configure_client do |config|
# config.client_middleware do |chain|
# chain.insert_before ActiveRecord, MyClientHook
# Sidekiq.configure_client do |config|
# config.client_middleware do |chain|
# chain.insert_before ActiveRecord, MyClientHook
# end
# end
# end
#
# To insert immediately after another entry:
#
# Sidekiq.configure_client do |config|
# config.client_middleware do |chain|
# chain.insert_after ActiveRecord, MyClientHook
# Sidekiq.configure_client do |config|
# config.client_middleware do |chain|
# chain.insert_after ActiveRecord, MyClientHook
# end
# end
# end
#
# This is an example of a minimal server middleware:
#
# class MyServerHook
# include Sidekiq::ServerMiddleware
# def call(job_instance, msg, queue)
# logger.info "Before job"
# redis {|conn| conn.get("foo") } # do something in Redis
# yield
# logger.info "After job"
# class MyServerHook
# include Sidekiq::ServerMiddleware
#
# def call(job_instance, msg, queue)
# logger.info "Before job"
# redis {|conn| conn.get("foo") } # do something in Redis
# yield
# logger.info "After job"
# end
# end
# end
#
# This is an example of a minimal client middleware, note
# the method must return the result or the job will not push
# to Redis:
#
# class MyClientHook
# include Sidekiq::ClientMiddleware
# def call(job_class, msg, queue, redis_pool)
# logger.info "Before push"
# result = yield
# logger.info "After push"
# result
# class MyClientHook
# include Sidekiq::ClientMiddleware
#
# def call(job_class, msg, queue, redis_pool)
# logger.info "Before push"
# result = yield
# logger.info "After push"
# result
# end
# end
# end
#
module Middleware
class Chain
include Enumerable
# A unique instance of the middleware chain is created for
# each job executed in order to be thread-safe.
# @param copy [Sidekiq::Middleware::Chain] New instance of Chain
# @return [nil]
def initialize_copy(copy)
copy.instance_variable_set(:@entries, entries.dup)
nil
end
# Iterate through each middleware in the chain
def each(&block)
entries.each(&block)
end
def initialize(config = nil)
# @api private
def initialize(config = nil) # :nodoc:
@config = config
@entries = nil
yield self if block_given?
@ -91,20 +105,33 @@ module Sidekiq
@entries ||= []
end
# Remove all middleware matching the given Class
# @param klass [Class]
def remove(klass)
entries.delete_if { |entry| entry.klass == klass }
end
# Add the given middleware to the end of the chain.
# Sidekiq will call `klass.new(*args)` to create a clean
# copy of your middleware for every job executed.
#
# chain.add(Statsd::Metrics, { collector: "localhost:8125" })
#
# @param klass [Class] Your middleware class
# @param *args [Array<Object>] Set of arguments to pass to every instance of your middleware
def add(klass, *args)
remove(klass)
entries << Entry.new(@config, klass, *args)
end
# Identical to {#add} except the middleware is added to the front of the chain.
def prepend(klass, *args)
remove(klass)
entries.insert(0, Entry.new(@config, klass, *args))
end
# Inserts +newklass+ before +oldklass+ in the chain.
# Useful if one middleware must run before another middleware.
def insert_before(oldklass, newklass, *args)
i = entries.index { |entry| entry.klass == newklass }
new_entry = i.nil? ? Entry.new(@config, newklass, *args) : entries.delete_at(i)
@ -112,6 +139,8 @@ module Sidekiq
entries.insert(i, new_entry)
end
# Inserts +newklass+ after +oldklass+ in the chain.
# Useful if one middleware must run after another middleware.
def insert_after(oldklass, newklass, *args)
i = entries.index { |entry| entry.klass == newklass }
new_entry = i.nil? ? Entry.new(@config, newklass, *args) : entries.delete_at(i)
@ -119,10 +148,12 @@ module Sidekiq
entries.insert(i + 1, new_entry)
end
# @return [Boolean] if the given class is already in the chain
def exists?(klass)
any? { |entry| entry.klass == klass }
end
# @return [Boolean] if the chain contains no middleware
def empty?
@entries.nil? || @entries.empty?
end
@ -135,6 +166,8 @@ module Sidekiq
entries.clear
end
# Used by Sidekiq to execute the middleware at runtime
# @api private
def invoke(*args)
return yield if empty?
@ -152,6 +185,8 @@ module Sidekiq
private
# Represents each link in the middleware chain
# @api private
class Entry
attr_reader :klass

View file

@ -1,4 +1,6 @@
module Sidekiq
# Server-side middleware must import this Module in order
# to get access to server resources during `call`.
module ServerMiddleware
attr_accessor :config
def redis_pool

View file

@ -1,5 +1,5 @@
# frozen_string_literal: true
module Sidekiq
VERSION = "6.5.0"
VERSION = "6.5.1"
end

View file

@ -279,13 +279,16 @@ describe Sidekiq::Client do
assert_equal 1_000, jids.size
end
it "can push jobs scheduled at different times" do
first_at = Time.new(2019, 1, 1)
second_at = Time.new(2019, 1, 2)
jids = Sidekiq::Client.push_bulk("class" => QueuedWorker, "args" => [[1], [2]], "at" => [first_at.to_f, second_at.to_f])
(first_jid, second_jid) = jids
assert_equal first_at, Sidekiq::ScheduledSet.new.find_job(first_jid).at
assert_equal second_at, Sidekiq::ScheduledSet.new.find_job(second_jid).at
[1, 2, 3].each do |job_count|
it "can push #{job_count} jobs scheduled at different times" do
times = job_count.times.map { |i| Time.new(2019, 1, i + 1) }
args = job_count.times.map { |i| [i] }
jids = Sidekiq::Client.push_bulk("class" => QueuedWorker, "args" => args, "at" => times.map(&:to_f))
assert_equal job_count, jids.size
assert_equal times, jids.map { |jid| Sidekiq::ScheduledSet.new.find_job(jid).at }
end
end
it "can push jobs scheduled using ActiveSupport::Duration" do