1
0
Fork 0
mirror of https://github.com/rails/rails.git synced 2022-11-09 12:12:34 -05:00

Merge pull request #40037 from Shopify/ar-adapter-async-query

Allow Adapter#select_all to be performed asynchronously from a background thread pool
This commit is contained in:
Eileen M. Uchitelle 2021-02-08 14:06:00 -05:00 committed by GitHub
commit d75c2a1752
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 388 additions and 35 deletions

View file

@ -86,6 +86,7 @@ module ActiveRecord
autoload :AttributeAssignment
autoload :AttributeMethods
autoload :AutosaveAssociation
autoload :AsynchronousQueriesTracker
autoload :LegacyYamlAdapter
@ -104,6 +105,7 @@ module ActiveRecord
end
autoload :Result
autoload :FutureResult
autoload :TableMetadata
autoload :Type
end

View file

@ -0,0 +1,49 @@
# frozen_string_literal: true
module ActiveRecord
  # Holds the asynchronous-query session for the current unit of work.
  # A session is opened by #start_session and closed by #finalize_session;
  # the class-level +run+ / +complete+ pair lets it be registered as an
  # executor hook.
  class AsynchronousQueriesTracker # :nodoc:
    # A one-way flag: active from creation until #finalize is called.
    class Session # :nodoc:
      def initialize
        @active = true
      end

      def active?
        @active
      end

      def finalize
        @active = false
      end
    end

    # Registers this class as a hook on +executor+.
    def self.install_executor_hooks(executor = ActiveSupport::Executor)
      executor.register_hook(self)
    end

    # Opens a session on the tracker returned by
    # ActiveRecord::Base.asynchronous_queries_tracker and returns that tracker.
    def self.run
      tracker = ActiveRecord::Base.asynchronous_queries_tracker
      tracker.start_session
    end

    # Closes the session of the given tracker.
    def self.complete(asynchronous_queries_tracker)
      asynchronous_queries_tracker.finalize_session
    end

    attr_reader :current_session

    def initialize
      @current_session = nil
    end

    # Replaces the current session with a fresh, active one. Returns +self+.
    def start_session
      @current_session = Session.new
      self
    end

    # Deactivates the current session (if any) and forgets it. Returns +nil+.
    def finalize_session
      session = @current_session
      session.finalize unless session.nil?
      @current_session = nil
    end
  end
end

View file

@ -144,6 +144,13 @@ module ActiveRecord
@lock_thread = false
@async_executor = Concurrent::ThreadPoolExecutor.new(
min_threads: 0,
max_threads: @size,
max_queue: @size * 4,
fallback_policy: :caller_runs
)
@reaper = Reaper.new(self, db_config.reaping_frequency)
@reaper.run
end
@ -450,6 +457,10 @@ module ActiveRecord
end
end
# Posts +future_result+ to the pool's async executor; the posted task
# calls +execute_or_skip+ on it when the executor runs the task.
def schedule_query(future_result) # :nodoc:
  @async_executor.post do
    future_result.execute_or_skip
  end
end
private
#--
# this is unfortunately not concurrent

View file

@ -59,15 +59,11 @@ module ActiveRecord
end
# Returns an ActiveRecord::Result instance.
def select_all(arel, name = nil, binds = [], preparable: nil)
def select_all(arel, name = nil, binds = [], preparable: nil, async: false)
arel = arel_from_relation(arel)
sql, binds, preparable = to_sql_and_binds(arel, binds, preparable)
if prepared_statements && preparable
select_prepared(sql, name, binds)
else
select(sql, name, binds)
end
select(sql, name, binds, prepare: prepared_statements && preparable, async: async && FutureResult::SelectAll)
rescue ::RangeError
ActiveRecord::Result.new([], [])
end
@ -528,12 +524,27 @@ module ActiveRecord
end
# Returns an ActiveRecord::Result instance.
def select(sql, name = nil, binds = [])
exec_query(sql, name, binds, prepare: false)
end
def select(sql, name = nil, binds = [], prepare: false, async: false)
if async
if current_transaction.joinable?
raise AsynchronousQueryInsideTransactionError, "Asynchronous queries are not allowed inside transactions"
end
def select_prepared(sql, name = nil, binds = [])
exec_query(sql, name, binds, prepare: true)
future_result = async.new(
pool,
sql,
name,
binds,
prepare: prepare,
)
if supports_concurrent_connections? && current_transaction.closed? && ActiveRecord::Base.asynchronous_queries_session
future_result.schedule!(ActiveRecord::Base.asynchronous_queries_session)
else
future_result.execute!(self)
end
return future_result
end
exec_query(sql, name, binds, prepare: prepare)
end
def sql_for_insert(sql, pk, binds)

View file

@ -93,7 +93,7 @@ module ActiveRecord
end
end
def select_all(arel, name = nil, binds = [], preparable: nil)
def select_all(arel, name = nil, binds = [], preparable: nil, async: false)
arel = arel_from_relation(arel)
# If arel is locked this is a SELECT ... FOR UPDATE or somesuch.
@ -101,13 +101,29 @@ module ActiveRecord
if @query_cache_enabled && !(arel.respond_to?(:locked) && arel.locked)
sql, binds, preparable = to_sql_and_binds(arel, binds, preparable)
cache_sql(sql, name, binds) { super(sql, name, binds, preparable: preparable) }
if async
lookup_sql_cache(sql, name, binds) || super(sql, name, binds, preparable: preparable, async: async)
else
cache_sql(sql, name, binds) { super(sql, name, binds, preparable: preparable, async: async) }
end
else
super
end
end
private
# Read-only query cache lookup. Under @lock, returns the cached entry for
# (+sql+, +binds+) and emits a "sql.active_record" notification built from
# +cache_notification_info+; returns +nil+ (and emits nothing) on a miss.
def lookup_sql_cache(sql, name, binds)
  @lock.synchronize do
    next unless @query_cache[sql].key?(binds)

    payload = cache_notification_info(sql, name, binds)
    ActiveSupport::Notifications.instrument("sql.active_record", payload)
    @query_cache[sql][binds]
  end
end
def cache_sql(sql, name, binds)
@lock.synchronize do
result =

View file

@ -424,6 +424,10 @@ module ActiveRecord
false
end
# Default answer is +true+. The asynchronous +select+ path consults this
# before scheduling a query on the pool instead of running it inline.
def supports_concurrent_connections?
  true
end
# No-op here; adapters that support extensions are expected to override it.
def disable_extension(name)
end
@ -706,7 +710,7 @@ module ActiveRecord
exception
end
def log(sql, name = "SQL", binds = [], type_casted_binds = [], statement_name = nil) # :doc:
def log(sql, name = "SQL", binds = [], type_casted_binds = [], statement_name = nil, async: false) # :doc:
@instrumenter.instrument(
"sql.active_record",
sql: sql,
@ -714,6 +718,7 @@ module ActiveRecord
binds: binds,
type_casted_binds: type_casted_binds,
statement_name: statement_name,
async: async,
connection: self) do
@lock.synchronize do
yield

View file

@ -197,11 +197,11 @@ module ActiveRecord
#++
# Executes the SQL statement in the context of this connection.
def execute(sql, name = nil)
def execute(sql, name = nil, async: false)
materialize_transactions
mark_transaction_written_if_write(sql)
log(sql, name) do
log(sql, name, async: async) do
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
@connection.query(sql)
end
@ -211,8 +211,8 @@ module ActiveRecord
# Mysql2Adapter doesn't have to free a result after using it, but we use this method
# to write stuff in an abstract way without concerning ourselves about whether it
# needs to be explicitly freed or not.
def execute_and_free(sql, name = nil) # :nodoc:
yield execute(sql, name)
def execute_and_free(sql, name = nil, async: false) # :nodoc:
yield execute(sql, name, async: async)
end
def begin_db_transaction

View file

@ -38,7 +38,7 @@ module ActiveRecord
end
# Executes the SQL statement in the context of this connection.
def execute(sql, name = nil)
def execute(sql, name = nil, async: false)
check_if_write_query(sql)
# make sure we carry over any changes to ActiveRecord::Base.default_timezone that have been
@ -48,9 +48,9 @@ module ActiveRecord
super
end
def exec_query(sql, name = "SQL", binds = [], prepare: false)
def exec_query(sql, name = "SQL", binds = [], prepare: false, async: false)
if without_prepared_statement?(binds)
execute_and_free(sql, name) do |result|
execute_and_free(sql, name, async: async) do |result|
if result
build_result(columns: result.fields, rows: result.to_a)
else
@ -58,7 +58,7 @@ module ActiveRecord
end
end
else
exec_stmt_and_free(sql, name, binds, cache_stmt: prepare) do |_, result|
exec_stmt_and_free(sql, name, binds, cache_stmt: prepare, async: async) do |_, result|
if result
build_result(columns: result.fields, rows: result.to_a)
else
@ -146,7 +146,7 @@ module ActiveRecord
@max_allowed_packet ||= show_variable("max_allowed_packet")
end
def exec_stmt_and_free(sql, name, binds, cache_stmt: false)
def exec_stmt_and_free(sql, name, binds, cache_stmt: false, async: false)
check_if_write_query(sql)
materialize_transactions
@ -158,7 +158,7 @@ module ActiveRecord
type_casted_binds = type_casted_binds(binds)
log(sql, name, binds, type_casted_binds) do
log(sql, name, binds, type_casted_binds, async: async) do
if cache_stmt
stmt = @statements[sql] ||= @connection.prepare(sql)
else

View file

@ -47,8 +47,8 @@ module ActiveRecord
end
end
def exec_query(sql, name = "SQL", binds = [], prepare: false)
execute_and_clear(sql, name, binds, prepare: prepare) do |result|
def exec_query(sql, name = "SQL", binds = [], prepare: false, async: false)
execute_and_clear(sql, name, binds, prepare: prepare, async: async) do |result|
types = {}
fields = result.fields
fields.each_with_index do |fname, i|

View file

@ -648,13 +648,13 @@ module ActiveRecord
FEATURE_NOT_SUPPORTED = "0A000" #:nodoc:
def execute_and_clear(sql, name, binds, prepare: false)
def execute_and_clear(sql, name, binds, prepare: false, async: false)
check_if_write_query(sql)
if !prepare || without_prepared_statement?(binds)
result = exec_no_cache(sql, name, binds)
result = exec_no_cache(sql, name, binds, async: async)
else
result = exec_cache(sql, name, binds)
result = exec_cache(sql, name, binds, async: async)
end
begin
ret = yield result
@ -664,7 +664,7 @@ module ActiveRecord
ret
end
def exec_no_cache(sql, name, binds)
def exec_no_cache(sql, name, binds, async: false)
materialize_transactions
mark_transaction_written_if_write(sql)
@ -673,14 +673,14 @@ module ActiveRecord
update_typemap_for_default_timezone
type_casted_binds = type_casted_binds(binds)
log(sql, name, binds, type_casted_binds) do
log(sql, name, binds, type_casted_binds, async: async) do
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
@connection.exec_params(sql, type_casted_binds)
end
end
end
def exec_cache(sql, name, binds)
def exec_cache(sql, name, binds, async: false)
materialize_transactions
mark_transaction_written_if_write(sql)
update_typemap_for_default_timezone
@ -688,7 +688,7 @@ module ActiveRecord
stmt_key = prepare_statement(sql, binds)
type_casted_binds = type_casted_binds(binds)
log(sql, name, binds, type_casted_binds, stmt_key) do
log(sql, name, binds, type_casted_binds, stmt_key, async: async) do
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
@connection.exec_prepared(stmt_key, type_casted_binds)
end

View file

@ -31,7 +31,7 @@ module ActiveRecord
end
end
def exec_query(sql, name = nil, binds = [], prepare: false)
def exec_query(sql, name = nil, binds = [], prepare: false, async: false)
check_if_write_query(sql)
materialize_transactions
@ -39,7 +39,7 @@ module ActiveRecord
type_casted_binds = type_casted_binds(binds)
log(sql, name, binds, type_casted_binds) do
log(sql, name, binds, type_casted_binds, async: async) do
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
# Don't cache statements if they are not prepared
unless prepare

View file

@ -84,6 +84,7 @@ module ActiveRecord
end
# Records whether the configured database is ":memory:", delegates to the
# parent initializer with (connection, logger, config), then runs
# +configure_connection+. +connection_options+ is not used in this method.
def initialize(connection, logger, connection_options, config)
# Read later by supports_concurrent_connections?.
# NOTE(review): set before +super+ — presumably so the flag is available to
# anything the parent initializer calls; confirm before reordering.
@memory_database = config[:database] == ":memory:"
super(connection, logger, config)
configure_connection
end
@ -153,6 +154,10 @@ module ActiveRecord
alias supports_insert_on_duplicate_update? supports_insert_on_conflict?
alias supports_insert_conflict_target? supports_insert_on_conflict?
# False when the adapter was configured with the ":memory:" database
# (see @memory_database, set in +initialize+), true otherwise.
def supports_concurrent_connections?
  !@memory_database
end
# True as long as the underlying @connection does not report itself closed.
def active?
  !@connection.closed?
end

View file

@ -194,6 +194,15 @@ module ActiveRecord
@@connection_handlers = handlers
end
# The session currently open on this thread's tracker, or whatever
# +current_session+ returns when none has been started.
def self.asynchronous_queries_session # :nodoc:
  tracker = asynchronous_queries_tracker
  tracker.current_session
end
# Returns the AsynchronousQueriesTracker stored in the current thread's
# :ar_asynchronous_queries_tracker thread variable, creating and storing
# one on first use.
def self.asynchronous_queries_tracker # :nodoc:
  thread = Thread.current
  existing = thread.thread_variable_get(:ar_asynchronous_queries_tracker)
  return existing if existing

  thread.thread_variable_set(:ar_asynchronous_queries_tracker, AsynchronousQueriesTracker.new)
end
# Returns the symbol representing the current connected role.
#
# ActiveRecord::Base.connected_to(role: :writing) do

View file

@ -373,6 +373,11 @@ module ActiveRecord
class TransactionRollbackError < StatementInvalid
end
# AsynchronousQueryInsideTransactionError will be raised when attempting
# to perform an asynchronous query from inside a transaction.
class AsynchronousQueryInsideTransactionError < ActiveRecordError
end
# SerializationFailure will be raised when a transaction is rolled
# back by the database due to a serialization failure.
class SerializationFailure < TransactionRollbackError

View file

@ -0,0 +1,97 @@
# frozen_string_literal: true
module ActiveRecord
# Wraps a query whose execution may be deferred. The query can be run
# eagerly through #execute!, from a scheduled task through
# #execute_or_skip, or lazily by the first caller of #result.
# @mutex coordinates the last two so they do not both run the query.
class FutureResult # :nodoc:
# Raised by #result when no error was recorded and the session given to
# #schedule! is no longer active.
Canceled = Class.new(ActiveRecordError)
# +empty?+ and +to_a+ go through #result, so they may execute the query,
# block on an in-progress execution, or raise like #result does.
delegate :empty?, :to_a, to: :result
# +pool+ must respond to +schedule_query+, +with_connection+ and
# +connection+ (the calls made on it in this class). The remaining
# positional and keyword arguments are stored and later forwarded to
# +exec_query+ on the connection that runs the query.
def initialize(pool, *args, **kwargs)
@mutex = Mutex.new
@session = nil
@pool = pool
@args = args
@kwargs = kwargs
@pending = true
@error = nil
@result = nil
end
# Remembers +session+ and asks the pool to schedule this object. Once a
# session is set, #pending? and #canceled? depend on +session.active?+.
def schedule!(session)
@session = session
@pool.schedule_query(self)
end
# Runs the query immediately on +connection+ in the calling thread
# (async: false), without taking @mutex.
def execute!(connection)
execute_query(connection)
end
# Entry point for scheduled execution. Does nothing when the query is no
# longer pending or when another thread already holds @mutex.
def execute_or_skip
return unless pending?
@pool.with_connection do |connection|
# Non-blocking attempt: if the lock is held (e.g. by #execute_or_wait),
# give up rather than wait. This +return+ leaves execute_or_skip itself.
return unless @mutex.try_lock
begin
# Re-check under the lock; the state may have changed since the check
# at the top of the method.
if pending?
execute_query(connection, async: true)
end
ensure
@mutex.unlock
end
end
end
# Returns the query result, first executing the query or waiting for an
# in-progress execution if needed. Raises the recorded error when the
# query failed, or Canceled when the session is no longer active.
def result
execute_or_wait
if @error
raise @error
elsif canceled?
raise Canceled
else
@result
end
end
private
# Truthy while the query has not been executed and its session, if any,
# is still active.
def pending?
@pending && (!@session || @session.active?)
end
# Truthy when a session was attached and it is no longer active.
def canceled?
@session && !@session.active?
end
# Blocking counterpart of #execute_or_skip: takes @mutex (waiting for any
# holder to release it) and, if the query is still pending afterwards,
# runs it on +@pool.connection+ with async: false.
def execute_or_wait
return unless pending?
@mutex.synchronize do
if pending?
execute_query(@pool.connection)
end
end
end
# Runs the query and records the outcome: @result on success, @error when
# a StandardError is raised (stored, not re-raised). Either way the query
# is marked as no longer pending.
def execute_query(connection, async: false)
@result = exec_query(connection, *@args, **@kwargs, async: async)
rescue => error
@error = error
ensure
@pending = false
end
# Overridable step that actually issues the query on +connection+.
def exec_query(connection, *args, **kwargs)
connection.exec_query(*args, **kwargs)
end
# Variant that turns a ::RangeError raised while running the query into an
# empty ActiveRecord::Result instead of an error.
class SelectAll < FutureResult # :nodoc:
private
def exec_query(*, **)
super
rescue ::RangeError
ActiveRecord::Result.new([], [])
end
end
end
end

View file

@ -39,6 +39,7 @@ module ActiveRecord
name = "#{payload[:name]} (#{event.duration.round(1)}ms)"
name = "CACHE #{name}" if payload[:cached]
name = "ASYNC #{name}" if payload[:async]
sql = payload[:sql]
binds = nil

View file

@ -240,6 +240,7 @@ To keep using the current cache store, you can turn off cache versioning entirel
initializer "active_record.set_executor_hooks" do
ActiveRecord::QueryCache.install_executor_hooks
ActiveRecord::AsynchronousQueriesTracker.install_executor_hooks
end
initializer "active_record.add_watchable_files" do |app|

View file

@ -91,6 +91,10 @@ module ActiveRecord
n ? hash_rows.last(n) : hash_rows.last
end
# Returns the receiver itself. Presumably mirrors FutureResult#result so
# callers can call +result+ on either object — confirm against callers.
def result # :nodoc:
  self
end
def cast_values(type_overrides = {}) # :nodoc:
if columns.one?
# Separated to avoid allocating an array per row

View file

