This commit is contained in:
Mike Perham 2020-12-23 19:59:12 -08:00
parent 4b31b35714
commit 53ed11661b
12 changed files with 169 additions and 152 deletions

View File

@ -14,12 +14,8 @@ module Sidekiq
LICENSE = "See LICENSE and the LGPL-3.0 for licensing details."
DEFAULTS = {
labels: [],
poll_interval_average: nil,
average_scheduled_poll_interval: 5,
dead_max_jobs: 10_000,
dead_timeout_in_seconds: 180 * 24 * 60 * 60, # 6 months
reloader: proc { |&block| block.call }
}
##
@ -45,12 +41,13 @@ module Sidekiq
end
def self.configure
cfg = Configuration.new
yield cfg
yield DEFAULT_CONFIG
DEFAULT_CONFIG.freeze!
if server?
Sidekiq::CLI.instance.apply(cfg)
Sidekiq::CLI.instance.apply(DEFAULT_CONFIG)
else
Sidekiq::Client.apply(cfg)
Sidekiq::Client.apply(DEFAULT_CONFIG)
end
end

View File

@ -32,20 +32,6 @@ module Sidekiq
@chain = middleware
end
class << self
def apply(cfg)
cfg.freeze!
@default_client = Sidekiq::Client.new(cfg.pool, cfg.client_middleware)
end
def default_client
@default_client ||= Sidekiq::Client.new(ConnectionPool.new {
Redis.new(url: ENV[(ENV["REDIS_PROVIDER"] || "REDIS_URL")] || "redis://localhost:6379/0")
})
end
end
##
# The main method used to push a job to Redis. Accepts a number of options:
#

View File

@ -1,4 +1,5 @@
require "logger"
module Sidekiq
class Configuration
attr_accessor :concurrency
@ -12,10 +13,16 @@ module Sidekiq
attr_accessor :logger
attr_accessor :log_level
attr_accessor :redis
attr_accessor :queues
attr_reader :default_worker_options
attr_reader :pool
# an arbitrary set of entries
attr_reader :labels
attr_accessor :tag
def initialize
@queues = ["default"]
@concurrency = 10
@shutdown_timeout = 25
@logger = ::Logger.new($stdout)
@ -37,6 +44,15 @@ module Sidekiq
@logger.warn("#{ex.class.name}: #{ex.message}")
@logger.warn(ex.backtrace.join("\n")) unless ex.backtrace.nil?
}]
@labels = Set.new
@tag = ""
@components = []
end
# Registers a component for a later callback: once the configuration is
# finalized, registered components can pull the configured items they
# need (logger, pool, etc.).
def register_component(comp)
  @components.push(comp)
end
def on(event, &block)
@ -53,45 +69,14 @@ module Sidekiq
end
end
def freeze!
@pool = ConnectionPool.new(size: @concurrency + 2, timeout: 5) { Redis.new(@redis) }
end
def finalize
@pool ||= ConnectionPool.new(size: concurrency + 2, timeout: 5) { Redis.new(redis) }
private
def boot
# @runner = Sidekiq::Runner.new(self)
# fire_event(:startup)
end
def fire_event(event, options = {})
reverse = options[:reverse]
reraise = options[:reraise]
arr = @event_hooks[event]
arr.reverse! if reverse
arr.each do |block|
if block.arity == 0
block.call
else
block.call(@runner)
end
rescue => ex
handle_exception(ex, {context: "Exception during Sidekiq lifecycle event.", event: event})
raise ex if reraise
@components.each do |comp|
comp.finalize(self)
end
arr.clear
@components = nil
end
def handle_exception(ex, ctx = {})
error_handlers.each do |handler|
handler.call(ex, ctx)
rescue => ex
logger.error "!!! ERROR HANDLER THREW AN ERROR !!!"
logger.error ex
logger.error ex.backtrace.join("\n") unless ex.backtrace.nil?
end
end
end
end

View File

@ -4,62 +4,68 @@ require "sidekiq"
module Sidekiq
class BasicFetch
include Loggable
# We want the fetch operation to timeout every few seconds so the thread
# can check if the process is shutting down.
TIMEOUT = 2
UnitOfWork = Struct.new(:queue, :job) {
def acknowledge
# nothing to do
end
def queue_name
queue.delete_prefix("queue:")
end
def requeue
Sidekiq.redis do |conn|
conn.rpush(queue, job)
end
end
}
def initialize(options)
raise ArgumentError, "missing queue list" unless options[:queues]
@options = options
@strictly_ordered_queues = !!@options[:strict]
@queues = @options[:queues].map { |q| "queue:#{q}" }
if @strictly_ordered_queues
@queues.uniq!
@queues << TIMEOUT
def initialize(cfg)
raise ArgumentError, "missing queue list" unless cfg.queues
qs = cfg.queues
cfg.register_component(self)
@strict = qs.size == qs.uniq.size
@timeout = { timeout: TIMEOUT }
@queues = qs.map { |q| "queue:#{q}" }
end
# Caches the connection pool and logger taken from the given configuration
# so later Redis calls and log output go through them.
def finalize(cfg)
  @pool, @logger = cfg.pool, cfg.logger
  @logger
end
# Intentionally a no-op: the unit of work is accepted but nothing is done
# with it, and nil is returned.
def acknowledge(uow)
# nothing to do
end
# Puts a unit of work back: checks a connection out of the pool and
# RPUSHes the job payload onto the queue it was taken from.
def requeue(uow)
  @pool.with { |redis| redis.rpush(uow.queue, uow.job) }
end
def retrieve_work
work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
work = @pool.with { |conn| conn.brpop(*queues_cmd, @timeout) }
UnitOfWork.new(*work) if work
end
def bulk_requeue(inprogress, options)
def bulk_requeue(inprogress)
return if inprogress.empty?
Sidekiq.logger.debug { "Re-queueing terminated jobs" }
debug { "Re-queueing terminated jobs" }
jobs_to_requeue = {}
inprogress.each do |unit_of_work|
jobs_to_requeue[unit_of_work.queue] ||= []
jobs_to_requeue[unit_of_work.queue] << unit_of_work.job
end
Sidekiq.redis do |conn|
@pool.with do |conn|
conn.pipelined do
jobs_to_requeue.each do |queue, jobs|
conn.rpush(queue, jobs)
end
end
end
Sidekiq.logger.info("Pushed #{inprogress.size} jobs back to Redis")
info("Pushed #{inprogress.size} jobs back to Redis")
rescue => ex
Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
end
# Creating the Redis#brpop command takes into account any
@ -68,12 +74,10 @@ module Sidekiq
# recreate the queue command each time we invoke Redis#brpop
# to honor weights and avoid queue starvation.
def queues_cmd
if @strictly_ordered_queues
if @strict
@queues
else
queues = @queues.shuffle!.uniq
queues << TIMEOUT
queues
@queues.shuffle!.uniq
end
end
end

View File

@ -2,7 +2,7 @@
module Sidekiq
class JobLogger
def initialize(logger = Sidekiq.logger)
def initialize(logger)
@logger = logger
end

View File

@ -68,7 +68,7 @@ module Sidekiq
DEFAULT_MAX_RETRY_ATTEMPTS = 25
def initialize(options = {})
@max_retries = Sidekiq.options.merge(options).fetch(:max_retries, DEFAULT_MAX_RETRY_ATTEMPTS)
@max_retries = options.fetch(:max_retries, DEFAULT_MAX_RETRY_ATTEMPTS)
end
# The global retry handler requires only the barest of data.

View File

@ -1,12 +1,10 @@
module Loggable
attr_accessor :logger
def info(*args)
@logger.debug(*args)
@logger.info(*args)
end
def warn(*args)
@logger.debug(*args)
@logger.warn(*args)
end
def debug(*args)

View File

@ -18,24 +18,22 @@ module Sidekiq
# 5. stop: hard stop the Processors by deadline.
#
# Note that only the last task requires its own Thread since it has to monitor
# the shutdown process. The other tasks are performed by other threads.
#
# the shutdown process. The other tasks are performed by other threads.
class Manager
include Util
include Loggable
attr_reader :workers
attr_reader :options
def initialize(options = {})
logger.debug { options.inspect }
@options = options
@count = options[:concurrency] || 10
def initialize(cfg)
@count = cfg.concurrency || 10
raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1
@logger = cfg.logger
@done = false
@workers = Set.new
@count.times do
@workers << Processor.new(self, options)
@workers << Processor.new(self, cfg)
end
@plock = Mutex.new
end
@ -50,7 +48,7 @@ module Sidekiq
return if @done
@done = true
logger.info { "Terminating quiet workers" }
info { "Terminating quiet workers" }
@workers.each { |x| x.terminate }
fire_event(:quiet, reverse: true)
end
@ -58,7 +56,7 @@ module Sidekiq
# hack for quicker development / testing environment #2774
PAUSE_TIME = $stdout.tty? ? 0.1 : 0.5
def stop(deadline)
def stop(deadline, fetcher)
quiet
fire_event(:shutdown, reverse: true)
@ -68,7 +66,7 @@ module Sidekiq
sleep PAUSE_TIME
return if @workers.empty?
logger.info { "Pausing to allow workers to finish..." }
info { "Pausing to allow workers to finish..." }
remaining = deadline - ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
while remaining > PAUSE_TIME
return if @workers.empty?
@ -77,7 +75,7 @@ module Sidekiq
end
return if @workers.empty?
hard_shutdown
hard_shutdown(fetcher)
end
def processor_stopped(processor)
@ -90,7 +88,7 @@ module Sidekiq
@plock.synchronize do
@workers.delete(processor)
unless @done
p = Processor.new(self, options)
p = Processor.new(self, cfg)
@workers << p
p.start
end
@ -103,7 +101,7 @@ module Sidekiq
private
def hard_shutdown
def hard_shutdown(fetcher)
# We've reached the timeout and we still have busy workers.
# They must die but their jobs shall live on.
cleanup = nil
@ -114,8 +112,8 @@ module Sidekiq
if cleanup.size > 0
jobs = cleanup.map { |p| p.job }.compact
logger.warn { "Terminating #{cleanup.size} busy worker threads" }
logger.warn { "Work still in progress #{jobs.inspect}" }
warn { "Terminating #{cleanup.size} busy worker threads" }
warn { "Work still in progress #{jobs.inspect}" }
# Re-enqueue unfinished jobs
# NOTE: You may notice that we may push a job back to redis before
@ -123,8 +121,7 @@ module Sidekiq
# contract says that jobs are run AT LEAST once. Process termination
# is delayed until we're certain the jobs are back in Redis because
# it is worse to lose a job than to run it twice.
strategy = @options[:fetch]
strategy.bulk_requeue(jobs, @options)
fetcher.bulk_requeue(jobs)
end
cleanup.each do |processor|

View File

@ -28,15 +28,16 @@ module Sidekiq
attr_reader :thread
attr_reader :job
def initialize(mgr, options)
def initialize(mgr, cfg)
@mgr = mgr
@down = false
@done = false
@job = nil
@thread = nil
@strategy = options[:fetch]
@reloader = options[:reloader] || proc { |&block| block.call }
@job_logger = (options[:job_logger] || Sidekiq::JobLogger).new
@strategy = Sidekiq::BasicFetch.new(cfg)
@reloader = proc { |&block| block.call }
@job_logger = Sidekiq::JobLogger.new(cfg.logger)
@retrier = Sidekiq::JobRetry.new
end

View File

@ -1,5 +1,6 @@
# frozen_string_literal: true
require "sidekiq/loggable"
require "sidekiq/manager"
require "sidekiq/fetch"
require "sidekiq/scheduled"
@ -10,36 +11,44 @@ module Sidekiq
STATS_TTL = 5 * 365 * 24 * 60 * 60 # 5 years
attr_accessor :manager, :poller, :fetcher
def initialize
@concurrency = 10
@shutdown_timeout = 25
@environment = "development"
@middleware = Sidekiq::Middleware::Chain.new
@event_hooks = {
startup: [],
quiet: [],
shutdown: [],
heartbeat: []
}
@death_handlers = []
@error_handlers = [->(ex, ctx) {
@logger.warn(Sidekiq.dump_json(ctx)) unless ctx.empty?
@logger.warn("#{ex.class.name}: #{ex.message}")
@logger.warn(ex.backtrace.join("\n")) unless ex.backtrace.nil?
}]
attr_reader :manager, :poller, :fetcher
# Stores the configuration for later use (#run builds its collaborators
# from it, #stop reads the shutdown timeout from it) and marks this
# instance as not yet done.
def initialize(cfg)
@config = cfg
@done = false
end
def run
@manager = Sidekiq::Manager.new
@poller = Sidekiq::Scheduled::Poller.new
@fetcher ||= BasicFetch.new
# Boots Sidekiq: builds the manager, the scheduled-job poller and (unless
# one is already set) the fetcher from the configuration, then starts the
# heartbeat thread, the poller and the manager.
#
# With `install_signals: true` (the default) this method does not return:
# it blocks reading signal names from the self-pipe and dispatches each one
# to `handle_signal` until an Interrupt is raised, then stops and exits.
# With `install_signals: false` it returns right after startup; you are
# responsible for hooking into the process signals and calling `stop` to
# shut down the Sidekiq processor threads.
def run(install_signals: true)
  @manager = Sidekiq::Manager.new(@config)
  @poller = Sidekiq::Scheduled::Poller.new(@config)
  @fetcher ||= BasicFetch.new(@config)
  @thread = safe_thread("heartbeat", &method(:start_heartbeat))
  @poller.start
  @manager.start
  return unless install_signals

  signal_pipe = hook
  begin
    loop do
      # Block until a trapped signal has written its name to the pipe.
      ready, = IO.select([signal_pipe])
      handle_signal(ready.first.gets.strip)
    end
  rescue Interrupt
    logger.info "Shutting down"
    stop
    logger.info "Bye!"
    # Explicitly exit so busy Processor threads won't block process shutdown.
    #
    # NB: slow at_exit handlers will prevent a timely exit if they take
    # a while to run. If Sidekiq is getting here but the process isn't exiting,
    # use the TTIN signal to determine where things are stuck.
    exit(0)
  end
end
# Stops this instance from processing any more jobs,
@ -54,17 +63,17 @@ module Sidekiq
# return until all work is complete and cleaned up.
# It can take up to the timeout to complete.
def stop
deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + @options[:timeout]
deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + @config.shutdown_timeout
@done = true
@manager.quiet
@poller.terminate
@manager.stop(deadline)
@manager.stop(deadline, @fetcher)
# Requeue everything in case there was a worker who grabbed work while stopped
# This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
@fetcher.bulk_requeue
@fetcher.bulk_requeue({})
clear_heartbeat
end
@ -75,6 +84,21 @@ module Sidekiq
private unless $TESTING
# Installs signal traps that forward each received signal's name, one per
# line, into a pipe, and returns the read end of that pipe.
# Signals the platform rejects are reported on stdout and skipped.
def hook
  reader, writer = IO.pipe
  signals = %w[INT TERM TTIN TSTP]
  # USR1 and USR2 don't work on the JVM
  signals.push("USR2") if Sidekiq.pro? && !defined?(::JRUBY_VERSION)

  signals.each do |name|
    begin
      trap(name) { writer.puts(name) }
    rescue ArgumentError
      puts "Signal #{name} not supported"
    end
  end

  reader
