puma/puma: lib/puma/thread_pool.rb
Commit 482ea5a24a by Evan Phoenix: Meter calling accept(2) with available pool capacity (#1278)
* Meter calling accept(2) with available pool capacity

Talking through this with Nate and Richard, we realized that accepting
new clients without taking into account the available capacity of the
thread pool doesn't improve throughput; it only hurts it in the case of
multiple workers. If a worker pauses (or starts up before the other
workers), it can accidentally soak up a high number of clients and leave
unused capacity in the other workers.

This change will smooth out this issue, with a minor penalty to maximum
throughput.

* Rewrite the conditional to be less confusing
2017-04-28 10:42:09 -07:00
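
The metering itself lives in ThreadPool#wait_until_not_full below. A minimal sketch of how a
server loop might use it, assuming a hypothetical server socket object and handle method
(illustrative only, not Puma's actual Server code):

  # Sketch only: the pool gates accept(2) on its own spare capacity.
  pool = Puma::ThreadPool.new(0, 16) { |client| handle(client) }   # min 0, max 16 threads

  loop do
    pool.wait_until_not_full      # block until the pool has spare capacity
    client = server.accept        # only then accept(2) a new connection
    pool << client                # hand the socket to an idle or newly spawned thread
  end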


require 'thread'

module Puma
  # A simple thread pool management object.
  #
  class ThreadPool
    class ForceShutdown < RuntimeError
    end

    # How long, after raising the ForceShutdown of a thread during
    # forced shutdown mode, to wait for the thread to try and finish
    # up its work before leaving the thread to die on the vine.
    SHUTDOWN_GRACE_TIME = 5 # seconds

    # Maintain a minimum of +min+ and maximum of +max+ threads
    # in the pool.
    #
    # The block passed is the work that will be performed in each
    # thread.
    #
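    # For example (an illustrative sketch; +process+ and +job+ are
    # placeholders, not Puma identifiers):
    #
    #   pool = Puma::ThreadPool.new(0, 16) { |work| process(work) }
    #   pool << job       # queued for an idle thread, or a newly spawned one
    #   pool.shutdown     # ask the threads to exit and wait for them
    #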
    def initialize(min, max, *extra, &block)
      @not_empty = ConditionVariable.new
      @not_full = ConditionVariable.new
      @mutex = Mutex.new

      @todo = []

      @spawned = 0
      @waiting = 0

      @min = Integer(min)
      @max = Integer(max)
      @block = block
      @extra = extra

      @shutdown = false

      @trim_requested = 0

      @workers = []

      @auto_trim = nil
      @reaper = nil

      @mutex.synchronize do
        @min.times { spawn_thread }
      end

      @clean_thread_locals = false
    end

    attr_reader :spawned, :trim_requested
    attr_accessor :clean_thread_locals
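
    # Clear every thread-local on the current thread except Ruby's internal
    # :__recursive_key__. Worker threads call this between jobs when
    # +clean_thread_locals+ is enabled.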
    def self.clean_thread_locals
      Thread.current.keys.each do |key|
        Thread.current[key] = nil unless key == :__recursive_key__
      end
    end

    # How many objects have yet to be processed by the pool?
    #
    def backlog
      @mutex.synchronize { @todo.size }
    end

    # :nodoc:
    #
    # Must be called with @mutex held!
    #
    def spawn_thread
      @spawned += 1

      th = Thread.new do
        # Thread name is new in Ruby 2.3
        Thread.current.name = 'puma %03i' % @spawned if Thread.current.respond_to?(:name=)
        todo = @todo
        block = @block
        mutex = @mutex
        not_empty = @not_empty
        not_full = @not_full

        extra = @extra.map { |i| i.new }
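
        # Worker loop: sleep on @not_empty until work arrives, honor trim and
        # shutdown requests while idle, then hand the work to the
        # user-supplied block.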
        while true
          work = nil

          continue = true

          mutex.synchronize do
            while todo.empty?
              if @trim_requested > 0
                @trim_requested -= 1
                continue = false
                not_full.signal
                break
              end

              if @shutdown
                continue = false
                break
              end

              @waiting += 1
              not_full.signal
              not_empty.wait mutex
              @waiting -= 1
            end

            work = todo.shift if continue
          end

          break unless continue

          if @clean_thread_locals
            ThreadPool.clean_thread_locals
          end

          begin
            block.call(work, *extra)
          rescue Exception => e
            STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})"
          end
        end

        mutex.synchronize do
          @spawned -= 1
          @workers.delete th
        end
      end

      @workers << th

      th
    end

    private :spawn_thread

    # Add +work+ to the todo list for a Thread to pick up and process.
    def <<(work)
      @mutex.synchronize do
        if @shutdown
          raise "Unable to add work while shutting down"
        end

        @todo << work

        if @waiting < @todo.size and @spawned < @max
          spawn_thread
        end

        @not_empty.signal
      end
    end
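
    # Block the caller until the pool has capacity for more work: either a
    # thread is already waiting, or the backlog is still smaller than the
    # number of threads we could still spawn. For example (illustrative
    # numbers), with max = 16, spawned = 12, no waiting threads and 3 items
    # in the backlog, the caller returns immediately (3 < 16 - 12); with 4
    # items it would block on @not_full.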
    def wait_until_not_full
      @mutex.synchronize do
        while true
          return if @shutdown
          return if @waiting > 0

          # If we can still spin up new threads and there
          # is work queued, then accept more work until we would
          # spin up the max number of threads.
          return if @todo.size < @max - @spawned

          @not_full.wait @mutex
        end
      end
    end

    # If too many threads are in the pool, tell one to finish up and exit.
    # If +force+ is true, a trim is requested even if all threads are
    # currently being utilized.
    #
    def trim(force=false)
      @mutex.synchronize do
        if (force or @waiting > 0) and @spawned - @trim_requested > @min
          @trim_requested += 1
          @not_empty.signal
        end
      end
    end

    # If there are dead threads in the pool, make them go away while
    # decreasing the spawned counter so that new, healthy threads can be
    # created again.
    def reap
      @mutex.synchronize do
        dead_workers = @workers.reject(&:alive?)

        dead_workers.each do |worker|
          worker.kill
          @spawned -= 1
        end

        @workers.delete_if do |w|
          dead_workers.include?(w)
        end
      end
    end
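
    # Periodically asks the pool to trim an idle thread back toward +@min+.
    # Started by #auto_trim!.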
    class AutoTrim
      def initialize(pool, timeout)
        @pool = pool
        @timeout = timeout
        @running = false
      end

      def start!
        @running = true

        @thread = Thread.new do
          while @running
            @pool.trim
            sleep @timeout
          end
        end
      end

      def stop
        @running = false
        @thread.wakeup
      end
    end

    def auto_trim!(timeout=30)
      @auto_trim = AutoTrim.new(self, timeout)
      @auto_trim.start!
    end
    class Reaper
      def initialize(pool, timeout)
        @pool = pool
        @timeout = timeout
        @running = false
      end

      def start!
        @running = true

        @thread = Thread.new do
          while @running
            @pool.reap
            sleep @timeout
          end
        end
      end

      def stop
        @running = false
        @thread.wakeup
      end
    end

    def auto_reap!(timeout=5)
      @reaper = Reaper.new(self, timeout)
      @reaper.start!
    end

    # Tell all threads in the pool to exit and wait for them to finish.
    #
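    # A +timeout+ of -1 (the default) joins each thread with no time limit.
    # A positive +timeout+ makes roughly that many one-second join attempts,
    # then raises ForceShutdown in any threads still running and gives them
    # SHUTDOWN_GRACE_TIME seconds to wrap up.
    #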
    def shutdown(timeout=-1)
      threads = @mutex.synchronize do
        @shutdown = true
        @not_empty.broadcast
        @not_full.broadcast

        @auto_trim.stop if @auto_trim
        @reaper.stop if @reaper

        # dup workers so that we join them all safely
        @workers.dup
      end

      if timeout == -1
        # Wait for threads to finish without force shutdown.
        threads.each(&:join)
      else
        # Wait for threads to finish after n attempts (+timeout+).
        # If threads are still running, it will forcefully kill them.
        timeout.times do
          threads.delete_if do |t|
            t.join 1
          end

          if threads.empty?
            break
          else
            sleep 1
          end
        end

        threads.each do |t|
          t.raise ForceShutdown
        end

        threads.each do |t|
          t.join SHUTDOWN_GRACE_TIME
        end
      end

      @spawned = 0
      @workers = []
    end
  end
end