mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Extract client's job utility methods, use them for perform_inline
.
Sidekiq::Client is Redis-only, no inline support.
This commit is contained in:
parent
c950470cbc
commit
6ae5b02236
3 changed files with 85 additions and 57 deletions
|
@ -2,9 +2,12 @@
|
|||
|
||||
require "securerandom"
|
||||
require "sidekiq/middleware/chain"
|
||||
require "sidekiq/job_util"
|
||||
|
||||
module Sidekiq
|
||||
class Client
|
||||
include Sidekiq::JobUtil
|
||||
|
||||
##
|
||||
# Define client-side middleware:
|
||||
#
|
||||
|
@ -76,21 +79,6 @@ module Sidekiq
|
|||
end
|
||||
end
|
||||
|
||||
def perform_inline(item)
|
||||
normed = normalize_item(item)
|
||||
payload = process_single(item["class"], normed)
|
||||
|
||||
msg = Sidekiq.load_json(Sidekiq.dump_json(payload))
|
||||
klass = msg['class'].constantize
|
||||
job = klass.new
|
||||
job.jid = msg['jid']
|
||||
msg['id'] ||= SecureRandom.hex(12)
|
||||
|
||||
Sidekiq.server_middleware.invoke(job, msg, msg['queue']) do
|
||||
job.perform(*msg['args'])
|
||||
end
|
||||
end
|
||||
|
||||
##
|
||||
# Push a large number of jobs to Redis. This method cuts out the redis
|
||||
# network round trip latency. I wouldn't recommend pushing more than
|
||||
|
@ -233,42 +221,5 @@ module Sidekiq
|
|||
item
|
||||
end
|
||||
end
|
||||
|
||||
def validate(item)
|
||||
raise(ArgumentError, "Job must be a Hash with 'class' and 'args' keys: `#{item}`") unless item.is_a?(Hash) && item.key?("class") && item.key?("args")
|
||||
raise(ArgumentError, "Job args must be an Array: `#{item}`") unless item["args"].is_a?(Array)
|
||||
raise(ArgumentError, "Job class must be either a Class or String representation of the class name: `#{item}`") unless item["class"].is_a?(Class) || item["class"].is_a?(String)
|
||||
raise(ArgumentError, "Job 'at' must be a Numeric timestamp: `#{item}`") if item.key?("at") && !item["at"].is_a?(Numeric)
|
||||
raise(ArgumentError, "Job tags must be an Array: `#{item}`") if item["tags"] && !item["tags"].is_a?(Array)
|
||||
end
|
||||
|
||||
def normalize_item(item)
|
||||
validate(item)
|
||||
# raise(ArgumentError, "Arguments must be native JSON types, see https://github.com/mperham/sidekiq/wiki/Best-Practices") unless JSON.load(JSON.dump(item['args'])) == item['args']
|
||||
|
||||
# merge in the default sidekiq_options for the item's class and/or wrapped element
|
||||
# this allows ActiveJobs to control sidekiq_options too.
|
||||
defaults = normalized_hash(item["class"])
|
||||
defaults = defaults.merge(item["wrapped"].get_sidekiq_options) if item["wrapped"].respond_to?("get_sidekiq_options")
|
||||
item = defaults.merge(item)
|
||||
|
||||
raise(ArgumentError, "Job must include a valid queue name") if item["queue"].nil? || item["queue"] == ""
|
||||
|
||||
item["class"] = item["class"].to_s
|
||||
item["queue"] = item["queue"].to_s
|
||||
item["jid"] ||= SecureRandom.hex(12)
|
||||
item["created_at"] ||= Time.now.to_f
|
||||
|
||||
item
|
||||
end
|
||||
|
||||
def normalized_hash(item_class)
|
||||
if item_class.is_a?(Class)
|
||||
raise(ArgumentError, "Message must include a Sidekiq::Worker class, not class name: #{item_class.ancestors.inspect}") unless item_class.respond_to?("get_sidekiq_options")
|
||||
item_class.get_sidekiq_options
|
||||
else
|
||||
Sidekiq.default_worker_options
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
46
lib/sidekiq/job_util.rb
Normal file
46
lib/sidekiq/job_util.rb
Normal file
|
@ -0,0 +1,46 @@
|
|||
require "securerandom"
|
||||
require "time"
|
||||
|
||||
module Sidekiq
|
||||
module JobUtil
|
||||
# These functions encapsulate various job utilities.
|
||||
# They must be simple and free from side effects.
|
||||
|
||||
def validate(item)
|
||||
raise(ArgumentError, "Job must be a Hash with 'class' and 'args' keys: `#{item}`") unless item.is_a?(Hash) && item.key?("class") && item.key?("args")
|
||||
raise(ArgumentError, "Job args must be an Array: `#{item}`") unless item["args"].is_a?(Array)
|
||||
raise(ArgumentError, "Job class must be either a Class or String representation of the class name: `#{item}`") unless item["class"].is_a?(Class) || item["class"].is_a?(String)
|
||||
raise(ArgumentError, "Job 'at' must be a Numeric timestamp: `#{item}`") if item.key?("at") && !item["at"].is_a?(Numeric)
|
||||
raise(ArgumentError, "Job tags must be an Array: `#{item}`") if item["tags"] && !item["tags"].is_a?(Array)
|
||||
end
|
||||
|
||||
def normalize_item(item)
|
||||
validate(item)
|
||||
# raise(ArgumentError, "Arguments must be native JSON types, see https://github.com/mperham/sidekiq/wiki/Best-Practices") unless JSON.load(JSON.dump(item['args'])) == item['args']
|
||||
|
||||
# merge in the default sidekiq_options for the item's class and/or wrapped element
|
||||
# this allows ActiveJobs to control sidekiq_options too.
|
||||
defaults = normalized_hash(item["class"])
|
||||
defaults = defaults.merge(item["wrapped"].get_sidekiq_options) if item["wrapped"].respond_to?("get_sidekiq_options")
|
||||
item = defaults.merge(item)
|
||||
|
||||
raise(ArgumentError, "Job must include a valid queue name") if item["queue"].nil? || item["queue"] == ""
|
||||
|
||||
item["class"] = item["class"].to_s
|
||||
item["queue"] = item["queue"].to_s
|
||||
item["jid"] ||= SecureRandom.hex(12)
|
||||
item["created_at"] ||= Time.now.to_f
|
||||
|
||||
item
|
||||
end
|
||||
|
||||
def normalized_hash(item_class)
|
||||
if item_class.is_a?(Class)
|
||||
raise(ArgumentError, "Message must include a Sidekiq::Worker class, not class name: #{item_class.ancestors.inspect}") unless item_class.respond_to?("get_sidekiq_options")
|
||||
item_class.get_sidekiq_options
|
||||
else
|
||||
Sidekiq.default_worker_options
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -171,6 +171,8 @@ module Sidekiq
|
|||
# SomeWorker.set(queue: 'foo').perform_async(....)
|
||||
#
|
||||
class Setter
|
||||
include Sidekiq::JobUtil
|
||||
|
||||
def initialize(klass, opts)
|
||||
@klass = klass
|
||||
@opts = opts
|
||||
|
@ -191,10 +193,40 @@ module Sidekiq
|
|||
@klass.client_push(@opts.merge("args" => args, "class" => @klass))
|
||||
end
|
||||
|
||||
# Inline execution of job's perform method after passing through Sidekiq.client_middleware and Sidekiq.server_middleware
|
||||
# Explicit inline execution of a job. Returns nil if the job did not
|
||||
# execute, true otherwise.
|
||||
def perform_inline(*args)
|
||||
item = @opts.merge('args' => args, 'class' => @klass).transform_keys(&:to_s)
|
||||
Sidekiq::Client.new.perform_inline(item)
|
||||
raw = @opts.merge("args" => args, "class" => @klass).transform_keys(&:to_s)
|
||||
|
||||
# validate and normalize payload
|
||||
item = normalize_item(raw)
|
||||
queue = item["queue"]
|
||||
|
||||
# run client-side middleware
|
||||
result = Sidekiq.client_middleware.invoke(item["class"], item, queue, Sidekiq.redis_pool) do
|
||||
item
|
||||
end
|
||||
return nil unless result
|
||||
|
||||
# round-trip the payload via JSON
|
||||
msg = Sidekiq.load_json(Sidekiq.dump_json(item))
|
||||
|
||||
# prepare the job instance
|
||||
klass = msg["class"].constantize
|
||||
job = klass.new
|
||||
job.jid = msg["jid"]
|
||||
job.bid = msg["bid"] if job.respond_to?(:bid)
|
||||
|
||||
# run the job through server-side middleware
|
||||
result = Sidekiq.server_middleware.invoke(job, msg, msg["queue"]) do
|
||||
# perform it
|
||||
job.perform(*msg["args"])
|
||||
true
|
||||
end
|
||||
return nil unless result
|
||||
# jobs do not return a result. they should store any
|
||||
# modified state.
|
||||
true
|
||||
end
|
||||
|
||||
def perform_bulk(args, batch_size: 1_000)
|
||||
|
@ -249,8 +281,7 @@ module Sidekiq
|
|||
|
||||
# Inline execution of job's perform method after passing through Sidekiq.client_middleware and Sidekiq.server_middleware
|
||||
def perform_inline(*args)
|
||||
item = {"class" => self, "args" => args }
|
||||
Sidekiq::Client.new.perform_inline(item)
|
||||
Setter.new(self, {}).perform_inline(*args)
|
||||
end
|
||||
|
||||
##
|
||||
|
|
Loading…
Add table
Reference in a new issue