diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index 2fe87b99..2b991a2d 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -2,9 +2,12 @@ require "securerandom" require "sidekiq/middleware/chain" +require "sidekiq/job_util" module Sidekiq class Client + include Sidekiq::JobUtil + ## # Define client-side middleware: # @@ -76,21 +79,6 @@ module Sidekiq end end - def perform_inline(item) - normed = normalize_item(item) - payload = process_single(item["class"], normed) - - msg = Sidekiq.load_json(Sidekiq.dump_json(payload)) - klass = msg['class'].constantize - job = klass.new - job.jid = msg['jid'] - msg['id'] ||= SecureRandom.hex(12) - - Sidekiq.server_middleware.invoke(job, msg, msg['queue']) do - job.perform(*msg['args']) - end - end - ## # Push a large number of jobs to Redis. This method cuts out the redis # network round trip latency. I wouldn't recommend pushing more than @@ -233,42 +221,5 @@ module Sidekiq item end end - - def validate(item) - raise(ArgumentError, "Job must be a Hash with 'class' and 'args' keys: `#{item}`") unless item.is_a?(Hash) && item.key?("class") && item.key?("args") - raise(ArgumentError, "Job args must be an Array: `#{item}`") unless item["args"].is_a?(Array) - raise(ArgumentError, "Job class must be either a Class or String representation of the class name: `#{item}`") unless item["class"].is_a?(Class) || item["class"].is_a?(String) - raise(ArgumentError, "Job 'at' must be a Numeric timestamp: `#{item}`") if item.key?("at") && !item["at"].is_a?(Numeric) - raise(ArgumentError, "Job tags must be an Array: `#{item}`") if item["tags"] && !item["tags"].is_a?(Array) - end - - def normalize_item(item) - validate(item) - # raise(ArgumentError, "Arguments must be native JSON types, see https://github.com/mperham/sidekiq/wiki/Best-Practices") unless JSON.load(JSON.dump(item['args'])) == item['args'] - - # merge in the default sidekiq_options for the item's class and/or wrapped element - # this allows ActiveJobs to control sidekiq_options too. - defaults = normalized_hash(item["class"]) - defaults = defaults.merge(item["wrapped"].get_sidekiq_options) if item["wrapped"].respond_to?("get_sidekiq_options") - item = defaults.merge(item) - - raise(ArgumentError, "Job must include a valid queue name") if item["queue"].nil? || item["queue"] == "" - - item["class"] = item["class"].to_s - item["queue"] = item["queue"].to_s - item["jid"] ||= SecureRandom.hex(12) - item["created_at"] ||= Time.now.to_f - - item - end - - def normalized_hash(item_class) - if item_class.is_a?(Class) - raise(ArgumentError, "Message must include a Sidekiq::Worker class, not class name: #{item_class.ancestors.inspect}") unless item_class.respond_to?("get_sidekiq_options") - item_class.get_sidekiq_options - else - Sidekiq.default_worker_options - end - end end end diff --git a/lib/sidekiq/job_util.rb b/lib/sidekiq/job_util.rb new file mode 100644 index 00000000..e9a14cc2 --- /dev/null +++ b/lib/sidekiq/job_util.rb @@ -0,0 +1,46 @@ +require "securerandom" +require "time" + +module Sidekiq + module JobUtil + # These functions encapsulate various job utilities. + # They must be simple and free from side effects. + + def validate(item) + raise(ArgumentError, "Job must be a Hash with 'class' and 'args' keys: `#{item}`") unless item.is_a?(Hash) && item.key?("class") && item.key?("args") + raise(ArgumentError, "Job args must be an Array: `#{item}`") unless item["args"].is_a?(Array) + raise(ArgumentError, "Job class must be either a Class or String representation of the class name: `#{item}`") unless item["class"].is_a?(Class) || item["class"].is_a?(String) + raise(ArgumentError, "Job 'at' must be a Numeric timestamp: `#{item}`") if item.key?("at") && !item["at"].is_a?(Numeric) + raise(ArgumentError, "Job tags must be an Array: `#{item}`") if item["tags"] && !item["tags"].is_a?(Array) + end + + def normalize_item(item) + validate(item) + # raise(ArgumentError, "Arguments must be native JSON types, see https://github.com/mperham/sidekiq/wiki/Best-Practices") unless JSON.load(JSON.dump(item['args'])) == item['args'] + + # merge in the default sidekiq_options for the item's class and/or wrapped element + # this allows ActiveJobs to control sidekiq_options too. + defaults = normalized_hash(item["class"]) + defaults = defaults.merge(item["wrapped"].get_sidekiq_options) if item["wrapped"].respond_to?("get_sidekiq_options") + item = defaults.merge(item) + + raise(ArgumentError, "Job must include a valid queue name") if item["queue"].nil? || item["queue"] == "" + + item["class"] = item["class"].to_s + item["queue"] = item["queue"].to_s + item["jid"] ||= SecureRandom.hex(12) + item["created_at"] ||= Time.now.to_f + + item + end + + def normalized_hash(item_class) + if item_class.is_a?(Class) + raise(ArgumentError, "Message must include a Sidekiq::Worker class, not class name: #{item_class.ancestors.inspect}") unless item_class.respond_to?("get_sidekiq_options") + item_class.get_sidekiq_options + else + Sidekiq.default_worker_options + end + end + end +end diff --git a/lib/sidekiq/worker.rb b/lib/sidekiq/worker.rb index 438eb0b8..8d78895c 100644 --- a/lib/sidekiq/worker.rb +++ b/lib/sidekiq/worker.rb @@ -171,6 +171,8 @@ module Sidekiq # SomeWorker.set(queue: 'foo').perform_async(....) # class Setter + include Sidekiq::JobUtil + def initialize(klass, opts) @klass = klass @opts = opts @@ -191,10 +193,40 @@ module Sidekiq @klass.client_push(@opts.merge("args" => args, "class" => @klass)) end - # Inline execution of job's perform method after passing through Sidekiq.client_middleware and Sidekiq.server_middleware + # Explicit inline execution of a job. Returns nil if the job did not + # execute, true otherwise. def perform_inline(*args) - item = @opts.merge('args' => args, 'class' => @klass).transform_keys(&:to_s) - Sidekiq::Client.new.perform_inline(item) + raw = @opts.merge("args" => args, "class" => @klass).transform_keys(&:to_s) + + # validate and normalize payload + item = normalize_item(raw) + queue = item["queue"] + + # run client-side middleware + result = Sidekiq.client_middleware.invoke(item["class"], item, queue, Sidekiq.redis_pool) do + item + end + return nil unless result + + # round-trip the payload via JSON + msg = Sidekiq.load_json(Sidekiq.dump_json(item)) + + # prepare the job instance + klass = msg["class"].constantize + job = klass.new + job.jid = msg["jid"] + job.bid = msg["bid"] if job.respond_to?(:bid) + + # run the job through server-side middleware + result = Sidekiq.server_middleware.invoke(job, msg, msg["queue"]) do + # perform it + job.perform(*msg["args"]) + true + end + return nil unless result + # jobs do not return a result. they should store any + # modified state. + true end def perform_bulk(args, batch_size: 1_000) @@ -249,8 +281,7 @@ module Sidekiq # Inline execution of job's perform method after passing through Sidekiq.client_middleware and Sidekiq.server_middleware def perform_inline(*args) - item = {"class" => self, "args" => args } - Sidekiq::Client.new.perform_inline(item) + Setter.new(self, {}).perform_inline(*args) end ##