diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index 8a774cb0..5b218c85 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -54,6 +54,7 @@ module Sidekiq # at - timestamp to schedule the job (optional), must be Numeric (e.g. Time.now.to_f) # retry - whether to retry this job if it fails, default true or an integer number of retries # backtrace - whether to save any error backtrace, default false + # jid - job ID when retrying a job, will be automatically generated otherwise # # If class is set to the class name, the jobs' options will be based on Sidekiq's default # worker options. Otherwise, they will be based on the job class's options. @@ -70,13 +71,7 @@ module Sidekiq # push('queue' => 'my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar']) # def push(item) - normed = normalize_item(item) - payload = process_single(item["class"], normed) - - if payload - raw_push([payload]) - payload["jid"] - end + push_bulk(item.merge("args" => [item["args"]])).first end ## @@ -101,9 +96,12 @@ module Sidekiq raise ArgumentError, "Job 'at' must be a Numeric or an Array of Numeric timestamps" if at && (Array(at).empty? || !Array(at).all? { |entry| entry.is_a?(Numeric) }) raise ArgumentError, "Job 'at' Array must have same size as 'args' Array" if at.is_a?(Array) && at.size != args.size + jid = items.delete("jid") + raise ArgumentError, "Explicitly passing 'jid' when pushing more than one job is not supported" if jid && args.size > 1 + normed = normalize_item(items) payloads = args.map.with_index { |job_args, index| - copy = normed.merge("args" => job_args, "jid" => SecureRandom.hex(12)) + copy = normed.merge("args" => job_args, "jid" => jid || generate_jid) copy["at"] = (at.is_a?(Array) ? at[index] : at) if at result = process_single(items["class"], copy) diff --git a/lib/sidekiq/job_util.rb b/lib/sidekiq/job_util.rb index 65e354b2..7f5dab5d 100644 --- a/lib/sidekiq/job_util.rb +++ b/lib/sidekiq/job_util.rb @@ -41,7 +41,6 @@ module Sidekiq item["class"] = item["class"].to_s item["queue"] = item["queue"].to_s - item["jid"] ||= SecureRandom.hex(12) item["created_at"] ||= Time.now.to_f item @@ -61,5 +60,9 @@ module Sidekiq def json_safe?(item) JSON.parse(JSON.dump(item["args"])) == item["args"] end + + def generate_jid + SecureRandom.hex(12) + end end end diff --git a/lib/sidekiq/worker.rb b/lib/sidekiq/worker.rb index b262bd16..3c046bcf 100644 --- a/lib/sidekiq/worker.rb +++ b/lib/sidekiq/worker.rb @@ -206,6 +206,7 @@ module Sidekiq # validate and normalize payload item = normalize_item(raw) + item["jid"] ||= generate_jid queue = item["queue"] # run client-side middleware @@ -237,8 +238,7 @@ module Sidekiq alias_method :perform_sync, :perform_inline def perform_bulk(args, batch_size: 1_000) - pool = Thread.current[:sidekiq_via_pool] || @klass.get_sidekiq_options["pool"] || Sidekiq.redis_pool - client = Sidekiq::Client.new(pool) + client = @klass.build_client result = args.each_slice(batch_size).flat_map do |slice| client.push_bulk(@opts.merge("class" => @klass, "args" => slice)) end @@ -353,10 +353,13 @@ module Sidekiq end def client_push(item) # :nodoc: - pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options["pool"] || Sidekiq.redis_pool raise ArgumentError, "Job payloads should contain no Symbols: #{item}" if item.any? { |k, v| k.is_a?(::Symbol) } + build_client.push_bulk(item.merge("args" => [item["args"]])).first + end - Sidekiq::Client.new(pool).push(item) + def build_client # :nodoc: + pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options["pool"] || Sidekiq.redis_pool + Sidekiq::Client.new(pool) end end end