@ -343,6 +343,120 @@ module ActiveRecord
end
end
# Tests shared by the transactional and non-transactional test classes.
# Including classes must set @connection in their +setup+.
module AsynchronousQueriesSharedTests
# A failing async query still returns a FutureResult; the database error
# only surfaces when +result+ is called.
def test_async_select_failure
ActiveRecord::Base.asynchronous_queries_tracker.start_session
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
assert_kind_of ActiveRecord::FutureResult, future_result
assert_raises ActiveRecord::StatementInvalid do
future_result.result
end
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
end
# Async queries are accepted outside a +transaction+ block and rejected
# with AsynchronousQueryInsideTransactionError inside one.
def test_async_query_from_transaction
ActiveRecord::Base.asynchronous_queries_tracker.start_session
assert_nothing_raised do
@connection.select_all "SELECT * FROM posts", async: true
end
@connection.transaction do
assert_raises AsynchronousQueryInsideTransactionError do
@connection.select_all "SELECT * FROM posts", async: true
end
end
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
end
# With the query cache enabled and the same query already issued
# synchronously, the async call returns a plain Result rather than a
# FutureResult (presumably served from the cache).
def test_async_query_cache
ActiveRecord::Base.asynchronous_queries_tracker.start_session
@connection.enable_query_cache!
@connection.select_all "SELECT * FROM posts"
result = @connection.select_all "SELECT * FROM posts", async: true
assert_equal Result, result.class
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
@connection.disable_query_cache!
end
# No session is started here: the query must still be executed, and the
# notification payload must report async as false.
def test_async_query_outside_session
status = {}
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:sql] == "SELECT * FROM does_not_exists"
status[:executed] = true
status[:async] = event.payload[:async]
end
end
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
assert_kind_of ActiveRecord::FutureResult, future_result
assert_raises ActiveRecord::StatementInvalid do
future_result.result
end
assert_equal true, status[:executed]
assert_equal false, status[:async]
ensure
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
end
end
# Runs the shared async-query tests with transactional tests disabled, plus
# one test that waits for the query notification.
class AsynchronousQueriesTest < ActiveRecord::TestCase
self.use_transactional_tests = false
include AsynchronousQueriesSharedTests
def setup
@connection = ActiveRecord::Base.connection
end
# Issues an async select inside a session, waits until the
# "sql.active_record" subscriber has seen the query, then checks that the
# payload's :async flag equals supports_concurrent_connections?.
def test_async_select_all
ActiveRecord::Base.asynchronous_queries_tracker.start_session
status = {}
# The monitor/condition pair lets the test block until the subscriber
# signals that the query was observed.
monitor = Monitor.new
condition = monitor.new_cond
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:sql] == "SELECT * FROM posts"
status[:executed] = true
status[:async] = event.payload[:async]
monitor.synchronize { condition.signal }
end
end
future_result = @connection.select_all "SELECT * FROM posts", async: true
assert_kind_of ActiveRecord::FutureResult, future_result
monitor.synchronize do
condition.wait_until { status[:executed] }
end
assert_kind_of ActiveRecord::Result, future_result.result
assert_equal @connection.supports_concurrent_connections?, status[:async]
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
end
end
# Runs the shared async-query tests with transactional tests enabled.
class AsynchronousQueriesWithTransactionalTest < ActiveRecord::TestCase
self.use_transactional_tests = true
include AsynchronousQueriesSharedTests
def setup
@connection = ActiveRecord::Base.connection
# NOTE(review): presumably forces the test's wrapping transaction to be
# started before each test body runs — confirm.
@connection.materialize_transactions
end
end
class AdapterForeignKeyTest < ActiveRecord::TestCase
self.use_transactional_tests = false

View file

@ -74,6 +74,28 @@ module ActiveRecord
end
end
# An app that issues an async query and then raises: after the middleware
# call fails, reading the future result must raise FutureResult::Canceled.
test "cancel asynchronous queries if an exception is raised" do
unless ActiveRecord::Base.connection.supports_concurrent_connections?
skip "This adapter doesn't support asynchronous queries"
end
# Anonymous App subclass that keeps the future result so the test can
# inspect it after the call has raised.
app = Class.new(App) do
attr_reader :future_result
def call(env)
@future_result = ActiveRecord::Base.connection.select_all("SELECT * FROM does_not_exists", async: true)
raise NotImplementedError
end
end.new
explosive = middleware(app)
assert_raises(NotImplementedError) { explosive.call(@env) }
assert_raises FutureResult::Canceled do
app.future_result.to_a
end
end
test "doesn't clear active connections when running in a test case" do
executor.wrap do
@management.call(@env)
@ -100,6 +122,7 @@ module ActiveRecord
# Memoized anonymous ActiveSupport::Executor subclass with the query cache
# and asynchronous queries tracker hooks installed on it.
def executor
@executor ||= Class.new(ActiveSupport::Executor).tap do |exe|
ActiveRecord::QueryCache.install_executor_hooks(exe)
ActiveRecord::AsynchronousQueriesTracker.install_executor_hooks(exe)
end
end