diff --git a/activerecord/CHANGELOG.md b/activerecord/CHANGELOG.md index 1b3af363cc..b87f24bb6d 100644 --- a/activerecord/CHANGELOG.md +++ b/activerecord/CHANGELOG.md @@ -23,7 +23,9 @@ Some applications may want one thread pool per database whereas others want to use a single global thread pool for all queries. By default Rails will set `async_query_executor` - to `:immediate` and create a `Concurrent::ImmediateExecutor` object which is essentially a no-op. + to `nil` which will not initialize any executor. If `load_async` is called and no executor + has been configured, the query will be executed in the foreground. + To create one thread pool for all database connections to use applications can set `config.active_record.async_query_executor` to `:global_thread_pool` and optionally define `config.active_record.global_executor_concurrency`. This defaults to 4. For applications that want diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index 6582a6df54..f8d4f2f527 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -469,8 +469,6 @@ module ActiveRecord ) when :global_thread_pool Base.global_thread_pool_async_query_executor - else - Base.immediate_query_executor end end diff --git a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb index 14d6127bc5..51429fe760 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb @@ -525,7 +525,7 @@ module ActiveRecord # Returns an ActiveRecord::Result instance. def select(sql, name = nil, binds = [], prepare: false, async: false) - if async + if async && async_enabled? if current_transaction.joinable? raise AsynchronousQueryInsideTransactionError, "Asynchronous queries are not allowed inside transactions" end @@ -544,6 +544,7 @@ module ActiveRecord end return future_result end + exec_query(sql, name, binds, prepare: prepare) end diff --git a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb index 75cf7c1c83..bb051528bc 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb @@ -429,6 +429,10 @@ module ActiveRecord true end + def async_enabled? + supports_concurrent_connections? && !Base.async_query_executor.nil? + end + # This is meant to be implemented by the adapters that support extensions def disable_extension(name) end diff --git a/activerecord/lib/active_record/core.rb b/activerecord/lib/active_record/core.rb index 1d2da15d6d..405754464b 100644 --- a/activerecord/lib/active_record/core.rb +++ b/activerecord/lib/active_record/core.rb @@ -158,18 +158,16 @@ module ActiveRecord mattr_accessor :application_record_class, instance_accessor: false, default: nil # Sets the async_query_executor for an application. By default the thread pool executor - # set to +:immediate+. Options are: + # set to +nil+ which will not run queries in the background. Applications must configure + # a thread pool executor to use this feature. Options are: # - # * :immediate - Initializes a single +Concurrent::ImmediateExecutor+ + # * nil - Does not initalize a thread pool executor. Any async calls will be + # run in the foreground. # * :global_thread_pool - Initializes a single +Concurrent::ThreadPoolExecutor+ # that uses the +async_query_concurrency+ for the +max_threads+ value. # * :multi_thread_pool - Initializes a +Concurrent::ThreadPoolExecutor+ for each # database connection. The initializer values are defined in the configuration hash. - mattr_accessor :async_query_executor, instance_accessor: false, default: :immediate - - def self.immediate_query_executor # :nodoc: - @@immediate_query_executor ||= Concurrent::ImmediateExecutor.new - end + mattr_accessor :async_query_executor, instance_accessor: false, default: nil def self.global_thread_pool_async_query_executor # :nodoc: concurrency = global_executor_concurrency || 4 @@ -184,8 +182,8 @@ module ActiveRecord # Set the +global_executor_concurrency+. This configuration value can only be used # with the global thread pool async query executor. def self.global_executor_concurrency=(global_executor_concurrency) - if async_query_executor == :immediate || async_query_executor == :multi_thread_pool - raise ArgumentError, "`global_executor_concurrency` cannot be set when using either immediate or multiple thread pools. For multiple thread pools, please set the concurrency in your database configuration. Immediate thread pools are essentially a no-op." + if async_query_executor.nil? || async_query_executor == :multi_thread_pool + raise ArgumentError, "`global_executor_concurrency` cannot be set when using the executor is nil or set to multi_thead_pool. For multiple thread pools, please set the concurrency in your database configuration." end @@global_executor_concurrency = global_executor_concurrency diff --git a/activerecord/lib/active_record/relation.rb b/activerecord/lib/active_record/relation.rb index e6ec2a3a3d..a890af4519 100644 --- a/activerecord/lib/active_record/relation.rb +++ b/activerecord/lib/active_record/relation.rb @@ -654,8 +654,11 @@ module ActiveRecord # # Post.where(published: true).load_async # => # def load_async + return load if !connection.async_enabled? + unless loaded? result = exec_main_query(async: connection.current_transaction.closed?) + if result.is_a?(Array) @records = result else @@ -663,6 +666,7 @@ module ActiveRecord end @loaded = true end + self end diff --git a/activerecord/test/cases/asynchronous_queries_test.rb b/activerecord/test/cases/asynchronous_queries_test.rb index 1325a7001e..cc7da91ab6 100644 --- a/activerecord/test/cases/asynchronous_queries_test.rb +++ b/activerecord/test/cases/asynchronous_queries_test.rb @@ -8,10 +8,16 @@ module AsynchronousQueriesSharedTests def test_async_select_failure ActiveRecord::Base.asynchronous_queries_tracker.start_session - future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true - assert_kind_of ActiveRecord::FutureResult, future_result - assert_raises ActiveRecord::StatementInvalid do - future_result.result + if in_memory_db? + assert_raises ActiveRecord::StatementInvalid do + @connection.select_all "SELECT * FROM does_not_exists", async: true + end + else + future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true + assert_kind_of ActiveRecord::FutureResult, future_result + assert_raises ActiveRecord::StatementInvalid do + future_result.result + end end ensure ActiveRecord::Base.asynchronous_queries_tracker.finalize_session @@ -24,9 +30,11 @@ module AsynchronousQueriesSharedTests @connection.select_all "SELECT * FROM posts", async: true end - @connection.transaction do - assert_raises ActiveRecord::AsynchronousQueryInsideTransactionError do - @connection.select_all "SELECT * FROM posts", async: true + unless in_memory_db? + @connection.transaction do + assert_raises ActiveRecord::AsynchronousQueryInsideTransactionError do + @connection.select_all "SELECT * FROM posts", async: true + end end end ensure @@ -57,10 +65,16 @@ module AsynchronousQueriesSharedTests end @connection.pool.stub(:schedule_query, proc { }) do - future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true - assert_kind_of ActiveRecord::FutureResult, future_result - assert_raises ActiveRecord::StatementInvalid do - future_result.result + if in_memory_db? + assert_raises ActiveRecord::StatementInvalid do + @connection.select_all "SELECT * FROM does_not_exists", async: true + end + else + future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true + assert_kind_of ActiveRecord::FutureResult, future_result + assert_raises ActiveRecord::StatementInvalid do + future_result.result + end end end @@ -96,7 +110,12 @@ class AsynchronousQueriesTest < ActiveRecord::TestCase end future_result = @connection.select_all "SELECT * FROM posts", async: true - assert_kind_of ActiveRecord::FutureResult, future_result + + if in_memory_db? + assert_kind_of ActiveRecord::Result, future_result + else + assert_kind_of ActiveRecord::FutureResult, future_result + end monitor.synchronize do condition.wait_until { status[:executed] } @@ -121,9 +140,9 @@ class AsynchronousQueriesWithTransactionalTest < ActiveRecord::TestCase end class AsynchronousExecutorTypeTest < ActiveRecord::TestCase - def test_immediate_configuration_uses_a_single_immediate_executor_by_default + def test_null_configuration_uses_a_single_null_executor_by_default old_value = ActiveRecord::Base.async_query_executor - ActiveRecord::Base.async_query_executor = :immediate + ActiveRecord::Base.async_query_executor = nil handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new db_config = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary") @@ -134,11 +153,10 @@ class AsynchronousExecutorTypeTest < ActiveRecord::TestCase async_pool1 = pool1.instance_variable_get(:@async_executor) async_pool2 = pool2.instance_variable_get(:@async_executor) - assert async_pool1.is_a?(Concurrent::ImmediateExecutor) - assert async_pool2.is_a?(Concurrent::ImmediateExecutor) + assert_nil async_pool1 + assert_nil async_pool2 assert_equal 2, handler.all_connection_pools.count - assert_equal async_pool1, async_pool2 ensure clean_up_connection_handler ActiveRecord::Base.async_query_executor = old_value @@ -213,9 +231,9 @@ class AsynchronousExecutorTypeTest < ActiveRecord::TestCase ActiveRecord::Base.async_query_executor = old_value end - def test_concurrency_cannot_be_set_with_immediate_or_multi_thread_pool + def test_concurrency_cannot_be_set_with_null_executor_or_multi_thread_pool old_value = ActiveRecord::Base.async_query_executor - ActiveRecord::Base.async_query_executor = :immediate + ActiveRecord::Base.async_query_executor = nil assert_raises ArgumentError do ActiveRecord::Base.global_executor_concurrency = 8 diff --git a/activerecord/test/cases/relation/load_async_test.rb b/activerecord/test/cases/relation/load_async_test.rb index 8f2a4df742..72df32a9dc 100644 --- a/activerecord/test/cases/relation/load_async_test.rb +++ b/activerecord/test/cases/relation/load_async_test.rb @@ -12,7 +12,11 @@ module ActiveRecord def test_scheduled? defered_posts = Post.where(author_id: 1).load_async - assert_predicate defered_posts, :scheduled? + if in_memory_db? + assert_not_predicate defered_posts, :scheduled? + else + assert_predicate defered_posts, :scheduled? + end assert_predicate defered_posts, :loaded? defered_posts.to_a assert_not_predicate defered_posts, :scheduled? @@ -20,7 +24,11 @@ module ActiveRecord def test_reset defered_posts = Post.where(author_id: 1).load_async - assert_predicate defered_posts, :scheduled? + if in_memory_db? + assert_not_predicate defered_posts, :scheduled? + else + assert_predicate defered_posts, :scheduled? + end defered_posts.reset assert_not_predicate defered_posts, :scheduled? end @@ -57,7 +65,11 @@ module ActiveRecord Post.transaction do Post.where(author_id: 1).update_all(title: "In Transaction") posts = Post.where(author_id: 1).load_async - assert_predicate posts, :scheduled? + if in_memory_db? + assert_not_predicate posts, :scheduled? + else + assert_predicate posts, :scheduled? + end assert_predicate posts, :loaded? raise ActiveRecord::Rollback end @@ -83,7 +95,11 @@ module ActiveRecord defered_posts = Post.where(author_id: 1).eager_load(:comments).load_async - assert_predicate defered_posts, :scheduled? + if in_memory_db? + assert_not_predicate defered_posts, :scheduled? + else + assert_predicate defered_posts, :scheduled? + end monitor.synchronize do condition.wait_until { status[:executed] } @@ -127,4 +143,142 @@ module ActiveRecord assert_predicate defered_posts, :loaded? end end + + unless in_memory_db? + class LoadAsyncNullExecutorTest < ActiveRecord::TestCase + self.use_transactional_tests = false + + fixtures :posts, :comments + + def setup + @old_config = ActiveRecord::Base.async_query_executor + ActiveRecord::Base.async_query_executor = nil + ActiveRecord::Base.establish_connection :arunit + end + + def teardown + ActiveRecord::Base.async_query_executor = @old_config + ActiveRecord::Base.establish_connection :arunit + end + + def test_scheduled? + defered_posts = Post.where(author_id: 1).load_async + assert_not_predicate defered_posts, :scheduled? + assert_predicate defered_posts, :loaded? + defered_posts.to_a + assert_not_predicate defered_posts, :scheduled? + end + + def test_reset + defered_posts = Post.where(author_id: 1).load_async + assert_not_predicate defered_posts, :scheduled? + defered_posts.reset + assert_not_predicate defered_posts, :scheduled? + end + + def test_simple_query + expected_records = Post.where(author_id: 1).to_a + + status = {} + monitor = Monitor.new + condition = monitor.new_cond + + subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| + if event.payload[:name] == "Post Load" + status[:executed] = true + status[:async] = event.payload[:async] + monitor.synchronize { condition.signal } + end + end + + defered_posts = Post.where(author_id: 1).load_async + + monitor.synchronize do + condition.wait_until { status[:executed] } + end + + assert_equal expected_records, defered_posts.to_a + assert_not_equal Post.connection.supports_concurrent_connections?, status[:async] + ensure + ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber + end + + def test_load_async_from_transaction + posts = nil + Post.transaction do + Post.where(author_id: 1).update_all(title: "In Transaction") + posts = Post.where(author_id: 1).load_async + assert_not_predicate posts, :scheduled? + assert_predicate posts, :loaded? + raise ActiveRecord::Rollback + end + + assert_not_nil posts + assert_equal ["In Transaction"], posts.map(&:title).uniq + end + + def test_eager_loading_query + expected_records = Post.where(author_id: 1).eager_load(:comments).to_a + + status = {} + monitor = Monitor.new + condition = monitor.new_cond + + subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| + if event.payload[:name] == "SQL" + status[:executed] = true + status[:async] = event.payload[:async] + monitor.synchronize { condition.signal } + end + end + + defered_posts = Post.where(author_id: 1).eager_load(:comments).load_async + + assert_not_predicate defered_posts, :scheduled? + + monitor.synchronize do + condition.wait_until { status[:executed] } + end + + assert_equal expected_records, defered_posts.to_a + assert_queries(0) do + defered_posts.each(&:comments) + end + + assert_predicate Post.connection, :supports_concurrent_connections? + assert_not status[:async], "Expected status[:async] to be false with NullExecutor" + ensure + ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber + end + + def test_contradiction + assert_queries(0) do + assert_equal [], Post.where(id: []).load_async.to_a + end + + Post.where(id: []).load_async.reset + end + + def test_pluck + titles = Post.where(author_id: 1).pluck(:title) + assert_equal titles, Post.where(author_id: 1).load_async.pluck(:title) + end + + def test_size + expected_size = Post.where(author_id: 1).size + + defered_posts = Post.where(author_id: 1).load_async + + assert_equal expected_size, defered_posts.size + assert_predicate defered_posts, :loaded? + end + + def test_empty? + defered_posts = Post.where(author_id: 1).load_async + + assert_equal false, defered_posts.empty? + assert_predicate defered_posts, :loaded? + end + end + end end