2012-02-08 17:04:02 -08:00
require 'securerandom'
require 'sidekiq/middleware/chain'
2012-01-21 16:42:21 -08:00
module Sidekiq
  ##
  # Client-side entry points for pushing jobs to Redis and for inspecting the
  # 'workers' and 'queues' sets stored there.
  class Client

    ##
    # Builds a new Middleware::Chain for the client side. The configuration
    # block is intentionally empty, so nothing is added to the chain here.
    def self.default_middleware
      Middleware::Chain.new do |m|
      end
    end

    ##
    # Reads the members of the 'workers' set from Redis and returns whatever
    # Sidekiq.redis returns for the block.
    def self.registered_workers
      Sidekiq.redis { |x| x.smembers('workers') }
    end

    ##
    # Reads the members of the 'queues' set from Redis (the set that .push adds
    # queue names to) and returns whatever Sidekiq.redis returns for the block.
    def self.registered_queues
      Sidekiq.redis { |x| x.smembers('queues') }
    end

    ##
    # The main method used to push a job to Redis. Accepts a number of options:
    #
    #   queue - the named queue to use; when omitted, the value from the worker
    #           class's get_sidekiq_options is used
    #   class - the worker class to call, required. Must be the Class itself
    #           (responding to get_sidekiq_options), not its name.
    #   args - an array of simple arguments to the perform method, must be JSON-serializable
    #   retry - whether to retry this job if it fails; coerced to true or false
    #   backtrace - whether to save any error backtrace, default false
    #   at - when present, the job is added to the 'schedule' sorted set with
    #        this value as its score instead of being pushed onto a queue
    #
    # All option keys must be strings, not symbols. NB: because we are serializing to JSON, all
    # symbols in 'args' will be converted to strings.
    #
    # The hash passed in is not modified; the worker's options are merged into
    # a copy, which is what the client middleware receives and what is serialized.
    #
    # Returns nil if not pushed to Redis (e.g. a client middleware did not
    # yield) or a unique Job ID if pushed.
    #
    # Raises ArgumentError if +item+ is not a Hash, lacks 'class' or 'args', or
    # if 'class' is not a Class responding to get_sidekiq_options.
    #
    # Example:
    #   Sidekiq::Client.push('queue' => 'my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar'])
    #
    def self.push(item)
      raise(ArgumentError, "Message must be a Hash of the form: { 'class' => SomeWorker, 'args' => ['bob', 1, :foo => 'bar'] }") unless item.is_a?(Hash)
      raise(ArgumentError, "Message must include a class and set of arguments: #{item.inspect}") if !item['class'] || !item['args']

      worker_class = item['class']
      unless worker_class.is_a?(Class) && worker_class.respond_to?('get_sidekiq_options')
        # Only a Class has #ancestors. For anything else (typically a class
        # name String) describe the value itself, so the caller gets the
        # intended ArgumentError rather than a NoMethodError.
        described = worker_class.is_a?(Class) ? worker_class.ancestors.inspect : worker_class.inspect
        raise(ArgumentError, "Message must include a Sidekiq::Worker class, not class name: #{described}")
      end

      # Explicit values in the message win over the worker's options. The
      # class is stringified on the merged copy so the caller's hash is left
      # untouched.
      item = worker_class.get_sidekiq_options.merge(item)
      item['class'] = worker_class.to_s
      item['retry'] = !!item['retry']
      queue = item['queue']
      item['jid'] = SecureRandom.base64

      # Stays false when a middleware halts the chain by not yielding.
      pushed = false
      Sidekiq.client_middleware.invoke(worker_class, item, queue) do
        # Serialize inside the chain so changes made to item by middleware are included.
        payload = Sidekiq.dump_json(item)
        Sidekiq.redis do |conn|
          if item['at']
            pushed = conn.zadd('schedule', item['at'].to_s, payload)
          else
            # Record the queue name and append the job in a single MULTI;
            # only the second reply (from rpush) is kept.
            _, pushed = conn.multi do
              conn.sadd('queues', queue)
              conn.rpush("queue:#{queue}", payload)
            end
          end
        end
      end
      pushed ? item['jid'] : nil
    end

    ##
    # Resque compatibility helper. Example usage:
    #
    #   Sidekiq::Client.enqueue(MyWorker, 'foo', 1, :bat => 'bar')
    #
    # Delegates to +klass.perform_async+ and returns its result. The queue is
    # presumably whatever the worker class is configured with ('default'
    # unless overridden) -- confirm against the worker's perform_async.
    #
    def self.enqueue(klass, *args)
      klass.perform_async(*args)
    end
  end
end