# encoding: utf-8
# frozen_string_literal: true
require 'sidekiq'
module Sidekiq
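##
# Queries Redis for point-in-time job statistics, e.g.
# (values are illustrative):
#
#   stats = Sidekiq::Stats.new
#   stats.processed # => 100
#   stats.enqueued  # => 5
#   stats.queues    # => { "default" => 5 }
#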
class Stats
def initialize
fetch_stats!
end
def processed
stat :processed
end
def failed
stat :failed
end
def scheduled_size
stat :scheduled_size
end
def retry_size
stat :retry_size
end
def dead_size
stat :dead_size
end
def enqueued
stat :enqueued
end
def processes_size
stat :processes_size
end
def workers_size
stat :workers_size
end
def default_queue_latency
stat :default_queue_latency
end
def queues
Sidekiq::Stats::Queues.new.lengths
end
def fetch_stats!
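# Round-trip 1 fetches the counters plus the process and queue lists;
# round-trip 2 uses those lists to fetch per-process busy counts and
# per-queue lengths.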
pipe1_res = Sidekiq.redis do |conn|
conn.pipelined do
conn.get('stat:processed'.freeze)
conn.get('stat:failed'.freeze)
conn.zcard('schedule'.freeze)
conn.zcard('retry'.freeze)
conn.zcard('dead'.freeze)
conn.scard('processes'.freeze)
conn.lrange('queue:default'.freeze, -1, -1)
conn.smembers('processes'.freeze)
conn.smembers('queues'.freeze)
end
end
pipe2_res = Sidekiq.redis do |conn|
conn.pipelined do
pipe1_res[7].each {|key| conn.hget(key, 'busy'.freeze) }
pipe1_res[8].each {|queue| conn.llen("queue:#{queue}") }
end
end
s = pipe1_res[7].size
workers_size = pipe2_res[0...s].map(&:to_i).inject(0, &:+)
enqueued = pipe2_res[s..-1].map(&:to_i).inject(0, &:+)
default_queue_latency = if (entry = pipe1_res[6].first)
job = Sidekiq.load_json(entry) rescue {}
now = Time.now.to_f
thence = job['enqueued_at'.freeze] || now
now - thence
else
0
end
@stats = {
processed: pipe1_res[0].to_i,
failed: pipe1_res[1].to_i,
scheduled_size: pipe1_res[2],
retry_size: pipe1_res[3],
dead_size: pipe1_res[4],
processes_size: pipe1_res[5],
default_queue_latency: default_queue_latency,
workers_size: workers_size,
enqueued: enqueued
}
end
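##
# Zero out the cumulative counters, e.g.:
#
#   Sidekiq::Stats.new.reset           # resets 'processed' and 'failed'
#   Sidekiq::Stats.new.reset('failed') # resets only 'failed'
#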
def reset(*stats)
all = %w(failed processed)
stats = stats.empty? ? all : all & stats.flatten.compact.map(&:to_s)
mset_args = []
stats.each do |stat|
mset_args << "stat:#{stat}"
mset_args << 0
end
Sidekiq.redis do |conn|
conn.mset(*mset_args)
end
end
private
def stat(s)
@stats[s]
end
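##
# Maps each known queue to its current length, longest first, e.g.
# (names and lengths are illustrative):
#
#   Sidekiq::Stats::Queues.new.lengths
#   # => { "default" => 10, "low" => 2 }
#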
class Queues
def lengths
Sidekiq.redis do |conn|
queues = conn.smembers('queues'.freeze)
lengths = conn.pipelined do
queues.each do |queue|
conn.llen("queue:#{queue}")
end
end
i = 0
array_of_arrays = queues.inject({}) do |memo, queue|
memo[queue] = lengths[i]
i += 1
memo
end.sort_by { |_, size| size }
Hash[array_of_arrays.reverse]
end
end
end
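##
# Breaks the processed/failed counters down by calendar day, e.g.
# (dates and counts are illustrative):
#
#   Sidekiq::Stats::History.new(3).processed
#   # => { "2017-03-24" => 120, "2017-03-23" => 115, "2017-03-22" => 98 }
#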
class History
def initialize(days_previous, start_date = nil)
@days_previous = days_previous
@start_date = start_date || Time.now.utc.to_date
end
def processed
date_stat_hash("processed")
end
def failed
date_stat_hash("failed")
end
private
def date_stat_hash(stat)
i = 0
stat_hash = {}
keys = []
dates = []
while i < @days_previous
date = @start_date - i
datestr = date.strftime("%Y-%m-%d".freeze)
keys << "stat:#{stat}:#{datestr}"
dates << datestr
i += 1
end
Sidekiq.redis do |conn|
conn.mget(keys).each_with_index do |value, idx|
stat_hash[dates[idx]] = value ? value.to_i : 0
end
end
stat_hash
end
end
end
##
# Encapsulates a queue within Sidekiq.
# Allows enumeration of all jobs within the queue
# and deletion of jobs.
#
#   queue = Sidekiq::Queue.new("mailer")
#   queue.each do |job|
#     job.klass # => 'MyWorker'
#     job.args # => [1, 2, 3]
#     job.delete if job.jid == 'abcdef1234567890'
#   end
#
class Queue
include Enumerable
##
# Return all known queues within Redis.
#
def self.all
Sidekiq.redis { |c| c.smembers('queues'.freeze) }.sort.map { |q| Sidekiq::Queue.new(q) }
end
attr_reader :name
def initialize(name="default")
@name = name
@rname = "queue:#{name}"
end
def size
Sidekiq.redis { |con| con.llen(@rname) }
end
# Sidekiq Pro overrides this
def paused?
false
end
##
# Calculates this queue's latency: the number of seconds since the
# oldest job in the queue was enqueued.
#
# @return Float
def latency
entry = Sidekiq.redis do |conn|
conn.lrange(@rname, -1, -1)
end.first
return 0 unless entry
job = Sidekiq.load_json(entry)
now = Time.now.to_f
thence = job['enqueued_at'] || now
now - thence
end
def each
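# Page through the list in fixed-size chunks, shifting the window
# back by however many jobs were deleted mid-iteration so no entry
# is skipped.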
initial_size = size
deleted_size = 0
page = 0
page_size = 50
while true do
range_start = page * page_size - deleted_size
range_end = range_start + page_size - 1
entries = Sidekiq.redis do |conn|
conn.lrange @rname, range_start, range_end
end
break if entries.empty?
page += 1
entries.each do |entry|
yield Job.new(entry, @name)
end
deleted_size = initial_size - size
end
end
##
# Find the job with the given JID within this queue.
#
# This is a slow, inefficient operation. Do not use under
# normal conditions. Sidekiq Pro contains a faster version.
def find_job(jid)
detect { |j| j.jid == jid }
end
def clear
Sidekiq.redis do |conn|
conn.multi do
conn.del(@rname)
conn.srem("queues".freeze, name)
end
end
end
alias_method :💣, :clear
end
##
# Encapsulates a pending job within a Sidekiq queue or
# sorted set.
#
# The job should be considered immutable but may be
# removed from the queue via Job#delete.
#
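# For example (field values are illustrative):
#
#   job = Sidekiq::Queue.new.first
#   job.klass     # => 'MyWorker'
#   job.latency   # seconds since the job was enqueued
#   job['custom'] # raw access to any field in the job payload
#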
class Job
attr_reader :item
attr_reader :value
def initialize(item, queue_name=nil)
@args = nil
@value = item
@item = item.is_a?(Hash) ? item : parse(item)
@queue = queue_name || @item['queue']
end
def parse(item)
Sidekiq.load_json(item)
rescue JSON::ParserError
# If the job payload in Redis is invalid JSON, we'll load
# the item as an empty hash and store the invalid JSON as
# the job 'args' for display in the Web UI.
@invalid = true
@args = [item]
{}
end
def klass
self['class']
end
def display_class
# Unwrap known wrappers so they show up in a human-friendly manner in the Web UI
@klass ||= case klass
when /\ASidekiq::Extensions::Delayed/
safe_load(args[0], klass) do |target, method, _|
"#{target}.#{method}"
end
when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
job_class = @item['wrapped'] || args[0]
if 'ActionMailer::DeliveryJob' == job_class
# MailerClass#mailer_method
args[0]['arguments'][0..1].join('#')
else
job_class
end
else
klass
end
end
def display_args
# Unwrap known wrappers so they show up in a human-friendly manner in the Web UI
@args ||= case klass
when /\ASidekiq::Extensions::Delayed/
safe_load(args[0], args) do |_, _, arg|
arg
end
when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
job_args = self['wrapped'] ? args[0]["arguments"] : []
if 'ActionMailer::DeliveryJob' == (self['wrapped'] || args[0])
# remove MailerClass, mailer_method and 'deliver_now'
job_args.drop(3)
else
job_args
end
else
args
end
end
def args
@args || @item['args']
end
def jid
self['jid']
end
def enqueued_at
self['enqueued_at'] ? Time.at(self['enqueued_at']).utc : nil
end
def created_at
Time.at(self['created_at'] || self['enqueued_at'] || 0).utc
end
def queue
@queue
end
def latency
now = Time.now.to_f
now - (@item['enqueued_at'] || @item['created_at'] || now)
end
##
# Remove this job from the queue.
def delete
count = Sidekiq.redis do |conn|
conn.lrem("queue:#{@queue}", 1, @value)
end
count != 0
end
def [](name)
# nil will happen if the JSON fails to parse.
# We don't guarantee Sidekiq will work with bad job JSON but we should
# make a best effort to minimize the damage.
@item ? @item[name] : nil
end
private
def safe_load(content, default)
begin
yield(*YAML.load(content))
rescue => ex
# #1761 in dev mode, it's possible to have jobs enqueued which haven't been loaded into
# memory yet so the YAML can't be loaded.
Sidekiq.logger.warn "Unable to load YAML: #{ex.message}" unless Sidekiq.options[:environment] == 'development'
default
end
end
end
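##
# Represents a job within a sorted set (schedule, retry or dead).
# Beyond the Job API, entries can be rescheduled, retried, pushed
# back onto their queue or killed, e.g. (BrokenWorker is a
# hypothetical job class):
#
#   Sidekiq::RetrySet.new.each do |entry|
#     entry.kill if entry.klass == 'BrokenWorker'
#   end
#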
class SortedEntry < Job
attr_reader :score
attr_reader :parent
def initialize(parent, score, item)
super(item)
@score = score
@parent = parent
end
def at
Time.at(score).utc
end
def delete
if @value
@parent.delete_by_value(@parent.name, @value)
else
@parent.delete_by_jid(score, jid)
end
end
def reschedule(at)
delete
@parent.schedule(at, item)
end
def add_to_queue
remove_job do |message|
msg = Sidekiq.load_json(message)
Sidekiq::Client.push(msg)
end
end
def retry
remove_job do |message|
msg = Sidekiq.load_json(message)
msg['retry_count'] -= 1 if msg['retry_count']
Sidekiq::Client.push(msg)
end
end
##
# Place job in the dead set
def kill
remove_job do |message|
now = Time.now.to_f
Sidekiq.redis do |conn|
conn.multi do
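# Add the job to the dead set, then trim entries older than
# DeadSet.timeout and anything beyond DeadSet.max_jobs.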
conn.zadd('dead', now, message)
conn.zremrangebyscore('dead', '-inf', now - DeadSet.timeout)
conn.zremrangebyrank('dead', 0, - DeadSet.max_jobs)
end
end
end
end
def error?
!!item['error_class']
end
private
def remove_job
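# Atomically read and remove every job at this score so no other
# process can act on them while we find the one we want.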
Sidekiq.redis do |conn|
results = conn.multi do
conn.zrangebyscore(parent.name, score, score)
conn.zremrangebyscore(parent.name, score, score)
end.first
if results.size == 1
yield results.first
else
# multiple jobs with the same score
# find the one with the right JID and push it
hash = results.group_by do |message|
if message.index(jid)
msg = Sidekiq.load_json(message)
msg['jid'] == jid
else
false
end
end
msg = hash.fetch(true, []).first
yield msg if msg
# push the rest back onto the sorted set
conn.multi do
hash.fetch(false, []).each do |message|
conn.zadd(parent.name, score.to_f.to_s, message)
end
end
end
end
end
end
class SortedSet
include Enumerable
attr_reader :name
def initialize(name)
@name = name
@_size = size
end
def size
Sidekiq.redis { |c| c.zcard(name) }
end
def clear
Sidekiq.redis do |conn|
conn.del(name)
end
end
alias_method :💣, :clear
end
class JobSet < SortedSet
def schedule(timestamp, message)
Sidekiq.redis do |conn|
conn.zadd(name, timestamp.to_f.to_s, Sidekiq.dump_json(message))
end
end
def each
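# Walk the set backwards in pages (negative ranks index from the
# end), adjusting the window for entries removed mid-iteration so
# no job is skipped.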
initial_size = @_size
offset_size = 0
page = -1
page_size = 50
while true do
range_start = page * page_size + offset_size
range_end = range_start + page_size - 1
elements = Sidekiq.redis do |conn|
conn.zrange name, range_start, range_end, with_scores: true
end
break if elements.empty?
page -= 1
elements.each do |element, score|
yield SortedEntry.new(self, score, element)
end
offset_size = initial_size - @_size
end
end
def fetch(score, jid = nil)
elements = Sidekiq.redis do |conn|
conn.zrangebyscore(name, score, score)
end
elements.inject([]) do |result, element|
entry = SortedEntry.new(self, score, element)
if jid
result << entry if entry.jid == jid
else
result << entry
end
result
end
end
##
# Find the job with the given JID within this sorted set.
#
# This is a slow, inefficient operation. Do not use under
# normal conditions. Sidekiq Pro contains a faster version.
def find_job(jid)
self.detect { |j| j.jid == jid }
end
def delete_by_value(name, value)
Sidekiq.redis do |conn|
ret = conn.zrem(name, value)
@_size -= 1 if ret
ret
end
end
def delete_by_jid(score, jid)
Sidekiq.redis do |conn|
elements = conn.zrangebyscore(name, score, score)
elements.each do |element|
message = Sidekiq.load_json(element)
if message["jid"] == jid
ret = conn.zrem(name, element)
@_size -= 1 if ret
break ret
end
false
end
end
end
alias_method :delete, :delete_by_jid
end
##
# Allows enumeration of scheduled jobs within Sidekiq.
# Based on this, you can search/filter for jobs. Here's an
# example where I'm selecting all jobs of a certain type
# and deleting them from the schedule queue.
#
#   r = Sidekiq::ScheduledSet.new
#   r.select do |scheduled|
#     scheduled.klass == 'Sidekiq::Extensions::DelayedClass' &&
#     scheduled.args[0] == 'User' &&
#     scheduled.args[1] == 'setup_new_subscriber'
#   end.map(&:delete)
class ScheduledSet < JobSet
def initialize
super 'schedule'
end
end
##
# Allows enumeration of retries within Sidekiq.
# Based on this, you can search/filter for jobs. Here's an
# example where I'm selecting all jobs of a certain type
# and deleting them from the retry queue.
#
#   r = Sidekiq::RetrySet.new
#   r.select do |retri|
#     retri.klass == 'Sidekiq::Extensions::DelayedClass' &&
#     retri.args[0] == 'User' &&
#     retri.args[1] == 'setup_new_subscriber'
#   end.map(&:delete)
class RetrySet < JobSet
def initialize
super 'retry'
end
def retry_all
while size > 0
each(&:retry)
end
end
end
##
# Allows enumeration of dead jobs within Sidekiq.
#
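# Dead jobs can be inspected and retried, e.g. (FlakyWorker is a
# hypothetical job class):
#
#   Sidekiq::DeadSet.new.each do |job|
#     job.retry if job.klass == 'FlakyWorker'
#   end
#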
class DeadSet < JobSet
def initialize
super 'dead'
end
def retry_all
while size > 0
each(&:retry)
end
end
def self.max_jobs
Sidekiq.options[:dead_max_jobs]
end
def self.timeout
Sidekiq.options[:dead_timeout_in_seconds]
end
end
##
# Enumerates the set of Sidekiq processes which are actively working
# right now. Each process sends a heartbeat to Redis every 5 seconds
# so this set should be relatively accurate, barring network partitions.
#
# Yields a Sidekiq::Process.
#
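# For example (output is illustrative):
#
#   Sidekiq::ProcessSet.new.each do |process|
#     puts "#{process['identity']} is running #{process['busy']} jobs"
#   end
#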
class ProcessSet
include Enumerable
def initialize(clean_plz=true)
self.class.cleanup if clean_plz
end
# Cleans up dead processes recorded in Redis.
# Returns the number of processes cleaned.
def self.cleanup
count = 0
Sidekiq.redis do |conn|
procs = conn.smembers('processes').sort
heartbeats = conn.pipelined do
procs.each do |key|
conn.hget(key, 'info')
end
end
# The hash stored at each key has a 60 second TTL.
# If it's missing, the process has not reported in
# to Redis recently and has probably died.
to_prune = []
heartbeats.each_with_index do |beat, i|
to_prune << procs[i] if beat.nil?
end
count = conn.srem('processes', to_prune) unless to_prune.empty?
end
count
end
def each
procs = Sidekiq.redis { |conn| conn.smembers('processes') }.sort
Sidekiq.redis do |conn|
# We're making a tradeoff here between consuming more memory instead of
# making more roundtrips to Redis, but if you have hundreds or thousands of workers,
# you'll be happier this way
result = conn.pipelined do
procs.each do |key|
conn.hmget(key, 'info', 'busy', 'beat', 'quiet')
end
end
result.each do |info, busy, at_s, quiet|
hash = Sidekiq.load_json(info)
yield Process.new(hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f, 'quiet' => quiet))
end
end
nil
end
# This method is not guaranteed accurate since it does not prune the set
# based on current heartbeat. #each does that and ensures the set only
# contains Sidekiq processes which have sent a heartbeat within the last
# 60 seconds.
def size
Sidekiq.redis { |conn| conn.scard('processes') }
end
end
##
# Sidekiq::Process represents an active Sidekiq process talking with Redis.
# Each process has a set of attributes which look like this:
#
#   {
#     'hostname' => 'app-1.example.com',
#     'started_at' => <process start time>,
#     'pid' => 12345,
#     'tag' => 'myapp',
#     'concurrency' => 25,
#     'queues' => ['default', 'low'],
#     'busy' => 10,
#     'beat' => <last heartbeat>,
#     'identity' => <unique string identifying the process>,
#   }
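#
# Processes can be signaled remotely through Redis, e.g.:
#
#   Sidekiq::ProcessSet.new.each(&:quiet!) # ask every process to stop fetching new jobs
#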
class Process
def initialize(hash)
@attribs = hash
end
def tag
self['tag']
end
def labels
Array(self['labels'])
end
def [](key)
@attribs[key]
end
def quiet!
signal('TSTP')
end
def stop!
signal('TERM')
end
def dump_threads
signal('TTIN')
end
def stopping?
self['quiet'] == 'true'
end
private
def signal(sig)
key = "#{identity}-signals"
Sidekiq.redis do |c|
c.multi do
c.lpush(key, sig)
c.expire(key, 60)
end
end
end
def identity
self['identity']
end
end
##
# A worker is a thread that is currently processing a job.
# Programmatic access to the current active worker set.
#
# WARNING WARNING WARNING
#
# This is live data that can change every millisecond.
# If you call #size => 5 and then expect #each to be
# called 5 times, you're going to have a bad time.
#
#   workers = Sidekiq::Workers.new
#   workers.size # => 2
#   workers.each do |process_id, thread_id, work|
#     # process_id is a unique identifier per Sidekiq process
#     # thread_id is a unique identifier per thread
#     # work is a Hash which looks like:
#     # { 'queue' => name, 'run_at' => timestamp, 'payload' => msg }
#     # run_at is an epoch Integer.
#   end
#
class Workers
include Enumerable
def each
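# Skip processes whose key no longer exists: the process died and
# its heartbeat data expired.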
Sidekiq.redis do |conn|
procs = conn.smembers('processes')
procs.sort.each do |key|
valid, workers = conn.pipelined do
conn.exists(key)
conn.hgetall("#{key}:workers")
end
next unless valid
workers.each_pair do |tid, json|
yield key, tid, Sidekiq.load_json(json)
end
end
end
end
# Note that #size is only as accurate as Sidekiq's heartbeat,
# which happens every 5 seconds. It is NOT real-time.
#
# Not very efficient if you have lots of Sidekiq
# processes but the alternative is a global counter
# which can easily get out of sync with crashy processes.
def size
Sidekiq.redis do |conn|
procs = conn.smembers('processes')
if procs.empty?
0
else
conn.pipelined do
procs.each do |key|
conn.hget(key, 'busy')
end
end.map(&:to_i).inject(:+)
end
end
end
end
end