mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
feat: adds support for .perform_inline
which processes the job inline
after passing through available client and server middlewares
This commit is contained in:
parent
a486ffa555
commit
2898a715d7
3 changed files with 66 additions and 2 deletions
|
@ -67,8 +67,7 @@ module Sidekiq
|
|||
# push('queue' => 'my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar'])
|
||||
#
|
||||
def push(item)
|
||||
normed = normalize_item(item)
|
||||
payload = process_single(item["class"], normed)
|
||||
payload = payload_for_push(item)
|
||||
|
||||
if payload
|
||||
raw_push([payload])
|
||||
|
@ -76,6 +75,11 @@ module Sidekiq
|
|||
end
|
||||
end
|
||||
|
||||
def payload_for_push(item)
|
||||
normed = normalize_item(item)
|
||||
process_single(item["class"], normed)
|
||||
end
|
||||
|
||||
##
|
||||
# Push a large number of jobs to Redis. This method cuts out the redis
|
||||
# network round trip latency. I wouldn't recommend pushing more than
|
||||
|
|
|
@ -191,6 +191,22 @@ module Sidekiq
|
|||
@klass.client_push(@opts.merge("args" => args, "class" => @klass))
|
||||
end
|
||||
|
||||
# Inline execution of job's perform method after passing through Sidekiq.client_middleware and Sidekiq.server_middleware
|
||||
def perform_inline(*args)
|
||||
item = @opts.merge('args' => args, 'class' => @klass).transform_keys(&:to_s)
|
||||
payload = Sidekiq::Client.new.payload_for_push(item)
|
||||
|
||||
msg = Sidekiq.load_json(Sidekiq.dump_json(payload))
|
||||
klass = msg['class'].constantize
|
||||
job = klass.new
|
||||
job.jid = msg['jid']
|
||||
msg['id'] ||= SecureRandom.hex(12)
|
||||
|
||||
Sidekiq.server_middleware.invoke(job, msg, msg['queue']) do
|
||||
job.perform(*msg['args'])
|
||||
end
|
||||
end
|
||||
|
||||
def perform_bulk(args, batch_size: 1_000)
|
||||
args.each_slice(batch_size).flat_map do |slice|
|
||||
Sidekiq::Client.push_bulk(@opts.merge("class" => @klass, "args" => slice))
|
||||
|
@ -241,6 +257,11 @@ module Sidekiq
|
|||
client_push("class" => self, "args" => args)
|
||||
end
|
||||
|
||||
# Inline execution of job's perform method after passing through Sidekiq.client_middleware and Sidekiq.server_middleware
|
||||
def perform_inline(*args)
|
||||
set({}).perform_inline(*args)
|
||||
end
|
||||
|
||||
##
|
||||
# Push a large number of jobs to Redis, while limiting the batch of
|
||||
# each job payload to 1,000. This method helps cut down on the number
|
||||
|
|
|
@ -92,4 +92,43 @@ describe Sidekiq::Worker do
|
|||
assert_equal 1_001, jids.size
|
||||
end
|
||||
end
|
||||
|
||||
describe '#perform_inline' do
|
||||
$my_recorder = []
|
||||
|
||||
class MyCustomWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
def perform(recorder)
|
||||
$my_recorder << ['work_performed']
|
||||
end
|
||||
end
|
||||
|
||||
class MyCustomMiddleware
|
||||
def initialize(name, recorder)
|
||||
@name = name
|
||||
@recorder = recorder
|
||||
end
|
||||
|
||||
def call(*args)
|
||||
@recorder << "#{@name}-before"
|
||||
response = yield
|
||||
@recorder << "#{@name}-after"
|
||||
return response
|
||||
end
|
||||
end
|
||||
|
||||
it 'executes middleware & runs job inline' do
|
||||
server_chain = Sidekiq::Middleware::Chain.new
|
||||
server_chain.add MyCustomMiddleware, "1-server", $my_recorder
|
||||
client_chain = Sidekiq::Middleware::Chain.new
|
||||
client_chain.add MyCustomMiddleware, "1-client", $my_recorder
|
||||
Sidekiq.stub(:server_middleware, server_chain) do
|
||||
Sidekiq.stub(:client_middleware, client_chain) do
|
||||
MyCustomWorker.perform_inline($my_recorder)
|
||||
assert_equal $my_recorder.flatten, %w(1-client-before 1-client-after 1-server-before work_performed 1-server-after)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue