1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00
mperham--sidekiq/lib/sidekiq/api.rb

667 lines
16 KiB
Ruby

# encoding: utf-8
require 'sidekiq'
module Sidekiq
class Stats
  # Number of processed jobs, read from the "stat:processed" counter.
  # A missing key yields 0 (nil.to_i).
  def processed
    Sidekiq.redis { |conn| conn.get("stat:processed") }.to_i
  end

  # Number of failed jobs, read from the "stat:failed" counter.
  # A missing key yields 0 (nil.to_i).
  def failed
    Sidekiq.redis { |conn| conn.get("stat:failed") }.to_i
  end

  # Sets the named counters back to 0.
  #
  # stats - any mix of names/arrays of names ("failed", "processed",
  #         strings or symbols). With no arguments both counters are reset.
  #         Unknown names are ignored.
  #
  # Returns the result of the MSET call, or nil when none of the given
  # names is a known counter (nothing is sent to Redis in that case).
  def reset(*stats)
    all = %w(failed processed)
    stats = stats.empty? ? all : all & stats.flatten.compact.map(&:to_s)
    # MSET needs at least one key/value pair; calling it with an empty
    # argument list is a Redis error, so bail out early.
    return if stats.empty?

    mset_args = stats.flat_map { |stat| ["stat:#{stat}", 0] }
    Sidekiq.redis { |conn| conn.mset(*mset_args) }
  end

  # Returns a Hash of queue name => current length, ordered from the
  # longest queue to the shortest.
  def queues
    Sidekiq.redis do |conn|
      names = conn.smembers('queues')
      # One LLEN per queue, sent in a single pipelined round trip.
      lengths = conn.pipelined do
        names.each { |queue| conn.llen("queue:#{queue}") }
      end
      by_size = names.zip(lengths).sort_by { |_, size| size }
      Hash[by_size.reverse]
    end
  end

  # Total number of jobs across all queues reported by #queues.
  def enqueued
    queues.values.inject(0, :+)
  end

  # Cardinality of the 'schedule' sorted set.
  def scheduled_size
    Sidekiq.redis { |c| c.zcard('schedule') }
  end

  # Cardinality of the 'retry' sorted set.
  def retry_size
    Sidekiq.redis { |c| c.zcard('retry') }
  end

  # Cardinality of the 'dead' sorted set.
  def dead_size
    Sidekiq.redis { |c| c.zcard('dead') }
  end

  # Per-day processed/failed counters for a window of days.
  class History
    # days_previous - number of days to report, counting backwards from
    #                 start_date (inclusive).
    # start_date    - the most recent date of the window; defaults to
    #                 today's date in UTC.
    def initialize(days_previous, start_date = nil)
      @days_previous = days_previous
      @start_date = start_date || Time.now.utc.to_date
    end

    # Hash of date string => processed count for each day in the window.
    def processed
      date_stat_hash("processed")
    end

    # Hash of date string => failed count for each day in the window.
    def failed
      date_stat_hash("failed")
    end

    private

    # Reads the "stat:<stat>:<date>" key for every day in the window with
    # a single MGET. Days without a key are reported as 0. Returns an
    # empty Hash (without touching Redis) when the window is empty, since
    # MGET cannot be called without keys.
    def date_stat_hash(stat)
      dates = (0...@days_previous).map { |offset| @start_date - offset }
      return {} if dates.empty?

      keys = dates.map { |date| "stat:#{stat}:#{date}" }
      values = Sidekiq.redis { |conn| conn.mget(keys) }

      stat_hash = {}
      dates.zip(values) do |date, value|
        stat_hash[date.to_s] = value ? value.to_i : 0
      end
      stat_hash
    end
  end
