From b784cb7f18dc7bb8c3de1ff83b9ebd52a1ee41cd Mon Sep 17 00:00:00 2001 From: Emanuel Rietveld Date: Tue, 20 Jan 2015 13:20:39 +0100 Subject: [PATCH] Add a configuration option that prevents puma from queueing requests. With queue_requests set to true (the default), workers accept all requests and queue them before passing them to the handlers. With it set to false, each worker process accepts exactly as many requests as it is configured to simultaneously handle. In combination with threads 1, 1 this ensures that requests are balanced across workers in a single threaded application. This can avoid deadlocks when a single threaded app sends a request to itself. (For example, to generate a PDF.) --- lib/puma/client.rb | 8 ++++++++ lib/puma/configuration.rb | 18 ++++++++++++++++++ lib/puma/server.rb | 23 ++++++++++++++++------- lib/puma/thread_pool.rb | 27 ++++++++++++++++++++------- 4 files changed, 62 insertions(+), 14 deletions(-) diff --git a/lib/puma/client.rb b/lib/puma/client.rb index 96c2f796..4ad819f8 100644 --- a/lib/puma/client.rb +++ b/lib/puma/client.rb @@ -222,6 +222,14 @@ module Puma end end # IS_JRUBY + def finish + return true if @ready + until try_to_finish + IO.select([@to_io], nil, nil) + end + true + end + def read_body # Read an odd sized chunk so we can read even sized ones # after this diff --git a/lib/puma/configuration.rb b/lib/puma/configuration.rb index fe616bc7..1354e3fb 100644 --- a/lib/puma/configuration.rb +++ b/lib/puma/configuration.rb @@ -27,6 +27,7 @@ module Puma @options[:after_worker_boot] ||= [] @options[:worker_timeout] ||= DefaultWorkerTimeout @options[:worker_shutdown_timeout] ||= DefaultWorkerShutdownTimeout + @options[:queue_requests] ||= true end attr_reader :options @@ -402,6 +403,23 @@ module Puma def worker_shutdown_timeout(timeout) @options[:worker_shutdown_timeout] = timeout end + + # When set to true (the default), workers accept all requests + # and queue them before passing them to the handlers. + # When set to false, each worker process accepts exactly as + # many requests as it is configured to simultaneously handle. + # + # Queueing requests generally improves performance. In some + # cases, such as a single threaded application, it may be + # better to ensure requests get balanced across workers. + # + # Note that setting this to false disables HTTP keepalive and + # slow clients will occupy a handler thread while the request + # is being sent. A reverse proxy, such as nginx, can handle + # slow clients and queue requests before they reach puma. + def queue_requests(answer=true) + @options[:queue_requests] = answer + end end end end diff --git a/lib/puma/server.rb b/lib/puma/server.rb index a99daf6c..69b1b2b8 100644 --- a/lib/puma/server.rb +++ b/lib/puma/server.rb @@ -241,7 +241,12 @@ module Puma process_now = false begin - process_now = client.eagerly_finish + if @options[:queue_requests] + process_now = client.eagerly_finish + else + client.finish + process_now = true + end rescue HttpParserError => e client.write_400 client.close @@ -261,9 +266,10 @@ module Puma @thread_pool.clean_thread_locals = @options[:clean_thread_locals] - @reactor = Reactor.new self, @thread_pool - - @reactor.run_in_thread + if @options[:queue_requests] + @reactor = Reactor.new self, @thread_pool + @reactor.run_in_thread + end if @auto_trim_time @thread_pool.auto_trim!(@auto_trim_time) @@ -296,6 +302,7 @@ module Puma if io = sock.accept_nonblock client = Client.new io, @binder.env(sock) pool << client + pool.wait_until_not_full unless @options[:queue_requests] end rescue SystemCallError end @@ -312,9 +319,10 @@ module Puma @events.fire :state, @status graceful_shutdown if @status == :stop || @status == :restart - @reactor.clear! if @status == :restart - - @reactor.shutdown + if @options[:queue_requests] + @reactor.clear! if @status == :restart + @reactor.shutdown + end rescue Exception => e STDERR.puts "Exception handling servers: #{e.message} (#{e.class})" STDERR.puts e.backtrace @@ -367,6 +375,7 @@ module Puma close_socket = false return when true + return unless @options[:queue_requests] buffer.reset unless client.reset(@status == :run) diff --git a/lib/puma/thread_pool.rb b/lib/puma/thread_pool.rb index 7ec36ff9..acca9276 100644 --- a/lib/puma/thread_pool.rb +++ b/lib/puma/thread_pool.rb @@ -12,7 +12,8 @@ module Puma # thread. # def initialize(min, max, *extra, &block) - @cond = ConditionVariable.new + @not_empty = ConditionVariable.new + @not_full = ConditionVariable.new @mutex = Mutex.new @todo = [] @@ -60,7 +61,8 @@ module Puma todo = @todo block = @block mutex = @mutex - cond = @cond + not_empty = @not_empty + not_full = @not_full extra = @extra.map { |i| i.new } @@ -83,10 +85,11 @@ module Puma end @waiting += 1 - cond.wait mutex - @waiting -= 1 + not_full.signal + not_empty.wait mutex end + @waiting -= 1 work = todo.shift if continue end @@ -125,9 +128,18 @@ module Puma if @waiting < @todo.size and @spawned < @max spawn_thread + @waiting+=1 end - @cond.signal + @not_empty.signal + end + end + + def wait_until_not_full + @mutex.synchronize do + until @todo.size - @waiting < @max - @spawned or @shutdown + @not_full.wait @mutex + end end end @@ -139,7 +151,7 @@ module Puma @mutex.synchronize do if (force or @waiting > 0) and @spawned - @trim_requested > @min @trim_requested += 1 - @cond.signal + @not_empty.signal end end end @@ -178,7 +190,8 @@ module Puma def shutdown @mutex.synchronize do @shutdown = true - @cond.broadcast + @not_empty.broadcast + @not_full.broadcast @auto_trim.stop if @auto_trim end