mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Freeze common strings to minimize allocations, #3435
This commit is contained in:
parent
2b38f0c4a8
commit
7385564bcd
4 changed files with 22 additions and 22 deletions
|
@ -12,7 +12,7 @@ require 'sidekiq/delay'
|
||||||
require 'json'
|
require 'json'
|
||||||
|
|
||||||
module Sidekiq
|
module Sidekiq
|
||||||
NAME = 'Sidekiq'
|
NAME = 'Sidekiq'.freeze
|
||||||
LICENSE = 'See LICENSE and the LGPL-3.0 for licensing details.'
|
LICENSE = 'See LICENSE and the LGPL-3.0 for licensing details.'
|
||||||
|
|
||||||
DEFAULTS = {
|
DEFAULTS = {
|
||||||
|
|
|
@ -63,11 +63,11 @@ module Sidekiq
|
||||||
#
|
#
|
||||||
def push(item)
|
def push(item)
|
||||||
normed = normalize_item(item)
|
normed = normalize_item(item)
|
||||||
payload = process_single(item['class'], normed)
|
payload = process_single(item['class'.freeze], normed)
|
||||||
|
|
||||||
if payload
|
if payload
|
||||||
raw_push([payload])
|
raw_push([payload])
|
||||||
payload['jid']
|
payload['jid'.freeze]
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -84,19 +84,19 @@ module Sidekiq
|
||||||
# Returns an array of the of pushed jobs' jids. The number of jobs pushed can be less
|
# Returns an array of the of pushed jobs' jids. The number of jobs pushed can be less
|
||||||
# than the number given if the middleware stopped processing for one or more jobs.
|
# than the number given if the middleware stopped processing for one or more jobs.
|
||||||
def push_bulk(items)
|
def push_bulk(items)
|
||||||
arg = items['args'].first
|
arg = items['args'.freeze].first
|
||||||
return [] unless arg # no jobs to push
|
return [] unless arg # no jobs to push
|
||||||
raise ArgumentError, "Bulk arguments must be an Array of Arrays: [[1], [2]]" if !arg.is_a?(Array)
|
raise ArgumentError, "Bulk arguments must be an Array of Arrays: [[1], [2]]" if !arg.is_a?(Array)
|
||||||
|
|
||||||
normed = normalize_item(items)
|
normed = normalize_item(items)
|
||||||
payloads = items['args'].map do |args|
|
payloads = items['args'.freeze].map do |args|
|
||||||
copy = normed.merge('args' => args, 'jid' => SecureRandom.hex(12), 'enqueued_at' => Time.now.to_f)
|
copy = normed.merge('args'.freeze => args, 'jid'.freeze => SecureRandom.hex(12), 'enqueued_at'.freeze => Time.now.to_f)
|
||||||
result = process_single(items['class'], copy)
|
result = process_single(items['class'.freeze], copy)
|
||||||
result ? result : nil
|
result ? result : nil
|
||||||
end.compact
|
end.compact
|
||||||
|
|
||||||
raw_push(payloads) if !payloads.empty?
|
raw_push(payloads) if !payloads.empty?
|
||||||
payloads.collect { |payload| payload['jid'] }
|
payloads.collect { |payload| payload['jid'.freeze] }
|
||||||
end
|
end
|
||||||
|
|
||||||
# Allows sharding of jobs across any number of Redis instances. All jobs
|
# Allows sharding of jobs across any number of Redis instances. All jobs
|
||||||
|
@ -140,14 +140,14 @@ module Sidekiq
|
||||||
# Messages are enqueued to the 'default' queue.
|
# Messages are enqueued to the 'default' queue.
|
||||||
#
|
#
|
||||||
def enqueue(klass, *args)
|
def enqueue(klass, *args)
|
||||||
klass.client_push('class' => klass, 'args' => args)
|
klass.client_push('class'.freeze => klass, 'args'.freeze => args)
|
||||||
end
|
end
|
||||||
|
|
||||||
# Example usage:
|
# Example usage:
|
||||||
# Sidekiq::Client.enqueue_to(:queue_name, MyWorker, 'foo', 1, :bat => 'bar')
|
# Sidekiq::Client.enqueue_to(:queue_name, MyWorker, 'foo', 1, :bat => 'bar')
|
||||||
#
|
#
|
||||||
def enqueue_to(queue, klass, *args)
|
def enqueue_to(queue, klass, *args)
|
||||||
klass.client_push('queue' => queue, 'class' => klass, 'args' => args)
|
klass.client_push('queue'.freeze => queue, 'class'.freeze => klass, 'args'.freeze => args)
|
||||||
end
|
end
|
||||||
|
|
||||||
# Example usage:
|
# Example usage:
|
||||||
|
@ -158,7 +158,7 @@ module Sidekiq
|
||||||
now = Time.now.to_f
|
now = Time.now.to_f
|
||||||
ts = (int < 1_000_000_000 ? now + int : int)
|
ts = (int < 1_000_000_000 ? now + int : int)
|
||||||
|
|
||||||
item = { 'class' => klass, 'args' => args, 'at' => ts, 'queue' => queue }
|
item = { 'class'.freeze => klass, 'args'.freeze => args, 'at'.freeze => ts, 'queue'.freeze => queue }
|
||||||
item.delete('at'.freeze) if ts <= now
|
item.delete('at'.freeze) if ts <= now
|
||||||
|
|
||||||
klass.client_push(item)
|
klass.client_push(item)
|
||||||
|
@ -184,13 +184,13 @@ module Sidekiq
|
||||||
end
|
end
|
||||||
|
|
||||||
def atomic_push(conn, payloads)
|
def atomic_push(conn, payloads)
|
||||||
if payloads.first['at']
|
if payloads.first['at'.freeze]
|
||||||
conn.zadd('schedule'.freeze, payloads.map do |hash|
|
conn.zadd('schedule'.freeze, payloads.map do |hash|
|
||||||
at = hash.delete('at'.freeze).to_s
|
at = hash.delete('at'.freeze).to_s
|
||||||
[at, Sidekiq.dump_json(hash)]
|
[at, Sidekiq.dump_json(hash)]
|
||||||
end)
|
end)
|
||||||
else
|
else
|
||||||
q = payloads.first['queue']
|
q = payloads.first['queue'.freeze]
|
||||||
now = Time.now.to_f
|
now = Time.now.to_f
|
||||||
to_push = payloads.map do |entry|
|
to_push = payloads.map do |entry|
|
||||||
entry['enqueued_at'.freeze] = now
|
entry['enqueued_at'.freeze] = now
|
||||||
|
@ -202,7 +202,7 @@ module Sidekiq
|
||||||
end
|
end
|
||||||
|
|
||||||
def process_single(worker_class, item)
|
def process_single(worker_class, item)
|
||||||
queue = item['queue']
|
queue = item['queue'.freeze]
|
||||||
|
|
||||||
middleware.invoke(worker_class, item, queue, @redis_pool) do
|
middleware.invoke(worker_class, item, queue, @redis_pool) do
|
||||||
item
|
item
|
||||||
|
|
|
@ -67,7 +67,7 @@ module Sidekiq
|
||||||
opts.delete(:network_timeout)
|
opts.delete(:network_timeout)
|
||||||
end
|
end
|
||||||
|
|
||||||
opts[:driver] ||= 'ruby'
|
opts[:driver] ||= 'ruby'.freeze
|
||||||
|
|
||||||
# Issue #3303, redis-rb will silently retry an operation.
|
# Issue #3303, redis-rb will silently retry an operation.
|
||||||
# This can lead to duplicate jobs if Sidekiq::Client's LPUSH
|
# This can lead to duplicate jobs if Sidekiq::Client's LPUSH
|
||||||
|
|
|
@ -48,7 +48,7 @@ module Sidekiq
|
||||||
end
|
end
|
||||||
|
|
||||||
def perform_async(*args)
|
def perform_async(*args)
|
||||||
@opts['class'].client_push(@opts.merge!('args' => args))
|
@opts['class'.freeze].client_push(@opts.merge!('args'.freeze => args))
|
||||||
end
|
end
|
||||||
|
|
||||||
# +interval+ must be a timestamp, numeric or something that acts
|
# +interval+ must be a timestamp, numeric or something that acts
|
||||||
|
@ -58,10 +58,10 @@ module Sidekiq
|
||||||
now = Time.now.to_f
|
now = Time.now.to_f
|
||||||
ts = (int < 1_000_000_000 ? now + int : int)
|
ts = (int < 1_000_000_000 ? now + int : int)
|
||||||
|
|
||||||
@opts.merge! 'args' => args, 'at' => ts
|
@opts.merge! 'args'.freeze => args, 'at'.freeze => ts
|
||||||
# Optimization to enqueue something now that is scheduled to go out now or in the past
|
# Optimization to enqueue something now that is scheduled to go out now or in the past
|
||||||
@opts.delete('at'.freeze) if ts <= now
|
@opts.delete('at'.freeze) if ts <= now
|
||||||
@opts['class'].client_push(@opts)
|
@opts['class'.freeze].client_push(@opts)
|
||||||
end
|
end
|
||||||
alias_method :perform_at, :perform_in
|
alias_method :perform_at, :perform_in
|
||||||
end
|
end
|
||||||
|
@ -81,11 +81,11 @@ module Sidekiq
|
||||||
end
|
end
|
||||||
|
|
||||||
def set(options)
|
def set(options)
|
||||||
Setter.new(options.merge!('class' => self))
|
Setter.new(options.merge!('class'.freeze => self))
|
||||||
end
|
end
|
||||||
|
|
||||||
def perform_async(*args)
|
def perform_async(*args)
|
||||||
client_push('class' => self, 'args' => args)
|
client_push('class'.freeze => self, 'args'.freeze => args)
|
||||||
end
|
end
|
||||||
|
|
||||||
# +interval+ must be a timestamp, numeric or something that acts
|
# +interval+ must be a timestamp, numeric or something that acts
|
||||||
|
@ -95,7 +95,7 @@ module Sidekiq
|
||||||
now = Time.now.to_f
|
now = Time.now.to_f
|
||||||
ts = (int < 1_000_000_000 ? now + int : int)
|
ts = (int < 1_000_000_000 ? now + int : int)
|
||||||
|
|
||||||
item = { 'class' => self, 'args' => args, 'at' => ts }
|
item = { 'class'.freeze => self, 'args'.freeze => args, 'at'.freeze => ts }
|
||||||
|
|
||||||
# Optimization to enqueue something now that is scheduled to go out now or in the past
|
# Optimization to enqueue something now that is scheduled to go out now or in the past
|
||||||
item.delete('at'.freeze) if ts <= now
|
item.delete('at'.freeze) if ts <= now
|
||||||
|
@ -134,7 +134,7 @@ module Sidekiq
|
||||||
end
|
end
|
||||||
|
|
||||||
def client_push(item) # :nodoc:
|
def client_push(item) # :nodoc:
|
||||||
pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool
|
pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'.freeze] || Sidekiq.redis_pool
|
||||||
hash = item.stringify_keys
|
hash = item.stringify_keys
|
||||||
Sidekiq::Client.new(pool).push(hash)
|
Sidekiq::Client.new(pool).push(hash)
|
||||||
end
|
end
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue