mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Handle deletion of queued jobs when iterating over a Sidekiq::Queue
This commit is contained in:
parent
c32e00b00f
commit
d6609c2abd
2 changed files with 16 additions and 1 deletions
|
@ -117,18 +117,23 @@ module Sidekiq
|
||||||
end
|
end
|
||||||
|
|
||||||
def each(&block)
|
def each(&block)
|
||||||
|
initial_size = size
|
||||||
|
deleted_size = 0
|
||||||
page = 0
|
page = 0
|
||||||
page_size = 50
|
page_size = 50
|
||||||
|
|
||||||
loop do
|
loop do
|
||||||
|
range_start = page * page_size - deleted_size
|
||||||
|
range_end = page * page_size - deleted_size + (page_size - 1)
|
||||||
entries = Sidekiq.redis do |conn|
|
entries = Sidekiq.redis do |conn|
|
||||||
conn.lrange @rname, page * page_size, (page * page_size) + page_size - 1
|
conn.lrange @rname, range_start, range_end
|
||||||
end
|
end
|
||||||
break if entries.empty?
|
break if entries.empty?
|
||||||
page += 1
|
page += 1
|
||||||
entries.each do |entry|
|
entries.each do |entry|
|
||||||
block.call Job.new(entry, @name)
|
block.call Job.new(entry, @name)
|
||||||
end
|
end
|
||||||
|
deleted_size = initial_size - size
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -202,6 +202,16 @@ class TestApi < Sidekiq::Test
|
||||||
assert_equal set.size, 0
|
assert_equal set.size, 0
|
||||||
end
|
end
|
||||||
|
|
||||||
|
it 'can remove jobs when iterating over a queue' do
|
||||||
|
# initial queue size must be greater than Queue#each underlying page size
|
||||||
|
51.times do
|
||||||
|
ApiWorker.perform_async(1, 'aaron')
|
||||||
|
end
|
||||||
|
q = Sidekiq::Queue.new
|
||||||
|
q.map(&:delete)
|
||||||
|
assert_equal q.size, 0
|
||||||
|
end
|
||||||
|
|
||||||
it 'can find job by id in queues' do
|
it 'can find job by id in queues' do
|
||||||
q = Sidekiq::Queue.new
|
q = Sidekiq::Queue.new
|
||||||
job_id = ApiWorker.perform_async(1, 'jason')
|
job_id = ApiWorker.perform_async(1, 'jason')
|
||||||
|
|
Loading…
Add table
Reference in a new issue