1
0
Fork 0
mirror of https://github.com/rails/rails.git synced 2022-11-09 12:12:34 -05:00
rails--rails/activesupport/lib/active_support/notifications/fanout.rb
Eric Saxby 6fe36ba585 Evented notifications take priority over Timed notifications
In cases where a notification subscriber includes methods to support
both Evented and Timed events, Evented should take priority over Timed.
This allows subscribers to be backwards compatible (older Rails only
allows Timed events) while defaulting to newer behavior.
2012-08-11 17:39:20 -07:00

145 lines
3.5 KiB
Ruby

require 'mutex_m'
module ActiveSupport
module Notifications
# This is a default queue implementation that ships with Notifications.
# It just pushes events to all registered log subscribers.
#
# This class is thread safe. All methods are reentrant.
class Fanout
include Mutex_m
def initialize
@subscribers = []
@listeners_for = {}
super
end
def subscribe(pattern = nil, block = Proc.new)
subscriber = Subscribers.new pattern, block
synchronize do
@subscribers << subscriber
@listeners_for.clear
end
subscriber
end
def unsubscribe(subscriber)
synchronize do
@subscribers.reject! { |s| s.matches?(subscriber) }
@listeners_for.clear
end
end
def start(name, id, payload)
listeners_for(name).each { |s| s.start(name, id, payload) }
end
def finish(name, id, payload)
listeners_for(name).each { |s| s.finish(name, id, payload) }
end
def publish(name, *args)
listeners_for(name).each { |s| s.publish(name, *args) }
end
def listeners_for(name)
synchronize do
@listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
end
end
def listening?(name)
listeners_for(name).any?
end
# This is a sync queue, so there is no waiting.
def wait
end
module Subscribers # :nodoc:
def self.new(pattern, listener)
if listener.respond_to?(:start) and listener.respond_to?(:finish)
subscriber = Evented.new pattern, listener
else
subscriber = Timed.new pattern, listener
end
unless pattern
AllMessages.new(subscriber)
else
subscriber
end
end
class Evented #:nodoc:
def initialize(pattern, delegate)
@pattern = pattern
@delegate = delegate
end
def start(name, id, payload)
@delegate.start name, id, payload
end
def finish(name, id, payload)
@delegate.finish name, id, payload
end
def subscribed_to?(name)
@pattern === name.to_s
end
def matches?(subscriber_or_name)
self === subscriber_or_name ||
@pattern && @pattern === subscriber_or_name
end
end
class Timed < Evented
def initialize(pattern, delegate)
@timestack = []
super
end
def publish(name, *args)
@delegate.call name, *args
end
def start(name, id, payload)
@timestack.push Time.now
end
def finish(name, id, payload)
started = @timestack.pop
@delegate.call(name, started, Time.now, id, payload)
end
end
class AllMessages # :nodoc:
def initialize(delegate)
@delegate = delegate
end
def start(name, id, payload)
@delegate.start name, id, payload
end
def finish(name, id, payload)
@delegate.finish name, id, payload
end
def publish(name, *args)
@delegate.publish name, *args
end
def subscribed_to?(name)
true
end
alias :matches? :===
end
end
end
end
end