diff --git a/Changes.md b/Changes.md index fe9852b7..95389987 100644 --- a/Changes.md +++ b/Changes.md @@ -1,5 +1,10 @@ # Sidekiq Changes +4.1.2 +----------- + +- Client middleware can now stop bulk job push. [#2887] + 4.1.1 ----------- diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index 01b52951..a21ace61 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -82,10 +82,14 @@ module Sidekiq # Returns an array of the of pushed jobs' jids. The number of jobs pushed can be less # than the number given if the middleware stopped processing for one or more jobs. def push_bulk(items) + arg = items['args'].first + raise ArgumentError, "Bulk arguments must be an Array of Arrays: [[1], [2]]" if !arg.is_a?(Array) + normed = normalize_item(items) payloads = items['args'].map do |args| - raise ArgumentError, "Bulk arguments must be an Array of Arrays: [[1], [2]]" if !args.is_a?(Array) - process_single(items['class'], normed.merge('args' => args, 'jid' => SecureRandom.hex(12), 'enqueued_at' => Time.now.to_f)) + copy = normed.merge('args' => args, 'jid' => SecureRandom.hex(12), 'enqueued_at' => Time.now.to_f) + result = process_single(items['class'], copy) + result ? result : nil end.compact raw_push(payloads) if !payloads.empty? diff --git a/test/test_client.rb b/test/test_client.rb index 807fc949..426400a2 100644 --- a/test/test_client.rb +++ b/test/test_client.rb @@ -32,6 +32,23 @@ class TestClient < Sidekiq::Test assert_equal 24, jid.size end + it 'allows middleware to stop bulk jobs' do + mware = Class.new do + def call(worker_klass,msg,q,r) + msg['args'][0] == 1 ? yield : false + end + end + client = Sidekiq::Client.new + client.middleware do |chain| + chain.add mware + end + q = Sidekiq::Queue.new + q.clear + result = client.push_bulk('class' => 'Blah', 'args' => [[1],[2],[3]]) + assert_equal 1, result.size + assert_equal 1, q.size + end + it 'allows local middleware modification' do $called = false mware = Class.new { def call(worker_klass,msg,q,r); $called = true; msg;end }