1
0
Fork 0
mirror of https://github.com/puma/puma.git synced 2022-11-09 13:48:40 -05:00

ThreadPool concurrency refactoring (#2220)

- Wait for threads to enter waiting loop on ThreadPool startup
- Simplify #spawn_thread inner threadpool loop
- Refactor TestThreadPool to make tests faster and more stable

Co-authored-by: Nate Berkopec <nate.berkopec@gmail.com>
This commit is contained in:
Will Jordan 2020-04-14 16:43:59 -07:00 committed by GitHub
parent f47d6d139b
commit b16d8cb963
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 120 additions and 147 deletions

View file

@ -29,6 +29,7 @@
* Read directly from the socket in #read_and_drop to avoid raising further SSL errors (#2198)
* Set `Connection: closed` header when queue requests is disabled (#2216)
* Pass queued requests to thread pool on server shutdown (#2122)
* Fixed a few minor concurrency bugs in ThreadPool that may have affected non-GVL Rubies (#2220)
* Refactor
* Remove unused loader argument from Plugin initializer (#2095)
@ -36,6 +37,7 @@
* Simplify `Runner#start_control` URL parsing (#2111)
* Removed the IOBuffer extension and replaced with Ruby (#1980)
* Update `Rack::Handler::Puma.run` to use `**options` (#2189)
* ThreadPool concurrency refactoring (#2220)
* JSON parse cluster worker stats instead of regex (#2124)
## 4.3.3 and 3.12.4 / 2020-02-28

View file

@ -54,7 +54,10 @@ module Puma
@reaper = nil
@mutex.synchronize do
@min.times { spawn_thread }
@min.times do
spawn_thread
@not_full.wait(@mutex)
end
end
@clean_thread_locals = false
@ -72,7 +75,7 @@ module Puma
# How many objects have yet to be processed by the pool?
#
def backlog
@mutex.synchronize { @todo.size }
with_mutex { @todo.size }
end
def pool_capacity
@ -99,20 +102,13 @@ module Puma
while true
work = nil
continue = true
mutex.synchronize do
while todo.empty?
if @trim_requested > 0
@trim_requested -= 1
continue = false
not_full.signal
break
end
if @shutdown
continue = false
break
@spawned -= 1
@workers.delete th
Thread.exit
end
@waiting += 1
@ -121,11 +117,9 @@ module Puma
@waiting -= 1
end
work = todo.shift if continue
work = todo.shift
end
break unless continue
if @clean_thread_locals
ThreadPool.clean_thread_locals
end
@ -136,11 +130,6 @@ module Puma
STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})"
end
end
mutex.synchronize do
@spawned -= 1
@workers.delete th
end
end
@workers << th
@ -150,9 +139,15 @@ module Puma
private :spawn_thread
def with_mutex(&block)
@mutex.owned? ?
yield :
@mutex.synchronize(&block)
end
# Add +work+ to the todo list for a Thread to pickup and process.
def <<(work)
@mutex.synchronize do
with_mutex do
if @shutdown
raise "Unable to add work while shutting down"
end
@ -197,7 +192,7 @@ module Puma
# Returns the current number of busy threads, or +nil+ if shutting down.
#
def wait_until_not_full
@mutex.synchronize do
with_mutex do
while true
return if @shutdown
@ -213,13 +208,14 @@ module Puma
end
end
# If too many threads are in the pool, tell one to finish go ahead
# If there are any free threads in the pool, tell one to go ahead
# and exit. If +force+ is true, then a trim request is requested
# even if all threads are being utilized.
#
def trim(force=false)
@mutex.synchronize do
if (force or @waiting > 0) and @spawned - @trim_requested > @min
with_mutex do
free = @waiting - @todo.size
if (force or free > 0) and @spawned - @trim_requested > @min
@trim_requested += 1
@not_empty.signal
end
@ -229,7 +225,7 @@ module Puma
# If there are dead threads in the pool make them go away while decreasing
# spawned counter so that new healthy threads could be created again.
def reap
@mutex.synchronize do
with_mutex do
dead_workers = @workers.reject(&:alive?)
dead_workers.each do |worker|
@ -283,8 +279,9 @@ module Puma
# Tell all threads in the pool to exit and wait for them to finish.
#
def shutdown(timeout=-1)
threads = @mutex.synchronize do
threads = with_mutex do
@shutdown = true
@trim_requested = @spawned
@not_empty.broadcast
@not_full.broadcast

View file

@ -10,35 +10,56 @@ class TestThreadPool < Minitest::Test
def new_pool(min, max, &block)
block = proc { } unless block
@work_mutex = Mutex.new
@work_done = ConditionVariable.new
@pool = Puma::ThreadPool.new(min, max, &block)
end
def pause
sleep 1
def mutex_pool(min, max, &block)
block = proc { } unless block
@pool = MutexPool.new(min, max, &block)
end
# Wraps ThreadPool work in mutex for better concurrency control.
class MutexPool < Puma::ThreadPool
# Wait until the added work is completed before returning.
# Array argument is treated as a batch of work items to be added.
# Block will run after work is added but before it is executed on a worker thread.
def <<(work, &block)
work = [work] unless work.is_a?(Array)
with_mutex do
work.each {|arg| super arg}
yield if block_given?
@not_full.wait(@mutex)
end
end
def signal
@not_full.signal
end
# If +wait+ is true, wait until the trim request is completed before returning.
def trim(force=false, wait: true)
super(force)
Thread.pass until @trim_requested == 0 if wait
end
end
def test_append_spawns
saw = []
thread_name = nil
pool = new_pool(0, 1) do |work|
@work_mutex.synchronize do
saw << work
thread_name = Thread.current.name if Thread.current.respond_to?(:name)
@work_done.signal
end
pool = mutex_pool(0, 1) do |work|
saw << work
end
pool << 1
assert_equal 1, pool.spawned
assert_equal [1], saw
end
@work_mutex.synchronize do
@work_done.wait(@work_mutex, 5)
assert_equal 1, pool.spawned
assert_equal [1], saw
assert_equal('puma threadpool 001', thread_name) if Thread.current.respond_to?(:name)
end
def test_thread_name
skip 'Thread.name not supported' unless Thread.current.respond_to?(:name)
thread_name = nil
pool = mutex_pool(0, 1) {thread_name = Thread.current.name}
pool << 1
assert_equal('puma threadpool 001', thread_name)
end
def test_converts_pool_sizes
@ -64,110 +85,68 @@ class TestThreadPool < Minitest::Test
end
def test_trim
skip_on :jruby, :truffleruby # Undiagnose thread race. TODO fix
pool = new_pool(0, 1) do |work|
@work_mutex.synchronize do
@work_done.signal
end
end
pool = mutex_pool(0, 1)
pool << 1
@work_mutex.synchronize do
@work_done.wait(@work_mutex, 5)
assert_equal 1, pool.spawned
end
assert_equal 1, pool.spawned
pool.trim
# wait/join required here for MRI, JRuby races the access here
worker = pool.instance_variable_get(:@workers).first
worker.join if worker
assert_equal 0, pool.spawned
end
def test_trim_leaves_min
skip_on :jruby, :truffleruby # Undiagnose thread race. TODO fix
pool = new_pool(1, 2) do |work|
@work_mutex.synchronize do
@work_done.signal
end
end
pool = mutex_pool(1, 2)
pool << 1
pool << 2
pool << [1, 2]
@work_mutex.synchronize do
@work_done.wait(@work_mutex, 5)
assert_equal 2, pool.spawned
end
assert_equal 2, pool.spawned
pool.trim
pause
assert_equal 1, pool.spawned
pool.trim
pause
assert_equal 1, pool.spawned
end
def test_force_trim_doesnt_overtrim
finish = false
pool = new_pool(1, 2) { Thread.pass until finish }
pool = mutex_pool(1, 2)
pool << 1
pool << 2
assert_equal 2, pool.spawned
pool.trim true
pool.trim true
finish = true
pause
pool.<< [1, 2] do
assert_equal 2, pool.spawned
pool.trim true, wait: false
pool.trim true, wait: false
end
assert_equal 1, pool.spawned
end
def test_trim_is_ignored_if_no_waiting_threads
finish = false
pool = new_pool(1, 2) { Thread.pass until finish }
pool = mutex_pool(1, 2)
pool << 1
pool << 2
pool.<< [1, 2] do
assert_equal 2, pool.spawned
pool.trim
pool.trim
end
assert_equal 2, pool.spawned
pool.trim
pool.trim
assert_equal 0, pool.trim_requested
finish = true
pause
end
def test_autotrim
finish = false
pool = new_pool(1, 2) { Thread.pass until finish }
pool = mutex_pool(1, 2)
pool << 1
pool << 2
timeout = 0
pool.auto_trim! timeout
assert_equal 2, pool.spawned
pool.<< [1, 2] do
assert_equal 2, pool.spawned
end
finish = true
pause
assert_equal 2, pool.spawned
pool.auto_trim! 1
sleep 1
pause
start = Time.now
Thread.pass until pool.spawned == 1 ||
Time.now - start > 1
assert_equal 1, pool.spawned
end
@ -175,23 +154,15 @@ class TestThreadPool < Minitest::Test
def test_cleanliness
values = []
n = 100
mutex = Mutex.new
finished = false
pool = new_pool(1,1) {
mutex.synchronize { values.push Thread.current[:foo] }
pool = mutex_pool(1,1) {
values.push Thread.current[:foo]
Thread.current[:foo] = :hai
Thread.pass until finished
}
pool.clean_thread_locals = true
n.times { pool << 1 }
finished = true
pause
pool << [1] * n
assert_equal n, values.length
@ -199,14 +170,16 @@ class TestThreadPool < Minitest::Test
end
def test_reap_only_dead_threads
pool = new_pool(2,2) { Thread.current.kill }
pool = mutex_pool(2,2) do
th = Thread.current
Thread.new {th.join; pool.signal}
th.kill
end
assert_equal 2, pool.spawned
pool << 1
pause
assert_equal 2, pool.spawned
pool.reap
@ -215,8 +188,6 @@ class TestThreadPool < Minitest::Test
pool << 2
pause
assert_equal 1, pool.spawned
pool.reap
@ -225,45 +196,48 @@ class TestThreadPool < Minitest::Test
end
def test_auto_reap_dead_threads
pool = new_pool(2,2) { Thread.current.kill }
pool = mutex_pool(2,2) do
th = Thread.current
Thread.new {th.join; pool.signal}
th.kill
end
pool.auto_reap! 0.1
pause
timeout = 0
pool.auto_reap! timeout
assert_equal 2, pool.spawned
pool << 1
pool << 2
pause
start = Time.now
Thread.pass until pool.spawned == 0 ||
Time.now - start > 1
assert_equal 0, pool.spawned
end
def test_force_shutdown_immediately
finish = false
rescued = false
pool = new_pool(0, 1) do |work|
pool = mutex_pool(0, 1) do
begin
@work_mutex.synchronize do
@work_done.signal
end
Thread.pass until finish
pool.signal
sleep
rescue Puma::ThreadPool::ForceShutdown
rescued = true
end
end
pool << 1
pool.shutdown(0)
@work_mutex.synchronize do
@work_done.wait(@work_mutex, 5)
pool.shutdown(0)
finish = true
assert_equal 0, pool.spawned
assert rescued
end
assert_equal 0, pool.spawned
assert rescued
end
def test_waiting_on_startup
pool = new_pool(1, 2)
assert_equal 1, pool.waiting
end
end