Fiber-safe ConnectionPool

Replaces a few hard-wired references to Thread/Thread.current within
ActiveRecord::ConnectionPool with IsolationExecutionState.
A few tweaks were required to IsolationExecutionState so that
I could implement the various cache key methods with a reference
to the current thread/fiber.
This commit is contained in:
machty 2022-01-08 13:11:05 -05:00
parent aaa64687e8
commit 449101e753
5 changed files with 179 additions and 80 deletions

View File

@ -1,3 +1,11 @@
* Make `ActiveRecord::ConnectionPool` Fiber-safe
When `ActiveSupport::IsolatedExecutionState.isolation_level` is set to `:fiber`,
the connection pool now supports multiple Fibers from the same Thread checking
out connections from the pool.
*Alex Matchneer*
* Add `update_attribute!` to `ActiveRecord::Persistence`
Similar to `update_attribute`, but raises `ActiveRecord::RecordNotSaved` when a `before_*` callback throws `:abort`.

View File

@ -166,7 +166,7 @@ module ActiveRecord
def lock_thread=(lock_thread)
if lock_thread
@lock_thread = Thread.current
@lock_thread = ActiveSupport::IsolatedExecutionState.context
else
@lock_thread = nil
end
@ -197,7 +197,7 @@ module ActiveRecord
# This method only works for connections that have been obtained through
# #connection or #with_connection methods, connections obtained through
# #checkout will not be automatically released.
def release_connection(owner_thread = Thread.current)
def release_connection(owner_thread = ActiveSupport::IsolatedExecutionState.context)
if conn = @thread_cached_conns.delete(connection_cache_key(owner_thread))
checkin conn
end
@ -208,7 +208,7 @@ module ActiveRecord
# exists checkout a connection, yield it to the block, and checkin the
# connection when finished.
def with_connection
unless conn = @thread_cached_conns[connection_cache_key(Thread.current)]
unless conn = @thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)]
conn = connection
fresh_connection = true
end
@ -510,7 +510,7 @@ module ActiveRecord
end
def current_thread
@lock_thread || Thread.current
@lock_thread || ActiveSupport::IsolatedExecutionState.context
end
# Take control of all existing connections so a "group" action such as
@ -527,13 +527,13 @@ module ActiveRecord
def attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout = true)
collected_conns = synchronize do
# account for our own connections
@connections.select { |conn| conn.owner == Thread.current }
@connections.select { |conn| conn.owner == ActiveSupport::IsolatedExecutionState.context }
end
newly_checked_out = []
timeout_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + (@checkout_timeout * 2)
@available.with_a_bias_for(Thread.current) do
@available.with_a_bias_for(ActiveSupport::IsolatedExecutionState.context) do
loop do
synchronize do
return if collected_conns.size == @connections.size && @now_connecting == 0
@ -580,7 +580,7 @@ module ActiveRecord
thread_report = []
@connections.each do |conn|
unless conn.owner == Thread.current
unless conn.owner == ActiveSupport::IsolatedExecutionState.context
thread_report << "#{conn} is owned by #{conn.owner}"
end
end

View File

@ -224,16 +224,16 @@ module ActiveRecord
def lease
if in_use?
msg = +"Cannot lease connection, "
if @owner == Thread.current
if @owner == ActiveSupport::IsolatedExecutionState.context
msg << "it is already leased by the current thread."
else
msg << "it is already in use by a different thread: #{@owner}. " \
"Current thread: #{Thread.current}."
"Current thread: #{ActiveSupport::IsolatedExecutionState.context}."
end
raise ActiveRecordError, msg
end
@owner = Thread.current
@owner = ActiveSupport::IsolatedExecutionState.context
end
def connection_class # :nodoc:
@ -264,10 +264,10 @@ module ActiveRecord
# this method must only be called while holding connection pool's mutex
def expire
if in_use?
if @owner != Thread.current
if @owner != ActiveSupport::IsolatedExecutionState.context
raise ActiveRecordError, "Cannot expire connection, " \
"it is owned by a different thread: #{@owner}. " \
"Current thread: #{Thread.current}."
"Current thread: #{ActiveSupport::IsolatedExecutionState.context}."
end
@idle_since = Process.clock_gettime(Process::CLOCK_MONOTONIC)
@ -280,10 +280,10 @@ module ActiveRecord
# this method must only be called while holding connection pool's mutex (and a desire for segfaults)
def steal! # :nodoc:
if in_use?
if @owner != Thread.current
if @owner != ActiveSupport::IsolatedExecutionState.context
pool.send :remove_connection_from_thread_cache, self, @owner
@owner = Thread.current
@owner = ActiveSupport::IsolatedExecutionState.context
end
else
raise ActiveRecordError, "Cannot steal connection, it is not currently leased."

