You can unsubscribe a subscriber
This commit is contained in:
parent
048b436f33
commit
c88360ef36
|
@ -69,6 +69,10 @@ module ActiveSupport
|
|||
@queue.bind(pattern).subscribe(&block)
|
||||
end
|
||||
|
||||
def unsubscribe(subscriber)
|
||||
@queue.unsubscribe(subscriber)
|
||||
end
|
||||
|
||||
def wait
|
||||
@queue.wait
|
||||
end
|
||||
|
|
|
@ -4,7 +4,7 @@ module ActiveSupport
|
|||
# just pushes events to all registered log subscribers.
|
||||
class Fanout
|
||||
def initialize
|
||||
@log_subscribers = []
|
||||
@subscribers = []
|
||||
end
|
||||
|
||||
def bind(pattern)
|
||||
|
@ -12,11 +12,16 @@ module ActiveSupport
|
|||
end
|
||||
|
||||
def subscribe(pattern = nil, &block)
|
||||
@log_subscribers << Subscriber.new(pattern, &block)
|
||||
@subscribers << Subscriber.new(pattern, &block)
|
||||
@subscribers.last
|
||||
end
|
||||
|
||||
def unsubscribe(subscriber)
|
||||
@subscribers.delete(subscriber)
|
||||
end
|
||||
|
||||
def publish(*args)
|
||||
@log_subscribers.each { |s| s.publish(*args) }
|
||||
@subscribers.each { |s| s.publish(*args) }
|
||||
end
|
||||
|
||||
# This is a sync queue, so there is not waiting.
|
||||
|
|
|
@ -6,7 +6,7 @@ module Notifications
|
|||
ActiveSupport::Notifications.notifier = nil
|
||||
@notifier = ActiveSupport::Notifications.notifier
|
||||
@events = []
|
||||
@notifier.subscribe { |*args| @events << event(*args) }
|
||||
@subscription = @notifier.subscribe { |*args| @events << event(*args) }
|
||||
end
|
||||
|
||||
private
|
||||
|
@ -19,6 +19,18 @@ module Notifications
|
|||
end
|
||||
end
|
||||
|
||||
class UnsubscribeTest < TestCase
|
||||
def unsubscribing_removes_a_subscription
|
||||
@notifier.publish :foo
|
||||
@notifier.wait
|
||||
assert_equal [[:foo]], @events
|
||||
@notifier.unsubscribe(@subscription)
|
||||
@notifier.publish :bar
|
||||
@notifier.wait
|
||||
assert_equal [[:foo]], @events
|
||||
end
|
||||
end
|
||||
|
||||
class SyncPubSubTest < TestCase
|
||||
def test_events_are_published_to_a_listener
|
||||
@notifier.publish :foo
|
||||
|
|
Loading…
Reference in New Issue