From f539be73061a3ef6f2d53237a802e7c082995e87 Mon Sep 17 00:00:00 2001 From: eileencodes Date: Tue, 23 Feb 2021 13:53:07 -0500 Subject: [PATCH] Replace `ImmediateExecutor` with nothing In #41495 I added an `ImmediateExecutor` for applications that haven't configured a thread pool executor. After some thought and consideration I don't think this is correct. While it's essentially a no-op it will be confusing for users who see that queries that ran in the foreground marked as running in the background. At the moment we don't have any code that calls `load_async` internally from Rails but we may in the future and we should ensure that those calls run in the foreground and marked as such until an application has opted into async behavior by configuring which async executor they want. In this PR I've replaced the `ImmediateExecutor` with `nil`. I then added a check for `async_enabled?` that checks whether the executor is set to `nil` and if the adapter supports concurrent queries. This is a slight change in behavior because it ensures that adapters that can't run async queries don't log those queries are async. Co-authored-by: John Hawthorn --- activerecord/CHANGELOG.md | 4 +- .../abstract/connection_pool.rb | 2 - .../abstract/database_statements.rb | 3 +- .../connection_adapters/abstract_adapter.rb | 4 + activerecord/lib/active_record/core.rb | 16 +- activerecord/lib/active_record/relation.rb | 4 + .../test/cases/asynchronous_queries_test.rb | 56 ++++-- .../test/cases/relation/load_async_test.rb | 162 +++++++++++++++++- 8 files changed, 215 insertions(+), 36 deletions(-) diff --git a/activerecord/CHANGELOG.md b/activerecord/CHANGELOG.md index 1b3af363cc..b87f24bb6d 100644 --- a/activerecord/CHANGELOG.md +++ b/activerecord/CHANGELOG.md @@ -23,7 +23,9 @@ Some applications may want one thread pool per database whereas others want to use a single global thread pool for all queries. By default Rails will set `async_query_executor` - to `:immediate` and create a `Concurrent::ImmediateExecutor` object which is essentially a no-op. + to `nil` which will not initialize any executor. If `load_async` is called and no executor + has been configured, the query will be executed in the foreground. + To create one thread pool for all database connections to use applications can set `config.active_record.async_query_executor` to `:global_thread_pool` and optionally define `config.active_record.global_executor_concurrency`. This defaults to 4. For applications that want diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index 6582a6df54..f8d4f2f527 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -469,8 +469,6 @@ module ActiveRecord ) when :global_thread_pool Base.global_thread_pool_async_query_executor - else - Base.immediate_query_executor end end diff --git a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb index 14d6127bc5..51429fe760 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb @@ -525,7 +525,7 @@ module ActiveRecord # Returns an ActiveRecord::Result instance. def select(sql, name = nil, binds = [], prepare: false, async: false) - if async + if async && async_enabled? if current_transaction.joinable? raise AsynchronousQueryInsideTransactionError, "Asynchronous queries are not allowed inside transactions" end @@ -544,6 +544,7 @@ module ActiveRecord end return future_result end + exec_query(sql, name, binds, prepare: prepare) end diff --git a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb index 75cf7c1c83..bb051528bc 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb @@ -429,6 +429,10 @@ module ActiveRecord true end + def async_enabled? + supports_concurrent_connections? && !Base.async_query_executor.nil? + end + # This is meant to be implemented by the adapters that support extensions def disable_extension(name) end diff --git a/activerecord/lib/active_record/core.rb b/activerecord/lib/active_record/core.rb index 1d2da15d6d..405754464b 100644 --- a/activerecord/lib/active_record/core.rb +++ b/activerecord/lib/active_record/core.rb @@ -158,18 +158,16 @@ module ActiveRecord mattr_accessor :application_record_class, instance_accessor: false, default: nil # Sets the async_query_executor for an application. By default the thread pool executor - # set to +:immediate+. Options are: + # set to +nil+ which will not run queries in the background. Applications must configure + # a thread pool executor to use this feature. Options are: # - # * :immediate - Initializes a single +Concurrent::ImmediateExecutor+ + # * nil - Does not initalize a thread pool executor. Any async calls will be + # run in the foreground. # * :global_thread_pool - Initializes a single +Concurrent::ThreadPoolExecutor+ # that uses the +async_query_concurrency+ for the +max_threads+ value. # * :multi_thread_pool - Initializes a +Concurrent::ThreadPoolExecutor+ for each # database connection. The initializer values are defined in the configuration hash. - mattr_accessor :async_query_executor, instance_accessor: false, default: :immediate - - def self.immediate_query_executor # :nodoc: - @@immediate_query_executor ||= Concurrent::ImmediateExecutor.new - end + mattr_accessor :async_query_executor, instance_accessor: false, default: nil def self.global_thread_pool_async_query_executor # :nodoc: concurrency = global_executor_concurrency || 4 @@ -184,8 +182,8 @@ module ActiveRecord # Set the +global_executor_concurrency+. This configuration value can only be used # with the global thread pool async query executor. def self.global_executor_concurrency=(global_executor_concurrency) - if async_query_executor == :immediate || async_query_executor == :multi_thread_pool - raise ArgumentError, "`global_executor_concurrency` cannot be set when using either immediate or multiple thread pools. For multiple thread pools, please set the concurrency in your database configuration. Immediate thread pools are essentially a no-op." + if async_query_executor.nil? || async_query_executor == :multi_thread_pool + raise ArgumentError, "`global_executor_concurrency` cannot be set when using the executor is nil or set to multi_thead_pool. For multiple thread pools, please set the concurrency in your database configuration." end @@global_executor_concurrency = global_executor_concurrency diff --git a/activerecord/lib/active_record/relation.rb b/activerecord/lib/active_record/relation.rb index e6ec2a3a3d..a890af4519 100644 --- a/activerecord/lib/active_record/relation.rb +++ b/activerecord/lib/active_record/relation.rb @@ -654,8 +654,11 @@ module ActiveRecord # # Post.where(published: true).load_async # => # def load_async + return load if !connection.async_enabled? + unless loaded? result = exec_main_query(async: connection.current_transaction.closed?) + if result.is_a?(Array) @records = result else @@ -663,6 +666,7 @@ module ActiveRecord end @loaded = true end + self end diff --git a/activerecord/test/cases/asynchronous_queries_test.rb b/activerecord/test/cases/asynchronous_queries_test.rb index 1325a7001e..cc7da91ab6 100644 --- a/activerecord/test/cases/asynchronous_queries_test.rb +++ b/activerecord/test/cases/asynchronous_queries_test.rb @@ -8,10 +8,16 @@ module AsynchronousQueriesSharedTests def test_async_select_failure ActiveRecord::Base.asynchronous_queries_tracker.start_session - future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true - assert_kind_of ActiveRecord::FutureResult, future_result - assert_raises ActiveRecord::StatementInvalid do - future_result.result + if in_memory_db? + assert_raises ActiveRecord::StatementInvalid do + @connection.select_all "SELECT * FROM does_not_exists", async: true + end + else + future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true + assert_kind_of ActiveRecord::FutureResult, future_result + assert_raises ActiveRecord::StatementInvalid do + future_result.result + end end ensure ActiveRecord::Base.asynchronous_queries_tracker.finalize_session @@ -24,9 +30,11 @@ module AsynchronousQueriesSharedTests @connection.select_all "SELECT * FROM posts", async: true end - @connection.transaction do - assert_raises ActiveRecord::AsynchronousQueryInsideTransactionError do - @connection.select_all "SELECT * FROM posts", async: true + unless in_memory_db? + @connection.transaction do + assert_raises ActiveRecord::AsynchronousQueryInsideTransactionError do + @connection.select_all "SELECT * FROM posts", async: true + end end end ensure @@ -57,10 +65,16 @@ module AsynchronousQueriesSharedTests end @connection.pool.stub(:schedule_query, proc { }) do - future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true - assert_kind_of ActiveRecord::FutureResult, future_result - assert_raises ActiveRecord::StatementInvalid do - future_result.result + if in_memory_db? + assert_raises ActiveRecord::StatementInvalid do + @connection.select_all "SELECT * FROM does_not_exists", async: true + end + else + future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true + assert_kind_of ActiveRecord::FutureResult, future_result + assert_raises ActiveRecord::StatementInvalid do + future_result.result + end end end @@ -96,7 +110,12 @@ class AsynchronousQueriesTest < ActiveRecord::TestCase end future_result = @connection.select_all "SELECT * FROM posts", async: true - assert_kind_of ActiveRecord::FutureResult, future_result + + if in_memory_db? + assert_kind_of ActiveRecord::Result, future_result + else + assert_kind_of ActiveRecord::FutureResult, future_result + end monitor.synchronize do condition.wait_until { status[:executed] } @@ -121,9 +140,9 @@ class AsynchronousQueriesWithTransactionalTest < ActiveRecord::TestCase end class AsynchronousExecutorTypeTest < ActiveRecord::TestCase - def test_immediate_configuration_uses_a_single_immediate_executor_by_default + def test_null_configuration_uses_a_single_null_executor_by_default old_value = ActiveRecord::Base.async_query_executor - ActiveRecord::Base.async_query_executor = :immediate + ActiveRecord::Base.async_query_executor = nil handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new db_config = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary") @@ -134,11 +153,10 @@ class AsynchronousExecutorTypeTest < ActiveRecord::TestCase async_pool1 = pool1.instance_variable_get(:@async_executor) async_pool2 = pool2.instance_variable_get(:@async_executor) - assert async_pool1.is_a?(Concurrent::ImmediateExecutor) - assert async_pool2.is_a?(Concurrent::ImmediateExecutor) + assert_nil async_pool1 + assert_nil async_pool2 assert_equal 2, handler.all_connection_pools.count - assert_equal async_pool1, async_pool2 ensure clean_up_connection_handler ActiveRecord::Base.async_query_executor = old_value @@ -213,9 +231,9 @@ class AsynchronousExecutorTypeTest < ActiveRecord::TestCase ActiveRecord::Base.async_query_executor = old_value end - def test_concurrency_cannot_be_set_with_immediate_or_multi_thread_pool + def test_concurrency_cannot_be_set_with_null_executor_or_multi_thread_pool old_value = ActiveRecord::Base.async_query_executor - ActiveRecord::Base.async_query_executor = :immediate + ActiveRecord::Base.async_query_executor = nil assert_raises ArgumentError do ActiveRecord::Base.global_executor_concurrency = 8 diff --git a/activerecord/test/cases/relation/load_async_test.rb b/activerecord/test/cases/relation/load_async_test.rb index 8f2a4df742..72df32a9dc 100644 --- a/activerecord/test/cases/relation/load_async_test.rb +++ b/activerecord/test/cases/relation/load_async_test.rb @@ -12,7 +12,11 @@ module ActiveRecord def test_scheduled? defered_posts = Post.where(author_id: 1).load_async - assert_predicate defered_posts, :scheduled? + if in_memory_db? + assert_not_predicate defered_posts, :scheduled? + else + assert_predicate defered_posts, :scheduled? + end assert_predicate defered_posts, :loaded? defered_posts.to_a assert_not_predicate defered_posts, :scheduled? @@ -20,7 +24,11 @@ module ActiveRecord def test_reset defered_posts = Post.where(author_id: 1).load_async - assert_predicate defered_posts, :scheduled? + if in_memory_db? + assert_not_predicate defered_posts, :scheduled? + else + assert_predicate defered_posts, :scheduled? + end defered_posts.reset assert_not_predicate defered_posts, :scheduled? end @@ -57,7 +65,11 @@ module ActiveRecord Post.transaction do Post.where(author_id: 1).update_all(title: "In Transaction") posts = Post.where(author_id: 1).load_async - assert_predicate posts, :scheduled? + if in_memory_db? + assert_not_predicate posts, :scheduled? + else + assert_predicate posts, :scheduled? + end assert_predicate posts, :loaded? raise ActiveRecord::Rollback end @@ -83,7 +95,11 @@ module ActiveRecord defered_posts = Post.where(author_id: 1).eager_load(:comments).load_async - assert_predicate defered_posts, :scheduled? + if in_memory_db? + assert_not_predicate defered_posts, :scheduled? + else + assert_predicate defered_posts, :scheduled? + end monitor.synchronize do condition.wait_until { status[:executed] } @@ -127,4 +143,142 @@ module ActiveRecord assert_predicate defered_posts, :loaded? end end + + unless in_memory_db? + class LoadAsyncNullExecutorTest < ActiveRecord::TestCase + self.use_transactional_tests = false + + fixtures :posts, :comments + + def setup + @old_config = ActiveRecord::Base.async_query_executor + ActiveRecord::Base.async_query_executor = nil + ActiveRecord::Base.establish_connection :arunit + end + + def teardown + ActiveRecord::Base.async_query_executor = @old_config + ActiveRecord::Base.establish_connection :arunit + end + + def test_scheduled? + defered_posts = Post.where(author_id: 1).load_async + assert_not_predicate defered_posts, :scheduled? + assert_predicate defered_posts, :loaded? + defered_posts.to_a + assert_not_predicate defered_posts, :scheduled? + end + + def test_reset + defered_posts = Post.where(author_id: 1).load_async + assert_not_predicate defered_posts, :scheduled? + defered_posts.reset + assert_not_predicate defered_posts, :scheduled? + end + + def test_simple_query + expected_records = Post.where(author_id: 1).to_a + + status = {} + monitor = Monitor.new + condition = monitor.new_cond + + subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| + if event.payload[:name] == "Post Load" + status[:executed] = true + status[:async] = event.payload[:async] + monitor.synchronize { condition.signal } + end + end + + defered_posts = Post.where(author_id: 1).load_async + + monitor.synchronize do + condition.wait_until { status[:executed] } + end + + assert_equal expected_records, defered_posts.to_a + assert_not_equal Post.connection.supports_concurrent_connections?, status[:async] + ensure + ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber + end + + def test_load_async_from_transaction + posts = nil + Post.transaction do + Post.where(author_id: 1).update_all(title: "In Transaction") + posts = Post.where(author_id: 1).load_async + assert_not_predicate posts, :scheduled? + assert_predicate posts, :loaded? + raise ActiveRecord::Rollback + end + + assert_not_nil posts + assert_equal ["In Transaction"], posts.map(&:title).uniq + end + + def test_eager_loading_query + expected_records = Post.where(author_id: 1).eager_load(:comments).to_a + + status = {} + monitor = Monitor.new + condition = monitor.new_cond + + subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| + if event.payload[:name] == "SQL" + status[:executed] = true + status[:async] = event.payload[:async] + monitor.synchronize { condition.signal } + end + end + + defered_posts = Post.where(author_id: 1).eager_load(:comments).load_async + + assert_not_predicate defered_posts, :scheduled? + + monitor.synchronize do + condition.wait_until { status[:executed] } + end + + assert_equal expected_records, defered_posts.to_a + assert_queries(0) do + defered_posts.each(&:comments) + end + + assert_predicate Post.connection, :supports_concurrent_connections? + assert_not status[:async], "Expected status[:async] to be false with NullExecutor" + ensure + ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber + end + + def test_contradiction + assert_queries(0) do + assert_equal [], Post.where(id: []).load_async.to_a + end + + Post.where(id: []).load_async.reset + end + + def test_pluck + titles = Post.where(author_id: 1).pluck(:title) + assert_equal titles, Post.where(author_id: 1).load_async.pluck(:title) + end + + def test_size + expected_size = Post.where(author_id: 1).size + + defered_posts = Post.where(author_id: 1).load_async + + assert_equal expected_size, defered_posts.size + assert_predicate defered_posts, :loaded? + end + + def test_empty? + defered_posts = Post.where(author_id: 1).load_async + + assert_equal false, defered_posts.empty? + assert_predicate defered_posts, :loaded? + end + end + end end