mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Implement Sidekiq::Worker.perform_bulk
(#5042)
* Sketch out some failing tests to capture the behavior Co-authored-by: jeffcarbs <jeff.carbonella@gmail.com> * Implement Sidekiq::Client.perform_bulk Co-authored-by: jeffcarbs <jeff.carbonella@gmail.com> * Allow .perform_bulk to operate on different batch sizes Co-authored-by: jeffcarbs <jeff.carbonella@gmail.com> * Write a failing test to capture the Sidekiq::Worker::Setter.perform_bulk behavior Co-authored-by: jeffcarbs <jeff.carbonella@gmail.com> * Implement Sidekiq::Worker::Setter.perform_bulk Co-authored-by: jeffcarbs <jeff.carbonella@gmail.com> * Write a small comment for to document the method Co-authored-by: jeffcarbs <jeff.carbonella@gmail.com> * Add a Changes.md entry Co-authored-by: jeffcarbs <jeff.carbonella@gmail.com>
This commit is contained in:
parent
fd76471cea
commit
4a0432622f
4 changed files with 78 additions and 0 deletions
10
Changes.md
10
Changes.md
|
@ -24,6 +24,16 @@ end
|
|||
require "sidekiq/middleware/current_attributes"
|
||||
Sidekiq::CurrentAttributes.persist(Myapp::Current) # Your AS::CurrentAttributes singleton
|
||||
```
|
||||
- **FEATURE**: Introduce new method, `.perform_bulk` on `Sidekiq::Worker` that makes enqueuing
|
||||
jobs in bulk adhere to Redis best practices by enqueuing 1,000 jobs per round trip. This
|
||||
shares a similar args syntax to `Sidekiq::Client.push_bulk`. Batch sizes can be configured
|
||||
with the optional `batch_size:` keyword argument.
|
||||
```ruby
|
||||
MyJob.perform_bulk([[1], [2], [3]])
|
||||
|
||||
# With a batch size provided:
|
||||
MyJob.perform_bulk([[1], [2], [3]], batch_size: 100)
|
||||
```
|
||||
- Implement `queue_as`, `wait` and `wait_until` for ActiveJob compatibility [#5003]
|
||||
- Retry Redis operation if we get an `UNBLOCKED` Redis error. [#4985]
|
||||
- Run existing signal traps, if any, before running Sidekiq's trap. [#4991]
|
||||
|
|
|
@ -191,6 +191,12 @@ module Sidekiq
|
|||
@klass.client_push(@opts.merge("args" => args, "class" => @klass))
|
||||
end
|
||||
|
||||
def perform_bulk(args, batch_size: 1_000)
|
||||
args.each_slice(batch_size).flat_map do |slice|
|
||||
Sidekiq::Client.push_bulk(@opts.merge("class" => @klass, "args" => slice))
|
||||
end
|
||||
end
|
||||
|
||||
# +interval+ must be a timestamp, numeric or something that acts
|
||||
# numeric (like an activesupport time interval).
|
||||
def perform_in(interval, *args)
|
||||
|
@ -235,6 +241,32 @@ module Sidekiq
|
|||
client_push("class" => self, "args" => args)
|
||||
end
|
||||
|
||||
##
|
||||
# Push a large number of jobs to Redis, while limiting the batch of
|
||||
# each job payload to 1,000. This method helps cut down on the number
|
||||
# of round trips to Redis, which can increase the performance of enqueueing
|
||||
# large numbers of jobs.
|
||||
#
|
||||
# +items+ must be an Array of Arrays.
|
||||
#
|
||||
# For finer-grained control, use `Sidekiq::Client.push_bulk` directly.
|
||||
#
|
||||
# Example (3 Redis round trips):
|
||||
#
|
||||
# SomeWorker.perform_async(1)
|
||||
# SomeWorker.perform_async(2)
|
||||
# SomeWorker.perform_async(3)
|
||||
#
|
||||
# Would instead become (1 Redis round trip):
|
||||
#
|
||||
# SomeWorker.perform_bulk([[1], [2], [3]])
|
||||
#
|
||||
def perform_bulk(items, batch_size: 1_000)
|
||||
items.each_slice(batch_size).flat_map do |slice|
|
||||
Sidekiq::Client.push_bulk("class" => self, "args" => slice)
|
||||
end
|
||||
end
|
||||
|
||||
# +interval+ must be a timestamp, numeric or something that acts
|
||||
# numeric (like an activesupport time interval).
|
||||
def perform_in(interval, *args)
|
||||
|
|
|
@ -183,6 +183,31 @@ describe Sidekiq::Client do
|
|||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe '.perform_bulk' do
|
||||
it 'pushes a large set of jobs' do
|
||||
jids = MyWorker.perform_bulk((1..1_001).to_a.map { |x| Array(x) })
|
||||
assert_equal 1_001, jids.size
|
||||
end
|
||||
|
||||
it 'pushes a large set of jobs with a different batch size' do
|
||||
jids = MyWorker.perform_bulk((1..1_001).to_a.map { |x| Array(x) }, batch_size: 100)
|
||||
assert_equal 1_001, jids.size
|
||||
end
|
||||
|
||||
it 'handles no jobs' do
|
||||
jids = MyWorker.perform_bulk([])
|
||||
assert_equal 0, jids.size
|
||||
end
|
||||
|
||||
describe 'errors' do
|
||||
it 'raises ArgumentError with invalid params' do
|
||||
assert_raises ArgumentError do
|
||||
Sidekiq::Client.push_bulk('class' => 'MyWorker', 'args' => [[1], 2])
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class BaseWorker
|
||||
|
|
|
@ -80,5 +80,16 @@ describe Sidekiq::Worker do
|
|||
assert_equal 'foo', job['queue']
|
||||
assert_equal 'xyz', job['bar']
|
||||
end
|
||||
|
||||
it 'works with .perform_bulk' do
|
||||
q = Sidekiq::Queue.new('bar')
|
||||
assert_equal 0, q.size
|
||||
|
||||
set = SetWorker.set('queue' => 'bar')
|
||||
jids = set.perform_bulk((1..1_001).to_a.map { |x| Array(x) })
|
||||
|
||||
assert_equal 1_001, q.size
|
||||
assert_equal 1_001, jids.size
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Reference in a new issue