2014-03-06 21:41:05 -08:00
|
|
|
# encoding: utf-8
|
2012-01-23 14:05:03 -08:00
|
|
|
require 'sidekiq/util'
|
2012-01-25 13:32:51 -08:00
|
|
|
require 'sidekiq/processor'
|
2012-03-24 13:28:18 -07:00
|
|
|
require 'sidekiq/fetch'
|
2012-01-22 16:01:46 -08:00
|
|
|
|
2012-01-16 16:14:47 -08:00
|
|
|
module Sidekiq
|
|
|
|
|
|
|
|
##
# The Manager is the central coordination point in Sidekiq, controlling
# the lifecycle of the Processors and feeding them jobs as necessary.
#
# Tasks:
#
# 1. start: Spin up Processors. Issue fetch requests for each.
# 2. processor_done: Handle job success, issue fetch request.
# 3. processor_died: Handle job failure, throw away Processor, issue fetch request.
# 4. quiet: shut down idle Processors, ignore further fetch requests.
# 5. stop: hard stop the Processors by deadline.
#
# Note that only the last task requires a Thread since it has to monitor
# the shutdown process. The other tasks are performed by other threads.
#
|
2012-02-03 10:02:57 -08:00
|
|
|
class Manager
|
2012-01-22 11:32:38 -08:00
|
|
|
include Util

# The object whose #request_job is called whenever a Processor becomes
# available. It is assigned from outside after the Manager is constructed.
attr_writer :fetcher

# Hash mapping each busy Processor to the unit of work it was handed.
attr_reader :in_progress

# Array of idle Processors waiting to be handed work.
attr_reader :ready

# NOTE(review): not referenced anywhere in the visible part of this class;
# presumably a duration in seconds used by callers -- confirm before removing.
SPIN_TIME_FOR_GRACEFUL_SHUTDOWN = 1
|
2013-11-21 06:09:30 -05:00
|
|
|
|
2014-11-18 10:50:42 -08:00
|
|
|
# Builds a manager owning a fixed-size pool of Processors.
#
# condvar - stored as @finished. Nothing in the visible part of this class
#           reads it (presumably used to signal the caller on shutdown --
#           TODO confirm against callers).
# options - Hash of settings. :concurrency is the number of Processors to
#           create (defaults to 25). The hash is kept in @options.
#
# Raises ArgumentError if the concurrency is less than 1.
def initialize(condvar, options={})
  logger.debug { options.inspect }

  @options = options
  @count = options[:concurrency] || 25
  raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1

  @finished = condvar
  # Assigned later through the fetcher= writer; set here so the ivar always exists.
  @fetcher = nil
  @done = false
  @in_progress = {}
  # Create the lock before any Processor receives a reference to this
  # manager, so the manager is never visible without its lock.
  @plock = Mutex.new
  @ready = Array.new(@count) { Processor.new(self) }
end
|
|
|
|
|
2015-10-06 12:43:01 -07:00
|
|
|
# Starts every idle Processor and issues one fetch request per Processor.
def start
  # Work on a snapshot taken under the lock. Other methods of this class
  # only touch @ready while holding @plock, and a fetch request issued by
  # dispatch can lead to #assign popping from @ready; mutating the array
  # while iterating it would make #each stop early and leave Processors
  # unstarted. The lock is released before dispatching because Mutex is
  # not reentrant.
  idle = @plock.synchronize { @ready.dup }
  idle.each do |processor|
    processor.start
    dispatch
  end
end
|
2012-03-08 20:58:51 -08:00
|
|
|
|
2015-10-06 12:43:01 -07:00
|
|
|
# Stops accepting new work: marks the manager as done, then terminates and
# discards every idle Processor. Busy Processors are left alone.
#
# Calling it again once the manager is done is a no-op.
def quiet
  return if @done

  @done = true
  logger.info { "Terminating quiet workers" }

  @plock.synchronize do
    @ready.each(&:terminate)
    @ready.clear
  end
end
|
|
|
|
|
2015-10-06 12:43:01 -07:00
|
|
|
# Quiets the manager, then waits for busy Processors to finish, polling
# every half second until the deadline is near. If work is still in
# progress at that point, falls through to hard_shutdown.
#
# deadline - the Time by which shutdown must be complete (it is compared
#            against Time.now).
def stop(deadline)
  quiet
  return if @in_progress.empty?

  logger.info { "Pausing to allow workers to finish..." }
  remaining = deadline - Time.now
  while remaining > 0.5
    return if @in_progress.empty?
    sleep 0.5
    remaining = deadline - Time.now
  end
  # Final check: the last sleep may have given the remaining work time to finish.
  return if @in_progress.empty?

  hard_shutdown
end
|
|
|
|
|
2012-01-25 13:32:51 -08:00
|
|
|
# Called when a Processor has finished its unit of work.
#
# Removes the Processor from the in-progress table. If the manager is
# shutting down the Processor is terminated; otherwise it goes back on the
# ready list. A fetch request is then issued via dispatch (which itself
# does nothing once the manager is done).
#
# processor - the Processor that finished.
def processor_done(processor)
  @plock.synchronize do
    @in_progress.delete(processor)
    if @done
      processor.terminate
    else
      @ready << processor
    end
  end
  # Dispatch outside the lock so a fetch that calls back into the manager
  # cannot block on @plock.
  dispatch
end
|
2012-01-16 20:02:58 -08:00
|
|
|
|
2012-01-25 13:32:51 -08:00
|
|
|
# Called when a Processor has died while working.
#
# Removes the dead Processor from the in-progress table. Unless the
# manager is shutting down, a fresh Processor is created, started and put
# on the ready list to keep the pool at size. A fetch request is then
# issued via dispatch.
#
# processor - the Processor that died; it is discarded.
# reason    - why it died. Not used by this method.
def processor_died(processor, reason)
  @plock.synchronize do
    @in_progress.delete(processor)
    unless @done
      replacement = Processor.new(self)
      replacement.start
      @ready << replacement
    end
  end
  dispatch
end
|
|
|
|
|
2013-01-05 21:17:08 -08:00
|
|
|
# Hands a fetched unit of work to an idle Processor.
#
# There is a race with #quiet/#stop: the fetcher may be blocked on redis
# and deliver a job after the idle Processors have been terminated and
# @ready has been cleared. The shutdown flag is therefore re-checked while
# holding the lock, and an empty ready list is treated the same way. In
# either case the job is pushed back to redis instead of being dropped.
#
# work - the fetched unit of work; must respond to #requeue.
def assign(work)
  processor = nil

  unless @done
    @plock.synchronize do
      processor = @ready.pop unless @done
      # Only record the job when a Processor was really obtained, so a nil
      # key can never end up in @in_progress.
      @in_progress[processor] = work if processor
    end
  end

  if processor
    # Called outside the lock so the Processor cannot block other callers.
    processor.request_process(work)
  else
    work.requeue
  end
end
|
|
|
|
|
2015-06-09 13:09:53 -07:00
|
|
|
# Returns the shutdown flag: true once #quiet (and therefore #stop) has
# been called, false before that.
def stopped?
  @done
end
|
|
|
|
|
2012-03-24 13:28:18 -07:00
|
|
|
private
|
2012-01-22 11:32:38 -08:00
|
|
|
|
2015-10-06 12:43:01 -07:00
|
|
|
# Called when the shutdown deadline is reached and Processors are still
# busy. They must die but their jobs shall live on: every unfinished job
# is pushed back to redis and every busy Processor is killed.
def hard_shutdown
  # Take a snapshot under the lock so the table cannot change underneath us.
  cleanup = @plock.synchronize { @in_progress.dup }

  unless cleanup.empty?
    logger.warn { "Terminating #{cleanup.size} busy worker threads" }
    logger.warn { "Work still in progress #{cleanup.values.inspect}" }
    # Re-enqueue unfinished jobs.
    # NOTE: a job may be pushed back to redis before its worker thread is
    # terminated. This is ok because Sidekiq's contract says that jobs are
    # run AT LEAST once. Process termination is delayed until we're certain
    # the jobs are back in Redis because it is worse to lose a job than to
    # run it twice.
    Sidekiq::Fetcher.strategy.bulk_requeue(cleanup.values, @options)
  end

  cleanup.each_key(&:kill)
end
|
|
|
|
|
2012-03-24 13:28:18 -07:00
|
|
|
# Asks the fetcher for one more job, unless the manager is shutting down.
#
# Raises RuntimeError if there is no idle Processor to receive the job.
def dispatch
  return if @done

  # This is a safety check to ensure we haven't leaked processors somehow.
  raise "BUG: No processors, cannot continue!" if @ready.empty? && @in_progress.empty?
  raise "No ready processor!?" if @ready.empty?

  @fetcher.request_job
end
|
|
|
|
|
2012-01-16 16:14:47 -08:00
|
|
|
end
|
|
|
|
end
|