Move background jobs to the 'jobs' branch until fully baked. Not shipping with Rails 4.0.

This commit is contained in:
Jeremy Kemper 2012-12-21 15:42:47 -08:00
parent 10c0a3bd11
commit f9da785d0b
25 changed files with 4 additions and 724 deletions

View File

@ -28,8 +28,6 @@
* Raise an `ActionView::MissingTemplate` exception when no implicit template could be found. *Damien Mathieu*
* Asynchronously send messages via the Rails Queue *Brian Cardarella*
* Allow callbacks to be defined in mailers similar to `ActionController::Base`. You can configure default
settings, headers, attachments, delivery settings or change delivery using
`before_filter`, `after_filter` etc. *Justin S. Leitgeb*

View File

@ -44,5 +44,4 @@ module ActionMailer
autoload :MailHelper
autoload :TestCase
autoload :TestHelper
autoload :QueuedMessage
end

View File

@ -1,10 +1,8 @@
require 'mail'
require 'action_mailer/queued_message'
require 'action_mailer/collector'
require 'active_support/core_ext/string/inflections'
require 'active_support/core_ext/hash/except'
require 'active_support/core_ext/module/anonymous'
require 'active_support/queueing'
require 'action_mailer/log_subscriber'
module ActionMailer
@ -361,8 +359,6 @@ module ActionMailer
#
# * <tt>deliveries</tt> - Keeps an array of all the emails sent out through the Action Mailer with
# <tt>delivery_method :test</tt>. Most useful for unit and functional testing.
#
# * <tt>queue</> - The queue that will be used to deliver the mail. The queue should expect a job that responds to <tt>run</tt>.
class Base < AbstractController::Base
include DeliveryMethods
abstract!
@ -389,9 +385,6 @@ module ActionMailer
parts_order: [ "text/plain", "text/enriched", "text/html" ]
}.freeze
class_attribute :queue
self.queue = ActiveSupport::SynchronousQueue.new
class << self
# Register one or more Observers which will be notified when mail is delivered.
def register_observers(*observers)
@ -483,8 +476,8 @@ module ActionMailer
end
def method_missing(method_name, *args)
if action_methods.include?(method_name.to_s)
QueuedMessage.new(queue, self, method_name, *args)
if respond_to?(method_name)
new(method_name, *args).message
else
super
end

View File

@ -1,37 +0,0 @@
require 'delegate'
module ActionMailer
class QueuedMessage < ::Delegator
attr_reader :queue
def initialize(queue, mailer_class, method_name, *args)
@queue = queue
@job = DeliveryJob.new(mailer_class, method_name, args)
end
def __getobj__
@job.message
end
# Queues the message for delivery.
def deliver
tap { @queue.push @job }
end
class DeliveryJob
def initialize(mailer_class, method_name, args)
@mailer_class = mailer_class
@method_name = method_name
@args = args
end
def message
@message ||= @mailer_class.send(:new, @method_name, *@args).message
end
def run
message.deliver
end
end
end
end

View File

@ -19,8 +19,6 @@ module ActionMailer
options.javascripts_dir ||= paths["public/javascripts"].first
options.stylesheets_dir ||= paths["public/stylesheets"].first
options.queue ||= app.queue
# make sure readers methods get compiled
options.asset_host ||= app.config.asset_host
options.relative_url_root ||= app.config.relative_url_root

View File

@ -11,7 +11,6 @@ end
require 'minitest/autorun'
require 'action_mailer'
require 'action_mailer/test_case'
require 'active_support/queueing'
silence_warnings do
# These external dependencies have warnings :/

View File

@ -3,13 +3,11 @@ require 'abstract_unit'
require 'set'
require 'action_dispatch'
require 'active_support/queueing'
require 'active_support/time'
require 'mailers/base_mailer'
require 'mailers/proc_mailer'
require 'mailers/asset_mailer'
require 'mailers/async_mailer'
class BaseTest < ActiveSupport::TestCase
def teardown
@ -422,17 +420,6 @@ class BaseTest < ActiveSupport::TestCase
assert_equal(1, BaseMailer.deliveries.length)
end
test "delivering message asynchronously" do
AsyncMailer.delivery_method = :test
AsyncMailer.deliveries.clear
AsyncMailer.welcome.deliver
assert_equal 0, AsyncMailer.deliveries.length
AsyncMailer.queue.drain
assert_equal 1, AsyncMailer.deliveries.length
end
test "calling deliver, ActionMailer should yield back to mail to let it call :do_delivery on itself" do
mail = Mail::Message.new
mail.expects(:do_delivery).once

View File

@ -1,3 +0,0 @@
class AsyncMailer < BaseMailer
self.queue = ActiveSupport::TestQueue.new
end

View File

@ -10,16 +10,6 @@ require 'controller/fake_controllers'
require 'action_mailer'
ActionMailer::Base.view_paths = FIXTURE_LOAD_PATH
class SynchronousQueue < Queue
def push(job)
job.run
end
alias << push
alias enq push
end
ActionMailer::Base.queue = SynchronousQueue.new
class AssertSelectTest < ActionController::TestCase
Assertion = ActiveSupport::TestCase::Assertion

View File

@ -1,105 +0,0 @@
require 'delegate'
require 'thread'
module ActiveSupport
# A Queue that simply inherits from STDLIB's Queue. When this
# queue is used, Rails automatically starts a job runner in a
# background thread.
class Queue < ::Queue
attr_writer :consumer
def initialize(consumer_options = {})
super()
@consumer_options = consumer_options
end
def consumer
@consumer ||= ThreadedQueueConsumer.new(self, @consumer_options)
end
# Drain the queue, running all jobs in a different thread. This method
# may not be available on production queues.
def drain
# run the jobs in a separate thread so assumptions of synchronous
# jobs are caught in test mode.
consumer.drain
end
end
class SynchronousQueue < Queue
def push(job)
super.tap { drain }
end
alias << push
alias enq push
end
# In test mode, the Rails queue is backed by an Array so that assertions
# can be made about its contents. The test queue provides a +jobs+
# method to make assertions about the queue's contents and a +drain+
# method to drain the queue and run the jobs.
#
# Jobs are run in a separate thread to catch mistakes where code
# assumes that the job is run in the same thread.
class TestQueue < Queue
# Get a list of the jobs off this queue. This method may not be
# available on production queues.
def jobs
@que.dup
end
# Marshal and unmarshal job before pushing it onto the queue. This will
# raise an exception on any attempts in tests to push jobs that can't (or
# shouldn't) be marshalled.
def push(job)
super Marshal.load(Marshal.dump(job))
end
end
# The threaded consumer will run jobs in a background thread in
# development mode or in a VM where running jobs on a thread in
# production mode makes sense.
#
# When the process exits, the consumer pushes a nil onto the
# queue and joins the thread, which will ensure that all jobs
# are executed before the process finally dies.
class ThreadedQueueConsumer
attr_accessor :logger
def initialize(queue, options = {})
@queue = queue
@logger = options[:logger]
@fallback_logger = Logger.new($stderr)
end
def start
@thread = Thread.new { consume }
self
end
def shutdown
@queue.push nil
@thread.join
end
def drain
@queue.pop.run until @queue.empty?
end
def consume
while job = @queue.pop
run job
end
end
def run(job)
job.run
rescue Exception => exception
handle_exception job, exception
end
def handle_exception(job, exception)
(logger || @fallback_logger).error "Job Error: #{job.inspect}\n#{exception.message}\n#{exception.backtrace.join("\n")}"
end
end
end

View File

@ -1,27 +0,0 @@
require 'abstract_unit'
require 'active_support/queueing'
class SynchronousQueueTest < ActiveSupport::TestCase
class Job
attr_reader :ran
def run; @ran = true end
end
class ExceptionRaisingJob
def run; raise end
end
def setup
@queue = ActiveSupport::SynchronousQueue.new
end
def test_runs_jobs_immediately
job = Job.new
@queue.push job
assert job.ran
assert_raises RuntimeError do
@queue.push ExceptionRaisingJob.new
end
end
end

View File

@ -1,146 +0,0 @@
require 'abstract_unit'
require 'active_support/queueing'
class TestQueueTest < ActiveSupport::TestCase
def setup
@queue = ActiveSupport::TestQueue.new
end
class ExceptionRaisingJob
def run
raise
end
end
def test_drain_raises_exceptions_from_running_jobs
@queue.push ExceptionRaisingJob.new
assert_raises(RuntimeError) { @queue.drain }
end
def test_jobs
@queue.push 1
@queue.push 2
assert_equal [1,2], @queue.jobs
end
class EquivalentJob
def initialize
@initial_id = self.object_id
end
def run
end
def ==(other)
other.same_initial_id?(@initial_id)
end
def same_initial_id?(other_id)
other_id == @initial_id
end
end
def test_contents
job = EquivalentJob.new
assert @queue.empty?
@queue.push job
refute @queue.empty?
assert_equal job, @queue.pop
end
class ProcessingJob
def self.clear_processed
@processed = []
end
def self.processed
@processed
end
def initialize(object)
@object = object
end
def run
self.class.processed << @object
end
end
def test_order
ProcessingJob.clear_processed
job1 = ProcessingJob.new(1)
job2 = ProcessingJob.new(2)
@queue.push job1
@queue.push job2
@queue.drain
assert_equal [1,2], ProcessingJob.processed
end
class ThreadTrackingJob
attr_reader :thread_id
def run
@thread_id = Thread.current.object_id
end
def ran?
@thread_id
end
end
def test_drain
@queue.push ThreadTrackingJob.new
job = @queue.jobs.last
@queue.drain
assert @queue.empty?
assert job.ran?, "The job runs synchronously when the queue is drained"
assert_equal job.thread_id, Thread.current.object_id
end
class IdentifiableJob
def initialize(id)
@id = id
end
def ==(other)
other.same_id?(@id)
end
def same_id?(other_id)
other_id == @id
end
def run
end
end
def test_queue_can_be_observed
jobs = (1..10).map do |id|
IdentifiableJob.new(id)
end
jobs.each do |job|
@queue.push job
end
assert_equal jobs, @queue.jobs
end
def test_adding_an_unmarshallable_job
anonymous_class_instance = Struct.new(:run).new
assert_raises TypeError do
@queue.push anonymous_class_instance
end
end
def test_attempting_to_add_a_reference_to_itself
job = {reference: @queue}
assert_raises TypeError do
@queue.push job
end
end
end

View File

@ -1,110 +0,0 @@
require 'abstract_unit'
require 'active_support/queueing'
require "active_support/log_subscriber/test_helper"
class TestThreadConsumer < ActiveSupport::TestCase
class Job
attr_reader :id
def initialize(id = 1, &block)
@id = id
@block = block
end
def run
@block.call if @block
end
end
def setup
@logger = ActiveSupport::LogSubscriber::TestHelper::MockLogger.new
@queue = ActiveSupport::Queue.new(logger: @logger)
end
def teardown
@queue.drain
end
test "the jobs are executed" do
ran = false
job = Job.new { ran = true }
@queue.push job
@queue.drain
assert_equal true, ran
end
test "the jobs are not executed synchronously" do
run, ran = Queue.new, Queue.new
job = Job.new { ran.push run.pop }
@queue.consumer.start
@queue.push job
assert ran.empty?
run.push true
assert_equal true, ran.pop
end
test "shutting down the queue synchronously drains the jobs" do
ran = false
job = Job.new do
sleep 0.1
ran = true
end
@queue.consumer.start
@queue.push job
assert_equal false, ran
@queue.consumer.shutdown
assert_equal true, ran
end
test "log job that raises an exception" do
job = Job.new { raise "RuntimeError: Error!" }
@queue.push job
consume_queue @queue
assert_equal 1, @logger.logged(:error).size
assert_match "Job Error: #{job.inspect}\nRuntimeError: Error!", @logger.logged(:error).last
end
test "logger defaults to stderr" do
begin
$stderr, old_stderr = StringIO.new, $stderr
queue = ActiveSupport::Queue.new
queue.push Job.new { raise "RuntimeError: Error!" }
consume_queue queue
assert_match 'Job Error', $stderr.string
ensure
$stderr = old_stderr
end
end
test "test overriding exception handling" do
@queue.consumer.instance_eval do
def handle_exception(job, exception)
@last_error = exception.message
end
def last_error
@last_error
end
end
job = Job.new { raise "RuntimeError: Error!" }
@queue.push job
consume_queue @queue
assert_equal "RuntimeError: Error!", @queue.consumer.last_error
end
private
def consume_queue(queue)
queue.push nil
queue.consumer.consume
end
end

View File

@ -5,7 +5,6 @@ Highlights in Rails 4.0:
* Ruby 1.9.3 only
* Strong Parameters
* Queue API
* Turbolinks
* Russian Doll Caching
* Asynchronous Mailers
@ -88,8 +87,6 @@ Please refer to the [Changelog](https://github.com/rails/rails/blob/master/railt
* Threadsafe on by default
* Add `Rails.queue` for processing jobs in the background.
### Deprecations
* `config.threadsafe!` is deprecated in favor of `config.eager_load` which provides a more fine grained control on what is eager loaded.
@ -103,8 +100,6 @@ Please refer to the [Changelog](https://github.com/rails/rails/blob/master/actio
### Notable changes
* Asynchronously send messages via the Rails Queue. ([Pull Request](https://github.com/rails/rails/pull/6839))
### Deprecations
Active Model

View File

@ -575,26 +575,3 @@ end
```
In the test we send the email and store the returned object in the `email` variable. We then ensure that it was sent (the first assert), then, in the second batch of assertions, we ensure that the email does indeed contain what we expect.
Asynchronous
------------
Rails provides a Synchronous Queue by default. If you want to use an Asynchronous one you will need to configure an async Queue provider like Resque. Queue providers are supposed to have a Railtie where they configure it's own async queue.
### Custom Queues
If you need a different queue than `Rails.queue` for your mailer you can use `ActionMailer::Base.queue=`:
```ruby
class WelcomeMailer < ActionMailer::Base
self.queue = MyQueue.new
end
```
or adding to your `config/environments/$RAILS_ENV.rb`:
```ruby
config.action_mailer.queue = MyQueue.new
```
Your custom queue should expect a job that responds to `#run`.

View File

@ -109,10 +109,6 @@ These configuration methods are to be called on a `Rails::Railtie` object, such
* `config.middleware` allows you to configure the application's middleware. This is covered in depth in the [Configuring Middleware](#configuring-middleware) section below.
* `config.queue` configures the default job queue for the application. Defaults to `ActiveSupport::Queue.new` which processes jobs in a background thread. If you change the queue, you're responsible for running the jobs as well.
* `config.queue_consumer` configures a different job consumer for the default queue. Defaults to `ActiveSupport::ThreadedQueueConsumer`. The job consumer must respond to `start`.
* `config.reload_classes_only_on_change` enables or disables reloading of classes only when tracked files change. By default tracks everything on autoload paths and is set to true. If `config.cache_classes` is true, this option is ignored.
* `config.secret_key_base` used for specifying a key which allows sessions for the application to be verified against a known secure key to prevent tampering. Applications get `config.secret_key_base` initialized to a random key in `config/initializers/secret_token.rb`.
@ -431,11 +427,6 @@ There are a number of settings available on `config.action_mailer`:
config.action_mailer.interceptors = ["MailInterceptor"]
```
* `config.action_mailer.queue` registers the queue that will be used to deliver the mail.
```ruby
config.action_mailer.queue = SomeQueue.new
```
### Configuring Active Support
There are a few configuration options available in Active Support:

View File

@ -122,10 +122,6 @@
* Load all environments available in `config.paths["config/environments"]`. *Piotr Sarnacki*
* Add `config.queue_consumer` to change the job queue consumer from the default `ActiveSupport::ThreadedQueueConsumer`. *Carlos Antonio da Silva*
* Add `Rails.queue` for processing jobs in the background. *Yehuda Katz*
* Remove Rack::SSL in favour of ActionDispatch::SSL. *Rafael Mendonça França*
* Remove Active Resource from Rails framework. *Prem Sichangrist*

View File

@ -32,25 +32,6 @@ module Rails
application.config
end
# Rails.queue is the application's queue. You can push a job onto
# the queue by:
#
# Rails.queue.push job
#
# A job is an object that responds to +run+. Queue consumers will
# pop jobs off of the queue and invoke the queue's +run+ method.
#
# Note that depending on your queue implementation, jobs may not
# be executed in the same process as they were created in, and
# are never executed in the same thread as they were created in.
#
# If necessary, a queue implementation may need to serialize your
# job for distribution to another process. The documentation of
# your queue will specify the requirements for that serialization.
def queue
application.queue
end
def initialize!
application.initialize!
end

View File

@ -1,5 +1,4 @@
require 'fileutils'
require 'active_support/queueing'
# FIXME remove DummyKeyGenerator and this require in 4.1
require 'active_support/key_generator'
require 'rails/engine'
@ -68,10 +67,9 @@ module Rails
end
end
attr_accessor :assets, :sandbox, :queue_consumer
attr_accessor :assets, :sandbox
alias_method :sandbox?, :sandbox
attr_reader :reloaders
attr_writer :queue
delegate :default_url_options, :default_url_options=, to: :routes
@ -83,7 +81,6 @@ module Rails
@env_config = nil
@ordered_railties = nil
@railties = nil
@queue = nil
end
# Returns true if the application is initialized.
@ -228,10 +225,6 @@ module Rails
@config ||= Application::Configuration.new(find_root_with_flag("config.ru", Dir.pwd))
end
def queue #:nodoc:
@queue ||= config.queue || ActiveSupport::Queue.new
end
def to_app #:nodoc:
self
end

View File

@ -1,6 +1,5 @@
require 'active_support/core_ext/kernel/reporting'
require 'active_support/file_update_checker'
require 'active_support/queueing'
require 'rails/engine/configuration'
module Rails
@ -13,7 +12,7 @@ module Rails
:railties_order, :relative_url_root, :secret_key_base, :secret_token,
:serve_static_assets, :ssl_options, :static_cache_control, :session_options,
:time_zone, :reload_classes_only_on_change,
:queue, :queue_consumer, :beginning_of_week, :filter_redirect
:beginning_of_week, :filter_redirect
attr_writer :log_level
attr_reader :encoding
@ -44,8 +43,6 @@ module Rails
@exceptions_app = nil
@autoflush_log = true
@log_formatter = ActiveSupport::Logger::SimpleFormatter.new
@queue = ActiveSupport::SynchronousQueue.new
@queue_consumer = nil
@eager_load = nil
@secret_token = nil
@secret_key_base = nil

View File

@ -96,15 +96,6 @@ module Rails
ActiveSupport::Dependencies.unhook!
end
end
initializer :activate_queue_consumer do |app|
if config.queue.class == ActiveSupport::Queue
app.queue_consumer = config.queue_consumer || config.queue.consumer
app.queue_consumer.logger ||= Rails.logger if app.queue_consumer.respond_to?(:logger=)
app.queue_consumer.start
at_exit { app.queue_consumer.shutdown }
end
end
end
end
end

View File

@ -84,8 +84,4 @@
# Use default logging formatter so that PID and timestamp are not suppressed.
config.log_formatter = ::Logger::Formatter.new
# Default the production mode queue to an synchronous queue. You will probably
# want to replace this with an out-of-process queueing solution.
# config.queue = ActiveSupport::SynchronousQueue.new
end

View File

@ -33,7 +33,4 @@
# Print deprecation notices to the stderr.
config.active_support.deprecation = :stderr
# Use the synchronous queue to run jobs immediately.
config.queue = ActiveSupport::SynchronousQueue.new
end

View File

@ -50,22 +50,6 @@ module ApplicationTests
assert_equal "test.rails", ActionMailer::Base.default_url_options[:host]
end
test "uses the default queue for ActionMailer" do
require "#{app_path}/config/environment"
assert_kind_of ActiveSupport::Queue, ActionMailer::Base.queue
end
test "allows me to configure queue for ActionMailer" do
app_file "config/environments/development.rb", <<-RUBY
AppTemplate::Application.configure do
config.action_mailer.queue = ActiveSupport::TestQueue.new
end
RUBY
require "#{app_path}/config/environment"
assert_kind_of ActiveSupport::TestQueue, ActionMailer::Base.queue
end
test "does not include url helpers as action methods" do
app_file "config/routes.rb", <<-RUBY
AppTemplate::Application.routes.draw do

View File

@ -1,154 +0,0 @@
require 'isolation/abstract_unit'
module ApplicationTests
class QueueTest < ActiveSupport::TestCase
include ActiveSupport::Testing::Isolation
def setup
build_app
boot_rails
end
def teardown
teardown_app
end
def app_const
@app_const ||= Class.new(Rails::Application)
end
test "the queue is a SynchronousQueue in test mode" do
app("test")
assert_kind_of ActiveSupport::SynchronousQueue, Rails.application.queue
assert_kind_of ActiveSupport::SynchronousQueue, Rails.queue
end
test "the queue is a SynchronousQueue in development mode" do
app("development")
assert_kind_of ActiveSupport::SynchronousQueue, Rails.application.queue
assert_kind_of ActiveSupport::SynchronousQueue, Rails.queue
end
class ThreadTrackingJob
def initialize
@origin = Thread.current.object_id
end
def run
@target = Thread.current.object_id
end
def ran_in_different_thread?
@origin != @target
end
def ran?
@target
end
end
test "in development mode, an enqueued job will be processed in the same thread" do
app("development")
job = ThreadTrackingJob.new
Rails.queue.push job
sleep 0.1
assert job.ran?, "Expected job to be run"
refute job.ran_in_different_thread?, "Expected job to run in the same thread"
end
test "in test mode, an enqueued job will be processed in the same thread" do
app("test")
job = ThreadTrackingJob.new
Rails.queue.push job
sleep 0.1
assert job.ran?, "Expected job to be run"
refute job.ran_in_different_thread?, "Expected job to run in the same thread"
end
test "in production, automatically spawn a queue consumer in a background thread" do
add_to_env_config "production", <<-RUBY
config.queue = ActiveSupport::Queue.new
RUBY
app("production")
assert_nil Rails.application.config.queue_consumer
assert_kind_of ActiveSupport::ThreadedQueueConsumer, Rails.application.queue_consumer
assert_equal Rails.logger, Rails.application.queue_consumer.logger
end
test "attempting to marshal a queue will raise an exception" do
app("test")
assert_raises TypeError do
Marshal.dump Rails.queue
end
end
def setup_custom_queue
add_to_env_config "production", <<-RUBY
require "my_queue"
config.queue = MyQueue.new
RUBY
app_file "lib/my_queue.rb", <<-RUBY
class MyQueue
def push(job)
job.run
end
end
RUBY
app("production")
end
test "a custom queue implementation can be provided" do
setup_custom_queue
assert_kind_of MyQueue, Rails.queue
job = Struct.new(:id, :ran) do
def run
self.ran = true
end
end
job1 = job.new(1)
Rails.queue.push job1
assert_equal true, job1.ran
end
test "a custom consumer implementation can be provided" do
add_to_env_config "production", <<-RUBY
require "my_queue_consumer"
config.queue = ActiveSupport::Queue.new
config.queue_consumer = MyQueueConsumer.new
RUBY
app_file "lib/my_queue_consumer.rb", <<-RUBY
class MyQueueConsumer
attr_reader :started
def start
@started = true
end
end
RUBY
app("production")
assert_kind_of MyQueueConsumer, Rails.application.queue_consumer
assert Rails.application.queue_consumer.started
end
test "default consumer is not used with custom queue implementation" do
setup_custom_queue
assert_nil Rails.application.queue_consumer
end
end
end