Added queue abstraction to Orchestra.

This commit is contained in:
José Valim 2009-09-30 08:59:15 -03:00
parent 8b340ab2f6
commit a60bdd7d29
2 changed files with 79 additions and 87 deletions

View File

@ -29,8 +29,9 @@ module ActiveSupport
# and is available at ActiveSupport::Orchestra::Listener. # and is available at ActiveSupport::Orchestra::Listener.
# #
module Orchestra module Orchestra
mattr_accessor :queue
@stacked_events = Hash.new { |h,k| h[k] = [] } @stacked_events = Hash.new { |h,k| h[k] = [] }
@listeners = []
def self.instrument(name, payload=nil) def self.instrument(name, payload=nil)
stack = @stacked_events[Thread.current.object_id] stack = @stacked_events[Thread.current.object_id]
@ -41,15 +42,11 @@ module ActiveSupport
ensure ensure
event.finish! event.finish!
stack.delete(event) stack.delete(event)
@listeners.each { |s| s.push(event) } queue.push(event)
end end
def self.register(listener) def self.subscribe(pattern=nil, &block)
@listeners << listener queue.subscribe(pattern, &block)
end
def self.unregister(listener)
@listeners.delete(listener)
end end
class Event class Event
@ -69,35 +66,47 @@ module ActiveSupport
end end
end end
class Listener # This is a default queue implementation that ships with Orchestra. It
attr_reader :mutex, :signaler, :thread # consumes events in a thread and publish them to all registered subscribers.
#
class LittleFanout
def initialize def initialize
@mutex, @signaler = Mutex.new, ConditionVariable.new @listeners, @stream = [], []
@stream = []
@thread = Thread.new do @thread = Thread.new do
loop do loop do
(event = @stream.shift) ? consume(event) : wait (event = @stream.shift) ? consume(event) : Thread.stop
end end
end end
end end
def wait
@mutex.synchronize do
@signaler.wait(@mutex)
end
end
def push(event) def push(event)
@mutex.synchronize do
@stream.push(event) @stream.push(event)
@signaler.broadcast @thread.run
end end
def subscribe(pattern=nil, &block)
@listeners << Listener.new(pattern, &block)
end end
def consume(event) def consume(event)
raise NotImplementedError @listeners.each { |l| l.publish(event) }
end
class Listener
def initialize(pattern, &block)
@pattern = pattern
@subscriber = block
end
def publish(event)
unless @pattern && event.name.to_s !~ @pattern
@subscriber.call(event)
end end
end end
end end
end end
end
Orchestra.queue = Orchestra::LittleFanout.new
end

View File

