From ba557da4f421edbcd346560ba069316c84a7a84d Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Mon, 14 Mar 2022 12:16:15 -0700 Subject: [PATCH] Refactor recent client changes I find `push` is easier to read this way. push_bulk is quite complex and adds a net-negative layer of abstraction. --- lib/sidekiq/client.rb | 24 ++++++++++++------------ lib/sidekiq/job_util.rb | 6 +----- lib/sidekiq/worker.rb | 3 +-- 3 files changed, 14 insertions(+), 19 deletions(-) diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index 973513f8..4b137548 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -70,7 +70,14 @@ module Sidekiq # push('queue' => 'my_queue', 'class' => MyJob, 'args' => ['foo', 1, :bat => 'bar']) # def push(item) - push_bulk(item.merge("args" => [item["args"]])).first + normed = normalize_item(item) + payload = middleware.invoke(normed["class"], normed, normed["queue"], @redis_pool) do + normed + end + if payload + raw_push([payload]) + payload["jid"] + end end ## @@ -100,10 +107,11 @@ module Sidekiq normed = normalize_item(items) payloads = args.map.with_index { |job_args, index| - copy = normed.merge("args" => job_args, "jid" => jid || generate_jid) + copy = normed.merge("args" => job_args, "jid" => SecureRandom.hex(12)) copy["at"] = (at.is_a?(Array) ? at[index] : at) if at - - result = process_single(items["class"], copy) + result = middleware.invoke(copy["class"], copy, copy["queue"], @redis_pool) do + copy + end result || nil }.compact @@ -225,13 +233,5 @@ module Sidekiq conn.lpush("queue:#{queue}", to_push) end end - - def process_single(job_class, item) - queue = item["queue"] - - middleware.invoke(job_class, item, queue, @redis_pool) do - item - end - end end end diff --git a/lib/sidekiq/job_util.rb b/lib/sidekiq/job_util.rb index 3190edee..3e520acc 100644 --- a/lib/sidekiq/job_util.rb +++ b/lib/sidekiq/job_util.rb @@ -40,10 +40,10 @@ module Sidekiq raise(ArgumentError, "Job must include a valid queue name") if item["queue"].nil? || item["queue"] == "" + item["jid"] ||= SecureRandom.hex(12) item["class"] = item["class"].to_s item["queue"] = item["queue"].to_s item["created_at"] ||= Time.now.to_f - item end @@ -61,9 +61,5 @@ module Sidekiq def json_safe?(item) JSON.parse(JSON.dump(item["args"])) == item["args"] end - - def generate_jid - SecureRandom.hex(12) - end end end diff --git a/lib/sidekiq/worker.rb b/lib/sidekiq/worker.rb index 6fbfe57b..122bbe16 100644 --- a/lib/sidekiq/worker.rb +++ b/lib/sidekiq/worker.rb @@ -206,7 +206,6 @@ module Sidekiq # validate and normalize payload item = normalize_item(raw) - item["jid"] ||= generate_jid queue = item["queue"] # run client-side middleware @@ -355,7 +354,7 @@ module Sidekiq def client_push(item) # :nodoc: raise ArgumentError, "Job payloads should contain no Symbols: #{item}" if item.any? { |k, v| k.is_a?(::Symbol) } - build_client.push_bulk(item.merge("args" => [item["args"]])).first + build_client.push(item) end def build_client # :nodoc: