mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
284a9ba8ec
The concurrent-ruby gem is a toolset containing many concurrency utilities. Many of these utilities include runtime-specific optimizations when possible. Rather than clutter the Rails codebase with concurrency utilities separate from the core task, such tools can be superseded by similar tools in the more specialized gem. This commit replaces `ActiveSupport::Concurrency::Latch` with `Concurrent::CountDownLatch`, which is functionally equivalent.
525 lines
17 KiB
Ruby
525 lines
17 KiB
Ruby
require "cases/helper"
|
|
require 'concurrent/atomics'
|
|
|
|
module ActiveRecord
|
|
module ConnectionAdapters
|
|
class ConnectionPoolTest < ActiveRecord::TestCase
|
|
attr_reader :pool
|
|
|
|
# Builds a dedicated pool from the base pool's specification so that the
# tests in this class do not interfere with the shared pool.
def setup
  super

  base_spec = ActiveRecord::Base.connection_pool.spec
  @pool = ConnectionPool.new(base_spec)

  return unless in_memory_db?

  # Every separate connection to an in-memory database gets an entirely new
  # database with an empty schema, so a stub schema is created on the fly.
  @pool.with_connection do |connection|
    connection.create_table(:posts) { |t| t.integer :cololumn }
  end
end
|
|
|
|
# Disconnect the per-test pool once each test has finished.
teardown { @pool.disconnect! }
|
|
|
|
# Returns the connections of +pool+ that report themselves as in use.
#
# pool - any object whose +connections+ is an enumerable of objects
#        responding to +in_use?+.
#
# Returns an Array (empty when nothing is in use).
def active_connections(pool)
  pool.connections.select(&:in_use?)
end
|
|
|
|
# A closed connection is no longer in use, and asking the pool for a
# connection afterwards yields one that is in use again.
def test_checkout_after_close
  conn = pool.connection
  assert conn.in_use?

  conn.close
  assert !conn.in_use?

  # Request a connection from the pool once more.
  assert pool.connection.in_use?
end
|
|
|
|
# A connection released by one thread is expected to be the one handed to
# the next thread that asks for a connection.
def test_released_connection_moves_between_threads
  first_conn = nil

  first_worker = Thread.new do
    pool.with_connection { |conn| first_conn = conn }
  end
  first_worker.join

  assert first_conn

  second_worker = Thread.new do
    pool.with_connection { |conn| assert_equal first_conn, conn }
  end
  second_worker.join
end
|
|
|
|
# Tracks the number of in-use connections while the main thread holds one
# connection and a second thread borrows another via +with_connection+.
def test_with_connection
  assert_equal 0, active_connections(pool).size

  main_conn = pool.connection
  assert_equal 1, active_connections(pool).size

  worker = Thread.new do
    pool.with_connection do |conn|
      assert conn
      assert_equal 2, active_connections(pool).size
    end
    # The worker's connection is no longer counted once the block has exited.
    assert_equal 1, active_connections(pool).size
  end
  worker.join

  main_conn.close
  assert_equal 0, active_connections(pool).size
end
|
|
|
|
# +active_connection?+ is false before a connection is requested, true
# while it is held, and false again once it has been closed.
def test_active_connection_in_use
  assert !pool.active_connection?

  held = pool.connection
  assert pool.active_connection?

  held.close
  assert !pool.active_connection?
end
|
|
|
|
# Checking out from an exhausted pool must raise ConnectionTimeoutError.
def test_full_pool_exception
  # no need to delay test suite by waiting the whole full default timeout
  @pool.checkout_timeout = 0.001

  # Exhaust the pool so the next checkout has nothing to hand out.
  @pool.size.times { @pool.checkout }

  assert_raises(ConnectionTimeoutError) do
    @pool.checkout
  end
end
|
|
|
|
# A checkout against a full pool waits, and receives the connection that is
# subsequently closed by its holder.
def test_full_pool_blocks
  held = Array.new(@pool.size) { @pool.checkout }
  waiter = Thread.new { @pool.checkout }

  # make sure the waiter has reached the pool's wait queue
  Thread.pass until @pool.num_waiting_in_queue == 1

  released = held.first
  released.close
  assert_equal released, waiter.join.value
end
|
|
|
|
# Removing a connection from a full pool lets a waiting checkout complete
# with an object that responds to +execute+.
def test_removing_releases_latch
  held = Array.new(@pool.size) { @pool.checkout }
  waiter = Thread.new { @pool.checkout }

  # make sure the waiter has reached the pool's wait queue
  Thread.pass until @pool.num_waiting_in_queue == 1

  removed = held.first
  @pool.remove removed
  assert_respond_to waiter.join.value, :execute
  removed.close
end
|
|
|
|
# Reaping must not change the number of connections in the pool when all of
# them were checked out by the current (live) thread.
def test_reap_and_active
  3.times { @pool.checkout }

  before_reap = @pool.connections.dup

  @pool.reap

  assert_equal before_reap.length, @pool.connections.length
end
|
|
|
|
# After the thread that checked out two connections has been terminated,
# reaping leaves only the main thread's connection in use.
def test_reap_inactive
  child_ready = Concurrent::CountDownLatch.new
  @pool.checkout

  worker = Thread.new do
    2.times { @pool.checkout }
    child_ready.count_down
    # Park the thread while it still holds its connections.
    Thread.stop
  end
  child_ready.wait

  assert_equal 3, active_connections(@pool).size

  worker.terminate
  worker.join
  @pool.reap

  assert_equal 1, active_connections(@pool).size
ensure
  @pool.connections.each(&:close)
end
|
|
|
|
# Removing a checked-out connection shrinks the pool by one while the
# connection itself still reports being in use.
def test_remove_connection
  conn = @pool.checkout
  assert conn.in_use?

  length = @pool.connections.length
  @pool.remove conn
  assert conn.in_use?
  assert_equal(length - 1, @pool.connections.length)
ensure
  # Guard against a failed checkout: calling close on nil would raise
  # NoMethodError here and hide the original failure.
  conn.close if conn
end
|
|
|
|
# Once the current thread's connection has been removed from the pool,
# asking for a connection again must return a different one.
def test_remove_connection_for_thread
  removed = @pool.connection
  @pool.remove removed
  assert_not_equal(removed, @pool.connection)
ensure
  removed && removed.close
end
|
|
|
|
# +active_connection?+ turns true once a connection has been requested and
# false again after +release_connection+.
def test_active_connection?
  assert !@pool.active_connection?

  assert @pool.connection
  assert @pool.active_connection?

  @pool.release_connection
  assert !@pool.active_connection?
end
|
|
|
|
# Several threads, one after another and in parallel, must each be able to
# obtain a connection from a fresh pool and close it again.
def test_checkout_behaviour
  pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
  main_connection = pool.connection
  assert_not_nil main_connection

  workers = Array.new(4) do
    Thread.new do
      # Keep the connection in a variable local to this thread's block. Sharing
      # one outer local between the threads would let one thread close a
      # connection that another thread has just obtained.
      thread_connection = pool.connection
      assert_not_nil thread_connection
      thread_connection.close
    end
  end

  workers.each(&:join)

  Thread.new do
    assert pool.connection
    pool.connection.close
  end.join
ensure
  # This pool is local to the test, so teardown does not disconnect it.
  pool.disconnect! if pool
end
|
|
|
|
# The connection pool is "fair" if threads waiting for connections receive
# them in the order in which they began waiting. This ensures that we don't
# time out one HTTP request even while well under capacity in a
# multi-threaded environment such as a Java servlet container.
#
# Strict fairness is not required: if two connections become available at
# the same time, it is fine for two waiting threads to acquire them out of
# order.
#
# So this test lines up waiting threads and then trickles connections back
# in slowly, checking that the wake-up order is correct in that case.
def test_checkout_fairness
  @pool.instance_variable_set(:@size, 10)
  expected = (1..@pool.size).to_a.freeze
  # check out all connections so our threads start out waiting
  held = expected.map { @pool.checkout }
  lock = Mutex.new
  wake_order = []
  failures = []

  waiters = expected.map do |position|
    waiter = Thread.new do
      begin
        @pool.checkout # never checked back in
        lock.synchronize { wake_order << position }
      rescue => e
        lock.synchronize { failures << e }
      end
    end
    # Do not start the next waiter until this one is queued.
    Thread.pass until @pool.num_waiting_in_queue == position
    waiter
  end

  # this should wake up the waiting threads one by one in order
  held.each do |conn|
    @pool.checkin(conn)
    sleep 0.1
  end

  waiters.each(&:join)

  raise failures.first if failures.any?

  assert_equal(expected, wake_order)
end
|
|
|
|
# As mentioned in #test_checkout_fairness, we don't care about strict
# fairness. This test creates two groups of threads: group1, whose members
# all start waiting before any thread in group2. Enough connections are
# checked in to wake up all group1 threads, and the test enforces that only
# group1 threads, and no group2 threads, acquired a connection.
def test_checkout_fairness_by_group
  @pool.instance_variable_set(:@size, 10)
  # take all the connections
  conns = (1..10).map { @pool.checkout }
  mutex = Mutex.new
  successes = [] # threads that successfully got a connection
  errors = []

  make_thread = proc do |i|
    t = Thread.new {
      begin
        @pool.checkout # never checked back in
        mutex.synchronize { successes << i }
      rescue => e
        mutex.synchronize { errors << e }
      end
    }
    # Do not start the next waiter until this one is queued.
    Thread.pass until @pool.num_waiting_in_queue == i
    t
  end

  # all group1 threads start waiting before any in group2
  group1 = (1..5).map(&make_thread)
  group2 = (6..10).map(&make_thread)

  # checkin n connections back to the pool
  checkin = proc do |n|
    n.times { @pool.checkin(conns.pop) }
  end

  checkin.call(group1.size) # should wake up all group1

  # Wait until as many threads as there are in group1 have finished.
  loop do
    sleep 0.1
    break if mutex.synchronize { (successes.size + errors.size) == group1.size }
  end

  winners = mutex.synchronize { successes.dup }
  checkin.call(group2.size) # should wake up everyone remaining

  group1.each(&:join)
  group2.each(&:join)

  # Surface an exception raised inside a waiting thread before comparing the
  # winners; otherwise it would be hidden behind an uninformative mismatch.
  raise errors.first if errors.any?

  assert_equal((1..group1.size).to_a, winners.sort)
end
|
|
|
|
# With automatic_reconnect enabled, a disconnected pool still hands out
# connections; once it is disabled, requesting one raises
# ConnectionNotEstablished.
def test_automatic_reconnect=
  pool = ConnectionPool.new(ActiveRecord::Base.connection_pool.spec)
  assert pool.automatic_reconnect
  assert pool.connection

  pool.disconnect!
  assert pool.connection

  pool.disconnect!
  pool.automatic_reconnect = false

  assert_raises(ConnectionNotEstablished) { pool.connection }

  assert_raises(ConnectionNotEstablished) { pool.with_connection }
end
|
|
|
|
# A connection obtained from the pool carries an Arel ToSql visitor.
def test_pool_sets_connection_visitor
  visitor = @pool.connection.visitor
  assert visitor.is_a?(Arel::Visitors::ToSql)
end
|
|
|
|
# Make sure an exception is raised when establish_connection is called
# with an anonymous class.
def test_anonymous_class_exception
  anonymous_model = Class.new(ActiveRecord::Base)
  connection_handler = ActiveRecord::Base.connection_handler

  assert_raises(RuntimeError) do
    connection_handler.establish_connection anonymous_model, nil
  end
end
|
|
|
|
# A schema cache assigned to the pool is given to connections as a distinct
# cache object with the same size and the very same column objects.
def test_pool_sets_connection_schema_cache
  seed_connection = pool.checkout
  seeded_cache = SchemaCache.new(seed_connection)
  seeded_cache.add(:posts)
  pool.schema_cache = seeded_cache

  pool.with_connection do |conn|
    assert_not_same pool.schema_cache, conn.schema_cache
    assert_equal pool.schema_cache.size, conn.schema_cache.size
    assert_same pool.schema_cache.columns(:posts), conn.schema_cache.columns(:posts)
  end

  pool.checkin seed_connection
end
|
|
|
|
# Checks that the pool lets several threads be inside +new_connection+ at the
# same time, by holding each caller there until all of them have arrived.
def test_concurrent_connection_establishment
  assert_operator @pool.connections.size, :<=, 1

  missing_connections = @pool.size - @pool.connections.size
  all_threads_in_new_connection = Concurrent::CountDownLatch.new(missing_connections)
  all_go = Concurrent::CountDownLatch.new

  # Override +new_connection+ on this pool instance only: announce arrival,
  # then wait for the go signal before building the real connection.
  @pool.singleton_class.class_eval do
    define_method(:new_connection) do
      all_threads_in_new_connection.count_down
      all_go.wait
      super()
    end
  end

  connecting_threads = Array.new(@pool.size) do
    Thread.new { @pool.checkout }
  end

  begin
    # the kernel of the whole test is here, everything else is just scaffolding,
    # this latch will not be released unless conn. pool allows for concurrent
    # connection creation
    Timeout.timeout(5) { all_threads_in_new_connection.wait }
  rescue Timeout::Error
    flunk 'pool unable to establish connections concurrently or implementation has ' \
          'changed, this test then needs to patch a different :new_connection method'
  ensure
    # clean up the threads
    all_go.count_down
    connecting_threads.each(&:join)
  end
end
|
|
|
|
# The non-bang group actions must raise ExclusiveConnectionTimeoutError when
# another thread is still holding a connection.
def test_non_bang_disconnect_and_clear_reloadable_connections_throw_exception_if_threads_dont_return_their_conns
  @pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout

  %i[disconnect clear_reloadable_connections].each do |action|
    @pool.with_connection do |_connection|
      assert_raises(ExclusiveConnectionTimeoutError) do
        # join re-raises the exception from the other thread in this one
        other_thread = Thread.new { @pool.send(action) }
        other_thread.join
      end
    end
  end
end
|
|
|
|
# Each group action, run from another thread, must wait for this thread to
# return its connection rather than taking it away within the default timeout.
def test_disconnect_and_clear_reloadable_connections_attempt_to_wait_for_threads_to_return_their_conns
  %i[disconnect disconnect! clear_reloadable_connections clear_reloadable_connections!].each do |action|
    begin
      blocked_thread = join_result = nil

      @pool.with_connection do |_connection|
        blocked_thread = Thread.new { @pool.send(action) }

        # give the other thread some time to get stuck in the group action
        join_result = blocked_thread.join(0.3)
        # a timed join returning `nil` means the other thread hasn't finished
        # running and is still waiting for us to release our connection
        assert_nil join_result

        # since this is within the default timeout, our connection must not
        # have been forcefully taken away from us
        assert @pool.active_connection?
      end
    ensure
      # clean up the other thread
      blocked_thread.join if blocked_thread && !join_result
    end
  end
end
|
|
|
|
# The bang group actions go ahead even though this thread still holds a
# connection, after which the thread no longer has an active connection.
def test_bang_versions_of_disconnect_and_clear_reloadable_connections_if_unable_to_aquire_all_connections_proceed_anyway
  @pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout

  %i[disconnect! clear_reloadable_connections!].each do |action|
    @pool.with_connection do |_connection|
      other_thread = Thread.new { @pool.send(action) }
      other_thread.join

      # assert connection has been forcefully taken away from us
      assert_not @pool.active_connection?
    end
  end
end
|
|
|
|
# A thread running a group action must get the returned connection ahead of
# a thread that has been waiting in the queue for longer.
def test_disconnect_and_clear_reloadable_connections_are_able_to_preempt_other_waiting_threads
  with_single_connection_pool do |pool|
    %i[disconnect disconnect! clear_reloadable_connections clear_reloadable_connections!].each do |group_action_method|
      only_conn = pool.connection # drain the only available connection
      second_thread_done = Concurrent::CountDownLatch.new

      # start first_thread and let it get into the FIFO queue first
      first_thread = Thread.new do
        pool.with_connection { second_thread_done.wait }
      end

      # wait for first_thread to get in queue
      Thread.pass until pool.num_waiting_in_queue == 1

      # start a different, later thread that attempts a "group action";
      # because of the group action semantics it should be able to preempt
      # first_thread when a connection is made available
      second_thread = Thread.new do
        pool.send(group_action_method)
        second_thread_done.count_down
      end

      # wait for second_thread to get in queue
      Thread.pass until pool.num_waiting_in_queue == 2

      # return the only available connection
      pool.checkin(only_conn)

      # if second_thread is not able to preempt first_thread, they will
      # temporarily (until either of them times out with
      # ConnectionTimeoutError) deadlock and the join(2) timeout is reached
      failed = !second_thread.join(2)

      #--- post test clean up start
      second_thread_done.count_down if failed

      # after `pool.disconnect()` the first thread will be left stuck in
      # queue, no need to wait for it to time out with ConnectionTimeoutError
      if [:disconnect, :disconnect!].include?(group_action_method) && pool.num_waiting_in_queue > 0
        pool.with_connection {} # create a new connection in case there are threads still stuck in a queue
      end

      first_thread.join
      second_thread.join
      #--- post test clean up end

      flunk "#{group_action_method} is not able to preempt other waiting threads" if failed
    end
  end
end
|
|
|
|
# When the only connection has to be reloaded, clear_reloadable_connections
# must not leave a waiting thread stuck in the queue.
def test_clear_reloadable_connections_creates_new_connections_for_waiting_threads_if_necessary
  with_single_connection_pool do |pool|
    only_conn = pool.connection # drain the only available connection
    # make sure it gets removed from the pool by clear_reloadable_connections
    only_conn.define_singleton_method(:requires_reloading?) { true }

    stuck_thread = Thread.new { pool.with_connection {} }

    # wait for stuck_thread to get in queue
    Thread.pass until pool.num_waiting_in_queue == 1

    pool.clear_reloadable_connections

    finished = stuck_thread.join(2)
    flunk 'clear_reloadable_connections must not let other connection waiting threads get stuck in queue' unless finished

    assert_equal 0, pool.num_waiting_in_queue
  end
end
|
|
|
|
private
|
|
# Yields a fresh ConnectionPool limited to a single connection and
# disconnects it once the block has finished, even if the block raised.
def with_single_connection_pool
  single_spec = ActiveRecord::Base.connection_pool.spec.dup
  # this is safe to do, because .dupped ConnectionSpecification also auto-dups its config
  single_spec.config[:pool] = 1

  pool = ConnectionPool.new(single_spec)
  yield pool
ensure
  pool.disconnect! if pool
end
|
|
end
|
|
end
|
|
end
|