From 2898a715d7f46bdeff7ccec5317bb0e20002acaa Mon Sep 17 00:00:00 2001 From: Hasan Kumar Date: Mon, 15 Nov 2021 15:57:48 +0530 Subject: [PATCH] feat: adds support for `.perform_inline` which processes the job inline after passing through available client and server middlewares --- lib/sidekiq/client.rb | 8 ++++++-- lib/sidekiq/worker.rb | 21 +++++++++++++++++++++ test/test_worker.rb | 39 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+), 2 deletions(-) diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index 6d1ba83b..ab9dfd37 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -67,8 +67,7 @@ module Sidekiq # push('queue' => 'my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar']) # def push(item) - normed = normalize_item(item) - payload = process_single(item["class"], normed) + payload = payload_for_push(item) if payload raw_push([payload]) @@ -76,6 +75,11 @@ module Sidekiq end end + def payload_for_push(item) + normed = normalize_item(item) + process_single(item["class"], normed) + end + ## # Push a large number of jobs to Redis. This method cuts out the redis # network round trip latency. I wouldn't recommend pushing more than diff --git a/lib/sidekiq/worker.rb b/lib/sidekiq/worker.rb index 73c32ba5..2d1c9e6a 100644 --- a/lib/sidekiq/worker.rb +++ b/lib/sidekiq/worker.rb @@ -191,6 +191,22 @@ module Sidekiq @klass.client_push(@opts.merge("args" => args, "class" => @klass)) end + # Inline execution of job's perform method after passing through Sidekiq.client_middleware and Sidekiq.server_middleware + def perform_inline(*args) + item = @opts.merge('args' => args, 'class' => @klass).transform_keys(&:to_s) + payload = Sidekiq::Client.new.payload_for_push(item) + + msg = Sidekiq.load_json(Sidekiq.dump_json(payload)) + klass = msg['class'].constantize + job = klass.new + job.jid = msg['jid'] + msg['id'] ||= SecureRandom.hex(12) + + Sidekiq.server_middleware.invoke(job, msg, msg['queue']) do + job.perform(*msg['args']) + end + end + def perform_bulk(args, batch_size: 1_000) args.each_slice(batch_size).flat_map do |slice| Sidekiq::Client.push_bulk(@opts.merge("class" => @klass, "args" => slice)) @@ -241,6 +257,11 @@ module Sidekiq client_push("class" => self, "args" => args) end + # Inline execution of job's perform method after passing through Sidekiq.client_middleware and Sidekiq.server_middleware + def perform_inline(*args) + set({}).perform_inline(*args) + end + ## # Push a large number of jobs to Redis, while limiting the batch of # each job payload to 1,000. This method helps cut down on the number diff --git a/test/test_worker.rb b/test/test_worker.rb index 576e019e..9a64290d 100644 --- a/test/test_worker.rb +++ b/test/test_worker.rb @@ -92,4 +92,43 @@ describe Sidekiq::Worker do assert_equal 1_001, jids.size end end + + describe '#perform_inline' do + $my_recorder = [] + + class MyCustomWorker + include Sidekiq::Worker + + def perform(recorder) + $my_recorder << ['work_performed'] + end + end + + class MyCustomMiddleware + def initialize(name, recorder) + @name = name + @recorder = recorder + end + + def call(*args) + @recorder << "#{@name}-before" + response = yield + @recorder << "#{@name}-after" + return response + end + end + + it 'executes middleware & runs job inline' do + server_chain = Sidekiq::Middleware::Chain.new + server_chain.add MyCustomMiddleware, "1-server", $my_recorder + client_chain = Sidekiq::Middleware::Chain.new + client_chain.add MyCustomMiddleware, "1-client", $my_recorder + Sidekiq.stub(:server_middleware, server_chain) do + Sidekiq.stub(:client_middleware, client_chain) do + MyCustomWorker.perform_inline($my_recorder) + assert_equal $my_recorder.flatten, %w(1-client-before 1-client-after 1-server-before work_performed 1-server-after) + end + end + end + end end