mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Replace ImmediateExecutor
with nothing
In #41495 I added an `ImmediateExecutor` for applications that haven't configured a thread pool executor. After some thought and consideration I don't think this is correct. While it's essentially a no-op it will be confusing for users who see that queries that ran in the foreground marked as running in the background. At the moment we don't have any code that calls `load_async` internally from Rails but we may in the future and we should ensure that those calls run in the foreground and marked as such until an application has opted into async behavior by configuring which async executor they want. In this PR I've replaced the `ImmediateExecutor` with `nil`. I then added a check for `async_enabled?` that checks whether the executor is set to `nil` and if the adapter supports concurrent queries. This is a slight change in behavior because it ensures that adapters that can't run async queries don't log those queries are async. Co-authored-by: John Hawthorn <john@hawthorn.email>
This commit is contained in:
parent
0e71d45b00
commit
f539be7306
8 changed files with 215 additions and 36 deletions
|
@ -23,7 +23,9 @@
|
|||
|
||||
Some applications may want one thread pool per database whereas others want to use
|
||||
a single global thread pool for all queries. By default Rails will set `async_query_executor`
|
||||
to `:immediate` and create a `Concurrent::ImmediateExecutor` object which is essentially a no-op.
|
||||
to `nil` which will not initialize any executor. If `load_async` is called and no executor
|
||||
has been configured, the query will be executed in the foreground.
|
||||
|
||||
To create one thread pool for all database connections to use applications can set
|
||||
`config.active_record.async_query_executor` to `:global_thread_pool` and optionally define
|
||||
`config.active_record.global_executor_concurrency`. This defaults to 4. For applications that want
|
||||
|
|
|
@ -469,8 +469,6 @@ module ActiveRecord
|
|||
)
|
||||
when :global_thread_pool
|
||||
Base.global_thread_pool_async_query_executor
|
||||
else
|
||||
Base.immediate_query_executor
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -525,7 +525,7 @@ module ActiveRecord
|
|||
|
||||
# Returns an ActiveRecord::Result instance.
|
||||
def select(sql, name = nil, binds = [], prepare: false, async: false)
|
||||
if async
|
||||
if async && async_enabled?
|
||||
if current_transaction.joinable?
|
||||
raise AsynchronousQueryInsideTransactionError, "Asynchronous queries are not allowed inside transactions"
|
||||
end
|
||||
|
@ -544,6 +544,7 @@ module ActiveRecord
|
|||
end
|
||||
return future_result
|
||||
end
|
||||
|
||||
exec_query(sql, name, binds, prepare: prepare)
|
||||
end
|
||||
|
||||
|
|
|
@ -429,6 +429,10 @@ module ActiveRecord
|
|||
true
|
||||
end
|
||||
|
||||
def async_enabled?
|
||||
supports_concurrent_connections? && !Base.async_query_executor.nil?
|
||||
end
|
||||
|
||||
# This is meant to be implemented by the adapters that support extensions
|
||||
def disable_extension(name)
|
||||
end
|
||||
|
|
|
@ -158,18 +158,16 @@ module ActiveRecord
|
|||
mattr_accessor :application_record_class, instance_accessor: false, default: nil
|
||||
|
||||
# Sets the async_query_executor for an application. By default the thread pool executor
|
||||
# set to +:immediate+. Options are:
|
||||
# set to +nil+ which will not run queries in the background. Applications must configure
|
||||
# a thread pool executor to use this feature. Options are:
|
||||
#
|
||||
# * :immediate - Initializes a single +Concurrent::ImmediateExecutor+
|
||||
# * nil - Does not initalize a thread pool executor. Any async calls will be
|
||||
# run in the foreground.
|
||||
# * :global_thread_pool - Initializes a single +Concurrent::ThreadPoolExecutor+
|
||||
# that uses the +async_query_concurrency+ for the +max_threads+ value.
|
||||
# * :multi_thread_pool - Initializes a +Concurrent::ThreadPoolExecutor+ for each
|
||||
# database connection. The initializer values are defined in the configuration hash.
|
||||
mattr_accessor :async_query_executor, instance_accessor: false, default: :immediate
|
||||
|
||||
def self.immediate_query_executor # :nodoc:
|
||||
@@immediate_query_executor ||= Concurrent::ImmediateExecutor.new
|
||||
end
|
||||
mattr_accessor :async_query_executor, instance_accessor: false, default: nil
|
||||
|
||||
def self.global_thread_pool_async_query_executor # :nodoc:
|
||||
concurrency = global_executor_concurrency || 4
|
||||
|
@ -184,8 +182,8 @@ module ActiveRecord
|
|||
# Set the +global_executor_concurrency+. This configuration value can only be used
|
||||
# with the global thread pool async query executor.
|
||||
def self.global_executor_concurrency=(global_executor_concurrency)
|
||||
if async_query_executor == :immediate || async_query_executor == :multi_thread_pool
|
||||
raise ArgumentError, "`global_executor_concurrency` cannot be set when using either immediate or multiple thread pools. For multiple thread pools, please set the concurrency in your database configuration. Immediate thread pools are essentially a no-op."
|
||||
if async_query_executor.nil? || async_query_executor == :multi_thread_pool
|
||||
raise ArgumentError, "`global_executor_concurrency` cannot be set when using the executor is nil or set to multi_thead_pool. For multiple thread pools, please set the concurrency in your database configuration."
|
||||
end
|
||||
|
||||
@@global_executor_concurrency = global_executor_concurrency
|
||||
|
|
|
@ -654,8 +654,11 @@ module ActiveRecord
|
|||
#
|
||||
# Post.where(published: true).load_async # => #<ActiveRecord::Relation>
|
||||
def load_async
|
||||
return load if !connection.async_enabled?
|
||||
|
||||
unless loaded?
|
||||
result = exec_main_query(async: connection.current_transaction.closed?)
|
||||
|
||||
if result.is_a?(Array)
|
||||
@records = result
|
||||
else
|
||||
|
@ -663,6 +666,7 @@ module ActiveRecord
|
|||
end
|
||||
@loaded = true
|
||||
end
|
||||
|
||||
self
|
||||
end
|
||||
|
||||
|
|
|
@ -8,10 +8,16 @@ module AsynchronousQueriesSharedTests
|
|||
def test_async_select_failure
|
||||
ActiveRecord::Base.asynchronous_queries_tracker.start_session
|
||||
|
||||
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
|
||||
assert_kind_of ActiveRecord::FutureResult, future_result
|
||||
assert_raises ActiveRecord::StatementInvalid do
|
||||
future_result.result
|
||||
if in_memory_db?
|
||||
assert_raises ActiveRecord::StatementInvalid do
|
||||
@connection.select_all "SELECT * FROM does_not_exists", async: true
|
||||
end
|
||||
else
|
||||
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
|
||||
assert_kind_of ActiveRecord::FutureResult, future_result
|
||||
assert_raises ActiveRecord::StatementInvalid do
|
||||
future_result.result
|
||||
end
|
||||
end
|
||||
ensure
|
||||
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
|
||||
|
@ -24,9 +30,11 @@ module AsynchronousQueriesSharedTests
|
|||
@connection.select_all "SELECT * FROM posts", async: true
|
||||
end
|
||||
|
||||
@connection.transaction do
|
||||
assert_raises ActiveRecord::AsynchronousQueryInsideTransactionError do
|
||||
@connection.select_all "SELECT * FROM posts", async: true
|
||||
unless in_memory_db?
|
||||
@connection.transaction do
|
||||
assert_raises ActiveRecord::AsynchronousQueryInsideTransactionError do
|
||||
@connection.select_all "SELECT * FROM posts", async: true
|
||||
end
|
||||
end
|
||||
end
|
||||
ensure
|
||||
|
@ -57,10 +65,16 @@ module AsynchronousQueriesSharedTests
|
|||
end
|
||||
|
||||
@connection.pool.stub(:schedule_query, proc { }) do
|
||||
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
|
||||
assert_kind_of ActiveRecord::FutureResult, future_result
|
||||
assert_raises ActiveRecord::StatementInvalid do
|
||||
future_result.result
|
||||
if in_memory_db?
|
||||
assert_raises ActiveRecord::StatementInvalid do
|
||||
@connection.select_all "SELECT * FROM does_not_exists", async: true
|
||||
end
|
||||
else
|
||||
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
|
||||
assert_kind_of ActiveRecord::FutureResult, future_result
|
||||
assert_raises ActiveRecord::StatementInvalid do
|
||||
future_result.result
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -96,7 +110,12 @@ class AsynchronousQueriesTest < ActiveRecord::TestCase
|
|||
end
|
||||
|
||||
future_result = @connection.select_all "SELECT * FROM posts", async: true
|
||||
assert_kind_of ActiveRecord::FutureResult, future_result
|
||||
|
||||
if in_memory_db?
|
||||
assert_kind_of ActiveRecord::Result, future_result
|
||||
else
|
||||
assert_kind_of ActiveRecord::FutureResult, future_result
|
||||
end
|
||||
|
||||
monitor.synchronize do
|
||||
condition.wait_until { status[:executed] }
|
||||
|
@ -121,9 +140,9 @@ class AsynchronousQueriesWithTransactionalTest < ActiveRecord::TestCase
|
|||
end
|
||||
|
||||
class AsynchronousExecutorTypeTest < ActiveRecord::TestCase
|
||||
def test_immediate_configuration_uses_a_single_immediate_executor_by_default
|
||||
def test_null_configuration_uses_a_single_null_executor_by_default
|
||||
old_value = ActiveRecord::Base.async_query_executor
|
||||
ActiveRecord::Base.async_query_executor = :immediate
|
||||
ActiveRecord::Base.async_query_executor = nil
|
||||
|
||||
handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new
|
||||
db_config = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary")
|
||||
|
@ -134,11 +153,10 @@ class AsynchronousExecutorTypeTest < ActiveRecord::TestCase
|
|||
async_pool1 = pool1.instance_variable_get(:@async_executor)
|
||||
async_pool2 = pool2.instance_variable_get(:@async_executor)
|
||||
|
||||
assert async_pool1.is_a?(Concurrent::ImmediateExecutor)
|
||||
assert async_pool2.is_a?(Concurrent::ImmediateExecutor)
|
||||
assert_nil async_pool1
|
||||
assert_nil async_pool2
|
||||
|
||||
assert_equal 2, handler.all_connection_pools.count
|
||||
assert_equal async_pool1, async_pool2
|
||||
ensure
|
||||
clean_up_connection_handler
|
||||
ActiveRecord::Base.async_query_executor = old_value
|
||||
|
@ -213,9 +231,9 @@ class AsynchronousExecutorTypeTest < ActiveRecord::TestCase
|
|||
ActiveRecord::Base.async_query_executor = old_value
|
||||
end
|
||||
|
||||
def test_concurrency_cannot_be_set_with_immediate_or_multi_thread_pool
|
||||
def test_concurrency_cannot_be_set_with_null_executor_or_multi_thread_pool
|
||||
old_value = ActiveRecord::Base.async_query_executor
|
||||
ActiveRecord::Base.async_query_executor = :immediate
|
||||
ActiveRecord::Base.async_query_executor = nil
|
||||
|
||||
assert_raises ArgumentError do
|
||||
ActiveRecord::Base.global_executor_concurrency = 8
|
||||
|
|
|
@ -12,7 +12,11 @@ module ActiveRecord
|
|||
|
||||
def test_scheduled?
|
||||
defered_posts = Post.where(author_id: 1).load_async
|
||||
assert_predicate defered_posts, :scheduled?
|
||||
if in_memory_db?
|
||||
assert_not_predicate defered_posts, :scheduled?
|
||||
else
|
||||
assert_predicate defered_posts, :scheduled?
|
||||
end
|
||||
assert_predicate defered_posts, :loaded?
|
||||
defered_posts.to_a
|
||||
assert_not_predicate defered_posts, :scheduled?
|
||||
|
@ -20,7 +24,11 @@ module ActiveRecord
|
|||
|
||||
def test_reset
|
||||
defered_posts = Post.where(author_id: 1).load_async
|
||||
assert_predicate defered_posts, :scheduled?
|
||||
if in_memory_db?
|
||||
assert_not_predicate defered_posts, :scheduled?
|
||||
else
|
||||
assert_predicate defered_posts, :scheduled?
|
||||
end
|
||||
defered_posts.reset
|
||||
assert_not_predicate defered_posts, :scheduled?
|
||||
end
|
||||
|
@ -57,7 +65,11 @@ module ActiveRecord
|
|||
Post.transaction do
|
||||
Post.where(author_id: 1).update_all(title: "In Transaction")
|
||||
posts = Post.where(author_id: 1).load_async
|
||||
assert_predicate posts, :scheduled?
|
||||
if in_memory_db?
|
||||
assert_not_predicate posts, :scheduled?
|
||||
else
|
||||
assert_predicate posts, :scheduled?
|
||||
end
|
||||
assert_predicate posts, :loaded?
|
||||
raise ActiveRecord::Rollback
|
||||
end
|
||||
|
@ -83,7 +95,11 @@ module ActiveRecord
|
|||
|
||||
defered_posts = Post.where(author_id: 1).eager_load(:comments).load_async
|
||||
|
||||
assert_predicate defered_posts, :scheduled?
|
||||
if in_memory_db?
|
||||
assert_not_predicate defered_posts, :scheduled?
|
||||
else
|
||||
assert_predicate defered_posts, :scheduled?
|
||||
end
|
||||
|
||||
monitor.synchronize do
|
||||
condition.wait_until { status[:executed] }
|
||||
|
@ -127,4 +143,142 @@ module ActiveRecord
|
|||
assert_predicate defered_posts, :loaded?
|
||||
end
|
||||
end
|
||||
|
||||
unless in_memory_db?
|
||||
class LoadAsyncNullExecutorTest < ActiveRecord::TestCase
|
||||
self.use_transactional_tests = false
|
||||
|
||||
fixtures :posts, :comments
|
||||
|
||||
def setup
|
||||
@old_config = ActiveRecord::Base.async_query_executor
|
||||
ActiveRecord::Base.async_query_executor = nil
|
||||
ActiveRecord::Base.establish_connection :arunit
|
||||
end
|
||||
|
||||
def teardown
|
||||
ActiveRecord::Base.async_query_executor = @old_config
|
||||
ActiveRecord::Base.establish_connection :arunit
|
||||
end
|
||||
|
||||
def test_scheduled?
|
||||
defered_posts = Post.where(author_id: 1).load_async
|
||||
assert_not_predicate defered_posts, :scheduled?
|
||||
assert_predicate defered_posts, :loaded?
|
||||
defered_posts.to_a
|
||||
assert_not_predicate defered_posts, :scheduled?
|
||||
end
|
||||
|
||||
def test_reset
|
||||
defered_posts = Post.where(author_id: 1).load_async
|
||||
assert_not_predicate defered_posts, :scheduled?
|
||||
defered_posts.reset
|
||||
assert_not_predicate defered_posts, :scheduled?
|
||||
end
|
||||
|
||||
def test_simple_query
|
||||
expected_records = Post.where(author_id: 1).to_a
|
||||
|
||||
status = {}
|
||||
monitor = Monitor.new
|
||||
condition = monitor.new_cond
|
||||
|
||||
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
|
||||
if event.payload[:name] == "Post Load"
|
||||
status[:executed] = true
|
||||
status[:async] = event.payload[:async]
|
||||
monitor.synchronize { condition.signal }
|
||||
end
|
||||
end
|
||||
|
||||
defered_posts = Post.where(author_id: 1).load_async
|
||||
|
||||
monitor.synchronize do
|
||||
condition.wait_until { status[:executed] }
|
||||
end
|
||||
|
||||
assert_equal expected_records, defered_posts.to_a
|
||||
assert_not_equal Post.connection.supports_concurrent_connections?, status[:async]
|
||||
ensure
|
||||
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
|
||||
end
|
||||
|
||||
def test_load_async_from_transaction
|
||||
posts = nil
|
||||
Post.transaction do
|
||||
Post.where(author_id: 1).update_all(title: "In Transaction")
|
||||
posts = Post.where(author_id: 1).load_async
|
||||
assert_not_predicate posts, :scheduled?
|
||||
assert_predicate posts, :loaded?
|
||||
raise ActiveRecord::Rollback
|
||||
end
|
||||
|
||||
assert_not_nil posts
|
||||
assert_equal ["In Transaction"], posts.map(&:title).uniq
|
||||
end
|
||||
|
||||
def test_eager_loading_query
|
||||
expected_records = Post.where(author_id: 1).eager_load(:comments).to_a
|
||||
|
||||
status = {}
|
||||
monitor = Monitor.new
|
||||
condition = monitor.new_cond
|
||||
|
||||
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
|
||||
if event.payload[:name] == "SQL"
|
||||
status[:executed] = true
|
||||
status[:async] = event.payload[:async]
|
||||
monitor.synchronize { condition.signal }
|
||||
end
|
||||
end
|
||||
|
||||
defered_posts = Post.where(author_id: 1).eager_load(:comments).load_async
|
||||
|
||||
assert_not_predicate defered_posts, :scheduled?
|
||||
|
||||
monitor.synchronize do
|
||||
condition.wait_until { status[:executed] }
|
||||
end
|
||||
|
||||
assert_equal expected_records, defered_posts.to_a
|
||||
assert_queries(0) do
|
||||
defered_posts.each(&:comments)
|
||||
end
|
||||
|
||||
assert_predicate Post.connection, :supports_concurrent_connections?
|
||||
assert_not status[:async], "Expected status[:async] to be false with NullExecutor"
|
||||
ensure
|
||||
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
|
||||
end
|
||||
|
||||
def test_contradiction
|
||||
assert_queries(0) do
|
||||
assert_equal [], Post.where(id: []).load_async.to_a
|
||||
end
|
||||
|
||||
Post.where(id: []).load_async.reset
|
||||
end
|
||||
|
||||
def test_pluck
|
||||
titles = Post.where(author_id: 1).pluck(:title)
|
||||
assert_equal titles, Post.where(author_id: 1).load_async.pluck(:title)
|
||||
end
|
||||
|
||||
def test_size
|
||||
expected_size = Post.where(author_id: 1).size
|
||||
|
||||
defered_posts = Post.where(author_id: 1).load_async
|
||||
|
||||
assert_equal expected_size, defered_posts.size
|
||||
assert_predicate defered_posts, :loaded?
|
||||
end
|
||||
|
||||
def test_empty?
|
||||
defered_posts = Post.where(author_id: 1).load_async
|
||||
|
||||
assert_equal false, defered_posts.empty?
|
||||
assert_predicate defered_posts, :loaded?
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue