Merge pull request #44469 from jhawthorn/fanout_groups

Make Notifications::Fanout faster and safer
This commit is contained in:
John Hawthorn 2022-06-02 12:28:29 -07:00 committed by GitHub
commit 693c14c54d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 358 additions and 127 deletions

View File

@ -17,78 +17,7 @@ module ActiveSupport
end
end
# This is a default queue implementation that ships with Notifications.
# It just pushes events to all registered log subscribers.
#
# This class is thread safe. All methods are reentrant.
class Fanout
include Mutex_m
def initialize
@string_subscribers = Hash.new { |h, k| h[k] = [] }
@other_subscribers = []
@listeners_for = Concurrent::Map.new
super
end
def subscribe(pattern = nil, callable = nil, monotonic: false, &block)
subscriber = Subscribers.new(pattern, callable || block, monotonic)
synchronize do
case pattern
when String
@string_subscribers[pattern] << subscriber
@listeners_for.delete(pattern)
when NilClass, Regexp
@other_subscribers << subscriber
@listeners_for.clear
else
raise ArgumentError, "pattern must be specified as a String, Regexp or empty"
end
end
subscriber
end
def unsubscribe(subscriber_or_name)
synchronize do
case subscriber_or_name
when String
@string_subscribers[subscriber_or_name].clear
@listeners_for.delete(subscriber_or_name)
@other_subscribers.each { |sub| sub.unsubscribe!(subscriber_or_name) }
else
pattern = subscriber_or_name.try(:pattern)
if String === pattern
@string_subscribers[pattern].delete(subscriber_or_name)
@listeners_for.delete(pattern)
else
@other_subscribers.delete(subscriber_or_name)
@listeners_for.clear
end
end
end
end
def inspect # :nodoc:
total_patterns = @string_subscribers.size + @other_subscribers.size
"#<#{self.class} (#{total_patterns} patterns)>"
end
def start(name, id, payload)
iterate_guarding_exceptions(listeners_for(name)) { |s| s.start(name, id, payload) }
end
def finish(name, id, payload, listeners = listeners_for(name))
iterate_guarding_exceptions(listeners) { |s| s.finish(name, id, payload) }
end
def publish(name, *args)
iterate_guarding_exceptions(listeners_for(name)) { |s| s.publish(name, *args) }
end
def publish_event(event)
iterate_guarding_exceptions(listeners_for(event.name)) { |s| s.publish_event(event) }
end
module FanoutIteration # :nodoc:
def iterate_guarding_exceptions(listeners)
exceptions = nil
@ -109,6 +38,238 @@ module ActiveSupport
listeners
end
end
# This is a default queue implementation that ships with Notifications.
# It just pushes events to all registered log subscribers.
#
# This class is thread safe. All methods are reentrant.
class Fanout
include Mutex_m
def initialize
@string_subscribers = Hash.new { |h, k| h[k] = [] }
@other_subscribers = []
@listeners_for = Concurrent::Map.new
@groups_for = Concurrent::Map.new
super
end
def inspect # :nodoc:
total_patterns = @string_subscribers.size + @other_subscribers.size
"#<#{self.class} (#{total_patterns} patterns)>"
end
def subscribe(pattern = nil, callable = nil, monotonic: false, &block)
subscriber = Subscribers.new(pattern, callable || block, monotonic)
synchronize do
case pattern
when String
@string_subscribers[pattern] << subscriber
clear_cache(pattern)
when NilClass, Regexp
@other_subscribers << subscriber
clear_cache
else
raise ArgumentError, "pattern must be specified as a String, Regexp or empty"
end
end
subscriber
end
def unsubscribe(subscriber_or_name)
synchronize do
case subscriber_or_name
when String
@string_subscribers[subscriber_or_name].clear
clear_cache(subscriber_or_name)
@other_subscribers.each { |sub| sub.unsubscribe!(subscriber_or_name) }
else
pattern = subscriber_or_name.try(:pattern)
if String === pattern
@string_subscribers[pattern].delete(subscriber_or_name)
clear_cache(pattern)
else
@other_subscribers.delete(subscriber_or_name)
clear_cache
end
end
end
end
def clear_cache(key = nil) # :nodoc:
if key
@listeners_for.delete(key)
@groups_for.delete(key)
else
@listeners_for.clear
@groups_for.clear
end
end
class BaseGroup # :nodoc:
include FanoutIteration
def initialize(listeners, name, id, payload)
@listeners = listeners
end
def each(&block)
iterate_guarding_exceptions(@listeners, &block)
end
end
class BaseTimeGroup < BaseGroup # :nodoc:
def start(name, id, payload)
@start_time = now
end
def finish(name, id, payload)
stop_time = now
each do |listener|
listener.call(name, @start_time, stop_time, id, payload)
end
end
end
class MonotonicTimedGroup < BaseTimeGroup # :nodoc:
private
def now
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
end
class TimedGroup < BaseTimeGroup # :nodoc:
private
def now
Time.now
end
end
class EventedGroup < BaseGroup # :nodoc:
def start(name, id, payload)
each do |s|
s.start(name, id, payload)
end
end
def finish(name, id, payload)
each do |s|
s.finish(name, id, payload)
end
end
end
class EventObjectGroup < BaseGroup # :nodoc:
def start(name, id, payload)
@event = build_event(name, id, payload)
@event.start!
end
def finish(name, id, payload)
@event.payload = payload
@event.finish!
each do |s|
s.call(@event)
end
end
private
def build_event(name, id, payload)
ActiveSupport::Notifications::Event.new name, nil, nil, id, payload
end
end
def groups_for(name) # :nodoc:
@groups_for.compute_if_absent(name) do
listeners_for(name).group_by(&:group_class).transform_values do |s|
s.map(&:delegate)
end
end
end
# A Handle is used to record the start and finish time of event
#
# Both `#start` and `#finish` must each be called exactly once
#
# Where possible, it's best to the block form, +ActiveSupport::Notifications.instrument+
# +Handle+ is a low-level API intended for cases where the block form can't be used.
#
# handle = ActiveSupport::Notifications.instrumenter.build_handle("my.event", {})
# begin
# handle.start
# # work to be instrumented
# ensure
# handle.finish
# end
class Handle
def initialize(notifier, name, id, payload) # :nodoc:
@name = name
@id = id
@payload = payload
@groups = notifier.groups_for(name).map do |group_klass, grouped_listeners|
group_klass.new(grouped_listeners, name, id, payload)
end
@state = :initialized
end
def start
ensure_state! :initialized
@state = :started
@groups.each do |group|
group.start(@name, @id, @payload)
end
end
def finish
finish_with_values(@name, @id, @payload)
end
def finish_with_values(name, id, payload) # :nodoc:
ensure_state! :started
@state = :finished
@groups.each do |group|
group.finish(name, id, payload)
end
end
private
def ensure_state!(expected)
if @state != expected
raise ArgumentError, "expected state to be #{expected.inspect} but was #{@state.inspect}"
end
end
end
include FanoutIteration
def build_handle(name, id, payload)
Handle.new(self, name, id, payload)
end
def start(name, id, payload)
handle_stack = (IsolatedExecutionState[:_fanout_handle_stack] ||= [])
handle = build_handle(name, id, payload)
handle_stack << handle
handle.start
end
def finish(name, id, payload, listeners = nil)
handle_stack = IsolatedExecutionState[:_fanout_handle_stack]
handle = handle_stack.pop
handle.finish_with_values(name, id, payload)
end
def publish(name, *args)
iterate_guarding_exceptions(listeners_for(name)) { |s| s.publish(name, *args) }
end
def publish_event(event)
iterate_guarding_exceptions(listeners_for(event.name)) { |s| s.publish_event(event) }
end
def listeners_for(name)
# this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics)
@ -185,7 +346,7 @@ module ActiveSupport
end
class Evented # :nodoc:
attr_reader :pattern
attr_reader :pattern, :delegate
def initialize(pattern, delegate)
@pattern = Matcher.wrap(pattern)
@ -194,6 +355,10 @@ module ActiveSupport
@can_publish_event = delegate.respond_to?(:publish_event)
end
def group_class
EventedGroup
end
def publish(name, *args)
if @can_publish
@delegate.publish name, *args
@ -208,14 +373,6 @@ module ActiveSupport
end
end
def start(name, id, payload)
@delegate.start name, id, payload
end
def finish(name, id, payload)
@delegate.finish name, id, payload
end
def subscribed_to?(name)
pattern === name
end
@ -226,63 +383,29 @@ module ActiveSupport
end
class Timed < Evented # :nodoc:
def group_class
TimedGroup
end
def publish(name, *args)
@delegate.call name, *args
end
def start(name, id, payload)
timestack = IsolatedExecutionState[:_timestack] ||= []
timestack.push Time.now
end
def finish(name, id, payload)
timestack = IsolatedExecutionState[:_timestack]
started = timestack.pop
@delegate.call(name, started, Time.now, id, payload)
end
end
class MonotonicTimed < Evented # :nodoc:
def publish(name, *args)
@delegate.call name, *args
end
def start(name, id, payload)
timestack = IsolatedExecutionState[:_timestack_monotonic] ||= []
timestack.push Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
def finish(name, id, payload)
timestack = IsolatedExecutionState[:_timestack_monotonic]
started = timestack.pop
@delegate.call(name, started, Process.clock_gettime(Process::CLOCK_MONOTONIC), id, payload)
class MonotonicTimed < Timed # :nodoc:
def group_class
MonotonicTimedGroup
end
end
class EventObject < Evented
def start(name, id, payload)
stack = IsolatedExecutionState[:_event_stack] ||= []
event = build_event name, id, payload
event.start!
stack.push event
end
def finish(name, id, payload)
stack = IsolatedExecutionState[:_event_stack]
event = stack.pop
event.payload = payload
event.finish!
@delegate.call event
def group_class
EventObjectGroup
end
def publish_event(event)
@delegate.call event
end
private
def build_event(name, id, payload)
ActiveSupport::Notifications::Event.new name, nil, nil, id, payload
end
end
end
end

View File

@ -1,5 +1,6 @@
# frozen_string_literal: true
require "active_support/core_ext/module/delegation"
require "securerandom"
module ActiveSupport
@ -9,17 +10,50 @@ module ActiveSupport
attr_reader :id
def initialize(notifier)
unless notifier.respond_to?(:build_handle)
notifier = LegacyHandle::Wrapper.new(notifier)
end
@id = unique_id
@notifier = notifier
end
class LegacyHandle # :nodoc:
class Wrapper # :nodoc:
def initialize(notifier)
@notifier = notifier
end
def build_handle(name, id, payload)
LegacyHandle.new(@notifier, name, id, payload)
end
delegate :start, :finish, to: :@notifier
end
def initialize(notifier, name, id, payload)
@notifier = notifier
@name = name
@id = id
@payload = payload
end
def start
@listener_state = @notifier.start @name, @id, @payload
end
def finish
@notifier.finish(@name, @id, @payload, @listener_state)
end
end
# Given a block, instrument it by measuring the time taken to execute
# and publish it. Without a block, simply send a message via the
# notifier. Notice that events get sent even if an error occurs in the
# passed-in block.
def instrument(name, payload = {})
# some of the listeners might have state
listeners_state = start name, payload
handle = build_handle(name, payload)
handle.start
begin
yield payload if block_given?
rescue Exception => e
@ -27,10 +61,24 @@ module ActiveSupport
payload[:exception_object] = e
raise e
ensure
finish_with_state listeners_state, name, payload
handle.finish
end
end
# Returns a "handle" for an event with the given +name+ and +payload+
#
# +#start+ and +#finish+ must each be called exactly once on the returned object.
#
# Where possible, it's best to use +#instrument+, which will record the
# start and finish of the event and correctly handle any exceptions.
# +build_handle+ is a low-level API intended for cases where using
# +#instrument+ isn't possible.
#
# See ActiveSupport::Notifications::Fanout::Handle
def build_handle(name, payload)
@notifier.build_handle(name, @id, payload)
end
def new_event(name, payload = {}) # :nodoc:
Event.new(name, nil, nil, @id, payload)
end

View File

@ -140,6 +140,66 @@ module Notifications
end
end
class BuildHandleTest < TestCase
def test_interleaved_event
event_name = "foo"
actual_times = []
ActiveSupport::Notifications.subscribe(event_name) do |name, started, finished, unique_id, data|
actual_times << [started, finished]
end
times = (1..4).map { |s| Time.new(2020, 1, 1) + s }
instrumenter = ActiveSupport::Notifications.instrumenter
travel_to times[0]
handle1 = instrumenter.build_handle(event_name, {})
handle2 = instrumenter.build_handle(event_name, {})
handle1.start
travel_to times[1]
handle2.start
travel_to times[2]
handle1.finish
travel_to times[3]
handle2.finish
assert_equal [
# from when start1 was returned, to when its state passed to finish
[times[0], times[2]],
# from when start2 was returned, to when its state passed to finish
[times[1], times[3]],
], actual_times
end
def test_subscribed_interleaved_with_event
instrumenter = ActiveSupport::Notifications.instrumenter
name = "foo"
events1 = []
events2 = []
callback1 = lambda { |event| events1 << event }
callback2 = lambda { |event| events2 << event }
ActiveSupport::Notifications.subscribed(callback1, name) do
handle = instrumenter.build_handle(name, {})
handle.start
ActiveSupport::Notifications.subscribed(callback2, name) do
handle.finish
end
end
assert_equal 1, events1.size
assert_empty events2
assert_equal name, events1[0].name
assert events1[0].time
assert events1[0].end
end
end
class SubscribedTest < TestCase
def test_subscribed
name = "foo"