1
0
Fork 0
mirror of https://github.com/rails/rails.git synced 2022-11-09 12:12:34 -05:00

Active Job refactoring

This commit is contained in:
Cristian Bica 2014-08-25 17:34:50 +03:00
parent 5db4e7f0ec
commit 1e237b4e44
33 changed files with 447 additions and 262 deletions

View file

@ -98,15 +98,10 @@ module ActionMailer
def enqueue_delivery(delivery_method, options={})
args = @mailer.name, @mail_method.to_s, delivery_method.to_s, *@args
enqueue_method = :enqueue
if options[:at]
enqueue_method = :enqueue_at
args.unshift options[:at]
elsif options[:in]
enqueue_method = :enqueue_in
args.unshift options[:in]
end
ActionMailer::DeliveryJob.send enqueue_method, *args
set_options = {}
set_options[:wait_until] = options[:at] if options[:at]
set_options[:wait] = options[:in] if options[:in]
ActionMailer::DeliveryJob.set(set_options).perform_later(*args)
end
end
end

View file

@ -3,8 +3,10 @@ require 'abstract_unit'
require 'active_job'
require 'minitest/mock'
require 'mailers/delayed_mailer'
require 'active_support/core_ext/numeric/time'
class MessageDeliveryTest < ActiveSupport::TestCase
include ActiveJob::TestHelper
setup do
@previous_logger = ActiveJob::Base.logger
@ -13,6 +15,8 @@ class MessageDeliveryTest < ActiveSupport::TestCase
ActiveJob::Base.logger = Logger.new(nil)
@mail = DelayedMailer.test_message(1, 2, 3)
ActionMailer::Base.deliveries.clear
ActiveJob::Base.queue_adapter.perform_enqueued_at_jobs = true
ActiveJob::Base.queue_adapter.perform_enqueued_jobs = true
end
teardown do
@ -70,33 +74,29 @@ class MessageDeliveryTest < ActiveSupport::TestCase
ActionMailer::Base.deliveries.clear
end
test 'should enqueue the email with :deliver delivery method' do
ret = ActionMailer::DeliveryJob.stub :enqueue, ->(*args){ args } do
test 'should enqueue the email with :deliver_now delivery method' do
assert_performed_with(job: ActionMailer::DeliveryJob, args: ['DelayedMailer', 'test_message', 'deliver_now', 1, 2, 3]) do
@mail.deliver_later
end
assert_equal ['DelayedMailer', 'test_message', 'deliver_now', 1, 2, 3], ret
end
test 'should enqueue the email with :deliver! delivery method' do
ret = ActionMailer::DeliveryJob.stub :enqueue, ->(*args){ args } do
test 'should enqueue the email with :deliver_now! delivery method' do
assert_performed_with(job: ActionMailer::DeliveryJob, args: ['DelayedMailer', 'test_message', 'deliver_now!', 1, 2, 3]) do
@mail.deliver_later!
end
assert_equal ['DelayedMailer', 'test_message', 'deliver_now!', 1, 2, 3], ret
end
test 'should enqueue a delivery with a delay' do
ret = ActionMailer::DeliveryJob.stub :enqueue_in, ->(*args){ args } do
@mail.deliver_later in: 600
assert_performed_with(job: ActionMailer::DeliveryJob, args: ['DelayedMailer', 'test_message', 'deliver_now', 1, 2, 3]) do
@mail.deliver_later in: 600.seconds
end
assert_equal [600, 'DelayedMailer', 'test_message', 'deliver_now', 1, 2, 3], ret
end
test 'should enqueue a delivery at a specific time' do
later_time = Time.now.to_i + 3600
ret = ActionMailer::DeliveryJob.stub :enqueue_at, ->(*args){ args } do
later_time = Time.now.to_f + 3600
assert_performed_with(job: ActionMailer::DeliveryJob, at: later_time, args: ['DelayedMailer', 'test_message', 'deliver_now', 1, 2, 3]) do
@mail.deliver_later at: later_time
end
assert_equal [later_time, 'DelayedMailer', 'test_message', 'deliver_now', 1, 2, 3], ret
end
end

View file

@ -43,15 +43,15 @@ end
Enqueue a job like so:
```ruby
MyJob.enqueue record # Enqueue a job to be performed as soon the queueing system is free.
MyJob.perform_later record # Enqueue a job to be performed as soon the queueing system is free.
```
```ruby
MyJob.enqueue_at Date.tomorrow.noon, record # Enqueue a job to be performed tomorrow at noon.
MyJob.set(wait_until: Date.tomorrow.noon).perform_later(record) # Enqueue a job to be performed tomorrow at noon.
```
```ruby
MyJob.enqueue_in 1.week, record # Enqueue a job to be performed 1 week from now.
MyJob.set(wait: 1.week).perform_later(record) # Enqueue a job to be performed 1 week from now.
```
That's it!

View file

@ -31,6 +31,7 @@ module ActiveJob
autoload :Base
autoload :QueueAdapters
autoload :ConfiguredJob
autoload :TestCase
autoload :TestHelper
end

View file

@ -1,19 +1,19 @@
require 'active_job/core'
require 'active_job/queue_adapter'
require 'active_job/queue_name'
require 'active_job/enqueuing'
require 'active_job/execution'
require 'active_job/callbacks'
require 'active_job/identifier'
require 'active_job/logging'
module ActiveJob
class Base
include Core
include QueueAdapter
include QueueName
include Enqueuing
include Execution
include Callbacks
include Identifier
include Logging
ActiveSupport.run_load_hooks(:active_job, self)

View file

@ -0,0 +1,18 @@
module ActiveJob
class ConfiguredJob #:nodoc:
def initialize(job_class, options={})
@options = options
@options[:in] = @options.delete(:wait) if @options[:wait]
@options[:at] = @options.delete(:wait_until) if @options[:wait_until]
@job_class = job_class
end
def perform_now(*args)
@job_class.new(*args).perform_now
end
def perform_later(*args)
@job_class.new(*args).enqueue @options
end
end
end

View file

@ -0,0 +1,89 @@
module ActiveJob
module Core
extend ActiveSupport::Concern
included do
# Job arguments
attr_accessor :arguments
attr_writer :serialized_arguments
# Timestamp when the job should be performed
attr_accessor :scheduled_at
# Job Identifier
attr_accessor :job_id
# Queue on which the job should be run on.
attr_writer :queue_name
end
module ClassMethods
# Creates a new job instance from a hash created with +serialize+
def deserialize(job_data)
job = job_data['job_class'].constantize.new
job.job_id = job_data['job_id']
job.queue_name = job_data['queue_name']
job.serialized_arguments = job_data['arguments']
job
end
# Creates a job preconfigured with the given options. You can call
# perform_later with the job arguments to enqueue the job with the
# preconfigured options
#
# ==== Options
# * <tt>:wait</tt> - Enqueues the job with the specified delay
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
#
# ==== Examples
#
# VideoJob.set(queue: :some_queue).perform_later(Video.last)
# VideoJob.set(wait: 5.minutes).perform_later(Video.last)
# VideoJob.set(wait_until: Time.tomorroe).perform_later(Video.last)
# VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last)
# VideoJob.set(queue: :some_queue, wait_until: Time.tomorroe).perform_later(Video.last)
def set(options={})
ConfiguredJob.new(self, options)
end
end
# Creates a new job instance. Takes as arguments the arguments that
# will be passed to the perform method.
def initialize(*arguments)
@arguments = arguments
@job_id = SecureRandom.uuid
@queue_name = self.class.queue_name
end
# Returns a hash with the job data that can safely be passed to the
# queueing adapter.
def serialize
{
'job_class' => self.class.name,
'job_id' => job_id,
'queue_name' => queue_name,
'arguments' => serialize_arguments(arguments)
}
end
private
def deserialize_arguments_if_needed
if defined?(@serialized_arguments) && @serialized_arguments.present?
@arguments = deserialize_arguments(@serialized_arguments)
@serialized_arguments = nil
end
end
def serialize_arguments(serialized_args)
Arguments.serialize(serialized_args)
end
def deserialize_arguments(serialized_args)
Arguments.deserialize(serialized_args)
end
end
end

View file

@ -12,60 +12,64 @@ module ActiveJob
#
# Returns an instance of the job class queued with args available in
# Job#arguments.
def enqueue(*args)
new(args).tap do |job|
job.run_callbacks :enqueue do
queue_adapter.enqueue self, job.job_id, *Arguments.serialize(args)
end
def perform_later(*args)
job_or_instantiate(*args).enqueue
end
protected
def job_or_instantiate(*args)
args.first.is_a?(self) ? args.first : new(*args)
end
end
# Reschedule the job to be re-executed. This is usefull in combination
# with the +rescue_from+ option. When you rescue an exception from your job
# you can ask Active Job to retry performing your job.
#
# ==== Options
# * <tt>:in</tt> - Enqueues the job with the specified delay
# * <tt>:at</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
#
# ==== Examples
#
# class SiteScrapperJob < ActiveJob::Base
# rescue_from(ErrorLoadingSite) do
# retry_job queue: :low_priority
# end
# def perform(*args)
# # raise ErrorLoadingSite if cannot scrape
# end
# end
def retry_job(options={})
enqueue options
end
# Equeue the job to be performed by the queue adapter.
#
# ==== Options
# * <tt>:in</tt> - Enqueues the job with the specified delay
# * <tt>:at</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
#
# ==== Examples
#
# my_job_instance.enqueue
# my_job_instance.enqueue in: 5.minutes
# my_job_instance.enqueue queue: :important
# my_job_instance.enqueue at: Date.tomorrow.midnight
def enqueue(options={})
self.scheduled_at = options[:in].seconds.from_now.to_f if options[:in]
self.scheduled_at = options[:at].to_f if options[:at]
self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
run_callbacks :enqueue do
if self.scheduled_at
self.class.queue_adapter.enqueue_at self, self.scheduled_at
else
self.class.queue_adapter.enqueue self
end
end
# Enqueue a job to be performed at +interval+ from now.
#
# enqueue_in(1.week, "mike")
#
# Returns an instance of the job class queued with args available in
# Job#arguments and the timestamp in Job#enqueue_at.
def enqueue_in(interval, *args)
enqueue_at interval.seconds.from_now, *args
end
# Enqueue a job to be performed at an explicit point in time.
#
# enqueue_at(Date.tomorrow.midnight, "mike")
#
# Returns an instance of the job class queued with args available in
# Job#arguments and the timestamp in Job#enqueue_at.
def enqueue_at(timestamp, *args)
new(args).tap do |job|
job.enqueued_at = timestamp
job.run_callbacks :enqueue do
queue_adapter.enqueue_at self, timestamp.to_f, job.job_id, *Arguments.serialize(args)
end
end
end
end
included do
attr_accessor :arguments
attr_accessor :enqueued_at
end
def initialize(arguments = nil)
@arguments = arguments
end
def retry_now
self.class.enqueue(*arguments)
end
def retry_in(interval)
self.class.enqueue_in interval, *arguments
end
def retry_at(timestamp)
self.class.enqueue_at timestamp, *arguments
self
end
end
end

View file

@ -4,15 +4,29 @@ require 'active_job/arguments'
module ActiveJob
module Execution
extend ActiveSupport::Concern
include ActiveSupport::Rescuable
included do
include ActiveSupport::Rescuable
module ClassMethods
# Performs the job immediately.
#
# MyJob.perform_now("mike")
#
def perform_now(*args)
job_or_instantiate(*args).perform_now
end
def execute(job_data) #:nodoc:
job = deserialize(job_data)
job.perform_now
end
end
def execute(job_id, *serialized_args)
self.job_id = job_id
self.arguments = deserialize_arguments(serialized_args)
# Performs the job immediately. The job is not sent to the queueing adapter
# and will block the execution until it's finished.
#
# MyJob.new(*args).perform_now
def perform_now
deserialize_arguments_if_needed
run_callbacks :perform do
perform(*arguments)
end
@ -23,11 +37,5 @@ module ActiveJob
def perform(*)
fail NotImplementedError
end
private
def deserialize_arguments(serialized_args)
Arguments.deserialize(serialized_args)
end
end
end

View file

@ -1,15 +0,0 @@
require 'active_job/arguments'
module ActiveJob
module Identifier
extend ActiveSupport::Concern
included do
attr_writer :job_id
end
def job_id
@job_id ||= SecureRandom.uuid
end
end
end

View file

@ -17,7 +17,7 @@ module ActiveJob
around_perform do |job, block, _|
tag_logger(job.class.name, job.job_id) do
payload = {adapter: job.class.queue_adapter, job: job.class, args: job.arguments}
payload = {adapter: job.class.queue_adapter, job: job}
ActiveSupport::Notifications.instrument("perform_start.active_job", payload.dup)
ActiveSupport::Notifications.instrument("perform.active_job", payload) do
block.call
@ -26,12 +26,12 @@ module ActiveJob
end
before_enqueue do |job|
if job.enqueued_at
if job.scheduled_at
ActiveSupport::Notifications.instrument "enqueue_at.active_job",
adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments, timestamp: job.enqueued_at
adapter: job.class.queue_adapter, job: job
else
ActiveSupport::Notifications.instrument "enqueue.active_job",
adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments
adapter: job.class.queue_adapter, job: job
end
end
end
@ -52,19 +52,23 @@ module ActiveJob
class LogSubscriber < ActiveSupport::LogSubscriber
def enqueue(event)
info { "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)}" + args_info(event) }
job = event.payload[:job]
info { "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job) }
end
def enqueue_at(event)
info { "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)} at #{enqueued_at(event)}" + args_info(event) }
job = event.payload[:job]
info { "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job) }
end
def perform_start(event)
info { "Performing #{event.payload[:job].name} from #{queue_name(event)}" + args_info(event) }
job = event.payload[:job]
info { "Performing #{job.class.name} from #{queue_name(event)}" + args_info(job) }
end
def perform(event)
info { "Performed #{event.payload[:job].name} from #{queue_name(event)} in #{event.duration.round(2).to_s}ms" }
job = event.payload[:job]
info { "Performed #{job.class.name} from #{queue_name(event)} in #{event.duration.round(2).to_s}ms" }
end
private
@ -72,12 +76,12 @@ module ActiveJob
event.payload[:adapter].name.demodulize.remove('Adapter') + "(#{event.payload[:job].queue_name})"
end
def args_info(event)
event.payload[:args].any? ? " with arguments: #{event.payload[:args].map(&:inspect).join(", ")}" : ""
def args_info(job)
job.arguments.any? ? " with arguments: #{job.arguments.map(&:inspect).join(", ")}" : ""
end
def enqueued_at(event)
Time.at(event.payload[:timestamp]).utc
def scheduled_at(event)
Time.at(event.payload[:job].scheduled_at).utc
end
def logger

View file

@ -4,20 +4,20 @@ module ActiveJob
module QueueAdapters
class BackburnerAdapter
class << self
def enqueue(job, *args)
Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name
def enqueue(job)
Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name
end
def enqueue_at(job, timestamp, *args)
def enqueue_at(job, timestamp)
delay = timestamp - Time.current.to_f
Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name, delay: delay
Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name, delay: delay
end
end
class JobWrapper
class << self
def perform(job_name, *args)
job_name.constantize.new.execute(*args)
def perform(job_data)
Base.execute job_data
end
end
end

View file

@ -4,18 +4,18 @@ module ActiveJob
module QueueAdapters
class DelayedJobAdapter
class << self
def enqueue(job, *args)
JobWrapper.new.delay(queue: job.queue_name).perform(job, *args)
def enqueue(job)
JobWrapper.new.delay(queue: job.queue_name).perform(job.serialize)
end
def enqueue_at(job, timestamp, *args)
JobWrapper.new.delay(queue: job.queue_name, run_at: Time.at(timestamp)).perform(job, *args)
def enqueue_at(job, timestamp)
JobWrapper.new.delay(queue: job.queue_name, run_at: Time.at(timestamp)).perform(job.serialize)
end
end
class JobWrapper
def perform(job, *args)
job.new.execute(*args)
def perform(job_data)
Base.execute(job_data)
end
end
end

View file

@ -2,8 +2,8 @@ module ActiveJob
module QueueAdapters
class InlineAdapter
class << self
def enqueue(job, *args)
job.new.execute(*args)
def enqueue(job)
Base.execute(job.serialize)
end
def enqueue_at(*)

View file

@ -5,7 +5,7 @@ module ActiveJob
class QuAdapter
class << self
def enqueue(job, *args)
Qu::Payload.new(klass: JobWrapper, args: [job.name, *args]).tap do |payload|
Qu::Payload.new(klass: JobWrapper, args: [job.serialize]).tap do |payload|
payload.instance_variable_set(:@queue, job.queue_name)
end.push
end
@ -16,13 +16,12 @@ module ActiveJob
end
class JobWrapper < Qu::Job
def initialize(job_name, *args)
@job = job_name.constantize
@args = args
def initialize(job_data)
@job_data = job_data
end
def perform
@job.new.execute(*@args)
Base.execute @job_data
end
end
end

View file

@ -4,18 +4,18 @@ module ActiveJob
module QueueAdapters
class QueAdapter
class << self
def enqueue(job, *args)
JobWrapper.enqueue job.name, *args, queue: job.queue_name
def enqueue(job)
JobWrapper.enqueue job.serialize, queue: job.queue_name
end
def enqueue_at(job, timestamp, *args)
JobWrapper.enqueue job.name, *args, queue: job.queue_name, run_at: Time.at(timestamp)
def enqueue_at(job, timestamp)
JobWrapper.enqueue job.serialize, queue: job.queue_name, run_at: Time.at(timestamp)
end
end
class JobWrapper < Que::Job
def run(job_name, *args)
job_name.constantize.new.execute(*args)
def run(job_data)
Base.execute job_data
end
end
end

View file

@ -4,18 +4,18 @@ module ActiveJob
module QueueAdapters
class QueueClassicAdapter
class << self
def enqueue(job, *args)
build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.name, *args)
def enqueue(job)
build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize)
end
def enqueue_at(job, timestamp, *args)
def enqueue_at(job, timestamp)
queue = build_queue(job.queue_name)
unless queue.respond_to?(:enqueue_at)
raise NotImplementedError, 'To be able to schedule jobs with Queue Classic ' \
'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. '
'You can implement this yourself or you can use the queue_classic-later gem.'
end
queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.name, *args)
queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize)
end
# Builds a <tt>QC::Queue</tt> object to schedule jobs on.
@ -30,8 +30,8 @@ module ActiveJob
class JobWrapper
class << self
def perform(job_name, *args)
job_name.constantize.new.execute(*args)
def perform(job_data)
Base.execute job_data
end
end
end

View file

@ -16,23 +16,23 @@ module ActiveJob
module QueueAdapters
class ResqueAdapter
class << self
def enqueue(job, *args)
Resque.enqueue_to job.queue_name, JobWrapper, job.name, *args
def enqueue(job)
Resque.enqueue_to job.queue_name, JobWrapper, job.serialize
end
def enqueue_at(job, timestamp, *args)
def enqueue_at(job, timestamp)
unless Resque.respond_to?(:enqueue_at_with_queue)
raise NotImplementedError, "To be able to schedule jobs with Resque you need the " \
"resque-scheduler gem. Please add it to your Gemfile and run bundle install"
end
Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.name, *args
Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.serialize
end
end
class JobWrapper
class << self
def perform(job_name, *args)
job_name.constantize.new.execute(*args)
def perform(job_data)
Base.execute job_data
end
end
end

View file

@ -4,20 +4,20 @@ module ActiveJob
module QueueAdapters
class SidekiqAdapter
class << self
def enqueue(job, *args)
def enqueue(job)
#Sidekiq::Client does not support symbols as keys
Sidekiq::Client.push \
'class' => JobWrapper,
'queue' => job.queue_name,
'args' => [ job, *args ],
'args' => [ job.serialize ],
'retry' => true
end
def enqueue_at(job, timestamp, *args)
def enqueue_at(job, timestamp)
Sidekiq::Client.push \
'class' => JobWrapper,
'queue' => job.queue_name,
'args' => [ job, *args ],
'args' => [ job.serialize ],
'retry' => true,
'at' => timestamp
end
@ -26,8 +26,8 @@ module ActiveJob
class JobWrapper
include Sidekiq::Worker
def perform(job_name, *args)
job_name.constantize.new.execute(*args)
def perform(job_data)
Base.execute job_data
end
end
end

View file

@ -7,14 +7,14 @@ module ActiveJob
@monitor = Monitor.new
class << self
def enqueue(job, *args)
def enqueue(job)
@monitor.synchronize do
JobWrapper.from_queue job.queue_name
JobWrapper.enqueue ActiveSupport::JSON.encode([ job.name, *args ])
JobWrapper.enqueue ActiveSupport::JSON.encode(job.serialize)
end
end
def enqueue_at(job, timestamp, *args)
def enqueue_at(job, timestamp)
raise NotImplementedError
end
end
@ -24,8 +24,8 @@ module ActiveJob
from_queue 'active_jobs_default'
def work(msg)
job_name, *args = ActiveSupport::JSON.decode(msg)
job_name.constantize.new.execute(*args)
job_data = ActiveSupport::JSON.decode(msg)
Base.execute job_data
ack!
end
end

View file

