diff --git a/lib/mongrel/handlers.rb b/lib/mongrel/handlers.rb index 9459a333..3c43b6d4 100644 --- a/lib/mongrel/handlers.rb +++ b/lib/mongrel/handlers.rb @@ -351,7 +351,6 @@ module Mongrel def process(request, response) if rand(@sample_rate)+1 == @sample_rate - @processors.sample(listener.workers.list.length) @headcount.sample(request.params.length) @reqsize.sample(request.body.length / 1024.0) @respsize.sample((response.body.length + response.header.out.length) / 1024.0) @@ -360,7 +359,7 @@ module Mongrel end def dump - "#{@processors.to_s}\n#{@reqsize.to_s}\n#{@headcount.to_s}\n#{@respsize.to_s}\n#{@interreq.to_s}" + "#{@reqsize.to_s}\n#{@headcount.to_s}\n#{@respsize.to_s}\n#{@interreq.to_s}" end end @@ -390,7 +389,7 @@ module Mongrel ["port",listener.port], ["throttle",listener.throttle], ["timeout",listener.timeout], - ["workers max",listener.num_processors], + ["concurrency",listener.concurrent], ]) if @stats diff --git a/lib/mongrel/server.rb b/lib/mongrel/server.rb index 4adb6e5b..fd7c17b6 100644 --- a/lib/mongrel/server.rb +++ b/lib/mongrel/server.rb @@ -1,4 +1,5 @@ require 'rack' +require 'mongrel/thread_pool' module Mongrel # Thrown at a thread when it is timed out. @@ -35,7 +36,7 @@ module Mongrel attr_reader :port attr_reader :throttle attr_reader :timeout - attr_reader :num_processors + attr_reader :concurrent # Creates a working server on host:port (strange things happen if port # isn't a Number). @@ -43,29 +44,29 @@ module Mongrel # Use HttpServer.run to start the server and HttpServer.acceptor.join to # join the thread that's processing incoming requests on the socket. # - # The num_processors optional argument is the maximum number of concurrent - # processors to accept, anything over this is closed immediately to maintain - # server processing performance. This may seem mean but it is the most - # efficient way to deal with overload. Other schemes involve still - # parsing the client's request which defeats the point of an overload - # handling system. - # + # +concurrent+ indicates how many concurrent requests should be run at + # the same time. Any requests over this ammount are queued and handled + # as soon as a thread is available. + # # The throttle parameter is a sleep timeout (in hundredths of a second) # that is placed between socket.accept calls in order to give the server # a cheap throttle time. It defaults to 0 and actually if it is 0 then # the sleep is not done at all. - def initialize(host, port, num_processors=950, throttle=0, timeout=60) + def initialize(host, port, concurrent=10, throttle=0, timeout=60) @socket = TCPServer.new(host, port) @host = host @port = port - @workers = ThreadGroup.new @throttle = throttle / 100.0 - @num_processors = num_processors + @concurrent = concurrent @timeout = timeout @check, @notify = IO.pipe @running = true + + @thread_pool = ThreadPool.new(0, concurrent) do |client| + process_client(client) + end end def handle_request(params, client, body) @@ -147,9 +148,6 @@ module Mongrel STDERR.puts "#{Time.now}: HTTP parse error, malformed request (#{params[HTTP_X_FORWARDED_FOR] || client.peeraddr.last}): #{e.inspect}" STDERR.puts "#{Time.now}: REQUEST DATA: #{data.inspect}\n---\nPARAMS: #{params.inspect}\n---\n" - rescue Errno::EMFILE - reap_dead_workers('too many files') - rescue Object => e STDERR.puts "#{Time.now}: Read error: #{e.inspect}" STDERR.puts e.backtrace.join("\n") @@ -163,42 +161,12 @@ module Mongrel STDERR.puts "#{Time.now}: Client error: #{e.inspect}" STDERR.puts e.backtrace.join("\n") end - - request.close_body if request end end - # Used internally to kill off any worker threads that have taken too long - # to complete processing. Only called if there are too many processors - # currently servicing. It returns the count of workers still active - # after the reap is done. It only runs if there are workers to reap. - def reap_dead_workers(reason='unknown') - if @workers.list.length > 0 - STDERR.puts "#{Time.now}: Reaping #{@workers.list.length} threads for slow workers because of '#{reason}'" - error_msg = "Mongrel timed out this thread: #{reason}" - mark = Time.now - @workers.list.each do |worker| - worker[:started_on] = Time.now if not worker[:started_on] - - if mark - worker[:started_on] > @timeout + @throttle - STDERR.puts "Thread #{worker.inspect} is too old, killing." - worker.raise(TimeoutError.new(error_msg)) - end - end - end - - return @workers.list.length - end - - # Performs a wait on all the currently running threads and kills any that - # take too long. It waits by @timeout seconds, which can be set in - # .initialize or via mongrel_rails. The @throttle setting does extend - # this waiting period by that much longer. + # Wait for all outstanding requests to finish. def graceful_shutdown - while reap_dead_workers("shutdown") > 0 - STDERR.puts "Waiting for #{@workers.list.length} requests to finish, could take #{@timeout + @throttle} seconds." - sleep @timeout / 10 - end + @thread_pool.shutdown end def configure_socket_options @@ -234,12 +202,6 @@ module Mongrel return false end - def handle_overload(client) - STDERR.puts "Server overloaded with #{@workers.list.size} processors (#@num_processors max). Dropping connection." - client.close rescue nil - reap_dead_workers "max processors" - end - # Runs the thing. It returns the thread used so you can "join" it. # You can also access the HttpServer::acceptor attribute to get the # thread later. @@ -258,6 +220,7 @@ module Mongrel begin check = @check sockets = [check, @socket] + pool = @thread_pool while @running begin @@ -269,23 +232,10 @@ module Mongrel client = sock.accept client.setsockopt(*tcp_cork_opts) if tcp_cork_opts - - worker_list = @workers.list - - if worker_list.length >= @num_processors - handle_overload(client) - else - thread = Thread.new(client) { |c| process_client(c) } - thread[:started_on] = Time.now - @workers.add(thread) - - sleep @throttle if @throttle > 0 - end + + pool << client end end - rescue Errno::EMFILE - reap_dead_workers("too many open files") - sleep 0.5 rescue Errno::ECONNABORTED # client closed the socket even before accept client.close rescue nil @@ -339,19 +289,23 @@ module Mongrel return end - # request is good so far, continue processing the response - response = HttpResponse.new(client) + begin + # request is good so far, continue processing the response + response = HttpResponse.new(client) - # Process each handler in registered order until we run out - # or one finalizes the response. - handlers.each do |handler| - handler.process(request, response) - break if response.done or client.closed? - end + # Process each handler in registered order until we run out + # or one finalizes the response. + handlers.each do |handler| + handler.process(request, response) + break if response.done or client.closed? + end - # And finally, if nobody closed the response off, we finalize it. - unless response.done or client.closed? - response.finished + # And finally, if nobody closed the response off, we finalize it. + unless response.done or client.closed? + response.finished + end + ensure + request.close_body end else # Didn't find it, return a stock 404 response. @@ -396,9 +350,9 @@ module Mongrel class RackServer < Server attr_accessor :app - def process(params, client, body) + def process(env, client, body) begin - request = HttpRequest.new(params, client, body) + request = HttpRequest.new(env, client, body) # in the case of large file uploads the user could close # the socket, so skip those requests @@ -406,24 +360,22 @@ module Mongrel return end - env = params - - env["SCRIPT_NAME"] = "" - - env["rack.version"] = Rack::VERSION - env["rack.input"] = request.body - env["rack.errors"] = $stderr - env["rack.multithread"] = true - env["rack.multiprocess"] = false - env["rack.run_once"] = true - env["rack.url_scheme"] = env["HTTPS"] ? "https" : "http" - - env["CONTENT_TYPE"] ||= "" - env["QUERY_STRING"] ||= "" - - status, headers, body = @app.call(env) - begin + env["SCRIPT_NAME"] = "" + + env["rack.version"] = Rack::VERSION + env["rack.input"] = request.body + env["rack.errors"] = $stderr + env["rack.multithread"] = true + env["rack.multiprocess"] = false + env["rack.run_once"] = true + env["rack.url_scheme"] = env["HTTPS"] ? "https" : "http" + + env["CONTENT_TYPE"] ||= "" + env["QUERY_STRING"] ||= "" + + status, headers, body = @app.call(env) + client.write "HTTP/1.1 " client.write status.to_s client.write " " @@ -454,6 +406,7 @@ module Mongrel end end ensure + request.close_body body.close if body.respond_to? :close end end diff --git a/lib/mongrel/thread_pool.rb b/lib/mongrel/thread_pool.rb new file mode 100644 index 00000000..b0cce474 --- /dev/null +++ b/lib/mongrel/thread_pool.rb @@ -0,0 +1,95 @@ +require 'thread' + +module Mongrel + class ThreadPool + def initialize(min, max, &blk) + @todo = Queue.new + @mutex = Mutex.new + + @spawned = 0 + @min = min + @max = max + @block = blk + + @trim_requested = 0 + + @workers = [] + + min.times { spawn_thread } + end + + attr_reader :spawned + + def backlog + @todo.size + end + + Stop = Object.new + Trim = Object.new + + def spawn_thread + @mutex.synchronize do + @spawned += 1 + end + + th = Thread.new do + todo = @todo + block = @block + + while true + work = todo.pop + + case work + when Stop + break + when Trim + @mutex.synchronize do + @trim_requested -= 1 + end + + break + else + block.call work + end + end + + @mutex.synchronize do + @spawned -= 1 + @workers.delete th + end + end + + @mutex.synchronize { @workers << th } + + th + end + + def <<(work) + if @todo.num_waiting == 0 and @spawned < @max + spawn_thread + end + + @todo << work + end + + def trim + @mutex.synchronize do + if @spawned - @trim_requested > @min + @trim_requested += 1 + @todo << Trim + end + end + end + + def shutdown + @spawned.times do + @todo << Stop + end + + @workers.each { |w| w.join } + + @spawned = 0 + @workers = [] + end + end +end diff --git a/test/test_thread_pool.rb b/test/test_thread_pool.rb new file mode 100644 index 00000000..8edd44ea --- /dev/null +++ b/test/test_thread_pool.rb @@ -0,0 +1,90 @@ +require 'test/unit' + +require 'mongrel/thread_pool' + +class TestThreadPool < Test::Unit::TestCase + + def teardown + @pool.shutdown if @pool + end + + def new_pool(min, max, &blk) + blk = lambda { } unless blk + @pool = Mongrel::ThreadPool.new(min, max, &blk) + end + + def test_append_spawns + saw = [] + + pool = new_pool(0, 1) do |work| + saw << work + end + + pool << 1 + + assert_equal [1], saw + assert_equal 1, pool.spawned + end + + def test_append_queues_on_max + finish = false + pool = new_pool(0, 1) { Thread.pass until finish } + + pool << 1 + pool << 2 + pool << 3 + + assert_equal 2, pool.backlog + + finish = true + end + + def test_trim + pool = new_pool(0, 1) + + pool << 1 + + assert_equal 1, pool.spawned + pool.trim + assert_equal 0, pool.spawned + end + + def test_trim_leaves_min + finish = false + pool = new_pool(1, 2) { Thread.pass until finish } + + pool << 1 + pool << 2 + + finish = true + + assert_equal 2, pool.spawned + pool.trim + Thread.pass # give the others a chance to run and exit + + assert_equal 1, pool.spawned + pool.trim + Thread.pass # give the others a chance to run and exit + + assert_equal 1, pool.spawned + + end + + def test_trim_doesnt_overtrim + finish = false + pool = new_pool(1, 2) { Thread.pass until finish } + + pool << 1 + pool << 2 + + assert_equal 2, pool.spawned + pool.trim + pool.trim + + finish = true + + Thread.pass # give the others a chance to run and exit + + assert_equal 1, pool.spawned + end +end diff --git a/test/test_ws.rb b/test/test_ws.rb index d239e189..9239dc81 100644 --- a/test/test_ws.rb +++ b/test/test_ws.rb @@ -24,8 +24,7 @@ class WebServerTest < Test::Unit::TestCase @valid_request = "GET / HTTP/1.1\r\nHost: www.zedshaw.com\r\nContent-Type: text/plain\r\n\r\n" redirect_test_io do - # We set num_processors=1 so that we can test the reaping code - @server = HttpServer.new("127.0.0.1", 9998, num_processors=1) + @server = HttpServer.new("127.0.0.1", 9998) end @tester = TestHandler.new @@ -92,19 +91,6 @@ class WebServerTest < Test::Unit::TestCase end end - def test_num_processors_overload - redirect_test_io do - assert_raises Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED, Errno::EINVAL, IOError do - tests = [ - Thread.new { do_test(@valid_request, 1) }, - Thread.new { do_test(@valid_request, 10) }, - ] - - tests.each {|t| t.join} - end - end - end - def test_file_streamed_request body = "a" * (Mongrel::Const::MAX_BODY * 2) long = "GET /test HTTP/1.1\r\nContent-length: #{body.length}\r\n\r\n" + body