mirror of
https://github.com/puma/puma.git
synced 2022-11-09 13:48:40 -05:00
Add a configuration option that prevents puma from queueing requests.
With queue_requests set to true (the default), workers accept all requests and queue them before passing them to the handlers. With it set to false, each worker process accepts exactly as many requests as it is configured to simultaneously handle. In combination with threads 1, 1 this ensures that requests are balanced across workers in a single threaded application. This can avoid deadlocks when a single threaded app sends a request to itself. (For example, to generate a PDF.)
This commit is contained in:
parent
12c4c2d222
commit
b784cb7f18
4 changed files with 62 additions and 14 deletions
|
@ -222,6 +222,14 @@ module Puma
|
||||||
end
|
end
|
||||||
end # IS_JRUBY
|
end # IS_JRUBY
|
||||||
|
|
||||||
|
def finish
|
||||||
|
return true if @ready
|
||||||
|
until try_to_finish
|
||||||
|
IO.select([@to_io], nil, nil)
|
||||||
|
end
|
||||||
|
true
|
||||||
|
end
|
||||||
|
|
||||||
def read_body
|
def read_body
|
||||||
# Read an odd sized chunk so we can read even sized ones
|
# Read an odd sized chunk so we can read even sized ones
|
||||||
# after this
|
# after this
|
||||||
|
|
|
@ -27,6 +27,7 @@ module Puma
|
||||||
@options[:after_worker_boot] ||= []
|
@options[:after_worker_boot] ||= []
|
||||||
@options[:worker_timeout] ||= DefaultWorkerTimeout
|
@options[:worker_timeout] ||= DefaultWorkerTimeout
|
||||||
@options[:worker_shutdown_timeout] ||= DefaultWorkerShutdownTimeout
|
@options[:worker_shutdown_timeout] ||= DefaultWorkerShutdownTimeout
|
||||||
|
@options[:queue_requests] ||= true
|
||||||
end
|
end
|
||||||
|
|
||||||
attr_reader :options
|
attr_reader :options
|
||||||
|
@ -402,6 +403,23 @@ module Puma
|
||||||
def worker_shutdown_timeout(timeout)
|
def worker_shutdown_timeout(timeout)
|
||||||
@options[:worker_shutdown_timeout] = timeout
|
@options[:worker_shutdown_timeout] = timeout
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# When set to true (the default), workers accept all requests
|
||||||
|
# and queue them before passing them to the handlers.
|
||||||
|
# When set to false, each worker process accepts exactly as
|
||||||
|
# many requests as it is configured to simultaneously handle.
|
||||||
|
#
|
||||||
|
# Queueing requests generally improves performance. In some
|
||||||
|
# cases, such as a single threaded application, it may be
|
||||||
|
# better to ensure requests get balanced across workers.
|
||||||
|
#
|
||||||
|
# Note that setting this to false disables HTTP keepalive and
|
||||||
|
# slow clients will occupy a handler thread while the request
|
||||||
|
# is being sent. A reverse proxy, such as nginx, can handle
|
||||||
|
# slow clients and queue requests before they reach puma.
|
||||||
|
def queue_requests(answer=true)
|
||||||
|
@options[:queue_requests] = answer
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -241,7 +241,12 @@ module Puma
|
||||||
process_now = false
|
process_now = false
|
||||||
|
|
||||||
begin
|
begin
|
||||||
process_now = client.eagerly_finish
|
if @options[:queue_requests]
|
||||||
|
process_now = client.eagerly_finish
|
||||||
|
else
|
||||||
|
client.finish
|
||||||
|
process_now = true
|
||||||
|
end
|
||||||
rescue HttpParserError => e
|
rescue HttpParserError => e
|
||||||
client.write_400
|
client.write_400
|
||||||
client.close
|
client.close
|
||||||
|
@ -261,9 +266,10 @@ module Puma
|
||||||
|
|
||||||
@thread_pool.clean_thread_locals = @options[:clean_thread_locals]
|
@thread_pool.clean_thread_locals = @options[:clean_thread_locals]
|
||||||
|
|
||||||
@reactor = Reactor.new self, @thread_pool
|
if @options[:queue_requests]
|
||||||
|
@reactor = Reactor.new self, @thread_pool
|
||||||
@reactor.run_in_thread
|
@reactor.run_in_thread
|
||||||
|
end
|
||||||
|
|
||||||
if @auto_trim_time
|
if @auto_trim_time
|
||||||
@thread_pool.auto_trim!(@auto_trim_time)
|
@thread_pool.auto_trim!(@auto_trim_time)
|
||||||
|
@ -296,6 +302,7 @@ module Puma
|
||||||
if io = sock.accept_nonblock
|
if io = sock.accept_nonblock
|
||||||
client = Client.new io, @binder.env(sock)
|
client = Client.new io, @binder.env(sock)
|
||||||
pool << client
|
pool << client
|
||||||
|
pool.wait_until_not_full unless @options[:queue_requests]
|
||||||
end
|
end
|
||||||
rescue SystemCallError
|
rescue SystemCallError
|
||||||
end
|
end
|
||||||
|
@ -312,9 +319,10 @@ module Puma
|
||||||
@events.fire :state, @status
|
@events.fire :state, @status
|
||||||
|
|
||||||
graceful_shutdown if @status == :stop || @status == :restart
|
graceful_shutdown if @status == :stop || @status == :restart
|
||||||
@reactor.clear! if @status == :restart
|
if @options[:queue_requests]
|
||||||
|
@reactor.clear! if @status == :restart
|
||||||
@reactor.shutdown
|
@reactor.shutdown
|
||||||
|
end
|
||||||
rescue Exception => e
|
rescue Exception => e
|
||||||
STDERR.puts "Exception handling servers: #{e.message} (#{e.class})"
|
STDERR.puts "Exception handling servers: #{e.message} (#{e.class})"
|
||||||
STDERR.puts e.backtrace
|
STDERR.puts e.backtrace
|
||||||
|
@ -367,6 +375,7 @@ module Puma
|
||||||
close_socket = false
|
close_socket = false
|
||||||
return
|
return
|
||||||
when true
|
when true
|
||||||
|
return unless @options[:queue_requests]
|
||||||
buffer.reset
|
buffer.reset
|
||||||
|
|
||||||
unless client.reset(@status == :run)
|
unless client.reset(@status == :run)
|
||||||
|
|
|
@ -12,7 +12,8 @@ module Puma
|
||||||
# thread.
|
# thread.
|
||||||
#
|
#
|
||||||
def initialize(min, max, *extra, &block)
|
def initialize(min, max, *extra, &block)
|
||||||
@cond = ConditionVariable.new
|
@not_empty = ConditionVariable.new
|
||||||
|
@not_full = ConditionVariable.new
|
||||||
@mutex = Mutex.new
|
@mutex = Mutex.new
|
||||||
|
|
||||||
@todo = []
|
@todo = []
|
||||||
|
@ -60,7 +61,8 @@ module Puma
|
||||||
todo = @todo
|
todo = @todo
|
||||||
block = @block
|
block = @block
|
||||||
mutex = @mutex
|
mutex = @mutex
|
||||||
cond = @cond
|
not_empty = @not_empty
|
||||||
|
not_full = @not_full
|
||||||
|
|
||||||
extra = @extra.map { |i| i.new }
|
extra = @extra.map { |i| i.new }
|
||||||
|
|
||||||
|
@ -83,10 +85,11 @@ module Puma
|
||||||
end
|
end
|
||||||
|
|
||||||
@waiting += 1
|
@waiting += 1
|
||||||
cond.wait mutex
|
not_full.signal
|
||||||
@waiting -= 1
|
not_empty.wait mutex
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@waiting -= 1
|
||||||
work = todo.shift if continue
|
work = todo.shift if continue
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -125,9 +128,18 @@ module Puma
|
||||||
|
|
||||||
if @waiting < @todo.size and @spawned < @max
|
if @waiting < @todo.size and @spawned < @max
|
||||||
spawn_thread
|
spawn_thread
|
||||||
|
@waiting+=1
|
||||||
end
|
end
|
||||||
|
|
||||||
@cond.signal
|
@not_empty.signal
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def wait_until_not_full
|
||||||
|
@mutex.synchronize do
|
||||||
|
until @todo.size - @waiting < @max - @spawned or @shutdown
|
||||||
|
@not_full.wait @mutex
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -139,7 +151,7 @@ module Puma
|
||||||
@mutex.synchronize do
|
@mutex.synchronize do
|
||||||
if (force or @waiting > 0) and @spawned - @trim_requested > @min
|
if (force or @waiting > 0) and @spawned - @trim_requested > @min
|
||||||
@trim_requested += 1
|
@trim_requested += 1
|
||||||
@cond.signal
|
@not_empty.signal
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -178,7 +190,8 @@ module Puma
|
||||||
def shutdown
|
def shutdown
|
||||||
@mutex.synchronize do
|
@mutex.synchronize do
|
||||||
@shutdown = true
|
@shutdown = true
|
||||||
@cond.broadcast
|
@not_empty.broadcast
|
||||||
|
@not_full.broadcast
|
||||||
|
|
||||||
@auto_trim.stop if @auto_trim
|
@auto_trim.stop if @auto_trim
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue