mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Support Client.push_bulk with different delays
This commit is contained in:
parent
c3ea67a79b
commit
2bcf3bfff1
3 changed files with 28 additions and 2 deletions
|
@ -5,6 +5,10 @@
|
|||
HEAD
|
||||
---------
|
||||
|
||||
- Support `Client.push_bulk` with different delays [#4243]
|
||||
```ruby
|
||||
Sidekiq::Client.push_bulk("class" => FooJob, "args" => [[1], [2]], "at" => [1.minute.from_now.to_f, 5.minutes.from_now.to_f])
|
||||
```
|
||||
- Add `sidekiqmon` to gemspec executables [#4242]
|
||||
- Gracefully handle `Sidekiq.logger = nil` [#4240]
|
||||
- Inject Sidekiq::LogContext module if user-supplied logger does not include it [#4239]
|
||||
|
|
|
@ -94,9 +94,14 @@ module Sidekiq
|
|||
return [] unless arg # no jobs to push
|
||||
raise ArgumentError, "Bulk arguments must be an Array of Arrays: [[1], [2]]" unless arg.is_a?(Array)
|
||||
|
||||
at = items.delete("at")
|
||||
raise ArgumentError, "Job 'at' must be a Numeric or an Array of Numeric timestamps" if at && (Array(at).empty? || !Array(at).all?(Numeric))
|
||||
|
||||
normed = normalize_item(items)
|
||||
payloads = items["args"].map { |args|
|
||||
copy = normed.merge("args" => args, "jid" => SecureRandom.hex(12), "enqueued_at" => Time.now.to_f)
|
||||
payloads = items["args"].map.with_index { |args, index|
|
||||
single_at = at.is_a?(Array) ? at[index] : at
|
||||
copy = normed.merge("args" => args, "jid" => SecureRandom.hex(12), "at" => single_at, "enqueued_at" => Time.now.to_f)
|
||||
|
||||
result = process_single(items["class"], copy)
|
||||
result || nil
|
||||
}.compact
|
||||
|
|
|
@ -28,6 +28,10 @@ describe Sidekiq::Client do
|
|||
assert_raises ArgumentError do
|
||||
Sidekiq::Client.push('queue' => 'foo', 'class' => MyWorker, 'args' => [1], 'at' => Time.now)
|
||||
end
|
||||
|
||||
assert_raises ArgumentError do
|
||||
Sidekiq::Client.push('queue' => 'foo', 'class' => MyWorker, 'args' => [1, 2], 'at' => [Time.now.to_f, :not_a_numeric])
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -118,19 +122,32 @@ describe Sidekiq::Client do
|
|||
after do
|
||||
Sidekiq::Queue.new.clear
|
||||
end
|
||||
|
||||
it 'can push a large set of jobs at once' do
|
||||
jids = Sidekiq::Client.push_bulk('class' => QueuedWorker, 'args' => (1..1_000).to_a.map { |x| Array(x) })
|
||||
assert_equal 1_000, jids.size
|
||||
end
|
||||
|
||||
it 'can push a large set of jobs at once using a String class' do
|
||||
jids = Sidekiq::Client.push_bulk('class' => 'QueuedWorker', 'args' => (1..1_000).to_a.map { |x| Array(x) })
|
||||
assert_equal 1_000, jids.size
|
||||
end
|
||||
|
||||
it 'can push jobs scheduled at different times' do
|
||||
first_at = Time.new(2019, 1, 1)
|
||||
second_at = Time.new(2019, 1, 2)
|
||||
jids = Sidekiq::Client.push_bulk('class' => QueuedWorker, 'args' => [[1], [2]], 'at' => [first_at.to_f, second_at.to_f])
|
||||
(first_jid, second_jid) = jids
|
||||
assert_equal first_at, Sidekiq::ScheduledSet.new.find_job(first_jid).at
|
||||
assert_equal second_at, Sidekiq::ScheduledSet.new.find_job(second_jid).at
|
||||
end
|
||||
|
||||
it 'returns the jids for the jobs' do
|
||||
Sidekiq::Client.push_bulk('class' => 'QueuedWorker', 'args' => (1..2).to_a.map { |x| Array(x) }).each do |jid|
|
||||
assert_match(/[0-9a-f]{12}/, jid)
|
||||
end
|
||||
end
|
||||
|
||||
it 'handles no jobs' do
|
||||
result = Sidekiq::Client.push_bulk('class' => 'QueuedWorker', 'args' => [])
|
||||
assert_equal 0, result.size
|
||||
|
|
Loading…
Add table
Reference in a new issue