mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
cbfa33cedc
rather than 'score' so as to avoid accidentally performing actions on multiple jobs when only one was intended.
197 lines
4.3 KiB
Ruby
197 lines
4.3 KiB
Ruby
require 'sidekiq'
|
|
|
|
module Sidekiq
|
|
|
|
##
# Encapsulates a queue within Sidekiq.
# Allows enumeration of all jobs within the queue
# and deletion of jobs.
#
#   queue = Sidekiq::Queue.new("mailer")
#   queue.each do |job|
#     job.klass # => 'MyWorker'
#     job.args # => [1, 2, 3]
#     job.delete if job.jid == 'abcdef1234567890'
#   end
#
class Queue
  include Enumerable

  # The queue name as given to the constructor (without the "queue:" prefix).
  attr_reader :name

  # name - the queue name; the backing Redis list key is "queue:<name>".
  def initialize(name="default")
    @name = name
    @rname = "queue:#{name}"
  end

  # Returns the current length of the backing Redis list.
  def size
    Sidekiq.redis { |conn| conn.llen(@rname) }
  end

  # Yields a Job for every entry in the list, fetching 50 entries per
  # round trip. Returns an Enumerator when called without a block.
  #
  # Entries removed while iterating (for example by calling Job#delete from
  # the block, as in the class example) shift the remaining entries toward
  # the head of the list. Paging by a fixed absolute index would then skip
  # as many entries as were removed, so each window is moved back by the
  # amount the list has shrunk since iteration started.
  #
  # NOTE(review): the compensation assumes the shrinkage happened among
  # entries that were already visited; removals elsewhere in the list can
  # still cause entries to be repeated.
  def each(&block)
    return to_enum(:each) unless block

    initial_size = size
    shrunk_by = 0
    page = 0
    page_size = 50

    loop do
      # Clamp at zero: a negative start would make LRANGE count from the tail.
      first = [(page * page_size) - shrunk_by, 0].max
      last = first + page_size - 1
      entries = Sidekiq.redis { |conn| conn.lrange(@rname, first, last) }
      break if entries.empty?
      page += 1
      entries.each do |entry|
        block.call Job.new(entry, @name)
      end
      shrunk_by = initial_size - size
    end
  end
end
|
|
|
|
##
# Encapsulates a pending job within a Sidekiq queue or
# sorted set.
#
# The job should be considered immutable but may be
# removed from the queue via Job#delete.
#
class Job
  # item  - the payload parsed from its JSON representation.
  # queue - the queue name given to the constructor, or the payload's
  #         'queue' entry when none was given.
  attr_reader :item, :queue

  # item       - the raw JSON payload exactly as stored in Redis.
  # queue_name - optional queue name overriding the payload's 'queue' entry.
  def initialize(item, queue_name=nil)
    # Keep the raw string: #delete asks Redis to remove this exact value.
    @value = item
    @item = Sidekiq.load_json(item)
    @queue = queue_name || @item['queue']
  end

  # The payload's 'class' entry.
  def klass
    @item['class']
  end

  # The payload's 'args' entry.
  def args
    @item['args']
  end

  # The payload's 'jid' entry.
  def jid
    @item['jid']
  end

  ##
  # Remove this job from the queue.
  #
  # Returns true when Redis reports at least one removed entry,
  # false otherwise.
  def delete
    removed = Sidekiq.redis do |conn|
      conn.lrem("queue:#{@queue}", 0, @value)
    end
    removed != 0
  end

  # Look up an arbitrary entry of the parsed payload.
  def [](name)
    @item[name]
  end
end
|
|
|
|
##
# A job held in a Redis sorted set, together with the score it is
# stored under.
#
class SortedEntry < Job
  attr_reader :score

  # parent - the SortedSet this entry was read from; used by #delete.
  # score  - the score the entry is stored under.
  # item   - the raw JSON payload stored as the sorted-set member.
  def initialize(parent, score, item)
    super(item)
    @score = score
    @parent = parent
  end

  # The score converted with Time.at.
  def at
    Time.at(score)
  end

  # Remove this entry from its sorted set.
  #
  # The jid is passed along with the score because several entries can
  # share one score; removing by score alone would delete all of them
  # when only this one was intended.
  def delete
    @parent.delete(score, jid)
  end

  # Take this entry out of the 'retry' sorted set and push it onto its
  # queue with retry_count decremented by one.
  #
  # Only this entry's exact payload is removed, so other entries that
  # happen to share its score are left alone.
  #
  # Returns an Array holding the RPUSH reply, or an empty Array when the
  # entry was no longer in the set (nothing is enqueued in that case).
  # Raises RuntimeError when the payload has no 'failed_at' entry.
  def retry
    raise "Retry not available on jobs not in the Retry queue." unless item["failed_at"]

    Sidekiq.redis do |conn|
      reply = conn.zrem('retry', @value)
      # Accept both a boolean and an integer "number removed" reply.
      next [] if reply == false || reply == 0

      # Re-parse the raw payload so the exposed #item is not mutated.
      msg = Sidekiq.load_json(@value)
      msg['retry_count'] = msg['retry_count'] - 1
      [conn.rpush("queue:#{msg['queue']}", Sidekiq.dump_json(msg))]
    end
  end
end

##
# Enumerable view over a Redis sorted set whose members are JSON job
# payloads.
#
class SortedSet
  include Enumerable

  # name - the Redis key of the sorted set.
  def initialize(name)
    @zset = name
  end

  # Returns the number of members currently in the sorted set.
  def size
    Sidekiq.redis { |conn| conn.zcard(@zset) }
  end

  # Yields a SortedEntry for every member, 50 at a time, starting with the
  # highest-ranked page and working toward rank 0. Returns an Enumerator
  # when called without a block.
  #
  # Pages are addressed by absolute rank, counted down from the size seen
  # when iteration starts. Removing entries that were already yielded only
  # affects ranks above the next page, so the block may delete as it goes
  # without unvisited entries being skipped. (Negative, tail-relative
  # indexes would slide every time the tail of the set shrinks.)
  def each(&block)
    return to_enum(:each) unless block

    page_size = 50
    upper = size - 1

    while upper >= 0
      lower = [upper - page_size + 1, 0].max
      elements = Sidekiq.redis do |conn|
        conn.zrange @zset, lower, upper, :with_scores => true
      end
      elements.each do |element, score|
        block.call SortedEntry.new(self, score, element)
      end
      upper = lower - 1
    end
  end

  # Remove members stored under +score+.
  #
  # score - the score to look up.
  # jid   - optional job id. When given, only members under +score+ whose
  #         payload carries this 'jid' are removed; when omitted (or nil),
  #         every member under +score+ is removed.
  #
  # Returns true if anything matched and was removed, false otherwise.
  def delete(score, jid = nil)
    Sidekiq.redis do |conn|
      if jid
        matching = conn.zrangebyscore(@zset, score, score).select do |payload|
          Sidekiq.load_json(payload)['jid'] == jid
        end
        matching.each { |payload| conn.zrem(@zset, payload) }
        !matching.empty?
      else
        conn.zremrangebyscore(@zset, score, score) != 0
      end
    end
  end
end
|
|
|
|
##
# Allows enumeration of scheduled jobs within Sidekiq.
# Based on this, you can search/filter for jobs.  Here's an
# example that selects all jobs of a certain type
# and deletes them from the scheduled set.
#
#   s = Sidekiq::ScheduledSet.new
#   s.select do |job|
#     job.klass == 'Sidekiq::Extensions::DelayedClass' &&
#     job.args[0] == 'User' &&
#     job.args[1] == 'setup_new_subscriber'
#   end.map(&:delete)
class ScheduledSet < SortedSet
  # Binds this set to the 'schedule' sorted set.
  def initialize
    super('schedule')
  end
end
|
|
|
|
##
# Allows enumeration of retries within Sidekiq.
# Based on this, you can search/filter for jobs.  Here's an
# example that selects all jobs of a certain type
# and deletes them from the retry queue.
#
#   r = Sidekiq::RetrySet.new
#   r.select do |retri|
#     retri.klass == 'Sidekiq::Extensions::DelayedClass' &&
#     retri.args[0] == 'User' &&
#     retri.args[1] == 'setup_new_subscriber'
#   end.map(&:delete)
class RetrySet < SortedSet
  # Binds this set to the 'retry' sorted set.
  def initialize
    super('retry')
  end
end
|
|
|
|
end
|