1
0
Fork 0
mirror of https://github.com/rails/rails.git synced 2022-11-09 12:12:34 -05:00

Notifications: synchronous fanout queue pushes events to subscribers rather than having them concurrently pull

This commit is contained in:
Jeremy Kemper 2009-11-29 02:30:35 -08:00
parent bb84cab2fc
commit 327545c3ae
2 changed files with 55 additions and 28 deletions

View file

@ -6,7 +6,8 @@ module ActiveSupport
# consumes events in a thread and publish them to all registered subscribers.
#
class Fanout
def initialize
def initialize(sync = false)
@subscriber_klass = sync ? Subscriber : AsyncSubscriber
@subscribers = []
end
@ -15,7 +16,7 @@ module ActiveSupport
end
def subscribe(pattern = nil, &block)
@subscribers << Subscriber.new(pattern, &block)
@subscribers << @subscriber_klass.new(pattern, &block)
end
def publish(*args)
@ -29,17 +30,7 @@ module ActiveSupport
# Used for internal implementation only.
class Binding #:nodoc:
def initialize(queue, pattern)
@queue, @pattern = queue, pattern
end
def subscribe(&block)
@queue.subscribe(@pattern, &block)
end
end
# Used for internal implementation only.
class Subscriber #:nodoc:
def initialize(pattern, &block)
@queue = queue
@pattern =
case pattern
when Regexp, NilClass
@ -47,23 +38,47 @@ module ActiveSupport
else
/^#{Regexp.escape(pattern.to_s)}/
end
end
def subscribe(&block)
@queue.subscribe(@pattern, &block)
end
end
class Subscriber #:nodoc:
def initialize(pattern, &block)
@pattern = pattern
@block = block
end
def publish(*args)
push(*args) if matches?(args.first)
end
def drained?
true
end
private
def matches?(name)
!@pattern || @pattern =~ name.to_s
end
def push(*args)
@block.call(*args)
end
end
# Used for internal implementation only.
class AsyncSubscriber < Subscriber #:nodoc:
def initialize(pattern, &block)
super
@events = Queue.new
start_consumer
end
def publish(name, *args)
push(name, args) if matches?(name)
end
def consume
while args = @events.shift
@block.call(*args)
end
end
def drained?
@events.size.zero?
@events.empty?
end
private
@ -71,12 +86,14 @@ module ActiveSupport
Thread.new { consume }
end
def matches?(name)
!@pattern || @pattern =~ name.to_s
def consume
while args = @events.shift
@block.call(*args)
end
end
def push(name, args)
@events << args.unshift(name)
def push(*args)
@events << args
end
end
end

View file

@ -71,6 +71,16 @@ module Notifications
end
end
class SyncPubSubTest < PubSubTest
def setup
Thread.abort_on_exception = true
@notifier = ActiveSupport::Notifications::Notifier.new(ActiveSupport::Notifications::Fanout.new(true))
@events = []
@notifier.subscribe { |*args| @events << event(*args) }
end
end
class InstrumentationTest < TestCase
def test_instrument_returns_block_result
assert_equal 2, @notifier.instrument(:awesome) { 1 + 1 }