mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Remove parallel code paths caused by "push" and "push_bulk" (#5182)
* Remove parallel code paths caused by "push" and "push_bulk" Fixes #5158 * Refactor where jid is generated * Hardcode client class * Make generate_jid private * Build client outside the loop
This commit is contained in:
parent
00200869ee
commit
7cd97b4253
3 changed files with 17 additions and 13 deletions
|
@ -54,6 +54,7 @@ module Sidekiq
|
||||||
# at - timestamp to schedule the job (optional), must be Numeric (e.g. Time.now.to_f)
|
# at - timestamp to schedule the job (optional), must be Numeric (e.g. Time.now.to_f)
|
||||||
# retry - whether to retry this job if it fails, default true or an integer number of retries
|
# retry - whether to retry this job if it fails, default true or an integer number of retries
|
||||||
# backtrace - whether to save any error backtrace, default false
|
# backtrace - whether to save any error backtrace, default false
|
||||||
|
# jid - job ID when retrying a job, will be automatically generated otherwise
|
||||||
#
|
#
|
||||||
# If class is set to the class name, the jobs' options will be based on Sidekiq's default
|
# If class is set to the class name, the jobs' options will be based on Sidekiq's default
|
||||||
# worker options. Otherwise, they will be based on the job class's options.
|
# worker options. Otherwise, they will be based on the job class's options.
|
||||||
|
@ -70,13 +71,7 @@ module Sidekiq
|
||||||
# push('queue' => 'my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar'])
|
# push('queue' => 'my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar'])
|
||||||
#
|
#
|
||||||
def push(item)
|
def push(item)
|
||||||
normed = normalize_item(item)
|
push_bulk(item.merge("args" => [item["args"]])).first
|
||||||
payload = process_single(item["class"], normed)
|
|
||||||
|
|
||||||
if payload
|
|
||||||
raw_push([payload])
|
|
||||||
payload["jid"]
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
##
|
##
|
||||||
|
@ -101,9 +96,12 @@ module Sidekiq
|
||||||
raise ArgumentError, "Job 'at' must be a Numeric or an Array of Numeric timestamps" if at && (Array(at).empty? || !Array(at).all? { |entry| entry.is_a?(Numeric) })
|
raise ArgumentError, "Job 'at' must be a Numeric or an Array of Numeric timestamps" if at && (Array(at).empty? || !Array(at).all? { |entry| entry.is_a?(Numeric) })
|
||||||
raise ArgumentError, "Job 'at' Array must have same size as 'args' Array" if at.is_a?(Array) && at.size != args.size
|
raise ArgumentError, "Job 'at' Array must have same size as 'args' Array" if at.is_a?(Array) && at.size != args.size
|
||||||
|
|
||||||
|
jid = items.delete("jid")
|
||||||
|
raise ArgumentError, "Explicitly passing 'jid' when pushing more than one job is not supported" if jid && args.size > 1
|
||||||
|
|
||||||
normed = normalize_item(items)
|
normed = normalize_item(items)
|
||||||
payloads = args.map.with_index { |job_args, index|
|
payloads = args.map.with_index { |job_args, index|
|
||||||
copy = normed.merge("args" => job_args, "jid" => SecureRandom.hex(12))
|
copy = normed.merge("args" => job_args, "jid" => jid || generate_jid)
|
||||||
copy["at"] = (at.is_a?(Array) ? at[index] : at) if at
|
copy["at"] = (at.is_a?(Array) ? at[index] : at) if at
|
||||||
|
|
||||||
result = process_single(items["class"], copy)
|
result = process_single(items["class"], copy)
|
||||||
|
|
|
@ -41,7 +41,6 @@ module Sidekiq
|
||||||
|
|
||||||
item["class"] = item["class"].to_s
|
item["class"] = item["class"].to_s
|
||||||
item["queue"] = item["queue"].to_s
|
item["queue"] = item["queue"].to_s
|
||||||
item["jid"] ||= SecureRandom.hex(12)
|
|
||||||
item["created_at"] ||= Time.now.to_f
|
item["created_at"] ||= Time.now.to_f
|
||||||
|
|
||||||
item
|
item
|
||||||
|
@ -61,5 +60,9 @@ module Sidekiq
|
||||||
def json_safe?(item)
|
def json_safe?(item)
|
||||||
JSON.parse(JSON.dump(item["args"])) == item["args"]
|
JSON.parse(JSON.dump(item["args"])) == item["args"]
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def generate_jid
|
||||||
|
SecureRandom.hex(12)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -206,6 +206,7 @@ module Sidekiq
|
||||||
|
|
||||||
# validate and normalize payload
|
# validate and normalize payload
|
||||||
item = normalize_item(raw)
|
item = normalize_item(raw)
|
||||||
|
item["jid"] ||= generate_jid
|
||||||
queue = item["queue"]
|
queue = item["queue"]
|
||||||
|
|
||||||
# run client-side middleware
|
# run client-side middleware
|
||||||
|
@ -237,8 +238,7 @@ module Sidekiq
|
||||||
alias_method :perform_sync, :perform_inline
|
alias_method :perform_sync, :perform_inline
|
||||||
|
|
||||||
def perform_bulk(args, batch_size: 1_000)
|
def perform_bulk(args, batch_size: 1_000)
|
||||||
pool = Thread.current[:sidekiq_via_pool] || @klass.get_sidekiq_options["pool"] || Sidekiq.redis_pool
|
client = @klass.build_client
|
||||||
client = Sidekiq::Client.new(pool)
|
|
||||||
result = args.each_slice(batch_size).flat_map do |slice|
|
result = args.each_slice(batch_size).flat_map do |slice|
|
||||||
client.push_bulk(@opts.merge("class" => @klass, "args" => slice))
|
client.push_bulk(@opts.merge("class" => @klass, "args" => slice))
|
||||||
end
|
end
|
||||||
|
@ -353,10 +353,13 @@ module Sidekiq
|
||||||
end
|
end
|
||||||
|
|
||||||
def client_push(item) # :nodoc:
|
def client_push(item) # :nodoc:
|
||||||
pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options["pool"] || Sidekiq.redis_pool
|
|
||||||
raise ArgumentError, "Job payloads should contain no Symbols: #{item}" if item.any? { |k, v| k.is_a?(::Symbol) }
|
raise ArgumentError, "Job payloads should contain no Symbols: #{item}" if item.any? { |k, v| k.is_a?(::Symbol) }
|
||||||
|
build_client.push_bulk(item.merge("args" => [item["args"]])).first
|
||||||
|
end
|
||||||
|
|
||||||
Sidekiq::Client.new(pool).push(item)
|
def build_client # :nodoc:
|
||||||
|
pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options["pool"] || Sidekiq.redis_pool
|
||||||
|
Sidekiq::Client.new(pool)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Add table
Reference in a new issue