2016-11-22 23:39:00 -05:00
|
|
|
# frozen_string_literal: true
|
2012-01-23 17:05:03 -05:00
|
|
|
require 'sidekiq/util'
|
2012-01-25 16:32:51 -05:00
|
|
|
require 'sidekiq/processor'
|
2012-03-24 16:28:18 -04:00
|
|
|
require 'sidekiq/fetch'
|
2015-10-07 12:42:10 -04:00
|
|
|
require 'thread'
|
2016-08-07 14:56:43 -04:00
|
|
|
require 'set'
|
2012-01-22 19:01:46 -05:00
|
|
|
|
2012-01-16 19:14:47 -05:00
|
|
|
module Sidekiq

  ##
  # The Manager is the central coordination point in Sidekiq, controlling
  # the lifecycle of the Processors.
  #
  # Tasks:
  #
  # 1. start: Spin up Processors.
  # 2. processor_stopped: Forget a Processor that has finished.
  # 3. processor_died: Handle job failure, throw away Processor, create new one.
  # 4. quiet: shutdown idle Processors.
  # 5. stop: hard stop the Processors by deadline.
  #
  # Note that only the last task requires its own Thread since it has to monitor
  # the shutdown process. The other tasks are performed by other threads.
  #
  # Because several threads touch @workers, every mutation of the set happens
  # under @plock, and every iteration walks a copy taken under @plock.
  #
  class Manager
    include Util

    attr_reader :workers
    attr_reader :options

    # @param options [Hash] Sidekiq options; :concurrency sets the number of
    #   Processors (default 10), :fetch optionally overrides the fetch
    #   strategy used to requeue jobs during a hard shutdown.
    # @raise [ArgumentError] if the requested concurrency is below 1.
    def initialize(options={})
      logger.debug { options.inspect }
      @options = options
      @count = options[:concurrency] || 10
      raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1

      # Create the lock before any Processor is handed a reference to us.
      @plock = Mutex.new
      @done = false
      @workers = Set.new
      @count.times do
        @workers << Processor.new(self)
      end
    end

    # Starts every Processor created in #initialize.
    def start
      # Iterate over a snapshot: a Processor started here may die immediately,
      # and #processor_died would then add a replacement to @workers while we
      # are still walking it.
      snapshot_workers.each(&:start)
    end

    # Marks the manager as done, calls #terminate on every current Processor,
    # then fires the :quiet event (in reverse order). Calling it again is a no-op.
    def quiet
      return if @done
      @done = true

      logger.info { "Terminating quiet workers" }
      # @done is set before the snapshot is taken under @plock, so any
      # replacement #processor_died adds concurrently is either in the snapshot
      # or never created.
      snapshot_workers.each(&:terminate)
      fire_event(:quiet, reverse: true)
    end

    # hack for quicker development / testing environment #2774
    # (polling interval, in seconds, used by #stop)
    PAUSE_TIME = STDOUT.tty? ? 0.1 : 0.5

    # Quiets the Processors, fires the :shutdown event, then polls until
    # every Processor has gone away or +deadline+ (compared against
    # Time.now) is reached. Anything still running after that is hard-killed
    # and its job requeued.
    def stop(deadline)
      quiet
      fire_event(:shutdown, reverse: true)

      # some of the shutdown events can be async,
      # we don't have any way to know when they're done but
      # give them a little time to take effect
      sleep PAUSE_TIME
      return if @workers.empty?

      logger.info { "Pausing to allow workers to finish..." }
      remaining = deadline - Time.now
      while remaining > PAUSE_TIME
        return if @workers.empty?
        sleep PAUSE_TIME
        remaining = deadline - Time.now
      end
      return if @workers.empty?

      hard_shutdown
    end

    # Removes +processor+ from the managed set.
    def processor_stopped(processor)
      @plock.synchronize do
        @workers.delete(processor)
      end
    end

    # Removes the dead +processor+ and, unless the manager is quieting or
    # stopping, starts a fresh replacement. +reason+ is not used here.
    def processor_died(processor, reason)
      @plock.synchronize do
        @workers.delete(processor)
        unless @done
          replacement = Processor.new(self)
          @workers << replacement
          replacement.start
        end
      end
    end

    # @return [Boolean] true once #quiet (or #stop) has been called.
    def stopped?
      @done
    end

    private

    # Returns a copy of @workers taken under @plock, which callers can iterate
    # while other threads add or remove Processors.
    def snapshot_workers
      @plock.synchronize { @workers.dup }
    end

    def hard_shutdown
      # We've reached the timeout and we still have busy workers.
      # They must die but their jobs shall live on.
      cleanup = snapshot_workers

      unless cleanup.empty?
        jobs = cleanup.map(&:job).compact

        logger.warn { "Terminating #{cleanup.size} busy worker threads" }
        logger.warn { "Work still in progress #{jobs.inspect}" }

        # Re-enqueue unfinished jobs
        # NOTE: You may notice that we may push a job back to redis before
        # the worker thread is terminated. This is ok because Sidekiq's
        # contract says that jobs are run AT LEAST once. Process termination
        # is delayed until we're certain the jobs are back in Redis because
        # it is worse to lose a job than to run it twice.
        strategy = (@options[:fetch] || Sidekiq::BasicFetch)
        strategy.bulk_requeue(jobs, @options)
      end

      cleanup.each(&:kill)
    end
  end
end
|