Forward sql.active_record notifications back into the calling thread

It is not uncommon for `sql.active_record` subscribers to rely on
thread local or fiber local state. For instance the `buffered-logger`
gem buffer the logs in a thread variable.

With the introduction of async queries, the `sql.active_record`
events can now be produced from a background thread and that break
some expectations.

This makes it hard for subscriber to map the event to the request
or job that scheduled it.

That is why I believe we should instead store the event and
publish it back on the calling thread when the results are
accessed.
This commit is contained in:
Jean Boussier 2021-03-01 17:55:47 +01:00
parent c47686483e
commit 091dc78f94
11 changed files with 208 additions and 71 deletions

View File

@ -103,6 +103,19 @@ module ActiveRecord
)
end
EXCEPTION_NEVER = { Exception => :never }.freeze # :nodoc:
EXCEPTION_IMMEDIATE = { Exception => :immediate }.freeze # :nodoc:
private_constant :EXCEPTION_NEVER, :EXCEPTION_IMMEDIATE
def with_instrumenter(instrumenter, &block) # :nodoc:
Thread.handle_interrupt(EXCEPTION_NEVER) do
previous_instrumenter = @instrumenter
@instrumenter = instrumenter
Thread.handle_interrupt(EXCEPTION_IMMEDIATE, &block)
ensure
@instrumenter = previous_instrumenter
end
end
def check_if_write_query(sql) # :nodoc:
if preventing_writes? && write_query?(sql)
raise ActiveRecord::ReadOnlyError, "Write query attempted while in readonly mode: #{sql}"

View File

@ -17,6 +17,8 @@ module ActiveRecord
@pending = true
@error = nil
@result = nil
@instrumenter = ActiveSupport::Notifications.instrumenter
@event_buffer = nil
end
def schedule!(session)
@ -41,7 +43,10 @@ module ActiveRecord
return unless @mutex.try_lock
begin
if pending?
execute_query(connection, async: true)
@event_buffer = @instrumenter.buffer
connection.with_instrumenter(@event_buffer) do
execute_query(connection, async: true)
end
end
ensure
@mutex.unlock
@ -51,20 +56,22 @@ module ActiveRecord
def result
execute_or_wait
if @error
raise @error
elsif canceled?
@event_buffer&.flush
if canceled?
raise Canceled
elsif @error
raise @error
else
@result
end
end
private
def pending?
@pending && (!@session || @session.active?)
end
def pending?
@pending && (!@session || @session.active?)
end
private
def canceled?
@session && !@session.active?
end

View File

@ -32,6 +32,7 @@ module ActiveRecord
@predicate_builder = predicate_builder
@delegate_to_klass = false
@future_result = nil
@records = nil
end
def initialize_copy(other)
@ -703,7 +704,7 @@ module ActiveRecord
@delegate_to_klass = false
@to_sql = @arel = @loaded = @should_eager_load = nil
@offsets = @take = nil
@records = [].freeze
@records = nil
self
end

View File

@ -83,6 +83,14 @@ module AsynchronousQueriesSharedTests
ensure
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
end
private
def wait_for_future_result(result)
100.times do
break unless result.pending?
sleep 0.01
end
end
end
class AsynchronousQueriesTest < ActiveRecord::TestCase
@ -98,14 +106,10 @@ class AsynchronousQueriesTest < ActiveRecord::TestCase
ActiveRecord::Base.asynchronous_queries_tracker.start_session
status = {}
monitor = Monitor.new
condition = monitor.new_cond
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:sql] == "SELECT * FROM posts"
status[:executed] = true
status[:async] = event.payload[:async]
monitor.synchronize { condition.signal }
end
end
@ -115,11 +119,9 @@ class AsynchronousQueriesTest < ActiveRecord::TestCase
assert_kind_of ActiveRecord::Result, future_result
else
assert_kind_of ActiveRecord::FutureResult, future_result
wait_for_future_result(future_result)
end
monitor.synchronize do
condition.wait_until { status[:executed] }
end
assert_kind_of ActiveRecord::Result, future_result.result
assert_equal @connection.supports_concurrent_connections?, status[:async]
ensure

