2016-11-22 23:39:00 -05:00
|
|
|
# frozen_string_literal: true
|
2019-04-01 18:20:41 +02:00
|
|
|
|
|
|
|
require "sidekiq/manager"
|
|
|
|
require "sidekiq/fetch"
|
|
|
|
require "sidekiq/scheduled"
|
2013-01-30 00:15:34 +08:00
|
|
|
|
|
|
|
module Sidekiq
  # The Launcher starts the Manager and Poller threads and provides the process heartbeat.
  class Launcher
    include Util

    # TTL applied to the per-day "stat:processed:<date>" / "stat:failed:<date>" keys.
    STATS_TTL = 5 * 365 * 24 * 60 * 60 # 5 years

    # Fragments joined with spaces to build the process title ($0) on every
    # heartbeat. Each proc is called with the launcher and its #to_data hash;
    # fragments that evaluate to nil are dropped from the title.
    # Deliberately not frozen so other code can append fragments.
    PROCTITLES = [
      proc { "sidekiq" },
      proc { Sidekiq::VERSION },
      proc { |_, data| data["tag"] },
      proc { |_, data| "[#{Processor::WORKER_STATE.size} of #{data["concurrency"]} busy]" },
      proc { |me, data| "stopping" if me.stopping? }
    ]

    attr_accessor :manager, :poller, :fetcher

    # @param options [Hash] Sidekiq configuration. When no :fetch strategy is
    #   present, a BasicFetch is created and stored back into +options+ so
    #   later readers of the hash (e.g. #stop) see that same instance.
    def initialize(options)
      options[:fetch] ||= BasicFetch.new(options)
      @manager = Sidekiq::Manager.new(options)
      @poller = Sidekiq::Scheduled::Poller.new
      @done = false
      @options = options
    end

    # Starts the "heartbeat" thread, the scheduled-job poller and the manager.
    def run
      @thread = safe_thread("heartbeat", &method(:start_heartbeat))
      @poller.start
      @manager.start
    end

    # Stops this instance from processing any more jobs: marks the launcher as
    # stopping, quiets the manager and terminates the scheduled-job poller.
    # The heartbeat thread keeps running, so the process keeps reporting
    # itself (with "quiet" set) to Redis.
    def quiet
      @done = true
      @manager.quiet
      @poller.terminate
    end

    # Shuts down the process. This method does not
    # return until all work is complete and cleaned up.
    # It can take up to @options[:timeout] seconds to complete.
    def stop
      deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + @options[:timeout]

      @done = true
      @manager.quiet
      @poller.terminate

      @manager.stop(deadline)

      # Requeue everything in case there was a worker who grabbed work while stopped
      # This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
      strategy = @options[:fetch]
      strategy.bulk_requeue([], @options)

      clear_heartbeat
    end

    # @return [Boolean] true once #quiet or #stop has been called.
    def stopping?
      @done
    end

    private unless $TESTING

    # Seconds to sleep between heartbeats.
    BEAT_PAUSE = 5

    # Body of the "heartbeat" thread started by #run: beats every BEAT_PAUSE
    # seconds. Neither #quiet nor #stop ends this loop.
    def start_heartbeat
      loop do
        heartbeat
        sleep BEAT_PAUSE
      end
      # Kernel#loop only returns if a StopIteration escapes its body, so this
      # line is effectively unreachable in normal operation.
      Sidekiq.logger.info("Heartbeat stopping...")
    end

    # Removes this process's entries from Redis during shutdown.
    def clear_heartbeat
      # Remove record from Redis since we are shutting down.
      # Note we don't stop the heartbeat thread; if the process
      # doesn't actually exit, it'll reappear in the Web UI.
      Sidekiq.redis do |conn|
        conn.pipelined do
          conn.srem("processes", identity)
          conn.unlink("#{identity}:workers")
        end
      end
    rescue
      # best effort, ignore network errors
    end

    # Refreshes the process title, then publishes this process's state to Redis.
    def heartbeat
      data = to_data
      $0 = PROCTITLES.map { |title| title.call(self, data) }.compact.join(" ")

      ❤
    end

    # Writes any processed/failed counts still held in memory to Redis.
    # Registered with at_exit below, so it runs once when the process exits.
    def self.flush_stats
      fails = Processor::FAILURE.reset
      procd = Processor::PROCESSED.reset
      return if fails + procd == 0

      nowdate = Time.now.utc.strftime("%Y-%m-%d")
      begin
        Sidekiq.redis do |conn|
          conn.pipelined do
            conn.incrby("stat:processed", procd)
            conn.incrby("stat:processed:#{nowdate}", procd)
            conn.expire("stat:processed:#{nowdate}", STATS_TTL)

            conn.incrby("stat:failed", fails)
            conn.incrby("stat:failed:#{nowdate}", fails)
            conn.expire("stat:failed:#{nowdate}", STATS_TTL)
          end
        end
      rescue => ex
        # we're exiting the process, things might be shut down so don't
        # try to handle the exception
        Sidekiq.logger.warn("Unable to flush stats: #{ex}")
      end
    end
    at_exit(&method(:flush_stats))

    # One heartbeat:
    # 1. Flushes the processed/failed counters and the current worker state to Redis.
    # 2. Registers this process (info, busy count, RTT, quiet flag, RSS) with a 60s expiry.
    # 3. Delivers any pending signal pushed onto "<identity>-signals".
    # Errors are logged rather than raised. Counters that were not yet
    # persisted are restored so they are retried on the next beat.
    def ❤
      key = identity
      fails = procd = 0

      begin
        fails = Processor::FAILURE.reset
        procd = Processor::PROCESSED.reset
        curstate = Processor::WORKER_STATE.dup

        workers_key = "#{key}:workers"
        nowdate = Time.now.utc.strftime("%Y-%m-%d")

        Sidekiq.redis do |conn|
          conn.multi do
            conn.incrby("stat:processed", procd)
            conn.incrby("stat:processed:#{nowdate}", procd)
            conn.expire("stat:processed:#{nowdate}", STATS_TTL)

            conn.incrby("stat:failed", fails)
            conn.incrby("stat:failed:#{nowdate}", fails)
            conn.expire("stat:failed:#{nowdate}", STATS_TTL)

            conn.unlink(workers_key)
            curstate.each_pair do |tid, hash|
              conn.hset(workers_key, tid, Sidekiq.dump_json(hash))
            end
            conn.expire(workers_key, 60)
          end
        end

        # The counts are persisted now. Zero the locals right away so that a
        # failure in any later step (RTT check, process registration) does
        # not add them back to the in-memory counters and count them twice.
        fails = procd = 0

        rtt = check_rtt
        kb = memory_usage(::Process.pid)

        _, exists, _, _, msg = Sidekiq.redis { |conn|
          conn.multi {
            conn.sadd("processes", key)
            conn.exists?(key)
            conn.hmset(key, "info", to_json,
              "busy", curstate.size,
              "beat", Time.now.to_f,
              "rtt_us", rtt,
              "quiet", @done,
              "rss", kb)
            conn.expire(key, 60)
            conn.rpop("#{key}-signals")
          }
        }

        # first heartbeat or recovering from an outage and need to reestablish our heartbeat
        fire_event(:heartbeat) unless exists

        return unless msg

        ::Process.kill(msg, ::Process.pid)
      rescue => e
        # ignore all redis/network issues
        logger.error("heartbeat: #{e}")
        # don't lose the counts if there was a network issue
        Processor::PROCESSED.incr(procd)
        Processor::FAILURE.incr(fails)
      end
    end

    # We run the heartbeat every five seconds.
    # Capture five samples of RTT, log a warning if each sample
    # is above our warning threshold.
    RTT_READINGS = RingBuffer.new(5)
    # Warning threshold in microseconds (the unit check_rtt measures in).
    RTT_WARNING_LEVEL = 50_000

    # Times a Redis PING using the monotonic clock.
    # @return [Integer] round-trip time in microseconds
    def check_rtt
      a = b = 0
      Sidekiq.redis do |x|
        a = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :microsecond)
        x.ping
        b = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :microsecond)
      end
      rtt = b - a
      RTT_READINGS << rtt
      # Ideal RTT for Redis is < 1000µs
      # Workable is < 10,000µs
      # Log a warning if it's a disaster.
      if RTT_READINGS.all? { |x| x > RTT_WARNING_LEVEL }
        Sidekiq.logger.warn <<~EOM
          Your Redis network connection is performing extremely poorly.
          Last RTT readings were #{RTT_READINGS.buffer.inspect}, ideally these should be < 1000.
          Ensure Redis is running in the same AZ or datacenter as Sidekiq.
          If these values are close to 100,000, that means your Sidekiq process may be
          CPU overloaded; see https://github.com/mperham/sidekiq/discussions/5039
        EOM
        RTT_READINGS.reset
      end
      rtt
    end

    # Platform-specific lambda returning the resident set size, in kB, of
    # the process +pid+. It returns 0 when the value is unavailable.
    MEMORY_GRABBER = case RUBY_PLATFORM
    when /linux/
      ->(pid) {
        # The VmRSS line looks like "VmRSS:   123456 kB".
        line = File.readlines("/proc/#{pid}/status").find { |l| l.start_with?("VmRSS:") }
        line ? line.split[1].to_i : 0
      }
    when /darwin|bsd/
      ->(pid) {
        `ps -o pid,rss -p #{pid}`.lines.last.split.last.to_i
      }
    else
      ->(pid) { 0 }
    end

    # @param pid [Integer] process id to inspect
    # @return [Integer] resident set size in kB (0 if unavailable)
    def memory_usage(pid)
      MEMORY_GRABBER.call(pid)
    end

    # Static description of this process, built once and memoized.
    # "started_at" is therefore the time of the first call.
    def to_data
      @data ||= {
        "hostname" => hostname,
        "started_at" => Time.now.to_f,
        "pid" => ::Process.pid,
        "tag" => @options[:tag] || "",
        "concurrency" => @options[:concurrency],
        "queues" => @options[:queues].uniq,
        "labels" => @options[:labels],
        "identity" => identity
      }
    end

    # JSON form of #to_data, memoized.
    def to_json
      # this data changes infrequently so dump it to a string
      # now so we don't need to dump it every heartbeat.
      @json ||= Sidekiq.dump_json(to_data)
    end
  end
end
|