mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Ensure reaper threads respawn in forks
This broke in 3e2e8ee
where it switched to one reaper thread per
process. However, the implementation will only spawn the reaping thread
if the reaping frequency has not been seen. Since the reaping
frequencies are stored in a class instance variable, that variable is
copied when the process is forked. As such, a forked process will not
spawn a new reaping thread since the class instance variable would have
contain the reaping frequency that was seen in the parent process.
This commit tracks threads separately and checks that they both have
been spawned and are currently alive.
This adds a failing test to reaper_test.rb, based on the previous test
without forking. It also improves the test to return an failure instead
of hanging forever if the reaper is not started.
Co-authored-by: Guo Xiang Tan <tgx_world@hotmail.com>
This commit is contained in:
parent
4c07b43102
commit
c72b77bc54
2 changed files with 53 additions and 12 deletions
|
@ -316,14 +316,15 @@ module ActiveRecord
|
|||
|
||||
@mutex = Mutex.new
|
||||
@pools = {}
|
||||
@threads = {}
|
||||
|
||||
class << self
|
||||
def register_pool(pool, frequency) # :nodoc:
|
||||
@mutex.synchronize do
|
||||
unless @pools.key?(frequency)
|
||||
@pools[frequency] = []
|
||||
spawn_thread(frequency)
|
||||
unless @threads[frequency]&.alive?
|
||||
@threads[frequency] = spawn_thread(frequency)
|
||||
end
|
||||
@pools[frequency] ||= []
|
||||
@pools[frequency] << WeakRef.new(pool)
|
||||
end
|
||||
end
|
||||
|
@ -332,7 +333,7 @@ module ActiveRecord
|
|||
def spawn_thread(frequency)
|
||||
Thread.new(frequency) do |t|
|
||||
running = true
|
||||
while running do
|
||||
while running
|
||||
sleep t
|
||||
@mutex.synchronize do
|
||||
@pools[frequency].select!(&:weakref_alive?)
|
||||
|
@ -344,6 +345,7 @@ module ActiveRecord
|
|||
|
||||
if @pools[frequency].empty?
|
||||
@pools.delete(frequency)
|
||||
@threads.delete(frequency)
|
||||
running = false
|
||||
end
|
||||
end
|
||||
|
|
|
@ -72,21 +72,60 @@ module ActiveRecord
|
|||
|
||||
pool = ConnectionPool.new spec
|
||||
|
||||
conn = nil
|
||||
child = Thread.new do
|
||||
conn = pool.checkout
|
||||
Thread.stop
|
||||
end
|
||||
Thread.pass while conn.nil?
|
||||
conn, child = new_conn_in_thread(pool)
|
||||
|
||||
assert_predicate conn, :in_use?
|
||||
|
||||
child.terminate
|
||||
|
||||
while conn.in_use?
|
||||
wait_for_conn_idle(conn)
|
||||
assert_not_predicate conn, :in_use?
|
||||
end
|
||||
|
||||
def test_connection_pool_starts_reaper_in_fork
|
||||
spec = ActiveRecord::Base.connection_pool.spec.dup
|
||||
spec.config[:reaping_frequency] = "0.0001"
|
||||
|
||||
pool = ConnectionPool.new spec
|
||||
pool.checkout
|
||||
|
||||
pid = fork do
|
||||
pool = ConnectionPool.new spec
|
||||
|
||||
conn, child = new_conn_in_thread(pool)
|
||||
child.terminate
|
||||
|
||||
wait_for_conn_idle(conn)
|
||||
if conn.in_use?
|
||||
exit!(1)
|
||||
else
|
||||
exit!(0)
|
||||
end
|
||||
end
|
||||
|
||||
Process.waitpid(pid)
|
||||
assert $?.success?
|
||||
end
|
||||
|
||||
def new_conn_in_thread(pool)
|
||||
event = Concurrent::Event.new
|
||||
conn = nil
|
||||
|
||||
child = Thread.new do
|
||||
conn = pool.checkout
|
||||
event.set
|
||||
Thread.stop
|
||||
end
|
||||
|
||||
event.wait
|
||||
[conn, child]
|
||||
end
|
||||
|
||||
def wait_for_conn_idle(conn, timeout = 5)
|
||||
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
||||
while conn.in_use? && Process.clock_gettime(Process::CLOCK_MONOTONIC) - start < timeout
|
||||
Thread.pass
|
||||
end
|
||||
assert_not_predicate conn, :in_use?
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue