2014-03-07 00:53:16 -05:00
|
|
|
# encoding: utf-8
|
2016-11-22 23:39:00 -05:00
|
|
|
# frozen_string_literal: true
|
2012-10-20 17:03:43 -04:00
|
|
|
require 'sidekiq'
|
|
|
|
|
|
|
|
module Sidekiq
|
2012-12-04 08:11:25 -05:00
|
|
|
# Point-in-time snapshot of Sidekiq counters read from Redis.
#
# The snapshot is taken by #fetch_stats! when the object is built; the reader
# methods below only look values up in that snapshot.
class Stats
  def initialize
    fetch_stats!
  end

  # Integer value of the 'stat:processed' key.
  def processed
    stat(:processed)
  end

  # Integer value of the 'stat:failed' key.
  def failed
    stat(:failed)
  end

  # ZCARD of the 'schedule' sorted set.
  def scheduled_size
    stat(:scheduled_size)
  end

  # ZCARD of the 'retry' sorted set.
  def retry_size
    stat(:retry_size)
  end

  # ZCARD of the 'dead' sorted set.
  def dead_size
    stat(:dead_size)
  end

  # Sum of LLEN over every queue named in the 'queues' set.
  def enqueued
    stat(:enqueued)
  end

  # SCARD of the 'processes' set.
  def processes_size
    stat(:processes_size)
  end

  # Sum of the 'busy' hash field over every key in the 'processes' set.
  def workers_size
    stat(:workers_size)
  end

  # Seconds between now and the 'enqueued_at' of the entry at index -1 of
  # 'queue:default'; 0 when that list is empty.
  def default_queue_latency
    stat(:default_queue_latency)
  end

  # Queue name => length. Unlike the readers above this performs a fresh
  # Redis read through Stats::Queues#lengths.
  def queues
    Sidekiq::Stats::Queues.new.lengths
  end

  # Reads every counter from Redis in two pipelined round trips and stores
  # the result in @stats.
  #
  # @return [Hash] the freshly built snapshot
  def fetch_stats!
    counters = Sidekiq.redis do |conn|
      conn.pipelined do
        conn.get('stat:processed')
        conn.get('stat:failed')
        conn.zcard('schedule')
        conn.zcard('retry')
        conn.zcard('dead')
        conn.scard('processes')
        conn.lrange('queue:default', -1, -1)
        conn.smembers('processes')
        conn.smembers('queues')
      end
    end
    process_keys = counters[7]
    queue_names = counters[8]

    # The second round trip depends on the member lists fetched above.
    details = Sidekiq.redis do |conn|
      conn.pipelined do
        process_keys.each { |key| conn.hget(key, 'busy') }
        queue_names.each { |queue| conn.llen("queue:#{queue}") }
      end
    end

    # Replies arrive in request order: busy counts first, then queue lengths.
    busy_total = details[0...process_keys.size].inject(0) { |sum, busy| sum + busy.to_i }
    enqueued_total = details[process_keys.size..-1].inject(0) { |sum, len| sum + len.to_i }

    oldest = counters[6].first
    latency =
      if oldest
        job = begin
          Sidekiq.load_json(oldest)
        rescue StandardError
          {} # unparsable payload: fall through to a latency of zero
        end
        now = Time.now.to_f
        now - (job['enqueued_at'] || now)
      else
        0
      end

    @stats = {
      processed: counters[0].to_i,
      failed: counters[1].to_i,
      scheduled_size: counters[2],
      retry_size: counters[3],
      dead_size: counters[4],
      processes_size: counters[5],

      default_queue_latency: latency,
      workers_size: busy_total,
      enqueued: enqueued_total
    }
  end

  # Sets the named counters back to 0 in Redis.
  #
  # @param stats [Array<String, Symbol, Array>] any of 'failed' / 'processed';
  #   with no arguments both are reset. Unknown names are ignored.
  # @return the reply of MSET, or nil when no known counter was named
  def reset(*stats)
    resettable = %w(failed processed)
    chosen = stats.empty? ? resettable : resettable & stats.flatten.compact.map(&:to_s)
    # MSET needs at least one key/value pair; issuing it with nothing to reset
    # would only produce a Redis error.
    return if chosen.empty?

    mset_args = chosen.flat_map { |stat| ["stat:#{stat}", 0] }
    Sidekiq.redis { |conn| conn.mset(*mset_args) }
  end

  private

  # Looks up one value of the snapshot.
  def stat(s)
    @stats[s]
  end

  # Reads the current length of every known queue.
  class Queues
    # @return [Hash] queue name => LLEN, ordered from longest to shortest
    def lengths
      Sidekiq.redis do |conn|
        names = conn.smembers('queues')

        sizes = conn.pipelined do
          names.each { |queue| conn.llen("queue:#{queue}") }
        end

        by_name = {}
        names.each_with_index { |queue, idx| by_name[queue] = sizes[idx] }

        Hash[by_name.sort_by { |_, size| size }.reverse]
      end
    end
  end

  # Per-day processed/failed counters for a window of days ending at a start date.
  class History
    # @param days_previous [Integer] number of days in the window
    # @param start_date [Date, nil] most recent day; defaults to today's UTC date
    def initialize(days_previous, start_date = nil)
      @days_previous = days_previous
      @start_date = start_date || Time.now.utc.to_date
    end

    # @return [Hash] 'YYYY-MM-DD' => processed count (memoized when non-nil)
    def processed
      @processed ||= date_stat_hash("processed")
    end

    # @return [Hash] 'YYYY-MM-DD' => failed count (memoized when non-nil)
    def failed
      @failed ||= date_stat_hash("failed")
    end

    private

    # Fetches "stat:<stat>:<date>" for every day of the window with one MGET.
    # Missing keys count as 0.
    def date_stat_hash(stat)
      dates = []
      while dates.size < @days_previous
        dates << (@start_date - dates.size).strftime("%Y-%m-%d")
      end
      keys = dates.map { |datestr| "stat:#{stat}:#{datestr}" }

      stat_hash = {}
      begin
        Sidekiq.redis do |conn|
          conn.mget(keys).each_with_index do |value, idx|
            stat_hash[dates[idx]] = value ? value.to_i : 0
          end
        end
      rescue Redis::CommandError
        # mget will trigger a CROSSSLOT error when run against a Cluster
        # TODO Someone want to add Cluster support?
      end

      stat_hash
    end
  end
