mirror of
https://github.com/puma/puma.git
synced 2022-11-09 13:48:40 -05:00
Rearch how trimming takes place to not starve. Fixes #39
Using the work queue to communicate trimming doesn't work, it's far too easy to starve the system doing that. Instead we now detect trimming and work as seperate actions.
This commit is contained in:
parent
c2e6206e59
commit
256970e048
2 changed files with 90 additions and 33 deletions
|
@ -12,21 +12,29 @@ module Puma
|
|||
# thread.
|
||||
#
|
||||
def initialize(min, max, &blk)
|
||||
@todo = Queue.new
|
||||
@cond = ConditionVariable.new
|
||||
@mutex = Mutex.new
|
||||
|
||||
@todo = []
|
||||
|
||||
@spawned = 0
|
||||
@waiting = 0
|
||||
|
||||
@min = min
|
||||
@max = max
|
||||
@block = blk
|
||||
|
||||
@shutdown = false
|
||||
|
||||
@trim_requested = 0
|
||||
|
||||
@workers = []
|
||||
|
||||
@auto_trim = nil
|
||||
|
||||
min.times { spawn_thread }
|
||||
@mutex.synchronize do
|
||||
min.times { spawn_thread }
|
||||
end
|
||||
end
|
||||
|
||||
attr_reader :spawned
|
||||
|
@ -34,37 +42,54 @@ module Puma
|
|||
# How many objects have yet to be processed by the pool?
|
||||
#
|
||||
def backlog
|
||||
@todo.size
|
||||
@mutex.synchronize { @todo.size }
|
||||
end
|
||||
|
||||
Stop = Object.new
|
||||
Trim = Object.new
|
||||
|
||||
# :nodoc:
|
||||
#
|
||||
# Must be called with @mutex held!
|
||||
#
|
||||
def spawn_thread
|
||||
@mutex.synchronize do
|
||||
@spawned += 1
|
||||
end
|
||||
@spawned += 1
|
||||
|
||||
th = Thread.new do
|
||||
todo = @todo
|
||||
block = @block
|
||||
|
||||
while true
|
||||
work = todo.pop
|
||||
work = nil
|
||||
|
||||
case work
|
||||
when Stop
|
||||
break
|
||||
when Trim
|
||||
@mutex.synchronize do
|
||||
@trim_requested -= 1
|
||||
continue = true
|
||||
|
||||
@mutex.synchronize do
|
||||
while todo.empty?
|
||||
if @trim_requested > 0
|
||||
@trim_requested -= 1
|
||||
continue = false
|
||||
break
|
||||
end
|
||||
|
||||
if @shutdown
|
||||
continue = false
|
||||
break
|
||||
end
|
||||
|
||||
@waiting += 1
|
||||
@cond.wait @mutex
|
||||
@waiting -= 1
|
||||
|
||||
if @shutdown
|
||||
continue = false
|
||||
break
|
||||
end
|
||||
end
|
||||
|
||||
break
|
||||
else
|
||||
block.call work
|
||||
work = todo.pop if continue
|
||||
end
|
||||
|
||||
break unless continue
|
||||
|
||||
block.call work
|
||||
end
|
||||
|
||||
@mutex.synchronize do
|
||||
|
@ -73,28 +98,39 @@ module Puma
|
|||
end
|
||||
end
|
||||
|
||||
@mutex.synchronize { @workers << th }
|
||||
@workers << th
|
||||
|
||||
th
|
||||
end
|
||||
|
||||
private :spawn_thread
|
||||
|
||||
# Add +work+ to the todo list for a Thread to pickup and process.
|
||||
def <<(work)
|
||||
if @todo.num_waiting == 0 and @spawned < @max
|
||||
spawn_thread
|
||||
end
|
||||
@mutex.synchronize do
|
||||
if @shutdown
|
||||
raise "Unable to add work while shutting down"
|
||||
end
|
||||
|
||||
@todo << work
|
||||
@todo << work
|
||||
|
||||
if @waiting == 0 and @spawned < @max
|
||||
spawn_thread
|
||||
end
|
||||
|
||||
@cond.signal
|
||||
end
|
||||
end
|
||||
|
||||
# If too many threads are in the pool, tell one to finish go ahead
|
||||
# and exit.
|
||||
# and exit. If +force+ is true, then a trim request is requested
|
||||
# even if all threads are being utilized.
|
||||
#
|
||||
def trim
|
||||
def trim(force=false)
|
||||
@mutex.synchronize do
|
||||
if @spawned - @trim_requested > @min
|
||||
if (force or @waiting > 0) and @spawned - @trim_requested > @min
|
||||
@trim_requested += 1
|
||||
@todo << Trim
|
||||
@cond.signal
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -131,10 +167,11 @@ module Puma
|
|||
# Tell all threads in the pool to exit and wait for them to finish.
|
||||
#
|
||||
def shutdown
|
||||
@auto_trim.stop if @auto_trim
|
||||
@mutex.synchronize do
|
||||
@shutdown = true
|
||||
@cond.broadcast
|
||||
|
||||
@spawned.times do
|
||||
@todo << Stop
|
||||
@auto_trim.stop if @auto_trim
|
||||
end
|
||||
|
||||
# Use this instead of #each so that we don't stop in the middle
|
||||
|
|
|
@ -70,6 +70,8 @@ class TestThreadPool < Test::Unit::TestCase
|
|||
|
||||
finish = true
|
||||
|
||||
pause
|
||||
|
||||
assert_equal 2, pool.spawned
|
||||
pool.trim
|
||||
pause
|
||||
|
@ -82,7 +84,25 @@ class TestThreadPool < Test::Unit::TestCase
|
|||
|
||||
end
|
||||
|
||||
def test_trim_doesnt_overtrim
|
||||
def test_force_trim_doesnt_overtrim
|
||||
finish = false
|
||||
pool = new_pool(1, 2) { Thread.pass until finish }
|
||||
|
||||
pool << 1
|
||||
pool << 2
|
||||
|
||||
assert_equal 2, pool.spawned
|
||||
pool.trim true
|
||||
pool.trim true
|
||||
|
||||
finish = true
|
||||
|
||||
pause
|
||||
|
||||
assert_equal 1, pool.spawned
|
||||
end
|
||||
|
||||
def test_trim_is_ignored_if_no_waiting_threads
|
||||
finish = false
|
||||
pool = new_pool(1, 2) { Thread.pass until finish }
|
||||
|
||||
|
@ -97,7 +117,7 @@ class TestThreadPool < Test::Unit::TestCase
|
|||
|
||||
pause
|
||||
|
||||
assert_equal 1, pool.spawned
|
||||
assert_equal 2, pool.spawned
|
||||
end
|
||||
|
||||
def test_autotrim
|
||||
|
|
Loading…
Reference in a new issue