mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Update API to support retries and scheduled jobs
This commit is contained in:
parent
e3a08bec0c
commit
4b0d5682ae
3 changed files with 67 additions and 33 deletions
|
@ -1,7 +1,7 @@
|
|||
HEAD
|
||||
2.5.0
|
||||
-----------
|
||||
|
||||
- REDESIGNED WEB UI! [unity]
|
||||
- REDESIGNED WEB UI! [unity, cavneb]
|
||||
- Support Honeybadger for error delivery
|
||||
- Inline testing runs the client middleware before executing jobs [#465]
|
||||
- Web UI can now remove jobs from queue. [#466, dleung]
|
||||
|
|
|
@ -46,7 +46,9 @@ module Sidekiq
|
|||
end
|
||||
|
||||
##
|
||||
# Encapsulates a pending job within a Sidekiq queue.
|
||||
# Encapsulates a pending job within a Sidekiq queue or
|
||||
# sorted set.
|
||||
#
|
||||
# The job should be considered immutable but may be
|
||||
# removed from the queue via Job#delete.
|
||||
#
|
||||
|
@ -89,44 +91,33 @@ module Sidekiq
|
|||
end
|
||||
end
|
||||
|
||||
# Encapsulates a single job awaiting retry
|
||||
class Retry < Job
|
||||
class SortedEntry < Job
|
||||
attr_reader :score
|
||||
|
||||
def initialize(score, item)
|
||||
def initialize(parent, score, item)
|
||||
super(item)
|
||||
@score = score
|
||||
@parent = parent
|
||||
end
|
||||
|
||||
def retry_at
|
||||
def at
|
||||
Time.at(@score)
|
||||
end
|
||||
|
||||
def delete
|
||||
count = Sidekiq.redis do |conn|
|
||||
conn.zremrangebyscore('retry', @score, @score)
|
||||
end
|
||||
count != 0
|
||||
@parent.delete(@score)
|
||||
end
|
||||
end
|
||||
|
||||
##
|
||||
# Allows enumeration of all retries pending in Sidekiq.
|
||||
# Based on this, you can search/filter for jobs. Here's an
|
||||
# example where I'm selecting all jobs of a certain type
|
||||
# and deleting them from the retry queue.
|
||||
#
|
||||
# r = Sidekiq::Retries.new
|
||||
# r.select do |retri|
|
||||
# retri.klass == 'Sidekiq::Extensions::DelayedClass' &&
|
||||
# retri.args[0] == 'User' &&
|
||||
# retri.args[1] == 'setup_new_subscriber'
|
||||
# end.map(&:delete)
|
||||
class Retries
|
||||
class SortedSet
|
||||
include Enumerable
|
||||
|
||||
def initialize(name)
|
||||
@zset = name
|
||||
end
|
||||
|
||||
def size
|
||||
Sidekiq.redis {|c| c.zcard('retry') }
|
||||
Sidekiq.redis {|c| c.zcard(@zset) }
|
||||
end
|
||||
|
||||
def each(&block)
|
||||
|
@ -135,16 +126,59 @@ module Sidekiq
|
|||
page_size = 50
|
||||
|
||||
loop do
|
||||
retries = Sidekiq.redis do |conn|
|
||||
conn.zrange 'retry', page * page_size, (page * page_size) + (page_size - 1), :with_scores => true
|
||||
elements = Sidekiq.redis do |conn|
|
||||
conn.zrange @zset, page * page_size, (page * page_size) + (page_size - 1), :with_scores => true
|
||||
end
|
||||
break if retries.empty?
|
||||
break if elements.empty?
|
||||
page -= 1
|
||||
retries.each do |retri, score|
|
||||
block.call Retry.new(score, retri)
|
||||
elements.each do |element, score|
|
||||
block.call SortedEntry.new(self, score, element)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def delete(score)
|
||||
count = Sidekiq.redis do |conn|
|
||||
conn.zremrangebyscore(@zset, score, score)
|
||||
end
|
||||
count != 0
|
||||
end
|
||||
end
|
||||
|
||||
##
|
||||
# Allows enumeration of scheduled jobs within Sidekiq.
|
||||
# Based on this, you can search/filter for jobs. Here's an
|
||||
# example where I'm selecting all jobs of a certain type
|
||||
# and deleting them from the retry queue.
|
||||
#
|
||||
# r = Sidekiq::ScheduledSet.new
|
||||
# r.select do |retri|
|
||||
# retri.klass == 'Sidekiq::Extensions::DelayedClass' &&
|
||||
# retri.args[0] == 'User' &&
|
||||
# retri.args[1] == 'setup_new_subscriber'
|
||||
# end.map(&:delete)
|
||||
class ScheduledSet < SortedSet
|
||||
def initialize
|
||||
super 'schedule'
|
||||
end
|
||||
end
|
||||
|
||||
##
|
||||
# Allows enumeration of retries within Sidekiq.
|
||||
# Based on this, you can search/filter for jobs. Here's an
|
||||
# example where I'm selecting all jobs of a certain type
|
||||
# and deleting them from the retry queue.
|
||||
#
|
||||
# r = Sidekiq::RetrySet.new
|
||||
# r.select do |retri|
|
||||
# retri.klass == 'Sidekiq::Extensions::DelayedClass' &&
|
||||
# retri.args[0] == 'User' &&
|
||||
# retri.args[1] == 'setup_new_subscriber'
|
||||
# end.map(&:delete)
|
||||
class RetrySet < SortedSet
|
||||
def initialize
|
||||
super 'retry'
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -37,14 +37,14 @@ class TestApi < MiniTest::Unit::TestCase
|
|||
end
|
||||
|
||||
it 'shows empty retries' do
|
||||
r = Sidekiq::Retries.new
|
||||
r = Sidekiq::RetrySet.new
|
||||
assert_equal 0, r.size
|
||||
end
|
||||
|
||||
it 'can enumerate retries' do
|
||||
add_retry
|
||||
|
||||
r = Sidekiq::Retries.new
|
||||
r = Sidekiq::RetrySet.new
|
||||
assert_equal 1, r.size
|
||||
array = r.to_a
|
||||
assert_equal 1, array.size
|
||||
|
@ -58,7 +58,7 @@ class TestApi < MiniTest::Unit::TestCase
|
|||
|
||||
it 'can delete retries' do
|
||||
add_retry
|
||||
r = Sidekiq::Retries.new
|
||||
r = Sidekiq::RetrySet.new
|
||||
assert_equal 1, r.size
|
||||
r.map(&:delete)
|
||||
assert_equal 0, r.size
|
||||
|
|
Loading…
Add table
Reference in a new issue