1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00
This commit is contained in:
Mike Perham 2022-06-08 13:02:24 -07:00
parent 7f3f6ac916
commit f8c7579920

View file

@ -217,24 +217,30 @@ module Sidekiq
include Enumerable
##
# Return all known queues within Redis.
# Fetch all known queues within Redis.
#
# @return [Array<Sidekiq::Queue>]
def self.all
Sidekiq.redis { |c| c.sscan_each("queues").to_a }.sort.map { |q| Sidekiq::Queue.new(q) }
end
attr_reader :name
# @param name [String] the name of the queue
def initialize(name = "default")
@name = name.to_s
@rname = "queue:#{name}"
end
# The current size of the queue within Redis.
# This value is real-time and can change between calls.
#
# @return [Integer] the size
def size
Sidekiq.redis { |con| con.llen(@rname) }
end
# Sidekiq Pro overrides this
# @return [Boolean] if the queue is currently paused
def paused?
false
end
@ -243,7 +249,7 @@ module Sidekiq
# Calculates this queue's latency, the difference in seconds since the oldest
# job in the queue was enqueued.
#
# @return Float
# @return [Float] in seconds
def latency
entry = Sidekiq.redis { |conn|
conn.lrange(@rname, -1, -1)
@ -279,24 +285,21 @@ module Sidekiq
##
# Find the job with the given JID within this queue.
#
# This is a slow, inefficient operation. Do not use under
# This is a *slow, inefficient* operation. Do not use under
# normal conditions.
#
# @param jid string - looks for job id within the queue
#
# @return success: A Sidekiq::JobRecord
# @return failure: nil
# @param jid [String] the job_id to look for
# @return [Sidekiq::JobRecord]
# @return [nil] if not found
def find_job(jid)
detect { |j| j.jid == jid }
end
##
# multioperation transaction
#
# delete all jobs within this queue
def clear
Sidekiq.redis do |conn|
conn.multi do |transaction|
transaction.unlink(@rname) # high performance delete
transaction.unlink(@rname)
transaction.srem("queues", name)
end
end
@ -318,8 +321,9 @@ module Sidekiq
class JobRecord
attr_reader :item
attr_reader :value
attr_reader :queue
def initialize(item, queue_name = nil)
def initialize(item, queue_name = nil) # :nodoc:
@args = nil
@value = item
@item = item.is_a?(Hash) ? item : parse(item)
@ -337,7 +341,7 @@ module Sidekiq
{}
end
def klass # :nodoc:
def klass
self["class"]
end
@ -424,15 +428,12 @@ module Sidekiq
end
end
attr_reader :queue
def latency
now = Time.now.to_f
now - (@item["enqueued_at"] || @item["created_at"] || now)
end
##
# Remove this job from the queue.
# Remove this job from the queue
def delete
count = Sidekiq.redis { |conn|
conn.lrem("queue:#{@queue}", 1, @value)
@ -440,6 +441,7 @@ module Sidekiq
count != 0
end
# Access arbitrary attributes within the job hash
def [](name)
# nil will happen if the JSON fails to parse.
# We don't guarantee Sidekiq will work with bad job JSON but we should
@ -477,11 +479,13 @@ module Sidekiq
end
end
# Represents a job within a Redis sorted set where the score
# represents a timestamp for the job.
class SortedEntry < JobRecord
attr_reader :score
attr_reader :parent
def initialize(parent, score, item)
def initialize(parent, score, item) # :nodoc:
super(item)
@score = Float(score)
@parent = parent
@ -499,12 +503,17 @@ module Sidekiq
end
end
# Change the scheduled time for this job.
#
# @param [Time] the new timestamp when this job will be enqueued.
def reschedule(at)
Sidekiq.redis do |conn|
conn.zincrby(@parent.name, at.to_f - @score, Sidekiq.dump_json(@item))
end
end
# Enqueue this job from the scheduled or dead set so it will
# be executed at some point in the near future.
def add_to_queue
remove_job do |message|
msg = Sidekiq.load_json(message)
@ -512,6 +521,8 @@ module Sidekiq
end
end
# enqueue this job from the retry set so it will be executed
# at some point in the near future.
def retry
remove_job do |message|
msg = Sidekiq.load_json(message)
@ -520,8 +531,7 @@ module Sidekiq
end
end
##
# Place job in the dead set
# Move this job from its current set into the Dead set.
def kill
remove_job do |message|
DeadSet.new.kill(message)