View File

@ -6,7 +6,25 @@ require "models/comment"
require "models/other_dog"
module ActiveRecord
module WaitForAsyncTestHelper
private
def wait_for_async_query(relation, timeout: 5)
if !relation.connection.async_enabled? || relation.instance_variable_get(:@records)
return relation
end
future_result = relation.instance_variable_get(:@future_result)
(timeout * 100).times do
return relation unless future_result.pending?
sleep 0.01
end
raise Timeout::Error, "The async executor wasn't drained after #{timeout} seconds"
end
end
class LoadAsyncTest < ActiveRecord::TestCase
include WaitForAsyncTestHelper
self.use_transactional_tests = false
fixtures :posts, :comments
@ -34,27 +52,42 @@ module ActiveRecord
assert_not_predicate defered_posts, :scheduled?
end
def test_simple_query
def test_notification_forwarding
expected_records = Post.where(author_id: 1).to_a
status = {}
monitor = Monitor.new
condition = monitor.new_cond
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:name] == "Post Load"
status[:executed] = true
status[:async] = event.payload[:async]
monitor.synchronize { condition.signal }
status[:thread_id] = Thread.current.object_id
end
end
defered_posts = Post.where(author_id: 1).load_async
defered_posts = wait_for_async_query(Post.where(author_id: 1).load_async)
monitor.synchronize do
condition.wait_until { status[:executed] }
assert_equal expected_records, defered_posts.to_a
assert_equal Post.connection.supports_concurrent_connections?, status[:async]
assert_equal Thread.current.object_id, status[:thread_id]
ensure
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
end
def test_simple_query
expected_records = Post.where(author_id: 1).to_a
status = {}
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:name] == "Post Load"
status[:executed] = true
status[:async] = event.payload[:async]
end
end
defered_posts = wait_for_async_query(Post.where(author_id: 1).load_async)
assert_equal expected_records, defered_posts.to_a
assert_equal Post.connection.supports_concurrent_connections?, status[:async]
ensure
@ -83,18 +116,15 @@ module ActiveRecord
expected_records = Post.where(author_id: 1).eager_load(:comments).to_a
status = {}
monitor = Monitor.new
condition = monitor.new_cond
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:name] == "SQL"
status[:executed] = true
status[:async] = event.payload[:async]
monitor.synchronize { condition.signal }
end
end
defered_posts = Post.where(author_id: 1).eager_load(:comments).load_async
defered_posts = wait_for_async_query(Post.where(author_id: 1).eager_load(:comments).load_async)
if in_memory_db?
assert_not_predicate defered_posts, :scheduled?
@ -102,10 +132,6 @@ module ActiveRecord
assert_predicate defered_posts, :scheduled?
end
monitor.synchronize do
condition.wait_until { status[:executed] }
end
assert_equal expected_records, defered_posts.to_a
assert_queries(0) do
defered_posts.each(&:comments)
@ -181,23 +207,16 @@ module ActiveRecord
expected_records = Post.where(author_id: 1).to_a
status = {}
monitor = Monitor.new
condition = monitor.new_cond
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:name] == "Post Load"
status[:executed] = true
status[:async] = event.payload[:async]
monitor.synchronize { condition.signal }
end
end
defered_posts = Post.where(author_id: 1).load_async
monitor.synchronize do
condition.wait_until { status[:executed] }
end
assert_equal expected_records, defered_posts.to_a
assert_not_equal Post.connection.supports_concurrent_connections?, status[:async]
ensure
@ -222,14 +241,11 @@ module ActiveRecord
expected_records = Post.where(author_id: 1).eager_load(:comments).to_a
status = {}
monitor = Monitor.new
condition = monitor.new_cond
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:name] == "SQL"
status[:executed] = true
status[:async] = event.payload[:async]
monitor.synchronize { condition.signal }
end
end
@ -237,10 +253,6 @@ module ActiveRecord
assert_not_predicate defered_posts, :scheduled?
monitor.synchronize do
condition.wait_until { status[:executed] }
end
assert_equal expected_records, defered_posts.to_a
assert_queries(0) do
defered_posts.each(&:comments)
@ -283,6 +295,8 @@ module ActiveRecord
end
class LoadAsyncMultiThreadPoolExecutorTest < ActiveRecord::TestCase
include WaitForAsyncTestHelper
self.use_transactional_tests = false
fixtures :posts, :comments
@ -329,22 +343,14 @@ module ActiveRecord
expected_records = Post.where(author_id: 1).to_a
status = {}
monitor = Monitor.new
condition = monitor.new_cond
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:name] == "Post Load"
status[:executed] = true
status[:async] = event.payload[:async]
monitor.synchronize { condition.signal }
end
end
defered_posts = Post.where(author_id: 1).load_async
monitor.synchronize do
condition.wait_until { status[:executed] }
end
defered_posts = wait_for_async_query(Post.where(author_id: 1).load_async)
assert_equal expected_records, defered_posts.to_a
assert_equal Post.connection.supports_concurrent_connections?, status[:async]
@ -370,25 +376,17 @@ module ActiveRecord
expected_records = Post.where(author_id: 1).eager_load(:comments).to_a
status = {}
monitor = Monitor.new
condition = monitor.new_cond
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:name] == "SQL"
status[:executed] = true
status[:async] = event.payload[:async]
monitor.synchronize { condition.signal }
end
end
defered_posts = Post.where(author_id: 1).eager_load(:comments).load_async
defered_posts = wait_for_async_query(Post.where(author_id: 1).eager_load(:comments).load_async)
assert_predicate defered_posts, :scheduled?
monitor.synchronize do
condition.wait_until { status[:executed] }
end
assert_equal expected_records, defered_posts.to_a
assert_queries(0) do
defered_posts.each(&:comments)
@ -429,6 +427,8 @@ module ActiveRecord
end
class LoadAsyncMixedThreadPoolExecutorTest < ActiveRecord::TestCase
include WaitForAsyncTestHelper
self.use_transactional_tests = false
fixtures :posts, :comments, :other_dogs
@ -476,30 +476,24 @@ module ActiveRecord
status = {}
dog_status = {}
monitor = Monitor.new
condition = monitor.new_cond
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:name] == "Post Load"
status[:executed] = true
status[:async] = event.payload[:async]
monitor.synchronize { condition.signal }
end
if event.payload[:name] == "OtherDog Load"
dog_status[:executed] = true
dog_status[:async] = event.payload[:async]
monitor.synchronize { condition.signal }
end
end
defered_posts = Post.where(author_id: 1).load_async
defered_dogs = OtherDog.where(id: 1).load_async
monitor.synchronize do
condition.wait_until { status[:executed] }
condition.wait_until { dog_status[:executed] }
end
wait_for_async_query(defered_posts)
wait_for_async_query(defered_dogs)
assert_equal expected_records, defered_posts.to_a
assert_equal expected_dogs, defered_dogs.to_a

View File

@ -198,6 +198,10 @@ module ActiveSupport
notifier.publish(name, *args)
end
def publish_event(event) # :nodoc;
notifier.publish_event(event)
end
def instrument(name, payload = {})
if notifier.listening?(name)
instrumenter.instrument(name, payload) { yield payload if block_given? }

View File

@ -70,6 +70,10 @@ module ActiveSupport
listeners_for(name).each { |s| s.publish(name, *args) }
end
def publish_event(event)
listeners_for(event.name).each { |s| s.publish_event(event) }
end
def listeners_for(name)
# this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics)
@listeners_for[name] || synchronize do
@ -144,6 +148,7 @@ module ActiveSupport
@pattern = Matcher.wrap(pattern)
@delegate = delegate
@can_publish = delegate.respond_to?(:publish)
@can_publish_event = delegate.respond_to?(:publish_event)
end
def publish(name, *args)
@ -152,6 +157,14 @@ module ActiveSupport
end
end
def publish_event(event)
if @can_publish_event
@delegate.publish_event event
else
publish(event.name, event.time, event.end, event.transaction_id, event.payload)
end
end
def start(name, id, payload)
@delegate.start name, id, payload
end
@ -223,6 +236,10 @@ module ActiveSupport
@delegate.call event
end
def publish_event(event)
@delegate.call event
end
private
def build_event(name, id, payload)
ActiveSupport::Notifications::Event.new name, nil, nil, id, payload

