Mirror of https://github.com/mperham/sidekiq.git (synced 2022-11-09 13:52:34 -05:00)
Remove explicit freeze, which should not be necessary anymore with frozen_string_literal: true. Fixes #3759
Parent:  abca42db3c
Commit:  7de6f4cc2f

14 changed files with 74 additions and 73 deletions
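For context, a small illustrative snippet (not part of the diff) showing why the explicit `.freeze` calls are redundant: in Ruby 2.3+, a file that opts in with the `# frozen_string_literal: true` magic comment gets frozen, deduplicated string literals automatically, so chaining `.freeze` onto each literal no longer saves an allocation and only adds noise.

```ruby
# frozen_string_literal: true

# With the magic comment above (Ruby 2.3+), every string literal in this
# file is created frozen, so an explicit .freeze is a no-op.
a = 'stat:processed'
b = 'stat:processed'

p a.frozen?                       # => true  -- no .freeze needed
p a.equal?(b)                     # => true  -- CRuby deduplicates identical frozen literals
p 'queue'.freeze.equal?('queue')  # => true  -- .freeze just returns the same shared object

# Without the comment, each literal would be a fresh, mutable string:
# a.frozen?   # => false
# a.equal?(b) # => false
```
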
@@ -2,6 +2,12 @@
 [Sidekiq Changes](https://github.com/mperham/sidekiq/blob/master/Changes.md) | [Sidekiq Pro Changes](https://github.com/mperham/sidekiq/blob/master/Pro-Changes.md) | [Sidekiq Enterprise Changes](https://github.com/mperham/sidekiq/blob/master/Ent-Changes.md)
 
+HEAD
+-----------
+
+- Remove `freeze` calls on String constants. This is superfluous with Ruby
+  2.3+ and `frozen_string_literal: true`. [#3759]
+
 5.1.1
 -----------

@@ -1,4 +1,3 @@
-# encoding: utf-8
 # frozen_string_literal: true
 require 'sidekiq/version'
 fail "Sidekiq #{Sidekiq::VERSION} does not support Ruby versions below 2.2.2." if RUBY_PLATFORM != 'java' && RUBY_VERSION < '2.2.2'

@@ -12,7 +11,7 @@ require 'sidekiq/delay'
 require 'json'
 
 module Sidekiq
-  NAME = 'Sidekiq'.freeze
+  NAME = 'Sidekiq'
   LICENSE = 'See LICENSE and the LGPL-3.0 for licensing details.'
 
   DEFAULTS = {

@@ -48,7 +47,7 @@ module Sidekiq
     "connected_clients" => "9999",
     "used_memory_human" => "9P",
     "used_memory_peak_human" => "9P"
-  }.freeze
+  }
 
   def self.❨╯°□°❩╯︵┻━┻
     puts "Calm down, yo."

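Alongside the `.freeze` cleanup, the touched files also drop their `# encoding: utf-8` headers. A brief aside on why that is safe (assuming Ruby 2.0 or newer, which the version check above already enforces): UTF-8 has been the default source encoding since Ruby 2.0, so the encoding magic comment adds nothing. A minimal check:

```ruby
# No encoding magic comment here: since Ruby 2.0 the default source
# encoding is UTF-8, so literals with non-ASCII characters work as-is.
p __ENCODING__           # => #<Encoding:UTF-8>
p "Sidekiq ❤".encoding   # => #<Encoding:UTF-8>
```
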
@@ -1,4 +1,3 @@
-# encoding: utf-8
 # frozen_string_literal: true
 require 'sidekiq'
 

@@ -51,21 +50,21 @@ module Sidekiq
     def fetch_stats!
       pipe1_res = Sidekiq.redis do |conn|
         conn.pipelined do
-          conn.get('stat:processed'.freeze)
-          conn.get('stat:failed'.freeze)
-          conn.zcard('schedule'.freeze)
-          conn.zcard('retry'.freeze)
-          conn.zcard('dead'.freeze)
-          conn.scard('processes'.freeze)
-          conn.lrange('queue:default'.freeze, -1, -1)
-          conn.smembers('processes'.freeze)
-          conn.smembers('queues'.freeze)
+          conn.get('stat:processed')
+          conn.get('stat:failed')
+          conn.zcard('schedule')
+          conn.zcard('retry')
+          conn.zcard('dead')
+          conn.scard('processes')
+          conn.lrange('queue:default', -1, -1)
+          conn.smembers('processes')
+          conn.smembers('queues')
         end
       end
 
       pipe2_res = Sidekiq.redis do |conn|
         conn.pipelined do
-          pipe1_res[7].each {|key| conn.hget(key, 'busy'.freeze) }
+          pipe1_res[7].each {|key| conn.hget(key, 'busy') }
           pipe1_res[8].each {|queue| conn.llen("queue:#{queue}") }
         end
       end

@@ -77,7 +76,7 @@ module Sidekiq
       default_queue_latency = if (entry = pipe1_res[6].first)
         job = Sidekiq.load_json(entry) rescue {}
         now = Time.now.to_f
-        thence = job['enqueued_at'.freeze] || now
+        thence = job['enqueued_at'] || now
         now - thence
       else
         0

@@ -119,7 +118,7 @@ module Sidekiq
   class Queues
     def lengths
       Sidekiq.redis do |conn|
-        queues = conn.smembers('queues'.freeze)
+        queues = conn.smembers('queues')
 
         lengths = conn.pipelined do
           queues.each do |queue|

@@ -163,7 +162,7 @@ module Sidekiq
 
       while i < @days_previous
         date = @start_date - i
-        datestr = date.strftime("%Y-%m-%d".freeze)
+        datestr = date.strftime("%Y-%m-%d")
         keys << "stat:#{stat}:#{datestr}"
         dates << datestr
         i += 1

@@ -204,7 +203,7 @@ module Sidekiq
     # Return all known queues within Redis.
     #
     def self.all
-      Sidekiq.redis { |c| c.smembers('queues'.freeze) }.sort.map { |q| Sidekiq::Queue.new(q) }
+      Sidekiq.redis { |c| c.smembers('queues') }.sort.map { |q| Sidekiq::Queue.new(q) }
     end
 
     attr_reader :name

@@ -273,7 +272,7 @@ module Sidekiq
       Sidekiq.redis do |conn|
         conn.multi do
           conn.del(@rname)
-          conn.srem("queues".freeze, name)
+          conn.srem("queues", name)
         end
       end
     end

@@ -349,9 +348,9 @@ module Sidekiq
           job_args
         end
       else
-        if self['encrypt'.freeze]
+        if self['encrypt']
           # no point in showing 150+ bytes of random garbage
-          args[-1] = '[encrypted data]'.freeze
+          args[-1] = '[encrypted data]'
         end
         args
       end

@@ -1,4 +1,3 @@
-# encoding: utf-8
 # frozen_string_literal: true
 $stdout.sync = true
 

@@ -17,7 +16,7 @@ module Sidekiq
     include Singleton unless $TESTING
 
     PROCTITLES = [
-      proc { 'sidekiq'.freeze },
+      proc { 'sidekiq' },
      proc { Sidekiq::VERSION },
      proc { |me, data| data['tag'] },
      proc { |me, data| "[#{Processor::WORKER_STATE.size} of #{data['concurrency']} busy]" },

@@ -68,11 +68,11 @@ module Sidekiq
     #
     def push(item)
       normed = normalize_item(item)
-      payload = process_single(item['class'.freeze], normed)
+      payload = process_single(item['class'], normed)
 
       if payload
         raw_push([payload])
-        payload['jid'.freeze]
+        payload['jid']
       end
     end
 

@@ -89,19 +89,19 @@ module Sidekiq
     # Returns an array of the of pushed jobs' jids. The number of jobs pushed can be less
     # than the number given if the middleware stopped processing for one or more jobs.
     def push_bulk(items)
-      arg = items['args'.freeze].first
+      arg = items['args'].first
       return [] unless arg # no jobs to push
       raise ArgumentError, "Bulk arguments must be an Array of Arrays: [[1], [2]]" if !arg.is_a?(Array)
 
       normed = normalize_item(items)
-      payloads = items['args'.freeze].map do |args|
-        copy = normed.merge('args'.freeze => args, 'jid'.freeze => SecureRandom.hex(12), 'enqueued_at'.freeze => Time.now.to_f)
-        result = process_single(items['class'.freeze], copy)
+      payloads = items['args'].map do |args|
+        copy = normed.merge('args' => args, 'jid' => SecureRandom.hex(12), 'enqueued_at' => Time.now.to_f)
+        result = process_single(items['class'], copy)
         result ? result : nil
       end.compact
 
       raw_push(payloads) if !payloads.empty?
-      payloads.collect { |payload| payload['jid'.freeze] }
+      payloads.collect { |payload| payload['jid'] }
     end
 
     # Allows sharding of jobs across any number of Redis instances. All jobs

@@ -144,14 +144,14 @@ module Sidekiq
     # Messages are enqueued to the 'default' queue.
     #
     def enqueue(klass, *args)
-      klass.client_push('class'.freeze => klass, 'args'.freeze => args)
+      klass.client_push('class' => klass, 'args' => args)
     end
 
     # Example usage:
     #   Sidekiq::Client.enqueue_to(:queue_name, MyWorker, 'foo', 1, :bat => 'bar')
     #
     def enqueue_to(queue, klass, *args)
-      klass.client_push('queue'.freeze => queue, 'class'.freeze => klass, 'args'.freeze => args)
+      klass.client_push('queue' => queue, 'class' => klass, 'args' => args)
     end
 
     # Example usage:

@@ -162,8 +162,8 @@ module Sidekiq
       now = Time.now.to_f
       ts = (int < 1_000_000_000 ? now + int : int)
 
-      item = { 'class'.freeze => klass, 'args'.freeze => args, 'at'.freeze => ts, 'queue'.freeze => queue }
-      item.delete('at'.freeze) if ts <= now
+      item = { 'class' => klass, 'args' => args, 'at' => ts, 'queue' => queue }
+      item.delete('at') if ts <= now
 
       klass.client_push(item)
     end

@@ -188,25 +188,25 @@ module Sidekiq
     end
 
     def atomic_push(conn, payloads)
-      if payloads.first['at'.freeze]
-        conn.zadd('schedule'.freeze, payloads.map do |hash|
-          at = hash.delete('at'.freeze).to_s
+      if payloads.first['at']
+        conn.zadd('schedule', payloads.map do |hash|
+          at = hash.delete('at').to_s
           [at, Sidekiq.dump_json(hash)]
         end)
       else
-        q = payloads.first['queue'.freeze]
+        q = payloads.first['queue']
         now = Time.now.to_f
         to_push = payloads.map do |entry|
-          entry['enqueued_at'.freeze] = now
+          entry['enqueued_at'] = now
           Sidekiq.dump_json(entry)
         end
-        conn.sadd('queues'.freeze, q)
+        conn.sadd('queues', q)
         conn.lpush("queue:#{q}", to_push)
       end
     end
 
     def process_single(worker_class, item)
-      queue = item['queue'.freeze]
+      queue = item['queue']
 
       middleware.invoke(worker_class, item, queue, @redis_pool) do
         item

@@ -214,25 +214,25 @@ module Sidekiq
     end
 
     def normalize_item(item)
-      raise(ArgumentError, "Job must be a Hash with 'class' and 'args' keys: { 'class' => SomeWorker, 'args' => ['bob', 1, :foo => 'bar'] }") unless item.is_a?(Hash) && item.has_key?('class'.freeze) && item.has_key?('args'.freeze)
+      raise(ArgumentError, "Job must be a Hash with 'class' and 'args' keys: { 'class' => SomeWorker, 'args' => ['bob', 1, :foo => 'bar'] }") unless item.is_a?(Hash) && item.has_key?('class') && item.has_key?('args')
       raise(ArgumentError, "Job args must be an Array") unless item['args'].is_a?(Array)
-      raise(ArgumentError, "Job class must be either a Class or String representation of the class name") unless item['class'.freeze].is_a?(Class) || item['class'.freeze].is_a?(String)
-      raise(ArgumentError, "Job 'at' must be a Numeric timestamp") if item.has_key?('at'.freeze) && !item['at'].is_a?(Numeric)
+      raise(ArgumentError, "Job class must be either a Class or String representation of the class name") unless item['class'].is_a?(Class) || item['class'].is_a?(String)
+      raise(ArgumentError, "Job 'at' must be a Numeric timestamp") if item.has_key?('at') && !item['at'].is_a?(Numeric)
       #raise(ArgumentError, "Arguments must be native JSON types, see https://github.com/mperham/sidekiq/wiki/Best-Practices") unless JSON.load(JSON.dump(item['args'])) == item['args']
 
-      normalized_hash(item['class'.freeze])
+      normalized_hash(item['class'])
         .each{ |key, value| item[key] = value if item[key].nil? }
 
-      item['class'.freeze] = item['class'.freeze].to_s
-      item['queue'.freeze] = item['queue'.freeze].to_s
-      item['jid'.freeze] ||= SecureRandom.hex(12)
-      item['created_at'.freeze] ||= Time.now.to_f
+      item['class'] = item['class'].to_s
+      item['queue'] = item['queue'].to_s
+      item['jid'] ||= SecureRandom.hex(12)
+      item['created_at'] ||= Time.now.to_f
       item
     end
 
     def normalized_hash(item_class)
       if item_class.is_a?(Class)
-        raise(ArgumentError, "Message must include a Sidekiq::Worker class, not class name: #{item_class.ancestors.inspect}") if !item_class.respond_to?('get_sidekiq_options'.freeze)
+        raise(ArgumentError, "Message must include a Sidekiq::Worker class, not class name: #{item_class.ancestors.inspect}") if !item_class.respond_to?('get_sidekiq_options')
         item_class.get_sidekiq_options
       else
         Sidekiq.default_worker_options

@@ -13,7 +13,7 @@ module Sidekiq
     end
 
     def queue_name
-      queue.sub(/.*queue:/, ''.freeze)
+      queue.sub(/.*queue:/, '')
     end
 
     def requeue

@@ -1,4 +1,3 @@
-# encoding: utf-8
 # frozen_string_literal: true
 require 'sidekiq/manager'
 require 'sidekiq/fetch'

@@ -76,13 +75,13 @@ module Sidekiq
       Processor::FAILURE.update {|curr| fails = curr; 0 }
       Processor::PROCESSED.update {|curr| procd = curr; 0 }
 
-      workers_key = "#{key}:workers".freeze
-      nowdate = Time.now.utc.strftime("%Y-%m-%d".freeze)
+      workers_key = "#{key}:workers"
+      nowdate = Time.now.utc.strftime("%Y-%m-%d")
       Sidekiq.redis do |conn|
         conn.multi do
-          conn.incrby("stat:processed".freeze, procd)
+          conn.incrby("stat:processed", procd)
           conn.incrby("stat:processed:#{nowdate}", procd)
-          conn.incrby("stat:failed".freeze, fails)
+          conn.incrby("stat:failed", fails)
           conn.incrby("stat:failed:#{nowdate}", fails)
           conn.del(workers_key)
           Processor::WORKER_STATE.each_pair do |tid, hash|

@@ -33,9 +33,9 @@ module Sidekiq
     def self.job_hash_context(job_hash)
       # If we're using a wrapper class, like ActiveJob, use the "wrapped"
       # attribute to expose the underlying thing.
-      klass = job_hash['wrapped'.freeze] || job_hash["class".freeze]
-      bid = job_hash['bid'.freeze]
-      "#{klass} JID-#{job_hash['jid'.freeze]}#{" BID-#{bid}" if bid}"
+      klass = job_hash['wrapped'] || job_hash["class"]
+      bid = job_hash['bid']
+      "#{klass} JID-#{job_hash['jid']}#{" BID-#{bid}" if bid}"
     end
 
     def self.with_job_hash_context(job_hash, &block)

@@ -1,4 +1,3 @@
-# encoding: utf-8
 # frozen_string_literal: true
 require 'sidekiq/util'
 require 'sidekiq/processor'

@@ -132,9 +132,9 @@ module Sidekiq
       # the Reloader. It handles code loading, db connection management, etc.
       # Effectively this block denotes a "unit of work" to Rails.
       @reloader.call do
-        klass = constantize(job_hash['class'.freeze])
+        klass = constantize(job_hash['class'])
         worker = klass.new
-        worker.jid = job_hash['jid'.freeze]
+        worker.jid = job_hash['jid']
         @retrier.local(worker, pristine, queue) do
           yield worker
         end

@@ -166,7 +166,7 @@ module Sidekiq
         ack = true
         dispatch(job_hash, queue) do |worker|
           Sidekiq.server_middleware.invoke(worker, job_hash, queue) do
-            execute_job(worker, cloned(job_hash['args'.freeze]))
+            execute_job(worker, cloned(job_hash['args']))
           end
         end
       rescue Sidekiq::Shutdown

@@ -70,7 +70,7 @@ module Sidekiq
         opts.delete(:network_timeout)
       end
 
-      opts[:driver] ||= 'ruby'.freeze
+      opts[:driver] ||= 'ruby'
 
       # Issue #3303, redis-rb will silently retry an operation.
       # This can lead to duplicate jobs if Sidekiq::Client's LPUSH

@@ -17,7 +17,7 @@ module Sidekiq
       # We need to go through the list one at a time to reduce the risk of something
       # going wrong between the time jobs are popped from the scheduled queue and when
       # they are pushed onto a work queue and losing the jobs.
-      while job = conn.zrangebyscore(sorted_set, '-inf'.freeze, now, :limit => [0, 1]).first do
+      while job = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do
 
        # Pop item off the queue and add it to the work queue. If the job can't be popped from
        # the queue, it's because another process already popped it so we can move on to the

@@ -21,7 +21,7 @@ module Sidekiq
 
     def safe_thread(name, &block)
       Thread.new do
-        Thread.current['sidekiq_label'.freeze] = name
+        Thread.current['sidekiq_label'] = name
         watchdog(name, &block)
       end
     end

@@ -47,7 +47,7 @@ module Sidekiq
       end
 
       def perform_async(*args)
-        @klass.client_push(@opts.merge('args'.freeze => args, 'class'.freeze => @klass))
+        @klass.client_push(@opts.merge('args' => args, 'class' => @klass))
       end
 
       # +interval+ must be a timestamp, numeric or something that acts

@@ -57,9 +57,9 @@ module Sidekiq
         now = Time.now.to_f
         ts = (int < 1_000_000_000 ? now + int : int)
 
-        payload = @opts.merge('class'.freeze => @klass, 'args'.freeze => args, 'at'.freeze => ts)
+        payload = @opts.merge('class' => @klass, 'args' => args, 'at' => ts)
         # Optimization to enqueue something now that is scheduled to go out now or in the past
-        payload.delete('at'.freeze) if ts <= now
+        payload.delete('at') if ts <= now
         @klass.client_push(payload)
       end
       alias_method :perform_at, :perform_in

@@ -84,7 +84,7 @@ module Sidekiq
       end
 
       def perform_async(*args)
-        client_push('class'.freeze => self, 'args'.freeze => args)
+        client_push('class' => self, 'args' => args)
       end
 
       # +interval+ must be a timestamp, numeric or something that acts

@@ -94,10 +94,10 @@ module Sidekiq
         now = Time.now.to_f
         ts = (int < 1_000_000_000 ? now + int : int)
 
-        item = { 'class'.freeze => self, 'args'.freeze => args, 'at'.freeze => ts }
+        item = { 'class' => self, 'args' => args, 'at' => ts }
 
         # Optimization to enqueue something now that is scheduled to go out now or in the past
-        item.delete('at'.freeze) if ts <= now
+        item.delete('at') if ts <= now
 
         client_push(item)
       end

@@ -134,7 +134,7 @@ module Sidekiq
       end
 
       def client_push(item) # :nodoc:
-        pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'.freeze] || Sidekiq.redis_pool
+        pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool
         # stringify
         item.keys.each do |key|
           item[key.to_s] = item.delete(key)