end
def start_heartbeat
loop do
@ -211,10 +235,10 @@ module Sidekiq
"hostname" => hostname,
"started_at" => Time.now.to_f,
"pid" => ::Process.pid,
"tag" => @options[:tag] || "",
"concurrency" => @options[:concurrency],
"queues" => @options[:queues].uniq,
"labels" => @options[:labels],
"tag" => @config.tag,
"concurrency" => @config.concurrency,
"queues" => @config.queues.uniq,
"labels" => @config.labels.to_a,
"identity" => identity
}
end

View File

@ -47,13 +47,14 @@ module Sidekiq
INITIAL_WAIT = 10
def initialize(options)
def initialize(cfg)
@sleeper = ConnectionPool::TimedStack.new
@done = false
@thread = nil
@pollavg = options[:average_scheduled_poll_interval]
@calcavg = options[:poll_interval_average]
@pollavg = 5
@calcavg = nil
@enq = nil
@logger = cfg.logger
end
# Shut down this instance, will pause until the thread is dead.
@ -84,7 +85,7 @@ module Sidekiq
enqueue
wait
end
Sidekiq.logger.info("Scheduler exiting...")
info("Scheduler exiting...")
}
end
@ -93,7 +94,7 @@ module Sidekiq
rescue => ex
# Most likely a problem with redis networking.
# Punt and try again at the next interval
logger.error ex.message
error { ex.message }
handle_exception(ex)
end
@ -106,7 +107,7 @@ module Sidekiq
rescue => ex
# if poll_interval_average hasn't been calculated yet, we can
# raise an error trying to reach Redis.
logger.error ex.message
error { ex.message }
handle_exception(ex)
sleep 5
end

View File

@ -2,14 +2,12 @@
require "socket"
require "securerandom"
require "sidekiq/exception_handler"
module Sidekiq
##
# This module is part of Sidekiq core and not intended for extensions.
#
module Util
include ExceptionHandler
def watchdog(last_words)
yield
@ -49,5 +47,31 @@ module Sidekiq
@@identity ||= "#{hostname}:#{::Process.pid}:#{process_nonce}"
end
# Invokes every handler registered for +event+.
#
# runner    - passed to handlers that accept an argument, and forwarded to
#             handle_exception when a handler raises.
# handlers  - Hash mapping event names to Arrays of callables.
# event     - key into +handlers+ selecting the list to fire.
# reverse   - when true, handlers run in reverse registration order.
# reraise   - when true, a handler's exception is re-raised after it has
#             been reported; remaining handlers are skipped and the list is
#             left uncleared.
# clearable - when true (the default), the handler list is emptied after
#             firing so the event cannot fire again.
def fire_event(runner, handlers, event, reverse: false, reraise: false, clearable: true)
  arr = handlers[event]
  # Iterate a reversed copy instead of reversing in place: with
  # clearable: false the list outlives this call, and an in-place reverse
  # would flip the registered order on every firing.
  ordered = reverse ? arr.reverse : arr
  ordered.each do |block|
    if block.arity == 0
      block.call
    else
      block.call(runner)
    end
  rescue => ex
    # handle_exception takes the runner as its first argument; omitting it
    # would report the context hash in place of the exception.
    handle_exception(runner, ex, {context: "Exception during Sidekiq :#{event} event.", event: event})
    raise ex if reraise
  end
  arr.clear if clearable
end
# Reports +ex+ (with its context hash +ctx+) to every registered error
# handler. A handler that itself raises is logged and does not stop the
# remaining handlers from running.
#
# runner - accepted as the first argument but not used by this method.
# ex     - the exception being reported.
# ctx    - Hash of extra context handed to each handler.
def handle_exception(runner, ex, ctx = {})
  error_handlers.each do |handler|
    handler.call(ex, ctx)
  rescue => handler_error
    # Deliberately a distinct name: `rescue => ex` would overwrite the
    # method's +ex+, so every later handler would receive the failing
    # handler's error instead of the exception being reported.
    logger.error "!!! ERROR HANDLER THREW AN ERROR !!!"
    logger.error handler_error
    logger.error handler_error.backtrace.join("\n") unless handler_error.backtrace.nil?
  end
end
end
end