mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Make connection pool fair with respect to waiting threads.
This commit is contained in:
parent
7c95be54b4
commit
d06674d5df
3 changed files with 202 additions and 35 deletions
|
@ -2,7 +2,6 @@ require 'thread'
|
|||
require 'monitor'
|
||||
require 'set'
|
||||
require 'active_support/core_ext/module/deprecation'
|
||||
require 'timeout'
|
||||
|
||||
module ActiveRecord
|
||||
# Raised when a connection could not be obtained within the connection
|
||||
|
@ -92,21 +91,6 @@ module ActiveRecord
|
|||
attr_accessor :automatic_reconnect, :timeout
|
||||
attr_reader :spec, :connections, :size, :reaper
|
||||
|
||||
class Latch # :nodoc:
|
||||
def initialize
|
||||
@mutex = Mutex.new
|
||||
@cond = ConditionVariable.new
|
||||
end
|
||||
|
||||
def release
|
||||
@mutex.synchronize { @cond.broadcast }
|
||||
end
|
||||
|
||||
def await
|
||||
@mutex.synchronize { @cond.wait @mutex }
|
||||
end
|
||||
end
|
||||
|
||||
# Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
|
||||
# object which describes database connection information (e.g. adapter,
|
||||
# host name, username, password, etc), as well as the maximum size for
|
||||
|
@ -128,9 +112,25 @@ module ActiveRecord
|
|||
# default max pool size to 5
|
||||
@size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
|
||||
|
||||
@latch = Latch.new
|
||||
@connections = []
|
||||
@automatic_reconnect = true
|
||||
|
||||
# connections available to be checked out
|
||||
@available = []
|
||||
|
||||
# number of threads waiting to check out a connection
|
||||
@num_waiting = 0
|
||||
|
||||
# signal threads waiting
|
||||
@cond = new_cond
|
||||
end
|
||||
|
||||
# Hack for tests to be able to add connections. Do not call outside of tests
|
||||
def insert_connection_for_test!(c)
|
||||
synchronize do
|
||||
@connections << c
|
||||
@available << c
|
||||
end
|
||||
end
|
||||
|
||||
# Retrieve the connection associated with the current thread, or call
|
||||
|
@ -188,6 +188,7 @@ module ActiveRecord
|
|||
conn.disconnect!
|
||||
end
|
||||
@connections = []
|
||||
@available = []
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -202,6 +203,9 @@ module ActiveRecord
|
|||
@connections.delete_if do |conn|
|
||||
conn.requires_reloading?
|
||||
end
|
||||
@available.delete_if do |conn|
|
||||
conn.requires_reloading?
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -225,23 +229,19 @@ module ActiveRecord
|
|||
# Raises:
|
||||
# - PoolFullError: no connection can be obtained from the pool.
|
||||
def checkout
|
||||
loop do
|
||||
# Checkout an available connection
|
||||
synchronize do
|
||||
# Try to find a connection that hasn't been leased, and lease it
|
||||
conn = connections.find { |c| c.lease }
|
||||
synchronize do
|
||||
conn = nil
|
||||
|
||||
# If all connections were leased, and we have room to expand,
|
||||
# create a new connection and lease it.
|
||||
if !conn && connections.size < size
|
||||
conn = checkout_new_connection
|
||||
conn.lease
|
||||
end
|
||||
|
||||
return checkout_and_verify(conn) if conn
|
||||
if @num_waiting == 0
|
||||
conn = acquire_connection
|
||||
end
|
||||
|
||||
Timeout.timeout(@timeout, PoolFullError) { @latch.await }
|
||||
unless conn
|
||||
conn = wait_until(@timeout) { acquire_connection }
|
||||
end
|
||||
|
||||
conn.lease
|
||||
checkout_and_verify(conn)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -257,8 +257,10 @@ module ActiveRecord
|
|||
end
|
||||
|
||||
release conn
|
||||
|
||||
@available.unshift conn
|
||||
@cond.signal
|
||||
end
|
||||
@latch.release
|
||||
end
|
||||
|
||||
# Remove a connection from the connection pool. The connection will
|
||||
|
@ -266,12 +268,14 @@ module ActiveRecord
|
|||
def remove(conn)
|
||||
synchronize do
|
||||
@connections.delete conn
|
||||
@available.delete conn
|
||||
|
||||
# FIXME: we might want to store the key on the connection so that removing
|
||||
# from the reserved hash will be a little easier.
|
||||
release conn
|
||||
|
||||
@cond.signal # can make a new connection now
|
||||
end
|
||||
@latch.release
|
||||
end
|
||||
|
||||
# Removes dead connections from the pool. A dead connection can occur
|
||||
|
@ -283,12 +287,60 @@ module ActiveRecord
|
|||
connections.dup.each do |conn|
|
||||
remove conn if conn.in_use? && stale > conn.last_use && !conn.active?
|
||||
end
|
||||
@cond.broadcast # may violate fairness
|
||||
end
|
||||
@latch.release
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
# Take an available connection or, if possible, create a new
|
||||
# one, or nil.
|
||||
#
|
||||
# Monitor must be held while calling this method.
|
||||
#
|
||||
# Returns: a newly acquired connection.
|
||||
def acquire_connection
|
||||
if @available.any?
|
||||
@available.pop
|
||||
elsif connections.size < size
|
||||
checkout_new_connection
|
||||
end
|
||||
end
|
||||
|
||||
# Wait on +@cond+ until the block returns non-nil. Note that
|
||||
# unlike MonitorMixin::ConditionVariable#wait_until, this method
|
||||
# does not test the block before the first wait period.
|
||||
#
|
||||
# Monitor must be held when calling this method.
|
||||
#
|
||||
# +timeout+: Integer timeout in seconds
|
||||
#
|
||||
# Returns: the result of the block
|
||||
#
|
||||
# Raises:
|
||||
# - PoolFullError: timeout elapsed before +&block+ returned a connection
|
||||
def wait_until(timeout, &block)
|
||||
@num_waiting += 1
|
||||
begin
|
||||
t0 = Time.now
|
||||
loop do
|
||||
elapsed = Time.now - t0
|
||||
if elapsed >= timeout
|
||||
msg = 'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)' %
|
||||
[timeout, elapsed]
|
||||
raise PoolFullError, msg
|
||||
end
|
||||
|
||||
@cond.wait(timeout - elapsed)
|
||||
|
||||
conn = yield
|
||||
return conn if conn
|
||||
end
|
||||
ensure
|
||||
@num_waiting -= 1
|
||||
end
|
||||
end
|
||||
|
||||
def release(conn)
|
||||
thread_id = if @reserved_connections[current_connection_id] == conn
|
||||
current_connection_id
|
||||
|
|
|
@ -36,7 +36,7 @@ module ActiveRecord
|
|||
|
||||
def test_close
|
||||
pool = ConnectionPool.new(ConnectionSpecification.new({}, nil))
|
||||
pool.connections << adapter
|
||||
pool.insert_connection_for_test! adapter
|
||||
adapter.pool = pool
|
||||
|
||||
# Make sure the pool marks the connection in use
|
||||
|
|
|
@ -200,6 +200,121 @@ module ActiveRecord
|
|||
end.join
|
||||
end
|
||||
|
||||
# The connection pool is "fair" if threads waiting for
|
||||
# connections receive them the order in which they began
|
||||
# waiting. This ensures that we don't timeout one HTTP request
|
||||
# even while well under capacity in a multi-threaded environment
|
||||
# such as a Java servlet container.
|
||||
#
|
||||
# We don't need strict fairness: if two connections become
|
||||
# available at the same time, it's fine of two threads that were
|
||||
# waiting acquire the connections out of order.
|
||||
#
|
||||
# Thus this test prepares waiting threads and then trickles in
|
||||
# available connections slowly, ensuring the wakeup order is
|
||||
# correct in this case.
|
||||
#
|
||||
# Try a few times since it might work out just by chance.
|
||||
def test_checkout_fairness
|
||||
4.times { setup; do_checkout_fairness }
|
||||
end
|
||||
|
||||
def do_checkout_fairness
|
||||
expected = (1..@pool.size).to_a.freeze
|
||||
# check out all connections so our threads start out waiting
|
||||
conns = expected.map { @pool.checkout }
|
||||
mutex = Mutex.new
|
||||
order = []
|
||||
errors = []
|
||||
|
||||
threads = expected.map do |i|
|
||||
t = Thread.new {
|
||||
begin
|
||||
conn = @pool.checkout # never checked back in
|
||||
mutex.synchronize { order << i }
|
||||
rescue => e
|
||||
mutex.synchronize { errors << e }
|
||||
end
|
||||
}
|
||||
Thread.pass until t.status == "sleep"
|
||||
t
|
||||
end
|
||||
|
||||
# this should wake up the waiting threads one by one in order
|
||||
conns.each { |conn| @pool.checkin(conn); sleep 0.1 }
|
||||
|
||||
threads.each(&:join)
|
||||
|
||||
raise errors.first if errors.any?
|
||||
|
||||
assert_equal(expected, order)
|
||||
end
|
||||
|
||||
# As mentioned in #test_checkout_fairness, we don't care about
|
||||
# strict fairness. This test creates two groups of threads:
|
||||
# group1 whose members all start waiting before any thread in
|
||||
# group2. Enough connections are checked in to wakeup all
|
||||
# group1 threads, and the fact that only group1 and no group2
|
||||
# threads acquired a connection is enforced.
|
||||
#
|
||||
# Try a few times since it might work out just by chance.
|
||||
def test_checkout_fairness_by_group
|
||||
4.times { setup; do_checkout_fairness_by_group }
|
||||
end
|
||||
|
||||
def do_checkout_fairness_by_group
|
||||
@pool.instance_variable_set(:@size, 10)
|
||||
# take all the connections
|
||||
conns = (1..10).map { @pool.checkout }
|
||||
mutex = Mutex.new
|
||||
successes = [] # threads that successfully got a connection
|
||||
errors = []
|
||||
|
||||
make_thread = proc do |i|
|
||||
t = Thread.new {
|
||||
begin
|
||||
conn = @pool.checkout # never checked back in
|
||||
mutex.synchronize { successes << i }
|
||||
rescue => e
|
||||
mutex.synchronize { errors << e }
|
||||
end
|
||||
}
|
||||
Thread.pass until t.status == "sleep"
|
||||
t
|
||||
end
|
||||
|
||||
# all group1 threads start waiting before any in group2
|
||||
group1 = (1..5).map(&make_thread)
|
||||
group2 = (6..10).map(&make_thread)
|
||||
|
||||
# checkin n connections back to the pool
|
||||
checkin = proc do |n|
|
||||
n.times do
|
||||
c = conns.pop
|
||||
@pool.checkin(c)
|
||||
end
|
||||
end
|
||||
|
||||
checkin.call(group1.size) # should wake up all group1
|
||||
|
||||
loop do
|
||||
sleep 0.1
|
||||
break if mutex.synchronize { (successes.size + errors.size) == group1.size }
|
||||
end
|
||||
|
||||
winners = mutex.synchronize { successes.dup }
|
||||
checkin.call(group2.size) # should wake up everyone remaining
|
||||
|
||||
group1.each(&:join)
|
||||
group2.each(&:join)
|
||||
|
||||
assert_equal((1..group1.size).to_a, winners.sort)
|
||||
|
||||
if errors.any?
|
||||
raise errors.first
|
||||
end
|
||||
end
|
||||
|
||||
def test_automatic_reconnect=
|
||||
pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
|
||||
assert pool.automatic_reconnect
|
||||
|
|
Loading…
Reference in a new issue