2015-12-31 18:33:35 -05:00
|
|
|
# frozen_string_literal: true
|
2015-10-06 18:27:21 -04:00
|
|
|
# encoding: utf-8
|
2013-01-29 11:15:34 -05:00
|
|
|
require 'sidekiq/manager'
|
2013-09-21 23:17:37 -04:00
|
|
|
require 'sidekiq/fetch'
|
2013-01-29 11:15:34 -05:00
|
|
|
require 'sidekiq/scheduled'
|
|
|
|
|
|
|
|
module Sidekiq
  # The Launcher owns the core components of a Sidekiq process -- the
  # Manager and the Scheduled::Poller -- and drives their lifecycle:
  # #run starts them together with a background heartbeat thread, #quiet
  # stops new work from being picked up, and #stop shuts everything down
  # by a deadline derived from options[:timeout].
  #
  # The heartbeat periodically publishes this process's description and
  # job counters to Redis and picks up signals sent remotely via the API.
  class Launcher
    include Util

    # NOTE(review): @fetcher is never assigned in this class; the accessor is
    # kept for backward compatibility with callers that may set or read it.
    attr_accessor :manager, :poller, :fetcher

    # @param options [Hash] Sidekiq options. This class reads :timeout,
    #   :fetch, :tag, :concurrency, :queues and :labels, and passes the
    #   whole hash on to Sidekiq::Manager.new.
    def initialize(options)
      @manager = Sidekiq::Manager.new(options)
      @poller = Sidekiq::Scheduled::Poller.new
      @done = false
      @options = options
    end

    # Starts the heartbeat thread (see #start_heartbeat), then the
    # scheduled-job poller, then the manager.
    def run
      @thread = safe_thread("heartbeat", &method(:start_heartbeat))
      @poller.start
      @manager.start
    end

    # Stops this instance from processing any more jobs: flags the launcher
    # as stopping (reported as 'quiet' in every later heartbeat), quiets the
    # manager and terminates the scheduled-job poller. The process keeps
    # running; use #stop to shut it down.
    def quiet
      @done = true
      @manager.quiet
      @poller.terminate
    end

    # Shuts down the process. This method does not return until all work
    # is complete and cleaned up. It can take up to the timeout
    # (options[:timeout], in seconds) to complete.
    #
    # Steps: the same quieting done by #quiet, Manager#stop with the
    # deadline, a bulk requeue through the configured fetch strategy, and
    # removal of this process's heartbeat record from Redis.
    def stop
      deadline = Time.now + @options[:timeout]

      # Deliberately inlined rather than calling #quiet, so any override of
      # #quiet is not triggered as part of a shutdown.
      @done = true
      @manager.quiet
      @poller.terminate

      @manager.stop(deadline)

      # Requeue everything in case there was a worker who grabbed work while stopped
      # This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
      strategy = (@options[:fetch] || Sidekiq::BasicFetch)
      strategy.bulk_requeue([], @options)

      clear_heartbeat
    end

    # @return [Boolean] true once #quiet or #stop has been called.
    def stopping?
      @done
    end

    # Everything below is private, except when $TESTING is set, so tests can
    # call the heartbeat helpers directly.
    private unless $TESTING

    # Don't Process#kill if we get these signals via the API; #❤ hands them
    # to Sidekiq::CLI#handle_signal instead. (Presumably these are signals
    # the JVM reserves under JRuby, as the name suggests -- confirm.)
    JVM_RESERVED_SIGNALS = %w[USR1 USR2].freeze

    # One heartbeat tick: rebuilds the process title ($0) from the
    # Sidekiq::CLI::PROCTITLES callables, dropping nil results, and then
    # publishes the heartbeat via #❤.
    #
    # @param k [String] this process's identity
    # @param data [Hash] static process description, passed to each
    #   PROCTITLES callable together with this launcher
    # @param json [String] +data+ already serialized to JSON
    def heartbeat(k, data, json)
      results = Sidekiq::CLI::PROCTITLES.map {|x| x.(self, data) }
      results.compact!
      $0 = results.join(' ')

      ❤(k, json)
    end

    # Publishes one heartbeat for this process to Redis and acts on at most
    # one pending remote signal.
    #
    # Two MULTI transactions are issued:
    # 1. Flush the processed/failed counts gathered since the last beat into
    #    the global and per-day (UTC) stat keys, and rewrite the
    #    "<key>:workers" hash from Processor::WORKER_STATE (expires in 60s).
    # 2. Add +key+ to the 'processes' set, refresh the process hash at +key+
    #    (info, busy, beat, quiet; expires in 60s) and pop one entry from
    #    "<key>-signals".
    #
    # Any error is logged and swallowed so the heartbeat loop keeps running.
    # Counts that were read but not yet written to Redis are added back to
    # the in-memory counters so they are not lost.
    #
    # @param key [String] this process's identity
    # @param json [String] pre-serialized process description
    def ❤(key, json)
      fails = procd = 0
      begin
        # Read each in-memory counter and reset it to 0 (the block's return
        # value becomes the new value -- presumably an atomic update; confirm
        # against Processor).
        Processor::FAILURE.update {|curr| fails = curr; 0 }
        Processor::PROCESSED.update {|curr| procd = curr; 0 }

        workers_key = "#{key}:workers".freeze
        nowdate = Time.now.utc.strftime("%Y-%m-%d".freeze)
        Sidekiq.redis do |conn|
          conn.multi do
            conn.incrby("stat:processed".freeze, procd)
            conn.incrby("stat:processed:#{nowdate}", procd)
            conn.incrby("stat:failed".freeze, fails)
            conn.incrby("stat:failed:#{nowdate}", fails)
            conn.del(workers_key)
            Processor::WORKER_STATE.each_pair do |tid, hash|
              conn.hset(workers_key, tid, Sidekiq.dump_json(hash))
            end
            conn.expire(workers_key, 60)
          end
        end
        # The counts are persisted now; zero the locals so the rescue below
        # does not add them back a second time if a later step fails.
        fails = procd = 0

        _, exists, _, _, msg = Sidekiq.redis do |conn|
          conn.multi do
            conn.sadd('processes', key)
            conn.exists(key)
            conn.hmset(key, 'info', json, 'busy', Processor::WORKER_STATE.size, 'beat', Time.now.to_f, 'quiet', @done)
            conn.expire(key, 60)
            conn.rpop("#{key}-signals")
          end
        end

        # Older redis-rb replies to EXISTS with true/false; newer versions can
        # reply with the integer key count, and 0 is truthy in Ruby.
        existed = exists.is_a?(Integer) ? exists > 0 : exists

        # first heartbeat or recovering from an outage and need to reestablish our heartbeat
        fire_event(:heartbeat) unless existed

        return unless msg

        if JVM_RESERVED_SIGNALS.include?(msg)
          Sidekiq::CLI.instance.handle_signal(msg)
        else
          ::Process.kill(msg, $$)
        end
      rescue => e
        # ignore all redis/network issues
        logger.error("heartbeat: #{e.message}")
        # don't lose the counts if there was a network issue
        Processor::PROCESSED.increment(procd)
        Processor::FAILURE.increment(fails)
      end
    end

    # Body of the heartbeat thread started by #run. It builds the static
    # process description once, then beats every 5 seconds.
    #
    # The loop never exits on its own, not even after #quiet or #stop (see
    # #clear_heartbeat), so a quiet process keeps reporting 'quiet' => true.
    def start_heartbeat
      k = identity
      data = {
        'hostname' => hostname,
        'started_at' => Time.now.to_f,
        'pid' => $$,
        'tag' => @options[:tag] || '',
        'concurrency' => @options[:concurrency],
        'queues' => @options[:queues].uniq,
        'labels' => @options[:labels],
        'identity' => k,
      }
      # this data doesn't change so dump it to a string
      # now so we don't need to dump it every heartbeat.
      json = Sidekiq.dump_json(data)

      while true
        heartbeat(k, data, json)
        sleep 5
      end
    end

    # Remove record from Redis since we are shutting down.
    # Note we don't stop the heartbeat thread; if the process
    # doesn't actually exit, it'll reappear in the Web UI.
    # Best effort: every StandardError raised here is swallowed.
    def clear_heartbeat
      Sidekiq.redis do |conn|
        conn.pipelined do
          conn.srem('processes', identity)
          conn.del("#{identity}:workers")
        end
      end
    rescue
      # best effort, ignore network errors
    end

  end
end
|