1
0
Fork 0
mirror of https://github.com/puma/puma.git synced 2022-11-09 13:48:40 -05:00
puma--puma/lib/puma/thread_pool.rb

186 lines
3.6 KiB
Ruby
Raw Normal View History

2011-09-20 01:16:31 -04:00
require 'thread'
2011-09-22 22:24:43 -04:00
module Puma
  # A simple thread pool management object.
  #
  # Work items are pushed with #<< and handed, one at a time, to worker
  # threads that run the block given to #initialize. All shared state
  # (@todo, @spawned, @waiting, @trim_requested, @workers, @shutdown) is
  # guarded by @mutex; idle workers sleep on @cond.
  #
  class ThreadPool

    # Maintain a minimum of +min+ and maximum of +max+ threads
    # in the pool.
    #
    # The block passed is the work that will be performed in each
    # thread. It is called with the work item followed by one instance
    # of every object in +extra+ (built by calling +new+ on it once per
    # worker thread).
    #
    # +min+ and +max+ are coerced with Integer(), which raises on values
    # that cannot be converted.
    #
    def initialize(min, max, *extra, &block)
      @cond = ConditionVariable.new
      @mutex = Mutex.new

      @todo = []

      @spawned = 0
      @waiting = 0

      @min = Integer(min)
      @max = Integer(max)
      @block = block
      @extra = extra

      @shutdown = false

      @trim_requested = 0

      @workers = []

      @auto_trim = nil

      # spawn_thread requires the mutex to be held.
      @mutex.synchronize do
        @min.times { spawn_thread }
      end
    end

    attr_reader :spawned, :trim_requested

    # How many objects have yet to be processed by the pool?
    #
    def backlog
      @mutex.synchronize { @todo.size }
    end

    # :nodoc:
    #
    # Must be called with @mutex held!
    #
    # Starts one worker thread, records it in @workers and returns it.
    #
    def spawn_thread
      @spawned += 1

      th = Thread.new do
        todo  = @todo
        block = @block
        mutex = @mutex
        cond  = @cond

        begin
          extra = @extra.map { |i| i.new }

          while true
            work = nil
            continue = true

            mutex.synchronize do
              while todo.empty?
                # A pending trim request is consumed by the first idle
                # worker that notices it; that worker then exits.
                if @trim_requested > 0
                  @trim_requested -= 1
                  continue = false
                  break
                end

                if @shutdown
                  continue = false
                  break
                end

                @waiting += 1
                cond.wait mutex
                @waiting -= 1
              end

              work = todo.shift if continue
            end

            break unless continue

            block.call(work, *extra)
          end
        ensure
          # Run the bookkeeping even when the work block raised, so a
          # crashed worker is no longer counted against @max and does not
          # linger in @workers.
          mutex.synchronize do
            @spawned -= 1
            @workers.delete th
          end
        end
      end

      @workers << th

      th
    end

    private :spawn_thread

    # Add +work+ to the todo list for a Thread to pickup and process.
    #
    # Raises a RuntimeError once #shutdown has been called.
    #
    def <<(work)
      @mutex.synchronize do
        if @shutdown
          raise "Unable to add work while shutting down"
        end

        @todo << work

        # Nobody is idle: grow the pool if there is still room.
        if @waiting == 0 && @spawned < @max
          spawn_thread
        end

        @cond.signal
      end
    end

    # If too many threads are in the pool, tell one to finish, go ahead
    # and exit. If +force+ is true, then a trim is requested even if
    # all threads are being utilized.
    #
    # The pool is never asked to shrink below +min+ threads; trims that
    # were already requested but not yet honoured are taken into account.
    #
    def trim(force=false)
      @mutex.synchronize do
        if (force || @waiting > 0) && @spawned - @trim_requested > @min
          @trim_requested += 1
          @cond.signal
        end
      end
    end

    # Background helper that calls ThreadPool#trim every +timeout+
    # seconds until stopped.
    #
    class AutoTrim
      def initialize(pool, timeout)
        @pool = pool
        @timeout = timeout
        @running = false
      end

      # Start the trimming thread.
      #
      def start!
        @running = true

        @thread = Thread.new do
          while @running
            @pool.trim
            sleep @timeout
          end
        end
      end

      # Ask the trimming thread to finish and interrupt its sleep.
      #
      def stop
        @running = false
        # Thread#wakeup raises ThreadError on a dead thread, and @thread
        # is unset if start! was never called; skip it in both cases so
        # that stop can be called repeatedly.
        @thread.wakeup if @thread && @thread.alive?
      end
    end

    # Start a background thread that calls #trim every +timeout+ seconds.
    #
    def auto_trim!(timeout=5)
      @auto_trim = AutoTrim.new(self, timeout)
      @auto_trim.start!
    end

    # Tell all threads in the pool to exit and wait for them to finish.
    #
    def shutdown
      threads = @mutex.synchronize do
        @shutdown = true
        @cond.broadcast

        @auto_trim.stop if @auto_trim

        # Workers remove themselves from @workers as they exit, so take
        # a snapshot under the lock and join that instead of reading the
        # live array while it is being mutated.
        @workers.dup
      end

      threads.each(&:join)

      @spawned = 0
      @workers = []
    end
  end
end