mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
6fe36ba585
In cases where a notification subscriber includes methods to support both Evented and Timed events, Evented should take priority over Timed. This allows subscribers to be backwards compatible (older Rails only allows Timed events) while defaulting to newer behavior.
145 lines
3.5 KiB
Ruby
145 lines
3.5 KiB
Ruby
require 'mutex_m'
|
|
|
|
module ActiveSupport
|
|
module Notifications
|
|
# This is a default queue implementation that ships with Notifications.
|
|
# It just pushes events to all registered log subscribers.
|
|
#
|
|
# This class is thread safe. All methods are reentrant.
|
|
class Fanout
|
|
include Mutex_m
|
|
|
|
def initialize
|
|
@subscribers = []
|
|
@listeners_for = {}
|
|
super
|
|
end
|
|
|
|
def subscribe(pattern = nil, block = Proc.new)
|
|
subscriber = Subscribers.new pattern, block
|
|
synchronize do
|
|
@subscribers << subscriber
|
|
@listeners_for.clear
|
|
end
|
|
subscriber
|
|
end
|
|
|
|
def unsubscribe(subscriber)
|
|
synchronize do
|
|
@subscribers.reject! { |s| s.matches?(subscriber) }
|
|
@listeners_for.clear
|
|
end
|
|
end
|
|
|
|
def start(name, id, payload)
|
|
listeners_for(name).each { |s| s.start(name, id, payload) }
|
|
end
|
|
|
|
def finish(name, id, payload)
|
|
listeners_for(name).each { |s| s.finish(name, id, payload) }
|
|
end
|
|
|
|
def publish(name, *args)
|
|
listeners_for(name).each { |s| s.publish(name, *args) }
|
|
end
|
|
|
|
def listeners_for(name)
|
|
synchronize do
|
|
@listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
|
|
end
|
|
end
|
|
|
|
def listening?(name)
|
|
listeners_for(name).any?
|
|
end
|
|
|
|
# This is a sync queue, so there is no waiting.
|
|
def wait
|
|
end
|
|
|
|
module Subscribers # :nodoc:
|
|
def self.new(pattern, listener)
|
|
if listener.respond_to?(:start) and listener.respond_to?(:finish)
|
|
subscriber = Evented.new pattern, listener
|
|
else
|
|
subscriber = Timed.new pattern, listener
|
|
end
|
|
|
|
unless pattern
|
|
AllMessages.new(subscriber)
|
|
else
|
|
subscriber
|
|
end
|
|
end
|
|
|
|
class Evented #:nodoc:
|
|
def initialize(pattern, delegate)
|
|
@pattern = pattern
|
|
@delegate = delegate
|
|
end
|
|
|
|
def start(name, id, payload)
|
|
@delegate.start name, id, payload
|
|
end
|
|
|
|
def finish(name, id, payload)
|
|
@delegate.finish name, id, payload
|
|
end
|
|
|
|
def subscribed_to?(name)
|
|
@pattern === name.to_s
|
|
end
|
|
|
|
def matches?(subscriber_or_name)
|
|
self === subscriber_or_name ||
|
|
@pattern && @pattern === subscriber_or_name
|
|
end
|
|
end
|
|
|
|
class Timed < Evented
|
|
def initialize(pattern, delegate)
|
|
@timestack = []
|
|
super
|
|
end
|
|
|
|
def publish(name, *args)
|
|
@delegate.call name, *args
|
|
end
|
|
|
|
def start(name, id, payload)
|
|
@timestack.push Time.now
|
|
end
|
|
|
|
def finish(name, id, payload)
|
|
started = @timestack.pop
|
|
@delegate.call(name, started, Time.now, id, payload)
|
|
end
|
|
end
|
|
|
|
class AllMessages # :nodoc:
|
|
def initialize(delegate)
|
|
@delegate = delegate
|
|
end
|
|
|
|
def start(name, id, payload)
|
|
@delegate.start name, id, payload
|
|
end
|
|
|
|
def finish(name, id, payload)
|
|
@delegate.finish name, id, payload
|
|
end
|
|
|
|
def publish(name, *args)
|
|
@delegate.publish name, *args
|
|
end
|
|
|
|
def subscribed_to?(name)
|
|
true
|
|
end
|
|
|
|
alias :matches? :===
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end
|