Mirror of https://github.com/mperham/sidekiq.git (synced 2022-11-09 13:52:34 -05:00)
3b5862492a
In #2531, we saw how an IO exception in the logger could cause a job to fail and be deleted before it reached the RetryJobs block, causing job loss. To fix this, we disabled job acknowledgement until job execution starts, but this has the bad side effect of duplicating jobs if the user is running a reliable scheme and the error happens after the RetryJobs middleware but before execution starts. Instead, we flip the middleware ordering: logging now happens within the retry block. Because we would lose context-specific logging within retry, we move the context log setup out of the middleware into the Processor. With these changes, we can properly retry and acknowledge even if errors occur within the initial server middleware and executor calls. This code path has been reimplemented in Sidekiq 5.0, so this change only applies to 4.x.
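Conceptually, the fix makes the retry middleware the outermost wrapper, so an exception raised while logging is still caught and the job can be retried rather than lost. A minimal sketch of that ordering, using the 4.x server middleware class names (assumed here for illustration; the commit changes the ordering inside Sidekiq itself, not user configuration):

require 'sidekiq'

# Sketch only: approximates the flipped ordering described in the commit message.
# The RetryJobs and Logging middleware classes are assumed to be loaded, as they
# are inside a running 4.x server process.
chain = Sidekiq::Middleware::Chain.new
chain.add Sidekiq::Middleware::Server::RetryJobs  # outermost: sees errors from everything it wraps
chain.add Sidekiq::Middleware::Server::Logging    # now runs inside the retry block, so an IO error
                                                  # while logging is retried instead of losing the job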
228 lines
6 KiB
Ruby
# encoding: utf-8
# frozen_string_literal: true
require 'sidekiq/version'
fail "Sidekiq #{Sidekiq::VERSION} does not support Ruby versions below 2.2.2." if RUBY_PLATFORM != 'java' && RUBY_VERSION < '2.2.2'

require 'sidekiq/logging'
require 'sidekiq/client'
require 'sidekiq/worker'
require 'sidekiq/redis_connection'
require 'sidekiq/delay'

require 'json'

module Sidekiq
  NAME = 'Sidekiq'
  LICENSE = 'See LICENSE and the LGPL-3.0 for licensing details.'

  DEFAULTS = {
    queues: [],
    labels: [],
    concurrency: 25,
    require: '.',
    environment: nil,
    timeout: 8,
    poll_interval_average: nil,
    average_scheduled_poll_interval: 15,
    error_handlers: [],
    lifecycle_events: {
      startup: [],
      quiet: [],
      shutdown: [],
      heartbeat: [],
    },
    dead_max_jobs: 10_000,
    dead_timeout_in_seconds: 180 * 24 * 60 * 60, # 6 months
    reloader: proc { |&block| block.call },
  }

  DEFAULT_WORKER_OPTIONS = {
    'retry' => true,
    'queue' => 'default'
  }

  FAKE_INFO = {
    "redis_version" => "9.9.9",
    "uptime_in_days" => "9999",
    "connected_clients" => "9999",
    "used_memory_human" => "9P",
    "used_memory_peak_human" => "9P"
  }.freeze

  def self.❨╯°□°❩╯︵┻━┻
    puts "Calm down, yo."
  end

  def self.options
    @options ||= DEFAULTS.dup
  end
  def self.options=(opts)
    @options = opts
  end

  ##
  # Configuration for Sidekiq server, use like:
  #
  #   Sidekiq.configure_server do |config|
  #     config.redis = { :namespace => 'myapp', :size => 25, :url => 'redis://myhost:8877/0' }
  #     config.server_middleware do |chain|
  #       chain.add MyServerHook
  #     end
  #   end
  def self.configure_server
    yield self if server?
  end

  ##
  # Configuration for Sidekiq client, use like:
  #
  #   Sidekiq.configure_client do |config|
  #     config.redis = { :namespace => 'myapp', :size => 1, :url => 'redis://myhost:8877/0' }
  #   end
  def self.configure_client
    yield self unless server?
  end

  def self.server?
    defined?(Sidekiq::CLI)
  end

  def self.redis
    raise ArgumentError, "requires a block" unless block_given?
    redis_pool.with do |conn|
      retryable = true
      begin
        yield conn
      rescue Redis::CommandError => ex
        #2550 Failover can cause the server to become a slave, need
        # to disconnect and reopen the socket to get back to the master.
        (conn.disconnect!; retryable = false; retry) if retryable && ex.message =~ /READONLY/
        raise
      end
    end
  end

  def self.redis_info
    redis do |conn|
      begin
        # admin commands can't go through redis-namespace starting
        # in redis-namespace 2.0
        if conn.respond_to?(:namespace)
          conn.redis.info
        else
          conn.info
        end
      rescue Redis::CommandError => ex
        #2850 return fake version when INFO command has (probably) been renamed
        raise unless ex.message =~ /unknown command/
        FAKE_INFO
      end
    end
  end

  def self.redis_pool
    @redis ||= Sidekiq::RedisConnection.create
  end

  def self.redis=(hash)
    @redis = if hash.is_a?(ConnectionPool)
      hash
    else
      Sidekiq::RedisConnection.create(hash)
    end
  end

  def self.client_middleware
    @client_chain ||= Middleware::Chain.new
    yield @client_chain if block_given?
    @client_chain
  end

  def self.server_middleware
    @server_chain ||= default_server_middleware
    yield @server_chain if block_given?
    @server_chain
  end

  def self.default_server_middleware
    Middleware::Chain.new
  end

  def self.default_worker_options=(hash)
    @default_worker_options = default_worker_options.merge(hash.stringify_keys)
  end
  def self.default_worker_options
    defined?(@default_worker_options) ? @default_worker_options : DEFAULT_WORKER_OPTIONS
  end

  #   Sidekiq.configure_server do |config|
  #     config.default_retries_exhausted = -> (job, ex) do
  #     end
  #   end
  def self.default_retries_exhausted=(prok)
    @default_retries_exhausted = prok
  end
  @default_retries_exhausted = ->(job, ex) { }
  def self.default_retries_exhausted
    @default_retries_exhausted
  end

  def self.load_json(string)
    JSON.parse(string)
  end
  def self.dump_json(object)
    JSON.generate(object)
  end

  def self.logger
    Sidekiq::Logging.logger
  end
  def self.logger=(log)
    Sidekiq::Logging.logger = log
  end

  # How frequently Redis should be checked by a random Sidekiq process for
  # scheduled and retriable jobs. Each individual process will take turns by
  # waiting some multiple of this value.
  #
  # See sidekiq/scheduled.rb for an in-depth explanation of this value
  def self.average_scheduled_poll_interval=(interval)
    self.options[:average_scheduled_poll_interval] = interval
  end

  # Register a proc to handle any error which occurs within the Sidekiq process.
  #
  #   Sidekiq.configure_server do |config|
  #     config.error_handlers << proc {|ex,ctx_hash| MyErrorService.notify(ex, ctx_hash) }
  #   end
  #
  # The default error handler logs errors to Sidekiq.logger.
  def self.error_handlers
    self.options[:error_handlers]
  end

  # Register a block to run at a point in the Sidekiq lifecycle.
  # :startup, :quiet or :shutdown are valid events.
  #
  #   Sidekiq.configure_server do |config|
  #     config.on(:shutdown) do
  #       puts "Goodbye cruel world!"
  #     end
  #   end
  def self.on(event, &block)
    raise ArgumentError, "Symbols only please: #{event}" unless event.is_a?(Symbol)
    raise ArgumentError, "Invalid event name: #{event}" unless options[:lifecycle_events].key?(event)
    options[:lifecycle_events][event] << block
  end

  # We are shutting down Sidekiq but what about workers that
  # are working on some long job? This error is
  # raised in workers that have not finished within the hard
  # timeout limit. This is needed to rollback db transactions,
  # otherwise Ruby's Thread#kill will commit. See #377.
  # DO NOT RESCUE THIS ERROR IN YOUR WORKERS
  class Shutdown < Interrupt; end

end

require 'sidekiq/rails' if defined?(::Rails::Engine)