1
0
Fork 0
mirror of https://github.com/rails/rails.git synced 2022-11-09 12:12:34 -05:00

Always run jobs using a consumer, even in synchronous & test queues, to ensure shared behavior.

This commit is contained in:
Jeremy Kemper 2012-09-14 17:00:46 -07:00
parent e08a564e87
commit b881be7331
4 changed files with 109 additions and 70 deletions

View file

@ -2,17 +2,33 @@ require 'delegate'
require 'thread'
module ActiveSupport
# A Queue that simply inherits from STDLIB's Queue. Everytime this
# queue is used, Rails automatically sets up a ThreadedConsumer
# to consume it.
# A Queue that simply inherits from STDLIB's Queue. When this
# queue is used, Rails automatically starts a job runner in a
# background thread.
class Queue < ::Queue
attr_writer :consumer
def initialize(consumer_options = {})
super()
@consumer_options = consumer_options
end
def consumer
@consumer ||= ThreadedQueueConsumer.new(self, @consumer_options)
end
# Drain the queue, running all jobs in a different thread. This method
# may not be available on production queues.
def drain
# run the jobs in a separate thread so assumptions of synchronous
# jobs are caught in test mode.
consumer.drain
end
end
class SynchronousQueue < ::Queue
class SynchronousQueue < Queue
def push(job)
result = nil
Thread.new { result = job.run }.join
result
super.tap { drain }
end
alias << push
alias enq push
@ -25,7 +41,7 @@ module ActiveSupport
#
# Jobs are run in a separate thread to catch mistakes where code
# assumes that the job is run in the same thread.
class TestQueue < ::Queue
class TestQueue < Queue
# Get a list of the jobs off this queue. This method may not be
# available on production queues.
def jobs
@ -38,14 +54,6 @@ module ActiveSupport
def push(job)
super Marshal.load(Marshal.dump(job))
end
# Drain the queue, running all jobs in a different thread. This method
# may not be available on production queues.
def drain
# run the jobs in a separate thread so assumptions of synchronous
# jobs are caught in test mode.
Thread.new { pop.run until empty? }.join
end
end
# A container for multiple queues. This class delegates to a default Queue
@ -82,25 +90,17 @@ module ActiveSupport
# queue and joins the thread, which will ensure that all jobs
# are executed before the process finally dies.
class ThreadedQueueConsumer
def self.start(queue, logger=nil)
new(queue, logger).start
def self.start(*args)
new(*args).start
end
def initialize(queue, logger=nil)
@queue = queue
@logger = logger
def initialize(queue, options = {})
@queue = queue
@logger = options[:logger]
end
def start
@thread = Thread.new do
while job = @queue.pop
begin
job.run
rescue Exception => e
handle_exception e
end
end
end
@thread = Thread.new { consume }
self
end
@ -109,8 +109,25 @@ module ActiveSupport
@thread.join
end
def handle_exception(e)
@logger.error "Job Error: #{e.message}\n#{e.backtrace.join("\n")}" if @logger
def drain
Thread.new { run(@queue.pop) until @queue.empty? }.join
end
def consume
while job = @queue.pop
run job
end
end
def run(job)
job.run
rescue Exception => exception
handle_exception job, exception
end
def handle_exception(job, exception)
raise unless @logger
@logger.error "Job Error: #{exception.message}\n#{exception.backtrace.join("\n")}"
end
end
end

View file

@ -0,0 +1,27 @@
require 'abstract_unit'
require 'active_support/queueing'
class SynchronousQueueTest < ActiveSupport::TestCase
class Job
attr_reader :ran
def run; @ran = true end
end
class ExceptionRaisingJob
def run; raise end
end
def setup
@queue = ActiveSupport::SynchronousQueue.new
end
def test_runs_jobs_immediately
job = Job.new
@queue.push job
assert job.ran
assert_raises RuntimeError do
@queue.push ExceptionRaisingJob.new
end
end
end

View file

@ -12,7 +12,7 @@ class TestQueueTest < ActiveSupport::TestCase
end
end
def test_drain_raises
def test_drain_raises_exceptions_from_running_jobs
@queue.push ExceptionRaisingJob.new
assert_raises(RuntimeError) { @queue.drain }
end
@ -41,8 +41,8 @@ class TestQueueTest < ActiveSupport::TestCase
end
def test_contents
assert @queue.empty?
job = EquivalentJob.new
assert @queue.empty?
@queue.push job
refute @queue.empty?
assert_equal job, @queue.pop

View file

@ -5,7 +5,7 @@ require "active_support/log_subscriber/test_helper"
class TestThreadConsumer < ActiveSupport::TestCase
class Job
attr_reader :id
def initialize(id, &block)
def initialize(id = 1, &block)
@id = id
@block = block
end
@ -16,83 +16,78 @@ class TestThreadConsumer < ActiveSupport::TestCase
end
def setup
@queue = ActiveSupport::Queue.new
@logger = ActiveSupport::LogSubscriber::TestHelper::MockLogger.new
@consumer = ActiveSupport::ThreadedQueueConsumer.start(@queue, @logger)
@queue = ActiveSupport::Queue.new(logger: @logger)
end
def teardown
@queue.push nil
@queue.drain
end
test "the jobs are executed" do
ran = false
job = Job.new(1) do
ran = true
end
job = Job.new { ran = true }
@queue.push job
sleep 0.1
@queue.drain
assert_equal true, ran
end
test "the jobs are not executed synchronously" do
ran = false
job = Job.new(1) do
sleep 0.1
ran = true
end
run, ran = Queue.new, Queue.new
job = Job.new { ran.push run.pop }
@queue.consumer.start
@queue.push job
assert_equal false, ran
assert ran.empty?
run.push true
assert_equal true, ran.pop
end
test "shutting down the queue synchronously drains the jobs" do
runnable = ::Queue.new
ran = false
job = Job.new(1) do
job = Job.new do
sleep 0.1
ran = true
end
@queue.consumer.start
@queue.push job
assert_equal false, ran
@consumer.shutdown
@queue.consumer.shutdown
assert_equal true, ran
end
test "log job that raises an exception" do
job = Job.new(1) do
raise "RuntimeError: Error!"
end
job = Job.new { raise "RuntimeError: Error!" }
@queue.push job
sleep 0.1
@queue.drain
assert_equal 1, @logger.logged(:error).size
assert_match(/Job Error: RuntimeError: Error!/, @logger.logged(:error).last)
assert_match 'Job Error: RuntimeError: Error!', @logger.logged(:error).last
end
test "test overriding exception handling" do
@consumer.shutdown
@consumer = Class.new(ActiveSupport::ThreadedQueueConsumer) do
attr_reader :last_error
def handle_exception(e)
@last_error = e.message
@queue.consumer.instance_eval do
def handle_exception(job, exception)
@last_error = exception.message
end
end.start(@queue)
job = Job.new(1) do
raise "RuntimeError: Error!"
def last_error
@last_error
end
end
@queue.push job
sleep 0.1
job = Job.new { raise "RuntimeError: Error!" }
assert_equal "RuntimeError: Error!", @consumer.last_error
@queue.push job
@queue.drain
assert_equal "RuntimeError: Error!", @queue.consumer.last_error
end
end