end
##
# Encapsulates a queue within Sidekiq.
# Allows enumeration of all jobs within the queue
# and deletion of jobs.
#
# queue = Sidekiq::Queue.new("mailer")
# queue.each do |job|
# job.klass # => 'MyWorker'
# job.args # => [1, 2, 3]
# job.delete if job.jid == 'abcdef1234567890'
# end
#
class Queue
  include Enumerable

  # Returns a Queue for every name in the 'queues' set, sorted by name.
  def self.all
    Sidekiq.redis { |c| c.smembers('queues') }.sort.map { |q| Sidekiq::Queue.new(q) }
  end

  attr_reader :name

  # name - the queue name; the backing Redis list is "queue:<name>".
  def initialize(name="default")
    @name = name
    @rname = "queue:#{name}"
  end

  # Current length of the backing list.
  def size
    Sidekiq.redis { |con| con.llen(@rname) }
  end

  # Sidekiq Pro overrides this
  def paused?
    false
  end

  # Seconds elapsed since the entry at the tail of the list (index -1)
  # was enqueued. Returns 0 when the queue is empty or when that entry
  # carries no 'enqueued_at' timestamp.
  def latency
    entry = Sidekiq.redis do |conn|
      conn.lrange(@rname, -1, -1)
    end.first
    return 0 unless entry

    now = Time.now.to_f
    # A payload without 'enqueued_at' would otherwise make this
    # `Float - nil` and raise TypeError.
    now - (Sidekiq.load_json(entry)['enqueued_at'] || now)
  end

  # Yields a Job for each entry, reading the list in pages of 50.
  #
  # The list size is re-read after every page; the difference from the
  # size seen at the start is subtracted from the next page's range so
  # that entries removed while iterating (e.g. via Job#delete) do not
  # cause later entries to be skipped.
  def each(&block)
    initial_size = size
    deleted_size = 0
    page = 0
    page_size = 50

    loop do
      range_start = page * page_size - deleted_size
      range_end = page * page_size - deleted_size + (page_size - 1)
      entries = Sidekiq.redis do |conn|
        conn.lrange @rname, range_start, range_end
      end
      break if entries.empty?
      page += 1
      entries.each do |entry|
        block.call Job.new(entry, @name)
      end
      deleted_size = initial_size - size
    end
  end

  # Returns the first Job whose jid equals the given one, or nil.
  # This walks the queue via #each.
  def find_job(jid)
    self.detect { |j| j.jid == jid }
  end

  # Deletes the backing list and removes the queue name from the
  # 'queues' set, inside one MULTI block.
  def clear
    Sidekiq.redis do |conn|
      conn.multi do
        conn.del(@rname)
        conn.srem("queues", name)
      end
    end
  end
  alias_method :💣, :clear
end
##
# Encapsulates a pending job within a Sidekiq queue or
# sorted set.
#
# The job should be considered immutable but may be
# removed from the queue via Job#delete.
#
class Job
  KNOWN_WRAPPERS = [/\ASidekiq::Extensions::Delayed/, "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"]

  attr_reader :item

  # item       - the job payload, either an already-parsed Hash or the raw
  #              JSON string as stored in Redis.
  # queue_name - name of the queue holding the job; falls back to the
  #              payload's 'queue' entry.
  def initialize(item, queue_name=nil)
    # Keep the value exactly as given: #delete passes it to LREM.
    @value = item
    @item = item.is_a?(Hash) ? item : Sidekiq.load_json(item)
    @queue = queue_name || @item['queue']
  end

  # The payload's 'class' entry.
  def klass
    @item['class']
  end

  # Human-friendly class name: known wrappers are unwrapped so they show
  # up in a readable manner in the Web UI. The result is memoized.
  def display_class
    @klass ||= case klass
               when /\ASidekiq::Extensions::Delayed/
                 safe_load(args[0], klass) do |target, method, _|
                   "#{target}.#{method}"
                 end
               when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
                 args[0]
               else
                 klass
               end
  end

  # Human-friendly arguments: known wrappers are unwrapped so they show
  # up in a readable manner in the Web UI. The result is memoized.
  def display_args
    @args ||= case klass
              when /\ASidekiq::Extensions::Delayed/
                safe_load(args[0], args) do |_, _, arg|
                  arg
                end
              when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
                args[1..-1]
              else
                args
              end
  end

  # The payload's 'args' entry.
  def args
    @item['args']
  end

  # The payload's 'jid' entry.
  def jid
    @item['jid']
  end

  # UTC time built from the payload's 'enqueued_at' (epoch 0 if absent).
  def enqueued_at
    Time.at(@item['enqueued_at'] || 0).utc
  end

  # Name of the queue this job belongs to.
  def queue
    @queue
  end

  # Seconds elapsed since 'enqueued_at'. Returns 0.0 when the payload has
  # no 'enqueued_at', mirroring the nil guard in #enqueued_at instead of
  # raising TypeError on `Float - nil`.
  def latency
    now = Time.now.to_f
    now - (@item['enqueued_at'] || now)
  end

  ##
  # Remove this job from the queue.
  #
  # Returns true if LREM reported a non-zero count.
  def delete
    count = Sidekiq.redis do |conn|
      conn.lrem("queue:#{@queue}", 1, @value)
    end
    count != 0
  end

  # Hash-style access to the parsed payload.
  def [](name)
    @item[name]
  end

  private

  # Loads +content+ as YAML and yields the loaded elements to the block,
  # returning the block's value. If loading (or the block) raises, a
  # warning is logged (except in the development environment) and
  # +default+ is returned so display helpers never blow up.
  #
  # NOTE(review): YAML.load on job payloads can deserialize more than
  # plain data depending on the YAML library version; if payloads can
  # come from untrusted producers this deserves a closer look.
  def safe_load(content, default)
    yield(*YAML.load(content))
  rescue StandardError => ex
    # #1761 in dev mode, it's possible to have jobs enqueued which haven't been loaded into
    # memory yet so the YAML can't be loaded. Malformed YAML raises a
    # syntax error that is not an ArgumentError, hence the broad rescue.
    Sidekiq.logger.warn "Unable to load YAML: #{ex.message}" unless Sidekiq.options[:environment] == 'development'
    default
  end
end
class SortedEntry < Job
  attr_reader :score
  attr_reader :parent

  # parent - the set object this entry was read from; it must respond to
  #          #name, #delete(score, jid) and #schedule(at, item).
  # score  - the entry's score in the sorted set.
  # item   - the job payload (Hash or JSON string).
  def initialize(parent, score, item)
    super(item)
    @score = score
    @parent = parent
  end

  # The score interpreted as an epoch timestamp, in UTC.
  def at
    Time.at(score).utc
  end

  # Removes this entry from its parent set.
  def delete
    @parent.delete(score, jid)
  end

  # Removes this entry and schedules the same payload at the new time.
  def reschedule(at)
    @parent.delete(score, jid)
    @parent.schedule(at, item)
  end

  # Removes this entry from the sorted set and pushes it via
  # Sidekiq::Client.
  def add_to_queue
    remove_job do |message|
      msg = Sidekiq.load_json(message)
      Sidekiq::Client.push(msg)
    end
  end

  # Removes this entry from the sorted set and pushes it via
  # Sidekiq::Client with its 'retry_count' decremented by one.
  #
  # Raises RuntimeError if the payload has no 'failed_at'.
  def retry
    raise "Retry not available on jobs which have not failed" unless item["failed_at"]
    remove_job do |message|
      msg = Sidekiq.load_json(message)
      # A payload may carry 'failed_at' without 'retry_count'; guard so
      # we never evaluate `nil - 1`.
      msg['retry_count'] -= 1 if msg['retry_count']
      Sidekiq::Client.push(msg)
    end
  end

  private

  # Reads and removes every member stored at this entry's score (both
  # commands in one MULTI), yields the raw message belonging to this
  # entry, and re-adds the other members that shared the score.
  #
  # Nothing is yielded when no member matches this entry's jid (for
  # example because it was already removed by someone else).
  def remove_job
    Sidekiq.redis do |conn|
      results = conn.multi do
        conn.zrangebyscore(parent.name, score, score)
        conn.zremrangebyscore(parent.name, score, score)
      end.first

      if results.size == 1
        yield results.first
      else
        # multiple jobs with the same score (or none at all)
        # find the one with the right JID and push it
        matched, others = results.partition do |raw|
          # Cheap substring check first; only parse candidates.
          raw.index(jid) ? Sidekiq.load_json(raw)['jid'] == jid : false
        end

        yield matched.first unless matched.empty?

        # push the rest back onto the sorted set
        unless others.empty?
          conn.multi do
            others.each do |raw|
              conn.zadd(parent.name, score.to_f.to_s, raw)
            end
          end
        end
      end
    end
  end
end
class SortedSet
  include Enumerable

  attr_reader :name

  # name - key of the Redis sorted set this object wraps.
  #
  # The set's cardinality at construction time is cached in @_size.
  def initialize(name)
    @name = name
    @_size = size
  end

  # Current cardinality of the sorted set (ZCARD).
  def size
    Sidekiq.redis { |conn| conn.zcard(name) }
  end

  # Deletes the whole sorted set key.
  def clear
    Sidekiq.redis { |conn| conn.del(name) }
  end
  alias_method :💣, :clear
end
class JobSet < SortedSet
  # Adds +message+ (dumped to JSON) to the set with +timestamp+ as score.
  def schedule(timestamp, message)
    Sidekiq.redis do |conn|
      conn.zadd(name, timestamp.to_f.to_s, Sidekiq.dump_json(message))
    end
  end

  # Yields a SortedEntry for each member, reading pages of 50 with
  # negative ranks, i.e. starting from the end of the set (-50..-1,
  # then -100..-51, ...).
  #
  # @_size is refreshed by #delete; the difference from the size cached
  # at the start shifts subsequent page ranges so that entries deleted
  # during iteration do not cause others to be skipped.
  def each(&block)
    initial_size = @_size
    offset_size = 0
    page = -1
    page_size = 50

    loop do
      range_start = page * page_size + offset_size
      range_end = page * page_size + offset_size + (page_size - 1)
      elements = Sidekiq.redis do |conn|
        conn.zrange name, range_start, range_end, :with_scores => true
      end
      break if elements.empty?
      page -= 1
      elements.each do |element, score|
        block.call SortedEntry.new(self, score, element)
      end
      offset_size = initial_size - @_size
    end
  end

  # Returns the entries stored at +score+ as an Array of SortedEntry,
  # restricted to those with the given +jid+ when one is supplied.
  def fetch(score, jid = nil)
    elements = Sidekiq.redis do |conn|
      conn.zrangebyscore(name, score, score)
    end

    entries = elements.map { |element| SortedEntry.new(self, score, element) }
    jid ? entries.select { |entry| entry.jid == jid } : entries
  end

  # Returns the first entry whose jid equals the given one, or nil.
  # This walks the set via #each.
  def find_job(jid)
    self.detect { |j| j.jid == jid }
  end

  # Deletes entries stored at +score+.
  #
  # With a +jid+, only members whose payload carries that jid are
  # removed and the method returns true only if at least one such
  # member was found. Without a +jid+, every member at +score+ is
  # removed and the method returns true if the removal count was
  # non-zero. In both cases @_size is refreshed from ZCARD.
  def delete(score, jid = nil)
    if jid
      elements = Sidekiq.redis do |conn|
        conn.zrangebyscore(name, score, score)
      end

      # Track matches explicitly: mapping over all elements and counting
      # the result would report success whenever *any* member exists at
      # this score, matching jid or not.
      deleted = false
      elements.each do |element|
        next unless Sidekiq.load_json(element)["jid"] == jid

        _, @_size = Sidekiq.redis do |conn|
          conn.multi do
            conn.zrem(name, element)
            conn.zcard name
          end
        end
        deleted = true
      end
      deleted
    else
      count, @_size = Sidekiq.redis do |conn|
        conn.multi do
          conn.zremrangebyscore(name, score, score)
          conn.zcard name
        end
      end
      count != 0
    end
  end
end
##
# Allows enumeration of scheduled jobs within Sidekiq.
# Based on this, you can search/filter for jobs. Here's an
# example where I'm selecting all jobs of a certain type
# and deleting them from the schedule queue.
#
# r = Sidekiq::ScheduledSet.new
# r.select do |retri|
# retri.klass == 'Sidekiq::Extensions::DelayedClass' &&
# retri.args[0] == 'User' &&
# retri.args[1] == 'setup_new_subscriber'
# end.map(&:delete)
class ScheduledSet < JobSet
  # Binds this set to the 'schedule' sorted set.
  def initialize
    super('schedule')
  end
end
##
# Allows enumeration of retries within Sidekiq.
# Based on this, you can search/filter for jobs. Here's an
# example where I'm selecting all jobs of a certain type
# and deleting them from the retry queue.
#
# r = Sidekiq::RetrySet.new
# r.select do |retri|
# retri.klass == 'Sidekiq::Extensions::DelayedClass' &&
# retri.args[0] == 'User' &&
# retri.args[1] == 'setup_new_subscriber'
# end.map(&:delete)
class RetrySet < JobSet
  # Binds this set to the 'retry' sorted set.
  def initialize
    super('retry')
  end

  # Calls #retry on every entry, repeating full passes over the set
  # for as long as it reports a non-zero size.
  def retry_all
    each(&:retry) while size > 0
  end
end
class DeadSet < JobSet
  # Binds this set to the 'dead' sorted set.
  def initialize
    super('dead')
  end

  # Calls #retry on every entry, repeating full passes over the set
  # for as long as it reports a non-zero size.
  def retry_all
    each(&:retry) while size > 0
  end
end
##
# Enumerates the set of Sidekiq processes which are actively working
# right now. Each process sends a heartbeat to Redis every 5 seconds
# so this set should be relatively accurate, barring network partitions.
#
# Yields a Sidekiq::Process.
#
class ProcessSet
  include Enumerable

  # Yields a Process for every member of the 'processes' set (in sorted
  # key order) whose hash still has an 'info' field. Members without it
  # are removed from the set afterwards. Always returns nil.
  def each(&block)
    keys = Sidekiq.redis { |conn| conn.smembers('processes') }.sort
    stale = []

    Sidekiq.redis do |conn|
      # We're making a tradeoff here between consuming more memory instead of
      # making more roundtrips to Redis, but if you have hundreds or thousands of workers,
      # you'll be happier this way
      rows = conn.pipelined do
        keys.each { |key| conn.hmget(key, 'info', 'busy', 'beat') }
      end

      rows.each_with_index do |(info, busy, beat), idx|
        if info.nil?
          # the hash named key has an expiry of 60 seconds.
          # if it's not found, that means the process has not reported
          # in to Redis and probably died.
          stale << keys[idx]
        else
          attrs = Sidekiq.load_json(info)
          yield Process.new(attrs.merge('busy' => busy.to_i, 'beat' => beat.to_f))
        end
      end
    end

    unless stale.empty?
      Sidekiq.redis { |conn| conn.srem('processes', stale) }
    end
    nil
  end

  # This method is not guaranteed accurate since it does not prune the set
  # based on current heartbeat. #each does that and ensures the set only
  # contains Sidekiq processes which have sent a heartbeat within the last
  # 60 seconds.
  def size
    Sidekiq.redis { |conn| conn.scard('processes') }
  end
end
#
# Sidekiq::Process has a set of attributes which look like this:
#
# {
# 'hostname' => 'app-1.example.com',
# 'started_at' => <process start time>,
# 'pid' => 12345,
# 'tag' => 'myapp',
# 'concurrency' => 25,
# 'queues' => ['default', 'low'],
# 'busy' => 10,
# 'beat' => <last heartbeat>,
# }
class Process
  # hash - the attribute Hash describing one Sidekiq process.
  def initialize(hash)
    @attribs = hash
  end

  # Reads a single attribute by key.
  def [](key)
    @attribs[key]
  end

  # Queues a 'USR1' signal for this process.
  def quiet!
    signal('USR1')
  end

  # Queues a 'TERM' signal for this process.
  def stop!
    signal('TERM')
  end

  private

  # Pushes +sig+ onto the "<identity>-signals" list and sets a
  # 60 second expiry on that list, both inside one MULTI block.
  def signal(sig)
    signals_key = "#{identity}-signals"
    Sidekiq.redis do |conn|
      conn.multi do
        conn.lpush(signals_key, sig)
        conn.expire(signals_key, 60)
      end
    end
  end

  # "<hostname>:<pid>" built from the attributes; memoized.
  def identity
    @id ||= "#{self['hostname']}:#{self['pid']}"
  end
end
##
# Programmatic access to the current active worker set.
#
# WARNING WARNING WARNING
#
# This is live data that can change every millisecond.
# If you call #size => 5 and then expect #each to be
# called 5 times, you're going to have a bad time.
#
# workers = Sidekiq::Workers.new
# workers.size => 2
# workers.each do |process_id, thread_id, work|
# # process_id is a unique identifier per Sidekiq process
# # thread_id is a unique identifier per thread
# # work is a Hash which looks like:
# # { 'queue' => name, 'run_at' => timestamp, 'payload' => msg }
# # run_at is an epoch Integer.
# end
#
class Workers
  include Enumerable

  # Yields (process key, thread id, work Hash) for every field of each
  # "<process key>:workers" hash, visiting process keys in sorted order.
  # Processes whose own key no longer exists are skipped.
  def each(&block)
    Sidekiq.redis do |conn|
      conn.smembers('processes').sort.each do |process_key|
        # Existence check and worker hash fetched in one round trip.
        alive, work_by_thread = conn.pipelined do
          conn.exists(process_key)
          conn.hgetall("#{process_key}:workers")
        end
        next unless alive

        work_by_thread.each_pair do |thread_id, payload|
          yield process_key, thread_id, Sidekiq.load_json(payload)
        end
      end
    end
  end

  # Note that #size is only as accurate as Sidekiq's heartbeat,
  # which happens every 5 seconds. It is NOT real-time.
  #
  # Not very efficient if you have lots of Sidekiq
  # processes but the alternative is a global counter
  # which can easily get out of sync with crashy processes.
  #
  # Sums the 'busy' field of every registered process hash;
  # returns 0 when no process is registered.
  def size
    Sidekiq.redis do |conn|
      process_keys = conn.smembers('processes')
      return 0 if process_keys.empty?

      busy_counts = conn.pipelined do
        process_keys.each { |process_key| conn.hget(process_key, 'busy') }
      end
      busy_counts.map(&:to_i).inject(:+)
    end
  end
end
end