mirror of
https://github.com/puma/puma.git
synced 2022-11-09 13:48:40 -05:00
482ea5a24a
* Meter calling accept(2) with available pool capacity Talking through this with Nate and Richard, we realized that accepting new clients without taking account for the available capacity of the thread pool doesn't improve throughput, it only hurts it in the case of multiple workers. If a worker pauses (or starts up before other workers), then a worker can accidentally suck up a high number of clients and leave unused capacity inside the other workers. This change will smooth out this issue, with a minor penalty to maximum throughput. * Rewrite the conditional to be less confusing
305 lines
6.6 KiB
Ruby
305 lines
6.6 KiB
Ruby
require 'thread'
|
|
|
|
module Puma
|
|
# A simple thread pool management object.
|
|
#
|
|
class ThreadPool
|
|
class ForceShutdown < RuntimeError
|
|
end
|
|
|
|
# How long, after raising the ForceShutdown of a thread during
|
|
# forced shutdown mode, to wait for the thread to try and finish
|
|
# up its work before leaving the thread to die on the vine.
|
|
SHUTDOWN_GRACE_TIME = 5 # seconds
|
|
|
|
# Maintain a minimum of +min+ and maximum of +max+ threads
|
|
# in the pool.
|
|
#
|
|
# The block passed is the work that will be performed in each
|
|
# thread.
|
|
#
|
|
def initialize(min, max, *extra, &block)
|
|
@not_empty = ConditionVariable.new
|
|
@not_full = ConditionVariable.new
|
|
@mutex = Mutex.new
|
|
|
|
@todo = []
|
|
|
|
@spawned = 0
|
|
@waiting = 0
|
|
|
|
@min = Integer(min)
|
|
@max = Integer(max)
|
|
@block = block
|
|
@extra = extra
|
|
|
|
@shutdown = false
|
|
|
|
@trim_requested = 0
|
|
|
|
@workers = []
|
|
|
|
@auto_trim = nil
|
|
@reaper = nil
|
|
|
|
@mutex.synchronize do
|
|
@min.times { spawn_thread }
|
|
end
|
|
|
|
@clean_thread_locals = false
|
|
end
|
|
|
|
attr_reader :spawned, :trim_requested
|
|
attr_accessor :clean_thread_locals
|
|
|
|
def self.clean_thread_locals
|
|
Thread.current.keys.each do |key|
|
|
Thread.current[key] = nil unless key == :__recursive_key__
|
|
end
|
|
end
|
|
|
|
# How many objects have yet to be processed by the pool?
|
|
#
|
|
def backlog
|
|
@mutex.synchronize { @todo.size }
|
|
end
|
|
|
|
# :nodoc:
|
|
#
|
|
# Must be called with @mutex held!
|
|
#
|
|
def spawn_thread
|
|
@spawned += 1
|
|
|
|
th = Thread.new do
|
|
# Thread name is new in Ruby 2.3
|
|
Thread.current.name = 'puma %03i' % @spawned if Thread.current.respond_to?(:name=)
|
|
todo = @todo
|
|
block = @block
|
|
mutex = @mutex
|
|
not_empty = @not_empty
|
|
not_full = @not_full
|
|
|
|
extra = @extra.map { |i| i.new }
|
|
|
|
while true
|
|
work = nil
|
|
|
|
continue = true
|
|
|
|
mutex.synchronize do
|
|
while todo.empty?
|
|
if @trim_requested > 0
|
|
@trim_requested -= 1
|
|
continue = false
|
|
not_full.signal
|
|
break
|
|
end
|
|
|
|
if @shutdown
|
|
continue = false
|
|
break
|
|
end
|
|
|
|
@waiting += 1
|
|
not_full.signal
|
|
not_empty.wait mutex
|
|
@waiting -= 1
|
|
end
|
|
|
|
work = todo.shift if continue
|
|
end
|
|
|
|
break unless continue
|
|
|
|
if @clean_thread_locals
|
|
ThreadPool.clean_thread_locals
|
|
end
|
|
|
|
begin
|
|
block.call(work, *extra)
|
|
rescue Exception => e
|
|
STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})"
|
|
end
|
|
end
|
|
|
|
mutex.synchronize do
|
|
@spawned -= 1
|
|
@workers.delete th
|
|
end
|
|
end
|
|
|
|
@workers << th
|
|
|
|
th
|
|
end
|
|
|
|
private :spawn_thread
|
|
|
|
# Add +work+ to the todo list for a Thread to pickup and process.
|
|
def <<(work)
|
|
@mutex.synchronize do
|
|
if @shutdown
|
|
raise "Unable to add work while shutting down"
|
|
end
|
|
|
|
@todo << work
|
|
|
|
if @waiting < @todo.size and @spawned < @max
|
|
spawn_thread
|
|
end
|
|
|
|
@not_empty.signal
|
|
end
|
|
end
|
|
|
|
def wait_until_not_full
|
|
@mutex.synchronize do
|
|
while true
|
|
return if @shutdown
|
|
return if @waiting > 0
|
|
|
|
# If we can still spin up new threads and there
|
|
# is work queued, then accept more work until we would
|
|
# spin up the max number of threads.
|
|
return if @todo.size < @max - @spawned
|
|
|
|
@not_full.wait @mutex
|
|
end
|
|
end
|
|
end
|
|
|
|
# If too many threads are in the pool, tell one to finish go ahead
|
|
# and exit. If +force+ is true, then a trim request is requested
|
|
# even if all threads are being utilized.
|
|
#
|
|
def trim(force=false)
|
|
@mutex.synchronize do
|
|
if (force or @waiting > 0) and @spawned - @trim_requested > @min
|
|
@trim_requested += 1
|
|
@not_empty.signal
|
|
end
|
|
end
|
|
end
|
|
|
|
# If there are dead threads in the pool make them go away while decreasing
|
|
# spawned counter so that new healthy threads could be created again.
|
|
def reap
|
|
@mutex.synchronize do
|
|
dead_workers = @workers.reject(&:alive?)
|
|
|
|
dead_workers.each do |worker|
|
|
worker.kill
|
|
@spawned -= 1
|
|
end
|
|
|
|
@workers.delete_if do |w|
|
|
dead_workers.include?(w)
|
|
end
|
|
end
|
|
end
|
|
|
|
class AutoTrim
|
|
def initialize(pool, timeout)
|
|
@pool = pool
|
|
@timeout = timeout
|
|
@running = false
|
|
end
|
|
|
|
def start!
|
|
@running = true
|
|
|
|
@thread = Thread.new do
|
|
while @running
|
|
@pool.trim
|
|
sleep @timeout
|
|
end
|
|
end
|
|
end
|
|
|
|
def stop
|
|
@running = false
|
|
@thread.wakeup
|
|
end
|
|
end
|
|
|
|
def auto_trim!(timeout=30)
|
|
@auto_trim = AutoTrim.new(self, timeout)
|
|
@auto_trim.start!
|
|
end
|
|
|
|
class Reaper
|
|
def initialize(pool, timeout)
|
|
@pool = pool
|
|
@timeout = timeout
|
|
@running = false
|
|
end
|
|
|
|
def start!
|
|
@running = true
|
|
|
|
@thread = Thread.new do
|
|
while @running
|
|
@pool.reap
|
|
sleep @timeout
|
|
end
|
|
end
|
|
end
|
|
|
|
def stop
|
|
@running = false
|
|
@thread.wakeup
|
|
end
|
|
end
|
|
|
|
def auto_reap!(timeout=5)
|
|
@reaper = Reaper.new(self, timeout)
|
|
@reaper.start!
|
|
end
|
|
|
|
# Tell all threads in the pool to exit and wait for them to finish.
|
|
#
|
|
def shutdown(timeout=-1)
|
|
threads = @mutex.synchronize do
|
|
@shutdown = true
|
|
@not_empty.broadcast
|
|
@not_full.broadcast
|
|
|
|
@auto_trim.stop if @auto_trim
|
|
@reaper.stop if @reaper
|
|
# dup workers so that we join them all safely
|
|
@workers.dup
|
|
end
|
|
|
|
if timeout == -1
|
|
# Wait for threads to finish without force shutdown.
|
|
threads.each(&:join)
|
|
else
|
|
# Wait for threads to finish after n attempts (+timeout+).
|
|
# If threads are still running, it will forcefully kill them.
|
|
timeout.times do
|
|
threads.delete_if do |t|
|
|
t.join 1
|
|
end
|
|
|
|
if threads.empty?
|
|
break
|
|
else
|
|
sleep 1
|
|
end
|
|
end
|
|
|
|
threads.each do |t|
|
|
t.raise ForceShutdown
|
|
end
|
|
|
|
threads.each do |t|
|
|
t.join SHUTDOWN_GRACE_TIME
|
|
end
|
|
end
|
|
|
|
@spawned = 0
|
|
@workers = []
|
|
end
|
|
end
|
|
end
|