From b881be7331339d7eab50ccf0512de3ab4e49a896 Mon Sep 17 00:00:00 2001 From: Jeremy Kemper Date: Fri, 14 Sep 2012 17:00:46 -0700 Subject: [PATCH] Always run jobs using a consumer, even in synchronous & test queues, to ensure shared behavior. --- activesupport/lib/active_support/queueing.rb | 81 +++++++++++-------- .../test/queueing/synchronous_queue_test.rb | 27 +++++++ .../test/queueing/test_queue_test.rb | 4 +- .../test/queueing/threaded_consumer_test.rb | 67 +++++++-------- 4 files changed, 109 insertions(+), 70 deletions(-) create mode 100644 activesupport/test/queueing/synchronous_queue_test.rb diff --git a/activesupport/lib/active_support/queueing.rb b/activesupport/lib/active_support/queueing.rb index f397e1c0c5..d36b5c17a8 100644 --- a/activesupport/lib/active_support/queueing.rb +++ b/activesupport/lib/active_support/queueing.rb @@ -2,17 +2,33 @@ require 'delegate' require 'thread' module ActiveSupport - # A Queue that simply inherits from STDLIB's Queue. Everytime this - # queue is used, Rails automatically sets up a ThreadedConsumer - # to consume it. + # A Queue that simply inherits from STDLIB's Queue. When this + # queue is used, Rails automatically starts a job runner in a + # background thread. class Queue < ::Queue + attr_writer :consumer + + def initialize(consumer_options = {}) + super() + @consumer_options = consumer_options + end + + def consumer + @consumer ||= ThreadedQueueConsumer.new(self, @consumer_options) + end + + # Drain the queue, running all jobs in a different thread. This method + # may not be available on production queues. + def drain + # run the jobs in a separate thread so assumptions of synchronous + # jobs are caught in test mode. + consumer.drain + end end - class SynchronousQueue < ::Queue + class SynchronousQueue < Queue def push(job) - result = nil - Thread.new { result = job.run }.join - result + super.tap { drain } end alias << push alias enq push @@ -25,7 +41,7 @@ module ActiveSupport # # Jobs are run in a separate thread to catch mistakes where code # assumes that the job is run in the same thread. - class TestQueue < ::Queue + class TestQueue < Queue # Get a list of the jobs off this queue. This method may not be # available on production queues. def jobs @@ -38,14 +54,6 @@ module ActiveSupport def push(job) super Marshal.load(Marshal.dump(job)) end - - # Drain the queue, running all jobs in a different thread. This method - # may not be available on production queues. - def drain - # run the jobs in a separate thread so assumptions of synchronous - # jobs are caught in test mode. - Thread.new { pop.run until empty? }.join - end end # A container for multiple queues. This class delegates to a default Queue @@ -82,25 +90,17 @@ module ActiveSupport # queue and joins the thread, which will ensure that all jobs # are executed before the process finally dies. class ThreadedQueueConsumer - def self.start(queue, logger=nil) - new(queue, logger).start + def self.start(*args) + new(*args).start end - def initialize(queue, logger=nil) - @queue = queue - @logger = logger + def initialize(queue, options = {}) + @queue = queue + @logger = options[:logger] end def start - @thread = Thread.new do - while job = @queue.pop - begin - job.run - rescue Exception => e - handle_exception e - end - end - end + @thread = Thread.new { consume } self end @@ -109,8 +109,25 @@ module ActiveSupport @thread.join end - def handle_exception(e) - @logger.error "Job Error: #{e.message}\n#{e.backtrace.join("\n")}" if @logger + def drain + Thread.new { run(@queue.pop) until @queue.empty? }.join + end + + def consume + while job = @queue.pop + run job + end + end + + def run(job) + job.run + rescue Exception => exception + handle_exception job, exception + end + + def handle_exception(job, exception) + raise unless @logger + @logger.error "Job Error: #{exception.message}\n#{exception.backtrace.join("\n")}" end end end diff --git a/activesupport/test/queueing/synchronous_queue_test.rb b/activesupport/test/queueing/synchronous_queue_test.rb new file mode 100644 index 0000000000..86c39d0f6c --- /dev/null +++ b/activesupport/test/queueing/synchronous_queue_test.rb @@ -0,0 +1,27 @@ +require 'abstract_unit' +require 'active_support/queueing' + +class SynchronousQueueTest < ActiveSupport::TestCase + class Job + attr_reader :ran + def run; @ran = true end + end + + class ExceptionRaisingJob + def run; raise end + end + + def setup + @queue = ActiveSupport::SynchronousQueue.new + end + + def test_runs_jobs_immediately + job = Job.new + @queue.push job + assert job.ran + + assert_raises RuntimeError do + @queue.push ExceptionRaisingJob.new + end + end +end diff --git a/activesupport/test/queueing/test_queue_test.rb b/activesupport/test/queueing/test_queue_test.rb index 4c08314366..9e74bc64ee 100644 --- a/activesupport/test/queueing/test_queue_test.rb +++ b/activesupport/test/queueing/test_queue_test.rb @@ -12,7 +12,7 @@ class TestQueueTest < ActiveSupport::TestCase end end - def test_drain_raises + def test_drain_raises_exceptions_from_running_jobs @queue.push ExceptionRaisingJob.new assert_raises(RuntimeError) { @queue.drain } end @@ -41,8 +41,8 @@ class TestQueueTest < ActiveSupport::TestCase end def test_contents - assert @queue.empty? job = EquivalentJob.new + assert @queue.empty? @queue.push job refute @queue.empty? assert_equal job, @queue.pop diff --git a/activesupport/test/queueing/threaded_consumer_test.rb b/activesupport/test/queueing/threaded_consumer_test.rb index 20a1cc4e8e..6bed80e9dd 100644 --- a/activesupport/test/queueing/threaded_consumer_test.rb +++ b/activesupport/test/queueing/threaded_consumer_test.rb @@ -5,7 +5,7 @@ require "active_support/log_subscriber/test_helper" class TestThreadConsumer < ActiveSupport::TestCase class Job attr_reader :id - def initialize(id, &block) + def initialize(id = 1, &block) @id = id @block = block end @@ -16,83 +16,78 @@ class TestThreadConsumer < ActiveSupport::TestCase end def setup - @queue = ActiveSupport::Queue.new @logger = ActiveSupport::LogSubscriber::TestHelper::MockLogger.new - @consumer = ActiveSupport::ThreadedQueueConsumer.start(@queue, @logger) + @queue = ActiveSupport::Queue.new(logger: @logger) end def teardown - @queue.push nil + @queue.drain end test "the jobs are executed" do ran = false - - job = Job.new(1) do - ran = true - end + job = Job.new { ran = true } @queue.push job - sleep 0.1 + @queue.drain + assert_equal true, ran end test "the jobs are not executed synchronously" do - ran = false - - job = Job.new(1) do - sleep 0.1 - ran = true - end + run, ran = Queue.new, Queue.new + job = Job.new { ran.push run.pop } + @queue.consumer.start @queue.push job - assert_equal false, ran + assert ran.empty? + + run.push true + assert_equal true, ran.pop end test "shutting down the queue synchronously drains the jobs" do + runnable = ::Queue.new ran = false - - job = Job.new(1) do + job = Job.new do sleep 0.1 ran = true end + @queue.consumer.start @queue.push job assert_equal false, ran - @consumer.shutdown - + @queue.consumer.shutdown assert_equal true, ran end test "log job that raises an exception" do - job = Job.new(1) do - raise "RuntimeError: Error!" - end + job = Job.new { raise "RuntimeError: Error!" } @queue.push job - sleep 0.1 + @queue.drain assert_equal 1, @logger.logged(:error).size - assert_match(/Job Error: RuntimeError: Error!/, @logger.logged(:error).last) + assert_match 'Job Error: RuntimeError: Error!', @logger.logged(:error).last end test "test overriding exception handling" do - @consumer.shutdown - @consumer = Class.new(ActiveSupport::ThreadedQueueConsumer) do - attr_reader :last_error - def handle_exception(e) - @last_error = e.message + @queue.consumer.instance_eval do + def handle_exception(job, exception) + @last_error = exception.message end - end.start(@queue) - job = Job.new(1) do - raise "RuntimeError: Error!" + def last_error + @last_error + end end - @queue.push job - sleep 0.1 + job = Job.new { raise "RuntimeError: Error!" } - assert_equal "RuntimeError: Error!", @consumer.last_error + @queue.push job + @queue.drain + + assert_equal "RuntimeError: Error!", @queue.consumer.last_error end end