mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Make clear_active_connections! also return stale connections back to the pool
- also clean up some cruft remaining from per-thread connection cache
This commit is contained in:
parent
212134dce1
commit
d07a6b1a4a
6 changed files with 112 additions and 129 deletions
|
@ -60,11 +60,11 @@ module ActiveRecord
|
|||
# #connection can be called any number of times; the connection is
|
||||
# held in a hash keyed by the thread id.
|
||||
def connection
|
||||
if conn = @reserved_connections[active_connection_name]
|
||||
if conn = @reserved_connections[current_connection_id]
|
||||
conn.verify!(verification_timeout)
|
||||
conn
|
||||
else
|
||||
@reserved_connections[active_connection_name] = checkout
|
||||
@reserved_connections[current_connection_id] = checkout
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -72,7 +72,7 @@ module ActiveRecord
|
|||
# #release_connection releases the connection-thread association
|
||||
# and returns the connection to the pool.
|
||||
def release_connection
|
||||
conn = @reserved_connections.delete(active_connection_name)
|
||||
conn = @reserved_connections.delete(current_connection_id)
|
||||
checkin conn if conn
|
||||
end
|
||||
|
||||
|
@ -118,14 +118,20 @@ module ActiveRecord
|
|||
# Verify active connections and remove and disconnect connections
|
||||
# associated with stale threads.
|
||||
def verify_active_connections! #:nodoc:
|
||||
remove_stale_cached_threads!(@reserved_connections) do |name, conn|
|
||||
checkin conn
|
||||
end
|
||||
clear_stale_cached_connections!
|
||||
connections.each do |connection|
|
||||
connection.verify!(verification_timeout)
|
||||
end
|
||||
end
|
||||
|
||||
# Return any checked-out connections back to the pool by threads that
|
||||
# are no longer alive.
|
||||
def clear_stale_cached_connections!
|
||||
remove_stale_cached_threads!(@reserved_connections) do |name, conn|
|
||||
checkin conn
|
||||
end
|
||||
end
|
||||
|
||||
# Check-out a database connection from the pool.
|
||||
def checkout
|
||||
raise NotImplementedError, "checkout is an abstract method"
|
||||
|
@ -156,7 +162,7 @@ module ActiveRecord
|
|||
ActiveRecord::Base.send(spec.adapter_method, config)
|
||||
end
|
||||
|
||||
def active_connection_name #:nodoc:
|
||||
def current_connection_id #:nodoc:
|
||||
Thread.current.object_id
|
||||
end
|
||||
|
||||
|
@ -178,12 +184,6 @@ module ActiveRecord
|
|||
# NewConnectionEveryTime is a simple implementation: always
|
||||
# create/disconnect on checkout/checkin.
|
||||
class NewConnectionEveryTime < ConnectionPool
|
||||
def active_connection
|
||||
@reserved_connections[active_connection_name]
|
||||
end
|
||||
|
||||
def active_connections; @reserved_connections; end
|
||||
|
||||
def checkout
|
||||
new_connection
|
||||
end
|
||||
|
@ -288,19 +288,14 @@ module ActiveRecord
|
|||
@connection_pools[name] = ConnectionAdapters::ConnectionPool.create(spec)
|
||||
end
|
||||
|
||||
# for internal use only and for testing;
|
||||
# only works with ConnectionPerThread pool class
|
||||
def active_connections #:nodoc:
|
||||
@connection_pools.inject({}) do |hash,kv|
|
||||
hash[kv.first] = kv.last.active_connection
|
||||
hash.delete(kv.first) unless hash[kv.first]
|
||||
hash
|
||||
end
|
||||
end
|
||||
|
||||
# Clears the cache which maps classes to connections.
|
||||
# Returns any connections in use by the current thread back to the pool,
|
||||
# and also returns connections to the pool cached by threads that are no
|
||||
# longer alive.
|
||||
def clear_active_connections!
|
||||
@connection_pools.each_value {|pool| pool.release_connection }
|
||||
@connection_pools.each_value do |pool|
|
||||
pool.release_connection
|
||||
pool.clear_stale_cached_connections!
|
||||
end
|
||||
end
|
||||
|
||||
# Clears the cache which maps classes
|
||||
|
@ -353,11 +348,9 @@ module ActiveRecord
|
|||
end
|
||||
|
||||
# Apply monitor to all public methods that access the pool.
|
||||
synchronize :establish_connection, :retrieve_connection,
|
||||
:connected?, :remove_connection, :active_connections,
|
||||
:clear_active_connections!, :clear_reloadable_connections!,
|
||||
:clear_all_connections!, :verify_active_connections!,
|
||||
:with => :connection_pools_lock
|
||||
synchronize :establish_connection, :retrieve_connection, :connected?, :remove_connection,
|
||||
:clear_active_connections!, :clear_reloadable_connections!, :clear_all_connections!,
|
||||
:verify_active_connections!, :with => :connection_pools_lock
|
||||
end
|
||||
end
|
||||
end
|
|
@ -124,9 +124,8 @@ module ActiveRecord
|
|||
connection_handler.remove_connection(klass)
|
||||
end
|
||||
|
||||
delegate :active_connections, :clear_active_connections!,
|
||||
:clear_reloadable_connections!, :clear_all_connections!,
|
||||
:verify_active_connections!, :to => :connection_handler
|
||||
delegate :clear_active_connections!, :clear_reloadable_connections!,
|
||||
:clear_all_connections!,:verify_active_connections!, :to => :connection_handler
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -280,7 +280,6 @@ unless current_adapter?(:SQLServerAdapter, :SybaseAdapter, :OpenBaseAdapter)
|
|||
sleep zzz # block thread 2 for zzz seconds
|
||||
end
|
||||
t1 = Time.now
|
||||
Person.clear_active_connections!
|
||||
end
|
||||
|
||||
b = Thread.new do
|
||||
|
@ -288,7 +287,6 @@ unless current_adapter?(:SQLServerAdapter, :SybaseAdapter, :OpenBaseAdapter)
|
|||
t2 = Time.now
|
||||
Person.transaction { yield }
|
||||
t3 = Time.now
|
||||
Person.clear_active_connections!
|
||||
end
|
||||
|
||||
a.join
|
||||
|
|
87
activerecord/test/cases/pooled_connections_test.rb
Normal file
87
activerecord/test/cases/pooled_connections_test.rb
Normal file
|
@ -0,0 +1,87 @@
|
|||
require "cases/helper"
|
||||
|
||||
class PooledConnectionsTest < ActiveRecord::TestCase
|
||||
def setup
|
||||
super
|
||||
@connection = ActiveRecord::Base.remove_connection
|
||||
end
|
||||
|
||||
def teardown
|
||||
ActiveRecord::Base.clear_all_connections!
|
||||
ActiveRecord::Base.establish_connection(@connection)
|
||||
super
|
||||
end
|
||||
|
||||
def checkout_connections
|
||||
ActiveRecord::Base.establish_connection(@connection.merge({:pool => 2, :wait_timeout => 0.3}))
|
||||
@connections = []
|
||||
@timed_out = 0
|
||||
|
||||
4.times do
|
||||
Thread.new do
|
||||
begin
|
||||
@connections << ActiveRecord::Base.connection_pool.checkout
|
||||
rescue ActiveRecord::ConnectionTimeoutError
|
||||
@timed_out += 1
|
||||
end
|
||||
end.join
|
||||
end
|
||||
end
|
||||
|
||||
def test_pooled_connection_checkout
|
||||
checkout_connections
|
||||
assert_equal @connections.length, 2
|
||||
assert_equal @timed_out, 2
|
||||
end
|
||||
|
||||
def checkout_checkin_connections(pool_size, threads)
|
||||
ActiveRecord::Base.establish_connection(@connection.merge({:pool => pool_size, :wait_timeout => 0.5}))
|
||||
@connection_count = 0
|
||||
@timed_out = 0
|
||||
threads.times do
|
||||
Thread.new do
|
||||
begin
|
||||
conn = ActiveRecord::Base.connection_pool.checkout
|
||||
sleep 0.1
|
||||
ActiveRecord::Base.connection_pool.checkin conn
|
||||
@connection_count += 1
|
||||
rescue ActiveRecord::ConnectionTimeoutError
|
||||
@timed_out += 1
|
||||
end
|
||||
end.join
|
||||
end
|
||||
end
|
||||
|
||||
def test_pooled_connection_checkin_one
|
||||
checkout_checkin_connections 1, 2
|
||||
assert_equal 2, @connection_count
|
||||
assert_equal 0, @timed_out
|
||||
end
|
||||
|
||||
def test_pooled_connection_checkin_two
|
||||
checkout_checkin_connections 2, 3
|
||||
assert_equal 3, @connection_count
|
||||
assert_equal 0, @timed_out
|
||||
end
|
||||
|
||||
def test_pooled_connection_checkout_existing_first
|
||||
ActiveRecord::Base.establish_connection(@connection.merge({:pool => 1}))
|
||||
conn_pool = ActiveRecord::Base.connection_pool
|
||||
conn = conn_pool.checkout
|
||||
conn_pool.checkin(conn)
|
||||
conn = conn_pool.checkout
|
||||
assert ActiveRecord::ConnectionAdapters::AbstractAdapter === conn
|
||||
conn_pool.checkin(conn)
|
||||
end
|
||||
end unless %w(FrontBase).include? ActiveRecord::Base.connection.adapter_name
|
||||
|
||||
class AllowConcurrencyDeprecatedTest < ActiveRecord::TestCase
|
||||
def test_allow_concurrency_is_deprecated
|
||||
assert_deprecated('ActiveRecord::Base.allow_concurrency') do
|
||||
ActiveRecord::Base.allow_concurrency
|
||||
end
|
||||
assert_deprecated('ActiveRecord::Base.allow_concurrency=') do
|
||||
ActiveRecord::Base.allow_concurrency = true
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,91 +0,0 @@
|
|||
require "cases/helper"
|
||||
require 'models/topic'
|
||||
require 'models/reply'
|
||||
|
||||
unless %w(FrontBase).include? ActiveRecord::Base.connection.adapter_name
|
||||
class ThreadedConnectionsTest < ActiveRecord::TestCase
|
||||
def test_allow_concurrency_is_deprecated
|
||||
assert_deprecated('ActiveRecord::Base.allow_concurrency') do
|
||||
ActiveRecord::Base.allow_concurrency
|
||||
end
|
||||
assert_deprecated('ActiveRecord::Base.allow_concurrency=') do
|
||||
ActiveRecord::Base.allow_concurrency = true
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class PooledConnectionsTest < ActiveRecord::TestCase
|
||||
def setup
|
||||
super
|
||||
@connection = ActiveRecord::Base.remove_connection
|
||||
end
|
||||
|
||||
def teardown
|
||||
ActiveRecord::Base.clear_all_connections!
|
||||
ActiveRecord::Base.establish_connection(@connection)
|
||||
super
|
||||
end
|
||||
|
||||
def checkout_connections
|
||||
ActiveRecord::Base.establish_connection(@connection.merge({:pool => 2, :wait_timeout => 0.3}))
|
||||
@connections = []
|
||||
@timed_out = 0
|
||||
|
||||
4.times do
|
||||
Thread.new do
|
||||
begin
|
||||
@connections << ActiveRecord::Base.connection_pool.checkout
|
||||
rescue ActiveRecord::ConnectionTimeoutError
|
||||
@timed_out += 1
|
||||
end
|
||||
end.join
|
||||
end
|
||||
end
|
||||
|
||||
def test_pooled_connection_checkout
|
||||
checkout_connections
|
||||
assert_equal @connections.length, 2
|
||||
assert_equal @timed_out, 2
|
||||
end
|
||||
|
||||
def checkout_checkin_connections(pool_size, threads)
|
||||
ActiveRecord::Base.establish_connection(@connection.merge({:pool => pool_size, :wait_timeout => 0.5}))
|
||||
@connection_count = 0
|
||||
@timed_out = 0
|
||||
threads.times do
|
||||
Thread.new do
|
||||
begin
|
||||
conn = ActiveRecord::Base.connection_pool.checkout
|
||||
sleep 0.1
|
||||
ActiveRecord::Base.connection_pool.checkin conn
|
||||
@connection_count += 1
|
||||
rescue ActiveRecord::ConnectionTimeoutError
|
||||
@timed_out += 1
|
||||
end
|
||||
end.join
|
||||
end
|
||||
end
|
||||
|
||||
def test_pooled_connection_checkin_one
|
||||
checkout_checkin_connections 1, 2
|
||||
assert_equal 2, @connection_count
|
||||
assert_equal 0, @timed_out
|
||||
end
|
||||
|
||||
def test_pooled_connection_checkin_two
|
||||
checkout_checkin_connections 2, 3
|
||||
assert_equal 3, @connection_count
|
||||
assert_equal 0, @timed_out
|
||||
end
|
||||
|
||||
def test_pooled_connection_checkout_existing_first
|
||||
ActiveRecord::Base.establish_connection(@connection.merge({:pool => 1}))
|
||||
conn_pool = ActiveRecord::Base.connection_pool
|
||||
conn = conn_pool.checkout
|
||||
conn_pool.checkin(conn)
|
||||
conn = conn_pool.checkout
|
||||
assert ActiveRecord::ConnectionAdapters::AbstractAdapter === conn
|
||||
conn_pool.checkin(conn)
|
||||
end
|
||||
end
|
||||
end
|
|
@ -296,7 +296,6 @@ if current_adapter?(:PostgreSQLAdapter)
|
|||
topic.approved = !topic.approved?
|
||||
topic.save!
|
||||
end
|
||||
Topic.clear_active_connections!
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -332,7 +331,6 @@ if current_adapter?(:PostgreSQLAdapter)
|
|||
dev = Developer.find(1)
|
||||
assert_equal original_salary, dev.salary
|
||||
end
|
||||
Developer.clear_active_connections!
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -344,7 +342,6 @@ if current_adapter?(:PostgreSQLAdapter)
|
|||
# Always expect original salary.
|
||||
assert_equal original_salary, Developer.find(1).salary
|
||||
end
|
||||
Developer.clear_active_connections!
|
||||
end
|
||||
end
|
||||
|
||||
|
|
Loading…
Reference in a new issue