1
0
Fork 0
mirror of https://github.com/rails/rails.git synced 2022-11-09 12:12:34 -05:00

Allow async executor to be configurable

This is a followup/alternative to #41406. This change wouldn't work for
GitHub because we intend to implement an executor for each database and
use the database configuration to set the `min_threads` and
`max_threads` for each one.

The changes here borrow from #41406 by implementing an
`Concurrent::ImmediateExecutor` by default. Otherwise applications have
the option of having one global thread pool that is used by all connections
or a thread pool for each connection. A global thread pool can set with
`config.active_record.async_query_executor = :global_thread_pool`. This
will create a single `Concurrent::ThreadPoolExecutor` for applications
to utilize. By default the concurrency is 4, but it can be changed for the
`global_thread_pool` by setting `global_executor_concurrency` to another
number. If applications want to use a thread pool per database
connection they can set `config.active_record.async_query_executor =
:multi_thread_pool`. This will create a `Concurrent::ThreadPoolExecutor`
for each database connection and set the `min_threads` and `max_threads`
by their configuration values or the defaults.

I've also moved the async tests out of the adapter test and into their
own tests and added tests for all the new functionality. This change
would allow us at GitHub to control threads per database and per
writer/reader or other apps to use one global executor. The immediate
executor allows apps to no-op by default.

Took the immediate executor idea from Jean's PR.
Co-authored-by: Jean Boussier <jean.boussier@gmail.com>
This commit is contained in:
eileencodes 2021-02-18 13:10:57 -05:00
parent 5b2fb8a6ff
commit d9639a211f
No known key found for this signature in database
GPG key ID: BA5C575120BBE8DF
9 changed files with 394 additions and 122 deletions

View file

@ -1,3 +1,17 @@
* Allow applications to configure the thread pool for async queries
Some applications may want one thread pool per database whereas others want to use
a single global thread pool for all queries. By default Rails will set `async_query_executor`
to `:immediate` and create a `Concurrent::ImmediateExecutor` object which is essentially a no-op.
To create one thread pool for all database connections to use applications can set
`config.active_record.async_query_executor` to `:global_thread_pool` and optionally define
`config.active_record.global_executor_concurrency`. This defaults to 4. For applications that want
to have a thread pool for each database connection, `config.active_record.async_query_executor` can
be set to `:multi_thread_pool`. The configuration for each thread pool is set in the database
configuration.
*Eileen M. Uchitelle*
* Allow new syntax for `enum` to avoid leading `_` from reserved options.
Before:

View file

@ -144,12 +144,7 @@ module ActiveRecord
@lock_thread = false
@async_executor = Concurrent::ThreadPoolExecutor.new(
min_threads: 0,
max_threads: @size,
max_queue: @size * 4,
fallback_policy: :caller_runs
)
@async_executor = build_async_executor
@reaper = Reaper.new(self, db_config.reaping_frequency)
@reaper.run
@ -463,6 +458,22 @@ module ActiveRecord
end
private
def build_async_executor
case Base.async_query_executor
when :multi_thread_pool
Concurrent::ThreadPoolExecutor.new(
min_threads: @db_config.min_threads,
max_threads: @db_config.max_threads,
max_queue: @db_config.max_queue,
fallback_policy: :caller_runs
)
when :global_thread_pool
Base.global_thread_pool_async_query_executor
else
Base.immediate_query_executor
end
end
#--
# this is unfortunately not concurrent
def bulk_make_new_connections(num_new_conns_needed)

View file

@ -157,6 +157,44 @@ module ActiveRecord
mattr_accessor :application_record_class, instance_accessor: false, default: nil
# Sets the async_query_executor for an application. By default the thread pool executor
# set to `:immediate. Options are:
#
# * :immediate - Initializes a single +Concurrent::ImmediateExecutor+
# * :global_thread_pool - Initializes a single +Concurrent::ThreadPoolExecutor+
# that uses the +async_query_concurrency+ for the +max_threads+ value.
# * :multi_thread_pool - Initializes a +Concurrent::ThreadPoolExecutor+ for each
# database connection. The initializer values are defined in the configuration hash.
mattr_accessor :async_query_executor, instance_accessor: false, default: :immediate
def self.immediate_query_executor # :nodoc:
@@immediate_query_executor ||= Concurrent::ImmediateExecutor.new
end
def self.global_thread_pool_async_query_executor # :nodoc:
concurrency = global_executor_concurrency || 4
@@global_thread_pool_async_query_executor ||= Concurrent::ThreadPoolExecutor.new(
min_threads: 0,
max_threads: concurrency,
max_queue: concurrency * 4,
fallback_policy: :caller_runs
)
end
# Set the +global_executor_concurrency+. This configuration value can only be used
# with the global thread pool async query executor.
def self.global_executor_concurrency=(global_executor_concurrency)
if async_query_executor == :immediate || async_query_executor == :multi_thread_pool
raise ArgumentError, "`global_executor_concurrency` cannot be set when using either immediate or multiple thread pools. For multiple thread pools, please set the concurrency in your database configuration. Immediate thread pools are essentially a no-op."
end
@@global_executor_concurrency = global_executor_concurrency
end
def self.global_executor_concurrency # :nodoc:
@@global_executor_concurrency ||= nil
end
def self.application_record_class? # :nodoc:
if Base.application_record_class
self == Base.application_record_class

View file

@ -48,6 +48,18 @@ module ActiveRecord
raise NotImplementedError
end
def min_threads
raise NotImplementedError
end
def max_threads
raise NotImplementedError
end
def max_queue
raise NotImplementedError
end
def checkout_timeout
raise NotImplementedError
end

View file

@ -66,6 +66,18 @@ module ActiveRecord
(configuration_hash[:pool] || 5).to_i
end
def min_threads
(configuration_hash[:min_threads] || 0).to_i
end
def max_threads
(configuration_hash[:max_threads] || pool).to_i
end
def max_queue
max_threads * 4
end
def checkout_timeout
(configuration_hash[:checkout_timeout] || 5).to_f
end

View file

@ -343,122 +343,6 @@ module ActiveRecord
end
end
module AsynchronousQueriesSharedTests
def test_async_select_failure
ActiveRecord::Base.asynchronous_queries_tracker.start_session
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
assert_kind_of ActiveRecord::FutureResult, future_result
assert_raises ActiveRecord::StatementInvalid do
future_result.result
end
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
end
def test_async_query_from_transaction
ActiveRecord::Base.asynchronous_queries_tracker.start_session
assert_nothing_raised do
@connection.select_all "SELECT * FROM posts", async: true
end
@connection.transaction do
assert_raises AsynchronousQueryInsideTransactionError do
@connection.select_all "SELECT * FROM posts", async: true
end
end
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
end
def test_async_query_cache
ActiveRecord::Base.asynchronous_queries_tracker.start_session
@connection.enable_query_cache!
@connection.select_all "SELECT * FROM posts"
result = @connection.select_all "SELECT * FROM posts", async: true
assert_equal Result, result.class
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
@connection.disable_query_cache!
end
def test_async_query_foreground_fallback
status = {}
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:sql] == "SELECT * FROM does_not_exists"
status[:executed] = true
status[:async] = event.payload[:async]
end
end
@connection.pool.stub(:schedule_query, proc { }) do
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
assert_kind_of ActiveRecord::FutureResult, future_result
assert_raises ActiveRecord::StatementInvalid do
future_result.result
end
end
assert_equal true, status[:executed]
assert_equal false, status[:async]
ensure
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
end
end
class AsynchronousQueriesTest < ActiveRecord::TestCase
self.use_transactional_tests = false
include AsynchronousQueriesSharedTests
def setup
@connection = ActiveRecord::Base.connection
end
def test_async_select_all
ActiveRecord::Base.asynchronous_queries_tracker.start_session
status = {}
monitor = Monitor.new
condition = monitor.new_cond
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:sql] == "SELECT * FROM posts"
status[:executed] = true
status[:async] = event.payload[:async]
monitor.synchronize { condition.signal }
end
end
future_result = @connection.select_all "SELECT * FROM posts", async: true
assert_kind_of ActiveRecord::FutureResult, future_result
monitor.synchronize do
condition.wait_until { status[:executed] }
end
assert_kind_of ActiveRecord::Result, future_result.result
assert_equal @connection.supports_concurrent_connections?, status[:async]
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
end
end
class AsynchronousQueriesWithTransactionalTest < ActiveRecord::TestCase
self.use_transactional_tests = true
include AsynchronousQueriesSharedTests
def setup
@connection = ActiveRecord::Base.connection
@connection.materialize_transactions
end
end
class AdapterForeignKeyTest < ActiveRecord::TestCase
self.use_transactional_tests = false

View file

@ -0,0 +1,267 @@
# frozen_string_literal: true
require "cases/helper"
require "support/connection_helper"
require "models/post"
module AsynchronousQueriesSharedTests
def test_async_select_failure
ActiveRecord::Base.asynchronous_queries_tracker.start_session
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
assert_kind_of ActiveRecord::FutureResult, future_result
assert_raises ActiveRecord::StatementInvalid do
future_result.result
end
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
end
def test_async_query_from_transaction
ActiveRecord::Base.asynchronous_queries_tracker.start_session
assert_nothing_raised do
@connection.select_all "SELECT * FROM posts", async: true
end
@connection.transaction do
assert_raises ActiveRecord::AsynchronousQueryInsideTransactionError do
@connection.select_all "SELECT * FROM posts", async: true
end
end
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
end
def test_async_query_cache
ActiveRecord::Base.asynchronous_queries_tracker.start_session
@connection.enable_query_cache!
@connection.select_all "SELECT * FROM posts"
result = @connection.select_all "SELECT * FROM posts", async: true
assert_equal ActiveRecord::Result, result.class
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
@connection.disable_query_cache!
end
def test_async_query_foreground_fallback
status = {}
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:sql] == "SELECT * FROM does_not_exists"
status[:executed] = true
status[:async] = event.payload[:async]
end
end
@connection.pool.stub(:schedule_query, proc { }) do
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
assert_kind_of ActiveRecord::FutureResult, future_result
assert_raises ActiveRecord::StatementInvalid do
future_result.result
end
end
assert_equal true, status[:executed]
assert_equal false, status[:async]
ensure
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
end
end
class AsynchronousQueriesTest < ActiveRecord::TestCase
self.use_transactional_tests = false
include AsynchronousQueriesSharedTests
def setup
@connection = ActiveRecord::Base.connection
end
def test_async_select_all
ActiveRecord::Base.asynchronous_queries_tracker.start_session
status = {}
monitor = Monitor.new
condition = monitor.new_cond
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:sql] == "SELECT * FROM posts"
status[:executed] = true
status[:async] = event.payload[:async]
monitor.synchronize { condition.signal }
end
end
future_result = @connection.select_all "SELECT * FROM posts", async: true
assert_kind_of ActiveRecord::FutureResult, future_result
monitor.synchronize do
condition.wait_until { status[:executed] }
end
assert_kind_of ActiveRecord::Result, future_result.result
assert_equal @connection.supports_concurrent_connections?, status[:async]
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
end
end
class AsynchronousQueriesWithTransactionalTest < ActiveRecord::TestCase
self.use_transactional_tests = true
include AsynchronousQueriesSharedTests
def setup
@connection = ActiveRecord::Base.connection
@connection.materialize_transactions
end
end
class AsynchronousExecutorTypeTest < ActiveRecord::TestCase
def test_immediate_configuration_uses_a_single_immediate_executor_by_default
old_value = ActiveRecord::Base.async_query_executor
ActiveRecord::Base.async_query_executor = :immediate
handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new
db_config = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary")
db_config2 = ActiveRecord::Base.configurations.configs_for(env_name: "arunit2", name: "primary")
pool1 = handler.establish_connection(db_config)
pool2 = handler.establish_connection(db_config2, owner_name: ARUnit2Model)
async_pool1 = pool1.instance_variable_get(:@async_executor)
async_pool2 = pool2.instance_variable_get(:@async_executor)
assert async_pool1.is_a?(Concurrent::ImmediateExecutor)
assert async_pool2.is_a?(Concurrent::ImmediateExecutor)
assert_equal 2, handler.all_connection_pools.count
assert_equal async_pool1, async_pool2
ensure
clean_up_connection_handler
ActiveRecord::Base.async_query_executor = old_value
end
def test_one_global_thread_pool_is_used_when_set_with_default_concurrency
old_value = ActiveRecord::Base.async_query_executor
ActiveRecord::Base.async_query_executor = :global_thread_pool
handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new
db_config = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary")
db_config2 = ActiveRecord::Base.configurations.configs_for(env_name: "arunit2", name: "primary")
pool1 = handler.establish_connection(db_config)
pool2 = handler.establish_connection(db_config2, owner_name: ARUnit2Model)
async_pool1 = pool1.instance_variable_get(:@async_executor)
async_pool2 = pool2.instance_variable_get(:@async_executor)
assert async_pool1.is_a?(Concurrent::ThreadPoolExecutor)
assert async_pool2.is_a?(Concurrent::ThreadPoolExecutor)
assert 0, async_pool1.min_length
assert 4, async_pool1.max_length
assert 16, async_pool1.max_queue
assert :caller_runs, async_pool1.fallback_policy
assert 0, async_pool2.min_length
assert 4, async_pool2.max_length
assert 16, async_pool2.max_queue
assert :caller_runs, async_pool2.fallback_policy
assert_equal 2, handler.all_connection_pools.count
assert_equal async_pool1, async_pool2
ensure
clean_up_connection_handler
ActiveRecord::Base.async_query_executor = old_value
end
def test_concurrency_can_be_set_on_global_thread_pool
old_value = ActiveRecord::Base.async_query_executor
ActiveRecord::Base.async_query_executor = :global_thread_pool
old_concurrency = ActiveRecord::Base.global_executor_concurrency
ActiveRecord::Base.global_executor_concurrency = 8
handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new
db_config = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary")
db_config2 = ActiveRecord::Base.configurations.configs_for(env_name: "arunit2", name: "primary")
pool1 = handler.establish_connection(db_config)
pool2 = handler.establish_connection(db_config2, owner_name: ARUnit2Model)
async_pool1 = pool1.instance_variable_get(:@async_executor)
async_pool2 = pool2.instance_variable_get(:@async_executor)
assert async_pool1.is_a?(Concurrent::ThreadPoolExecutor)
assert async_pool2.is_a?(Concurrent::ThreadPoolExecutor)
assert 0, async_pool1.min_length
assert 8, async_pool1.max_length
assert 32, async_pool1.max_queue
assert :caller_runs, async_pool1.fallback_policy
assert 0, async_pool2.min_length
assert 8, async_pool2.max_length
assert 32, async_pool2.max_queue
assert :caller_runs, async_pool2.fallback_policy
assert_equal 2, handler.all_connection_pools.count
assert_equal async_pool1, async_pool2
ensure
clean_up_connection_handler
ActiveRecord::Base.global_executor_concurrency = old_concurrency
ActiveRecord::Base.async_query_executor = old_value
end
def test_concurrency_cannot_be_set_with_immediate_or_multi_thread_pool
old_value = ActiveRecord::Base.async_query_executor
ActiveRecord::Base.async_query_executor = :immediate
assert_raises ArgumentError do
ActiveRecord::Base.global_executor_concurrency = 8
end
ActiveRecord::Base.async_query_executor = :multi_thread_pool
assert_raises ArgumentError do
ActiveRecord::Base.global_executor_concurrency = 8
end
ensure
ActiveRecord::Base.async_query_executor = old_value
end
def test_one_global_thread_pool_uses_concurrency_if_set
old_value = ActiveRecord::Base.async_query_executor
ActiveRecord::Base.async_query_executor = :multi_thread_pool
handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new
config_hash = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary").configuration_hash
new_config_hash = config_hash.merge(min_threads: 0, max_threads: 10)
db_config = ActiveRecord::DatabaseConfigurations::HashConfig.new("arunit", "primary", new_config_hash)
db_config2 = ActiveRecord::Base.configurations.configs_for(env_name: "arunit2", name: "primary")
pool1 = handler.establish_connection(db_config)
pool2 = handler.establish_connection(db_config2, owner_name: ARUnit2Model)
async_pool1 = pool1.instance_variable_get(:@async_executor)
async_pool2 = pool2.instance_variable_get(:@async_executor)
assert async_pool1.is_a?(Concurrent::ThreadPoolExecutor)
assert async_pool2.is_a?(Concurrent::ThreadPoolExecutor)
assert 0, async_pool1.min_length
assert 10, async_pool1.max_length
assert 40, async_pool1.max_queue
assert :caller_runs, async_pool1.fallback_policy
assert 0, async_pool2.min_length
assert 4, async_pool2.max_length
assert 16, async_pool2.max_queue
assert :caller_runs, async_pool2.fallback_policy
assert_equal 2, handler.all_connection_pools.count
assert_not_equal async_pool1, async_pool2
ensure
clean_up_connection_handler
ActiveRecord::Base.async_query_executor = old_value
end
end

View file

@ -20,6 +20,39 @@ module ActiveRecord
assert_equal 5, config.pool
end
def test_min_threads_with_value
config = HashConfig.new("default_env", "primary", min_threads: "1")
assert_equal 1, config.min_threads
end
def test_min_threads_default
config = HashConfig.new("default_env", "primary", {})
assert_equal 0, config.min_threads
end
def test_max_threads_with_value
config = HashConfig.new("default_env", "primary", max_threads: "10")
assert_equal 10, config.max_threads
end
def test_max_threads_default_uses_pool_default
config = HashConfig.new("default_env", "primary", {})
assert_equal 5, config.pool
assert_equal 5, config.max_threads
end
def test_max_threads_uses_pool_when_set
config = HashConfig.new("default_env", "primary", pool: 1)
assert_equal 1, config.pool
assert_equal 1, config.max_threads
end
def test_max_queue_is_pool_multipled_by_4
config = HashConfig.new("default_env", "primary", {})
assert_equal 5, config.max_threads
assert_equal config.max_threads * 4, config.max_queue
end
def test_checkout_timeout_default_when_nil
config = HashConfig.new("default_env", "primary", checkout_timeout: nil)
assert_equal 5.0, config.checkout_timeout

View file

@ -20,6 +20,7 @@ module ARTest
def self.connect
ActiveRecord::Base.legacy_connection_handling = false
ActiveRecord::Base.async_query_executor = :global_thread_pool
puts "Using #{connection_name}"
ActiveRecord::Base.logger = ActiveSupport::Logger.new("debug.log", 0, 100 * 1024 * 1024)
ActiveRecord::Base.configurations = test_configuration_hashes