mperham--sidekiq/lib/sidekiq/manager.rb

# encoding: utf-8
require 'sidekiq/util'
require 'sidekiq/processor'
require 'sidekiq/fetch'
module Sidekiq

  ##
  # The Manager is the central coordination point in Sidekiq, controlling
  # the lifecycle of the Processors and feeding them jobs as necessary.
  #
  # Tasks:
  #
  # 1. start: Spin up Processors. Issue fetch requests for each.
  # 2. processor_done: Handle job success, issue fetch request.
  # 3. processor_died: Handle job failure, throw away Processor, issue fetch request.
  # 4. quiet: shutdown idle Processors, ignore further fetch requests.
  # 5. stop: hard stop the Processors by deadline.
  #
  # Note that only the last task requires a Thread since it has to monitor
  # the shutdown process. The other tasks are performed by other threads.
  #
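  # An illustrative sketch (not part of this file) of how a launcher might
  # drive a Manager, assuming a fetch strategy object that responds to
  # #request_job and hands fetched work back via #assign; the names
  # `condvar` and `fetcher` below are hypothetical:
  #
  #   condvar = ConditionVariable.new
  #   mgr = Sidekiq::Manager.new(condvar, concurrency: 10)
  #   mgr.fetcher = fetcher          # fetch strategy, calls mgr.assign(work)
  #   mgr.start                      # spin up Processors, one fetch request each
  #   # ... later, during shutdown ...
  #   mgr.quiet                      # stop fetching, let running jobs finish
  #   mgr.stop(Time.now + 25)        # hard-kill stragglers at the deadline
  #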
  class Manager
    include Util

    attr_writer :fetcher
    attr_reader :in_progress
    attr_reader :ready

    SPIN_TIME_FOR_GRACEFUL_SHUTDOWN = 1

    def initialize(condvar, options={})
      logger.debug { options.inspect }
      @options = options
      @count = options[:concurrency] || 25
      raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1
      @finished = condvar

      @in_progress = {}
      @done = false
      @ready = Array.new(@count) do
        Processor.new(self)
      end
      @plock = Mutex.new
    end
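
    # Start all Processors and issue an initial fetch request for each one.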
    def start
      @ready.each { |x| x.start; dispatch }
    end
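
    # Stop accepting new work: terminate the idle Processors and ignore
    # further fetch requests. Jobs already in progress keep running.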
    def quiet
      return if @done
      @done = true

      logger.info { "Terminating quiet workers" }

      @plock.synchronize do
        @ready.each { |x| x.terminate }
        @ready.clear
      end
    end
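
    # Quiet, then give in-progress jobs until the deadline to finish.
    # Anything still running after that is hard-killed and requeued.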
    def stop(deadline)
      quiet
      return shutdown if @in_progress.empty?

      logger.info { "Pausing to allow workers to finish..." }
      remaining = deadline - Time.now
      while remaining > 0.5
        return shutdown if @in_progress.empty?
        sleep 0.5
        remaining = deadline - Time.now
      end
      return shutdown if @in_progress.empty?

      hard_shutdown
    end
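
    # Callback from a Processor that finished a job successfully: recycle
    # it (or terminate it during shutdown) and, if we are still running,
    # issue another fetch request.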
    def processor_done(processor)
      @plock.synchronize do
        @in_progress.delete(processor)
        if @done
          processor.terminate
          #shutdown if @in_progress.empty?
        else
          @ready << processor
        end
      end
      dispatch
    end
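
    # Callback from a Processor that crashed while running a job: discard
    # it, replace it with a fresh Processor (unless we are shutting down)
    # and, if we are still running, issue another fetch request.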
    def processor_died(processor, reason)
      @plock.synchronize do
        @in_progress.delete(processor)
        if @done
          #shutdown if @in_progress.empty?
        else
          @ready << Processor.new(self)
        end
      end
      dispatch
    end
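
    # Hand a fetched unit of work to an idle Processor, or push it back
    # to Redis if we are already shutting down.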
    def assign(work)
      if @done
        # Race condition with Manager#stop: the Fetcher may be blocked on
        # redis and deliver a message after all the ready Processors have
        # been stopped. Push the message back to redis.
        work.requeue
      else
        processor = nil
        @plock.synchronize do
          processor = @ready.pop
          @in_progress[processor] = work
        end
        processor.request_process(work)
      end
    end

    def stopped?
      @done
    end

    private

    def hard_shutdown
      # We've reached the timeout and we still have busy workers.
      # They must die but their messages shall live on.
      logger.warn { "Terminating #{@in_progress.size} busy worker threads" }
      logger.warn { "Work still in progress #{@in_progress.values.inspect}" }

      requeue

      @in_progress.each do |processor, _|
        processor.kill
      end
    end
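
    # Ask the fetch strategy for another job, unless we are shutting down.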
    def dispatch
      return if @done
      # This is a safety check to ensure we haven't leaked processors somehow.
      raise "BUG: No processors, cannot continue!" if @ready.empty? && @in_progress.empty?
      raise "No ready processor!?" if @ready.empty?

      @fetcher.request_job
    end

    def shutdown
      requeue
    end

    def requeue
      # Re-enqueue unfinished jobs
      # NOTE: You may notice that we may push a job back to redis before
      # the worker thread is terminated. This is ok because Sidekiq's
      # contract says that jobs are run AT LEAST once. Process termination
      # is delayed until we're certain the jobs are back in Redis because
      # it is worse to lose a job than to run it twice.
      jobs = nil
      @plock.synchronize do
        jobs = @in_progress.values
      end
      Sidekiq::Fetcher.strategy.bulk_requeue(jobs, @options) if jobs.size > 0
    end

  end
end