mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Refactor connection handler
ConnectionHandler will not have any knowlodge of AR models now, it will only know about the specs. Like that we can decouple the two, and allow the same model to use more than one connection. Historically, folks used to create abstract AR classes on the fly in order to have multiple connections for the same model, and override the connection methods. With this, now we can override the `specificiation_id` method in the model, to return a key, that will be used to find the connection_pool from the handler.
This commit is contained in:
parent
3bed679670
commit
b83fb84733
9 changed files with 88 additions and 83 deletions
|
@ -829,9 +829,6 @@ module ActiveRecord
|
|||
@owner_to_pool = Concurrent::Map.new(:initial_capacity => 2) do |h,k|
|
||||
h[k] = Concurrent::Map.new(:initial_capacity => 2)
|
||||
end
|
||||
@class_to_pool = Concurrent::Map.new(:initial_capacity => 2) do |h,k|
|
||||
h[k] = Concurrent::Map.new
|
||||
end
|
||||
end
|
||||
|
||||
def connection_pool_list
|
||||
|
@ -839,10 +836,8 @@ module ActiveRecord
|
|||
end
|
||||
alias :connection_pools :connection_pool_list
|
||||
|
||||
def establish_connection(owner, spec)
|
||||
@class_to_pool.clear
|
||||
raise RuntimeError, "Anonymous class is not allowed." unless owner.name
|
||||
owner_to_pool[owner.name] = ConnectionAdapters::ConnectionPool.new(spec)
|
||||
def establish_connection(spec)
|
||||
owner_to_pool[spec.id] = ConnectionAdapters::ConnectionPool.new(spec)
|
||||
end
|
||||
|
||||
# Returns true if there are any active connections among the connection
|
||||
|
@ -873,18 +868,18 @@ module ActiveRecord
|
|||
# active or defined connection: if it is the latter, it will be
|
||||
# opened and set as the active connection for the class it was defined
|
||||
# for (not necessarily the current class).
|
||||
def retrieve_connection(klass) #:nodoc:
|
||||
pool = retrieve_connection_pool(klass)
|
||||
raise ConnectionNotEstablished, "No connection pool for #{klass}" unless pool
|
||||
def retrieve_connection(spec_id) #:nodoc:
|
||||
pool = retrieve_connection_pool(spec_id)
|
||||
raise ConnectionNotEstablished, "No connection pool with id #{spec_id} found." unless pool
|
||||
conn = pool.connection
|
||||
raise ConnectionNotEstablished, "No connection for #{klass} in connection pool" unless conn
|
||||
raise ConnectionNotEstablished, "No connection for #{spec_id} in connection pool" unless conn
|
||||
conn
|
||||
end
|
||||
|
||||
# Returns true if a connection that's accessible to this class has
|
||||
# already been opened.
|
||||
def connected?(klass)
|
||||
conn = retrieve_connection_pool(klass)
|
||||
def connected?(spec_id)
|
||||
conn = retrieve_connection_pool(spec_id)
|
||||
conn && conn.connected?
|
||||
end
|
||||
|
||||
|
@ -892,9 +887,8 @@ module ActiveRecord
|
|||
# connection and the defined connection (if they exist). The result
|
||||
# can be used as an argument for establish_connection, for easily
|
||||
# re-establishing the connection.
|
||||
def remove_connection(owner)
|
||||
if pool = owner_to_pool.delete(owner.name)
|
||||
@class_to_pool.clear
|
||||
def remove_connection(spec_id)
|
||||
if pool = owner_to_pool.delete(spec_id)
|
||||
pool.automatic_reconnect = false
|
||||
pool.disconnect!
|
||||
pool.spec.config
|
||||
|
@ -910,15 +904,8 @@ module ActiveRecord
|
|||
# #fetch is significantly slower than #[]. So in the nil case, no caching will
|
||||
# take place, but that's ok since the nil case is not the common one that we wish
|
||||
# to optimise for.
|
||||
def retrieve_connection_pool(klass)
|
||||
class_to_pool[klass.name] ||= begin
|
||||
until pool = pool_for(klass)
|
||||
klass = klass.superclass
|
||||
break unless klass <= Base
|
||||
end
|
||||
|
||||
class_to_pool[klass.name] = pool
|
||||
end
|
||||
def retrieve_connection_pool(spec_id)
|
||||
pool_for(spec_id)
|
||||
end
|
||||
|
||||
private
|
||||
|
@ -927,28 +914,24 @@ module ActiveRecord
|
|||
@owner_to_pool[Process.pid]
|
||||
end
|
||||
|
||||
def class_to_pool
|
||||
@class_to_pool[Process.pid]
|
||||
end
|
||||
|
||||
def pool_for(owner)
|
||||
owner_to_pool.fetch(owner.name) {
|
||||
if ancestor_pool = pool_from_any_process_for(owner)
|
||||
def pool_for(spec_id)
|
||||
owner_to_pool.fetch(spec_id) {
|
||||
if ancestor_pool = pool_from_any_process_for(spec_id)
|
||||
# A connection was established in an ancestor process that must have
|
||||
# subsequently forked. We can't reuse the connection, but we can copy
|
||||
# the specification and establish a new connection with it.
|
||||
establish_connection(owner, ancestor_pool.spec).tap do |pool|
|
||||
establish_connection(ancestor_pool.spec).tap do |pool|
|
||||
pool.schema_cache = ancestor_pool.schema_cache if ancestor_pool.schema_cache
|
||||
end
|
||||
else
|
||||
owner_to_pool[owner.name] = nil
|
||||
owner_to_pool[spec_id] = nil
|
||||
end
|
||||
}
|
||||
end
|
||||
|
||||
def pool_from_any_process_for(owner)
|
||||
owner_to_pool = @owner_to_pool.values.find { |v| v[owner.name] }
|
||||
owner_to_pool && owner_to_pool[owner.name]
|
||||
def pool_from_any_process_for(spec_id)
|
||||
owner_to_pool = @owner_to_pool.values.find { |v| v[spec_id] }
|
||||
owner_to_pool && owner_to_pool[spec_id]
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -3,10 +3,10 @@ require 'uri'
|
|||
module ActiveRecord
|
||||
module ConnectionAdapters
|
||||
class ConnectionSpecification #:nodoc:
|
||||
attr_reader :config, :adapter_method
|
||||
attr_reader :config, :adapter_method, :id
|
||||
|
||||
def initialize(config, adapter_method)
|
||||
@config, @adapter_method = config, adapter_method
|
||||
def initialize(id, config, adapter_method)
|
||||
@config, @adapter_method, @id = config, adapter_method, id
|
||||
end
|
||||
|
||||
def initialize_dup(original)
|
||||
|
@ -164,7 +164,7 @@ module ActiveRecord
|
|||
# spec.config
|
||||
# # => { "host" => "localhost", "database" => "foo", "adapter" => "sqlite3" }
|
||||
#
|
||||
def spec(config)
|
||||
def spec(config, id = "primary")
|
||||
spec = resolve(config).symbolize_keys
|
||||
|
||||
raise(AdapterNotSpecified, "database configuration does not specify adapter") unless spec.key?(:adapter)
|
||||
|
@ -179,7 +179,7 @@ module ActiveRecord
|
|||
end
|
||||
|
||||
adapter_method = "#{spec[:adapter]}_connection"
|
||||
ConnectionSpecification.new(spec, adapter_method)
|
||||
ConnectionSpecification.new(id, spec, adapter_method)
|
||||
end
|
||||
|
||||
private
|
||||
|
|
|
@ -45,16 +45,19 @@ module ActiveRecord
|
|||
# The exceptions AdapterNotSpecified, AdapterNotFound and +ArgumentError+
|
||||
# may be returned on an error.
|
||||
def establish_connection(spec = nil)
|
||||
raise RuntimeError, "Anonymous class is not allowed." unless name
|
||||
|
||||
spec ||= DEFAULT_ENV.call.to_sym
|
||||
resolver = ConnectionAdapters::ConnectionSpecification::Resolver.new configurations
|
||||
spec = resolver.spec(spec)
|
||||
spec = resolver.spec(spec, self == Base ? "primary" : name)
|
||||
self.specification_id = spec.id
|
||||
|
||||
unless respond_to?(spec.adapter_method)
|
||||
raise AdapterNotFound, "database configuration specifies nonexistent #{spec.config[:adapter]} adapter"
|
||||
end
|
||||
|
||||
remove_connection
|
||||
connection_handler.establish_connection self, spec
|
||||
connection_handler.establish_connection spec
|
||||
end
|
||||
|
||||
class MergeAndResolveDefaultUrlConfig # :nodoc:
|
||||
|
@ -87,6 +90,23 @@ module ActiveRecord
|
|||
retrieve_connection
|
||||
end
|
||||
|
||||
def specification_id=(value)
|
||||
@specification_id = value
|
||||
end
|
||||
|
||||
def specification_id(fallback = true)
|
||||
return @specification_id if defined?(@specification_id)
|
||||
find_legacy_spec_id(self) if fallback
|
||||
end
|
||||
|
||||
def find_legacy_spec_id(klass)
|
||||
return "primary" if klass == Base
|
||||
if id = klass.specification_id(false)
|
||||
return id
|
||||
end
|
||||
find_legacy_spec_id(klass.superclass)
|
||||
end
|
||||
|
||||
def connection_id
|
||||
ActiveRecord::RuntimeRegistry.connection_id ||= Thread.current.object_id
|
||||
end
|
||||
|
@ -106,20 +126,20 @@ module ActiveRecord
|
|||
end
|
||||
|
||||
def connection_pool
|
||||
connection_handler.retrieve_connection_pool(self) or raise ConnectionNotEstablished
|
||||
connection_handler.retrieve_connection_pool(specification_id) or raise ConnectionNotEstablished
|
||||
end
|
||||
|
||||
def retrieve_connection
|
||||
connection_handler.retrieve_connection(self)
|
||||
connection_handler.retrieve_connection(specification_id)
|
||||
end
|
||||
|
||||
# Returns +true+ if Active Record is connected.
|
||||
def connected?
|
||||
connection_handler.connected?(self)
|
||||
connection_handler.connected?(specification_id)
|
||||
end
|
||||
|
||||
def remove_connection(klass = self)
|
||||
connection_handler.remove_connection(klass)
|
||||
def remove_connection(id = specification_id)
|
||||
connection_handler.remove_connection(id)
|
||||
end
|
||||
|
||||
def clear_cache! # :nodoc:
|
||||
|
|
|
@ -257,7 +257,7 @@ module ActiveRecord
|
|||
# Returns the Arel engine.
|
||||
def arel_engine # :nodoc:
|
||||
@arel_engine ||=
|
||||
if Base == self || connection_handler.retrieve_connection_pool(self)
|
||||
if Base == self || connection_handler.retrieve_connection_pool(specification_id)
|
||||
self
|
||||
else
|
||||
superclass.arel_engine
|
||||
|
|
|
@ -37,7 +37,7 @@ module ActiveRecord
|
|||
end
|
||||
|
||||
def test_close
|
||||
pool = Pool.new(ConnectionSpecification.new({}, nil))
|
||||
pool = Pool.new(ConnectionSpecification.new("primary", {}, nil))
|
||||
pool.insert_connection_for_test! @adapter
|
||||
@adapter.pool = pool
|
||||
|
||||
|
|
|
@ -4,49 +4,51 @@ module ActiveRecord
|
|||
module ConnectionAdapters
|
||||
class ConnectionHandlerTest < ActiveRecord::TestCase
|
||||
def setup
|
||||
@klass = Class.new(Base) { def self.name; 'klass'; end }
|
||||
@subklass = Class.new(@klass) { def self.name; 'subklass'; end }
|
||||
|
||||
@handler = ConnectionHandler.new
|
||||
@pool = @handler.establish_connection(@klass, Base.connection_pool.spec)
|
||||
resolver = ConnectionAdapters::ConnectionSpecification::Resolver.new Base.configurations
|
||||
spec = resolver.spec(:arunit)
|
||||
|
||||
@spec_id = "primary"
|
||||
@pool = @handler.establish_connection(spec)
|
||||
end
|
||||
|
||||
def test_retrieve_connection
|
||||
assert @handler.retrieve_connection(@klass)
|
||||
assert @handler.retrieve_connection(@spec_id)
|
||||
end
|
||||
|
||||
def test_active_connections?
|
||||
assert !@handler.active_connections?
|
||||
assert @handler.retrieve_connection(@klass)
|
||||
assert @handler.retrieve_connection(@spec_id)
|
||||
assert @handler.active_connections?
|
||||
@handler.clear_active_connections!
|
||||
assert !@handler.active_connections?
|
||||
end
|
||||
|
||||
def test_retrieve_connection_pool_with_ar_base
|
||||
assert_nil @handler.retrieve_connection_pool(ActiveRecord::Base)
|
||||
end
|
||||
# def test_retrieve_connection_pool_with_ar_base
|
||||
# assert_nil @handler.retrieve_connection_pool(ActiveRecord::Base)
|
||||
# end
|
||||
|
||||
def test_retrieve_connection_pool
|
||||
assert_not_nil @handler.retrieve_connection_pool(@klass)
|
||||
assert_not_nil @handler.retrieve_connection_pool(@spec_id)
|
||||
end
|
||||
|
||||
def test_retrieve_connection_pool_uses_superclass_when_no_subclass_connection
|
||||
assert_not_nil @handler.retrieve_connection_pool(@subklass)
|
||||
end
|
||||
# def test_retrieve_connection_pool_uses_superclass_when_no_subclass_connection
|
||||
# assert_not_nil @handler.retrieve_connection_pool(@subklass)
|
||||
# end
|
||||
|
||||
def test_retrieve_connection_pool_uses_superclass_pool_after_subclass_establish_and_remove
|
||||
sub_pool = @handler.establish_connection(@subklass, Base.connection_pool.spec)
|
||||
assert_same sub_pool, @handler.retrieve_connection_pool(@subklass)
|
||||
|
||||
@handler.remove_connection @subklass
|
||||
assert_same @pool, @handler.retrieve_connection_pool(@subklass)
|
||||
end
|
||||
# def test_retrieve_connection_pool_uses_superclass_pool_after_subclass_establish_and_remove
|
||||
# sub_pool = @handler.establish_connection(@subklass, Base.connection_pool.spec)
|
||||
# assert_same sub_pool, @handler.retrieve_connection_pool(@subklass)
|
||||
#
|
||||
# @handler.remove_connection @subklass
|
||||
# assert_same @pool, @handler.retrieve_connection_pool(@subklass)
|
||||
# end
|
||||
|
||||
def test_connection_pools
|
||||
assert_equal([@pool], @handler.connection_pools)
|
||||
end
|
||||
|
||||
# TODO
|
||||
if Process.respond_to?(:fork)
|
||||
def test_connection_pool_per_pid
|
||||
object_id = ActiveRecord::Base.connection.object_id
|
||||
|
@ -79,7 +81,7 @@ module ActiveRecord
|
|||
|
||||
pid = fork {
|
||||
rd.close
|
||||
pool = @handler.retrieve_connection_pool(@klass)
|
||||
pool = @handler.retrieve_connection_pool(@spec_id)
|
||||
wr.write Marshal.dump pool.schema_cache.size
|
||||
wr.close
|
||||
exit!
|
||||
|
|
|
@ -4,7 +4,7 @@ module ActiveRecord
|
|||
module ConnectionAdapters
|
||||
class ConnectionSpecificationTest < ActiveRecord::TestCase
|
||||
def test_dup_deep_copy_config
|
||||
spec = ConnectionSpecification.new({ :a => :b }, "bar")
|
||||
spec = ConnectionSpecification.new("primary", { :a => :b }, "bar")
|
||||
assert_not_equal(spec.config.object_id, spec.dup.config.object_id)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -333,14 +333,14 @@ module ActiveRecord
|
|||
|
||||
# make sure exceptions are thrown when establish_connection
|
||||
# is called with an anonymous class
|
||||
def test_anonymous_class_exception
|
||||
anonymous = Class.new(ActiveRecord::Base)
|
||||
handler = ActiveRecord::Base.connection_handler
|
||||
|
||||
assert_raises(RuntimeError) {
|
||||
handler.establish_connection anonymous, nil
|
||||
}
|
||||
end
|
||||
# def test_anonymous_class_exception
|
||||
# anonymous = Class.new(ActiveRecord::Base)
|
||||
# handler = ActiveRecord::Base.connection_handler
|
||||
#
|
||||
# assert_raises(RuntimeError) {
|
||||
# handler.establish_connection anonymous, nil
|
||||
# }
|
||||
# end
|
||||
|
||||
def test_pool_sets_connection_schema_cache
|
||||
connection = pool.checkout
|
||||
|
|
|
@ -89,8 +89,8 @@ class MultipleDbTest < ActiveRecord::TestCase
|
|||
end
|
||||
|
||||
def test_connection
|
||||
assert_equal Entrant.arel_engine.connection, Bird.arel_engine.connection
|
||||
assert_not_equal Entrant.arel_engine.connection, Course.arel_engine.connection
|
||||
assert_equal Entrant.arel_engine.connection.object_id, Bird.arel_engine.connection.object_id
|
||||
assert_not_equal Entrant.arel_engine.connection.object_id, Course.arel_engine.connection.object_id
|
||||
end
|
||||
|
||||
unless in_memory_db?
|
||||
|
|
Loading…
Reference in a new issue