diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb index b5b4cbaede..2a03515b86 100644 --- a/activesupport/lib/active_support/notifications/fanout.rb +++ b/activesupport/lib/active_support/notifications/fanout.rb @@ -7,6 +7,16 @@ require "active_support/core_ext/object/try" module ActiveSupport module Notifications + class InstrumentationSubscriberError < RuntimeError + attr_reader :exceptions + + def initialize(exceptions) + @exceptions = exceptions + exception_class_names = exceptions.map { |e| e.class.name } + super "Exception(s) occurred within instrumentation subscribers: #{exception_class_names.join(', ')}" + end + end + # This is a default queue implementation that ships with Notifications. # It just pushes events to all registered log subscribers. # @@ -59,19 +69,32 @@ module ActiveSupport end def start(name, id, payload) - listeners_for(name).each { |s| s.start(name, id, payload) } + iterate_guarding_exceptions(listeners_for(name)) { |s| s.start(name, id, payload) } end def finish(name, id, payload, listeners = listeners_for(name)) - listeners.each { |s| s.finish(name, id, payload) } + iterate_guarding_exceptions(listeners) { |s| s.finish(name, id, payload) } end def publish(name, *args) - listeners_for(name).each { |s| s.publish(name, *args) } + iterate_guarding_exceptions(listeners_for(name)) { |s| s.publish(name, *args) } end def publish_event(event) - listeners_for(event.name).each { |s| s.publish_event(event) } + iterate_guarding_exceptions(listeners_for(event.name)) { |s| s.publish_event(event) } + end + + def iterate_guarding_exceptions(listeners) + exceptions = nil + + listeners.each do |s| + yield s + rescue => e + exceptions ||= [] + exceptions << e + end + ensure + raise InstrumentationSubscriberError.new(exceptions) unless exceptions.nil? end def listeners_for(name) diff --git a/activesupport/test/notifications/evented_notification_test.rb b/activesupport/test/notifications/evented_notification_test.rb index 3f523044e4..771e059759 100644 --- a/activesupport/test/notifications/evented_notification_test.rb +++ b/activesupport/test/notifications/evented_notification_test.rb @@ -5,6 +5,8 @@ require_relative "../abstract_unit" module ActiveSupport module Notifications class EventedTest < ActiveSupport::TestCase + class BadListenerException < RuntimeError; end + class Listener attr_reader :events @@ -27,6 +29,24 @@ module ActiveSupport end end + class BadStartListener < Listener + def start(name, id, payload) + raise BadListenerException + end + + def finish(name, id, payload) + end + end + + class BadFinishListener < Listener + def start(name, id, payload) + end + + def finish(name, id, payload) + raise BadListenerException + end + end + def test_evented_listener notifier = Fanout.new listener = Listener.new @@ -71,6 +91,54 @@ module ActiveSupport ], listener.events end + def test_listen_start_exception_consistency + notifier = Fanout.new + listener = Listener.new + notifier.subscribe nil, BadStartListener.new + notifier.subscribe nil, listener + + assert_raises InstrumentationSubscriberError do + notifier.start "hello", 1, {} + end + assert_raises InstrumentationSubscriberError do + notifier.start "world", 1, {} + end + notifier.finish "world", 1, {} + notifier.finish "hello", 1, {} + + assert_equal 4, listener.events.length + assert_equal [ + [:start, "hello", 1, {}], + [:start, "world", 1, {}], + [:finish, "world", 1, {}], + [:finish, "hello", 1, {}], + ], listener.events + end + + def test_listen_finish_exception_consistency + notifier = Fanout.new + listener = Listener.new + notifier.subscribe nil, BadFinishListener.new + notifier.subscribe nil, listener + + notifier.start "hello", 1, {} + notifier.start "world", 1, {} + assert_raises InstrumentationSubscriberError do + notifier.finish "world", 1, {} + end + assert_raises InstrumentationSubscriberError do + notifier.finish "hello", 1, {} + end + + assert_equal 4, listener.events.length + assert_equal [ + [:start, "hello", 1, {}], + [:start, "world", 1, {}], + [:finish, "world", 1, {}], + [:finish, "hello", 1, {}], + ], listener.events + end + def test_evented_listener_priority notifier = Fanout.new listener = ListenerWithTimedSupport.new