mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Add APIs for manipulating job and retry queues, finishes #457
This commit is contained in:
parent
3f5047f555
commit
05cbc6333d
4 changed files with 228 additions and 0 deletions
|
@ -1,3 +1,9 @@
|
|||
HEAD
|
||||
-----------
|
||||
|
||||
- Add APIs for manipulating the retry and job queues. See sidekiq/api. [#457]
|
||||
|
||||
|
||||
2.4.0
|
||||
-----------
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ require 'sidekiq/worker'
|
|||
require 'sidekiq/redis_connection'
|
||||
require 'sidekiq/util'
|
||||
require 'sidekiq/stats'
|
||||
require 'sidekiq/api'
|
||||
|
||||
require 'sidekiq/extensions/class_methods'
|
||||
require 'sidekiq/extensions/action_mailer'
|
||||
|
|
147
lib/sidekiq/api.rb
Normal file
147
lib/sidekiq/api.rb
Normal file
|
@ -0,0 +1,147 @@
|
|||
require 'sidekiq'
|
||||
|
||||
module Sidekiq
|
||||
|
||||
##
|
||||
# Encapsulates a queue within Sidekiq.
|
||||
# Allows enumeration of all jobs within the queue
|
||||
# and deletion of jobs.
|
||||
#
|
||||
# queue = Sidekiq::Queue.new("mailer")
|
||||
# queue.each do |job|
|
||||
# job.klass # => 'MyWorker'
|
||||
# job.args # => [1, 2, 3]
|
||||
# job.delete if job.jid == 'abcdef1234567890'
|
||||
# end
|
||||
#
|
||||
class Queue
|
||||
include Enumerable
|
||||
|
||||
attr_reader :name
|
||||
|
||||
def initialize(name="default")
|
||||
@name = name
|
||||
@rname = "queue:#{name}"
|
||||
end
|
||||
|
||||
def size
|
||||
Sidekiq.redis { |con| con.llen(@rname) }
|
||||
end
|
||||
|
||||
def each(&block)
|
||||
# page thru the sorted set backwards so deleting entries doesn't screw up indexing
|
||||
page = 0
|
||||
page_size = 50
|
||||
|
||||
loop do
|
||||
entries = Sidekiq.redis do |conn|
|
||||
conn.lrange @rname, page * page_size, (page * page_size) + page_size - 1
|
||||
end
|
||||
break if entries.empty?
|
||||
page += 1
|
||||
entries.each do |entry|
|
||||
block.call Job.new(entry, @name)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
##
|
||||
# Encapsulates a pending job within a Sidekiq queue.
|
||||
# The job should be considered immutable but may be
|
||||
# removed from the queue via Job#delete.
|
||||
#
|
||||
class Job
|
||||
attr_reader :item
|
||||
|
||||
def initialize(item, queue_name=nil)
|
||||
@value = item
|
||||
@item = Sidekiq.load_json(item)
|
||||
@queue = queue_name || @item['queue']
|
||||
end
|
||||
|
||||
def klass
|
||||
@item['class']
|
||||
end
|
||||
|
||||
def args
|
||||
@item['args']
|
||||
end
|
||||
|
||||
def jid
|
||||
@item['jid']
|
||||
end
|
||||
|
||||
def queue
|
||||
@queue
|
||||
end
|
||||
|
||||
##
|
||||
# Remove this job from the queue.
|
||||
def delete
|
||||
count = Sidekiq.redis do |conn|
|
||||
conn.lrem("queue:#{@queue}", 0, @value)
|
||||
end
|
||||
count != 0
|
||||
end
|
||||
|
||||
def [](name)
|
||||
@item.send(:[], name)
|
||||
end
|
||||
end
|
||||
|
||||
# Encapsulates a single job awaiting retry
|
||||
class Retry < Job
|
||||
attr_reader :score
|
||||
|
||||
def initialize(score, item)
|
||||
super(item)
|
||||
@score = score
|
||||
end
|
||||
|
||||
def delete
|
||||
count = Sidekiq.redis do |conn|
|
||||
conn.zremrangebyscore('retry', @score, @score)
|
||||
end
|
||||
count != 0
|
||||
end
|
||||
end
|
||||
|
||||
##
|
||||
# Allows enumeration of all retries pending in Sidekiq.
|
||||
# Based on this, you can search/filter for jobs. Here's an
|
||||
# example where I'm selecting all jobs of a certain type
|
||||
# and deleting them from the retry queue.
|
||||
#
|
||||
# r = Sidekiq::Retries.new
|
||||
# r.select do |retri|
|
||||
# retri.klass == 'Sidekiq::Extensions::DelayedClass' &&
|
||||
# retri.args[0] == 'User' &&
|
||||
# retri.args[1] == 'setup_new_subscriber'
|
||||
# end.map(&:delete)
|
||||
class Retries
|
||||
include Enumerable
|
||||
|
||||
def size
|
||||
Sidekiq.redis {|c| c.zcard('retry') }
|
||||
end
|
||||
|
||||
def each(&block)
|
||||
# page thru the sorted set backwards so deleting entries doesn't screw up indexing
|
||||
page = -1
|
||||
page_size = 50
|
||||
|
||||
loop do
|
||||
retries = Sidekiq.redis do |conn|
|
||||
conn.zrange 'retry', page * page_size, (page * page_size) + (page_size - 1), :with_scores => true
|
||||
end
|
||||
break if retries.empty?
|
||||
page -= 1
|
||||
retries.each do |retri, score|
|
||||
block.call Retry.new(score, retri)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
end
|
74
test/test_api.rb
Normal file
74
test/test_api.rb
Normal file
|
@ -0,0 +1,74 @@
|
|||
require 'helper'
|
||||
|
||||
class TestApi < MiniTest::Unit::TestCase
|
||||
describe 'with an empty database' do
|
||||
before do
|
||||
Sidekiq.redis {|c| c.flushdb }
|
||||
end
|
||||
|
||||
it 'shows queue as empty' do
|
||||
q = Sidekiq::Queue.new
|
||||
assert_equal 0, q.size
|
||||
end
|
||||
|
||||
class ApiWorker
|
||||
include Sidekiq::Worker
|
||||
end
|
||||
|
||||
it 'can enumerate jobs' do
|
||||
q = Sidekiq::Queue.new
|
||||
ApiWorker.perform_async(1, 'mike')
|
||||
assert_equal ['TestApi::ApiWorker'], q.map(&:klass)
|
||||
|
||||
job = q.first
|
||||
assert_equal 24, job.jid.size
|
||||
assert_equal [1, 'mike'], job.args
|
||||
|
||||
q = Sidekiq::Queue.new('other')
|
||||
assert_equal 0, q.size
|
||||
end
|
||||
|
||||
it 'can delete jobs' do
|
||||
q = Sidekiq::Queue.new
|
||||
ApiWorker.perform_async(1, 'mike')
|
||||
assert_equal 1, q.size
|
||||
assert_equal [true], q.map(&:delete)
|
||||
assert_equal 0, q.size
|
||||
end
|
||||
|
||||
it 'shows empty retries' do
|
||||
r = Sidekiq::Retries.new
|
||||
assert_equal 0, r.size
|
||||
end
|
||||
|
||||
it 'can enumerate retries' do
|
||||
add_retry
|
||||
|
||||
r = Sidekiq::Retries.new
|
||||
assert_equal 1, r.size
|
||||
array = r.to_a
|
||||
assert_equal 1, array.size
|
||||
|
||||
retri = array.first
|
||||
assert_equal 'ApiWorker', retri.klass
|
||||
assert_equal 'default', retri.queue
|
||||
assert_equal 'bob', retri.jid
|
||||
end
|
||||
|
||||
it 'can delete retries' do
|
||||
add_retry
|
||||
r = Sidekiq::Retries.new
|
||||
assert_equal 1, r.size
|
||||
r.map(&:delete)
|
||||
assert_equal 0, r.size
|
||||
end
|
||||
|
||||
def add_retry
|
||||
at = Time.now.to_f
|
||||
payload = Sidekiq.dump_json('class' => 'ApiWorker', 'args' => [1, 'mike'], 'queue' => 'default', 'jid' => 'bob')
|
||||
Sidekiq.redis do |conn|
|
||||
conn.zadd('retry', at.to_s, payload)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Add table
Reference in a new issue