@ -4,11 +4,11 @@ module ActiveJob
module QueueAdapters
class SuckerPunchAdapter
class << self
def enqueue(job, *args)
JobWrapper.new.async.perform job, *args
def enqueue(job)
JobWrapper.new.async.perform job.serialize
end
def enqueue_at(job, timestamp, *args)
def enqueue_at(job, timestamp)
raise NotImplementedError
end
end
@ -16,8 +16,8 @@ module ActiveJob
class JobWrapper
include SuckerPunch::Job
def perform(job, *args)
job.new.execute(*args)
def perform(job_data)
Base.execute job_data
end
end
end

View file

@ -45,21 +45,21 @@ module ActiveJob
@performed_jobs = val
end
def enqueue(job, *args)
def enqueue(job)
if perform_enqueued_jobs?
performed_jobs << {job: job, args: args, queue: job.queue_name}
job.new.execute(*args)
performed_jobs << {job: job.class, args: job.arguments, queue: job.queue_name}
job.perform_now
else
enqueued_jobs << {job: job, args: args, queue: job.queue_name}
enqueued_jobs << {job: job.class, args: job.arguments, queue: job.queue_name}
end
end
def enqueue_at(job, timestamp, *args)
def enqueue_at(job, timestamp)
if perform_enqueued_at_jobs?
performed_jobs << {job: job, args: args, queue: job.queue_name, run_at: timestamp}
job.new.execute(*args)
performed_jobs << {job: job.class, args: job.arguments, queue: job.queue_name, at: timestamp}
job.perform_now
else
enqueued_jobs << {job: job, args: args, queue: job.queue_name, run_at: timestamp}
enqueued_jobs << {job: job.class, args: job.arguments, queue: job.queue_name, at: timestamp}
end
end

View file

@ -6,16 +6,33 @@ module ActiveJob
mattr_accessor(:queue_name_prefix)
mattr_accessor(:default_queue_name) { "default" }
def queue_as(part_name)
def queue_as(part_name=nil, &block)
if block_given?
self.queue_name = block
else
self.queue_name = queue_name_from_part(part_name)
end
end
def queue_name_from_part(part_name) #:nodoc:
queue_name = part_name.to_s.presence || default_queue_name
name_parts = [queue_name_prefix.presence, queue_name]
self.queue_name = name_parts.compact.join('_')
name_parts.compact.join('_')
end
end
included do
class_attribute :queue_name
class_attribute :queue_name, instance_accessor: false
self.queue_name = default_queue_name
end
# Returns the name of the queue the job will be run on
def queue_name
if @queue_name.is_a?(Proc)
@queue_name = self.class.queue_name_from_part(instance_exec(&@queue_name))
end
@queue_name
end
end
end

View file

@ -5,7 +5,8 @@ require 'active_support/core_ext/object/inclusion'
class CallbacksTest < ActiveSupport::TestCase
test 'perform callbacks' do
performed_callback_job = CallbackJob.new.tap { |j| j.execute("A-JOB-ID") }
performed_callback_job = CallbackJob.new("A-JOB-ID")
performed_callback_job.perform_now
assert "CallbackJob ran before_perform".in? performed_callback_job.history
assert "CallbackJob ran after_perform".in? performed_callback_job.history
assert "CallbackJob ran around_perform_start".in? performed_callback_job.history
@ -13,7 +14,7 @@ class CallbacksTest < ActiveSupport::TestCase
end
test 'enqueue callbacks' do
enqueued_callback_job = CallbackJob.enqueue
enqueued_callback_job = CallbackJob.perform_later
assert "CallbackJob ran before_enqueue".in? enqueued_callback_job.history
assert "CallbackJob ran after_enqueue".in? enqueued_callback_job.history
assert "CallbackJob ran around_enqueue_start".in? enqueued_callback_job.history

View file

@ -9,7 +9,7 @@ class JobSerializationTest < ActiveSupport::TestCase
end
test 'serialize job with gid' do
GidJob.enqueue @person
GidJob.perform_later @person
assert_equal "Person with ID: 5", JobBuffer.last_value
end
end

View file

@ -42,34 +42,43 @@ class AdapterTest < ActiveSupport::TestCase
def test_uses_active_job_as_tag
HelloJob.enqueue "Cristian"
HelloJob.perform_later "Cristian"
assert_match(/\[ActiveJob\]/, @logger.messages)
end
def test_uses_job_name_as_tag
LoggingJob.perform_later "Dummy"
assert_match(/\[LoggingJob\]/, @logger.messages)
end
def test_uses_job_id_as_tag
LoggingJob.perform_later "Dummy"
assert_match(/\[LOGGING-JOB-ID\]/, @logger.messages)
end
def test_logs_correct_queue_name
original_queue_name = LoggingJob.queue_name
LoggingJob.queue_as :php_jobs
LoggingJob.perform_later("Dummy")
assert_match(/to .*?\(php_jobs\).*/, @logger.messages)
ensure
LoggingJob.queue_name = original_queue_name
end
def test_enqueue_job_logging
HelloJob.enqueue "Cristian"
HelloJob.perform_later "Cristian"
assert_match(/Enqueued HelloJob \(Job ID: .*?\) to .*?:.*Cristian/, @logger.messages)
end
def test_perform_job_logging
LoggingJob.enqueue "Dummy"
LoggingJob.perform_later "Dummy"
assert_match(/Performing LoggingJob from .*? with arguments:.*Dummy/, @logger.messages)
assert_match(/Dummy, here is it: Dummy/, @logger.messages)
assert_match(/Performed LoggingJob from .*? in .*ms/, @logger.messages)
end
def test_perform_uses_job_name_job_logging
LoggingJob.enqueue "Dummy"
assert_match(/\[LoggingJob\]/, @logger.messages)
end
def test_perform_uses_job_id_job_logging
LoggingJob.enqueue "Dummy"
assert_match(/\[LOGGING-JOB-ID\]/, @logger.messages)
end
def test_perform_nested_jobs_logging
NestedJob.enqueue
NestedJob.perform_later
assert_match(/\[LoggingJob\] \[.*?\]/, @logger.messages)
assert_match(/\[ActiveJob\] Enqueued NestedJob \(Job ID: .*\) to/, @logger.messages)
assert_match(/\[ActiveJob\] \[NestedJob\] \[NESTED-JOB-ID\] Performing NestedJob from/, @logger.messages)
@ -81,14 +90,14 @@ class AdapterTest < ActiveSupport::TestCase
end
def test_enqueue_at_job_logging
HelloJob.enqueue_at 1, "Cristian"
HelloJob.set(wait_until: 24.hours.from_now).perform_later "Cristian"
assert_match(/Enqueued HelloJob \(Job ID: .*\) to .*? at.*Cristian/, @logger.messages)
rescue NotImplementedError
skip
end
def test_enqueue_in_job_logging
HelloJob.enqueue_in 2, "Cristian"
HelloJob.set(wait: 2.seconds).perform_later "Cristian"
assert_match(/Enqueued HelloJob \(Job ID: .*\) to .*? at.*Cristian/, @logger.messages)
rescue NotImplementedError
skip

View file

@ -8,20 +8,38 @@ class QueueNamingTest < ActiveSupport::TestCase
assert_equal "default", HelloJob.queue_name
end
test 'name appended in job' do
test 'uses given queue name job' do
begin
original_queue_name = HelloJob.queue_name
HelloJob.queue_as :greetings
LoggingJob.queue_as :bookkeeping
assert_equal "default", NestedJob.queue_name
assert_equal "greetings", HelloJob.queue_name
assert_equal "bookkeeping", LoggingJob.queue_name
assert_equal "greetings", HelloJob.new.queue_name
ensure
HelloJob.queue_name = LoggingJob.queue_name = ActiveJob::Base.default_queue_name
HelloJob.queue_name = original_queue_name
end
end
test 'should prefix the queue name' do
test 'evals block given to queue_as to determine queue' do
begin
original_queue_name = HelloJob.queue_name
HelloJob.queue_as { :another }
assert_equal "another", HelloJob.new.queue_name
ensure
HelloJob.queue_name = original_queue_name
end
end
test 'can use arguments to determine queue_name in queue_as block' do
begin
original_queue_name = HelloJob.queue_name
HelloJob.queue_as { self.arguments.first=='1' ? :one : :two }
assert_equal "one", HelloJob.new('1').queue_name
assert_equal "two", HelloJob.new('3').queue_name
ensure
HelloJob.queue_name = original_queue_name
end
end
test 'queu_name_prefix prepended to the queue name' do
begin
original_queue_name_prefix = ActiveJob::Base.queue_name_prefix
original_queue_name = HelloJob.queue_name
@ -35,4 +53,9 @@ class QueueNamingTest < ActiveSupport::TestCase
end
end
test 'uses queue passed to #set' do
job = HelloJob.set(queue: :some_queue).perform_later
assert_equal "some_queue", job.queue_name
end
end

View file

@ -9,18 +9,18 @@ class QueuingTest < ActiveSupport::TestCase
end
test 'run queued job' do
HelloJob.enqueue
HelloJob.perform_later
assert_equal "David says hello", JobBuffer.last_value
end
test 'run queued job with arguments' do
HelloJob.enqueue "Jamie"
HelloJob.perform_later "Jamie"
assert_equal "Jamie says hello", JobBuffer.last_value
end
test 'run queued job later' do
begin
result = HelloJob.enqueue_at 1.second.ago, "Jamie"
result = HelloJob.set(wait_until: 1.second.ago).perform_later "Jamie"
assert result
rescue NotImplementedError
skip
@ -28,15 +28,15 @@ class QueuingTest < ActiveSupport::TestCase
end
test 'job returned by enqueue has the arguments available' do
job = HelloJob.enqueue "Jamie"
job = HelloJob.perform_later "Jamie"
assert_equal [ "Jamie" ], job.arguments
end
test 'job returned by enqueue_at has the timestamp available' do
test 'job returned by perform_at has the timestamp available' do
begin
job = HelloJob.enqueue_at Time.utc(2014, 1, 1)
assert_equal Time.utc(2014, 1, 1), job.enqueued_at
job = HelloJob.set(wait_until: Time.utc(2014, 1, 1)).perform_later
assert_equal Time.utc(2014, 1, 1).to_f, job.scheduled_at
rescue NotImplementedError
skip
end

View file

