1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00
mperham--sidekiq/lib/sidekiq/manager.rb

174 lines
5.2 KiB
Ruby
Raw Normal View History

2012-01-22 19:01:46 -05:00
require 'celluloid'
require 'sidekiq/util'
require 'sidekiq/processor'
require 'sidekiq/fetch'
2012-01-22 19:01:46 -05:00
2012-01-16 19:14:47 -05:00
module Sidekiq
  ##
  # The main router in the system.  The manager tracks which
  # processors are idle (@ready) and which are working (@busy),
  # asks the fetcher for work whenever a processor is available,
  # and hands each fetched unit of work to an idle processor.
  #
  class Manager
    include Util
    include Celluloid

    # Route exits of linked processors to #processor_died.
    trap_exit :processor_died

    # Builds the fetcher and the processor pool.
    #
    # options[:concurrency] - number of processors to create (default: 25).
    # options[:tag]         - optional label shown in the process title.
    #
    # The full options hash is also handed to Fetcher.new.
    def initialize(options={})
      logger.debug { options.inspect }
      @count = options[:concurrency] || 25
      @done_callback = nil

      # processor.object_id => unit of work currently assigned to it
      @in_progress = {}
      @done = false
      @busy = []
      @fetcher = Fetcher.new(current_actor, options)
      @ready = @count.times.map { Processor.new_link(current_actor) }
      procline(options[:tag] ? "#{options[:tag]} " : '')
    end

    # Stops fetching new work and terminates every idle processor.
    #
    # options[:shutdown] - when truthy, busy processors are forcibly
    #                      terminated after options[:timeout].
    # options[:timeout]  - delay passed to #hard_shutdown_in.
    #
    # Signals :shutdown right away when nothing is busy.
    def stop(options={})
      watchdog('Manager#stop died') do
        shutdown = options[:shutdown]
        timeout = options[:timeout]

        @done = true
        Sidekiq::Fetcher.done!
        @fetcher.async.terminate if @fetcher.alive?

        logger.info { "Shutting down #{@ready.size} quiet workers" }
        @ready.each { |x| x.terminate if x.alive? }
        @ready.clear

        return after(0) { signal(:shutdown) } if @busy.empty?

        # Only announce a deadline when one is actually being armed;
        # without :shutdown there is no timeout to report.
        if shutdown
          logger.info { "Pausing up to #{timeout} seconds to allow workers to finish..." }
          hard_shutdown_in timeout
        end
      end
    end

    # Requests one fetch per ready processor.
    def start
      @ready.each { dispatch }
    end

    # Registers a block that #processor_done calls with the processor
    # each time a processor reports completion.
    def when_done(&blk)
      @done_callback = blk
    end

    # Called when a processor has finished its unit of work.
    #
    # The processor is removed from the busy set.  While running it
    # goes back to the ready pool (if still alive) and another fetch is
    # requested; while stopping it is terminated and :shutdown is
    # signalled once no processor remains busy.
    def processor_done(processor)
      watchdog('Manager#processor_done died') do
        @done_callback.call(processor) if @done_callback
        @in_progress.delete(processor.object_id)
        @busy.delete(processor)
        if stopped?
          processor.terminate if processor.alive?
          signal(:shutdown) if @busy.empty?
        else
          @ready << processor if processor.alive?
        end
        # No-op while stopping; see #dispatch.
        dispatch
      end
    end

    # Exit handler registered through trap_exit above.
    #
    # Forgets the dead processor.  While running, a replacement
    # processor is linked in and a fetch is requested; while stopping,
    # :shutdown is signalled once no processor remains busy.
    def processor_died(processor, reason)
      watchdog("Manager#processor_died died") do
        @in_progress.delete(processor.object_id)
        @busy.delete(processor)

        if stopped?
          signal(:shutdown) if @busy.empty?
        else
          @ready << Processor.new_link(current_actor)
          dispatch
        end
      end
    end

    # Hands a fetched unit of work to an idle processor.
    def assign(work)
      watchdog("Manager#assign died") do
        if stopped?
          # Race condition between Manager#stop if Fetcher
          # is blocked on redis and gets a message after
          # all the ready Processors have been stopped.
          # Push the message back to redis.
          work.requeue
        else
          processor = @ready.pop
          @in_progress[processor.object_id] = work
          @busy << processor
          processor.async.process(work)
        end
      end
    end

    private

    # After +delay+, clears this process's entries from the 'workers'
    # set in Redis, pushes the messages of all still-busy processors
    # back onto their queues, terminates those processors and signals
    # :shutdown.
    def hard_shutdown_in(delay)
      after(delay) do
        watchdog("Manager#watch_for_shutdown died") do
          # We've reached the timeout and we still have busy workers.
          # They must die but their messages shall live on.
          logger.info("Still waiting for #{@busy.size} busy workers")

          Sidekiq.redis do |conn|
            logger.debug { "Clearing workers in redis" }
            workers = conn.smembers('workers')
            workers_to_remove = workers.select do |worker_name|
              worker_name =~ /:#{process_id}-/
            end
            # SREM requires at least one member; skip the call when this
            # process has nothing registered.
            conn.srem('workers', workers_to_remove) unless workers_to_remove.empty?

            # Re-enqueue terminated jobs
            # NOTE: You may notice that we may push a job back to redis before
            # the worker thread is terminated. This is ok because Sidekiq's
            # contract says that jobs are run AT LEAST once. Process termination
            # is delayed until we're certain the jobs are back in Redis because
            # it is worse to lose a job than to run it twice.
            jobs_to_requeue = {}
            @busy.each do |processor|
              # processor is an actor proxy and we can't call any methods
              # that would go to the actor (since it's busy).  Instead
              # we'll use the object_id to track the worker's data here.
              unit_of_work = @in_progress[processor.object_id]
              jobs_to_requeue[unit_of_work.queue] ||= []
              jobs_to_requeue[unit_of_work.queue] << unit_of_work.message
            end

            jobs_to_requeue.each do |queue, jobs|
              conn.rpush(queue, jobs)
            end

            # Lastly, terminate worker threads
            @busy.each do |processor|
              processor.terminate if processor.alive?
            end
          end

          logger.info("Pushed #{@busy.size} messages back to Redis")
          after(0) { signal(:shutdown) }
        end
      end
    end

    # Asks the fetcher for one unit of work.  Does nothing once the
    # manager is stopping.
    #
    # Raises RuntimeError when no ready processor is available.
    def dispatch
      return if stopped?
      # This is a safety check to ensure we haven't leaked
      # processors somehow.
      raise "BUG: No processors, cannot continue!" if @ready.empty? && @busy.empty?
      raise "No ready processor!?" if @ready.empty?

      @fetcher.async.fetch
    end

    # True once #stop has been called.
    def stopped?
      @done
    end

    # Writes the busy/total processor counts into the process title
    # ($0) and re-schedules itself to refresh it every 5 seconds.
    def procline(tag)
      $0 = "sidekiq #{Sidekiq::VERSION} #{tag}[#{@busy.size} of #{@count} busy]#{stopped? ? ' stopping' : ''}"
      after(5) { procline(tag) }
    end
  end
end