mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Events are created inside threads.
This commit is contained in:
parent
7b5225a529
commit
7b7796e23d
2 changed files with 46 additions and 78 deletions
|
@ -51,18 +51,15 @@ module ActiveSupport
|
|||
class Instrumenter
|
||||
def initialize(publisher)
|
||||
@publisher = publisher
|
||||
@stack = []
|
||||
end
|
||||
|
||||
def instrument(name, payload=nil)
|
||||
event = Event.new(name, @stack.last, payload)
|
||||
@stack << event
|
||||
event.result = yield
|
||||
event
|
||||
def instrument(name, payload={})
|
||||
payload[:time] = Time.now
|
||||
payload[:thread_id] = Thread.current.object_id
|
||||
payload[:result] = yield
|
||||
ensure
|
||||
event.finish!
|
||||
@stack.pop
|
||||
@publisher.publish(event)
|
||||
payload[:duration] = 1000 * (Time.now.to_f - payload[:time].to_f)
|
||||
@publisher.publish(name, payload)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -71,8 +68,8 @@ module ActiveSupport
|
|||
@queue = queue
|
||||
end
|
||||
|
||||
def publish(event)
|
||||
@queue.publish(event)
|
||||
def publish(name, payload)
|
||||
@queue.publish(name, payload)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -87,26 +84,22 @@ module ActiveSupport
|
|||
end
|
||||
|
||||
def subscribe
|
||||
@queue.subscribe(@pattern) do |event|
|
||||
yield event
|
||||
@queue.subscribe(@pattern) do |name, payload|
|
||||
yield Event.new(name, payload)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class Event
|
||||
attr_reader :name, :time, :duration, :parent, :thread_id, :payload
|
||||
attr_accessor :result
|
||||
attr_reader :name, :time, :duration, :thread_id, :result, :payload
|
||||
|
||||
def initialize(name, parent=nil, payload=nil)
|
||||
def initialize(name, payload)
|
||||
@name = name
|
||||
@time = Time.now
|
||||
@thread_id = Thread.current.object_id
|
||||
@parent = parent
|
||||
@payload = payload
|
||||
end
|
||||
|
||||
def finish!
|
||||
@duration = 1000 * (Time.now.to_f - @time.to_f)
|
||||
@payload = payload.dup
|
||||
@time = @payload.delete(:time)
|
||||
@thread_id = @payload.delete(:thread_id)
|
||||
@result = @payload.delete(:result)
|
||||
@duration = @payload.delete(:duration)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -124,7 +117,7 @@ module ActiveSupport
|
|||
end
|
||||
end
|
||||
|
||||
def publish(event)
|
||||
def publish(*event)
|
||||
@stream.push(event)
|
||||
@thread.run
|
||||
end
|
||||
|
@ -134,7 +127,7 @@ module ActiveSupport
|
|||
end
|
||||
|
||||
def consume(event)
|
||||
@listeners.each { |l| l.publish(event) }
|
||||
@listeners.each { |l| l.publish(*event) }
|
||||
end
|
||||
|
||||
class Listener
|
||||
|
@ -143,9 +136,9 @@ module ActiveSupport
|
|||
@subscriber = block
|
||||
end
|
||||
|
||||
def publish(event)
|
||||
unless @pattern && event.name.to_s !~ @pattern
|
||||
@subscriber.call(event)
|
||||
def publish(name, payload)
|
||||
unless @pattern && name.to_s !~ @pattern
|
||||
@subscriber.call(name, payload)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -8,103 +8,78 @@ class ActiveSupport::Orchestra::LittleFanout
|
|||
end
|
||||
|
||||
class OrchestraEventTest < Test::Unit::TestCase
|
||||
def setup
|
||||
@parent = ActiveSupport::Orchestra::Event.new(:parent)
|
||||
def test_events_are_initialized_with_name_and_payload
|
||||
event = ActiveSupport::Orchestra::Event.new(:foo, :payload => :bar)
|
||||
assert_equal :foo, event.name
|
||||
assert_equal Hash[:payload => :bar], event.payload
|
||||
end
|
||||
|
||||
def test_initialization_with_name_and_parent_and_payload
|
||||
event = ActiveSupport::Orchestra::Event.new(:awesome, @parent, :payload => "orchestra")
|
||||
assert_equal(:awesome, event.name)
|
||||
assert_equal(@parent, event.parent)
|
||||
assert_equal({ :payload => "orchestra" }, event.payload)
|
||||
end
|
||||
def test_events_consumes_information_given_as_payload
|
||||
event = ActiveSupport::Orchestra::Event.new(:foo,
|
||||
:time => (time = Time.now), :result => 1, :duration => 10)
|
||||
|
||||
def test_thread_id_is_set_on_initialization
|
||||
event = ActiveSupport::Orchestra::Event.new(:awesome)
|
||||
assert_equal Thread.current.object_id, event.thread_id
|
||||
end
|
||||
|
||||
def test_current_time_is_set_on_initialization
|
||||
previous_time = Time.now.utc
|
||||
event = ActiveSupport::Orchestra::Event.new(:awesome)
|
||||
assert_kind_of Time, event.time
|
||||
assert event.time.to_f >= previous_time.to_f
|
||||
end
|
||||
|
||||
def test_duration_is_set_when_event_finishes
|
||||
event = ActiveSupport::Orchestra::Event.new(:awesome)
|
||||
sleep(0.1)
|
||||
event.finish!
|
||||
assert_in_delta 100, event.duration, 30
|
||||
assert_equal Hash.new, event.payload
|
||||
assert_equal time, event.time
|
||||
assert_equal 1, event.result
|
||||
assert_equal 10, event.duration
|
||||
end
|
||||
end
|
||||
|
||||
class OrchestraMainTest < Test::Unit::TestCase
|
||||
def setup
|
||||
@events = []
|
||||
Thread.abort_on_exception = true
|
||||
ActiveSupport::Orchestra.subscribe { |event| @events << event }
|
||||
end
|
||||
|
||||
def teardown
|
||||
Thread.abort_on_exception = false
|
||||
ActiveSupport::Orchestra.queue.clear
|
||||
end
|
||||
|
||||
def test_orchestra_allows_any_action_to_be_instrumented
|
||||
event = ActiveSupport::Orchestra.instrument(:awesome, "orchestra") do
|
||||
sleep(0.1)
|
||||
end
|
||||
|
||||
assert_equal :awesome, event.name
|
||||
assert_equal "orchestra", event.payload
|
||||
assert_in_delta 100, event.duration, 30
|
||||
end
|
||||
|
||||
def test_block_result_is_stored
|
||||
event = ActiveSupport::Orchestra.instrument(:awesome, "orchestra") do
|
||||
def test_orchestra_returns_action_result
|
||||
result = ActiveSupport::Orchestra.instrument(:awesome, :payload => "orchestra") do
|
||||
1 + 1
|
||||
end
|
||||
|
||||
assert_equal 2, event.result
|
||||
assert_equal 2, result
|
||||
end
|
||||
|
||||
def test_events_are_published_to_a_listener
|
||||
event = ActiveSupport::Orchestra.instrument(:awesome, "orchestra") do
|
||||
ActiveSupport::Orchestra.instrument(:awesome, :payload => "orchestra") do
|
||||
1 + 1
|
||||
end
|
||||
|
||||
assert_equal 1, @events.size
|
||||
assert_equal :awesome, @events.last.name
|
||||
assert_equal "orchestra", @events.last.payload
|
||||
assert_equal Hash[:payload => "orchestra"], @events.last.payload
|
||||
end
|
||||
|
||||
def test_nested_events_can_be_instrumented
|
||||
ActiveSupport::Orchestra.instrument(:awesome, "orchestra") do
|
||||
ActiveSupport::Orchestra.instrument(:wot, "child") do
|
||||
ActiveSupport::Orchestra.instrument(:awesome, :payload => "orchestra") do
|
||||
ActiveSupport::Orchestra.instrument(:wot, :payload => "child") do
|
||||
sleep(0.1)
|
||||
end
|
||||
|
||||
assert_equal 1, @events.size
|
||||
assert_equal :wot, @events.first.name
|
||||
assert_equal "child", @events.first.payload
|
||||
|
||||
assert_nil @events.first.parent.duration
|
||||
assert_equal Hash[:payload => "child"], @events.first.payload
|
||||
assert_in_delta 100, @events.first.duration, 30
|
||||
end
|
||||
|
||||
assert_equal 2, @events.size
|
||||
assert_equal :awesome, @events.last.name
|
||||
assert_equal "orchestra", @events.last.payload
|
||||
assert_in_delta 100, @events.first.parent.duration, 30
|
||||
assert_equal Hash[:payload => "orchestra"], @events.last.payload
|
||||
end
|
||||
|
||||
def test_event_is_pushed_even_if_block_fails
|
||||
ActiveSupport::Orchestra.instrument(:awesome, "orchestra") do
|
||||
ActiveSupport::Orchestra.instrument(:awesome, :payload => "orchestra") do
|
||||
raise "OMG"
|
||||
end rescue RuntimeError
|
||||
|
||||
assert_equal 1, @events.size
|
||||
assert_equal :awesome, @events.last.name
|
||||
assert_equal "orchestra", @events.last.payload
|
||||
assert_equal Hash[:payload => "orchestra"], @events.last.payload
|
||||
end
|
||||
|
||||
def test_subscriber_with_pattern
|
||||
|
|
Loading…
Reference in a new issue