Orchestra listeners have their own queue.

This commit is contained in:
José Valim 2009-10-06 09:42:42 -03:00
parent 7b7796e23d
commit 5d0f8abc00
2 changed files with 47 additions and 23 deletions

View File

@ -3,31 +3,41 @@ require 'active_support/core_ext/module/delegation'
module ActiveSupport
# Orchestra provides an instrumentation API for Ruby. To instrument an action
# in Ruby you just need to:
# in Ruby you just need to do:
#
# ActiveSupport::Orchestra.instrument(:render, :extra => :information) do
# render :text => "Foo"
# end
#
# Those actions are consumed by listeners. A listener is anything that responds
# to push. You can even register an array:
# You can consume those events and the information they provide by registering
# a subscriber. For instance, let's store all instrumented events in an array:
#
# @listener = []
# ActiveSupport::Orchestra.register @listener
# @events = []
#
# ActiveSupport::Orchestra.subscribe do |event|
# @events << event
# end
#
# ActiveSupport::Orchestra.instrument(:render, :extra => :information) do
# render :text => "Foo"
# end
#
# event #=> ActiveSupport::Orchestra::Event
# event = @events.first
# event.class #=> ActiveSupport::Orchestra::Event
# event.name #=> :render
# event.duration #=> 10 (in miliseconds)
# event.result #=> "Foo"
# event.payload #=> { :extra => :information }
#
# Orchestra ships with a default listener implementation which puts events in
# a stream and consume them in a Thread. This implementation is thread safe
# and is available at ActiveSupport::Orchestra::Listener.
# When subscribing to Orchestra, you can pass a pattern, to only consume
# events that match the pattern:
#
# ActiveSupport::Orchestra.subscribe(/render/) do |event|
# @render_events << event
# end
#
# Orchestra ships with a queue implementation that consumes and publish events
# to subscribers in a thread. You can use any queue implementation you want.
#
module Orchestra
mattr_accessor :queue
@ -108,37 +118,43 @@ module ActiveSupport
#
class LittleFanout
def initialize
@listeners, @stream = [], []
@thread = Thread.new do
loop do
(event = @stream.shift) ? consume(event) : Thread.stop
end
end
@listeners, @stream = [], Queue.new
@thread = Thread.new { consume }
end
def publish(*event)
@stream.push(event)
@thread.run
end
def subscribe(pattern=nil, &block)
@listeners << Listener.new(pattern, &block)
end
def consume(event)
@listeners.each { |l| l.publish(*event) }
def consume
while event = @stream.shift
@listeners.each { |l| l.publish(*event) }
end
end
class Listener
attr_reader :thread
def initialize(pattern, &block)
@pattern = pattern
@subscriber = block
@queue = Queue.new
@thread = Thread.new { consume }
end
def publish(name, payload)
unless @pattern && name.to_s !~ @pattern
@subscriber.call(name, payload)
@queue << [name, payload]
end
end
def consume
while event = @queue.shift
@subscriber.call(*event)
end
end
end

View File

@ -50,6 +50,8 @@ class OrchestraMainTest < Test::Unit::TestCase
1 + 1
end
sleep(0.1)
assert_equal 1, @events.size
assert_equal :awesome, @events.last.name
assert_equal Hash[:payload => "orchestra"], @events.last.payload
@ -58,18 +60,22 @@ class OrchestraMainTest < Test::Unit::TestCase
def test_nested_events_can_be_instrumented
ActiveSupport::Orchestra.instrument(:awesome, :payload => "orchestra") do
ActiveSupport::Orchestra.instrument(:wot, :payload => "child") do
sleep(0.1)
1 + 1
end
sleep(0.1)
assert_equal 1, @events.size
assert_equal :wot, @events.first.name
assert_equal Hash[:payload => "child"], @events.first.payload
assert_in_delta 100, @events.first.duration, 30
end
sleep(0.1)
assert_equal 2, @events.size
assert_equal :awesome, @events.last.name
assert_equal Hash[:payload => "orchestra"], @events.last.payload
assert_in_delta 100, @events.last.duration, 70
end
def test_event_is_pushed_even_if_block_fails
@ -77,6 +83,8 @@ class OrchestraMainTest < Test::Unit::TestCase
raise "OMG"
end rescue RuntimeError
sleep(0.1)
assert_equal 1, @events.size
assert_equal :awesome, @events.last.name
assert_equal Hash[:payload => "orchestra"], @events.last.payload
@ -89,7 +97,7 @@ class OrchestraMainTest < Test::Unit::TestCase
ActiveSupport::Orchestra.instrument(:something){ 0 }
ActiveSupport::Orchestra.instrument(:cache){ 10 }
sleep 0.1
sleep(0.1)
assert_equal 1, @another.size
assert_equal :cache, @another.first.name