diff --git a/Changes.md b/Changes.md index 09c3c3ac..d01dbb54 100644 --- a/Changes.md +++ b/Changes.md @@ -5,6 +5,10 @@ HEAD --------- +- Support `Client.push_bulk` with different delays [#4243] +```ruby +Sidekiq::Client.push_bulk("class" => FooJob, "args" => [[1], [2]], "at" => [1.minute.from_now.to_f, 5.minutes.from_now.to_f]) +``` - Add `sidekiqmon` to gemspec executables [#4242] - Gracefully handle `Sidekiq.logger = nil` [#4240] - Inject Sidekiq::LogContext module if user-supplied logger does not include it [#4239] diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index 3f9da443..f0257408 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -94,9 +94,14 @@ module Sidekiq return [] unless arg # no jobs to push raise ArgumentError, "Bulk arguments must be an Array of Arrays: [[1], [2]]" unless arg.is_a?(Array) + at = items.delete("at") + raise ArgumentError, "Job 'at' must be a Numeric or an Array of Numeric timestamps" if at && (Array(at).empty? || !Array(at).all?(Numeric)) + normed = normalize_item(items) - payloads = items["args"].map { |args| - copy = normed.merge("args" => args, "jid" => SecureRandom.hex(12), "enqueued_at" => Time.now.to_f) + payloads = items["args"].map.with_index { |args, index| + single_at = at.is_a?(Array) ? at[index] : at + copy = normed.merge("args" => args, "jid" => SecureRandom.hex(12), "at" => single_at, "enqueued_at" => Time.now.to_f) + result = process_single(items["class"], copy) result || nil }.compact diff --git a/test/test_client.rb b/test/test_client.rb index a304e690..0e74092a 100644 --- a/test/test_client.rb +++ b/test/test_client.rb @@ -28,6 +28,10 @@ describe Sidekiq::Client do assert_raises ArgumentError do Sidekiq::Client.push('queue' => 'foo', 'class' => MyWorker, 'args' => [1], 'at' => Time.now) end + + assert_raises ArgumentError do + Sidekiq::Client.push('queue' => 'foo', 'class' => MyWorker, 'args' => [1, 2], 'at' => [Time.now.to_f, :not_a_numeric]) + end end end @@ -118,19 +122,32 @@ describe Sidekiq::Client do after do Sidekiq::Queue.new.clear end + it 'can push a large set of jobs at once' do jids = Sidekiq::Client.push_bulk('class' => QueuedWorker, 'args' => (1..1_000).to_a.map { |x| Array(x) }) assert_equal 1_000, jids.size end + it 'can push a large set of jobs at once using a String class' do jids = Sidekiq::Client.push_bulk('class' => 'QueuedWorker', 'args' => (1..1_000).to_a.map { |x| Array(x) }) assert_equal 1_000, jids.size end + + it 'can push jobs scheduled at different times' do + first_at = Time.new(2019, 1, 1) + second_at = Time.new(2019, 1, 2) + jids = Sidekiq::Client.push_bulk('class' => QueuedWorker, 'args' => [[1], [2]], 'at' => [first_at.to_f, second_at.to_f]) + (first_jid, second_jid) = jids + assert_equal first_at, Sidekiq::ScheduledSet.new.find_job(first_jid).at + assert_equal second_at, Sidekiq::ScheduledSet.new.find_job(second_jid).at + end + it 'returns the jids for the jobs' do Sidekiq::Client.push_bulk('class' => 'QueuedWorker', 'args' => (1..2).to_a.map { |x| Array(x) }).each do |jid| assert_match(/[0-9a-f]{12}/, jid) end end + it 'handles no jobs' do result = Sidekiq::Client.push_bulk('class' => 'QueuedWorker', 'args' => []) assert_equal 0, result.size