end
|
|
|
|
|
2012-10-20 17:03:43 -04:00
|
|
|
##
|
|
|
|
# Encapsulates a queue within Sidekiq.
|
|
|
|
# Allows enumeration of all jobs within the queue
|
|
|
|
# and deletion of jobs.
|
|
|
|
#
|
|
|
|
# queue = Sidekiq::Queue.new("mailer")
|
|
|
|
# queue.each do |job|
|
|
|
|
# job.klass # => 'MyWorker'
|
|
|
|
# job.args # => [1, 2, 3]
|
|
|
|
# job.delete if job.jid == 'abcdef1234567890'
|
|
|
|
# end
|
|
|
|
#
|
|
|
|
class Queue
  include Enumerable

  attr_reader :name

  ##
  # Builds a Queue for every name in the 'queues' set, ordered by name.
  #
  def self.all
    names = Sidekiq.redis { |conn| conn.smembers('queues') }
    names.sort.map { |queue_name| Sidekiq::Queue.new(queue_name) }
  end

  # @param name [String] queue name; the backing Redis list is "queue:<name>"
  def initialize(name = "default")
    @name = name
    @rname = "queue:#{name}"
  end

  # LLEN of the backing list.
  def size
    Sidekiq.redis { |conn| conn.llen(@rname) }
  end

  # Sidekiq Pro overrides this
  def paused?
    false
  end

  ##
  # Seconds elapsed since the 'enqueued_at' of the entry at index -1 of the
  # list. Returns 0 when the list is empty.
  #
  # @return Float
  def latency
    tail = Sidekiq.redis { |conn| conn.lrange(@rname, -1, -1) }
    oldest = tail.first
    return 0 unless oldest

    enqueued_at = Sidekiq.load_json(oldest)['enqueued_at']
    now = Time.now.to_f
    now - (enqueued_at || now)
  end

  # Yields a Job for every entry, reading the list 50 entries at a time.
  # After each page the start index is shifted back by however much the list
  # has shrunk since iteration began.
  def each
    starting_size = size
    removed = 0
    page_index = 0
    per_page = 50

    while true
      first = page_index * per_page - removed
      last = first + per_page - 1
      batch = Sidekiq.redis { |conn| conn.lrange(@rname, first, last) }
      break if batch.empty?

      page_index += 1
      batch.each { |raw| yield Job.new(raw, @name) }
      removed = starting_size - size
    end
  end

  ##
  # Find the job with the given JID within this queue.
  #
  # This walks the whole queue through #each, so it is slow; Sidekiq Pro
  # contains a faster version.
  def find_job(jid)
    find { |job| job.jid == jid }
  end

  # Deletes the backing list and removes the name from the 'queues' set
  # inside one MULTI.
  def clear
    Sidekiq.redis do |conn|
      conn.multi do
        conn.del(@rname)
        conn.srem("queues", name)
      end
    end
  end
  alias_method :💣, :clear
end
|
|
|
|
|
|
|
|
##
|
2012-10-30 13:06:20 -04:00
|
|
|
# Encapsulates a pending job within a Sidekiq queue or
|
|
|
|
# sorted set.
|
|
|
|
#
|
2012-10-20 17:03:43 -04:00
|
|
|
# The job should be considered immutable but may be
|
|
|
|
# removed from the queue via Job#delete.
|
|
|
|
#
|
|
|
|
class Job
  attr_reader :item, :value

  # @param item [Hash, String] an already-parsed payload, or the raw JSON
  #   string as stored in Redis (kept verbatim in #value)
  # @param queue_name [String, nil] falls back to the payload's 'queue' key
  def initialize(item, queue_name = nil)
    @args = nil
    @value = item
    @item = item.is_a?(Hash) ? item : parse(item)
    @queue = queue_name || @item['queue']
  end

  # Parses a raw JSON payload.
  #
  # If the job payload in Redis is invalid JSON, we'll load the item as an
  # empty hash and store the invalid JSON as the job 'args' for display in
  # the Web UI.
  def parse(item)
    Sidekiq.load_json(item)
  rescue JSON::ParserError
    @invalid = true
    @args = [item]
    {}
  end

  # The payload's 'class' value.
  def klass
    self['class']
  end

  # Human-friendly class name: unwraps known wrappers so they show up
  # readably in the Web UI. Memoized.
  def display_class
    @klass ||= case klass
               when /\ASidekiq::Extensions::Delayed/
                 safe_load(args[0], klass) { |target, method, _| "#{target}.#{method}" }
               when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
                 wrapped = @item['wrapped'] || args[0]
                 if wrapped == 'ActionMailer::DeliveryJob'
                   # MailerClass#mailer_method
                   args[0]['arguments'][0..1].join('#')
                 else
                   wrapped
                 end
               else
                 klass
               end
  end

  # Human-friendly argument list: unwraps known wrappers so they show up
  # readably in the Web UI.
  #
  # The result is memoized in its own instance variable. It must not be stored
  # in @args, because #args (and therefore #display_class) reads @args and
  # would then see the unwrapped list instead of the job's real arguments.
  def display_args
    @display_args ||= case klass
                      when /\ASidekiq::Extensions::Delayed/
                        safe_load(args[0], args) { |_, _, arg| arg }
                      when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
                        job_args = self['wrapped'] ? args[0]["arguments"] : []
                        if (self['wrapped'] || args[0]) == 'ActionMailer::DeliveryJob'
                          # remove MailerClass, mailer_method and 'deliver_now'
                          job_args.drop(3)
                        else
                          job_args
                        end
                      else
                        if self['encrypt']
                          # no point in showing 150+ bytes of random garbage
                          # NOTE: this overwrites the last element of the
                          # argument array in place.
                          args[-1] = '[encrypted data]'
                        end
                        args
                      end
  end

  # The job's arguments: the payload's 'args', or the raw text wrapped in an
  # array when the payload could not be parsed.
  def args
    @args || @item['args']
  end

  def jid
    self['jid']
  end

  # @return [Time, nil] UTC time built from 'enqueued_at', nil when absent
  def enqueued_at
    stamp = self['enqueued_at']
    stamp ? Time.at(stamp).utc : nil
  end

  # @return [Time] UTC time from 'created_at', else 'enqueued_at', else epoch 0
  def created_at
    Time.at(self['created_at'] || self['enqueued_at'] || 0).utc
  end

  def queue
    @queue
  end

  # Seconds since 'enqueued_at' (or 'created_at'); 0.0 when neither is set.
  def latency
    now = Time.now.to_f
    now - (@item['enqueued_at'] || @item['created_at'] || now)
  end

  ##
  # Remove this job from the queue.
  #
  # @return [Boolean] true when LREM reported a non-zero count
  def delete
    removed = Sidekiq.redis { |conn| conn.lrem("queue:#{@queue}", 1, @value) }
    removed != 0
  end

  # Reads one key of the payload.
  def [](name)
    # nil will happen if the JSON fails to parse.
    # We don't guarantee Sidekiq will work with bad job JSON but we should
    # make a best effort to minimize the damage.
    @item ? @item[name] : nil
  end

  private

  # Loads +content+ as YAML and yields its elements; returns +default+ when
  # loading or the block raises.
  #
  # NOTE(review): YAML.load deserializes content taken from the job payload;
  # confirm payloads only ever come from a trusted Redis.
  def safe_load(content, default)
    yield(*YAML.load(content))
  rescue => ex
    # #1761 in dev mode, it's possible to have jobs enqueued which haven't been loaded into
    # memory yet so the YAML can't be loaded.
    Sidekiq.logger.warn "Unable to load YAML: #{ex.message}" unless Sidekiq.options[:environment] == 'development'
    default
  end
end
|
|
|
|
|
2012-10-30 13:06:20 -04:00
|
|
|
# A Job stored in one of the sorted sets, together with its score and the
# set object (parent) it was read from.
class SortedEntry < Job
  attr_reader :score, :parent

  # @param parent the owning set; must respond to #name, #schedule,
  #   #delete_by_value and #delete_by_jid
  # @param score sorted-set score of the entry
  # @param item [Hash, String] payload handed on to Job#initialize
  def initialize(parent, score, item)
    super(item)
    @score = score
    @parent = parent
  end

  # The score interpreted as a UTC time.
  def at
    Time.at(score).utc
  end

  # Removes the entry from its parent set: by exact stored value when one is
  # known, otherwise by score and JID.
  def delete
    return @parent.delete_by_jid(score, jid) unless @value

    @parent.delete_by_value(@parent.name, @value)
  end

  # Deletes the entry and schedules its payload again at the given time.
  def reschedule(at)
    delete
    @parent.schedule(at, item)
  end

  # Removes the entry from the set and pushes it through Sidekiq::Client.
  def add_to_queue
    remove_job do |message|
      Sidekiq::Client.push(Sidekiq.load_json(message))
    end
  end

  # Like #add_to_queue, but first decrements 'retry_count' when it is present.
  def retry
    remove_job do |message|
      payload = Sidekiq.load_json(message)
      payload['retry_count'] -= 1 if payload['retry_count']
      Sidekiq::Client.push(payload)
    end
  end

  ##
  # Place job in the dead set
  #
  # The same MULTI also trims the 'dead' set by DeadSet.timeout (score) and
  # DeadSet.max_jobs (rank).
  def kill
    remove_job do |message|
      now = Time.now.to_f
      Sidekiq.redis do |conn|
        conn.multi do
          conn.zadd('dead', now, message)
          conn.zremrangebyscore('dead', '-inf', now - DeadSet.timeout)
          conn.zremrangebyrank('dead', 0, -DeadSet.max_jobs)
        end
      end
    end
  end

  # True when the payload carries an 'error_class'.
  def error?
    !!item['error_class']
  end

  private

  # Atomically reads and removes every member stored at this entry's score,
  # yields the raw message belonging to this entry, and re-adds the others.
  def remove_job
    Sidekiq.redis do |conn|
      replies = conn.multi do
        conn.zrangebyscore(parent.name, score, score)
        conn.zremrangebyscore(parent.name, score, score)
      end
      candidates = replies.first

      if candidates.size == 1
        yield candidates.first
      else
        # multiple jobs with the same score
        # find the one with the right JID and push it
        # (the cheap substring test avoids parsing messages that cannot match)
        mine, others = candidates.partition do |message|
          message.index(jid) && Sidekiq.load_json(message)['jid'] == jid
        end

        found = mine.first
        yield found if found

        # push the rest back onto the sorted set
        conn.multi do
          others.each { |message| conn.zadd(parent.name, score.to_f.to_s, message) }
        end
      end
    end
  end
