1
0
Fork 0
mirror of https://github.com/rails/rails.git synced 2022-11-09 12:12:34 -05:00
rails--rails/activejob/lib/active_job/core.rb

188 lines
6.2 KiB
Ruby
Raw Normal View History

2017-07-09 13:49:52 -04:00
# frozen_string_literal: true
2014-08-25 10:34:50 -04:00
module ActiveJob
# Provides general behavior that will be included into every Active Job
# object that inherits from ActiveJob::Base.
2014-08-25 10:34:50 -04:00
module Core
extend ActiveSupport::Concern

# Arguments that will be passed to the job's +perform+ method.
attr_accessor :arguments

# Arguments in their serialized form, as read from stored job data. They
# are kept separately until they need to be deserialized.
attr_writer :serialized_arguments

# Timestamp when the job should be performed.
attr_accessor :scheduled_at

# Job identifier.
attr_accessor :job_id

# Queue in which the job will reside.
attr_writer :queue_name

# Priority that the job will have (a lower number means a higher priority).
attr_writer :priority

# ID optionally provided by the adapter.
attr_accessor :provider_job_id

# Number of times this job has been executed (which increments on every
# retry, like after an exception).
attr_accessor :executions

# Hash that contains the number of times this job handled errors for each
# specific retry_on declaration. Keys are the string representation of the
# exceptions listed in the retry_on declaration, while the associated value
# holds the number of executions where the corresponding retry_on
# declaration handled one of its listed exceptions.
attr_accessor :exception_executions

# I18n.locale to be used during the job.
attr_accessor :locale

# Timezone to be used during the job.
attr_accessor :timezone

# Track when a job was enqueued.
attr_accessor :enqueued_at
Communicate enqueue failures to callers of perform_later There is presently no clean way of telling a caller of `perform_later` the reason why a job failed to enqueue. When the job is enqueued successfully, the job object itself is returned, but when the job cannot be enqueued, only `false` is returned. This does not allow callers to distinguish between classes of failures. One important class of failures is when the job backend experiences a network partition when communicating with its underlying datastore. It is entirely possible for that network partition to recover and as such, code attempting to enqueue a job may wish to take action to reenqueue that job after a brief delay. This is distinguished from the class of failures where, due to a business rule defined in a callback in the application, a job fails to enqueue and should not be retried. This PR changes the following: - Allows a block to be passed to the `perform_later` method. After the `enqueue` method is executed, but before the result is returned, the job will be yielded to the block. This allows the code invoking the `perform_later` method to inspect the job object, even in failure scenarios. - Adds an exception `EnqueueError` which job adapters can raise if they detect a problem specific to their underlying implementation or infrastructure during the enqueue process. - Adds two properties to the job base class: `successfully_enqueued` and `enqueue_error`. `enqueue_error` will be populated by the `enqueue` method if it rescues an `EnqueueError` raised by the job backend. `successfully_enqueued` will be true if the job is not rejected by callbacks and does not cause the job backend to raise an `EnqueueError` and will be `false` otherwise. This will allow developers to do something like the following: MyJob.perform_later do |job| unless job.successfully_enqueued? if job.enqueue_error&.message == "Redis was unavailable" # invoke some code that will retry the job after a delay end end end
2021-01-19 09:57:46 -05:00
# Track whether the adapter received the job successfully. The stored flag
# is read back through +successfully_enqueued?+.
attr_writer :successfully_enqueued # :nodoc:
# Returns +true+ when the job has been flagged as successfully enqueued and
# +false+ otherwise, including when the flag has never been assigned.
def successfully_enqueued?
  # +initialize+ does not assign the flag, so the raw value may still be
  # +nil+ here; coerce it so the predicate always answers with a boolean.
  @successfully_enqueued ? true : false
end
# Track any exception raised by the backend while enqueuing, so callers can
# inspect the error afterwards.
attr_accessor :enqueue_error
# These methods will be included into any Active Job object, adding
# helpers for de/serialization and creation of job instances.
module ClassMethods
  # Creates a new job instance from a hash created with +serialize+.
  #
  # The class named under the <tt>"job_class"</tt> key is instantiated
  # without arguments and then populated through its instance-level
  # +deserialize+. Returns the populated job.
  def deserialize(job_data)
    job_data["job_class"].constantize.new.tap do |job|
      job.deserialize(job_data)
    end
  end

  # Creates a job preconfigured with the given options. You can call
  # perform_later with the job arguments to enqueue the job with the
  # preconfigured options
  #
  # ==== Options
  # * <tt>:wait</tt> - Enqueues the job with the specified delay
  # * <tt>:wait_until</tt> - Enqueues the job at the time specified
  # * <tt>:queue</tt> - Enqueues the job on the specified queue
  # * <tt>:priority</tt> - Enqueues the job with the specified priority
  #
  # ==== Examples
  #
  #    VideoJob.set(queue: :some_queue).perform_later(Video.last)
  #    VideoJob.set(wait: 5.minutes).perform_later(Video.last)
  #    VideoJob.set(wait_until: Time.now.tomorrow).perform_later(Video.last)
  #    VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last)
  #    VideoJob.set(queue: :some_queue, wait_until: Time.now.tomorrow).perform_later(Video.last)
  #    VideoJob.set(queue: :some_queue, wait: 5.minutes, priority: 10).perform_later(Video.last)
  def set(options = {})
    ConfiguredJob.new(self, options)
  end
