1
0
Fork 0
mirror of https://github.com/rails/rails.git synced 2022-11-09 12:12:34 -05:00
rails--rails/activejob/lib/active_job/exceptions.rb
bogdanvlviv b6b5a7ac52
retry_job should publish enqueue_retry.active_job notification
Also this commit removes `:wait` from payload of
`retry_stopped.active_job`.

Related to https://github.com/rails/rails/pull/33751#discussion_r214140008
Follow up #33751

/cc @kaspth, @rafaelfranca
2018-09-16 17:46:24 +03:00

158 lines
6.4 KiB
Ruby

# frozen_string_literal: true
require "active_support/core_ext/numeric/time"
module ActiveJob
# Provides behavior for retrying and discarding jobs on exceptions.
module Exceptions
extend ActiveSupport::Concern
module ClassMethods
# Catch the exception and reschedule job for re-execution after so many seconds, for a specific number of attempts.
# The number of attempts includes the total executions of a job, not just the retried executions.
# If the exception keeps getting raised beyond the specified number of attempts, the exception is allowed to
# bubble up to the underlying queuing system, which may have its own retry mechanism or place it in a
# holding queue for inspection.
#
# You can also pass a block that'll be invoked if the retry attempts fail for custom logic rather than letting
# the exception bubble up. This block is yielded with the job instance as the first and the error instance as the second parameter.
#
# ==== Options
# * <tt>:wait</tt> - Re-enqueues the job with a delay specified either in seconds (default: 3 seconds),
# as a computing proc that takes the number of executions so far as an argument, or as a symbol reference of
# <tt>:exponentially_longer</tt>, which applies the wait algorithm of <tt>(executions ** 4) + 2</tt>
# (first wait 3s, then 18s, then 83s, etc)
# * <tt>:attempts</tt> - Re-enqueues the job the specified number of times (default: 5 attempts),
# attempts here refers to the total number of times the job is executed, not just retried executions
# * <tt>:queue</tt> - Re-enqueues the job on a different queue
# * <tt>:priority</tt> - Re-enqueues the job with a different priority
#
# ==== Examples
#
# class RemoteServiceJob < ActiveJob::Base
# retry_on CustomAppException # defaults to 3s wait, 5 attempts
# retry_on AnotherCustomAppException, wait: ->(executions) { executions * 2 }
# retry_on(YetAnotherCustomAppException) do |job, error|
# ExceptionNotifier.caught(error)
# end
# retry_on ActiveRecord::Deadlocked, wait: 5.seconds, attempts: 3
# retry_on Net::OpenTimeout, wait: :exponentially_longer, attempts: 10
#
# def perform(*args)
# # Might raise CustomAppException, AnotherCustomAppException, or YetAnotherCustomAppException for something domain specific
# # Might raise ActiveRecord::Deadlocked when a local db deadlock is detected
# # Might raise Net::OpenTimeout when the remote service is down
# end
# end
def retry_on(*exceptions, wait: 3.seconds, attempts: 5, queue: nil, priority: nil)
  rescue_from(*exceptions) do |error|
    # Attempts remain: schedule another run and leave the handler with its result.
    next retry_job(wait: determine_delay(wait), queue: queue, priority: priority, error: error) if executions < attempts

    # Attempts are used up: publish retry_stopped.active_job, then either hand
    # the failure to the block given to retry_on or let the error propagate.
    stopped = { job: self, adapter: self.class.queue_adapter, error: error }

    unless block_given?
      ActiveSupport::Notifications.instrument("retry_stopped.active_job", stopped)
      raise error
    end

    ActiveSupport::Notifications.instrument("retry_stopped.active_job", stopped) { yield self, error }
  end
end
# Discard the job with no attempts to retry, if the exception is raised. This is useful when the subject of the job,
# like an Active Record, is no longer available, and the job is thus no longer relevant.
#
# You can also pass a block that'll be invoked. This block is yielded with the job instance as the first and the error instance as the second parameter.
#
# ==== Example
#
# class SearchIndexingJob < ActiveJob::Base
# discard_on ActiveJob::DeserializationError
# discard_on(CustomAppException) do |job, error|
# ExceptionNotifier.caught(error)
# end
#
# def perform(record)
# # Will raise ActiveJob::DeserializationError if the record can't be deserialized
# # Might raise CustomAppException for something domain specific
# end
# end
def discard_on(*exceptions)
  rescue_from(*exceptions) do |error|
    discarded = { job: self, adapter: self.class.queue_adapter, error: error }

    # The optional block runs inside the instrumented section, so the
    # discard.active_job event wraps it.
    ActiveSupport::Notifications.instrument("discard.active_job", discarded) do
      yield self, error if block_given?
    end
  end
end
end
# Reschedules the job to be re-executed. This is useful in combination
# with the +rescue_from+ option. When you rescue an exception from your job
# you can ask Active Job to retry performing your job.
#
# ==== Options
# * <tt>:wait</tt> - Enqueues the job with the specified delay in seconds
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
# * <tt>:priority</tt> - Enqueues the job with the specified priority
#
# ==== Examples
#
# class SiteScraperJob < ActiveJob::Base
# rescue_from(ErrorLoadingSite) do
# retry_job queue: :low_priority
# end
#
# def perform(*args)
# # raise ErrorLoadingSite if cannot scrape
# end
# end
def retry_job(options = {})
  adapter = self.class.queue_adapter
  # The event carries the :error and :wait entries of +options+ (nil when absent).
  event = { job: self, adapter: adapter, error: options[:error], wait: options[:wait] }

  # The full, unmodified options hash is forwarded to enqueue.
  ActiveSupport::Notifications.instrument("enqueue_retry.active_job", event) { enqueue(options) }
end
private
def determine_delay(seconds_or_duration_or_algorithm)
  delay_spec = seconds_or_duration_or_algorithm

  # Resolve the configured wait into a concrete delay value.
  case delay_spec
  when :exponentially_longer then (executions**4) + 2
  when ActiveSupport::Duration then delay_spec.to_i
  when Integer then delay_spec
  when Proc then delay_spec.call(executions) # custom algorithm receives the execution count
  else
    raise "Couldn't determine a delay based on #{delay_spec.inspect}"
  end
end
end
end