end
|
|
|
|
|
2012-10-30 13:06:20 -04:00
|
|
|
# Thin wrapper around one Redis sorted set, identified by its key name.
class SortedSet
  include Enumerable

  attr_reader :name

  # @param name [String] Redis key of the sorted set
  def initialize(name)
    @name = name
    # Cardinality at construction time, remembered for later use.
    @_size = size
  end

  # Current ZCARD of the set (always a fresh Redis read).
  def size
    Sidekiq.redis { |conn| conn.zcard(name) }
  end

  # Deletes the whole key.
  def clear
    Sidekiq.redis { |conn| conn.del(name) }
  end
  alias_method :💣, :clear
end
|
|
|
|
|
|
|
|
class JobSet < SortedSet
|
|
|
|
|
2013-01-29 16:17:59 -05:00
|
|
|
# Adds +message+, serialized with Sidekiq.dump_json, to this set, using
# +timestamp+ (converted to a Float string) as the score.
def schedule(timestamp, message)
  Sidekiq.redis do |conn|
    score = timestamp.to_f.to_s
    conn.zadd(name, score, Sidekiq.dump_json(message))
  end
end
|
|
|
|
|
2015-02-12 14:54:41 -05:00
|
|
|
# Yields a SortedEntry for every member, reading the set in pages of 50 that
# are addressed with negative ranks, starting from the end of the set
# (-50..-1 first). After each page the window is shifted by the difference
# between the size remembered at the start and the current @_size.
def each
  starting_size = @_size
  shift = 0
  page_index = -1
  per_page = 50

  while true
    first = page_index * per_page + shift
    last = first + per_page - 1
    batch = Sidekiq.redis { |conn| conn.zrange(name, first, last, with_scores: true) }
    break if batch.empty?

    page_index -= 1
    batch.each { |member, member_score| yield SortedEntry.new(self, member_score, member) }
    shift = starting_size - @_size
  end
end
|
2012-10-30 13:06:20 -04:00
|
|
|
|
2012-11-26 14:53:22 -05:00
|
|
|
# Returns the entries stored at exactly +score+, wrapped as SortedEntry
# objects. When +jid+ is given, only entries with that JID are returned.
#
# @return [Array<SortedEntry>]
def fetch(score, jid = nil)
  members = Sidekiq.redis { |conn| conn.zrangebyscore(name, score, score) }
  entries = members.map { |member| SortedEntry.new(self, score, member) }
  return entries unless jid

  entries.select { |entry| entry.jid == jid }
end
|
|
|
|
|
2016-01-18 15:40:01 -05:00
|
|
|
##
|
|
|
|
# Find the job with the given JID within this sorted set.
|
|
|
|
#
|
|
|
|
# This is a slow, inefficient operation. Do not use under
|
|
|
|
# normal conditions. Sidekiq Pro contains a faster version.
|
2013-04-16 15:43:24 -04:00
|
|
|
# Returns the first entry yielded by #each whose JID equals +jid+, or nil.
def find_job(jid)
  find { |entry| entry.jid == jid }
end
|
|
|
|
|
2015-08-02 14:57:57 -04:00
|
|
|
# Removes the member equal to +value+ from the sorted set +name+ and lowers
# the remembered size when ZREM reports success.
#
# @return the reply of ZREM
def delete_by_value(name, value)
  Sidekiq.redis do |conn|
    conn.zrem(name, value).tap { |removed| @_size -= 1 if removed }
  end
end
|
2012-11-26 14:53:22 -05:00
|
|
|
|
2015-08-02 14:57:57 -04:00
|
|
|
# Removes the first member stored at +score+ whose payload has the given JID,
# lowering the remembered size when ZREM reports success.
#
# @param score sorted-set score to look at
# @param jid [String] job id to match against each payload's "jid"
# @return the reply of ZREM for the matching member, or false when no member
#   at that score carries the JID
def delete_by_jid(score, jid)
  Sidekiq.redis do |conn|
    match = conn.zrangebyscore(name, score, score).find do |element|
      Sidekiq.load_json(element)["jid"] == jid
    end
    # Nothing matched: report that explicitly instead of leaking the
    # (always truthy) list of scanned elements.
    next false unless match

    removed = conn.zrem(name, match)
    @_size -= 1 if removed
    removed
  end
end
|
2012-11-25 21:43:48 -05:00
|
|
|
|
2015-08-08 21:23:56 -04:00
|
|
|
alias_method :delete, :delete_by_jid
|
2012-10-30 13:06:20 -04:00
|
|
|
end
|
|
|
|
|
|
|
|
##
|
|
|
|
# Allows enumeration of scheduled jobs within Sidekiq.
|
|
|
|
# Based on this, you can search/filter for jobs. Here's an
|
|
|
|
# example where I'm selecting all jobs of a certain type
|
2017-02-10 10:46:44 -05:00
|
|
|
# and deleting them from the schedule queue.
|
2012-10-30 13:06:20 -04:00
|
|
|
#
|
|
|
|
# r = Sidekiq::ScheduledSet.new
|
2017-02-10 10:46:44 -05:00
|
|
|
# r.select do |scheduled|
|
|
|
|
# scheduled.klass == 'Sidekiq::Extensions::DelayedClass' &&
|
|
|
|
# scheduled.args[0] == 'User' &&
|
|
|
|
# scheduled.args[1] == 'setup_new_subscriber'
|
2012-10-30 13:06:20 -04:00
|
|
|
# end.map(&:delete)
|
2014-03-02 19:36:00 -05:00
|
|
|
class ScheduledSet < JobSet
  # Binds the set to the 'schedule' key.
  def initialize
    super('schedule')
  end
end
|
|
|
|
|
|
|
|
##
|
|
|
|
# Allows enumeration of retries within Sidekiq.
|
|
|
|
# Based on this, you can search/filter for jobs. Here's an
|
|
|
|
# example where I'm selecting all jobs of a certain type
|
|
|
|
# and deleting them from the retry queue.
|
|
|
|
#
|
|
|
|
# r = Sidekiq::RetrySet.new
|
|
|
|
# r.select do |retri|
|
|
|
|
# retri.klass == 'Sidekiq::Extensions::DelayedClass' &&
|
|
|
|
# retri.args[0] == 'User' &&
|
|
|
|
# retri.args[1] == 'setup_new_subscriber'
|
|
|
|
# end.map(&:delete)
|
2014-03-02 19:36:00 -05:00
|
|
|
class RetrySet < JobSet
  # Binds the set to the 'retry' key.
  def initialize
    super('retry')
  end

  # Calls #retry on every entry, repeating full passes over the set until
  # #size reports 0.
  def retry_all
    each(&:retry) while size > 0
  end
end
|
|
|
|
|
2014-07-28 18:42:55 -04:00
|
|
|
##
|
|
|
|
# Allows enumeration of dead jobs within Sidekiq.
|
|
|
|
#
|
2014-03-02 19:36:00 -05:00
|
|
|
class DeadSet < JobSet
  # Binds the set to the 'dead' key.
  def initialize
    super('dead')
  end

  # Calls #retry on every entry, repeating full passes over the set until
  # #size reports 0.
  def retry_all
    each(&:retry) while size > 0
  end

  class << self
    # Value of the :dead_max_jobs option.
    def max_jobs
      Sidekiq.options[:dead_max_jobs]
    end

    # Value of the :dead_timeout_in_seconds option.
    def timeout
      Sidekiq.options[:dead_timeout_in_seconds]
    end
  end
end
|
|
|
|
|
2014-03-03 00:18:26 -05:00
|
|
|
##
|
|
|
|
# Enumerates the set of Sidekiq processes which are actively working
|
|
|
|
# right now. Each process sends a heartbeat to Redis every 5 seconds
|
|
|
|
# so this set should be relatively accurate, barring network partitions.
|
2014-03-08 01:41:10 -05:00
|
|
|
#
|
2014-05-13 23:33:20 -04:00
|
|
|
# Yields a Sidekiq::Process.
|
2014-03-08 01:41:10 -05:00
|
|
|
#
|
2014-03-03 00:18:26 -05:00
|
|
|
class ProcessSet
  include Enumerable

  # @param clean_plz [Boolean] when true, run ProcessSet.cleanup first
  def initialize(clean_plz = true)
    self.class.cleanup if clean_plz
  end

  # Cleans up dead processes recorded in Redis.
  # Returns the number of processes cleaned.
  def self.cleanup
    Sidekiq.redis do |conn|
      keys = conn.smembers('processes').sort
      infos = conn.pipelined do
        keys.each { |key| conn.hget(key, 'info') }
      end

      # the hash named key has an expiry of 60 seconds.
      # if it's not found, that means the process has not reported
      # in to Redis and probably died.
      stale = infos.each_with_index.select { |info, _| info.nil? }.map { |_, idx| keys[idx] }
      next 0 if stale.empty?

      conn.srem('processes', stale)
    end
  end

  # Yields a Process for every key in the 'processes' set whose hash still
  # has an 'info' field. Always returns nil.
  def each
    keys = Sidekiq.redis { |conn| conn.smembers('processes') }.sort

    Sidekiq.redis do |conn|
      # We're making a tradeoff here between consuming more memory instead of
      # making more roundtrips to Redis, but if you have hundreds or thousands of workers,
      # you'll be happier this way
      rows = conn.pipelined do
        keys.each { |key| conn.hmget(key, 'info', 'busy', 'beat', 'quiet') }
      end

      rows.each do |info, busy, beat, quiet|
        # If a process is stopped between when we query Redis for the keys and
        # when we query for the rows, we will have a row that is composed of
        # `nil` values.
        next if info.nil?

        attrs = Sidekiq.load_json(info)
        attrs = attrs.merge('busy' => busy.to_i, 'beat' => beat.to_f, 'quiet' => quiet)
        yield Process.new(attrs)
      end
    end

    nil
  end

  # SCARD of the 'processes' set. No pruning happens here, so the count may
  # include entries that ProcessSet.cleanup would remove.
  def size
    Sidekiq.redis { |conn| conn.scard('processes') }
  end

  # Returns the identity of the current cluster leader or "" if no leader.
  # This is a Sidekiq Enterprise feature, will always return "" in Sidekiq
  # or Sidekiq Pro.
  def leader
    # "" stands in for a missing key so the result is non-falsy and memoizes.
    @leader ||= Sidekiq.redis { |conn| conn.get("dear-leader") } || ""
  end
