# encoding: utf-8
require 'sidekiq/util'
require 'sidekiq/actor'
require 'sidekiq/processor'
require 'sidekiq/fetch'

module Sidekiq

  ##
  # The main router in the system. This
  # manages the processor state and accepts messages
  # from Redis to be dispatched to an idle processor.
  #
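  # A rough lifecycle sketch (illustrative: the condition variable and the
  # exact wiring live in the launcher code, so the names below are
  # assumptions, not the exact boot sequence):
  #
  #   condvar = Celluloid::Condition.new
  #   manager = Sidekiq::Manager.new(condvar, concurrency: 25)
  #   manager.fetcher = Sidekiq::Fetcher.new(manager, queues: ['default'])
  #   manager.async.start                             # begin dispatching work
  #   manager.async.stop(shutdown: true, timeout: 8)  # later: graceful stop
  #   condvar.wait                                    # block until shutdown
  #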
  class Manager
    include Util
    include Actor
    trap_exit :processor_died

    attr_reader :ready
    attr_reader :busy
    attr_accessor :fetcher

    SPIN_TIME_FOR_GRACEFUL_SHUTDOWN = 1
    JVM_RESERVED_SIGNALS = ['USR1', 'USR2'] # Don't Process#kill if we get these signals via the API

    def initialize(condvar, options={})
      logger.debug { options.inspect }
      @options = options
      @count = options[:concurrency] || 25
      raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1
      @done_callback = nil
      @finished = condvar

      @in_progress = {}
      @threads = {}
      @done = false
      @busy = []
      @ready = @count.times.map do
        p = Processor.new_link(current_actor)
        p.proxy_id = p.object_id
        p
      end
    end

    def stop(options={})
      watchdog('Manager#stop died') do
        should_shutdown = options[:shutdown]
        timeout = options[:timeout]

        @done = true

        logger.info { "Terminating #{@ready.size} quiet workers" }
        @ready.each { |x| x.terminate if x.alive? }
        @ready.clear

        return if clean_up_for_graceful_shutdown

        hard_shutdown_in timeout if should_shutdown
      end
    end

    def clean_up_for_graceful_shutdown
      if @busy.empty?
        shutdown
        return true
      end

      after(SPIN_TIME_FOR_GRACEFUL_SHUTDOWN) { clean_up_for_graceful_shutdown }
      false
    end

    def start
      @ready.each { dispatch }
    end

    def when_done(&blk)
      @done_callback = blk
    end

    def processor_done(processor)
      watchdog('Manager#processor_done died') do
        @done_callback.call(processor) if @done_callback
        @in_progress.delete(processor.object_id)
        @threads.delete(processor.object_id)
        @busy.delete(processor)
        if stopped?
          processor.terminate if processor.alive?
          shutdown if @busy.empty?
        else
          @ready << processor if processor.alive?
        end
        dispatch
      end
    end

    def processor_died(processor, reason)
      watchdog("Manager#processor_died died") do
        @in_progress.delete(processor.object_id)
        @threads.delete(processor.object_id)
        @busy.delete(processor)

        unless stopped?
          p = Processor.new_link(current_actor)
          p.proxy_id = p.object_id
          @ready << p
          dispatch
        else
          shutdown if @busy.empty?
        end
      end
    end

    def assign(work)
      watchdog("Manager#assign died") do
        if stopped?
          # Race condition with Manager#stop: the Fetcher may be
          # blocked on Redis and deliver a message after all the
          # ready Processors have been stopped. Push the message
          # back to Redis.
          work.requeue
        else
          processor = @ready.pop
          @in_progress[processor.object_id] = work
          @busy << processor
          processor.async.process(work)
        end
      end
    end

    # A hack worthy of Rube Goldberg. We need to be able to hard stop a
    # working thread, but there's no way for us to get a handle to the
    # underlying thread performing work for a processor, so we have it
    # call us and tell us.
    def real_thread(proxy_id, thr)
      @threads[proxy_id] = thr
    end
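
    # The other half of this handshake lives in the processor; roughly
    # (a sketch of the processor's side, assuming the processor holds a
    # link to this manager as @boss):
    #
    #   # inside Processor#process, on the real worker thread:
    #   @boss.async.real_thread(@proxy_id, Thread.current)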

    PROCTITLES = [
      proc { 'sidekiq'.freeze },
      proc { Sidekiq::VERSION },
      proc { |mgr, data| data['tag'] },
      proc { |mgr, data| "[#{mgr.busy.size} of #{data['concurrency']} busy]" },
      proc { |mgr, data| "stopping" if mgr.stopped? },
    ]
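
    # Joined together (with nils compacted away), these procs render the
    # process title, e.g. with illustrative values:
    #
    #   $ ps -o command -p <sidekiq pid>
    #   sidekiq 3.5.0 myapp [12 of 25 busy]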

    def heartbeat(key, data, json)
      results = PROCTITLES.map { |x| x.(self, data) }
      results.compact!
      $0 = results.join(' ')

      ❤(key, json)
      after(5) do
        heartbeat(key, data, json)
      end
    end
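
    # The hash ❤ writes is what Sidekiq::ProcessSet and the Web UI's Busy
    # page read back. A rough sketch of the consuming side (assumed,
    # simplified):
    #
    #   Sidekiq.redis do |conn|
    #     conn.smembers('processes').each do |key|
    #       info, busy, beat = conn.hmget(key, 'info', 'busy', 'beat')
    #       # keys expire after 60s, so a dead process disappears on its own
    #     end
    #   end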

    def stopped?
      @done
    end

    private

    def ❤(key, json)
      begin
        _, _, _, msg = Sidekiq.redis do |conn|
          conn.multi do
            conn.sadd('processes', key)
            conn.hmset(key, 'info', json, 'busy', @busy.size, 'beat', Time.now.to_f)
            conn.expire(key, 60)
            conn.rpop("#{key}-signals")
          end
        end

        return unless msg

        if JVM_RESERVED_SIGNALS.include?(msg)
          Sidekiq::CLI.instance.handle_signal(msg)
        else
          ::Process.kill(msg, $$)
        end
      rescue => e
        # ignore all redis/network issues
        logger.error("heartbeat: #{e.message}")
      end
    end
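
    # The rpop in the multi above is how remote signals reach this process:
    # the Sidekiq API pushes a signal name onto the per-process list and the
    # next heartbeat pops it. Sending side, roughly (the key value here is
    # illustrative):
    #
    #   Sidekiq.redis do |conn|
    #     conn.lpush('myhost:4321:abcd1234-signals', 'USR1') # quiet a process
    #   end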

    def hard_shutdown_in(delay)
      logger.info { "Pausing up to #{delay} seconds to allow workers to finish..." }

      after(delay) do
        watchdog("Manager#hard_shutdown_in died") do
          # We've reached the timeout and we still have busy workers.
          # They must die but their messages shall live on.
          logger.warn { "Terminating #{@busy.size} busy worker threads" }
          logger.warn { "Work still in progress #{@in_progress.values.inspect}" }

          requeue

          @busy.each do |processor|
            if processor.alive? && t = @threads.delete(processor.object_id)
              t.raise Shutdown
            end
          end

          @finished.signal
        end
      end
    end

    def dispatch
      return if stopped?
      # This is a safety check to ensure we haven't leaked
      # processors somehow.
      raise "BUG: No processors, cannot continue!" if @ready.empty? && @busy.empty?
      raise "No ready processor!?" if @ready.empty?

      @fetcher.async.fetch
    end
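
    # dispatch only nudges the Fetcher; the Fetcher performs the blocking
    # Redis call and hands work back through #assign. A simplified sketch of
    # that loop (illustrative, not the exact Fetcher source):
    #
    #   def fetch
    #     work = @strategy.retrieve_work      # e.g. BRPOP across the queues
    #     if work
    #       @mgr.async.assign(work)           # assign re-triggers dispatch
    #     else
    #       after(0) { fetch }                # nothing yet, try again
    #     end
    #   end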

    def shutdown
      requeue
      @finished.signal
    end

    def requeue
      # Re-enqueue terminated jobs.
      # NOTE: we may push a job back to Redis before the worker thread
      # is terminated. This is ok because Sidekiq's contract says that
      # jobs are run AT LEAST once. Process termination is delayed until
      # we're certain the jobs are back in Redis because it is worse to
      # lose a job than to run it twice.
      Sidekiq::Fetcher.strategy.bulk_requeue(@in_progress.values, @options)
      @in_progress.clear
    end
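
    # What a bulk_requeue strategy typically does, modeled on the basic
    # fetch strategy (a simplified sketch, not the exact source): group the
    # in-progress units of work by queue and push each batch back with a
    # single RPUSH per queue.
    #
    #   def self.bulk_requeue(inprogress, options)
    #     Sidekiq.redis do |conn|
    #       inprogress.group_by(&:queue_name).each do |queue, units|
    #         conn.rpush("queue:#{queue}", units.map(&:message))
    #       end
    #     end
    #   end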
  end
end