Mirror of https://github.com/puma/puma.git

Add thread pooling

Evan Phoenix, 2011-09-19 22:16:31 -07:00
commit fdd5d7df78 (parent 7d96353c5a)
5 changed files with 238 additions and 115 deletions
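
In outline, the commit swaps the old per-request Thread.new / @workers bookkeeping (and the reaping and overload handling built around it) for a shared ThreadPool that queues accepted sockets and hands them to a bounded set of worker threads. A minimal, self-contained sketch of that pattern — not the commit's code, just the shape it implements:

    require 'thread'

    queue   = Queue.new
    workers = Array.new(4) do
      Thread.new do
        # each worker pops jobs until it sees the :stop sentinel
        while (job = queue.pop) != :stop
          puts "handled #{job}"   # the server's version calls process_client(socket) here
        end
      end
    end

    10.times { |i| queue << i }           # producers only enqueue; they never touch threads
    workers.size.times { queue << :stop }
    workers.each(&:join)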


@@ -351,7 +351,6 @@ module Mongrel
def process(request, response)
if rand(@sample_rate)+1 == @sample_rate
@processors.sample(listener.workers.list.length)
@headcount.sample(request.params.length)
@reqsize.sample(request.body.length / 1024.0)
@respsize.sample((response.body.length + response.header.out.length) / 1024.0)
@@ -360,7 +359,7 @@ module Mongrel
end
def dump
"#{@processors.to_s}\n#{@reqsize.to_s}\n#{@headcount.to_s}\n#{@respsize.to_s}\n#{@interreq.to_s}"
"#{@reqsize.to_s}\n#{@headcount.to_s}\n#{@respsize.to_s}\n#{@interreq.to_s}"
end
end
@@ -390,7 +389,7 @@ module Mongrel
["port",listener.port],
["throttle",listener.throttle],
["timeout",listener.timeout],
["workers max",listener.num_processors],
["concurrency",listener.concurrent],
])
if @stats


@@ -1,4 +1,5 @@
require 'rack'
require 'mongrel/thread_pool'
module Mongrel
# Thrown at a thread when it is timed out.
@@ -35,7 +36,7 @@ module Mongrel
attr_reader :port
attr_reader :throttle
attr_reader :timeout
attr_reader :num_processors
attr_reader :concurrent
# Creates a working server on host:port (strange things happen if port
# isn't a Number).
@@ -43,29 +44,29 @@ module Mongrel
# Use HttpServer.run to start the server and HttpServer.acceptor.join to
# join the thread that's processing incoming requests on the socket.
#
# The num_processors optional argument is the maximum number of concurrent
# processors to accept, anything over this is closed immediately to maintain
# server processing performance. This may seem mean but it is the most
# efficient way to deal with overload. Other schemes involve still
# parsing the client's request which defeats the point of an overload
# handling system.
#
# +concurrent+ indicates how many concurrent requests should be run at
the same time. Any requests over this amount are queued and handled
# as soon as a thread is available.
#
# The throttle parameter is a sleep timeout (in hundredths of a second)
# that is placed between socket.accept calls in order to give the server
# a cheap throttle time. It defaults to 0 and actually if it is 0 then
# the sleep is not done at all.
def initialize(host, port, num_processors=950, throttle=0, timeout=60)
def initialize(host, port, concurrent=10, throttle=0, timeout=60)
@socket = TCPServer.new(host, port)
@host = host
@port = port
@workers = ThreadGroup.new
@throttle = throttle / 100.0
@num_processors = num_processors
@concurrent = concurrent
@timeout = timeout
@check, @notify = IO.pipe
@running = true
@thread_pool = ThreadPool.new(0, concurrent) do |client|
process_client(client)
end
end
def handle_request(params, client, body)
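
For reference, a hypothetical call site using the new signature documented above. The handler class and the register/run/acceptor.join calls follow the classic Mongrel handler API, which this commit does not appear to touch, so treat it as an illustrative sketch rather than a verbatim excerpt:

    require 'mongrel'

    class HelloHandler < Mongrel::HttpHandler
      def process(request, response)
        response.start(200) do |head, out|
          head["Content-Type"] = "text/plain"
          out.write("hello\n")
        end
      end
    end

    # host, port, concurrent worker threads, throttle (1/100 s), timeout (s)
    server = Mongrel::HttpServer.new("127.0.0.1", 9998, 16, 0, 60)
    server.register("/", HelloHandler.new)
    server.run
    server.acceptor.join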
@@ -147,9 +148,6 @@ module Mongrel
STDERR.puts "#{Time.now}: HTTP parse error, malformed request (#{params[HTTP_X_FORWARDED_FOR] || client.peeraddr.last}): #{e.inspect}"
STDERR.puts "#{Time.now}: REQUEST DATA: #{data.inspect}\n---\nPARAMS: #{params.inspect}\n---\n"
rescue Errno::EMFILE
reap_dead_workers('too many files')
rescue Object => e
STDERR.puts "#{Time.now}: Read error: #{e.inspect}"
STDERR.puts e.backtrace.join("\n")
@@ -163,42 +161,12 @@ module Mongrel
STDERR.puts "#{Time.now}: Client error: #{e.inspect}"
STDERR.puts e.backtrace.join("\n")
end
request.close_body if request
end
end
# Used internally to kill off any worker threads that have taken too long
# to complete processing. Only called if there are too many processors
# currently servicing. It returns the count of workers still active
# after the reap is done. It only runs if there are workers to reap.
def reap_dead_workers(reason='unknown')
if @workers.list.length > 0
STDERR.puts "#{Time.now}: Reaping #{@workers.list.length} threads for slow workers because of '#{reason}'"
error_msg = "Mongrel timed out this thread: #{reason}"
mark = Time.now
@workers.list.each do |worker|
worker[:started_on] = Time.now if not worker[:started_on]
if mark - worker[:started_on] > @timeout + @throttle
STDERR.puts "Thread #{worker.inspect} is too old, killing."
worker.raise(TimeoutError.new(error_msg))
end
end
end
return @workers.list.length
end
# Performs a wait on all the currently running threads and kills any that
# take too long. It waits by @timeout seconds, which can be set in
# .initialize or via mongrel_rails. The @throttle setting does extend
# this waiting period by that much longer.
# Wait for all outstanding requests to finish.
def graceful_shutdown
while reap_dead_workers("shutdown") > 0
STDERR.puts "Waiting for #{@workers.list.length} requests to finish, could take #{@timeout + @throttle} seconds."
sleep @timeout / 10
end
@thread_pool.shutdown
end
def configure_socket_options
@@ -234,12 +202,6 @@ module Mongrel
return false
end
def handle_overload(client)
STDERR.puts "Server overloaded with #{@workers.list.size} processors (#@num_processors max). Dropping connection."
client.close rescue nil
reap_dead_workers "max processors"
end
# Runs the thing. It returns the thread used so you can "join" it.
# You can also access the HttpServer::acceptor attribute to get the
# thread later.
@@ -258,6 +220,7 @@ module Mongrel
begin
check = @check
sockets = [check, @socket]
pool = @thread_pool
while @running
begin
@@ -269,23 +232,10 @@ module Mongrel
client = sock.accept
client.setsockopt(*tcp_cork_opts) if tcp_cork_opts
worker_list = @workers.list
if worker_list.length >= @num_processors
handle_overload(client)
else
thread = Thread.new(client) { |c| process_client(c) }
thread[:started_on] = Time.now
@workers.add(thread)
sleep @throttle if @throttle > 0
end
pool << client
end
end
rescue Errno::EMFILE
reap_dead_workers("too many open files")
sleep 0.5
rescue Errno::ECONNABORTED
# client closed the socket even before accept
client.close rescue nil
@@ -339,19 +289,23 @@ module Mongrel
return
end
# request is good so far, continue processing the response
response = HttpResponse.new(client)
begin
# request is good so far, continue processing the response
response = HttpResponse.new(client)
# Process each handler in registered order until we run out
# or one finalizes the response.
handlers.each do |handler|
handler.process(request, response)
break if response.done or client.closed?
end
# Process each handler in registered order until we run out
# or one finalizes the response.
handlers.each do |handler|
handler.process(request, response)
break if response.done or client.closed?
end
# And finally, if nobody closed the response off, we finalize it.
unless response.done or client.closed?
response.finished
# And finally, if nobody closed the response off, we finalize it.
unless response.done or client.closed?
response.finished
end
ensure
request.close_body
end
else
# Didn't find it, return a stock 404 response.
@@ -396,9 +350,9 @@ module Mongrel
class RackServer < Server
attr_accessor :app
def process(params, client, body)
def process(env, client, body)
begin
request = HttpRequest.new(params, client, body)
request = HttpRequest.new(env, client, body)
# in the case of large file uploads the user could close
# the socket, so skip those requests
@@ -406,24 +360,22 @@ module Mongrel
return
end
env = params
env["SCRIPT_NAME"] = ""
env["rack.version"] = Rack::VERSION
env["rack.input"] = request.body
env["rack.errors"] = $stderr
env["rack.multithread"] = true
env["rack.multiprocess"] = false
env["rack.run_once"] = true
env["rack.url_scheme"] = env["HTTPS"] ? "https" : "http"
env["CONTENT_TYPE"] ||= ""
env["QUERY_STRING"] ||= ""
status, headers, body = @app.call(env)
begin
env["SCRIPT_NAME"] = ""
env["rack.version"] = Rack::VERSION
env["rack.input"] = request.body
env["rack.errors"] = $stderr
env["rack.multithread"] = true
env["rack.multiprocess"] = false
env["rack.run_once"] = true
env["rack.url_scheme"] = env["HTTPS"] ? "https" : "http"
env["CONTENT_TYPE"] ||= ""
env["QUERY_STRING"] ||= ""
status, headers, body = @app.call(env)
client.write "HTTP/1.1 "
client.write status.to_s
client.write " "
@@ -454,6 +406,7 @@ module Mongrel
end
end
ensure
request.close_body
body.close if body.respond_to? :close
end
end
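
The RackServer#process path above builds the Rack environment from the parser's params and expects @app.call(env) to return the usual [status, headers, body] triple. A minimal app of the kind it can drive (a plain lambda; wiring it up through the app accessor shown above is assumed), kept thread-safe because rack.multithread is now true and requests run on pool threads:

    app = lambda do |env|
      body = "scheme=#{env['rack.url_scheme']} query=#{env['QUERY_STRING']}\n"
      [200,
       { "Content-Type" => "text/plain", "Content-Length" => body.size.to_s },
       [body]]
    end

    # e.g. rack_server.app = app   # rack_server being a Mongrel::RackServer instance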


@@ -0,0 +1,95 @@
require 'thread'
module Mongrel
class ThreadPool
def initialize(min, max, &blk)
@todo = Queue.new
@mutex = Mutex.new
@spawned = 0
@min = min
@max = max
@block = blk
@trim_requested = 0
@workers = []
min.times { spawn_thread }
end
attr_reader :spawned
def backlog
@todo.size
end
Stop = Object.new
Trim = Object.new
def spawn_thread
@mutex.synchronize do
@spawned += 1
end
th = Thread.new do
todo = @todo
block = @block
while true
work = todo.pop
case work
when Stop
break
when Trim
@mutex.synchronize do
@trim_requested -= 1
end
break
else
block.call work
end
end
@mutex.synchronize do
@spawned -= 1
@workers.delete th
end
end
@mutex.synchronize { @workers << th }
th
end
def <<(work)
if @todo.num_waiting == 0 and @spawned < @max
spawn_thread
end
@todo << work
end
def trim
@mutex.synchronize do
if @spawned - @trim_requested > @min
@trim_requested += 1
@todo << Trim
end
end
end
def shutdown
@spawned.times do
@todo << Stop
end
@workers.each { |w| w.join }
@spawned = 0
@workers = []
end
end
end
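
Taken on its own, the pool's surface is small: ThreadPool.new(min, max) { |work| ... }, << to enqueue, backlog, trim, and shutdown. Threads are spawned lazily from << (only when no worker is idle and fewer than max exist), and Trim/Stop are sentinel objects pushed through the same queue to ask workers to exit. A quick usage sketch, assuming lib/ is on the load path:

    require 'mongrel/thread_pool'

    pool = Mongrel::ThreadPool.new(0, 4) do |job|
      # runs on one of at most four worker threads
      puts "worker #{Thread.current.object_id} handled #{job}"
    end

    8.times { |i| pool << i }
    puts "jobs still queued: #{pool.backlog}"
    pool.shutdown   # pushes Stop to every worker, then joins them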

test/test_thread_pool.rb (new file, 90 additions)

@@ -0,0 +1,90 @@
require 'test/unit'
require 'mongrel/thread_pool'
class TestThreadPool < Test::Unit::TestCase
def teardown
@pool.shutdown if @pool
end
def new_pool(min, max, &blk)
blk = lambda { } unless blk
@pool = Mongrel::ThreadPool.new(min, max, &blk)
end
def test_append_spawns
saw = []
pool = new_pool(0, 1) do |work|
saw << work
end
pool << 1
assert_equal [1], saw
assert_equal 1, pool.spawned
end
def test_append_queues_on_max
finish = false
pool = new_pool(0, 1) { Thread.pass until finish }
pool << 1
pool << 2
pool << 3
assert_equal 2, pool.backlog
finish = true
end
def test_trim
pool = new_pool(0, 1)
pool << 1
assert_equal 1, pool.spawned
pool.trim
assert_equal 0, pool.spawned
end
def test_trim_leaves_min
finish = false
pool = new_pool(1, 2) { Thread.pass until finish }
pool << 1
pool << 2
finish = true
assert_equal 2, pool.spawned
pool.trim
Thread.pass # give the others a chance to run and exit
assert_equal 1, pool.spawned
pool.trim
Thread.pass # give the others a chance to run and exit
assert_equal 1, pool.spawned
end
def test_trim_doesnt_overtrim
finish = false
pool = new_pool(1, 2) { Thread.pass until finish }
pool << 1
pool << 2
assert_equal 2, pool.spawned
pool.trim
pool.trim
finish = true
Thread.pass # give the others a chance to run and exit
assert_equal 1, pool.spawned
end
end


@@ -24,8 +24,7 @@ class WebServerTest < Test::Unit::TestCase
@valid_request = "GET / HTTP/1.1\r\nHost: www.zedshaw.com\r\nContent-Type: text/plain\r\n\r\n"
redirect_test_io do
# We set num_processors=1 so that we can test the reaping code
@server = HttpServer.new("127.0.0.1", 9998, num_processors=1)
@server = HttpServer.new("127.0.0.1", 9998)
end
@tester = TestHandler.new
@@ -92,19 +91,6 @@ class WebServerTest < Test::Unit::TestCase
end
end
def test_num_processors_overload
redirect_test_io do
assert_raises Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED, Errno::EINVAL, IOError do
tests = [
Thread.new { do_test(@valid_request, 1) },
Thread.new { do_test(@valid_request, 10) },
]
tests.each {|t| t.join}
end
end
end
def test_file_streamed_request
body = "a" * (Mongrel::Const::MAX_BODY * 2)
long = "GET /test HTTP/1.1\r\nContent-length: #{body.length}\r\n\r\n" + body