1
0
Fork 0
mirror of https://github.com/rails/rails.git synced 2022-11-09 12:12:34 -05:00

Implement Relation#load_async to schedule the query on the background thread pool

This is built on previous async support added to Adapter#select_all.
This commit is contained in:
Jean Boussier 2021-02-08 14:22:26 +01:00
parent d79f3b2a37
commit 2a90104989
10 changed files with 261 additions and 29 deletions

View file

@ -1,3 +1,22 @@
* Add `ActiveRecord::Relation#load_async`.
This method schedules the query to be performed asynchronously from a thread pool.
If the result is accessed before a background thread had the opportunity to perform
the query, it will be performed in the foreground.
This is useful for queries that can be performed long enough before their result will be
needed, or for controllers which need to perform several independant queries.
```ruby
def index
@categories = Category.some_complex_scope.load_async
@posts = Post.some_complex_scope.load_async
end
```
*Jean Boussier*
* Implemented `ActiveRecord::Relation#excluding` method.
This method excludes the specified record (or collection of records) from

View file

@ -2,6 +2,14 @@
module ActiveRecord
class AsynchronousQueriesTracker # :nodoc:
module NullSession # :nodoc:
class << self
def active?
true
end
end
end
class Session # :nodoc:
def initialize
@active = true
@ -33,7 +41,7 @@ module ActiveRecord
attr_reader :current_session
def initialize
@current_session = nil
@current_session = NullSession
end
def start_session
@ -43,7 +51,7 @@ module ActiveRecord
def finalize_session
@current_session&.finalize
@current_session = nil
@current_session = NullSession
end
end
end

View file

@ -459,6 +459,7 @@ module ActiveRecord
def schedule_query(future_result) # :nodoc:
@async_executor.post { future_result.execute_or_skip }
Thread.pass
end
private

View file

@ -537,7 +537,7 @@ module ActiveRecord
binds,
prepare: prepare,
)
if supports_concurrent_connections? && current_transaction.closed? && ActiveRecord::Base.asynchronous_queries_session
if supports_concurrent_connections? && current_transaction.closed?
future_result.schedule!(ActiveRecord::Base.asynchronous_queries_session)
else
future_result.execute!(self)

View file

@ -28,6 +28,12 @@ module ActiveRecord
execute_query(connection)
end
def cancel
@pending = false
@error = Canceled
self
end
def execute_or_skip
return unless pending?

View file

@ -44,7 +44,14 @@ module ActiveRecord
# Post.find_by_sql ["SELECT title FROM posts WHERE author = ? AND created > ?", author_id, start_date]
# Post.find_by_sql ["SELECT body FROM comments WHERE author = :user_id OR approved_by = :user_id", { :user_id => user_id }]
def find_by_sql(sql, binds = [], preparable: nil, &block)
result_set = connection.select_all(sanitize_sql(sql), "#{name} Load", binds, preparable: preparable)
_load_from_sql(_query_by_sql(sql, binds, preparable: preparable), &block)
end
def _query_by_sql(sql, binds = [], preparable: nil, async: false) # :nodoc:
connection.select_all(sanitize_sql(sql), "#{name} Load", binds, preparable: preparable, async: async)
end
def _load_from_sql(result_set, &block) # :nodoc:
column_types = result_set.column_types
unless column_types.empty?

View file

@ -31,6 +31,7 @@ module ActiveRecord
@loaded = false
@predicate_builder = predicate_builder
@delegate_to_klass = false
@future_result = nil
end
def initialize_copy(other)
@ -261,13 +262,20 @@ module ActiveRecord
# Returns size of the records.
def size
loaded? ? @records.length : count(:all)
if loaded?
records.length
else
count(:all)
end
end
# Returns true if there are no records.
def empty?
return @records.empty? if loaded?
!exists?
if loaded?
records.empty?
else
!exists?
end
end
# Returns true if there are no records.
@ -642,6 +650,28 @@ module ActiveRecord
where(*args).delete_all
end
# Schedule the query to be performed from a background thread pool.
#
# Post.where(published: true).load_async # => #<ActiveRecord::Relation>
def load_async
unless loaded?
result = exec_main_query(async: connection.current_transaction.closed?)
if result.is_a?(Array)
@records = result
else
@future_result = result
end
@loaded = true
end
self
end
# Returns <tt>true</tt> if the relation was scheduled on the background
# thread pool.
def scheduled?
!!@future_result
end
# Causes the records to be loaded from the database if they have not
# been loaded already. You can use this if for some reason you need
# to explicitly load some records before actually using them. The
@ -649,7 +679,7 @@ module ActiveRecord
#
# Post.where(published: true).load # => #<ActiveRecord::Relation>
def load(&block)
unless loaded?
if !loaded? || scheduled?
@records = exec_queries(&block)
@loaded = true
end
@ -664,6 +694,8 @@ module ActiveRecord
end
def reset
@future_result&.cancel
@future_result = nil
@delegate_to_klass = false
@to_sql = @arel = @loaded = @should_eager_load = nil
@offsets = @take = nil
@ -855,23 +887,15 @@ module ActiveRecord
def exec_queries(&block)
skip_query_cache_if_necessary do
records =
if where_clause.contradiction?
[]
elsif eager_loading?
apply_join_dependency do |relation, join_dependency|
if relation.null_relation?
[]
else
relation = join_dependency.apply_column_aliases(relation)
rows = connection.select_all(relation.arel, "SQL")
join_dependency.instantiate(rows, strict_loading_value, &block)
end.freeze
end
else
klass.find_by_sql(arel, &block).freeze
end
rows = if scheduled?
future = @future_result
@future_result = nil
future.result
else
exec_main_query
end
records = instantiate_records(rows, &block)
preload_associations(records) unless skip_preloading_value
records.each(&:readonly!) if readonly_value
@ -881,6 +905,37 @@ module ActiveRecord
end
end
def exec_main_query(async: false)
skip_query_cache_if_necessary do
if where_clause.contradiction?
[].freeze
elsif eager_loading?
apply_join_dependency do |relation, join_dependency|
if relation.null_relation?
[].freeze
else
relation = join_dependency.apply_column_aliases(relation)
@_join_dependency = join_dependency
connection.select_all(relation.arel, "SQL", async: async)
end
end
else
klass._query_by_sql(arel, async: async)
end
end
end
def instantiate_records(rows, &block)
return [].freeze if rows.empty?
if eager_loading?
records = @_join_dependency.instantiate(rows, strict_loading_value, &block).freeze
@_join_dependency = nil
records
else
klass._load_from_sql(rows, &block).freeze
end
end
def skip_query_cache_if_necessary
if skip_query_cache_value
uncached do