end
|
2013-01-24 12:50:30 -05:00
|
|
|
|
2014-05-13 23:33:20 -04:00
|
|
|
#
|
2016-01-18 15:40:01 -05:00
|
|
|
# Sidekiq::Process represents an active Sidekiq process talking with Redis.
|
|
|
|
# Each process has a set of attributes which look like this:
|
2014-05-13 23:33:20 -04:00
|
|
|
#
|
|
|
|
# {
|
|
|
|
# 'hostname' => 'app-1.example.com',
|
|
|
|
# 'started_at' => <process start time>,
|
|
|
|
# 'pid' => 12345,
|
|
|
|
# 'tag' => 'myapp',
|
|
|
|
# 'concurrency' => 25,
|
|
|
|
# 'queues' => ['default', 'low'],
|
|
|
|
# 'busy' => 10,
|
|
|
|
# 'beat' => <last heartbeat>,
|
2014-12-30 20:25:55 -05:00
|
|
|
# 'identity' => <unique string identifying the process>,
|
2014-05-13 23:33:20 -04:00
|
|
|
# }
|
|
|
|
class Process
  # @param hash [Hash] attribute hash describing the process
  def initialize(hash)
    @attribs = hash
  end

  # Reads one attribute.
  def [](key)
    @attribs[key]
  end

  def tag
    self['tag']
  end

  # The 'labels' attribute coerced to an Array (empty when absent).
  def labels
    Array(self['labels'])
  end

  def identity
    self['identity']
  end

  # Queues a 'TSTP' signal for the process.
  def quiet!
    signal('TSTP')
  end

  # Queues a 'TERM' signal for the process.
  def stop!
    signal('TERM')
  end

  # Queues a 'TTIN' signal for the process.
  def dump_threads
    signal('TTIN')
  end

  # True when the 'quiet' attribute is the string 'true'.
  def stopping?
    self['quiet'] == 'true'
  end

  private

  # Pushes +sig+ onto the "<identity>-signals" list and sets a 60 second
  # expiry on that list, both inside one MULTI.
  def signal(sig)
    signals_key = "#{identity}-signals"
    Sidekiq.redis do |conn|
      conn.multi do
        conn.lpush(signals_key, sig)
        conn.expire(signals_key, 60)
      end
    end
  end
end
|
|
|
|
|
2013-01-24 12:50:30 -05:00
|
|
|
##
|
2016-01-18 15:40:01 -05:00
|
|
|
# A worker is a thread that is currently processing a job.
|
2013-01-24 12:50:30 -05:00
|
|
|
# Programmatic access to the current active worker set.
|
|
|
|
#
|
|
|
|
# WARNING WARNING WARNING
|
|
|
|
#
|
|
|
|
# This is live data that can change every millisecond.
|
2013-05-23 00:50:22 -04:00
|
|
|
# If you call #size => 5 and then expect #each to be
|
2013-01-24 12:50:30 -05:00
|
|
|
# called 5 times, you're going to have a bad time.
|
|
|
|
#
|
|
|
|
# workers = Sidekiq::Workers.new
|
|
|
|
# workers.size => 2
|
2014-03-08 17:21:52 -05:00
|
|
|
# workers.each do |process_id, thread_id, work|
|
|
|
|
# # process_id is a unique identifier per Sidekiq process
|
|
|
|
# # thread_id is a unique identifier per thread
|
2013-01-24 12:50:30 -05:00
|
|
|
# # work is a Hash which looks like:
|
|
|
|
# # { 'queue' => name, 'run_at' => timestamp, 'payload' => msg }
|
2014-02-01 23:48:44 -05:00
|
|
|
# # run_at is an epoch Integer.
|
2013-01-24 12:50:30 -05:00
|
|
|
# end
|
2014-03-08 17:21:52 -05:00
|
|
|
#
|
2013-01-24 12:50:30 -05:00
|
|
|
class Workers
  include Enumerable

  # Yields (process key, thread id, parsed work hash) for every field of
  # "<key>:workers", for each key in the 'processes' set, in sorted key
  # order. Keys for which EXISTS comes back falsy are skipped.
  def each
    Sidekiq.redis do |conn|
      conn.smembers('processes').sort.each do |process_key|
        alive, work_by_thread = conn.pipelined do
          conn.exists(process_key)
          conn.hgetall("#{process_key}:workers")
        end
        next unless alive

        work_by_thread.each_pair do |thread_id, json|
          yield process_key, thread_id, Sidekiq.load_json(json)
        end
      end
    end
  end

  # Note that #size is only as accurate as Sidekiq's heartbeat,
  # which happens every 5 seconds. It is NOT real-time.
  #
  # Sums the 'busy' field of every process hash. Not very efficient if you
  # have lots of Sidekiq processes but the alternative is a global counter
  # which can easily get out of sync with crashy processes.
  def size
    Sidekiq.redis do |conn|
      process_keys = conn.smembers('processes')
      next 0 if process_keys.empty?

      busy = conn.pipelined do
        process_keys.each { |process_key| conn.hget(process_key, 'busy') }
      end
      busy.map(&:to_i).inject(:+)
    end
  end
end
|
|
|
|
|
2012-10-20 17:03:43 -04:00
|
|
|
end
|