# frozen_string_literal: true

require "securerandom"
require "sidekiq/middleware/chain"
require "sidekiq/job_util"

module Sidekiq
  class Client
    include Sidekiq::JobUtil

    ##
    # Define client-side middleware:
    #
    #   client = Sidekiq::Client.new
    #   client.middleware do |chain|
    #     chain.use MyClientMiddleware
    #   end
    #   client.push('class' => 'SomeJob', 'args' => [1,2,3])
    #
    # All client instances default to the globally-defined
    # Sidekiq.client_middleware but you can change as necessary.
    #
    def middleware(&block)
      if block
        @chain = @chain.dup
        yield @chain
      end
      @chain
    end

    attr_accessor :redis_pool

    # Sidekiq::Client is responsible for pushing job payloads to Redis.
    # Requires the :pool or :config keyword argument.
    #
    #   Sidekiq::Client.new(pool: Sidekiq::RedisConnection.create)
    #
    # Inside the Sidekiq process, you can reuse the configured resources:
    #
    #   Sidekiq::Client.new(config: config)
    #
    # @param pool [ConnectionPool] explicit Redis pool to use
    # @param config [Sidekiq::Config] use the pool and middleware from the given Sidekiq container
    # @param chain [Sidekiq::Middleware::Chain] use the given middleware chain
    def initialize(*args, **kwargs)
      if args.size == 1 && kwargs.size == 0
        warn "Sidekiq::Client.new(pool) is deprecated, please use Sidekiq::Client.new(pool: pool), #{caller(0..3)}"
        # old calling method, accept 1 pool argument
        @redis_pool = args[0]
        @chain = Sidekiq.default_configuration.client_middleware
        @config = Sidekiq.default_configuration
      else
        # new calling method: keyword arguments
        @config = kwargs[:config] || Sidekiq.default_configuration
        @redis_pool = kwargs[:pool] || Thread.current[:sidekiq_redis_pool] || @config&.redis_pool
        @chain = kwargs[:chain] || @config&.client_middleware
        raise ArgumentError, "No Redis pool available for Sidekiq::Client" unless @redis_pool
      end
    end

    ##
    # The main method used to push a job to Redis. Accepts a number of options:
    #
    #   queue - the named queue to use, default 'default'
    #   class - the job class to call, required
    #   args - an array of simple arguments to the perform method, must be JSON-serializable
    #   at - timestamp to schedule the job (optional), must be Numeric (e.g. Time.now.to_f)
    #   retry - whether to retry this job if it fails, default true or an integer number of retries
    #   backtrace - whether to save any error backtrace, default false
    #
    # If class is set to the class name, the jobs' options will be based on Sidekiq's default
    # job options. Otherwise, they will be based on the job class's options.
    #
    # Any options valid for a job class's sidekiq_options are also available here.
    #
    # All options must be strings, not symbols. NB: because we are serializing to JSON, all
    # symbols in 'args' will be converted to strings. Note that +backtrace: true+ can take quite a bit of
    # space in Redis; a large volume of failing jobs can start Redis swapping if you aren't careful.
    #
    # Returns a unique Job ID. If middleware stops the job, nil will be returned instead.
    #
    # Example:
    #   push('queue' => 'my_queue', 'class' => MyJob, 'args' => ['foo', 1, :bat => 'bar'])
    #
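    # The 'at' option schedules the job instead of enqueuing it immediately; a
    # minimal sketch that runs the job roughly five minutes from now:
    #   push('class' => MyJob, 'args' => [1], 'at' => Time.now.to_f + 300)
    #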
    def push(item)
      normed = normalize_item(item)
      payload = middleware.invoke(item["class"], normed, normed["queue"], @redis_pool) do
        normed
      end
      if payload
        verify_json(payload)
        raw_push([payload])
        payload["jid"]
      end
    end

    ##
    # Push a large number of jobs to Redis. This method cuts out the redis
    # network round trip latency. I wouldn't recommend pushing more than
    # 1000 per call but YMMV based on network quality, size of job args, etc.
    # A large number of jobs can cause a bit of Redis command processing latency.
    #
    # Takes the same arguments as #push except that args is expected to be
    # an Array of Arrays. All other keys are duplicated for each job. Each job
    # is run through the client middleware pipeline and each job gets its own Job ID
    # as normal.
    #
    # Returns an array of the pushed jobs' jids. The number of jobs pushed can be less
    # than the number given if the middleware stopped processing for one or more jobs.
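    #
    # A minimal usage sketch (MyJob stands in for any job class):
    #
    #   Sidekiq::Client.push_bulk('class' => MyJob, 'args' => [[1], [2], [3]])
    #
    # 'at' may also be given as an Array of Numeric timestamps, one per job.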
    def push_bulk(items)
      args = items["args"]
      raise ArgumentError, "Bulk arguments must be an Array of Arrays: [[1], [2]]" unless args.is_a?(Array) && args.all?(Array)
      return [] if args.empty? # no jobs to push

      at = items.delete("at")
      raise ArgumentError, "Job 'at' must be a Numeric or an Array of Numeric timestamps" if at && (Array(at).empty? || !Array(at).all? { |entry| entry.is_a?(Numeric) })
      raise ArgumentError, "Job 'at' Array must have same size as 'args' Array" if at.is_a?(Array) && at.size != args.size

      jid = items.delete("jid")
      raise ArgumentError, "Explicitly passing 'jid' when pushing more than one job is not supported" if jid && args.size > 1

      normed = normalize_item(items)
      payloads = args.map.with_index { |job_args, index|
        copy = normed.merge("args" => job_args, "jid" => SecureRandom.hex(12))
        copy["at"] = (at.is_a?(Array) ? at[index] : at) if at
        result = middleware.invoke(items["class"], copy, copy["queue"], @redis_pool) do
          verify_json(copy)
          copy
        end
        result || nil
      }.compact

      raw_push(payloads) unless payloads.empty?
      payloads.collect { |payload| payload["jid"] }
    end

    # Allows sharding of jobs across any number of Redis instances. All jobs
    # defined within the block will use the given Redis connection pool.
    #
    #   pool = ConnectionPool.new { Redis.new }
    #   Sidekiq::Client.via(pool) do
    #     SomeJob.perform_async(1,2,3)
    #     SomeOtherJob.perform_async(1,2,3)
    #   end
    #
    # Generally this is only needed for very large Sidekiq installs processing
    # thousands of jobs per second. I do not recommend sharding unless
    # you cannot scale any other way (e.g. splitting your app into smaller apps).
    def self.via(pool)
      raise ArgumentError, "No pool given" if pool.nil?
      current_sidekiq_pool = Thread.current[:sidekiq_redis_pool]
      Thread.current[:sidekiq_redis_pool] = pool
      yield
    ensure
      Thread.current[:sidekiq_redis_pool] = current_sidekiq_pool
    end

    class << self
      def push(item)
        new.push(item)
      end

      def push_bulk(items)
        new.push_bulk(items)
      end

      # Resque compatibility helpers. Note all helpers
      # should go through Sidekiq::Job#client_push.
      #
      # Example usage:
      #   Sidekiq::Client.enqueue(MyJob, 'foo', 1, :bat => 'bar')
      #
      # Messages are enqueued to the 'default' queue.
      #
      def enqueue(klass, *args)
        klass.client_push("class" => klass, "args" => args)
      end

      # Example usage:
      #   Sidekiq::Client.enqueue_to(:queue_name, MyJob, 'foo', 1, :bat => 'bar')
      #
      def enqueue_to(queue, klass, *args)
        klass.client_push("queue" => queue, "class" => klass, "args" => args)
      end

      # Example usage:
      #   Sidekiq::Client.enqueue_to_in(:queue_name, 3.minutes, MyJob, 'foo', 1, :bat => 'bar')
      #
      def enqueue_to_in(queue, interval, klass, *args)
        int = interval.to_f
        now = Time.now.to_f
        # treat small values as a relative offset in seconds, large values as an absolute epoch timestamp
        ts = (int < 1_000_000_000 ? now + int : int)

        item = {"class" => klass, "args" => args, "at" => ts, "queue" => queue}
        item.delete("at") if ts <= now

        klass.client_push(item)
      end

      # Example usage:
      #   Sidekiq::Client.enqueue_in(3.minutes, MyJob, 'foo', 1, :bat => 'bar')
      #
      def enqueue_in(interval, klass, *args)
        klass.perform_in(interval, *args)
      end
    end

    private

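    # Pushes an array of already-normalized job payloads to Redis in a single
    # pipelined round trip, retrying once if the connection hits a
    # failover-related error (see the rescue below).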
    def raw_push(payloads)
      @redis_pool.with do |conn|
        retryable = true
        begin
          conn.pipelined do |pipeline|
            atomic_push(pipeline, payloads)
          end
        rescue RedisClient::Error => ex
          # 2550 Failover can cause the server to become a replica, need
          # to disconnect and reopen the socket to get back to the primary.
          # 4495 Use the same logic if we have a "Not enough replicas" error from the primary
          # 4985 Use the same logic when a blocking command is force-unblocked
          # The retry logic is copied from sidekiq.rb
          if retryable && ex.message =~ /READONLY|NOREPLICAS|UNBLOCKED/
            conn.close
            retryable = false
            retry
          end
          raise
        end
      end
      true
    end

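    # Called with the pipelined connection from raw_push: payloads with an "at"
    # timestamp go into the "schedule" sorted set; everything else is stamped with
    # "enqueued_at" and LPUSHed onto its queue's list, with the queue name
    # registered in the "queues" set.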
    def atomic_push(conn, payloads)
      if payloads.first.key?("at")
        conn.zadd("schedule", payloads.flat_map { |hash|
          at = hash.delete("at").to_s
          [at, Sidekiq.dump_json(hash)]
        })
      else
        queue = payloads.first["queue"]
        now = Time.now.to_f
        to_push = payloads.map { |entry|
          entry["enqueued_at"] = now
          Sidekiq.dump_json(entry)
        }
        conn.sadd("queues", [queue])
        conn.lpush("queue:#{queue}", to_push)
      end
    end
  end
end