1
0
Fork 0
mirror of https://github.com/rails/rails.git synced 2022-11-09 12:12:34 -05:00
rails--rails/activejob/lib/active_job/enqueuing.rb
Daniel Morton ee60ce5606 Communicate enqueue failures to callers of perform_later
There is presently no clean way of telling a caller of `perform_later`
the reason why a job failed to enqueue. When the job is enqueued
successfully, the job object itself is returned, but when the job can
not be enqueued, only `false` is returned. This does not allow callers
to distinguish between classes of failures.

One important class of failures is when the job backend experiences a
network partition when communicating with its underlying datastore. It
is entirely possible for that network partition to recover and as such,
code attempting to enqueue a job may wish to take action to reenqueue
that job after a brief delay. This is distinguished from the class of
failures where due a business rule defined in a callback in the
application, a job fails to enqueue and should not be retried.

This PR changes the following:

- Allows a block to be passed to the `perform_later` method. After the
  `enqueue` method is executed, but before the result is returned, the
  job will be yielded to the block. This allows the code invoking the
  `perform_later` method to inspect the job object, even in failure
  scenarios.

- Adds an exception `EnqueueError` which job adapters can raise if they
  detect a problem specific to their underlying implementation or
  infrastructure during the enqueue process.

- Adds two properties to the job base class: `successfully_enqueued` and
  `enqueue_error`. `enqueue_error` will be populated by the `enqueue`
  method if it rescues an `EnqueueError` raised by the job backend.
  `successfully_enqueued` will be true if the job is not rejected by
  callbacks and does not cause the job backend to raise an
  `EnqueueError` and will be `false` otherwise.

This will allow developers to do something like the following:

    MyJob.perform_later do |job|
      unless job.successfully_enqueued?
        if job.enqueue_error&.message == "Redis was unavailable"
          # invoke some code that will retry the job after a delay
        end
      end
    end
2021-02-05 16:32:43 -05:00

86 lines
2.9 KiB
Ruby

# frozen_string_literal: true
require "active_job/arguments"
module ActiveJob
# Provides behavior for enqueuing jobs.
# Can be raised by adapters if they wish to communicate to the caller a reason
# why the adapter was unexpectedly unable to enqueue a job.
class EnqueueError < StandardError; end
module Enqueuing
extend ActiveSupport::Concern
# Includes the +perform_later+ method for job initialization.
module ClassMethods
# Push a job onto the queue. By default the arguments must be either String,
# Integer, Float, NilClass, TrueClass, FalseClass, BigDecimal, Symbol, Date,
# Time, DateTime, ActiveSupport::TimeWithZone, ActiveSupport::Duration,
# Hash, ActiveSupport::HashWithIndifferentAccess, Array or
# GlobalID::Identification instances, although this can be extended by adding
# custom serializers.
#
# Returns an instance of the job class queued with arguments available in
# Job#arguments or false if the enqueue did not succeed.
#
# After the attempted enqueue, the job will be yielded to an optional block.
def perform_later(*args)
job = job_or_instantiate(*args)
enqueue_result = job.enqueue
yield job if block_given?
enqueue_result
end
ruby2_keywords(:perform_later) if respond_to?(:ruby2_keywords, true)
private
def job_or_instantiate(*args) # :doc:
args.first.is_a?(self) ? args.first : new(*args)
end
ruby2_keywords(:job_or_instantiate) if respond_to?(:ruby2_keywords, true)
end
# Enqueues the job to be performed by the queue adapter.
#
# ==== Options
# * <tt>:wait</tt> - Enqueues the job with the specified delay
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
# * <tt>:priority</tt> - Enqueues the job with the specified priority
#
# ==== Examples
#
# my_job_instance.enqueue
# my_job_instance.enqueue wait: 5.minutes
# my_job_instance.enqueue queue: :important
# my_job_instance.enqueue wait_until: Date.tomorrow.midnight
# my_job_instance.enqueue priority: 10
def enqueue(options = {})
self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
self.priority = options[:priority].to_i if options[:priority]
self.successfully_enqueued = false
run_callbacks :enqueue do
if scheduled_at
queue_adapter.enqueue_at self, scheduled_at
else
queue_adapter.enqueue self
end
self.successfully_enqueued = true
rescue EnqueueError => e
self.enqueue_error = e
end
if successfully_enqueued?
self
else
false
end
end
end
end