# Third-party dependencies.
require 'celluloid'
require 'redis'
require 'multi_json'

# Sidekiq internals used by the manager.
require 'sidekiq/util'
require 'sidekiq/processor'

module Sidekiq

  ##
  # The main router in the system. It manages the processor state and
  # fetches messages from Redis to be dispatched to an idle processor.
  #
  # Processors are tracked in two lists: +@ready+ holds idle processors and
  # +@busy+ holds processors that have been handed a message. Every
  # processor is created with +new_link+, and +processor_died+ is registered
  # as the exit handler for those links.
  #
  class Manager
    include Util
    include Celluloid

    trap_exit :processor_died

    # Boots the manager and creates the processor pool.
    #
    # options - Hash of settings:
    #           :concurrency - number of processors to create (default: 25).
    #           :queues      - collection of queue names; one is picked with
    #                          #sample for every fetch attempt.
    #                          NOTE(review): no default is applied here, so a
    #                          missing :queues fails later in #dispatch -
    #                          confirm that callers always supply it.
    def initialize(options = {})
      logger.info "Booting sidekiq #{Sidekiq::VERSION} with Redis at #{redis.client.location}"
      logger.debug { options.inspect }
      @count = options[:concurrency] || 25
      @queues = options[:queues]
      @done_callback = nil

      @done = false
      @busy = []
      @ready = @count.times.map { Processor.new_link(current_actor) }
    end

    # Stops the manager: marks it as done, terminates every idle processor,
    # schedules a :shutdown signal five seconds from now and removes matching
    # entries from the 'workers' set in Redis.
    #
    # Busy processors are not terminated here; #processor_done terminates
    # each of them once the manager is stopped.
    def stop
      @done = true
      @ready.each(&:terminate)
      @ready.clear

      after(5) do
        signal(:shutdown)
      end

      redis.with_connection do |conn|
        # Built once per call; the id is escaped so it can only ever match
        # literally. Presumably these are the entries registered by this
        # process - verify against the code that populates 'workers'.
        own_worker = /:#{Regexp.escape(process_id.to_s)}-/
        conn.smembers('workers').each do |name|
          conn.srem('workers', name) if name =~ own_worker
        end
      end
    end

    # Starts dispatching with polling enabled, so Redis is re-checked every
    # second (see #dispatch).
    def start
      dispatch(true)
    end

    # Registers a block that #processor_done calls with the processor before
    # it is returned to the pool. A later call replaces the earlier block.
    def when_done(&blk)
      @done_callback = blk
    end

    # Returns a processor to the pool and tries to dispatch more work.
    # Presumably invoked by a processor once it has finished its message -
    # the caller is not visible in this file.
    #
    # processor - the processor that finished.
    def processor_done(processor)
      watchdog('sidekiq processor_done crashed!') do
        @done_callback.call(processor) if @done_callback
        @busy.delete(processor)
        if stopped?
          processor.terminate
        else
          @ready << processor
        end
        dispatch
      end
    end

    # Exit handler registered through +trap_exit+. Drops the dead processor
    # and, unless the manager is stopped, replaces it with a freshly linked
    # one and tries to dispatch more work.
    #
    # processor - the processor that exited.
    # reason    - the exit reason passed to the handler (unused here).
    def processor_died(processor, reason)
      @busy.delete(processor)

      unless stopped?
        @ready << Processor.new_link(current_actor)
        dispatch
      end
    end

    private

    # Pops one message from the given queue and hands it to an idle
    # processor.
    #
    # queue - name of the queue; the Redis list key is "queue:<name>".
    #
    # Returns true if a message was found and handed off, false otherwise.
    def find_work(queue)
      # Never pop a message unless a processor is available to take it;
      # otherwise the message would be removed from Redis and then lost.
      return false if @ready.empty?

      msg = redis.lpop("queue:#{queue}")
      return false unless msg

      processor = @ready.pop
      @busy << processor
      processor.process!(MultiJson.decode(msg), queue)
      true
    end

    # Hands messages to idle processors until there are no idle processors
    # left or no message could be found.
    #
    # schedule - when true, re-run this method one second later so that
    #            Redis keeps being polled.
    def dispatch(schedule = false)
      watchdog("Fatal error in sidekiq, dispatch loop died") do
        return if stopped?

        # Dispatch loop. Each pass makes up to @ready.size attempts, each
        # against a randomly sampled queue, and stops attempting after the
        # first message has been handed off (`||=` short-circuits).
        loop do
          break logger.debug('no processors') if @ready.empty?

          found = false
          @ready.size.times do
            found ||= find_work(@queues.sample)
          end
          break logger.debug('nothing to process') unless found
        end

        # This is the polling loop that ensures we check Redis every
        # second for work, even if there was nothing to do this time
        # around.
        if schedule
          after(1) do
            dispatch(schedule)
          end
        end
      end
    end

    # Returns the value of @done: false until #stop has been called, true
    # afterwards.
    def stopped?
      @done
    end
  end
end