View File

@ -6,6 +6,26 @@ module ActiveSupport
module Notifications
# Instrumenters are stored in a thread local.
class Instrumenter
class Buffer # :nodoc:
def initialize(instrumenter)
@instrumenter = instrumenter
@events = []
end
def instrument(name, payload = {}, &block)
event = @instrumenter.new_event(name, payload)
@events << event
event.record(&block)
end
def flush
events, @events = @events, []
events.each do |event|
ActiveSupport::Notifications.publish_event(event)
end
end
end
attr_reader :id
def initialize(notifier)
@ -31,6 +51,14 @@ module ActiveSupport
end
end
def new_event(name, payload = {}) # :nodoc:
Event.new(name, nil, nil, @id, payload)
end
def buffer # :nodoc:
Buffer.new(self)
end
# Send a start notification with +name+ and +payload+.
def start(name, payload)
@notifier.start name, @id, payload
@ -68,6 +96,19 @@ module ActiveSupport
@allocation_count_finish = 0
end
def record
start!
begin
yield payload if block_given?
rescue Exception => e
payload[:exception] = [e.class.name, e.message]
payload[:exception_object] = e
raise e
ensure
finish!
end
end
# Record information at the time this event starts
def start!
@time = now

View File

@ -150,6 +150,11 @@ module ActiveSupport
send(method, event)
end
def publish_event(event) # :nodoc:
method = event.name.split(".").first
send(method, event)
end
private
def event_stack
SubscriberQueueRegistry.instance.get_queue(@queue_key)

View File

@ -61,6 +61,47 @@ module ActiveSupport
assert_equal [["foo", instrumenter.id, payload]], notifier.finishes
assert_empty notifier.starts
end
def test_record
called = false
event = instrumenter.new_event("foo", payload)
event.record {
called = true
}
assert called
end
def test_record_yields_the_payload_for_further_modification
event = instrumenter.new_event("awesome")
event.record { |p| p[:result] = 1 + 1 }
assert_equal 2, event.payload[:result]
assert_equal "awesome", event.name
assert_equal Hash[result: 2], event.payload
assert_equal instrumenter.id, event.transaction_id
assert_not_nil event.time
assert_not_nil event.end
end
def test_record_works_without_a_block
event = instrumenter.new_event("no.block", payload)
event.record
assert_equal "no.block", event.name
assert_equal payload, event.payload
assert_equal instrumenter.id, event.transaction_id
assert_not_nil event.time
assert_not_nil event.end
end
def test_record_with_exception
event = instrumenter.new_event("crash", payload)
assert_raises RuntimeError do
event.record { raise "Oopsies" }
end
assert_equal "Oopsies", event.payload[:exception_object].message
end
end
end
end

View File

@ -115,4 +115,16 @@ class SubscriberTest < ActiveSupport::TestCase
assert_equal [], TestSubscriber.events
end
def test_supports_publish_event
TestSubscriber.attach_to :doodle
original_event = ActiveSupport::Notifications::Event.new("open_party.doodle", Time.at(0), Time.at(10), "id", { foo: "bar" })
ActiveSupport::Notifications.publish_event(original_event)
assert_equal original_event, TestSubscriber.events.first
ensure
TestSubscriber.detach_from :doodle
end
end