@ -1,5 +1,12 @@
require 'abstract_unit' require 'abstract_unit'
# Allow LittleFanout to be cleaned.
class ActiveSupport::Orchestra::LittleFanout
def clear
@listeners.clear
end
end
class OrchestraEventTest < Test::Unit::TestCase class OrchestraEventTest < Test::Unit::TestCase
def setup def setup
@parent = ActiveSupport::Orchestra::Event.new(:parent) @parent = ActiveSupport::Orchestra::Event.new(:parent)
@ -34,12 +41,12 @@ end
class OrchestraMainTest < Test::Unit::TestCase class OrchestraMainTest < Test::Unit::TestCase
def setup def setup
@listener = [] @events = []
ActiveSupport::Orchestra.register @listener ActiveSupport::Orchestra.subscribe { |event| @events << event }
end end
def teardown def teardown
ActiveSupport::Orchestra.unregister @listener ActiveSupport::Orchestra.queue.clear
end end
def test_orchestra_allows_any_action_to_be_instrumented def test_orchestra_allows_any_action_to_be_instrumented
@ -65,9 +72,9 @@ class OrchestraMainTest < Test::Unit::TestCase
1 + 1 1 + 1
end end
assert_equal 1, @listener.size assert_equal 1, @events.size
assert_equal :awesome, @listener.last.name assert_equal :awesome, @events.last.name
assert_equal "orchestra", @listener.last.payload assert_equal "orchestra", @events.last.payload
end end
def test_nested_events_can_be_instrumented def test_nested_events_can_be_instrumented
@ -76,18 +83,18 @@ class OrchestraMainTest < Test::Unit::TestCase
sleep(0.1) sleep(0.1)
end end
assert_equal 1, @listener.size assert_equal 1, @events.size
assert_equal :wot, @listener.first.name assert_equal :wot, @events.first.name
assert_equal "child", @listener.first.payload assert_equal "child", @events.first.payload
assert_nil @listener.first.parent.duration assert_nil @events.first.parent.duration
assert_in_delta 100, @listener.first.duration, 30 assert_in_delta 100, @events.first.duration, 30
end end
assert_equal 2, @listener.size assert_equal 2, @events.size
assert_equal :awesome, @listener.last.name assert_equal :awesome, @events.last.name
assert_equal "orchestra", @listener.last.payload assert_equal "orchestra", @events.last.payload
assert_in_delta 100, @listener.first.parent.duration, 30 assert_in_delta 100, @events.first.parent.duration, 30
end end
def test_event_is_pushed_even_if_block_fails def test_event_is_pushed_even_if_block_fails
@ -95,67 +102,43 @@ class OrchestraMainTest < Test::Unit::TestCase
raise "OMG" raise "OMG"
end rescue RuntimeError end rescue RuntimeError
assert_equal 1, @listener.size assert_equal 1, @events.size
assert_equal :awesome, @listener.last.name assert_equal :awesome, @events.last.name
assert_equal "orchestra", @listener.last.payload assert_equal "orchestra", @events.last.payload
end
end end
class OrchestraListenerTest < Test::Unit::TestCase def test_subscriber_with_pattern
class MyListener < ActiveSupport::Orchestra::Listener @another = []
attr_reader :consumed ActiveSupport::Orchestra.subscribe(/cache/) { |event| @another << event }
def consume(event) ActiveSupport::Orchestra.instrument(:something){ 0 }
@consumed ||= [] ActiveSupport::Orchestra.instrument(:cache){ 10 }
@consumed << event
end
end
def setup
@listener = MyListener.new
ActiveSupport::Orchestra.register @listener
end
def teardown
ActiveSupport::Orchestra.unregister @listener
end
def test_thread_is_exposed_by_listener
assert_kind_of Thread, @listener.thread
end
def test_event_is_consumed_when_an_action_is_instrumented
ActiveSupport::Orchestra.instrument(:sum) do
1 + 1
end
sleep 0.1 sleep 0.1
assert_equal 1, @listener.consumed.size
assert_equal :sum, @listener.consumed.first.name assert_equal 1, @another.size
assert_equal 2, @listener.consumed.first.result assert_equal :cache, @another.first.name
assert_equal 10, @another.first.result
end end
def test_with_sevaral_consumers_and_several_events def test_with_several_consumers_and_several_events
@another = MyListener.new @another = []
ActiveSupport::Orchestra.register @another ActiveSupport::Orchestra.subscribe { |event| @another << event }
1.upto(100) do |i| 1.upto(100) do |i|
ActiveSupport::Orchestra.instrument(:value) do ActiveSupport::Orchestra.instrument(:value){ i }
i
end
end end
sleep 0.1 sleep 0.1
assert_equal 100, @listener.consumed.size assert_equal 100, @events.size
assert_equal :value, @listener.consumed.first.name assert_equal :value, @events.first.name
assert_equal 1, @listener.consumed.first.result assert_equal 1, @events.first.result
assert_equal 100, @listener.consumed.last.result assert_equal 100, @events.last.result
assert_equal 100, @another.consumed.size assert_equal 100, @another.size
assert_equal :value, @another.consumed.first.name assert_equal :value, @another.first.name
assert_equal 1, @another.consumed.first.result assert_equal 1, @another.first.result
assert_equal 100, @another.consumed.last.result assert_equal 100, @another.last.result
ensure
ActiveSupport::Orchestra.unregister @another
end end
end end