mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Always use String keys with setter hash, fixes #5192
This commit is contained in:
parent
eab794f17c
commit
5bfaeea73a
2 changed files with 11 additions and 10 deletions
|
@ -10,7 +10,7 @@ module Sidekiq
|
||||||
def initialize(performable, target, options = {})
|
def initialize(performable, target, options = {})
|
||||||
@performable = performable
|
@performable = performable
|
||||||
@target = target
|
@target = target
|
||||||
@opts = options
|
@opts = options.transform_keys(&:to_s)
|
||||||
end
|
end
|
||||||
|
|
||||||
def method_missing(name, *args)
|
def method_missing(name, *args)
|
||||||
|
|
|
@ -175,16 +175,18 @@ module Sidekiq
|
||||||
|
|
||||||
def initialize(klass, opts)
|
def initialize(klass, opts)
|
||||||
@klass = klass
|
@klass = klass
|
||||||
@opts = opts
|
# NB: the internal hash always has stringified keys
|
||||||
|
@opts = opts.transform_keys(&:to_s)
|
||||||
|
|
||||||
# ActiveJob compatibility
|
# ActiveJob compatibility
|
||||||
interval = @opts.delete(:wait_until) || @opts.delete(:wait)
|
interval = @opts.delete("wait_until") || @opts.delete("wait")
|
||||||
at(interval) if interval
|
at(interval) if interval
|
||||||
end
|
end
|
||||||
|
|
||||||
def set(options)
|
def set(options)
|
||||||
interval = options.delete(:wait_until) || options.delete(:wait)
|
hash = options.transform_keys(&:to_s)
|
||||||
@opts.merge!(options)
|
interval = hash.delete("wait_until") || @opts.delete("wait")
|
||||||
|
@opts.merge!(hash)
|
||||||
at(interval) if interval
|
at(interval) if interval
|
||||||
self
|
self
|
||||||
end
|
end
|
||||||
|
@ -200,7 +202,7 @@ module Sidekiq
|
||||||
# Explicit inline execution of a job. Returns nil if the job did not
|
# Explicit inline execution of a job. Returns nil if the job did not
|
||||||
# execute, true otherwise.
|
# execute, true otherwise.
|
||||||
def perform_inline(*args)
|
def perform_inline(*args)
|
||||||
raw = @opts.merge("args" => args, "class" => @klass).transform_keys(&:to_s)
|
raw = @opts.merge("args" => args, "class" => @klass)
|
||||||
|
|
||||||
# validate and normalize payload
|
# validate and normalize payload
|
||||||
item = normalize_item(raw)
|
item = normalize_item(raw)
|
||||||
|
@ -235,11 +237,10 @@ module Sidekiq
|
||||||
alias_method :perform_sync, :perform_inline
|
alias_method :perform_sync, :perform_inline
|
||||||
|
|
||||||
def perform_bulk(args, batch_size: 1_000)
|
def perform_bulk(args, batch_size: 1_000)
|
||||||
hash = @opts.transform_keys(&:to_s)
|
|
||||||
pool = Thread.current[:sidekiq_via_pool] || @klass.get_sidekiq_options["pool"] || Sidekiq.redis_pool
|
pool = Thread.current[:sidekiq_via_pool] || @klass.get_sidekiq_options["pool"] || Sidekiq.redis_pool
|
||||||
client = Sidekiq::Client.new(pool)
|
client = Sidekiq::Client.new(pool)
|
||||||
result = args.each_slice(batch_size).flat_map do |slice|
|
result = args.each_slice(batch_size).flat_map do |slice|
|
||||||
client.push_bulk(hash.merge("class" => @klass, "args" => slice))
|
client.push_bulk(@opts.merge("class" => @klass, "args" => slice))
|
||||||
end
|
end
|
||||||
|
|
||||||
result.is_a?(Enumerator::Lazy) ? result.force : result
|
result.is_a?(Enumerator::Lazy) ? result.force : result
|
||||||
|
@ -353,9 +354,9 @@ module Sidekiq
|
||||||
|
|
||||||
def client_push(item) # :nodoc:
|
def client_push(item) # :nodoc:
|
||||||
pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options["pool"] || Sidekiq.redis_pool
|
pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options["pool"] || Sidekiq.redis_pool
|
||||||
stringified_item = item.transform_keys(&:to_s)
|
raise ArgumentError, "Job payloads should contain no Symbols: #{item}" if item.any? { |k, v| k.is_a?(::Symbol) }
|
||||||
|
|
||||||
Sidekiq::Client.new(pool).push(stringified_item)
|
Sidekiq::Client.new(pool).push(item)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Add table
Reference in a new issue