1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00
mperham--sidekiq/lib/sidekiq/manager.rb

142 lines
3.4 KiB
Ruby
Raw Normal View History

2012-01-22 19:01:46 -05:00
require 'celluloid'
require 'redis'
require 'multi_json'
require 'sidekiq/util'
require 'sidekiq/processor'
require 'connection_pool/version'
2012-01-22 19:01:46 -05:00
2012-01-16 19:14:47 -05:00
module Sidekiq
  ##
  # The main router in the system. This
  # manages the processor state and fetches messages
  # from Redis to be dispatched to an idle processor.
  #
  # Processors live in exactly one of two lists: @ready (idle, available
  # for work) or @busy (currently handed a message).
  #
  class Manager
    include Util
    include Celluloid

    # Linked processors that exit are reported to #processor_died.
    trap_exit :processor_died

    # Boots the manager and creates the processor pool.
    #
    # options[:concurrency] - number of processors to create (defaults to 25).
    # options[:queues]      - collection of queue names; #dispatch picks from
    #                         it with #sample.
    def initialize(options={})
      logger.info "Booting sidekiq #{Sidekiq::VERSION} with Redis at #{redis {|x| x.client.location}}"
      logger.info "Running in #{RUBY_DESCRIPTION}"
      logger.debug { options.inspect }
      @count = options[:concurrency] || 25
      @queues = options[:queues]
      @done_callback = nil

      @done = false
      @busy = []
      @ready = @count.times.map { Processor.new_link(current_actor) }
    end

    # Stops handing out new work.
    #
    # Marks the manager as done, terminates every idle processor and removes
    # the 'workers' set members in Redis whose name matches ":<process_id>-".
    #
    # options[:shutdown] - when truthy, also signals :shutdown: immediately if
    #                      no processor is busy, otherwise after
    #                      options[:timeout] seconds, terminating whatever is
    #                      still busy at that point.
    # options[:timeout]  - seconds to wait for busy processors.
    def stop(options={})
      shutdown = options[:shutdown]
      timeout = options[:timeout]

      @done = true
      @ready.each { |x| x.terminate if x.alive? }
      @ready.clear

      redis do |conn|
        workers = conn.smembers('workers')
        workers.each do |name|
          conn.srem('workers', name) if name =~ /:#{process_id}-/
        end
      end

      if shutdown
        if @busy.empty?
          # after(0) needed to avoid deadlock in Celluloid after USR1 + TERM
          return after(0) { signal(:shutdown) }
        else
          logger.info { "Pausing #{timeout} seconds to allow workers to finish..." }
        end

        after(timeout) do
          @busy.each { |x| x.terminate if x.alive? }
          signal(:shutdown)
        end
      end
    end

    # Starts dispatching; passing true makes #dispatch reschedule itself
    # every second.
    def start
      dispatch(true)
    end

    # Registers a block that #processor_done calls with the finished
    # processor.
    def when_done(&blk)
      @done_callback = blk
    end

    # Called when a processor has finished its message.
    #
    # Runs the #when_done callback (if any) and removes the processor from
    # @busy. When stopped, the processor is terminated and :shutdown is
    # signalled once nothing is busy; otherwise a still-alive processor goes
    # back to @ready. #dispatch is then invoked (it returns immediately when
    # stopped).
    def processor_done(processor)
      # watchdog comes from Util; presumably it reports the given message if
      # the block raises -- confirm against Util.
      watchdog('sidekiq processor_done crashed!') do
        @done_callback.call(processor) if @done_callback
        @busy.delete(processor)
        if stopped?
          processor.terminate if processor.alive?
          signal(:shutdown) if @busy.empty?
        else
          @ready << processor if processor.alive?
        end
        dispatch
      end
    end

    # Exit handler registered via trap_exit.
    #
    # Drops the dead processor from @busy. While running, a replacement
    # processor is linked into @ready and work is dispatched; when stopped,
    # :shutdown is signalled once nothing is busy.
    def processor_died(processor, reason)
      @busy.delete(processor)

      if stopped?
        signal(:shutdown) if @busy.empty?
      else
        @ready << Processor.new_link(current_actor)
        dispatch
      end
    end

    private

    # Pops one message from the Redis list "queue:<queue>" and, if there is
    # one, hands it to a ready processor.
    #
    # Returns true when a message was found, false otherwise.
    def find_work(queue)
      msg = redis { |x| x.lpop("queue:#{queue}") }
      return false unless msg

      # Decode before claiming a processor: if the payload cannot be decoded
      # the exception propagates with @ready/@busy untouched, instead of
      # leaving a processor in @busy that was never given any work.
      payload = MultiJson.decode(msg)

      processor = @ready.pop
      @busy << processor
      processor.process!(payload, queue)
      true
    end

    # Hands out work until there is no ready processor or no message was
    # found.
    #
    # schedule - when truthy, re-invokes dispatch(schedule) one second later.
    def dispatch(schedule = false)
      watchdog("Fatal error in sidekiq, dispatch loop died") do
        # `return` here leaves #dispatch itself, not just the block.
        return if stopped?

        # This is a safety check to ensure we haven't leaked
        # processors somehow.
        raise "BUG: No processors, cannot continue!" if @ready.empty? && @busy.empty?

        # Dispatch loop
        loop do
          break logger.debug('no processors') if @ready.empty?

          found = false
          # `||=` short-circuits after the first hit, so each pass makes up
          # to @ready.size attempts on randomly sampled queues and hands out
          # at most one message.
          @ready.size.times do
            found ||= find_work(@queues.sample)
          end
          break logger.debug('nothing to process') unless found
        end

        # This is the polling loop that ensures we check Redis every
        # second for work, even if there was nothing to do this time
        # around.
        after(1) do
          dispatch(schedule)
        end if schedule
      end
    end

    # True once #stop has been called.
    def stopped?
      @done
    end
  end
end