mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Client middleware should be able to stop bulk job push, fixes #2887
This commit is contained in:
parent
c7f2f5f93d
commit
59061042c9
3 changed files with 28 additions and 2 deletions
|
@ -1,5 +1,10 @@
|
|||
# Sidekiq Changes
|
||||
|
||||
4.1.2
|
||||
-----------
|
||||
|
||||
- Client middleware can now stop bulk job push. [#2887]
|
||||
|
||||
4.1.1
|
||||
-----------
|
||||
|
||||
|
|
|
@ -82,10 +82,14 @@ module Sidekiq
|
|||
# Returns an array of the of pushed jobs' jids. The number of jobs pushed can be less
|
||||
# than the number given if the middleware stopped processing for one or more jobs.
|
||||
def push_bulk(items)
|
||||
arg = items['args'].first
|
||||
raise ArgumentError, "Bulk arguments must be an Array of Arrays: [[1], [2]]" if !arg.is_a?(Array)
|
||||
|
||||
normed = normalize_item(items)
|
||||
payloads = items['args'].map do |args|
|
||||
raise ArgumentError, "Bulk arguments must be an Array of Arrays: [[1], [2]]" if !args.is_a?(Array)
|
||||
process_single(items['class'], normed.merge('args' => args, 'jid' => SecureRandom.hex(12), 'enqueued_at' => Time.now.to_f))
|
||||
copy = normed.merge('args' => args, 'jid' => SecureRandom.hex(12), 'enqueued_at' => Time.now.to_f)
|
||||
result = process_single(items['class'], copy)
|
||||
result ? result : nil
|
||||
end.compact
|
||||
|
||||
raw_push(payloads) if !payloads.empty?
|
||||
|
|
|
@ -32,6 +32,23 @@ class TestClient < Sidekiq::Test
|
|||
assert_equal 24, jid.size
|
||||
end
|
||||
|
||||
it 'allows middleware to stop bulk jobs' do
|
||||
mware = Class.new do
|
||||
def call(worker_klass,msg,q,r)
|
||||
msg['args'][0] == 1 ? yield : false
|
||||
end
|
||||
end
|
||||
client = Sidekiq::Client.new
|
||||
client.middleware do |chain|
|
||||
chain.add mware
|
||||
end
|
||||
q = Sidekiq::Queue.new
|
||||
q.clear
|
||||
result = client.push_bulk('class' => 'Blah', 'args' => [[1],[2],[3]])
|
||||
assert_equal 1, result.size
|
||||
assert_equal 1, q.size
|
||||
end
|
||||
|
||||
it 'allows local middleware modification' do
|
||||
$called = false
|
||||
mware = Class.new { def call(worker_klass,msg,q,r); $called = true; msg;end }
|
||||
|
|
Loading…
Add table
Reference in a new issue