From e96ab6939b34826b403ababf9ac49d87f6faeee1 Mon Sep 17 00:00:00 2001 From: Kelly Sutton Date: Tue, 16 Nov 2021 09:54:35 -0700 Subject: [PATCH] Make `.perform_bulk` immediately evaluate lazy enumerators. (#5060) * Create failing tests to capture the cases we care about Co-authored-by: Noa Elad * Call .force on args if they are a lazy enumerator to evaluate them immediately Co-authored-by: Noa Elad * Change implementation of `.perform_bulk` to lazily pull values from a lazy enumerator. This prevents us from loading the entire contents of a lazy enumerator into memory while providing the "unsurprising" behavior of enqueueing jobs when `.perform_bulk` is called. Note: We'll still load all JIDs into memory. Co-authored-by: Noa Elad Co-authored-by: Eda Zhou Co-authored-by: Noa Elad Co-authored-by: Eda Zhou --- lib/sidekiq/worker.rb | 8 ++++++-- test/test_client.rb | 8 ++++++++ test/test_worker.rb | 14 ++++++++++++++ 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/lib/sidekiq/worker.rb b/lib/sidekiq/worker.rb index 73c32ba5..8b2e3751 100644 --- a/lib/sidekiq/worker.rb +++ b/lib/sidekiq/worker.rb @@ -192,9 +192,11 @@ module Sidekiq end def perform_bulk(args, batch_size: 1_000) - args.each_slice(batch_size).flat_map do |slice| + result = args.each_slice(batch_size).flat_map do |slice| Sidekiq::Client.push_bulk(@opts.merge("class" => @klass, "args" => slice)) end + + result.is_a?(Enumerator::Lazy) ? result.force : result end # +interval+ must be a timestamp, numeric or something that acts @@ -262,9 +264,11 @@ module Sidekiq # SomeWorker.perform_bulk([[1], [2], [3]]) # def perform_bulk(items, batch_size: 1_000) - items.each_slice(batch_size).flat_map do |slice| + result = items.each_slice(batch_size).flat_map do |slice| Sidekiq::Client.push_bulk("class" => self, "args" => slice) end + + result.is_a?(Enumerator::Lazy) ? result.force : result end # +interval+ must be a timestamp, numeric or something that acts diff --git a/test/test_client.rb b/test/test_client.rb index 883ec35c..f9d39955 100644 --- a/test/test_client.rb +++ b/test/test_client.rb @@ -207,6 +207,14 @@ describe Sidekiq::Client do end end end + + describe 'lazy enumerator' do + it 'enqueues the jobs by evaluating the enumerator' do + lazy_array = (1..1_001).to_a.map { |x| Array(x) }.lazy + jids = MyWorker.perform_bulk(lazy_array) + assert_equal 1_001, jids.size + end + end end end diff --git a/test/test_worker.rb b/test/test_worker.rb index 576e019e..5dc4d008 100644 --- a/test/test_worker.rb +++ b/test/test_worker.rb @@ -91,5 +91,19 @@ describe Sidekiq::Worker do assert_equal 1_001, q.size assert_equal 1_001, jids.size end + + describe '.perform_bulk and lazy enumerators' do + it 'evaluates lazy enumerators' do + q = Sidekiq::Queue.new('bar') + assert_equal 0, q.size + + set = SetWorker.set('queue' => 'bar') + lazy_args = (1..1_001).to_a.map { |x| Array(x) }.lazy + jids = set.perform_bulk(lazy_args) + + assert_equal 1_001, q.size + assert_equal 1_001, jids.size + end + end end end