View file

@ -102,6 +102,10 @@ module ActiveRecord
self
end
def cancel # :nodoc:
self
end
def cast_values(type_overrides = {}) # :nodoc:
if columns.one?
# Separated to avoid allocating an array per row

View file

@ -385,7 +385,7 @@ module ActiveRecord
@connection.disable_query_cache!
end
def test_async_query_outside_session
def test_async_query_foreground_fallback
status = {}
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
@ -395,10 +395,12 @@ module ActiveRecord
end
end
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
assert_kind_of ActiveRecord::FutureResult, future_result
assert_raises ActiveRecord::StatementInvalid do
future_result.result
@connection.pool.stub(:schedule_query, proc { }) do
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
assert_kind_of ActiveRecord::FutureResult, future_result
assert_raises ActiveRecord::StatementInvalid do
future_result.result
end
end
assert_equal true, status[:executed]

View file

@ -0,0 +1,130 @@
# frozen_string_literal: true
require "cases/helper"
require "models/post"
require "models/comment"
module ActiveRecord
class LoadAsyncTest < ActiveRecord::TestCase
self.use_transactional_tests = false
fixtures :posts, :comments
def test_scheduled?
defered_posts = Post.where(author_id: 1).load_async
assert_predicate defered_posts, :scheduled?
assert_predicate defered_posts, :loaded?
defered_posts.to_a
assert_not_predicate defered_posts, :scheduled?
end
def test_reset
defered_posts = Post.where(author_id: 1).load_async
assert_predicate defered_posts, :scheduled?
defered_posts.reset
assert_not_predicate defered_posts, :scheduled?
end
def test_simple_query
expected_records = Post.where(author_id: 1).to_a
status = {}
monitor = Monitor.new
condition = monitor.new_cond
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:name] == "Post Load"
status[:executed] = true
status[:async] = event.payload[:async]
monitor.synchronize { condition.signal }
end
end
defered_posts = Post.where(author_id: 1).load_async
monitor.synchronize do
condition.wait_until { status[:executed] }
end
assert_equal expected_records, defered_posts.to_a
assert_equal Post.connection.supports_concurrent_connections?, status[:async]
ensure
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
end
def test_load_async_from_transaction
posts = nil
Post.transaction do
Post.where(author_id: 1).update_all(title: "In Transaction")
posts = Post.where(author_id: 1).load_async
assert_predicate posts, :scheduled?
assert_predicate posts, :loaded?
raise ActiveRecord::Rollback
end
assert_not_nil posts
assert_equal ["In Transaction"], posts.map(&:title).uniq
end
def test_eager_loading_query
expected_records = Post.where(author_id: 1).eager_load(:comments).to_a
status = {}
monitor = Monitor.new
condition = monitor.new_cond
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:name] == "SQL"
status[:executed] = true
status[:async] = event.payload[:async]
monitor.synchronize { condition.signal }
end
end
defered_posts = Post.where(author_id: 1).eager_load(:comments).load_async
assert_predicate defered_posts, :scheduled?
monitor.synchronize do
condition.wait_until { status[:executed] }
end
assert_equal expected_records, defered_posts.to_a
assert_queries(0) do
defered_posts.each(&:comments)
end
assert_equal Post.connection.supports_concurrent_connections?, status[:async]
ensure
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
end
def test_contradiction
assert_queries(0) do
assert_equal [], Post.where(id: []).load_async.to_a
end
Post.where(id: []).load_async.reset
end
def test_pluck
titles = Post.where(author_id: 1).pluck(:title)
assert_equal titles, Post.where(author_id: 1).load_async.pluck(:title)
end
def test_size
expected_size = Post.where(author_id: 1).size
defered_posts = Post.where(author_id: 1).load_async
assert_equal expected_size, defered_posts.size
assert_predicate defered_posts, :loaded?
end
def test_empty?
defered_posts = Post.where(author_id: 1).load_async
assert_equal false, defered_posts.empty?
assert_predicate defered_posts, :loaded?
end
end
end