# encoding: utf-8
# frozen_string_literal: true
require 'sidekiq/util'
require 'sidekiq/processor'
require 'sidekiq/fetch'
require 'thread'
require 'set'

module Sidekiq
  ##
  # The Manager is the central coordination point in Sidekiq, controlling
  # the lifecycle of the Processors.
  #
  # Tasks:
  #
  # 1. start: Spin up Processors.
  # 2. processor_died: Handle job failure, throw away Processor, create new one.
  # 3. quiet: shut down idle Processors.
  # 4. stop: hard stop the Processors by deadline.
  #
  # Note that only the last task requires its own Thread since it has to monitor
  # the shutdown process. The other tasks are performed by other threads.
  #
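  # A rough usage sketch (in practice the Manager is driven by Sidekiq's
  # Launcher; the option keys below are illustrative, not exhaustive):
  #
  #   mgr = Sidekiq::Manager.new(concurrency: 5, queues: ['default'])
  #   mgr.start                 # spin up the Processor threads
  #   mgr.quiet                 # stop fetching new jobs
  #   mgr.stop(Time.now + 25)   # hard stop by the deadline
  #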
  class Manager
    include Util

    attr_reader :workers
    attr_reader :options

    def initialize(options={})
      logger.debug { options.inspect }
      @options = options
      @count = options[:concurrency] || 25
      raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1

      @done = false
      @workers = Set.new
      @count.times do
        @workers << Processor.new(self)
      end
      @plock = Mutex.new
    end

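    # Start a thread for each Processor; each one begins fetching and
    # executing jobs until told to stop.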
    def start
      @workers.each do |x|
        x.start
      end
    end

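    # Stop picking up new work: tell every Processor to exit once its
    # current job finishes, then fire any registered :quiet lifecycle events.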
    def quiet
      return if @done
      @done = true

      logger.info { "Terminating quiet workers" }
      @workers.each { |x| x.terminate }
      fire_event(:quiet, reverse: true)
    end

    # hack for quicker development / testing environment #2774
    PAUSE_TIME = STDOUT.tty? ? 0.1 : 0.5

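    # Shut down within the given deadline (a Time): quiet the Processors,
    # wait for in-progress jobs to finish, then hard-kill whatever remains.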
    def stop(deadline)
      quiet
      fire_event(:shutdown, reverse: true)

      # some of the shutdown events can be async,
      # we don't have any way to know when they're done but
      # give them a little time to take effect
      sleep PAUSE_TIME
      return if @workers.empty?

      logger.info { "Pausing to allow workers to finish..." }
      remaining = deadline - Time.now
      while remaining > PAUSE_TIME
        return if @workers.empty?
        sleep PAUSE_TIME
        remaining = deadline - Time.now
      end
      return if @workers.empty?

      hard_shutdown
    end

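    # Callback from a Processor that has exited cleanly; drop it from
    # the worker set.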
    def processor_stopped(processor)
      @plock.synchronize do
        @workers.delete(processor)
      end
    end

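    # Callback from a Processor whose thread died unexpectedly. Remove it
    # and, unless we are shutting down, replace it with a fresh Processor.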
    def processor_died(processor, reason)
      @plock.synchronize do
        @workers.delete(processor)
        unless @done
          p = Processor.new(self)
          @workers << p
          p.start
        end
      end
    end

    def stopped?
      @done
    end

    private

    def hard_shutdown
      # We've reached the timeout and we still have busy workers.
      # They must die but their jobs shall live on.
      cleanup = nil
      @plock.synchronize do
        cleanup = @workers.dup
      end

      if cleanup.size > 0
        jobs = cleanup.map {|p| p.job }.compact

        logger.warn { "Terminating #{cleanup.size} busy worker threads" }
        logger.warn { "Work still in progress #{jobs.inspect}" }

        # Re-enqueue unfinished jobs
        # NOTE: You may notice that we may push a job back to redis before
        # the worker thread is terminated. This is ok because Sidekiq's
        # contract says that jobs are run AT LEAST once. Process termination
        # is delayed until we're certain the jobs are back in Redis because
        # it is worse to lose a job than to run it twice.
        strategy = (@options[:fetch] || Sidekiq::BasicFetch)
        strategy.bulk_requeue(jobs, @options)
      end

      cleanup.each do |processor|
        processor.kill
      end
    end
  end
end