diff --git a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb index 22768c9611..32e7afaee2 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb @@ -103,6 +103,19 @@ module ActiveRecord ) end + EXCEPTION_NEVER = { Exception => :never }.freeze # :nodoc: + EXCEPTION_IMMEDIATE = { Exception => :immediate }.freeze # :nodoc: + private_constant :EXCEPTION_NEVER, :EXCEPTION_IMMEDIATE + def with_instrumenter(instrumenter, &block) # :nodoc: + Thread.handle_interrupt(EXCEPTION_NEVER) do + previous_instrumenter = @instrumenter + @instrumenter = instrumenter + Thread.handle_interrupt(EXCEPTION_IMMEDIATE, &block) + ensure + @instrumenter = previous_instrumenter + end + end + def check_if_write_query(sql) # :nodoc: if preventing_writes? && write_query?(sql) raise ActiveRecord::ReadOnlyError, "Write query attempted while in readonly mode: #{sql}" diff --git a/activerecord/lib/active_record/future_result.rb b/activerecord/lib/active_record/future_result.rb index 77d5624be3..a45b100695 100644 --- a/activerecord/lib/active_record/future_result.rb +++ b/activerecord/lib/active_record/future_result.rb @@ -17,6 +17,8 @@ module ActiveRecord @pending = true @error = nil @result = nil + @instrumenter = ActiveSupport::Notifications.instrumenter + @event_buffer = nil end def schedule!(session) @@ -41,7 +43,10 @@ module ActiveRecord return unless @mutex.try_lock begin if pending? - execute_query(connection, async: true) + @event_buffer = @instrumenter.buffer + connection.with_instrumenter(@event_buffer) do + execute_query(connection, async: true) + end end ensure @mutex.unlock @@ -51,20 +56,22 @@ module ActiveRecord def result execute_or_wait - if @error - raise @error - elsif canceled? + @event_buffer&.flush + + if canceled? raise Canceled + elsif @error + raise @error else @result end end - private - def pending? - @pending && (!@session || @session.active?) - end + def pending? + @pending && (!@session || @session.active?) + end + private def canceled? @session && !@session.active? end diff --git a/activerecord/lib/active_record/relation.rb b/activerecord/lib/active_record/relation.rb index a890af4519..547297a822 100644 --- a/activerecord/lib/active_record/relation.rb +++ b/activerecord/lib/active_record/relation.rb @@ -32,6 +32,7 @@ module ActiveRecord @predicate_builder = predicate_builder @delegate_to_klass = false @future_result = nil + @records = nil end def initialize_copy(other) @@ -703,7 +704,7 @@ module ActiveRecord @delegate_to_klass = false @to_sql = @arel = @loaded = @should_eager_load = nil @offsets = @take = nil - @records = [].freeze + @records = nil self end diff --git a/activerecord/test/cases/asynchronous_queries_test.rb b/activerecord/test/cases/asynchronous_queries_test.rb index 0118c94ca5..8f9b068840 100644 --- a/activerecord/test/cases/asynchronous_queries_test.rb +++ b/activerecord/test/cases/asynchronous_queries_test.rb @@ -83,6 +83,14 @@ module AsynchronousQueriesSharedTests ensure ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber end + + private + def wait_for_future_result(result) + 100.times do + break unless result.pending? + sleep 0.01 + end + end end class AsynchronousQueriesTest < ActiveRecord::TestCase @@ -98,14 +106,10 @@ class AsynchronousQueriesTest < ActiveRecord::TestCase ActiveRecord::Base.asynchronous_queries_tracker.start_session status = {} - monitor = Monitor.new - condition = monitor.new_cond - subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| if event.payload[:sql] == "SELECT * FROM posts" status[:executed] = true status[:async] = event.payload[:async] - monitor.synchronize { condition.signal } end end @@ -115,11 +119,9 @@ class AsynchronousQueriesTest < ActiveRecord::TestCase assert_kind_of ActiveRecord::Result, future_result else assert_kind_of ActiveRecord::FutureResult, future_result + wait_for_future_result(future_result) end - monitor.synchronize do - condition.wait_until { status[:executed] } - end assert_kind_of ActiveRecord::Result, future_result.result assert_equal @connection.supports_concurrent_connections?, status[:async] ensure diff --git a/activerecord/test/cases/relation/load_async_test.rb b/activerecord/test/cases/relation/load_async_test.rb index bacde8c4ab..d3e6f2e5ca 100644 --- a/activerecord/test/cases/relation/load_async_test.rb +++ b/activerecord/test/cases/relation/load_async_test.rb @@ -6,7 +6,25 @@ require "models/comment" require "models/other_dog" module ActiveRecord + module WaitForAsyncTestHelper + private + def wait_for_async_query(relation, timeout: 5) + if !relation.connection.async_enabled? || relation.instance_variable_get(:@records) + return relation + end + + future_result = relation.instance_variable_get(:@future_result) + (timeout * 100).times do + return relation unless future_result.pending? + sleep 0.01 + end + raise Timeout::Error, "The async executor wasn't drained after #{timeout} seconds" + end + end + class LoadAsyncTest < ActiveRecord::TestCase + include WaitForAsyncTestHelper + self.use_transactional_tests = false fixtures :posts, :comments @@ -34,27 +52,42 @@ module ActiveRecord assert_not_predicate defered_posts, :scheduled? end - def test_simple_query + def test_notification_forwarding expected_records = Post.where(author_id: 1).to_a status = {} - monitor = Monitor.new - condition = monitor.new_cond subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| if event.payload[:name] == "Post Load" status[:executed] = true status[:async] = event.payload[:async] - monitor.synchronize { condition.signal } + status[:thread_id] = Thread.current.object_id end end - defered_posts = Post.where(author_id: 1).load_async + defered_posts = wait_for_async_query(Post.where(author_id: 1).load_async) - monitor.synchronize do - condition.wait_until { status[:executed] } + assert_equal expected_records, defered_posts.to_a + assert_equal Post.connection.supports_concurrent_connections?, status[:async] + assert_equal Thread.current.object_id, status[:thread_id] + ensure + ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber + end + + def test_simple_query + expected_records = Post.where(author_id: 1).to_a + + status = {} + + subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| + if event.payload[:name] == "Post Load" + status[:executed] = true + status[:async] = event.payload[:async] + end end + defered_posts = wait_for_async_query(Post.where(author_id: 1).load_async) + assert_equal expected_records, defered_posts.to_a assert_equal Post.connection.supports_concurrent_connections?, status[:async] ensure @@ -83,18 +116,15 @@ module ActiveRecord expected_records = Post.where(author_id: 1).eager_load(:comments).to_a status = {} - monitor = Monitor.new - condition = monitor.new_cond subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| if event.payload[:name] == "SQL" status[:executed] = true status[:async] = event.payload[:async] - monitor.synchronize { condition.signal } end end - defered_posts = Post.where(author_id: 1).eager_load(:comments).load_async + defered_posts = wait_for_async_query(Post.where(author_id: 1).eager_load(:comments).load_async) if in_memory_db? assert_not_predicate defered_posts, :scheduled? @@ -102,10 +132,6 @@ module ActiveRecord assert_predicate defered_posts, :scheduled? end - monitor.synchronize do - condition.wait_until { status[:executed] } - end - assert_equal expected_records, defered_posts.to_a assert_queries(0) do defered_posts.each(&:comments) @@ -181,23 +207,16 @@ module ActiveRecord expected_records = Post.where(author_id: 1).to_a status = {} - monitor = Monitor.new - condition = monitor.new_cond subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| if event.payload[:name] == "Post Load" status[:executed] = true status[:async] = event.payload[:async] - monitor.synchronize { condition.signal } end end defered_posts = Post.where(author_id: 1).load_async - monitor.synchronize do - condition.wait_until { status[:executed] } - end - assert_equal expected_records, defered_posts.to_a assert_not_equal Post.connection.supports_concurrent_connections?, status[:async] ensure @@ -222,14 +241,11 @@ module ActiveRecord expected_records = Post.where(author_id: 1).eager_load(:comments).to_a status = {} - monitor = Monitor.new - condition = monitor.new_cond subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| if event.payload[:name] == "SQL" status[:executed] = true status[:async] = event.payload[:async] - monitor.synchronize { condition.signal } end end @@ -237,10 +253,6 @@ module ActiveRecord assert_not_predicate defered_posts, :scheduled? - monitor.synchronize do - condition.wait_until { status[:executed] } - end - assert_equal expected_records, defered_posts.to_a assert_queries(0) do defered_posts.each(&:comments) @@ -283,6 +295,8 @@ module ActiveRecord end class LoadAsyncMultiThreadPoolExecutorTest < ActiveRecord::TestCase + include WaitForAsyncTestHelper + self.use_transactional_tests = false fixtures :posts, :comments @@ -329,22 +343,14 @@ module ActiveRecord expected_records = Post.where(author_id: 1).to_a status = {} - monitor = Monitor.new - condition = monitor.new_cond - subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| if event.payload[:name] == "Post Load" status[:executed] = true status[:async] = event.payload[:async] - monitor.synchronize { condition.signal } end end - defered_posts = Post.where(author_id: 1).load_async - - monitor.synchronize do - condition.wait_until { status[:executed] } - end + defered_posts = wait_for_async_query(Post.where(author_id: 1).load_async) assert_equal expected_records, defered_posts.to_a assert_equal Post.connection.supports_concurrent_connections?, status[:async] @@ -370,25 +376,17 @@ module ActiveRecord expected_records = Post.where(author_id: 1).eager_load(:comments).to_a status = {} - monitor = Monitor.new - condition = monitor.new_cond - subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| if event.payload[:name] == "SQL" status[:executed] = true status[:async] = event.payload[:async] - monitor.synchronize { condition.signal } end end - defered_posts = Post.where(author_id: 1).eager_load(:comments).load_async + defered_posts = wait_for_async_query(Post.where(author_id: 1).eager_load(:comments).load_async) assert_predicate defered_posts, :scheduled? - monitor.synchronize do - condition.wait_until { status[:executed] } - end - assert_equal expected_records, defered_posts.to_a assert_queries(0) do defered_posts.each(&:comments) @@ -429,6 +427,8 @@ module ActiveRecord end class LoadAsyncMixedThreadPoolExecutorTest < ActiveRecord::TestCase + include WaitForAsyncTestHelper + self.use_transactional_tests = false fixtures :posts, :comments, :other_dogs @@ -476,30 +476,24 @@ module ActiveRecord status = {} dog_status = {} - monitor = Monitor.new - condition = monitor.new_cond subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| if event.payload[:name] == "Post Load" status[:executed] = true status[:async] = event.payload[:async] - monitor.synchronize { condition.signal } end if event.payload[:name] == "OtherDog Load" dog_status[:executed] = true dog_status[:async] = event.payload[:async] - monitor.synchronize { condition.signal } end end defered_posts = Post.where(author_id: 1).load_async defered_dogs = OtherDog.where(id: 1).load_async - monitor.synchronize do - condition.wait_until { status[:executed] } - condition.wait_until { dog_status[:executed] } - end + wait_for_async_query(defered_posts) + wait_for_async_query(defered_dogs) assert_equal expected_records, defered_posts.to_a assert_equal expected_dogs, defered_dogs.to_a diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb index 5bdeb9fb43..3000dde797 100644 --- a/activesupport/lib/active_support/notifications.rb +++ b/activesupport/lib/active_support/notifications.rb @@ -198,6 +198,10 @@ module ActiveSupport notifier.publish(name, *args) end + def publish_event(event) # :nodoc; + notifier.publish_event(event) + end + def instrument(name, payload = {}) if notifier.listening?(name) instrumenter.instrument(name, payload) { yield payload if block_given? } diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb index 56c2bd2aa2..24e7ca6471 100644 --- a/activesupport/lib/active_support/notifications/fanout.rb +++ b/activesupport/lib/active_support/notifications/fanout.rb @@ -70,6 +70,10 @@ module ActiveSupport listeners_for(name).each { |s| s.publish(name, *args) } end + def publish_event(event) + listeners_for(event.name).each { |s| s.publish_event(event) } + end + def listeners_for(name) # this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics) @listeners_for[name] || synchronize do @@ -144,6 +148,7 @@ module ActiveSupport @pattern = Matcher.wrap(pattern) @delegate = delegate @can_publish = delegate.respond_to?(:publish) + @can_publish_event = delegate.respond_to?(:publish_event) end def publish(name, *args) @@ -152,6 +157,14 @@ module ActiveSupport end end + def publish_event(event) + if @can_publish_event + @delegate.publish_event event + else + publish(event.name, event.time, event.end, event.transaction_id, event.payload) + end + end + def start(name, id, payload) @delegate.start name, id, payload end @@ -223,6 +236,10 @@ module ActiveSupport @delegate.call event end + def publish_event(event) + @delegate.call event + end + private def build_event(name, id, payload) ActiveSupport::Notifications::Event.new name, nil, nil, id, payload diff --git a/activesupport/lib/active_support/notifications/instrumenter.rb b/activesupport/lib/active_support/notifications/instrumenter.rb index e1a9fe349c..834f6cb1d0 100644 --- a/activesupport/lib/active_support/notifications/instrumenter.rb +++ b/activesupport/lib/active_support/notifications/instrumenter.rb @@ -6,6 +6,26 @@ module ActiveSupport module Notifications # Instrumenters are stored in a thread local. class Instrumenter + class Buffer # :nodoc: + def initialize(instrumenter) + @instrumenter = instrumenter + @events = [] + end + + def instrument(name, payload = {}, &block) + event = @instrumenter.new_event(name, payload) + @events << event + event.record(&block) + end + + def flush + events, @events = @events, [] + events.each do |event| + ActiveSupport::Notifications.publish_event(event) + end + end + end + attr_reader :id def initialize(notifier) @@ -31,6 +51,14 @@ module ActiveSupport end end + def new_event(name, payload = {}) # :nodoc: + Event.new(name, nil, nil, @id, payload) + end + + def buffer # :nodoc: + Buffer.new(self) + end + # Send a start notification with +name+ and +payload+. def start(name, payload) @notifier.start name, @id, payload @@ -68,6 +96,19 @@ module ActiveSupport @allocation_count_finish = 0 end + def record + start! + begin + yield payload if block_given? + rescue Exception => e + payload[:exception] = [e.class.name, e.message] + payload[:exception_object] = e + raise e + ensure + finish! + end + end + # Record information at the time this event starts def start! @time = now diff --git a/activesupport/lib/active_support/subscriber.rb b/activesupport/lib/active_support/subscriber.rb index 24f8681af8..991c5d8d30 100644 --- a/activesupport/lib/active_support/subscriber.rb +++ b/activesupport/lib/active_support/subscriber.rb @@ -150,6 +150,11 @@ module ActiveSupport send(method, event) end + def publish_event(event) # :nodoc: + method = event.name.split(".").first + send(method, event) + end + private def event_stack SubscriberQueueRegistry.instance.get_queue(@queue_key) diff --git a/activesupport/test/notifications/instrumenter_test.rb b/activesupport/test/notifications/instrumenter_test.rb index 1d10dc7f2d..47f708a438 100644 --- a/activesupport/test/notifications/instrumenter_test.rb +++ b/activesupport/test/notifications/instrumenter_test.rb @@ -61,6 +61,47 @@ module ActiveSupport assert_equal [["foo", instrumenter.id, payload]], notifier.finishes assert_empty notifier.starts end + + def test_record + called = false + event = instrumenter.new_event("foo", payload) + event.record { + called = true + } + + assert called + end + + def test_record_yields_the_payload_for_further_modification + event = instrumenter.new_event("awesome") + event.record { |p| p[:result] = 1 + 1 } + assert_equal 2, event.payload[:result] + + assert_equal "awesome", event.name + assert_equal Hash[result: 2], event.payload + assert_equal instrumenter.id, event.transaction_id + assert_not_nil event.time + assert_not_nil event.end + end + + def test_record_works_without_a_block + event = instrumenter.new_event("no.block", payload) + event.record + + assert_equal "no.block", event.name + assert_equal payload, event.payload + assert_equal instrumenter.id, event.transaction_id + assert_not_nil event.time + assert_not_nil event.end + end + + def test_record_with_exception + event = instrumenter.new_event("crash", payload) + assert_raises RuntimeError do + event.record { raise "Oopsies" } + end + assert_equal "Oopsies", event.payload[:exception_object].message + end end end end diff --git a/activesupport/test/subscriber_test.rb b/activesupport/test/subscriber_test.rb index e01b6976d9..b374241738 100644 --- a/activesupport/test/subscriber_test.rb +++ b/activesupport/test/subscriber_test.rb @@ -115,4 +115,16 @@ class SubscriberTest < ActiveSupport::TestCase assert_equal [], TestSubscriber.events end + + def test_supports_publish_event + TestSubscriber.attach_to :doodle + + original_event = ActiveSupport::Notifications::Event.new("open_party.doodle", Time.at(0), Time.at(10), "id", { foo: "bar" }) + + ActiveSupport::Notifications.publish_event(original_event) + + assert_equal original_event, TestSubscriber.events.first + ensure + TestSubscriber.detach_from :doodle + end end