mirror of
https://github.com/puma/puma.git
synced 2022-11-09 13:48:40 -05:00
64f930dfb7
* Move shutdown grace time constant to ThreadPool. SHUTDOWN_GRACE_TIME is the only constant (from const.rb) used by ThreadPool. It's better to move the constant than require all const.rb. * Fix minor typo. * Don't need to check if timeout is zero to immediately shutdown. This removes the duplicated code and add test for forced shutdowns.
297 lines
6.3 KiB
Ruby
297 lines
6.3 KiB
Ruby
require 'thread'
|
|
|
|
module Puma
  # A simple thread pool management object.
  #
  # The pool keeps worker threads that each take one item at a time from
  # an internal todo list and pass it to the block given to #initialize.
  # Work is queued with #<<; #shutdown stops the workers.
  #
  class ThreadPool
    # Raised inside worker threads by #shutdown when a forced shutdown was
    # requested and the workers did not finish within the timeout.
    class ForceShutdown < RuntimeError
    end

    # How long, after raising the ForceShutdown of a thread during
    # forced shutdown mode, to wait for the threads to try and finish
    # up their work before leaving them to die on the vine.
    SHUTDOWN_GRACE_TIME = 5 # seconds

    # Maintain a minimum of +min+ and maximum of +max+ threads
    # in the pool.
    #
    # Every entry of +extra+ must respond to +new+; each worker thread
    # instantiates its own copy of every entry and passes those instances
    # to the block after the work item.
    #
    # The block passed is the work that will be performed in each
    # thread.
    #
    def initialize(min, max, *extra, &block)
      @not_empty = ConditionVariable.new
      @not_full = ConditionVariable.new
      @mutex = Mutex.new

      @todo = []

      @spawned = 0
      @waiting = 0

      @min = Integer(min)
      @max = Integer(max)
      @block = block
      @extra = extra

      @shutdown = false

      @trim_requested = 0

      @workers = []

      @auto_trim = nil
      @reaper = nil

      # Workers read this flag, so it has to exist before any is spawned.
      @clean_thread_locals = false

      @mutex.synchronize do
        @min.times { spawn_thread }
      end
    end

    attr_reader :spawned, :trim_requested
    attr_accessor :clean_thread_locals

    # Reset every thread/fiber-local key of the current thread to nil,
    # except for :__recursive_key__.
    def self.clean_thread_locals
      Thread.current.keys.each do |key|
        Thread.current[key] = nil unless key == :__recursive_key__
      end
    end

    # How many objects have yet to be processed by the pool?
    #
    def backlog
      @mutex.synchronize { @todo.size }
    end

    # :nodoc:
    #
    # Must be called with @mutex held!
    #
    # Starts one worker thread, registers it in @workers and returns it.
    #
    def spawn_thread
      @spawned += 1

      # The current count is handed over as a thread argument so the name
      # is fixed at creation time rather than read, unsynchronized, from
      # @spawned whenever the thread first happens to run.
      th = Thread.new(@spawned) do |spawned|
        begin
          # Thread name is new in Ruby 2.3
          Thread.current.name = 'puma %03i' % spawned if Thread.current.respond_to?(:name=)
          todo = @todo
          block = @block
          mutex = @mutex
          not_empty = @not_empty
          not_full = @not_full

          extra = @extra.map { |i| i.new }

          while true
            work = nil

            continue = true

            mutex.synchronize do
              while todo.empty?
                # A pending trim request is consumed by the first idle
                # worker that sees it; that worker then exits.
                if @trim_requested > 0
                  @trim_requested -= 1
                  continue = false
                  not_full.signal
                  break
                end

                if @shutdown
                  continue = false
                  break
                end

                @waiting += 1
                not_full.signal
                begin
                  not_empty.wait mutex
                ensure
                  # Keep the idle count accurate even when the wait is
                  # interrupted (Thread#kill, ForceShutdown).
                  @waiting -= 1
                end
              end

              work = todo.shift if continue
            end

            break unless continue

            if @clean_thread_locals
              ThreadPool.clean_thread_locals
            end

            begin
              block.call(work, *extra)
            rescue Exception => e
              STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})"
            end
          end
        rescue ForceShutdown
          # The forced shutdown reached this worker outside of block.call
          # (e.g. while idle). Exit quietly so that joining the thread does
          # not re-raise in the caller of #shutdown.
        end

        @mutex.synchronize do
          @spawned -= 1
          @workers.delete th
        end
      end

      @workers << th

      th
    end

    private :spawn_thread

    # Add +work+ to the todo list for a Thread to pickup and process.
    #
    # Raises a RuntimeError once #shutdown has been called.
    def <<(work)
      @mutex.synchronize do
        if @shutdown
          raise "Unable to add work while shutting down"
        end

        @todo << work

        # Grow the pool when the idle workers cannot cover the queued work
        # and the maximum has not been reached yet.
        if @waiting < @todo.size && @spawned < @max
          spawn_thread
        end

        @not_empty.signal
      end
    end

    # Block the caller until the queued work not covered by idle workers
    # is smaller than the number of threads that may still be spawned, or
    # until the pool is shutting down.
    def wait_until_not_full
      @mutex.synchronize do
        until (@todo.size - @waiting < @max - @spawned) || @shutdown
          @not_full.wait @mutex
        end
      end
    end

    # If too many threads are in the pool, tell one to finish go ahead
    # and exit. If +force+ is true, then a trim request is requested
    # even if all threads are being utilized.
    #
    def trim(force=false)
      @mutex.synchronize do
        if (force || @waiting > 0) && (@spawned - @trim_requested > @min)
          @trim_requested += 1
          @not_empty.signal
        end
      end
    end

    # If there are dead threads in the pool make them go away while decreasing
    # spawned counter so that new healthy threads could be created again.
    def reap
      @mutex.synchronize do
        dead_workers = @workers.reject(&:alive?)

        dead_workers.each do |worker|
          worker.kill
          @spawned -= 1
        end

        @workers.delete_if do |w|
          dead_workers.include?(w)
        end
      end
    end

    # Background helper that calls ThreadPool#trim on +pool+ and then
    # sleeps +timeout+ seconds, repeatedly, until stopped.
    class AutoTrim
      def initialize(pool, timeout)
        @pool = pool
        @timeout = timeout
        @running = false
        @thread = nil
      end

      def start!
        @running = true

        @thread = Thread.new do
          while @running
            @pool.trim
            sleep @timeout
          end
        end
      end

      # Ask the helper thread to finish and wake it from its sleep.
      # Safe to call when the helper was never started or already ended.
      def stop
        @running = false
        return unless @thread

        begin
          @thread.wakeup
        rescue ThreadError
          # Thread#wakeup raises for a dead thread; nothing left to stop.
        end
      end
    end

    # Start a background AutoTrim helper that trims the pool every
    # +timeout+ seconds.
    def auto_trim!(timeout=30)
      @auto_trim = AutoTrim.new(self, timeout)
      @auto_trim.start!
    end

    # Background helper that calls ThreadPool#reap on +pool+ and then
    # sleeps +timeout+ seconds, repeatedly, until stopped.
    class Reaper
      def initialize(pool, timeout)
        @pool = pool
        @timeout = timeout
        @running = false
        @thread = nil
      end

      def start!
        @running = true

        @thread = Thread.new do
          while @running
            @pool.reap
            sleep @timeout
          end
        end
      end

      # Ask the helper thread to finish and wake it from its sleep.
      # Safe to call when the helper was never started or already ended.
      def stop
        @running = false
        return unless @thread

        begin
          @thread.wakeup
        rescue ThreadError
          # Thread#wakeup raises for a dead thread; nothing left to stop.
        end
      end
    end

    # Start a background Reaper helper that reaps dead workers every
    # +timeout+ seconds.
    def auto_reap!(timeout=5)
      @reaper = Reaper.new(self, timeout)
      @reaper.start!
    end

    # Tell all threads in the pool to exit and wait for them to finish.
    #
    # With the default +timeout+ of -1 the call waits indefinitely.
    # Otherwise it waits up to +timeout+ seconds in total, then raises
    # ForceShutdown in every worker that is still running and waits up to
    # SHUTDOWN_GRACE_TIME seconds in total for them to finish.
    #
    def shutdown(timeout=-1)
      threads = @mutex.synchronize do
        @shutdown = true
        @not_empty.broadcast
        @not_full.broadcast

        @auto_trim.stop if @auto_trim
        @reaper.stop if @reaper
        # dup workers so that we join them all safely
        @workers.dup
      end

      if timeout == -1
        # Wait for threads to finish without force shutdown.
        threads.each(&:join)
      else
        # Wait +timeout+ seconds for the threads to finish on their own.
        join_within(threads, timeout)

        # Whatever is still running gets interrupted ...
        threads.each do |t|
          t.raise ForceShutdown
        end

        # ... and a bounded grace period to wind down.
        join_within(threads, SHUTDOWN_GRACE_TIME)
      end

      @spawned = 0
      @workers = []
    end

    # Join +threads+ against one shared deadline +seconds+ from now and
    # remove every thread that finished from the array (in place).
    def join_within(threads, seconds)
      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + seconds

      threads.reject! do |t|
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        # A zero limit still reports threads that have already finished.
        t.join(remaining > 0 ? remaining : 0)
      end
    end

    private :join_within
  end
end
|