From 34d081f4ed54873663e70dc1e2795d8646bfe15a Mon Sep 17 00:00:00 2001 From: mikit <37488201+m1kit@users.noreply.github.com> Date: Sat, 22 Jan 2022 01:16:25 +0900 Subject: [PATCH] Fix the pool used by perform_bulk (#5129) * Fix the pool used by perform_bulk * Fix missing receiver * Fix missing kwargs --- lib/sidekiq/worker.rb | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/lib/sidekiq/worker.rb b/lib/sidekiq/worker.rb index c238d305..4e0140a8 100644 --- a/lib/sidekiq/worker.rb +++ b/lib/sidekiq/worker.rb @@ -236,8 +236,10 @@ module Sidekiq def perform_bulk(args, batch_size: 1_000) hash = @opts.transform_keys(&:to_s) + pool = Thread.current[:sidekiq_via_pool] || @klass.get_sidekiq_options["pool"] || Sidekiq.redis_pool + client = Sidekiq::Client.new(pool) result = args.each_slice(batch_size).flat_map do |slice| - Sidekiq::Client.push_bulk(hash.merge("class" => @klass, "args" => slice)) + client.push_bulk(hash.merge("class" => @klass, "args" => slice)) end result.is_a?(Enumerator::Lazy) ? result.force : result @@ -312,12 +314,8 @@ module Sidekiq # # SomeWorker.perform_bulk([[1], [2], [3]]) # - def perform_bulk(items, batch_size: 1_000) - result = items.each_slice(batch_size).flat_map do |slice| - Sidekiq::Client.push_bulk("class" => self, "args" => slice) - end - - result.is_a?(Enumerator::Lazy) ? result.force : result + def perform_bulk(*args, **kwargs) + Setter.new(self, {}).perform_bulk(*args, **kwargs) end # +interval+ must be a timestamp, numeric or something that acts