mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Report async queries lock wait duration
This duration is very important to figure wether the `load_async` actually improved something.
This commit is contained in:
parent
6fa41fcf94
commit
99913f6a5e
6 changed files with 52 additions and 43 deletions
|
@ -2,10 +2,34 @@
|
|||
|
||||
module ActiveRecord
|
||||
class FutureResult # :nodoc:
|
||||
class EventBuffer
|
||||
def initialize(future_result, instrumenter)
|
||||
@future_result = future_result
|
||||
@instrumenter = instrumenter
|
||||
@events = []
|
||||
end
|
||||
|
||||
def instrument(name, payload = {}, &block)
|
||||
event = @instrumenter.new_event(name, payload)
|
||||
@events << event
|
||||
event.record(&block)
|
||||
end
|
||||
|
||||
def flush
|
||||
events, @events = @events, []
|
||||
events.each do |event|
|
||||
event.payload[:lock_wait] = @future_result.lock_wait
|
||||
ActiveSupport::Notifications.publish_event(event)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
Canceled = Class.new(ActiveRecordError)
|
||||
|
||||
delegate :empty?, :to_a, to: :result
|
||||
|
||||
attr_reader :lock_wait
|
||||
|
||||
def initialize(pool, *args, **kwargs)
|
||||
@mutex = Mutex.new
|
||||
|
||||
|
@ -43,7 +67,7 @@ module ActiveRecord
|
|||
return unless @mutex.try_lock
|
||||
begin
|
||||
if pending?
|
||||
@event_buffer = @instrumenter.buffer
|
||||
@event_buffer = EventBuffer.new(self, @instrumenter)
|
||||
connection.with_instrumenter(@event_buffer) do
|
||||
execute_query(connection, async: true)
|
||||
end
|
||||
|
@ -77,12 +101,17 @@ module ActiveRecord
|
|||
end
|
||||
|
||||
def execute_or_wait
|
||||
return unless pending?
|
||||
|
||||
@mutex.synchronize do
|
||||
if pending?
|
||||
execute_query(@pool.connection)
|
||||
if pending?
|
||||
start = Concurrent.monotonic_time
|
||||
@mutex.synchronize do
|
||||
if pending?
|
||||
execute_query(@pool.connection)
|
||||
else
|
||||
@lock_wait = (Concurrent.monotonic_time - start) * 1_000
|
||||
end
|
||||
end
|
||||
else
|
||||
@lock_wait = 0.0
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -37,9 +37,12 @@ module ActiveRecord
|
|||
|
||||
return if IGNORE_PAYLOAD_NAMES.include?(payload[:name])
|
||||
|
||||
name = "#{payload[:name]} (#{event.duration.round(1)}ms)"
|
||||
name = if payload[:async]
|
||||
"ASYNC #{payload[:name]} (#{payload[:lock_wait].round(1)}ms) (db time #{event.duration.round(1)}ms)"
|
||||
else
|
||||
"#{payload[:name]} (#{event.duration.round(1)}ms)"
|
||||
end
|
||||
name = "CACHE #{name}" if payload[:cached]
|
||||
name = "ASYNC #{name}" if payload[:async]
|
||||
sql = payload[:sql]
|
||||
binds = nil
|
||||
|
||||
|
|
|
@ -133,6 +133,12 @@ class LogSubscriberTest < ActiveRecord::TestCase
|
|||
end
|
||||
end
|
||||
|
||||
def test_async_query
|
||||
logger = TestDebugLogSubscriber.new
|
||||
logger.sql(Event.new(0.9, sql: "SELECT * from models", name: "Model Load", async: true, lock_wait: 0.01))
|
||||
assert_match(/ASYNC Model Load \(0\.0ms\) \(db time 0\.9ms\) SELECT/i, logger.debugs.last)
|
||||
end
|
||||
|
||||
def test_query_logging_coloration_with_nested_select
|
||||
logger = TestDebugLogSubscriber.new
|
||||
logger.colorize_logging = true
|
||||
|
|
|
@ -62,6 +62,7 @@ module ActiveRecord
|
|||
status[:executed] = true
|
||||
status[:async] = event.payload[:async]
|
||||
status[:thread_id] = Thread.current.object_id
|
||||
status[:lock_wait] = event.payload[:lock_wait]
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -70,6 +71,11 @@ module ActiveRecord
|
|||
assert_equal expected_records, deferred_posts.to_a
|
||||
assert_equal Post.connection.supports_concurrent_connections?, status[:async]
|
||||
assert_equal Thread.current.object_id, status[:thread_id]
|
||||
if Post.connection.supports_concurrent_connections?
|
||||
assert_instance_of Float, status[:lock_wait]
|
||||
else
|
||||
assert_nil status[:lock_wait]
|
||||
end
|
||||
ensure
|
||||
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
|
||||
end
|
||||
|
|
|
@ -6,26 +6,6 @@ module ActiveSupport
|
|||
module Notifications
|
||||
# Instrumenters are stored in a thread local.
|
||||
class Instrumenter
|
||||
class Buffer # :nodoc:
|
||||
def initialize(instrumenter)
|
||||
@instrumenter = instrumenter
|
||||
@events = []
|
||||
end
|
||||
|
||||
def instrument(name, payload = {}, &block)
|
||||
event = @instrumenter.new_event(name, payload)
|
||||
@events << event
|
||||
event.record(&block)
|
||||
end
|
||||
|
||||
def flush
|
||||
events, @events = @events, []
|
||||
events.each do |event|
|
||||
ActiveSupport::Notifications.publish_event(event)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
attr_reader :id
|
||||
|
||||
def initialize(notifier)
|
||||
|
@ -55,10 +35,6 @@ module ActiveSupport
|
|||
Event.new(name, nil, nil, @id, payload)
|
||||
end
|
||||
|
||||
def buffer # :nodoc:
|
||||
Buffer.new(self)
|
||||
end
|
||||
|
||||
# Send a start notification with +name+ and +payload+.
|
||||
def start(name, payload)
|
||||
@notifier.start name, @id, payload
|
||||
|
|
|
@ -107,17 +107,6 @@ class SyncLogSubscriberTest < ActiveSupport::TestCase
|
|||
end
|
||||
end
|
||||
|
||||
def test_does_not_send_buffered_events_if_logger_is_nil
|
||||
ActiveSupport::LogSubscriber.logger = nil
|
||||
assert_not_called(@log_subscriber, :some_event) do
|
||||
ActiveSupport::LogSubscriber.attach_to :my_log_subscriber, @log_subscriber
|
||||
buffer = ActiveSupport::Notifications.instrumenter.buffer
|
||||
buffer.instrument "some_event.my_log_subscriber"
|
||||
buffer.flush
|
||||
wait
|
||||
end
|
||||
end
|
||||
|
||||
def test_does_not_fail_with_non_namespaced_events
|
||||
ActiveSupport::LogSubscriber.attach_to :my_log_subscriber, @log_subscriber
|
||||
instrument "whatever"
|
||||
|
|
Loading…
Reference in a new issue