mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Validate that args for bulk push is an Array of Arrays (#4578)
This commit is contained in:
parent
19a236eb85
commit
a1805e36a0
2 changed files with 13 additions and 5 deletions
|
@ -90,16 +90,16 @@ module Sidekiq
|
||||||
# Returns an array of the of pushed jobs' jids. The number of jobs pushed can be less
|
# Returns an array of the of pushed jobs' jids. The number of jobs pushed can be less
|
||||||
# than the number given if the middleware stopped processing for one or more jobs.
|
# than the number given if the middleware stopped processing for one or more jobs.
|
||||||
def push_bulk(items)
|
def push_bulk(items)
|
||||||
arg = items["args"].first
|
args = items["args"]
|
||||||
return [] unless arg # no jobs to push
|
raise ArgumentError, "Bulk arguments must be an Array of Arrays: [[1], [2]]" unless (args.is_a?(Array) && args.all?(Array))
|
||||||
raise ArgumentError, "Bulk arguments must be an Array of Arrays: [[1], [2]]" unless arg.is_a?(Array)
|
return [] if args.empty? # no jobs to push
|
||||||
|
|
||||||
at = items.delete("at")
|
at = items.delete("at")
|
||||||
raise ArgumentError, "Job 'at' must be a Numeric or an Array of Numeric timestamps" if at && (Array(at).empty? || !Array(at).all?(Numeric))
|
raise ArgumentError, "Job 'at' must be a Numeric or an Array of Numeric timestamps" if at && (Array(at).empty? || !Array(at).all?(Numeric))
|
||||||
|
|
||||||
normed = normalize_item(items)
|
normed = normalize_item(items)
|
||||||
payloads = items["args"].map.with_index { |args, index|
|
payloads = args.map.with_index { |job_args, index|
|
||||||
copy = normed.merge("args" => args, "jid" => SecureRandom.hex(12), "enqueued_at" => Time.now.to_f)
|
copy = normed.merge("args" => job_args, "jid" => SecureRandom.hex(12), "enqueued_at" => Time.now.to_f)
|
||||||
copy["at"] = (at.is_a?(Array) ? at[index] : at) if at
|
copy["at"] = (at.is_a?(Array) ? at[index] : at) if at
|
||||||
|
|
||||||
result = process_single(items["class"], copy)
|
result = process_single(items["class"], copy)
|
||||||
|
|
|
@ -162,6 +162,14 @@ describe Sidekiq::Client do
|
||||||
result = Sidekiq::Client.push_bulk('class' => 'QueuedWorker', 'args' => [])
|
result = Sidekiq::Client.push_bulk('class' => 'QueuedWorker', 'args' => [])
|
||||||
assert_equal 0, result.size
|
assert_equal 0, result.size
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe 'errors' do
|
||||||
|
it 'raises ArgumentError with invalid params' do
|
||||||
|
assert_raises ArgumentError do
|
||||||
|
Sidekiq::Client.push_bulk('class' => 'QueuedWorker', 'args' => [[1], 2])
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
class BaseWorker
|
class BaseWorker
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue