mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
AR::ConPool - remove synchronization around connection cache.
Renamed `@reserved_connections` -> `@thread_cached_conns`. New name clearly conveys the purpose of the cache, which is to speed-up `#connection` method. The new `@thread_cached_conns` now also uses `Thread` objects as keys (instead of previously `Thread.current.object_id`). Since there is no longer any synchronization around `@thread_cached_conns`, `disconnect!` and `clear_reloadable_connections!` methods now pre-emptively obtain ownership (via `checkout`) of all existing connections, before modifying internal data structures. A private method `release` has been renamed `thread_conn_uncache` to clear-up its purpose. Fixed some brittle `thread.status == "sleep"` tests (threads can go into sleep even without locks).
This commit is contained in:
parent
e92f5a99d6
commit
603fe20c0b
3 changed files with 436 additions and 84 deletions
|
@ -1,7 +1,6 @@
|
|||
require 'thread'
|
||||
require 'thread_safe'
|
||||
require 'monitor'
|
||||
require 'set'
|
||||
|
||||
module ActiveRecord
|
||||
# Raised when a connection could not be obtained within the connection
|
||||
|
@ -10,6 +9,12 @@ module ActiveRecord
|
|||
class ConnectionTimeoutError < ConnectionNotEstablished
|
||||
end
|
||||
|
||||
# Raised when a pool was unable to get ahold of all its connections
|
||||
# to perform a "group" action such as +ConnectionPool#disconnect!+
|
||||
# or +ConnectionPool#clear_reloadable_connections!+.
|
||||
class ExclusiveConnectionTimeoutError < ConnectionTimeoutError
|
||||
end
|
||||
|
||||
module ConnectionAdapters
|
||||
# Connection pool base class for managing Active Record database
|
||||
# connections.
|
||||
|
@ -63,6 +68,15 @@ module ActiveRecord
|
|||
# connection at the end of a thread or a thread dies unexpectedly.
|
||||
# Regardless of this setting, the Reaper will be invoked before every
|
||||
# blocking wait. (Default nil, which means don't schedule the Reaper).
|
||||
#
|
||||
#--
|
||||
# Synchronization policy:
|
||||
# * all public methods can be called outside +synchronize+
|
||||
# * access to these i-vars needs to be in +synchronize+:
|
||||
# * @connections
|
||||
# * @now_connecting
|
||||
# * private methods that require being called in a +synchronize+ blocks
|
||||
# are now explicitly documented
|
||||
class ConnectionPool
|
||||
# Threadsafe, fair, FIFO queue. Meant to be used by ConnectionPool
|
||||
# with which it shares a Monitor. But could be a generic Queue.
|
||||
|
@ -191,12 +205,72 @@ module ActiveRecord
|
|||
end
|
||||
end
|
||||
|
||||
# Adds the ability to turn a basic fair FIFO queue into one
|
||||
# biased to some thread.
|
||||
module BiasableQueue # :nodoc:
|
||||
class BiasedConditionVariable # :nodoc:
|
||||
# semantics of condition variables guarantee that +broadcast+, +broadcast_on_biased+,
|
||||
# +signal+ and +wait+ methods are only called while holding a lock
|
||||
def initialize(lock, other_cond, preferred_thread)
|
||||
@real_cond = lock.new_cond
|
||||
@other_cond = other_cond
|
||||
@preferred_thread = preferred_thread
|
||||
@num_waiting_on_real_cond = 0
|
||||
end
|
||||
|
||||
def broadcast
|
||||
broadcast_on_biased
|
||||
@other_cond.broadcast
|
||||
end
|
||||
|
||||
def broadcast_on_biased
|
||||
@num_waiting_on_real_cond = 0
|
||||
@real_cond.broadcast
|
||||
end
|
||||
|
||||
def signal
|
||||
if @num_waiting_on_real_cond > 0
|
||||
@num_waiting_on_real_cond -= 1
|
||||
@real_cond
|
||||
else
|
||||
@other_cond
|
||||
end.signal
|
||||
end
|
||||
|
||||
def wait(timeout)
|
||||
if Thread.current == @preferred_thread
|
||||
@num_waiting_on_real_cond += 1
|
||||
@real_cond
|
||||
else
|
||||
@other_cond
|
||||
end.wait(timeout)
|
||||
end
|
||||
end
|
||||
|
||||
def with_a_bias_for(thread)
|
||||
previous_cond = nil
|
||||
new_cond = nil
|
||||
synchronize do
|
||||
previous_cond = @cond
|
||||
@cond = new_cond = BiasedConditionVariable.new(@lock, @cond, thread)
|
||||
end
|
||||
yield
|
||||
ensure
|
||||
synchronize do
|
||||
@cond = previous_cond if previous_cond
|
||||
new_cond.broadcast_on_biased if new_cond # wake up any remaining sleepers
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Connections must be leased while holding the main pool mutex. This is
|
||||
# an internal subclass that also +.leases+ returned connections while
|
||||
# still in queue's critical section (queue synchronizes with the same
|
||||
# +@lock+ as the main pool) so that a returned connection is already
|
||||
# leased and there is no need to re-enter synchronized block.
|
||||
class ConnectionLeasingQueue < Queue # :nodoc:
|
||||
include BiasableQueue
|
||||
|
||||
private
|
||||
def internal_poll(timeout)
|
||||
conn = super
|
||||
|
@ -253,8 +327,17 @@ module ActiveRecord
|
|||
# default max pool size to 5
|
||||
@size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
|
||||
|
||||
# The cache of reserved connections mapped to threads
|
||||
@reserved_connections = ThreadSafe::Cache.new(:initial_capacity => @size)
|
||||
# The cache of threads mapped to reserved connections, the sole purpose
|
||||
# of the cache is to speed-up +connection+ method, it is not the authoritative
|
||||
# registry of which thread owns which connection, that is tracked by
|
||||
# +connection.owner+ attr on each +connection+ instance.
|
||||
# The invariant works like this: if there is mapping of +thread => conn+,
|
||||
# then that +thread+ does indeed own that +conn+, however an absence of a such
|
||||
# mapping does not mean that the +thread+ doesn't own the said connection, in
|
||||
# that case +conn.owner+ attr should be consulted.
|
||||
# Access and modification of +@thread_cached_conns+ does not require
|
||||
# synchronization.
|
||||
@thread_cached_conns = ThreadSafe::Cache.new(:initial_capacity => @size)
|
||||
|
||||
@connections = []
|
||||
@automatic_reconnect = true
|
||||
|
@ -264,6 +347,9 @@ module ActiveRecord
|
|||
# currently in the process of independently establishing connections to the DB.
|
||||
@now_connecting = 0
|
||||
|
||||
# A boolean toggle that allows/disallows new connections.
|
||||
@new_cons_enabled = true
|
||||
|
||||
@available = ConnectionLeasingQueue.new self
|
||||
end
|
||||
|
||||
|
@ -271,41 +357,45 @@ module ActiveRecord
|
|||
# #checkout to obtain one if necessary.
|
||||
#
|
||||
# #connection can be called any number of times; the connection is
|
||||
# held in a hash keyed by the thread id.
|
||||
# held in a cache keyed by a thread.
|
||||
def connection
|
||||
# this is correctly done double-checked locking
|
||||
# (ThreadSafe::Cache's lookups have volatile semantics)
|
||||
@reserved_connections[current_connection_id] || synchronized_connection_retrieval
|
||||
@thread_cached_conns[connection_cache_key(Thread.current)] ||= checkout
|
||||
end
|
||||
|
||||
# Is there an open connection that is being used for the current thread?
|
||||
#
|
||||
# This method only works for connections that have been abtained through
|
||||
# #connection or #with_connection methods, connections obtained through
|
||||
# #checkout will not be detected by #active_connection?
|
||||
def active_connection?
|
||||
synchronize do
|
||||
@reserved_connections.fetch(current_connection_id) {
|
||||
return false
|
||||
}.in_use?
|
||||
end
|
||||
@thread_cached_conns[connection_cache_key(Thread.current)]
|
||||
end
|
||||
|
||||
# Signal that the thread is finished with the current connection.
|
||||
# #release_connection releases the connection-thread association
|
||||
# and returns the connection to the pool.
|
||||
def release_connection(with_id = current_connection_id)
|
||||
synchronize do
|
||||
conn = @reserved_connections.delete(with_id)
|
||||
checkin conn if conn
|
||||
#
|
||||
# This method only works for connections that have been obtained through
|
||||
# #connection or #with_connection methods, connections obtained through
|
||||
# #checkout will not be automatically released.
|
||||
def release_connection(owner_thread = Thread.current)
|
||||
if conn = @thread_cached_conns.delete(connection_cache_key(owner_thread))
|
||||
checkin conn
|
||||
end
|
||||
end
|
||||
|
||||
# If a connection already exists yield it to the block. If no connection
|
||||
# If a connection obtained through #connection or #with_connection methods
|
||||
# already exists yield it to the block. If no such connection
|
||||
# exists checkout a connection, yield it to the block, and checkin the
|
||||
# connection when finished.
|
||||
def with_connection
|
||||
connection_id = current_connection_id
|
||||
fresh_connection = true unless active_connection?
|
||||
yield connection
|
||||
unless conn = @thread_cached_conns[connection_cache_key(Thread.current)]
|
||||
conn = connection
|
||||
fresh_connection = true
|
||||
end
|
||||
yield conn
|
||||
ensure
|
||||
release_connection(connection_id) if fresh_connection
|
||||
release_connection if fresh_connection
|
||||
end
|
||||
|
||||
# Returns true if a connection has already been opened.
|
||||
|
@ -314,32 +404,81 @@ module ActiveRecord
|
|||
end
|
||||
|
||||
# Disconnects all connections in the pool, and clears the pool.
|
||||
def disconnect!
|
||||
synchronize do
|
||||
@reserved_connections.clear
|
||||
@connections.each do |conn|
|
||||
checkin conn
|
||||
conn.disconnect!
|
||||
#
|
||||
# Raises:
|
||||
# - +ExclusiveConnectionTimeoutError+ if unable to gain ownership of all
|
||||
# connections in the pool within a timeout interval (default duration is
|
||||
# +spec.config[:checkout_timeout] * 2+ seconds).
|
||||
def disconnect(raise_on_acquisition_timeout = true)
|
||||
with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
|
||||
synchronize do
|
||||
@connections.each do |conn|
|
||||
checkin conn
|
||||
conn.disconnect!
|
||||
end
|
||||
@connections = []
|
||||
@available.clear
|
||||
end
|
||||
@connections = []
|
||||
@available.clear
|
||||
end
|
||||
end
|
||||
|
||||
# Clears the cache which maps classes.
|
||||
def clear_reloadable_connections!
|
||||
synchronize do
|
||||
@reserved_connections.clear
|
||||
@connections.each do |conn|
|
||||
checkin conn
|
||||
conn.disconnect! if conn.requires_reloading?
|
||||
end
|
||||
@connections.delete_if(&:requires_reloading?)
|
||||
@available.clear
|
||||
@connections.each do |conn|
|
||||
@available.add conn
|
||||
# Disconnects all connections in the pool, and clears the pool.
|
||||
#
|
||||
# The pool first tries to gain ownership of all connections, if unable to
|
||||
# do so within a timeout interval (default duration is
|
||||
# +spec.config[:checkout_timeout] * 2+ seconds), the pool is forcefully
|
||||
# disconneted wihout any regard for other connection owning threads.
|
||||
def disconnect!
|
||||
disconnect(false)
|
||||
end
|
||||
|
||||
# Clears the cache which maps classes and re-connects connections that
|
||||
# require reloading.
|
||||
#
|
||||
# Raises:
|
||||
# - +ExclusiveConnectionTimeoutError+ if unable to gain ownership of all
|
||||
# connections in the pool within a timeout interval (default duration is
|
||||
# +spec.config[:checkout_timeout] * 2+ seconds).
|
||||
def clear_reloadable_connections(raise_on_acquisition_timeout = true)
|
||||
num_new_conns_required = 0
|
||||
|
||||
with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
|
||||
synchronize do
|
||||
@connections.each do |conn|
|
||||
checkin conn
|
||||
conn.disconnect! if conn.requires_reloading?
|
||||
end
|
||||
@connections.delete_if(&:requires_reloading?)
|
||||
|
||||
@available.clear
|
||||
|
||||
if @connections.size < @size
|
||||
# because of the pruning done by this method, we might be running
|
||||
# low on connections, while threads stuck in queue are helpless
|
||||
# (not being able to establish new connections for themselves),
|
||||
# see also more detailed explanation in +remove+
|
||||
num_new_conns_required = num_waiting_in_queue - @connections.size
|
||||
end
|
||||
|
||||
@connections.each do |conn|
|
||||
@available.add conn
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
bulk_make_new_connections(num_new_conns_required) if num_new_conns_required > 0
|
||||
end
|
||||
|
||||
# Clears the cache which maps classes and re-connects connections that
|
||||
# require reloading.
|
||||
#
|
||||
# The pool first tries to gain ownership of all connections, if unable to
|
||||
# do so within a timeout interval (default duration is
|
||||
# +spec.config[:checkout_timeout] * 2+ seconds), the pool forcefully
|
||||
# clears the cache and reloads connections without any regard for other
|
||||
# connection owning threads.
|
||||
def clear_reloadable_connections!
|
||||
clear_reloadable_connections(false)
|
||||
end
|
||||
|
||||
# Check-out a database connection from the pool, indicating that you want
|
||||
|
@ -356,8 +495,8 @@ module ActiveRecord
|
|||
#
|
||||
# Raises:
|
||||
# - ConnectionTimeoutError: no connection can be obtained from the pool.
|
||||
def checkout
|
||||
checkout_and_verify(acquire_connection)
|
||||
def checkout(checkout_timeout = @checkout_timeout)
|
||||
checkout_and_verify(acquire_connection(checkout_timeout))
|
||||
end
|
||||
|
||||
# Check-in a database connection back into the pool, indicating that you
|
||||
|
@ -367,14 +506,12 @@ module ActiveRecord
|
|||
# calling +checkout+ on this pool.
|
||||
def checkin(conn)
|
||||
synchronize do
|
||||
owner = conn.owner
|
||||
remove_connection_from_thread_cache conn
|
||||
|
||||
conn.run_callbacks :checkin do
|
||||
conn.expire
|
||||
end
|
||||
|
||||
release conn, owner
|
||||
|
||||
@available.add conn
|
||||
end
|
||||
end
|
||||
|
@ -385,11 +522,11 @@ module ActiveRecord
|
|||
needs_new_connection = false
|
||||
|
||||
synchronize do
|
||||
remove_connection_from_thread_cache conn
|
||||
|
||||
@connections.delete conn
|
||||
@available.delete conn
|
||||
|
||||
release conn, conn.owner
|
||||
|
||||
# @available.any_waiting? => true means that prior to removing this
|
||||
# conn, the pool was at its max size (@connections.size == @size)
|
||||
# this would mean that any threads stuck waiting in the queue wouldn't
|
||||
|
@ -405,12 +542,9 @@ module ActiveRecord
|
|||
# This is intentionally done outside of the synchronized section as we
|
||||
# would like not to hold the main mutex while checking out new connections,
|
||||
# thus there is some chance that needs_new_connection information is now
|
||||
# stale, we can live with that (try_to_checkout_new_connection will make
|
||||
# stale, we can live with that (bulk_make_new_connections will make
|
||||
# sure not to exceed the pool's @size limit).
|
||||
if needs_new_connection && new_conn = try_to_checkout_new_connection
|
||||
# make the new_conn available to the starving threads stuck @available Queue
|
||||
checkin new_conn
|
||||
end
|
||||
bulk_make_new_connections(1) if needs_new_connection
|
||||
end
|
||||
|
||||
# Recover lost connections for the pool. A lost connection can occur if
|
||||
|
@ -435,22 +569,117 @@ module ActiveRecord
|
|||
end
|
||||
end
|
||||
|
||||
def num_waiting_in_queue # :nodoc:
|
||||
@available.num_waiting
|
||||
end
|
||||
|
||||
private
|
||||
def synchronized_connection_retrieval
|
||||
conn = checkout
|
||||
#--
|
||||
# this is unfortunately not concurrent
|
||||
def bulk_make_new_connections(num_new_conns_needed)
|
||||
num_new_conns_needed.times do
|
||||
# try_to_checkout_new_connection will not exceed pool's @size limit
|
||||
if new_conn = try_to_checkout_new_connection
|
||||
# make the new_conn available to the starving threads stuck @available Queue
|
||||
checkin(new_conn)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
#--
|
||||
# From the discussion on Github:
|
||||
# https://github.com/rails/rails/pull/14938#commitcomment-6601951
|
||||
# This hook-in method allows for easier monkey-patching fixes needed by
|
||||
# JRuby users that use Fibers.
|
||||
def connection_cache_key(thread)
|
||||
thread
|
||||
end
|
||||
|
||||
# Take control of all existing connections so a "group" action such as
|
||||
# reload/disconnect can be performed safely. It is no longer enough to
|
||||
# wrap it in +synchronize+ because some pool's actions are allowed
|
||||
# to be performed outside of the main +synchronize+ block.
|
||||
def with_exclusively_acquired_all_connections(raise_on_acquisition_timeout = true)
|
||||
with_new_connections_blocked do
|
||||
attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout)
|
||||
yield
|
||||
end
|
||||
end
|
||||
|
||||
def attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout = true)
|
||||
collected_conns = synchronize do
|
||||
# account for our own connections
|
||||
@connections.select {|conn| conn.owner == Thread.current}
|
||||
end
|
||||
|
||||
newly_checked_out = []
|
||||
timeout_time = Time.now + (@checkout_timeout * 2)
|
||||
|
||||
@available.with_a_bias_for(Thread.current) do
|
||||
while true
|
||||
synchronize do
|
||||
return if collected_conns.size == @connections.size && @now_connecting == 0
|
||||
remaining_timeout = timeout_time - Time.now
|
||||
remaining_timeout = 0 if remaining_timeout < 0
|
||||
conn = checkout_for_exclusive_access(remaining_timeout)
|
||||
collected_conns << conn
|
||||
newly_checked_out << conn
|
||||
end
|
||||
end
|
||||
end
|
||||
rescue ExclusiveConnectionTimeoutError
|
||||
# `raise_on_acquisition_timeout == false` means we are directed to ignore any
|
||||
# timeouts and are expected to just give up: we've obtained as many connections
|
||||
# as possible, note that in a case like that we don't return any of the
|
||||
# `newly_checked_out` connections.
|
||||
|
||||
if raise_on_acquisition_timeout
|
||||
release_newly_checked_out = true
|
||||
raise
|
||||
end
|
||||
rescue Exception # if something else went wrong
|
||||
# this can't be a "naked" rescue, because we have should return conns
|
||||
# even for non-StandardErrors
|
||||
release_newly_checked_out = true
|
||||
raise
|
||||
ensure
|
||||
if release_newly_checked_out && newly_checked_out
|
||||
# releasing only those conns that were checked out in this method, conns
|
||||
# checked outside this method (before it was called) are not for us to release
|
||||
newly_checked_out.each {|conn| checkin(conn)}
|
||||
end
|
||||
end
|
||||
|
||||
#--
|
||||
# Must be called in a synchronize block.
|
||||
def checkout_for_exclusive_access(checkout_timeout)
|
||||
checkout(checkout_timeout)
|
||||
rescue ConnectionTimeoutError
|
||||
# this block can't be easily moved into attempt_to_checkout_all_existing_connections's
|
||||
# rescue block, because doing so would put it outside of synchronize section, without
|
||||
# being in a critical section thread_report might become inaccurate
|
||||
msg = "could not obtain ownership of all database connections in #{checkout_timeout} seconds"
|
||||
|
||||
thread_report = []
|
||||
@connections.each do |conn|
|
||||
unless conn.owner == Thread.current
|
||||
thread_report << "#{conn} is owned by #{conn.owner}"
|
||||
end
|
||||
end
|
||||
|
||||
msg << " (#{thread_report.join(', ')})" if thread_report.any?
|
||||
|
||||
raise ExclusiveConnectionTimeoutError, msg
|
||||
end
|
||||
|
||||
def with_new_connections_blocked
|
||||
previous_value = nil
|
||||
synchronize do # re-checking under lock for correct DCL semantics
|
||||
# Cache#put_if_absent returns either `nil` (if insertion was successful, ie there was
|
||||
# no previous current_connection_id mapping) or an existing value (if insertion
|
||||
# failed because there already was a current_connection_id mapping)
|
||||
previous_value = @reserved_connections.put_if_absent(current_connection_id, conn)
|
||||
end
|
||||
if previous_value # if we were too late and insertion failed
|
||||
checkin(conn)
|
||||
previous_value
|
||||
else
|
||||
conn
|
||||
synchronize do
|
||||
previous_value, @new_cons_enabled = @new_cons_enabled, false
|
||||
end
|
||||
yield
|
||||
ensure
|
||||
synchronize { @new_cons_enabled = previous_value }
|
||||
end
|
||||
|
||||
# Acquire a connection by one of 1) immediately removing one
|
||||
|
@ -464,7 +693,7 @@ module ActiveRecord
|
|||
#--
|
||||
# Implementation detail: the connection returned by +acquire_connection+
|
||||
# will already be "+connection.lease+ -ed" to the current thread.
|
||||
def acquire_connection
|
||||
def acquire_connection(checkout_timeout)
|
||||
# NOTE: we rely on `@available.poll` and `try_to_checkout_new_connection` to
|
||||
# `conn.lease` the returned connection (and to do this in a `synchronized`
|
||||
# section), this is not the cleanest implementation, as ideally we would
|
||||
|
@ -475,17 +704,16 @@ module ActiveRecord
|
|||
conn
|
||||
else
|
||||
reap
|
||||
@available.poll(@checkout_timeout)
|
||||
@available.poll(checkout_timeout)
|
||||
end
|
||||
end
|
||||
|
||||
def release(conn, owner)
|
||||
thread_id = owner.object_id
|
||||
|
||||
if @reserved_connections[thread_id] == conn
|
||||
@reserved_connections.delete thread_id
|
||||
end
|
||||
#--
|
||||
# if owner_thread param is omitted, this must be called in synchronize block
|
||||
def remove_connection_from_thread_cache(conn, owner_thread = conn.owner)
|
||||
@thread_cached_conns.delete_pair(connection_cache_key(owner_thread), conn)
|
||||
end
|
||||
alias_method :release, :remove_connection_from_thread_cache
|
||||
|
||||
def new_connection
|
||||
Base.send(spec.adapter_method, spec.config).tap do |conn|
|
||||
|
@ -493,10 +721,6 @@ module ActiveRecord
|
|||
end
|
||||
end
|
||||
|
||||
def current_connection_id #:nodoc:
|
||||
Base.connection_id ||= Thread.current.object_id
|
||||
end
|
||||
|
||||
# If the pool is not at a +@size+ limit, establish new connection. Connecting
|
||||
# to the DB is done outside main synchronized section.
|
||||
#--
|
||||
|
@ -507,7 +731,7 @@ module ActiveRecord
|
|||
# and increment @now_connecting, to prevent overstepping this pool's @size
|
||||
# constraint
|
||||
do_checkout = synchronize do
|
||||
if (@connections.size + @now_connecting) < @size
|
||||
if @new_cons_enabled && (@connections.size + @now_connecting) < @size
|
||||
@now_connecting += 1
|
||||
end
|
||||
end
|
||||
|
|
|
@ -88,7 +88,7 @@ module ActiveRecord
|
|||
end
|
||||
|
||||
def connection_id
|
||||
ActiveRecord::RuntimeRegistry.connection_id
|
||||
ActiveRecord::RuntimeRegistry.connection_id ||= Thread.current.object_id
|
||||
end
|
||||
|
||||
def connection_id=(connection_id)
|
||||
|
|
|
@ -100,7 +100,7 @@ module ActiveRecord
|
|||
t = Thread.new { @pool.checkout }
|
||||
|
||||
# make sure our thread is in the timeout section
|
||||
Thread.pass until t.status == "sleep"
|
||||
Thread.pass until @pool.num_waiting_in_queue == 1
|
||||
|
||||
connection = cs.first
|
||||
connection.close
|
||||
|
@ -112,7 +112,7 @@ module ActiveRecord
|
|||
t = Thread.new { @pool.checkout }
|
||||
|
||||
# make sure our thread is in the timeout section
|
||||
Thread.pass until t.status == "sleep"
|
||||
Thread.pass until @pool.num_waiting_in_queue == 1
|
||||
|
||||
connection = cs.first
|
||||
@pool.remove connection
|
||||
|
@ -234,7 +234,7 @@ module ActiveRecord
|
|||
mutex.synchronize { errors << e }
|
||||
end
|
||||
}
|
||||
Thread.pass until t.status == "sleep"
|
||||
Thread.pass until @pool.num_waiting_in_queue == i
|
||||
t
|
||||
end
|
||||
|
||||
|
@ -271,7 +271,7 @@ module ActiveRecord
|
|||
mutex.synchronize { errors << e }
|
||||
end
|
||||
}
|
||||
Thread.pass until t.status == "sleep"
|
||||
Thread.pass until @pool.num_waiting_in_queue == i
|
||||
t
|
||||
end
|
||||
|
||||
|
@ -390,6 +390,134 @@ module ActiveRecord
|
|||
connecting_threads.map(&:join)
|
||||
end
|
||||
end
|
||||
|
||||
def test_non_bang_disconnect_and_clear_reloadable_connections_throw_exception_if_threads_dont_return_their_conns
|
||||
@pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout
|
||||
[:disconnect, :clear_reloadable_connections].each do |group_action_method|
|
||||
@pool.with_connection do |connection|
|
||||
assert_raises(ExclusiveConnectionTimeoutError) do
|
||||
Thread.new { @pool.send(group_action_method) }.join
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def test_disconnect_and_clear_reloadable_connections_attempt_to_wait_for_threads_to_return_their_conns
|
||||
[:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method|
|
||||
begin
|
||||
thread = timed_join_result = nil
|
||||
@pool.with_connection do |connection|
|
||||
thread = Thread.new { @pool.send(group_action_method) }
|
||||
|
||||
# give the other `thread` some time to get stuck in `group_action_method`
|
||||
timed_join_result = thread.join(0.3)
|
||||
# thread.join # => `nil` means the other thread hasn't finished running and is still waiting for us to
|
||||
# release our connection
|
||||
assert_nil timed_join_result
|
||||
|
||||
# assert that since this is within default timeout our connection hasn't been forcefully taken away from us
|
||||
assert @pool.active_connection?
|
||||
end
|
||||
ensure
|
||||
thread.join if thread && !timed_join_result # clean up the other thread
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def test_bang_versions_of_disconnect_and_clear_reloadable_connections_if_unable_to_aquire_all_connections_proceed_anyway
|
||||
@pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout
|
||||
[:disconnect!, :clear_reloadable_connections!].each do |group_action_method|
|
||||
@pool.with_connection do |connection|
|
||||
Thread.new { @pool.send(group_action_method) }.join
|
||||
# assert connection has been forcefully taken away from us
|
||||
assert_not @pool.active_connection?
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def test_disconnect_and_clear_reloadable_connections_are_able_to_preempt_other_waiting_threads
|
||||
with_single_connection_pool do |pool|
|
||||
[:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method|
|
||||
conn = pool.connection # drain the only available connection
|
||||
second_thread_done = ActiveSupport::Concurrency::Latch.new
|
||||
|
||||
# create a first_thread and let it get into the FIFO queue first
|
||||
first_thread = Thread.new do
|
||||
pool.with_connection { second_thread_done.await }
|
||||
end
|
||||
|
||||
# wait for first_thread to get in queue
|
||||
Thread.pass until pool.num_waiting_in_queue == 1
|
||||
|
||||
# create a different, later thread, that will attempt to do a "group action",
|
||||
# but because of the group action semantics it should be able to preempt the
|
||||
# first_thread when a connection is made available
|
||||
second_thread = Thread.new do
|
||||
pool.send(group_action_method)
|
||||
second_thread_done.release
|
||||
end
|
||||
|
||||
# wait for second_thread to get in queue
|
||||
Thread.pass until pool.num_waiting_in_queue == 2
|
||||
|
||||
# return the only available connection
|
||||
pool.checkin(conn)
|
||||
|
||||
# if the second_thread is not able to preempt the first_thread,
|
||||
# they will temporarily (until either of them timeouts with ConnectionTimeoutError)
|
||||
# deadlock and a join(2) timeout will be reached
|
||||
failed = true unless second_thread.join(2)
|
||||
|
||||
#--- post test clean up start
|
||||
second_thread_done.release if failed
|
||||
|
||||
# after `pool.disconnect()` the first thread will be left stuck in queue, no need to wait for
|
||||
# it to timeout with ConnectionTimeoutError
|
||||
if (group_action_method == :disconnect || group_action_method == :disconnect!) && pool.num_waiting_in_queue > 0
|
||||
pool.with_connection {} # create a new connection in case there are threads still stuck in a queue
|
||||
end
|
||||
|
||||
first_thread.join
|
||||
second_thread.join
|
||||
#--- post test clean up end
|
||||
|
||||
flunk "#{group_action_method} is not able to preempt other waiting threads" if failed
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def test_clear_reloadable_connections_creates_new_connections_for_waiting_threads_if_necessary
|
||||
with_single_connection_pool do |pool|
|
||||
conn = pool.connection # drain the only available connection
|
||||
def conn.requires_reloading? # make sure it gets removed from the pool by clear_reloadable_connections
|
||||
true
|
||||
end
|
||||
|
||||
stuck_thread = Thread.new do
|
||||
pool.with_connection {}
|
||||
end
|
||||
|
||||
# wait for stuck_thread to get in queue
|
||||
Thread.pass until pool.num_waiting_in_queue == 1
|
||||
|
||||
pool.clear_reloadable_connections
|
||||
|
||||
unless stuck_thread.join(2)
|
||||
flunk 'clear_reloadable_connections must not let other connection waiting threads get stuck in queue'
|
||||
end
|
||||
|
||||
assert_equal 0, pool.num_waiting_in_queue
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
def with_single_connection_pool
|
||||
one_conn_spec = ActiveRecord::Base.connection_pool.spec.dup
|
||||
one_conn_spec.config[:pool] = 1 # this is safe to do, because .dupped ConnectionSpecification also auto-dups its config
|
||||
yield(pool = ConnectionPool.new(one_conn_spec))
|
||||
ensure
|
||||
pool.disconnect! if pool
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue