mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Refactor recent client changes
I find `push` is easier to read this way. push_bulk is quite complex and adds a net-negative layer of abstraction.
This commit is contained in:
parent
44a5bb3326
commit
ba557da4f4
3 changed files with 14 additions and 19 deletions
|
@ -70,7 +70,14 @@ module Sidekiq
|
|||
# push('queue' => 'my_queue', 'class' => MyJob, 'args' => ['foo', 1, :bat => 'bar'])
|
||||
#
|
||||
def push(item)
|
||||
push_bulk(item.merge("args" => [item["args"]])).first
|
||||
normed = normalize_item(item)
|
||||
payload = middleware.invoke(normed["class"], normed, normed["queue"], @redis_pool) do
|
||||
normed
|
||||
end
|
||||
if payload
|
||||
raw_push([payload])
|
||||
payload["jid"]
|
||||
end
|
||||
end
|
||||
|
||||
##
|
||||
|
@ -100,10 +107,11 @@ module Sidekiq
|
|||
|
||||
normed = normalize_item(items)
|
||||
payloads = args.map.with_index { |job_args, index|
|
||||
copy = normed.merge("args" => job_args, "jid" => jid || generate_jid)
|
||||
copy = normed.merge("args" => job_args, "jid" => SecureRandom.hex(12))
|
||||
copy["at"] = (at.is_a?(Array) ? at[index] : at) if at
|
||||
|
||||
result = process_single(items["class"], copy)
|
||||
result = middleware.invoke(copy["class"], copy, copy["queue"], @redis_pool) do
|
||||
copy
|
||||
end
|
||||
result || nil
|
||||
}.compact
|
||||
|
||||
|
@ -225,13 +233,5 @@ module Sidekiq
|
|||
conn.lpush("queue:#{queue}", to_push)
|
||||
end
|
||||
end
|
||||
|
||||
def process_single(job_class, item)
|
||||
queue = item["queue"]
|
||||
|
||||
middleware.invoke(job_class, item, queue, @redis_pool) do
|
||||
item
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -40,10 +40,10 @@ module Sidekiq
|
|||
|
||||
raise(ArgumentError, "Job must include a valid queue name") if item["queue"].nil? || item["queue"] == ""
|
||||
|
||||
item["jid"] ||= SecureRandom.hex(12)
|
||||
item["class"] = item["class"].to_s
|
||||
item["queue"] = item["queue"].to_s
|
||||
item["created_at"] ||= Time.now.to_f
|
||||
|
||||
item
|
||||
end
|
||||
|
||||
|
@ -61,9 +61,5 @@ module Sidekiq
|
|||
def json_safe?(item)
|
||||
JSON.parse(JSON.dump(item["args"])) == item["args"]
|
||||
end
|
||||
|
||||
def generate_jid
|
||||
SecureRandom.hex(12)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -206,7 +206,6 @@ module Sidekiq
|
|||
|
||||
# validate and normalize payload
|
||||
item = normalize_item(raw)
|
||||
item["jid"] ||= generate_jid
|
||||
queue = item["queue"]
|
||||
|
||||
# run client-side middleware
|
||||
|
@ -355,7 +354,7 @@ module Sidekiq
|
|||
|
||||
def client_push(item) # :nodoc:
|
||||
raise ArgumentError, "Job payloads should contain no Symbols: #{item}" if item.any? { |k, v| k.is_a?(::Symbol) }
|
||||
build_client.push_bulk(item.merge("args" => [item["args"]])).first
|
||||
build_client.push(item)
|
||||
end
|
||||
|
||||
def build_client # :nodoc:
|
||||
|
|
Loading…
Reference in a new issue