@ -10,27 +10,27 @@ class RescueTest < ActiveSupport::TestCase
end
test 'rescue perform exception with retry' do
job = RescueJob.new
job.execute(SecureRandom.uuid, "david")
job = RescueJob.new("david")
job.perform_now
assert_equal [ "rescued from ArgumentError", "performed beautifully" ], JobBuffer.values
end
test 'let through unhandled perform exception' do
job = RescueJob.new
job = RescueJob.new("other")
assert_raises(RescueJob::OtherError) do
job.execute(SecureRandom.uuid, "other")
job.perform_now
end
end
test 'rescue from deserialization errors' do
RescueJob.enqueue Person.new(404)
RescueJob.perform_later Person.new(404)
assert_includes JobBuffer.values, 'rescued from DeserializationError'
assert_includes JobBuffer.values, 'DeserializationError original exception was Person::RecordNotFound'
assert_not_includes JobBuffer.values, 'performed beautifully'
end
test "should not wrap DeserializationError in DeserializationError" do
RescueJob.enqueue [Person.new(404)]
RescueJob.perform_later [Person.new(404)]
assert_includes JobBuffer.values, 'DeserializationError original exception was Person::RecordNotFound'
end
end

View file

@ -11,7 +11,7 @@ class EnqueuedJobsTest < ActiveJob::TestCase
def test_assert_enqueued_jobs
assert_nothing_raised do
assert_enqueued_jobs 1 do
HelloJob.enqueue('david')
HelloJob.perform_later('david')
end
end
end
@ -19,27 +19,27 @@ class EnqueuedJobsTest < ActiveJob::TestCase
def test_repeated_enqueued_jobs_calls
assert_nothing_raised do
assert_enqueued_jobs 1 do
HelloJob.enqueue('abdelkader')
HelloJob.perform_later('abdelkader')
end
end
assert_nothing_raised do
assert_enqueued_jobs 2 do
HelloJob.enqueue('sean')
HelloJob.enqueue('yves')
HelloJob.perform_later('sean')
HelloJob.perform_later('yves')
end
end
end
def test_assert_enqueued_jobs_with_no_block
assert_nothing_raised do
HelloJob.enqueue('rafael')
HelloJob.perform_later('rafael')
assert_enqueued_jobs 1
end
assert_nothing_raised do
HelloJob.enqueue('aaron')
HelloJob.enqueue('matthew')
HelloJob.perform_later('aaron')
HelloJob.perform_later('matthew')
assert_enqueued_jobs 3
end
end
@ -48,7 +48,7 @@ class EnqueuedJobsTest < ActiveJob::TestCase
assert_nothing_raised do
assert_no_enqueued_jobs do
# Scheduled jobs are being performed in this context
HelloJob.enqueue_at(Date.tomorrow.noon, 'godfrey')
HelloJob.set(wait_until: Date.tomorrow.noon).perform_later('godfrey')
end
end
end
@ -56,7 +56,7 @@ class EnqueuedJobsTest < ActiveJob::TestCase
def test_assert_enqueued_jobs_too_few_sent
error = assert_raise ActiveSupport::TestCase::Assertion do
assert_enqueued_jobs 2 do
HelloJob.enqueue('xavier')
HelloJob.perform_later('xavier')
end
end
@ -66,8 +66,8 @@ class EnqueuedJobsTest < ActiveJob::TestCase
def test_assert_enqueued_jobs_too_many_sent
error = assert_raise ActiveSupport::TestCase::Assertion do
assert_enqueued_jobs 1 do
HelloJob.enqueue('cristian')
HelloJob.enqueue('guillermo')
HelloJob.perform_later('cristian')
HelloJob.perform_later('guillermo')
end
end
@ -77,7 +77,7 @@ class EnqueuedJobsTest < ActiveJob::TestCase
def test_assert_no_enqueued_jobs_failure
error = assert_raise ActiveSupport::TestCase::Assertion do
assert_no_enqueued_jobs do
HelloJob.enqueue('jeremy')
HelloJob.perform_later('jeremy')
end
end
@ -86,20 +86,20 @@ class EnqueuedJobsTest < ActiveJob::TestCase
def test_assert_enqueued_job
assert_enqueued_with(job: LoggingJob, queue: 'default') do
NestedJob.enqueue_at(Date.tomorrow.noon)
NestedJob.set(wait_until: Date.tomorrow.noon).perform_later
end
end
def test_assert_enqueued_job_failure
assert_raise ActiveSupport::TestCase::Assertion do
assert_enqueued_with(job: LoggingJob, queue: 'default') do
NestedJob.enqueue
NestedJob.perform_later
end
end
assert_raise ActiveSupport::TestCase::Assertion do
assert_enqueued_with(job: NestedJob, queue: 'low') do
NestedJob.enqueue
NestedJob.perform_later
end
end
end
@ -107,7 +107,7 @@ class EnqueuedJobsTest < ActiveJob::TestCase
def test_assert_enqueued_job_args
assert_raise ArgumentError do
assert_enqueued_with(class: LoggingJob) do
NestedJob.enqueue_at(Date.tomorrow.noon)
NestedJob.set(wait_until: Date.tomorrow.noon).perform_later
end
end
end
@ -119,7 +119,7 @@ class PerformedJobsTest < ActiveJob::TestCase
def test_assert_performed_jobs
assert_nothing_raised do
assert_performed_jobs 1 do
HelloJob.enqueue('david')
HelloJob.perform_later('david')
end
end
end
@ -127,27 +127,27 @@ class PerformedJobsTest < ActiveJob::TestCase
def test_repeated_performed_jobs_calls
assert_nothing_raised do
assert_performed_jobs 1 do
HelloJob.enqueue('abdelkader')
HelloJob.perform_later('abdelkader')
end
end
assert_nothing_raised do
assert_performed_jobs 2 do
HelloJob.enqueue('sean')
HelloJob.enqueue('yves')
HelloJob.perform_later('sean')
HelloJob.perform_later('yves')
end
end
end
def test_assert_performed_jobs_with_no_block
assert_nothing_raised do
HelloJob.enqueue('rafael')
HelloJob.perform_later('rafael')
assert_performed_jobs 1
end
assert_nothing_raised do
HelloJob.enqueue('aaron')
HelloJob.enqueue('matthew')
HelloJob.perform_later('aaron')
HelloJob.perform_later('matthew')
assert_performed_jobs 3
end
end
@ -156,7 +156,7 @@ class PerformedJobsTest < ActiveJob::TestCase
assert_nothing_raised do
assert_no_performed_jobs do
# Scheduled jobs are being enqueued in this context
HelloJob.enqueue_at(Date.tomorrow.noon, 'godfrey')
HelloJob.set(wait_until: Date.tomorrow.noon).perform_later('godfrey')
end
end
end
@ -164,7 +164,7 @@ class PerformedJobsTest < ActiveJob::TestCase
def test_assert_performed_jobs_too_few_sent
error = assert_raise ActiveSupport::TestCase::Assertion do
assert_performed_jobs 2 do
HelloJob.enqueue('xavier')
HelloJob.perform_later('xavier')
end
end
@ -174,8 +174,8 @@ class PerformedJobsTest < ActiveJob::TestCase
def test_assert_performed_jobs_too_many_sent
error = assert_raise ActiveSupport::TestCase::Assertion do
assert_performed_jobs 1 do
HelloJob.enqueue('cristian')
HelloJob.enqueue('guillermo')
HelloJob.perform_later('cristian')
HelloJob.perform_later('guillermo')
end
end
@ -185,7 +185,7 @@ class PerformedJobsTest < ActiveJob::TestCase
def test_assert_no_performed_jobs_failure
error = assert_raise ActiveSupport::TestCase::Assertion do
assert_no_performed_jobs do
HelloJob.enqueue('jeremy')
HelloJob.perform_later('jeremy')
end
end
@ -194,20 +194,20 @@ class PerformedJobsTest < ActiveJob::TestCase
def test_assert_performed_job
assert_performed_with(job: NestedJob, queue: 'default') do
NestedJob.enqueue
NestedJob.perform_later
end
end
def test_assert_performed_job_failure
assert_raise ActiveSupport::TestCase::Assertion do
assert_performed_with(job: LoggingJob, queue: 'default') do
NestedJob.enqueue_at(Date.tomorrow.noon)
assert_performed_with(job: LoggingJob, at: Date.tomorrow.noon, queue: 'default') do
NestedJob.set(wait_until: Date.tomorrow.noon).perform_later
end
end
assert_raise ActiveSupport::TestCase::Assertion do
assert_performed_with(job: NestedJob, queue: 'low') do
NestedJob.enqueue_at(Date.tomorrow.noon)
assert_performed_with(job: NestedJob, at: Date.tomorrow.noon, queue: 'low') do
NestedJob.set(queue: 'low', wait_until: Date.tomorrow.noon).perform_later
end
end
end

View file

@ -1,6 +1,6 @@
class NestedJob < ActiveJob::Base
def perform
LoggingJob.enqueue "NestedJob"
LoggingJob.perform_later "NestedJob"
end
def job_id

View file

@ -6,7 +6,7 @@ class RescueJob < ActiveJob::Base
rescue_from(ArgumentError) do
JobBuffer.add('rescued from ArgumentError')
arguments[0] = "DIFFERENT!"
retry_now
retry_job
end
rescue_from(ActiveJob::DeserializationError) do |e|

View file

@ -78,15 +78,15 @@ end
Enqueue a job like so:
```ruby
MyJob.enqueue record # Enqueue a job to be performed as soon the queueing system is free.
MyJob.perform_later record # Enqueue a job to be performed as soon the queueing system is free.
```
```ruby
MyJob.enqueue_at Date.tomorrow.noon, record # Enqueue a job to be performed tomorrow at noon.
MyJob.set(wait_until: Date.tomorrow.noon).perform_later(record) # Enqueue a job to be performed tomorrow at noon.
```
```ruby
MyJob.enqueue_in 1.week, record # Enqueue a job to be performed 1 week from now.
MyJob.set(wait: 1.week).perform_later(record) # Enqueue a job to be performed 1 week from now.
```
That's it!
@ -155,7 +155,7 @@ class GuestsCleanupJob < ActiveJob::Base
end
```
Also you can prefix the queue name for all your jobs using
You can prefix the queue name for all your jobs using
`config.active_job.queue_name_prefix` in `application.rb`:
```ruby
@ -172,10 +172,42 @@ class GuestsCleanupJob < ActiveJob::Base
#....
end
# Now your job will run on queue production_low_priority on your production
# environment and on beta_low_priority on your beta environment
# Now your job will run on queue production_low_priority on your
# production environment and on beta_low_priority on your beta
# environment
```
If you want more control on what queue a job will be run you can pass a :queue
option to #set:
```ruby
MyJob.set(queue: :another_queue).perform_later(record)
```
To control the queue from the job level you can pass a block to queue_as. The
block will be executed in the job context (so you can access self.arguments)
and you must return the queue name:
```ruby
class ProcessVideoJob < ActiveJob::Base
queue_as do
video = self.arguments.first
if video.owner.premium?
:premium_videojobs
else
:videojobs
end
end
def perform(video)
# do process video
end
end
ProcessVideoJob.perform_later(Video.last)
```
NOTE: Make sure your queueing backend "listens" on your queue name. For some
backends you need to specify the queues to listen to.