mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
opening a connection will block if the pool is full
This commit is contained in:
parent
9a4bfd04a3
commit
b8f7482695
2 changed files with 62 additions and 19 deletions
|
@ -2,6 +2,7 @@ require 'thread'
|
|||
require 'monitor'
|
||||
require 'set'
|
||||
require 'active_support/core_ext/module/deprecation'
|
||||
require 'timeout'
|
||||
|
||||
module ActiveRecord
|
||||
# Raised when a connection could not be obtained within the connection
|
||||
|
@ -11,9 +12,6 @@ module ActiveRecord
|
|||
|
||||
# Raised when a connection pool is full and another connection is requested
|
||||
class PoolFullError < ConnectionNotEstablished
|
||||
def initialize size, timeout
|
||||
super("Connection pool of size #{size} and timeout #{timeout}s is full")
|
||||
end
|
||||
end
|
||||
|
||||
module ConnectionAdapters
|
||||
|
@ -94,6 +92,21 @@ module ActiveRecord
|
|||
attr_accessor :automatic_reconnect, :timeout
|
||||
attr_reader :spec, :connections, :size, :reaper
|
||||
|
||||
class Latch # :nodoc:
|
||||
def initialize
|
||||
@mutex = Mutex.new
|
||||
@cond = ConditionVariable.new
|
||||
end
|
||||
|
||||
def release
|
||||
@mutex.synchronize { @cond.broadcast }
|
||||
end
|
||||
|
||||
def await
|
||||
@mutex.synchronize { @cond.wait @mutex }
|
||||
end
|
||||
end
|
||||
|
||||
# Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
|
||||
# object which describes database connection information (e.g. adapter,
|
||||
# host name, username, password, etc), as well as the maximum size for
|
||||
|
@ -115,6 +128,7 @@ module ActiveRecord
|
|||
# default max pool size to 5
|
||||
@size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
|
||||
|
||||
@latch = Latch.new
|
||||
@connections = []
|
||||
@automatic_reconnect = true
|
||||
end
|
||||
|
@ -139,9 +153,11 @@ module ActiveRecord
|
|||
# #release_connection releases the connection-thread association
|
||||
# and returns the connection to the pool.
|
||||
def release_connection(with_id = current_connection_id)
|
||||
synchronize do
|
||||
conn = @reserved_connections.delete(with_id)
|
||||
checkin conn if conn
|
||||
end
|
||||
end
|
||||
|
||||
# If a connection already exists yield it to the block. If no connection
|
||||
# exists checkout a connection, yield it to the block, and checkin the
|
||||
|
@ -205,6 +221,7 @@ module ActiveRecord
|
|||
# Raises:
|
||||
# - PoolFullError: no connection can be obtained from the pool.
|
||||
def checkout
|
||||
loop do
|
||||
# Checkout an available connection
|
||||
synchronize do
|
||||
# Try to find a connection that hasn't been leased, and lease it
|
||||
|
@ -217,11 +234,10 @@ module ActiveRecord
|
|||
conn.lease
|
||||
end
|
||||
|
||||
if conn
|
||||
checkout_and_verify conn
|
||||
else
|
||||
raise PoolFullError.new(size, timeout)
|
||||
return checkout_and_verify(conn) if conn
|
||||
end
|
||||
|
||||
Timeout.timeout(@timeout, PoolFullError) { @latch.await }
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -238,6 +254,7 @@ module ActiveRecord
|
|||
|
||||
release conn
|
||||
end
|
||||
@latch.release
|
||||
end
|
||||
|
||||
# Remove a connection from the connection pool. The connection will
|
||||
|
@ -250,6 +267,7 @@ module ActiveRecord
|
|||
# from the reserved hash will be a little easier.
|
||||
release conn
|
||||
end
|
||||
@latch.release
|
||||
end
|
||||
|
||||
# Removes dead connections from the pool. A dead connection can occur
|
||||
|
@ -262,6 +280,7 @@ module ActiveRecord
|
|||
remove conn if conn.in_use? && stale > conn.last_use && !conn.active?
|
||||
end
|
||||
end
|
||||
@latch.release
|
||||
end
|
||||
|
||||
private
|
||||
|
|
|
@ -96,6 +96,30 @@ module ActiveRecord
|
|||
end
|
||||
end
|
||||
|
||||
def test_full_pool_blocks
|
||||
cs = @pool.size.times.map { @pool.checkout }
|
||||
t = Thread.new { @pool.checkout }
|
||||
|
||||
# make sure our thread is in the timeout section
|
||||
Thread.pass until t.status == "sleep"
|
||||
|
||||
connection = cs.first
|
||||
connection.close
|
||||
assert_equal connection, t.join.value
|
||||
end
|
||||
|
||||
def test_removing_releases_latch
|
||||
cs = @pool.size.times.map { @pool.checkout }
|
||||
t = Thread.new { @pool.checkout }
|
||||
|
||||
# make sure our thread is in the timeout section
|
||||
Thread.pass until t.status == "sleep"
|
||||
|
||||
connection = cs.first
|
||||
@pool.remove connection
|
||||
assert_respond_to t.join.value, :execute
|
||||
end
|
||||
|
||||
def test_reap_and_active
|
||||
@pool.checkout
|
||||
@pool.checkout
|
||||
|
|
Loading…
Reference in a new issue