1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Rework job processing in light of Rails 5's Reloader (#3235)

* Rework job processing in light of Rails 5's Reloader, see #3221

* Ignore built gems

* Documentation, testing

* Add fallback for 'retry' value server-side, fixes #3234

* Fix job hash reporting in stats

* cleanup

* Add job for AJ testing

* Push jobs with invalid JSON immediately to Dead set, fixes #3296

* Break retry logic into global and local parts, fixes #3306

* fix heisentest
This commit is contained in:
Mike Perham 2017-01-17 14:58:08 -08:00 committed by GitHub
parent 21cc53a61e
commit 701e06224c
19 changed files with 447 additions and 350 deletions

1
.gitignore vendored
View file

@ -10,3 +10,4 @@ vendor/
.bundle/
.sass-cache/
tmp/
pkg/*.gem

View file

@ -32,6 +32,10 @@ better with the new Rails 5.0 Executor.
* Rails 3.2 is no longer supported.
* Ruby 2.0 and Ruby 2.1 are no longer supported. Ruby 2.2.2+ is required.
* Jobs which can't be parsed due to invalid JSON are now pushed
immediately to the Dead set so they can be manually processed
since they will never be successfully processed as is. [#3296]
## Upgrade
As always, please upgrade Sidekiq **one major version at a time**.

View file

@ -1,5 +1,17 @@
# Sidekiq Changes
5.0.0
-----------
- **BREAKING CHANGE** Job dispatch was refactored for safer integration with
Rails 5. The **Logging** and **RetryJobs** server middleware were removed and
functionality integrated directly into Sidekiq::Processor. These aren't
commonly used public APIs so this shouldn't impact most users.
```
Sidekiq::Middleware::Server::RetryJobs -> Sidekiq::JobRetry
Sidekiq::Middleware::Server::Logging -> Sidekiq::JobLogging
```
4.2.9
-----------

View file

@ -34,7 +34,6 @@ module Sidekiq
dead_max_jobs: 10_000,
dead_timeout_in_seconds: 180 * 24 * 60 * 60, # 6 months
reloader: proc { |&block| block.call },
executor: proc { |&block| block.call },
}
DEFAULT_WORKER_OPTIONS = {
@ -146,13 +145,7 @@ module Sidekiq
end
def self.default_server_middleware
require 'sidekiq/middleware/server/retry_jobs'
require 'sidekiq/middleware/server/logging'
Middleware::Chain.new do |m|
m.add Middleware::Server::Logging
m.add Middleware::Server::RetryJobs
end
Middleware::Chain.new
end
def self.default_worker_options=(hash)

View file

@ -282,12 +282,16 @@ module Sidekiq
def initialize(item, queue_name=nil)
@value = item
@item = item.is_a?(Hash) ? item : Sidekiq.load_json(item)
@queue = queue_name || @item['queue']
@item = if item.is_a?(Hash)
item
else
Sidekiq.load_json(item) rescue nil
end
@queue = queue_name || self['queue']
end
def klass
@item['class']
self['class']
end
def display_class
@ -318,8 +322,8 @@ module Sidekiq
arg
end
when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
job_args = @item['wrapped'] ? args[0]["arguments"] : []
if 'ActionMailer::DeliveryJob' == (@item['wrapped'] || args[0])
job_args = self['wrapped'] ? args[0]["arguments"] : []
if 'ActionMailer::DeliveryJob' == (self['wrapped'] || args[0])
# remove MailerClass, mailer_method and 'deliver_now'
job_args.drop(3)
else
@ -331,19 +335,19 @@ module Sidekiq
end
def args
@item['args']
self['args']
end
def jid
@item['jid']
self['jid']
end
def enqueued_at
@item['enqueued_at'] ? Time.at(@item['enqueued_at']).utc : nil
self['enqueued_at'] ? Time.at(self['enqueued_at']).utc : nil
end
def created_at
Time.at(@item['created_at'] || @item['enqueued_at'] || 0).utc
Time.at(self['created_at'] || self['enqueued_at'] || 0).utc
end
def queue
@ -351,7 +355,7 @@ module Sidekiq
end
def latency
Time.now.to_f - (@item['enqueued_at'] || @item['created_at'])
Time.now.to_f - (self['enqueued_at'] || self['created_at'] || 0)
end
##
@ -364,7 +368,10 @@ module Sidekiq
end
def [](name)
@item[name]
# nil will happen if the JSON fails to parse.
# We don't guarantee Sidekiq will work with bad job JSON but we should
# make a best effort to minimize the damage.
@item ? @item[name] : nil
end
private

View file

@ -251,7 +251,6 @@ module Sidekiq
require 'sidekiq/rails'
require File.expand_path("#{options[:require]}/config/environment.rb")
else
# Rails 5+ && development mode, use Reloader
require 'sidekiq/rails'
require File.expand_path("#{options[:require]}/config/environment.rb")
end

36
lib/sidekiq/job_logger.rb Normal file
View file

@ -0,0 +1,36 @@
module Sidekiq
class JobLogger
def call(item, queue)
Sidekiq::Logging.with_context(log_context(item)) do
begin
start = Time.now
logger.info("start".freeze)
yield
logger.info("done: #{elapsed(start)} sec")
rescue Exception
logger.info("fail: #{elapsed(start)} sec")
raise
end
end
end
private
# If we're using a wrapper class, like ActiveJob, use the "wrapped"
# attribute to expose the underlying thing.
def log_context(item)
klass = item['wrapped'.freeze] || item["class".freeze]
"#{klass} JID-#{item['jid'.freeze]}#{" BID-#{item['bid'.freeze]}" if item['bid'.freeze]}"
end
def elapsed(start)
(Time.now - start).round(3)
end
def logger
Sidekiq.logger
end
end
end

232
lib/sidekiq/job_retry.rb Normal file
View file

@ -0,0 +1,232 @@
require 'sidekiq/scheduled'
require 'sidekiq/api'
module Sidekiq
##
# Automatically retry jobs that fail in Sidekiq.
# Sidekiq's retry support assumes a typical development lifecycle:
#
# 0. Push some code changes with a bug in it.
# 1. Bug causes job processing to fail, Sidekiq's middleware captures
# the job and pushes it onto a retry queue.
# 2. Sidekiq retries jobs in the retry queue multiple times with
# an exponential delay, the job continues to fail.
# 3. After a few days, a developer deploys a fix. The job is
# reprocessed successfully.
# 4. Once retries are exhausted, Sidekiq will give up and move the
# job to the Dead Job Queue (aka morgue) where it must be dealt with
# manually in the Web UI.
# 5. After 6 months on the DJQ, Sidekiq will discard the job.
#
# A job looks like:
#
# { 'class' => 'HardWorker', 'args' => [1, 2, 'foo'], 'retry' => true }
#
# The 'retry' option also accepts a number (in place of 'true'):
#
# { 'class' => 'HardWorker', 'args' => [1, 2, 'foo'], 'retry' => 5 }
#
# The job will be retried this number of times before giving up. (If simply
# 'true', Sidekiq retries 25 times)
#
# We'll add a bit more data to the job to support retries:
#
# * 'queue' - the queue to use
# * 'retry_count' - number of times we've retried so far.
# * 'error_message' - the message from the exception
# * 'error_class' - the exception class
# * 'failed_at' - the first time it failed
# * 'retried_at' - the last time it was retried
# * 'backtrace' - the number of lines of error backtrace to store
#
# We don't store the backtrace by default as that can add a lot of overhead
# to the job and everyone is using an error service, right?
#
# The default number of retries is 25 which works out to about 3 weeks
# You can change the default maximum number of retries in your initializer:
#
# Sidekiq.options[:max_retries] = 7
#
# or limit the number of retries for a particular worker with:
#
# class MyWorker
# include Sidekiq::Worker
# sidekiq_options :retry => 10
# end
#
class JobRetry
class Skip < ::RuntimeError; end
include Sidekiq::Util
DEFAULT_MAX_RETRY_ATTEMPTS = 25
def initialize(options = {})
@max_retries = Sidekiq.options.merge(options).fetch(:max_retries, DEFAULT_MAX_RETRY_ATTEMPTS)
end
# The global retry handler requires only the barest of data.
# We want to be able to retry as much as possible so we don't
# require the worker to be instantiated.
def global(msg, queue)
yield
rescue Skip
raise
rescue Sidekiq::Shutdown
# ignore, will be pushed back onto queue during hard_shutdown
raise
rescue Exception => e
# ignore, will be pushed back onto queue during hard_shutdown
raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e)
raise e unless msg['retry']
attempt_retry(nil, msg, queue, e)
end
# The local retry support means that any errors that occur within
# this block can be associated with the given worker instance.
# This is required to support the `sidekiq_retries_exhausted` block.
def local(worker, msg, queue)
yield
rescue Skip
raise
rescue Sidekiq::Shutdown
# ignore, will be pushed back onto queue during hard_shutdown
raise
rescue Exception => e
# ignore, will be pushed back onto queue during hard_shutdown
raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e)
if msg['retry'] == nil
msg['retry'] = worker.class.get_sidekiq_options['retry']
end
raise e unless msg['retry']
attempt_retry(worker, msg, queue, e)
# We've handled this error associated with this job, don't
# need to handle it at the global level
raise Skip
end
private
# Note that +worker+ can be nil here if an error is raised before we can
# instantiate the worker instance. All access must be guarded and
# best effort.
def attempt_retry(worker, msg, queue, exception)
max_retry_attempts = retry_attempts_from(msg['retry'], @max_retries)
msg['queue'] = if msg['retry_queue']
msg['retry_queue']
else
queue
end
# App code can stuff all sorts of crazy binary data into the error message
# that won't convert to JSON.
m = exception.message.to_s[0, 10_000]
if m.respond_to?(:scrub!)
m.force_encoding("utf-8")
m.scrub!
end
msg['error_message'] = m
msg['error_class'] = exception.class.name
count = if msg['retry_count']
msg['retried_at'] = Time.now.to_f
msg['retry_count'] += 1
else
msg['failed_at'] = Time.now.to_f
msg['retry_count'] = 0
end
if msg['backtrace'] == true
msg['error_backtrace'] = exception.backtrace
elsif !msg['backtrace']
# do nothing
elsif msg['backtrace'].to_i != 0
msg['error_backtrace'] = exception.backtrace[0...msg['backtrace'].to_i]
end
if count < max_retry_attempts
delay = delay_for(worker, count, exception)
logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
retry_at = Time.now.to_f + delay
payload = Sidekiq.dump_json(msg)
Sidekiq.redis do |conn|
conn.zadd('retry', retry_at.to_s, payload)
end
else
# Goodbye dear message, you (re)tried your best I'm sure.
retries_exhausted(worker, msg, exception)
end
raise exception
end
def retries_exhausted(worker, msg, exception)
logger.debug { "Retries exhausted for job" }
begin
block = worker && worker.sidekiq_retries_exhausted_block || Sidekiq.default_retries_exhausted
block.call(msg, exception) if block
rescue => e
handle_exception(e, { context: "Error calling retries_exhausted for #{msg['class']}", job: msg })
end
send_to_morgue(msg) unless msg['dead'] == false
end
def send_to_morgue(msg)
Sidekiq.logger.info { "Adding dead #{msg['class']} job #{msg['jid']}" }
payload = Sidekiq.dump_json(msg)
now = Time.now.to_f
Sidekiq.redis do |conn|
conn.multi do
conn.zadd('dead', now, payload)
conn.zremrangebyscore('dead', '-inf', now - DeadSet.timeout)
conn.zremrangebyrank('dead', 0, -DeadSet.max_jobs)
end
end
end
def retry_attempts_from(msg_retry, default)
if msg_retry.is_a?(Integer)
msg_retry
else
default
end
end
def delay_for(worker, count, exception)
worker && worker.sidekiq_retry_in_block? && retry_in(worker, count, exception) || seconds_to_delay(count)
end
# delayed_job uses the same basic formula
def seconds_to_delay(count)
(count ** 4) + 15 + (rand(30)*(count+1))
end
def retry_in(worker, count, exception)
begin
worker.sidekiq_retry_in_block.call(count, exception).to_i
rescue Exception => e
handle_exception(e, { context: "Failure scheduling retry using the defined `sidekiq_retry_in` in #{worker.class.name}, falling back to default" })
nil
end
end
def exception_caused_by_shutdown?(e, checked_causes = [])
# In Ruby 2.1.0 only, check if exception is a result of shutdown.
return false unless defined?(e.cause)
# Handle circular causes
checked_causes << e.object_id
return false if checked_causes.include?(e.cause.object_id)
e.cause.instance_of?(Sidekiq::Shutdown) ||
exception_caused_by_shutdown?(e.cause, checked_causes)
end
end
end

View file

@ -2,6 +2,15 @@ module Sidekiq
module Middleware
module Server
class ActiveRecord
def initialize
# With Rails 5+ we must use the Reloader **always**.
# The reloader handles code loading and db connection management.
if ::Rails::VERSION::MAJOR >= 5
raise ArgumentError, "Rails 5 no longer needs or uses the ActiveRecord middleware."
end
end
def call(*args)
yield
ensure

View file

@ -1,40 +0,0 @@
module Sidekiq
module Middleware
module Server
class Logging
def call(worker, item, queue)
Sidekiq::Logging.with_context(log_context(worker, item)) do
begin
start = Time.now
logger.info("start".freeze)
yield
logger.info("done: #{elapsed(start)} sec")
rescue Exception
logger.info("fail: #{elapsed(start)} sec")
raise
end
end
end
private
# If we're using a wrapper class, like ActiveJob, use the "wrapped"
# attribute to expose the underlying thing.
def log_context(worker, item)
klass = item['wrapped'.freeze] || worker.class.to_s
"#{klass} JID-#{item['jid'.freeze]}#{" BID-#{item['bid'.freeze]}" if item['bid'.freeze]}"
end
def elapsed(start)
(Time.now - start).round(3)
end
def logger
Sidekiq.logger
end
end
end
end
end

View file

@ -1,205 +0,0 @@
require 'sidekiq/scheduled'
require 'sidekiq/api'
module Sidekiq
module Middleware
module Server
##
# Automatically retry jobs that fail in Sidekiq.
# Sidekiq's retry support assumes a typical development lifecycle:
#
# 0. Push some code changes with a bug in it.
# 1. Bug causes job processing to fail, Sidekiq's middleware captures
# the job and pushes it onto a retry queue.
# 2. Sidekiq retries jobs in the retry queue multiple times with
# an exponential delay, the job continues to fail.
# 3. After a few days, a developer deploys a fix. The job is
# reprocessed successfully.
# 4. Once retries are exhausted, Sidekiq will give up and move the
# job to the Dead Job Queue (aka morgue) where it must be dealt with
# manually in the Web UI.
# 5. After 6 months on the DJQ, Sidekiq will discard the job.
#
# A job looks like:
#
# { 'class' => 'HardWorker', 'args' => [1, 2, 'foo'], 'retry' => true }
#
# The 'retry' option also accepts a number (in place of 'true'):
#
# { 'class' => 'HardWorker', 'args' => [1, 2, 'foo'], 'retry' => 5 }
#
# The job will be retried this number of times before giving up. (If simply
# 'true', Sidekiq retries 25 times)
#
# We'll add a bit more data to the job to support retries:
#
# * 'queue' - the queue to use
# * 'retry_count' - number of times we've retried so far.
# * 'error_message' - the message from the exception
# * 'error_class' - the exception class
# * 'failed_at' - the first time it failed
# * 'retried_at' - the last time it was retried
# * 'backtrace' - the number of lines of error backtrace to store
#
# We don't store the backtrace by default as that can add a lot of overhead
# to the job and everyone is using an error service, right?
#
# The default number of retry attempts is 25 which works out to about 3 weeks
# of retries. You can pass a value for the max number of retry attempts when
# adding the middleware using the options hash:
#
# Sidekiq.configure_server do |config|
# config.server_middleware do |chain|
# chain.add Sidekiq::Middleware::Server::RetryJobs, :max_retries => 7
# end
# end
#
# or limit the number of retries for a particular worker with:
#
# class MyWorker
# include Sidekiq::Worker
# sidekiq_options :retry => 10
# end
#
class RetryJobs
include Sidekiq::Util
DEFAULT_MAX_RETRY_ATTEMPTS = 25
def initialize(options = {})
@max_retries = options.fetch(:max_retries, DEFAULT_MAX_RETRY_ATTEMPTS)
end
def call(worker, msg, queue)
yield
rescue Sidekiq::Shutdown
# ignore, will be pushed back onto queue during hard_shutdown
raise
rescue Exception => e
# ignore, will be pushed back onto queue during hard_shutdown
raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e)
raise e unless msg['retry']
attempt_retry(worker, msg, queue, e)
end
private
def attempt_retry(worker, msg, queue, exception)
max_retry_attempts = retry_attempts_from(msg['retry'], @max_retries)
msg['queue'] = if msg['retry_queue']
msg['retry_queue']
else
queue
end
# App code can stuff all sorts of crazy binary data into the error message
# that won't convert to JSON.
m = exception.message.to_s[0, 10_000]
if m.respond_to?(:scrub!)
m.force_encoding("utf-8")
m.scrub!
end
msg['error_message'] = m
msg['error_class'] = exception.class.name
count = if msg['retry_count']
msg['retried_at'] = Time.now.to_f
msg['retry_count'] += 1
else
msg['failed_at'] = Time.now.to_f
msg['retry_count'] = 0
end
if msg['backtrace'] == true
msg['error_backtrace'] = exception.backtrace
elsif !msg['backtrace']
# do nothing
elsif msg['backtrace'].to_i != 0
msg['error_backtrace'] = exception.backtrace[0...msg['backtrace'].to_i]
end
if count < max_retry_attempts
delay = delay_for(worker, count, exception)
logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
retry_at = Time.now.to_f + delay
payload = Sidekiq.dump_json(msg)
Sidekiq.redis do |conn|
conn.zadd('retry', retry_at.to_s, payload)
end
else
# Goodbye dear message, you (re)tried your best I'm sure.
retries_exhausted(worker, msg, exception)
end
raise exception
end
def retries_exhausted(worker, msg, exception)
logger.debug { "Retries exhausted for job" }
begin
block = worker.sidekiq_retries_exhausted_block || Sidekiq.default_retries_exhausted
block.call(msg, exception) if block
rescue => e
handle_exception(e, { context: "Error calling retries_exhausted for #{worker.class}", job: msg })
end
send_to_morgue(msg) unless msg['dead'] == false
end
def send_to_morgue(msg)
Sidekiq.logger.info { "Adding dead #{msg['class']} job #{msg['jid']}" }
payload = Sidekiq.dump_json(msg)
now = Time.now.to_f
Sidekiq.redis do |conn|
conn.multi do
conn.zadd('dead', now, payload)
conn.zremrangebyscore('dead', '-inf', now - DeadSet.timeout)
conn.zremrangebyrank('dead', 0, -DeadSet.max_jobs)
end
end
end
def retry_attempts_from(msg_retry, default)
if msg_retry.is_a?(Integer)
msg_retry
else
default
end
end
def delay_for(worker, count, exception)
worker.sidekiq_retry_in_block? && retry_in(worker, count, exception) || seconds_to_delay(count)
end
# delayed_job uses the same basic formula
def seconds_to_delay(count)
(count ** 4) + 15 + (rand(30)*(count+1))
end
def retry_in(worker, count, exception)
begin
worker.sidekiq_retry_in_block.call(count, exception).to_i
rescue Exception => e
handle_exception(e, { context: "Failure scheduling retry using the defined `sidekiq_retry_in` in #{worker.class.name}, falling back to default" })
nil
end
end
def exception_caused_by_shutdown?(e, checked_causes = [])
# In Ruby 2.1.0 only, check if exception is a result of shutdown.
return false unless defined?(e.cause)
# Handle circular causes
checked_causes << e.object_id
return false if checked_causes.include?(e.cause.object_id)
e.cause.instance_of?(Sidekiq::Shutdown) ||
exception_caused_by_shutdown?(e.cause, checked_causes)
end
end
end
end
end

View file

@ -1,6 +1,8 @@
# frozen_string_literal: true
require 'sidekiq/util'
require 'sidekiq/fetch'
require 'sidekiq/job_logger'
require 'sidekiq/job_retry'
require 'thread'
require 'concurrent/map'
require 'concurrent/atomic/atomic_fixnum'
@ -37,7 +39,8 @@ module Sidekiq
@thread = nil
@strategy = (mgr.options[:fetch] || Sidekiq::BasicFetch).new(mgr.options)
@reloader = Sidekiq.options[:reloader]
@executor = Sidekiq.options[:executor]
@logging = Sidekiq::JobLogger.new
@retrier = Sidekiq::JobRetry.new
end
def terminate(wait=false)
@ -116,30 +119,59 @@ module Sidekiq
nil
end
def dispatch(job_hash, queue)
# since middleware can mutate the job hash
# we clone here so we report the original
# job structure to the Web UI
pristine = cloned(job_hash)
@retrier.global(job_hash, queue) do
@logging.call(job_hash, queue) do
stats(pristine, queue) do
# Rails 5 requires a Reloader to wrap code execution. In order to
# constantize the worker and instantiate an instance, we have to call
# the Reloader. It handles code loading, db connection management, etc.
# Effectively this block denotes a "unit of work" to Rails.
@reloader.call do
klass = job_hash['class'.freeze].constantize
worker = klass.new
worker.jid = job_hash['jid'.freeze]
@retrier.local(worker, job_hash, queue) do
yield worker
end
end
end
end
end
end
def process(work)
jobstr = work.job
queue = work.queue_name
ack = false
begin
job_hash = Sidekiq.load_json(jobstr)
@reloader.call do
klass = job_hash['class'.freeze].constantize
worker = klass.new
worker.jid = job_hash['jid'.freeze]
stats(worker, job_hash, queue) do
Sidekiq.server_middleware.invoke(worker, job_hash, queue) do
@executor.call do
# Only ack if we either attempted to start this job or
# successfully completed it. This prevents us from
# losing jobs if a middleware raises an exception before yielding
ack = true
execute_job(worker, cloned(job_hash['args'.freeze]))
end
end
end
# Treat malformed JSON like a process crash -- don't acknowledge it.
# * In Sidekiq, the error will be logged but job discarded.
# * In Sidekiq Pro, the error will be logged and the job retried when
# it is recovered by the reliability algorithm. The job may act like
# a poison pill and never execute until manually removed but job loss
# is considered worse.
job_hash = nil
begin
job_hash = Sidekiq.load_json(jobstr)
rescue => ex
Sidekiq.logger.error { "Pushing job to dead queue due to invalid JSON: #{ex}" }
send_to_morgue(jobstr)
ack = true
raise
end
ack = true
dispatch(job_hash, queue) do |worker|
Sidekiq.server_middleware.invoke(worker, job_hash, queue) do
execute_job(worker, cloned(job_hash['args'.freeze]))
end
end
rescue Sidekiq::Shutdown
# Had to force kill this job because it didn't finish
@ -154,6 +186,17 @@ module Sidekiq
end
end
def send_to_morgue(msg)
now = Time.now.to_f
Sidekiq.redis do |conn|
conn.multi do
conn.zadd('dead', now, msg)
conn.zremrangebyscore('dead', '-inf', now - DeadSet.timeout)
conn.zremrangebyrank('dead', 0, -DeadSet.max_jobs)
end
end
end
def execute_job(worker, cloned_args)
worker.perform(*cloned_args)
end
@ -166,9 +209,9 @@ module Sidekiq
PROCESSED = Concurrent::AtomicFixnum.new
FAILURE = Concurrent::AtomicFixnum.new
def stats(worker, job_hash, queue)
def stats(job_hash, queue)
tid = thread_identity
WORKER_STATE[tid] = {:queue => queue, :payload => cloned(job_hash), :run_at => Time.now.to_i }
WORKER_STATE[tid] = {:queue => queue, :payload => job_hash, :run_at => Time.now.to_i }
begin
yield
@ -184,8 +227,8 @@ module Sidekiq
# Deep clone the arguments passed to the worker so that if
# the job fails, what is pushed back onto Redis hasn't
# been mutated by the worker.
def cloned(ary)
Marshal.load(Marshal.dump(ary))
def cloned(thing)
Marshal.load(Marshal.dump(thing))
end
end

View file

@ -27,40 +27,12 @@ module Sidekiq
#
Sidekiq.configure_server do |_|
if ::Rails::VERSION::MAJOR >= 5
# The reloader also takes care of ActiveRecord but is incompatible with
# the ActiveRecord middleware so make sure it's not in the chain already.
if defined?(Sidekiq::Middleware::Server::ActiveRecord) && Sidekiq.server_middleware.exists?(Sidekiq::Middleware::Server::ActiveRecord)
raise ArgumentError, "You are using the Sidekiq ActiveRecord middleware and the new Rails 5 reloader which are incompatible. Please remove the ActiveRecord middleware from your Sidekiq middleware configuration."
elsif ::Rails.application.config.cache_classes
# The reloader API has proven to be troublesome under load in production.
# We won't use it at all when classes are cached, see #3154
Sidekiq.logger.debug { "Autoload disabled in #{::Rails.env}, Sidekiq will not reload changed classes" }
Sidekiq.options[:executor] = Sidekiq::Rails::Executor.new
else
Sidekiq.logger.debug { "Enabling Rails 5+ live code reloading, so hot!" }
Sidekiq.options[:reloader] = Sidekiq::Rails::Reloader.new
Psych::Visitors::ToRuby.prepend(Sidekiq::Rails::PsychAutoload)
end
Sidekiq.options[:reloader] = Sidekiq::Rails::Reloader.new
Psych::Visitors::ToRuby.prepend(Sidekiq::Rails::PsychAutoload)
end
end
end
class Executor
def initialize(app = ::Rails.application)
@app = app
end
def call
@app.executor.wrap do
yield
end
end
def inspect
"#<Sidekiq::Rails::Executor @app=#{@app.class.name}>"
end
end
class Reloader
def initialize(app = ::Rails.application)
@app = app

View file

@ -0,0 +1,8 @@
class SomeJob < ApplicationJob
queue_as :default
def perform(*args)
puts "What's up?!?!"
# Do something later
end
end

View file

@ -90,8 +90,8 @@ class TestMiddleware < Sidekiq::Test
it 'correctly replaces middleware when using middleware with options in the initializer' do
chain = Sidekiq::Middleware::Chain.new
chain.add Sidekiq::Middleware::Server::RetryJobs
chain.add Sidekiq::Middleware::Server::RetryJobs, {:max_retries => 5}
chain.add NonYieldingMiddleware
chain.add NonYieldingMiddleware, {:foo => 5}
assert_equal 1, chain.count
end

View file

@ -81,9 +81,25 @@ class TestProcessor < Sidekiq::Test
Sidekiq.error_handlers.pop
end
it 'handles exceptions raised by the job' do
it 'handles invalid JSON' do
ds = Sidekiq::DeadSet.new
ds.clear
job_hash = { 'class' => MockWorker.to_s, 'args' => ['boom'] }
msg = Sidekiq.dump_json(job_hash)
job = work(msg[0...-2])
ds = Sidekiq::DeadSet.new
assert_equal 0, ds.size
begin
@processor.instance_variable_set(:'@job', job)
@processor.process(job)
rescue JSON::ParserError
end
assert_equal 1, ds.size
end
it 'handles exceptions raised by the job' do
job_hash = { 'class' => MockWorker.to_s, 'args' => ['boom'], 'jid' => '123987123' }
msg = Sidekiq.dump_json(job_hash)
job = work(msg)
begin
@processor.instance_variable_set(:'@job', job)
@ -93,7 +109,7 @@ class TestProcessor < Sidekiq::Test
assert_equal 1, errors.count
assert_instance_of TestException, errors.first[:exception]
assert_equal msg, errors.first[:context][:jobstr]
assert_equal job_hash, errors.first[:context][:job]
assert_equal job_hash['jid'], errors.first[:context][:job]['jid']
end
it 'handles exceptions raised by the reloader' do
@ -152,7 +168,8 @@ class TestProcessor < Sidekiq::Test
describe 'middleware throws an exception before processing the work' do
let(:raise_before_yield) { true }
it 'does not ack' do
it 'acks the job' do
work.expect(:acknowledge, nil)
begin
@processor.process(work)
flunk "Expected #process to raise exception"

View file

@ -2,7 +2,7 @@
# frozen_string_literal: true
require_relative 'helper'
require 'sidekiq/scheduled'
require 'sidekiq/middleware/server/retry_jobs'
require 'sidekiq/job_retry'
class TestRetry < Sidekiq::Test
describe 'middleware' do
@ -19,16 +19,25 @@ class TestRetry < Sidekiq::Test
end
def handler(options={})
@handler ||= Sidekiq::Middleware::Server::RetryJobs.new(options)
@handler ||= Sidekiq::JobRetry.new(options)
end
def job(options={})
@job ||= { 'class' => 'Bob', 'args' => [1,2,'foo'], 'retry' => true }.merge(options)
end
it 'retries with a nil worker' do
assert_raises RuntimeError do
handler.global(job, 'default') do
raise "boom"
end
end
assert_equal 1, Sidekiq::RetrySet.new.size
end
it 'allows disabling retry' do
assert_raises RuntimeError do
handler.call(worker, job('retry' => false), 'default') do
handler.local(worker, job('retry' => false), 'default') do
raise "kerblammo!"
end
end
@ -37,7 +46,7 @@ class TestRetry < Sidekiq::Test
it 'allows a numeric retry' do
assert_raises RuntimeError do
handler.call(worker, job('retry' => 2), 'default') do
handler.local(worker, job('retry' => 2), 'default') do
raise "kerblammo!"
end
end
@ -47,7 +56,7 @@ class TestRetry < Sidekiq::Test
it 'allows 0 retry => no retry and dead queue' do
assert_raises RuntimeError do
handler.call(worker, job('retry' => 0), 'default') do
handler.local(worker, job('retry' => 0), 'default') do
raise "kerblammo!"
end
end
@ -59,7 +68,7 @@ class TestRetry < Sidekiq::Test
skip 'skipped! test requires ruby 2.1+' if RUBY_VERSION <= '2.1.0'
assert_raises RuntimeError do
handler.call(worker, job, 'default') do
handler.local(worker, job, 'default') do
raise "kerblammo! #{195.chr}"
end
end
@ -71,7 +80,7 @@ class TestRetry < Sidekiq::Test
max_retries = 7
1.upto(max_retries + 1) do
assert_raises RuntimeError do
handler(:max_retries => max_retries).call(worker, job, 'default') do
handler(:max_retries => max_retries).local(worker, job, 'default') do
raise "kerblammo!"
end
end
@ -84,7 +93,7 @@ class TestRetry < Sidekiq::Test
it 'saves backtraces' do
c = nil
assert_raises RuntimeError do
handler.call(worker, job('backtrace' => true), 'default') do
handler.local(worker, job('backtrace' => true), 'default') do
c = caller(0); raise "kerblammo!"
end
end
@ -95,7 +104,7 @@ class TestRetry < Sidekiq::Test
it 'saves partial backtraces' do
c = nil
assert_raises RuntimeError do
handler.call(worker, job('backtrace' => 3), 'default') do
handler.local(worker, job('backtrace' => 3), 'default') do
c = caller(0)[0...3]; raise "kerblammo!"
end
end
@ -106,7 +115,7 @@ class TestRetry < Sidekiq::Test
it 'handles a new failed message' do
assert_raises RuntimeError do
handler.call(worker, job, 'default') do
handler.local(worker, job, 'default') do
raise "kerblammo!"
end
end
@ -123,7 +132,7 @@ class TestRetry < Sidekiq::Test
assert_equal 0, rs.size
msg = { 'class' => 'Bob', 'args' => [1,2,'foo'], 'retry' => true }
assert_raises Sidekiq::Shutdown do
handler.call(worker, msg, 'default') do
handler.local(worker, msg, 'default') do
raise Sidekiq::Shutdown
end
end
@ -137,7 +146,7 @@ class TestRetry < Sidekiq::Test
assert_equal 0, rs.size
msg = { 'class' => 'Bob', 'args' => [1,2,'foo'], 'retry' => true }
assert_raises Sidekiq::Shutdown do
handler.call(worker, msg, 'default') do
handler.local(worker, msg, 'default') do
begin
raise Sidekiq::Shutdown
rescue Interrupt
@ -154,7 +163,7 @@ class TestRetry < Sidekiq::Test
rs = Sidekiq::RetrySet.new
assert_equal 0, rs.size
assert_raises Sidekiq::Shutdown do
handler.call(worker, job, 'default') do
handler.local(worker, job, 'default') do
begin
raise Sidekiq::Shutdown
rescue Interrupt
@ -171,7 +180,7 @@ class TestRetry < Sidekiq::Test
it 'allows a retry queue' do
assert_raises RuntimeError do
handler.call(worker, job("retry_queue" => 'retryx'), 'default') do
handler.local(worker, job("retry_queue" => 'retryx'), 'default') do
raise "kerblammo!"
end
end
@ -187,7 +196,7 @@ class TestRetry < Sidekiq::Test
now = Time.now.to_f
msg = {"queue"=>"default", "error_message"=>"kerblammo!", "error_class"=>"RuntimeError", "failed_at"=>now, "retry_count"=>10}
assert_raises RuntimeError do
handler.call(worker, job(msg), 'default') do
handler.local(worker, job(msg), 'default') do
raise "kerblammo!"
end
end
@ -208,7 +217,7 @@ class TestRetry < Sidekiq::Test
now = Time.now.to_f
msg = {"queue"=>"default", "error_message"=>"kerblammo!", "error_class"=>"RuntimeError", "failed_at"=>now, "retry_count"=>25}
assert_raises RuntimeError do
handler.call(worker, job(msg), 'default') do
handler.local(worker, job(msg), 'default') do
raise "kerblammo!"
end
end
@ -293,7 +302,7 @@ class TestRetry < Sidekiq::Test
end
it "does not recurse infinitely checking if it's a shutdown" do
assert(!Sidekiq::Middleware::Server::RetryJobs.new.send(
assert(!Sidekiq::JobRetry.new.send(
:exception_caused_by_shutdown?, @error))
end
end
@ -317,7 +326,7 @@ class TestRetry < Sidekiq::Test
end
it "does not recurse infinitely checking if it's a shutdown" do
assert(!Sidekiq::Middleware::Server::RetryJobs.new.send(
assert(!Sidekiq::JobRetry.new.send(
:exception_caused_by_shutdown?, @error))
end
end

View file

@ -1,6 +1,6 @@
# encoding: utf-8
require_relative 'helper'
require 'sidekiq/middleware/server/retry_jobs'
require 'sidekiq/job_retry'
class TestRetryExhausted < Sidekiq::Test
describe 'sidekiq_retries_exhausted' do
@ -52,7 +52,7 @@ class TestRetryExhausted < Sidekiq::Test
end
def handler(options={})
@handler ||= Sidekiq::Middleware::Server::RetryJobs.new(options)
@handler ||= Sidekiq::JobRetry.new(options)
end
def job(options={})
@ -60,7 +60,7 @@ class TestRetryExhausted < Sidekiq::Test
end
it 'does not run exhausted block when job successful on first run' do
handler.call(new_worker, job('retry' => 2), 'default') do
handler.local(new_worker, job('retry' => 2), 'default') do
# successful
end
@ -68,7 +68,7 @@ class TestRetryExhausted < Sidekiq::Test
end
it 'does not run exhausted block when job successful on last retry' do
handler.call(new_worker, job('retry_count' => 0, 'retry' => 1), 'default') do
handler.local(new_worker, job('retry_count' => 0, 'retry' => 1), 'default') do
# successful
end
@ -77,7 +77,7 @@ class TestRetryExhausted < Sidekiq::Test
it 'does not run exhausted block when retries not exhausted yet' do
assert_raises RuntimeError do
handler.call(new_worker, job('retry' => 1), 'default') do
handler.local(new_worker, job('retry' => 1), 'default') do
raise 'kerblammo!'
end
end
@ -87,7 +87,7 @@ class TestRetryExhausted < Sidekiq::Test
it 'runs exhausted block when retries exhausted' do
assert_raises RuntimeError do
handler.call(new_worker, job('retry_count' => 0, 'retry' => 1), 'default') do
handler.local(new_worker, job('retry_count' => 0, 'retry' => 1), 'default') do
raise 'kerblammo!'
end
end
@ -98,7 +98,7 @@ class TestRetryExhausted < Sidekiq::Test
it 'passes job and exception to retries exhausted block' do
raised_error = assert_raises RuntimeError do
handler.call(new_worker, job('retry_count' => 0, 'retry' => 1), 'default') do
handler.local(new_worker, job('retry_count' => 0, 'retry' => 1), 'default') do
raise 'kerblammo!'
end
end
@ -110,7 +110,7 @@ class TestRetryExhausted < Sidekiq::Test
it 'passes job to retries exhausted block' do
raised_error = assert_raises RuntimeError do
handler.call(old_worker, job('retry_count' => 0, 'retry' => 1), 'default') do
handler.local(old_worker, job('retry_count' => 0, 'retry' => 1), 'default') do
raise 'kerblammo!'
end
end
@ -134,7 +134,7 @@ class TestRetryExhausted < Sidekiq::Test
end
f = Foobar.new
raised_error = assert_raises RuntimeError do
handler.call(f, job('retry_count' => 0, 'retry' => 1), 'default') do
handler.local(f, job('retry_count' => 0, 'retry' => 1), 'default') do
raise 'kerblammo!'
end
end

View file

@ -34,7 +34,7 @@
<td>
<form action="<%= root_path %>queues/<%= @name %>/delete" method="post">
<%= csrf_tag %>
<input name="key_val" value="<%= h Sidekiq.dump_json(msg.item) %>" type="hidden" />
<input name="key_val" value="<%= h msg.value %>" type="hidden" />
<input class="btn btn-danger btn-xs" type="submit" name="delete" value="<%= t('Delete') %>" data-confirm="<%= t('AreYouSure') %>" />
</form>
</td>