end
# Creates a new job instance. Takes the arguments that will be
# passed to the perform method.
#
# Besides storing the arguments, this assigns a fresh UUID as +job_id+,
# copies the class-level +queue_name+ and +priority+, resets both execution
# counters and records the name of the current <tt>Time.zone</tt>.
def initialize(*arguments)
  @arguments = arguments
  @job_id = SecureRandom.uuid
  @queue_name = self.class.queue_name
  @priority = self.class.priority
  @executions = 0
  @exception_executions = {}
  # Safe navigation leaves the timezone +nil+ when Time.zone is +nil+.
  @timezone = Time.zone&.name
end
# On Rubies that provide ruby2_keywords, flag the splat so keyword arguments
# captured here are still treated as keywords when forwarded later.
ruby2_keywords(:initialize) if respond_to?(:ruby2_keywords, true)
2014-08-25 10:34:50 -04:00
# Returns a hash with the job data that can safely be passed to the
# queuing adapter.
#
# All keys are strings. The locale written is the current
# <tt>I18n.locale</tt> at the time of the call, and +enqueued_at+ is the
# current UTC time as an ISO 8601 string.
def serialize
  {
    "job_class" => self.class.name,
    "job_id" => job_id,
    "provider_job_id" => provider_job_id,
    "queue_name" => queue_name,
    "priority" => priority,
    "arguments" => serialize_arguments_if_needed(arguments),
    "executions" => executions,
    "exception_executions" => exception_executions,
    "locale" => I18n.locale.to_s,
    "timezone" => timezone,
    "enqueued_at" => Time.now.utc.iso8601
  }
end
# Attaches the stored job data to the current instance. Receives a hash
# returned from +serialize+
#
# The serialized arguments are stored as-is rather than deserialized here.
# When the hash has no locale or timezone, the current <tt>I18n.locale</tt>
# and the name of the current <tt>Time.zone</tt> are used instead.
#
# ==== Examples
#
#    class DeliverWebhookJob < ActiveJob::Base
#      attr_writer :attempt_number
#
#      def attempt_number
#        @attempt_number ||= 0
#      end
#
#      def serialize
#        super.merge('attempt_number' => attempt_number + 1)
#      end
#
#      def deserialize(job_data)
#        super
#        self.attempt_number = job_data['attempt_number']
#      end
#
#      rescue_from(Timeout::Error) do |exception|
#        raise exception if attempt_number > 5
#        retry_job(wait: 10)
#      end
#    end
def deserialize(job_data)
  # Payload key => writer to call with the stored value, applied in order.
  {
    "job_id" => :job_id=,
    "provider_job_id" => :provider_job_id=,
    "queue_name" => :queue_name=,
    "priority" => :priority=,
    "arguments" => :serialized_arguments=,
    "executions" => :executions=,
    "exception_executions" => :exception_executions=,
  }.each do |key, writer|
    __send__(writer, job_data[key])
  end

  stored_locale = job_data["locale"]
  self.locale = stored_locale || I18n.locale.to_s

  stored_timezone = job_data["timezone"]
  self.timezone = stored_timezone || Time.zone&.name

  self.enqueued_at = job_data["enqueued_at"]
end
2014-08-25 10:34:50 -04:00
private
# Returns the arguments in serialized form. A serialized copy that is
# still pending is returned as-is; otherwise the given arguments are
# serialized now.
def serialize_arguments_if_needed(arguments)
  return @serialized_arguments if arguments_serialized?

  serialize_arguments(arguments)
end
2014-08-25 10:34:50 -04:00
# Rebuilds +arguments+ from the pending serialized copy, if there is one,
# and then clears that copy so the work is not repeated. Does nothing when
# no serialized copy is pending. Always returns +nil+.
def deserialize_arguments_if_needed
  return unless arguments_serialized?

  @arguments = deserialize_arguments(@serialized_arguments)
  @serialized_arguments = nil
end
# Converts the given arguments into their serialized form by delegating to
# +Arguments.serialize+.
def serialize_arguments(arguments)
  Arguments.serialize(arguments)
end
# Turns a serialized argument list back into arguments by delegating to
# +Arguments.deserialize+.
def deserialize_arguments(serialized_args)
Arguments.deserialize(serialized_args)
end
# Tells whether a serialized copy of the arguments is pending. The result
# is the stored value itself (truthy when present), or +nil+ when the
# instance variable has never been assigned.
def arguments_serialized?
  @serialized_arguments if defined?(@serialized_arguments)
end
2014-08-25 10:34:50 -04:00
end
end