mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Fix the pool used by perform_bulk (#5129)
* Fix the pool used by perform_bulk * Fix missing receiver * Fix missing kwargs
This commit is contained in:
parent
e7b67c6847
commit
34d081f4ed
1 changed files with 5 additions and 7 deletions
|
@ -236,8 +236,10 @@ module Sidekiq
|
||||||
|
|
||||||
def perform_bulk(args, batch_size: 1_000)
|
def perform_bulk(args, batch_size: 1_000)
|
||||||
hash = @opts.transform_keys(&:to_s)
|
hash = @opts.transform_keys(&:to_s)
|
||||||
|
pool = Thread.current[:sidekiq_via_pool] || @klass.get_sidekiq_options["pool"] || Sidekiq.redis_pool
|
||||||
|
client = Sidekiq::Client.new(pool)
|
||||||
result = args.each_slice(batch_size).flat_map do |slice|
|
result = args.each_slice(batch_size).flat_map do |slice|
|
||||||
Sidekiq::Client.push_bulk(hash.merge("class" => @klass, "args" => slice))
|
client.push_bulk(hash.merge("class" => @klass, "args" => slice))
|
||||||
end
|
end
|
||||||
|
|
||||||
result.is_a?(Enumerator::Lazy) ? result.force : result
|
result.is_a?(Enumerator::Lazy) ? result.force : result
|
||||||
|
@ -312,12 +314,8 @@ module Sidekiq
|
||||||
#
|
#
|
||||||
# SomeWorker.perform_bulk([[1], [2], [3]])
|
# SomeWorker.perform_bulk([[1], [2], [3]])
|
||||||
#
|
#
|
||||||
def perform_bulk(items, batch_size: 1_000)
|
def perform_bulk(*args, **kwargs)
|
||||||
result = items.each_slice(batch_size).flat_map do |slice|
|
Setter.new(self, {}).perform_bulk(*args, **kwargs)
|
||||||
Sidekiq::Client.push_bulk("class" => self, "args" => slice)
|
|
||||||
end
|
|
||||||
|
|
||||||
result.is_a?(Enumerator::Lazy) ? result.force : result
|
|
||||||
end
|
end
|
||||||
|
|
||||||
# +interval+ must be a timestamp, numeric or something that acts
|
# +interval+ must be a timestamp, numeric or something that acts
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue