1
0
Fork 0
mirror of https://github.com/rails/rails.git synced 2022-11-09 12:12:34 -05:00
rails--rails/activesupport/lib/active_support/notifications/fanout.rb
thedarkone ab3c4a4083 Subscribing to notifications while inside the said instrumented section.
The issue is that on the exit from Instrumenter#instrument section,
an Evented listener will run into an error because its thread local
(Thread.current[:_timestack]) has not been set up by the #start
method (this obviously happens because the Evented listeners didn't
exist at the time, since no subscribtion to that section was made yet).

Note: support for subscribing to instrumented sections, while being
inside those instrumented sections, might be removed in the future.

Maybe fixes #21873.
2015-11-28 01:40:21 +01:00

157 lines
4 KiB
Ruby

require 'mutex_m'
require 'concurrent/map'
module ActiveSupport
module Notifications
# This is a default queue implementation that ships with Notifications.
# It just pushes events to all registered log subscribers.
#
# This class is thread safe. All methods are reentrant.
class Fanout
include Mutex_m
def initialize
@subscribers = []
@listeners_for = Concurrent::Map.new
super
end
def subscribe(pattern = nil, block = Proc.new)
subscriber = Subscribers.new pattern, block
synchronize do
@subscribers << subscriber
@listeners_for.clear
end
subscriber
end
def unsubscribe(subscriber_or_name)
synchronize do
case subscriber_or_name
when String
@subscribers.reject! { |s| s.matches?(subscriber_or_name) }
else
@subscribers.delete(subscriber_or_name)
end
@listeners_for.clear
end
end
def start(name, id, payload)
listeners_for(name).each { |s| s.start(name, id, payload) }
end
def finish(name, id, payload, listeners = listeners_for(name))
listeners.each { |s| s.finish(name, id, payload) }
end
def publish(name, *args)
listeners_for(name).each { |s| s.publish(name, *args) }
end
def listeners_for(name)
# this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics)
@listeners_for[name] || synchronize do
# use synchronisation when accessing @subscribers
@listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
end
end
def listening?(name)
listeners_for(name).any?
end
# This is a sync queue, so there is no waiting.
def wait
end
module Subscribers # :nodoc:
def self.new(pattern, listener)
if listener.respond_to?(:start) and listener.respond_to?(:finish)
subscriber = Evented.new pattern, listener
else
subscriber = Timed.new pattern, listener
end
unless pattern
AllMessages.new(subscriber)
else
subscriber
end
end
class Evented #:nodoc:
def initialize(pattern, delegate)
@pattern = pattern
@delegate = delegate
@can_publish = delegate.respond_to?(:publish)
end
def publish(name, *args)
if @can_publish
@delegate.publish name, *args
end
end
def start(name, id, payload)
@delegate.start name, id, payload
end
def finish(name, id, payload)
@delegate.finish name, id, payload
end
def subscribed_to?(name)
@pattern === name
end
def matches?(name)
@pattern && @pattern === name
end
end
class Timed < Evented # :nodoc:
def publish(name, *args)
@delegate.call name, *args
end
def start(name, id, payload)
timestack = Thread.current[:_timestack] ||= []
timestack.push Time.now
end
def finish(name, id, payload)
timestack = Thread.current[:_timestack]
started = timestack.pop
@delegate.call(name, started, Time.now, id, payload)
end
end
class AllMessages # :nodoc:
def initialize(delegate)
@delegate = delegate
end
def start(name, id, payload)
@delegate.start name, id, payload
end
def finish(name, id, payload)
@delegate.finish name, id, payload
end
def publish(name, *args)
@delegate.publish name, *args
end
def subscribed_to?(name)
true
end
alias :matches? :===
end
end
end
end
end