Automatically drop all connections after fork (#166)

* Automatically drop all connections after fork

Fix: https://github.com/mperham/connection_pool/issues/165

Using connections inherited from a parent process can have
very nasty consequences and it's pretty much never desired.

This patch use the `Process._fork` API added in Ruby 3.1 to
automatically detect when a fork is happening, and when it
does, drop all existing connections.

* after_fork callback: keep instances in a weakref set

This is much faster than relying on `ObjectSpace.each_object`

Co-authored-by: Jean Boussier <jean.boussier@gmail.com>
This commit is contained in:
Jean byroot Boussier 2022-10-04 16:01:56 +02:00 committed by GitHub
parent a8bc713b0e
commit 428c06f342
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 83 additions and 3 deletions

View File

@ -44,6 +44,47 @@ class ConnectionPool
Wrapper.new(options, &block)
end
if Process.respond_to?(:fork)
INSTANCES = ObjectSpace::WeakMap.new
private_constant :INSTANCES
def self.after_fork
INSTANCES.values.each do |pool|
# We're on after fork, so we know all other threads are dead.
# All we need to do is to ensure the main thread doesn't have a
# checked out connection
pool.checkin(force: true)
pool.reload do |connection|
# Unfortunately we don't know what method to call to close the connection,
# so we try the most common one.
connection.close if connection.respond_to?(:close)
end
end
nil
end
if ::Process.respond_to?(:_fork) # MRI 3.1+
module ForkTracker
def _fork
pid = super
if pid == 0
ConnectionPool.after_fork
end
pid
end
end
Process.singleton_class.prepend(ForkTracker)
end
else
INSTANCES = nil
private_constant :INSTANCES
def self.after_fork
# noop
end
end
def initialize(options = {}, &block)
raise ArgumentError, "Connection pool requires a block" unless block
@ -55,6 +96,7 @@ class ConnectionPool
@available = TimedStack.new(@size, &block)
@key = :"pool-#{@available.object_id}"
@key_count = :"pool-#{@available.object_id}-count"
INSTANCES[self] = self if INSTANCES
end
def with(options = {})
@ -81,16 +123,16 @@ class ConnectionPool
end
end
def checkin
def checkin(force: false)
if ::Thread.current[@key]
if ::Thread.current[@key_count] == 1
if ::Thread.current[@key_count] == 1 || force
@available.push(::Thread.current[@key])
::Thread.current[@key] = nil
::Thread.current[@key_count] = nil
else
::Thread.current[@key_count] -= 1
end
else
elsif !force
raise ConnectionPool::Error, "no connections are checked out"
end

View File

@ -547,4 +547,42 @@ class TestConnectionPool < Minitest::Test
assert_equal(1, pool.available)
end
end
def test_after_fork_callback
skip("MRI feature") unless Process.respond_to?(:fork)
GC.start # cleanup instances created by other tests
pool = ConnectionPool.new(size: 2) { NetworkConnection.new }
prefork_connection = pool.with { |c| c }
assert_equal(prefork_connection, pool.with { |c| c })
ConnectionPool.after_fork
refute_equal(prefork_connection, pool.with { |c| c })
end
def test_after_fork_callback_checkin
skip("MRI feature") unless Process.respond_to?(:fork)
GC.start # cleanup instances created by other tests
pool = ConnectionPool.new(size: 2) { NetworkConnection.new }
prefork_connection = pool.checkout
assert_equal(prefork_connection, pool.checkout)
ConnectionPool.after_fork
refute_equal(prefork_connection, pool.checkout)
end
def test_automatic_after_fork_callback
skip("MRI 3.1 feature") unless Process.respond_to?(:_fork)
GC.start # cleanup instances created by other tests
pool = ConnectionPool.new(size: 2) { NetworkConnection.new }
prefork_connection = pool.with { |c| c }
assert_equal(prefork_connection, pool.with { |c| c })
pid = fork do
refute_equal(prefork_connection, pool.with { |c| c })
exit!(0)
end
assert_equal(prefork_connection, pool.with { |c| c })
_, status = Process.waitpid2(pid)
assert_predicate(status, :success?)
end
end