require 'celluloid'
require 'sidekiq/util'
require 'sidekiq/processor'
require 'sidekiq/fetch'

module Sidekiq

  ##
  # The main router in the system. This
  # manages the processor state and accepts messages
  # from Redis to be dispatched to an idle processor.
  #
  class Manager
    include Util
    include Celluloid
    trap_exit :processor_died
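
    # Celluloid wiring: each Processor is created with new_link (see
    # initialize), so this Manager is linked to all of its processors, and
    # trap_exit above registers processor_died as the handler Celluloid
    # invokes when a linked Processor crashes. That lets the Manager replace
    # dead workers rather than crash along with them.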

    def initialize(options={})
      logger.debug { options.inspect }
      @count = options[:concurrency] || 25
      @done_callback = nil

      @in_progress = {}
      @done = false
      @busy = []
      @fetcher = Fetcher.new(current_actor, options)
      @ready = @count.times.map { Processor.new_link(current_actor) }
    end
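
    # A minimal construction sketch (an assumed caller; in Sidekiq proper the
    # CLI owns this lifecycle and the option keys may differ):
    #
    #   manager = Sidekiq::Manager.new(:concurrency => 10)
    #   manager.async.start   # begin pulling work from Redis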

    def stop(options={})
      watchdog('Manager#stop died') do
        shutdown = options[:shutdown]
        timeout = options[:timeout]

        @done = true
        Sidekiq::Fetcher.done!
        @fetcher.async.terminate if @fetcher.alive?

        logger.info { "Shutting down #{@ready.size} quiet workers" }
        @ready.each { |x| x.terminate if x.alive? }
        @ready.clear

        clear_worker_set

        return after(0) { signal(:shutdown) } if @busy.empty?
        hard_shutdown_in timeout if shutdown
      end
    end
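
    # The shutdown handshake, as an assumed-caller sketch: stop flips @done,
    # drains the quiet processors, and eventually fires signal(:shutdown);
    # a caller can block on that condition:
    #
    #   manager.async.stop(:shutdown => true, :timeout => 8)
    #   manager.wait(:shutdown)   # woken by the signal(:shutdown) calls above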

    def start
      @ready.each { dispatch }
    end

    def when_done(&blk)
      @done_callback = blk
    end
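
    # Observer hook (an assumed usage, e.g. a test harness). The block runs
    # inside processor_done before the bookkeeping, so it should be fast and
    # should not raise, or the bookkeeping after it is skipped:
    #
    #   manager.when_done { |processor| record_completion(processor) }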

    def processor_done(processor)
      watchdog('Manager#processor_done died') do
        @done_callback.call(processor) if @done_callback
        @in_progress.delete(processor.object_id)
        @busy.delete(processor)
        if stopped?
          processor.terminate if processor.alive?
          signal(:shutdown) if @busy.empty?
        else
          @ready << processor if processor.alive?
        end
        dispatch
      end
    end

    def processor_died(processor, reason)
      watchdog("Manager#processor_died died") do
        @in_progress.delete(processor.object_id)
        @busy.delete(processor)

        unless stopped?
          @ready << Processor.new_link(current_actor)
          dispatch
        else
          signal(:shutdown) if @busy.empty?
        end
      end
    end

    def assign(work)
      watchdog("Manager#assign died") do
        if stopped?
          # Race condition with Manager#stop: if the Fetcher is blocked on
          # redis and gets a message after all the ready Processors have
          # been stopped, push the message back to redis.
          work.requeue
        else
          processor = @ready.pop
          @in_progress[processor.object_id] = work
          @busy << processor
          processor.async.process(work)
        end
      end
    end
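
    # From this class's perspective, `work` only needs to respond to #requeue
    # (push itself back to Redis) and to whatever Processor#process consumes;
    # in stock Sidekiq it is the fetch strategy's unit-of-work object.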

    def procline(tag)
      "sidekiq #{Sidekiq::VERSION} #{tag}[#{@busy.size} of #{@count} busy]#{stopped? ? ' stopping' : ''}"
    end
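
    # Intended for the process title (assumed usage: $0 = manager.procline('tag ')).
    # With a hypothetical version and tag it renders roughly:
    #   sidekiq 2.17.0 tag [3 of 25 busy]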

    private

    def clear_worker_set
      # Clearing workers in Redis
      # NOTE: we do this before terminating worker threads because the
      # process will likely receive a hard shutdown soon anyway, which
      # means the threads will be killed.
      logger.debug { "Clearing workers in redis" }
      Sidekiq.redis do |conn|
        workers = conn.smembers('workers')
        workers_to_remove = workers.select do |worker_name|
          worker_name =~ /:#{process_id}-/
        end
        conn.srem('workers', workers_to_remove) if !workers_to_remove.empty?
      end
    rescue => ex
      Sidekiq.logger.warn("Unable to clear worker set while shutting down: #{ex.message}")
    end
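
    # Assumed entry format: each Processor registers itself in the 'workers'
    # set under a name that embeds "hostname:pid-thread_id", which is why the
    # /:#{process_id}-/ match above selects only this process's entries.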

    def hard_shutdown_in(delay)
      logger.info { "Pausing up to #{delay} seconds to allow workers to finish..." }

      after(delay) do
        watchdog("Manager#hard_shutdown_in died") do
          # We've reached the timeout and we still have busy workers.
          # They must die but their messages shall live on.
          logger.info("Still waiting for #{@busy.size} busy workers")

          # Re-enqueue terminated jobs
          # NOTE: a job may be pushed back to redis before its worker thread
          # is terminated. This is ok because Sidekiq's contract says that
          # jobs are run AT LEAST once. Process termination is delayed until
          # we're certain the jobs are back in Redis because it is worse to
          # lose a job than to run it twice.
          Sidekiq::Fetcher.strategy.bulk_requeue(@in_progress.values)

          logger.debug { "Terminating worker threads" }
          @busy.each do |processor|
            t = processor.bare_object.actual_work_thread
            t.raise Shutdown if processor.alive?
          end

          after(0) { signal(:shutdown) }
        end
      end
    end
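
    # Shutdown here is Sidekiq::Shutdown (defined elsewhere in the gem);
    # raising it asynchronously inside the worker thread aborts the job
    # mid-flight. That is only safe because bulk_requeue has already pushed
    # the in-progress jobs back to Redis.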

    def dispatch
      return if stopped?
      # This is a safety check to ensure we haven't leaked
      # processors somehow.
      raise "BUG: No processors, cannot continue!" if @ready.empty? && @busy.empty?
      raise "No ready processor!?" if @ready.empty?

      @fetcher.async.fetch
    end
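
    # dispatch only asks the Fetcher for more work; the Fetcher calls back
    # into #assign with the next unit of work, so the fetch/assign/dispatch
    # cycle keeps roughly one outstanding fetch per ready processor.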

    def stopped?
      @done
    end
  end
end