mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Make .perform_bulk
immediately evaluate lazy enumerators. (#5060)
* Create failing tests to capture the cases we care about Co-authored-by: Noa Elad <noa.elad@zenpayroll.com> * Call .force on args if they are a lazy enumerator to evaluate them immediately Co-authored-by: Noa Elad <noa.elad@zenpayroll.com> * Change implementation of `.perform_bulk` to lazily pull values from a lazy enumerator. This prevents us from loading the entire contents of a lazy enumerator into memory while providing the "unsurprising" behavior of enqueueing jobs when `.perform_bulk` is called. Note: We'll still load all JIDs into memory. Co-authored-by: Noa Elad <noa.elad@zenpayroll.com> Co-authored-by: Eda Zhou <eda.zhou@gusto.com> Co-authored-by: Noa Elad <noa.elad@zenpayroll.com> Co-authored-by: Eda Zhou <eda.zhou@gusto.com>
This commit is contained in:
parent
6bef01546b
commit
e96ab6939b
3 changed files with 28 additions and 2 deletions
|
@ -192,9 +192,11 @@ module Sidekiq
|
|||
end
|
||||
|
||||
def perform_bulk(args, batch_size: 1_000)
|
||||
args.each_slice(batch_size).flat_map do |slice|
|
||||
result = args.each_slice(batch_size).flat_map do |slice|
|
||||
Sidekiq::Client.push_bulk(@opts.merge("class" => @klass, "args" => slice))
|
||||
end
|
||||
|
||||
result.is_a?(Enumerator::Lazy) ? result.force : result
|
||||
end
|
||||
|
||||
# +interval+ must be a timestamp, numeric or something that acts
|
||||
|
@ -262,9 +264,11 @@ module Sidekiq
|
|||
# SomeWorker.perform_bulk([[1], [2], [3]])
|
||||
#
|
||||
def perform_bulk(items, batch_size: 1_000)
|
||||
items.each_slice(batch_size).flat_map do |slice|
|
||||
result = items.each_slice(batch_size).flat_map do |slice|
|
||||
Sidekiq::Client.push_bulk("class" => self, "args" => slice)
|
||||
end
|
||||
|
||||
result.is_a?(Enumerator::Lazy) ? result.force : result
|
||||
end
|
||||
|
||||
# +interval+ must be a timestamp, numeric or something that acts
|
||||
|
|
|
@ -207,6 +207,14 @@ describe Sidekiq::Client do
|
|||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe 'lazy enumerator' do
|
||||
it 'enqueues the jobs by evaluating the enumerator' do
|
||||
lazy_array = (1..1_001).to_a.map { |x| Array(x) }.lazy
|
||||
jids = MyWorker.perform_bulk(lazy_array)
|
||||
assert_equal 1_001, jids.size
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -91,5 +91,19 @@ describe Sidekiq::Worker do
|
|||
assert_equal 1_001, q.size
|
||||
assert_equal 1_001, jids.size
|
||||
end
|
||||
|
||||
describe '.perform_bulk and lazy enumerators' do
|
||||
it 'evaluates lazy enumerators' do
|
||||
q = Sidekiq::Queue.new('bar')
|
||||
assert_equal 0, q.size
|
||||
|
||||
set = SetWorker.set('queue' => 'bar')
|
||||
lazy_args = (1..1_001).to_a.map { |x| Array(x) }.lazy
|
||||
jids = set.perform_bulk(lazy_args)
|
||||
|
||||
assert_equal 1_001, q.size
|
||||
assert_equal 1_001, jids.size
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Reference in a new issue