diff --git a/lib/sidekiq/extensions/generic_proxy.rb b/lib/sidekiq/extensions/generic_proxy.rb index 313dd48c..139c420c 100644 --- a/lib/sidekiq/extensions/generic_proxy.rb +++ b/lib/sidekiq/extensions/generic_proxy.rb @@ -10,7 +10,7 @@ module Sidekiq def initialize(performable, target, options = {}) @performable = performable @target = target - @opts = options + @opts = options.transform_keys(&:to_s) end def method_missing(name, *args) diff --git a/lib/sidekiq/worker.rb b/lib/sidekiq/worker.rb index 4e0140a8..b262bd16 100644 --- a/lib/sidekiq/worker.rb +++ b/lib/sidekiq/worker.rb @@ -175,16 +175,18 @@ module Sidekiq def initialize(klass, opts) @klass = klass - @opts = opts + # NB: the internal hash always has stringified keys + @opts = opts.transform_keys(&:to_s) # ActiveJob compatibility - interval = @opts.delete(:wait_until) || @opts.delete(:wait) + interval = @opts.delete("wait_until") || @opts.delete("wait") at(interval) if interval end def set(options) - interval = options.delete(:wait_until) || options.delete(:wait) - @opts.merge!(options) + hash = options.transform_keys(&:to_s) + interval = hash.delete("wait_until") || @opts.delete("wait") + @opts.merge!(hash) at(interval) if interval self end @@ -200,7 +202,7 @@ module Sidekiq # Explicit inline execution of a job. Returns nil if the job did not # execute, true otherwise. def perform_inline(*args) - raw = @opts.merge("args" => args, "class" => @klass).transform_keys(&:to_s) + raw = @opts.merge("args" => args, "class" => @klass) # validate and normalize payload item = normalize_item(raw) @@ -235,11 +237,10 @@ module Sidekiq alias_method :perform_sync, :perform_inline def perform_bulk(args, batch_size: 1_000) - hash = @opts.transform_keys(&:to_s) pool = Thread.current[:sidekiq_via_pool] || @klass.get_sidekiq_options["pool"] || Sidekiq.redis_pool client = Sidekiq::Client.new(pool) result = args.each_slice(batch_size).flat_map do |slice| - client.push_bulk(hash.merge("class" => @klass, "args" => slice)) + client.push_bulk(@opts.merge("class" => @klass, "args" => slice)) end result.is_a?(Enumerator::Lazy) ? result.force : result @@ -353,9 +354,9 @@ module Sidekiq def client_push(item) # :nodoc: pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options["pool"] || Sidekiq.redis_pool - stringified_item = item.transform_keys(&:to_s) + raise ArgumentError, "Job payloads should contain no Symbols: #{item}" if item.any? { |k, v| k.is_a?(::Symbol) } - Sidekiq::Client.new(pool).push(stringified_item) + Sidekiq::Client.new(pool).push(item) end end end