diff --git a/lib/puma/client.rb b/lib/puma/client.rb index 96c2f796..4ad819f8 100644 --- a/lib/puma/client.rb +++ b/lib/puma/client.rb @@ -222,6 +222,14 @@ module Puma end end # IS_JRUBY + def finish + return true if @ready + until try_to_finish + IO.select([@to_io], nil, nil) + end + true + end + def read_body # Read an odd sized chunk so we can read even sized ones # after this diff --git a/lib/puma/configuration.rb b/lib/puma/configuration.rb index fe616bc7..1354e3fb 100644 --- a/lib/puma/configuration.rb +++ b/lib/puma/configuration.rb @@ -27,6 +27,7 @@ module Puma @options[:after_worker_boot] ||= [] @options[:worker_timeout] ||= DefaultWorkerTimeout @options[:worker_shutdown_timeout] ||= DefaultWorkerShutdownTimeout + @options[:queue_requests] ||= true end attr_reader :options @@ -402,6 +403,23 @@ module Puma def worker_shutdown_timeout(timeout) @options[:worker_shutdown_timeout] = timeout end + + # When set to true (the default), workers accept all requests + # and queue them before passing them to the handlers. + # When set to false, each worker process accepts exactly as + # many requests as it is configured to simultaneously handle. + # + # Queueing requests generally improves performance. In some + # cases, such as a single threaded application, it may be + # better to ensure requests get balanced across workers. + # + # Note that setting this to false disables HTTP keepalive and + # slow clients will occupy a handler thread while the request + # is being sent. A reverse proxy, such as nginx, can handle + # slow clients and queue requests before they reach puma. + def queue_requests(answer=true) + @options[:queue_requests] = answer + end end end end diff --git a/lib/puma/server.rb b/lib/puma/server.rb index a99daf6c..69b1b2b8 100644 --- a/lib/puma/server.rb +++ b/lib/puma/server.rb @@ -241,7 +241,12 @@ module Puma process_now = false begin - process_now = client.eagerly_finish + if @options[:queue_requests] + process_now = client.eagerly_finish + else + client.finish + process_now = true + end rescue HttpParserError => e client.write_400 client.close @@ -261,9 +266,10 @@ module Puma @thread_pool.clean_thread_locals = @options[:clean_thread_locals] - @reactor = Reactor.new self, @thread_pool - - @reactor.run_in_thread + if @options[:queue_requests] + @reactor = Reactor.new self, @thread_pool + @reactor.run_in_thread + end if @auto_trim_time @thread_pool.auto_trim!(@auto_trim_time) @@ -296,6 +302,7 @@ module Puma if io = sock.accept_nonblock client = Client.new io, @binder.env(sock) pool << client + pool.wait_until_not_full unless @options[:queue_requests] end rescue SystemCallError end @@ -312,9 +319,10 @@ module Puma @events.fire :state, @status graceful_shutdown if @status == :stop || @status == :restart - @reactor.clear! if @status == :restart - - @reactor.shutdown + if @options[:queue_requests] + @reactor.clear! if @status == :restart + @reactor.shutdown + end rescue Exception => e STDERR.puts "Exception handling servers: #{e.message} (#{e.class})" STDERR.puts e.backtrace @@ -367,6 +375,7 @@ module Puma close_socket = false return when true + return unless @options[:queue_requests] buffer.reset unless client.reset(@status == :run) diff --git a/lib/puma/thread_pool.rb b/lib/puma/thread_pool.rb index 7ec36ff9..acca9276 100644 --- a/lib/puma/thread_pool.rb +++ b/lib/puma/thread_pool.rb @@ -12,7 +12,8 @@ module Puma # thread. # def initialize(min, max, *extra, &block) - @cond = ConditionVariable.new + @not_empty = ConditionVariable.new + @not_full = ConditionVariable.new @mutex = Mutex.new @todo = [] @@ -60,7 +61,8 @@ module Puma todo = @todo block = @block mutex = @mutex - cond = @cond + not_empty = @not_empty + not_full = @not_full extra = @extra.map { |i| i.new } @@ -83,10 +85,11 @@ module Puma end @waiting += 1 - cond.wait mutex - @waiting -= 1 + not_full.signal + not_empty.wait mutex end + @waiting -= 1 work = todo.shift if continue end @@ -125,9 +128,18 @@ module Puma if @waiting < @todo.size and @spawned < @max spawn_thread + @waiting+=1 end - @cond.signal + @not_empty.signal + end + end + + def wait_until_not_full + @mutex.synchronize do + until @todo.size - @waiting < @max - @spawned or @shutdown + @not_full.wait @mutex + end end end @@ -139,7 +151,7 @@ module Puma @mutex.synchronize do if (force or @waiting > 0) and @spawned - @trim_requested > @min @trim_requested += 1 - @cond.signal + @not_empty.signal end end end @@ -178,7 +190,8 @@ module Puma def shutdown @mutex.synchronize do @shutdown = true - @cond.broadcast + @not_empty.broadcast + @not_full.broadcast @auto_trim.stop if @auto_trim end