View File

@ -5,11 +5,11 @@ require "concurrent/atomic/count_down_latch"
module ActiveRecord
module ConnectionAdapters
class ConnectionPoolTest < ActiveRecord::TestCase
module ConnectionPoolTests
attr_reader :pool
def setup
super
@previous_isolation_level = ActiveSupport::IsolatedExecutionState.isolation_level
# Keep a duplicate pool so we do not bother others
@db_config = ActiveRecord::Base.connection_pool.db_config
@ -27,8 +27,10 @@ module ActiveRecord
end
end
teardown do
def teardown
super
@pool.disconnect!
ActiveSupport::IsolatedExecutionState.isolation_level = @previous_isolation_level
end
def active_connections(pool)
@ -48,7 +50,7 @@ module ActiveRecord
def test_released_connection_moves_between_threads
thread_conn = nil
Thread.new {
new_thread {
pool.with_connection do |conn|
thread_conn = conn
end
@ -56,7 +58,7 @@ module ActiveRecord
assert thread_conn
Thread.new {
new_thread {
pool.with_connection do |conn|
assert_equal thread_conn, conn
end
@ -69,7 +71,7 @@ module ActiveRecord
main_thread = pool.connection
assert_equal 1, active_connections(pool).size
Thread.new {
new_thread {
pool.with_connection do |conn|
assert conn
assert_equal 2, active_connections(pool).size
@ -102,11 +104,12 @@ module ActiveRecord
end
def test_full_pool_blocks
skip_fiber_testing
cs = @pool.size.times.map { @pool.checkout }
t = Thread.new { @pool.checkout }
t = new_thread { @pool.checkout }
# make sure our thread is in the timeout section
Thread.pass until @pool.num_waiting_in_queue == 1
pass_to(t) until @pool.num_waiting_in_queue == 1
connection = cs.first
connection.close
@ -114,6 +117,7 @@ module ActiveRecord
end
def test_full_pool_blocking_shares_load_interlock
skip_fiber_testing
@pool.instance_variable_set(:@size, 1)
load_interlock_latch = Concurrent::CountDownLatch.new
@ -122,7 +126,7 @@ module ActiveRecord
able_to_get_connection = false
able_to_load = false
thread_with_load_interlock = Thread.new do
thread_with_load_interlock = new_thread do
ActiveSupport::Dependencies.interlock.running do
load_interlock_latch.count_down
connection_latch.wait
@ -133,7 +137,7 @@ module ActiveRecord
end
end
thread_with_last_connection = Thread.new do
thread_with_last_connection = new_thread do
@pool.with_connection do
connection_latch.count_down
load_interlock_latch.wait
@ -152,11 +156,12 @@ module ActiveRecord
end
def test_removing_releases_latch
skip_fiber_testing
cs = @pool.size.times.map { @pool.checkout }
t = Thread.new { @pool.checkout }
t = new_thread { @pool.checkout }
# make sure our thread is in the timeout section
Thread.pass until @pool.num_waiting_in_queue == 1
pass_to(t) until @pool.num_waiting_in_queue == 1
connection = cs.first
@pool.remove connection
@ -179,13 +184,13 @@ module ActiveRecord
def test_reap_inactive
ready = Concurrent::CountDownLatch.new
@pool.checkout
child = Thread.new do
child = new_thread do
@pool.checkout
@pool.checkout
ready.count_down
Thread.stop
stop_thread
end
ready.wait
pass_to(child) until ready.wait(0)
assert_equal 3, active_connections(@pool).size
@ -273,7 +278,7 @@ module ActiveRecord
idle_conn = @pool.checkout
recent_conn = @pool.checkout
active_conn = @pool.checkout
_dead_conn = Thread.new { @pool.checkout }.join
_dead_conn = new_thread { @pool.checkout }.join
@pool.checkin idle_conn
@pool.checkin recent_conn
@ -327,7 +332,7 @@ module ActiveRecord
assert_not_nil main_connection
threads = []
4.times do |i|
threads << Thread.new(i) do
threads << new_thread(i) do
thread_connection = pool.connection
assert_not_nil thread_connection
thread_connection.close
@ -336,7 +341,7 @@ module ActiveRecord
threads.each(&:join)
Thread.new do
new_thread do
assert pool.connection
pool.connection.close
end.join
@ -364,6 +369,8 @@ module ActiveRecord
# available connections slowly, ensuring the wakeup order is
# correct in this case.
def test_checkout_fairness
skip_fiber_testing
@pool.instance_variable_set(:@size, 10)
expected = (1..@pool.size).to_a.freeze
# check out all connections so our threads start out waiting
@ -373,7 +380,7 @@ module ActiveRecord
errors = []
threads = expected.map do |i|
t = Thread.new {
t = new_thread {
begin
@pool.checkout # never checked back in
mutex.synchronize { order << i }
@ -381,7 +388,7 @@ module ActiveRecord
mutex.synchronize { errors << e }
end
}
Thread.pass until @pool.num_waiting_in_queue == i
pass_to(t) until @pool.num_waiting_in_queue == i
t
end
@ -402,6 +409,8 @@ module ActiveRecord
# group1 threads, and the fact that only group1 and no group2
# threads acquired a connection is enforced.
def test_checkout_fairness_by_group
skip_fiber_testing
@pool.instance_variable_set(:@size, 10)
# take all the connections
conns = (1..10).map { @pool.checkout }
@ -410,7 +419,7 @@ module ActiveRecord
errors = []
make_thread = proc do |i|
t = Thread.new {
t = new_thread {
begin
@pool.checkout # never checked back in
mutex.synchronize { successes << i }
@ -418,7 +427,7 @@ module ActiveRecord
mutex.synchronize { errors << e }
end
}
Thread.pass until @pool.num_waiting_in_queue == i
pass_to(t) until @pool.num_waiting_in_queue == i
t
end
@ -491,19 +500,16 @@ module ActiveRecord
end
end
class ConnectionTestModel < ActiveRecord::Base
self.abstract_class = true
end
def test_connection_notification_is_called
payloads = []
subscription = ActiveSupport::Notifications.subscribe("!connection.active_record") do |name, started, finished, unique_id, payload|
payloads << payload
end
ConnectionTestModel.establish_connection :arunit
@connection_test_model_class.establish_connection :arunit
assert_equal [:config, :shard, :spec_name], payloads[0].keys.sort
assert_equal "ActiveRecord::ConnectionAdapters::ConnectionPoolTest::ConnectionTestModel", payloads[0][:spec_name]
assert_equal @connection_test_model_class.name, payloads[0][:spec_name]
assert_equal ActiveRecord::Base.default_shard, payloads[0][:shard]
ensure
ActiveSupport::Notifications.unsubscribe(subscription) if subscription
@ -514,10 +520,10 @@ module ActiveRecord
subscription = ActiveSupport::Notifications.subscribe("!connection.active_record") do |name, started, finished, unique_id, payload|
payloads << payload
end
ConnectionTestModel.connects_to shards: { shard_two: { writing: :arunit } }
@connection_test_model_class.connects_to shards: { shard_two: { writing: :arunit } }
assert_equal [:config, :shard, :spec_name], payloads[0].keys.sort
assert_equal "ActiveRecord::ConnectionAdapters::ConnectionPoolTest::ConnectionTestModel", payloads[0][:spec_name]
assert_equal @connection_test_model_class.name, payloads[0][:spec_name]
assert_equal :shard_two, payloads[0][:shard]
ensure
ActiveSupport::Notifications.unsubscribe(subscription) if subscription
@ -538,6 +544,7 @@ module ActiveRecord
end
def test_concurrent_connection_establishment
skip_fiber_testing
assert_operator @pool.connections.size, :<=, 1
all_threads_in_new_connection = Concurrent::CountDownLatch.new(@pool.size - @pool.connections.size)
@ -553,7 +560,7 @@ module ActiveRecord
connecting_threads = []
@pool.size.times do
connecting_threads << Thread.new { @pool.checkout }
connecting_threads << new_thread { @pool.checkout }
end
begin
@ -579,7 +586,7 @@ module ActiveRecord
[:disconnect, :clear_reloadable_connections].each do |group_action_method|
@pool.with_connection do |connection|
assert_raises(ExclusiveConnectionTimeoutError) do
Thread.new { @pool.public_send(group_action_method) }.join
new_thread { @pool.public_send(group_action_method) }.join
end
end
end
@ -588,10 +595,11 @@ module ActiveRecord
end
def test_disconnect_and_clear_reloadable_connections_attempt_to_wait_for_threads_to_return_their_conns
skip_fiber_testing
[:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method|
thread = timed_join_result = nil
@pool.with_connection do |connection|
thread = Thread.new { @pool.send(group_action_method) }
thread = new_thread { @pool.send(group_action_method) }
# give the other `thread` some time to get stuck in `group_action_method`
timed_join_result = thread.join(0.3)
@ -611,7 +619,7 @@ module ActiveRecord
@pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout
[:disconnect!, :clear_reloadable_connections!].each do |group_action_method|
@pool.with_connection do |connection|
Thread.new { @pool.send(group_action_method) }.join
new_thread { @pool.send(group_action_method) }.join
# assert connection has been forcefully taken away from us
assert_not_predicate @pool, :active_connection?
@ -622,6 +630,7 @@ module ActiveRecord
end
def test_disconnect_and_clear_reloadable_connections_are_able_to_preempt_other_waiting_threads
skip_fiber_testing
with_single_connection_pool do |pool|
[:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method|
conn = pool.connection # drain the only available connection
@ -629,23 +638,23 @@ module ActiveRecord
begin
# create a first_thread and let it get into the FIFO queue first
first_thread = Thread.new do
first_thread = new_thread do
pool.with_connection { second_thread_done.wait }
end
# wait for first_thread to get in queue
Thread.pass until pool.num_waiting_in_queue == 1
pass_to(first_thread) until pool.num_waiting_in_queue == 1
# create a different, later thread, that will attempt to do a "group action",
# but because of the group action semantics it should be able to preempt the
# first_thread when a connection is made available
second_thread = Thread.new do
second_thread = new_thread do
pool.send(group_action_method)
second_thread_done.set
end
# wait for second_thread to get in queue
Thread.pass until pool.num_waiting_in_queue == 2
pass_to(second_thread) until pool.num_waiting_in_queue == 2
# return the only available connection
pool.checkin(conn)
@ -662,8 +671,8 @@ module ActiveRecord
if failed
second_thread_done.set
first_thread.join(2)
second_thread.join(2)
first_thread&.join(2)
second_thread&.join(2)
end
first_thread.join(10) || raise("first_thread got stuck")
@ -674,18 +683,19 @@ module ActiveRecord
end
def test_clear_reloadable_connections_creates_new_connections_for_waiting_threads_if_necessary
skip_fiber_testing
with_single_connection_pool do |pool|
conn = pool.connection # drain the only available connection
def conn.requires_reloading? # make sure it gets removed from the pool by clear_reloadable_connections
true
end
stuck_thread = Thread.new do
stuck_thread = new_thread do
pool.with_connection { }
end
# wait for stuck_thread to get in queue
Thread.pass until pool.num_waiting_in_queue == 1
pass_to(stuck_thread) until pool.num_waiting_in_queue == 1
pool.clear_reloadable_connections
@ -698,6 +708,7 @@ module ActiveRecord
end
def test_connection_pool_stat
Thread.report_on_exception, original_report_on_exception = false, Thread.report_on_exception
with_single_connection_pool do |pool|
pool.with_connection do |connection|
stats = pool.stat
@ -707,13 +718,17 @@ module ActiveRecord
stats = pool.stat
assert_equal({ size: 1, connections: 1, busy: 0, dead: 0, idle: 1, waiting: 0, checkout_timeout: 5 }, stats)
Thread.new do
pool.checkout
Thread.current.kill
end.join
assert_raise(ThreadError) do
new_thread do
pool.checkout
raise ThreadError
end.join
end
stats = pool.stat
assert_equal({ size: 1, connections: 1, busy: 0, dead: 1, idle: 0, waiting: 0, checkout_timeout: 5 }, stats)
ensure
Thread.report_on_exception = original_report_on_exception
end
end
@ -729,7 +744,7 @@ module ActiveRecord
# the pool is thread-safe.
connections.each_index do |idx|
if connections[idx] == conn2
Thread.new do
new_thread do
@pool.remove(conn2)
end.join
end
@ -772,5 +787,76 @@ module ActiveRecord
pool.disconnect! if pool
end
end
class ConnectionPoolThreadTest < ActiveRecord::TestCase
include ConnectionPoolTests
class ThreadConnectionTestModel < ActiveRecord::Base
self.abstract_class = true
end
def setup
super
ActiveSupport::IsolatedExecutionState.isolation_level = :thread
@connection_test_model_class = ThreadConnectionTestModel
end
private
def new_thread(...)
Thread.new(...)
end
def stop_thread
Thread.stop
end
def pass_to(_thread)
Thread.pass
end
def skip_fiber_testing; end
end
class ConnectionPoolFiberTest < ActiveRecord::TestCase
include ConnectionPoolTests
class FiberConnectionTestModel < ActiveRecord::Base
self.abstract_class = true
end
class ThreadlikeFiber < Fiber
def join(timeout = nil)
now = Time.now
resume while alive? && (!timeout || Time.now - now < timeout)
end
def terminate
nil
end
end
def setup
super
ActiveSupport::IsolatedExecutionState.isolation_level = :fiber
@connection_test_model_class = FiberConnectionTestModel
end
private
def new_thread(*args, &block)
ThreadlikeFiber.new(*args, &block)
end
def stop_thread
Fiber.yield
end
def pass_to(fiber)
fiber.resume
end
def skip_fiber_testing
skip "Can't test isolation_level=fiber without a Ruby 3.1+ Fiber Scheduler"
end
end
end
end

View File

@ -4,25 +4,30 @@ require "fiber"
module ActiveSupport
module IsolatedExecutionState # :nodoc:
@isolation_level = :thread
@isolation_level = nil
Thread.attr_accessor :active_support_execution_state
Fiber.attr_accessor :active_support_execution_state
class << self
attr_reader :isolation_level
attr_reader :isolation_level, :scope
def isolation_level=(level)
return if level == @isolation_level
unless %i(thread fiber).include?(level)
raise ArgumentError, "isolation_level must be `:thread` or `:fiber`, got: `#{level.inspect}`"
end
if level != isolation_level
clear
singleton_class.alias_method(:current, "current_#{level}")
singleton_class.send(:private, :current)
@isolation_level = level
end
clear if @isolation_level
@scope =
case level
when :thread; Thread
when :fiber; Fiber
end
@isolation_level = level
end
def unique_id
@ -30,27 +35,27 @@ module ActiveSupport
end
def [](key)
current[key]
state[key]
end
def []=(key, value)
current[key] = value
state[key] = value
end
def clear
current.clear
state.clear
end
def context
scope.current
end
private
def current_thread
Thread.current.active_support_execution_state ||= {}
def state
context.active_support_execution_state ||= {}
end
def current_fiber
Fiber.current.active_support_execution_state ||= {}
end
alias_method :current, :current_thread
end
self.isolation_level = :thread
end
end