From 05cbc6333dc58ae0c4242a32f8325b4dac9e4463 Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Sat, 20 Oct 2012 14:03:43 -0700 Subject: [PATCH] Add APIs for manipulating job and retry queues, finishes #457 --- Changes.md | 6 ++ lib/sidekiq.rb | 1 + lib/sidekiq/api.rb | 147 +++++++++++++++++++++++++++++++++++++++++++++ test/test_api.rb | 74 +++++++++++++++++++++++ 4 files changed, 228 insertions(+) create mode 100644 lib/sidekiq/api.rb create mode 100644 test/test_api.rb diff --git a/Changes.md b/Changes.md index c39a36fc..70495498 100644 --- a/Changes.md +++ b/Changes.md @@ -1,3 +1,9 @@ +HEAD +----------- + +- Add APIs for manipulating the retry and job queues. See sidekiq/api. [#457] + + 2.4.0 ----------- diff --git a/lib/sidekiq.rb b/lib/sidekiq.rb index 4671acf0..7884a864 100644 --- a/lib/sidekiq.rb +++ b/lib/sidekiq.rb @@ -5,6 +5,7 @@ require 'sidekiq/worker' require 'sidekiq/redis_connection' require 'sidekiq/util' require 'sidekiq/stats' +require 'sidekiq/api' require 'sidekiq/extensions/class_methods' require 'sidekiq/extensions/action_mailer' diff --git a/lib/sidekiq/api.rb b/lib/sidekiq/api.rb new file mode 100644 index 00000000..cdfb1871 --- /dev/null +++ b/lib/sidekiq/api.rb @@ -0,0 +1,147 @@ +require 'sidekiq' + +module Sidekiq + + ## + # Encapsulates a queue within Sidekiq. + # Allows enumeration of all jobs within the queue + # and deletion of jobs. + # + # queue = Sidekiq::Queue.new("mailer") + # queue.each do |job| + # job.klass # => 'MyWorker' + # job.args # => [1, 2, 3] + # job.delete if job.jid == 'abcdef1234567890' + # end + # + class Queue + include Enumerable + + attr_reader :name + + def initialize(name="default") + @name = name + @rname = "queue:#{name}" + end + + def size + Sidekiq.redis { |con| con.llen(@rname) } + end + + def each(&block) + # page thru the sorted set backwards so deleting entries doesn't screw up indexing + page = 0 + page_size = 50 + + loop do + entries = Sidekiq.redis do |conn| + conn.lrange @rname, page * page_size, (page * page_size) + page_size - 1 + end + break if entries.empty? + page += 1 + entries.each do |entry| + block.call Job.new(entry, @name) + end + end + end + end + + ## + # Encapsulates a pending job within a Sidekiq queue. + # The job should be considered immutable but may be + # removed from the queue via Job#delete. + # + class Job + attr_reader :item + + def initialize(item, queue_name=nil) + @value = item + @item = Sidekiq.load_json(item) + @queue = queue_name || @item['queue'] + end + + def klass + @item['class'] + end + + def args + @item['args'] + end + + def jid + @item['jid'] + end + + def queue + @queue + end + + ## + # Remove this job from the queue. + def delete + count = Sidekiq.redis do |conn| + conn.lrem("queue:#{@queue}", 0, @value) + end + count != 0 + end + + def [](name) + @item.send(:[], name) + end + end + + # Encapsulates a single job awaiting retry + class Retry < Job + attr_reader :score + + def initialize(score, item) + super(item) + @score = score + end + + def delete + count = Sidekiq.redis do |conn| + conn.zremrangebyscore('retry', @score, @score) + end + count != 0 + end + end + + ## + # Allows enumeration of all retries pending in Sidekiq. + # Based on this, you can search/filter for jobs. Here's an + # example where I'm selecting all jobs of a certain type + # and deleting them from the retry queue. + # + # r = Sidekiq::Retries.new + # r.select do |retri| + # retri.klass == 'Sidekiq::Extensions::DelayedClass' && + # retri.args[0] == 'User' && + # retri.args[1] == 'setup_new_subscriber' + # end.map(&:delete) + class Retries + include Enumerable + + def size + Sidekiq.redis {|c| c.zcard('retry') } + end + + def each(&block) + # page thru the sorted set backwards so deleting entries doesn't screw up indexing + page = -1 + page_size = 50 + + loop do + retries = Sidekiq.redis do |conn| + conn.zrange 'retry', page * page_size, (page * page_size) + (page_size - 1), :with_scores => true + end + break if retries.empty? + page -= 1 + retries.each do |retri, score| + block.call Retry.new(score, retri) + end + end + end + end + +end diff --git a/test/test_api.rb b/test/test_api.rb new file mode 100644 index 00000000..67f5139c --- /dev/null +++ b/test/test_api.rb @@ -0,0 +1,74 @@ +require 'helper' + +class TestApi < MiniTest::Unit::TestCase + describe 'with an empty database' do + before do + Sidekiq.redis {|c| c.flushdb } + end + + it 'shows queue as empty' do + q = Sidekiq::Queue.new + assert_equal 0, q.size + end + + class ApiWorker + include Sidekiq::Worker + end + + it 'can enumerate jobs' do + q = Sidekiq::Queue.new + ApiWorker.perform_async(1, 'mike') + assert_equal ['TestApi::ApiWorker'], q.map(&:klass) + + job = q.first + assert_equal 24, job.jid.size + assert_equal [1, 'mike'], job.args + + q = Sidekiq::Queue.new('other') + assert_equal 0, q.size + end + + it 'can delete jobs' do + q = Sidekiq::Queue.new + ApiWorker.perform_async(1, 'mike') + assert_equal 1, q.size + assert_equal [true], q.map(&:delete) + assert_equal 0, q.size + end + + it 'shows empty retries' do + r = Sidekiq::Retries.new + assert_equal 0, r.size + end + + it 'can enumerate retries' do + add_retry + + r = Sidekiq::Retries.new + assert_equal 1, r.size + array = r.to_a + assert_equal 1, array.size + + retri = array.first + assert_equal 'ApiWorker', retri.klass + assert_equal 'default', retri.queue + assert_equal 'bob', retri.jid + end + + it 'can delete retries' do + add_retry + r = Sidekiq::Retries.new + assert_equal 1, r.size + r.map(&:delete) + assert_equal 0, r.size + end + + def add_retry + at = Time.now.to_f + payload = Sidekiq.dump_json('class' => 'ApiWorker', 'args' => [1, 'mike'], 'queue' => 'default', 'jid' => 'bob') + Sidekiq.redis do |conn| + conn.zadd('retry', at.to_s, payload) + end + end + end +end