mirror of https://github.com/puma/puma.git synced 2022-11-09 13:48:40 -05:00

Reactor refactor (#2279)

* Refactor Reactor and Client request buffering
Refactor Reactor into a more generic IO-with-timeout monitor,
using a Queue to simplify the implementation.
Move request-buffering logic into Server#reactor_wakeup.
Fixes bug in managing timeouts on clients.
Move, update and rewrite documentation to match updated class structure.

* Fix a few concurrency bugs
- In `Reactor#shutdown`, `@selector` can be closed before the call to `#wakeup`, so catch/ignore the `IOError` that may be thrown.
- `Reactor#wakeup!` can delete elements from the `@timeouts` array, so calling it from an `#each` block can cause the array iteration to miss elements. Call `@block` directly instead.
- Change `Reactor#add` to return `false` if the reactor is already shut down instead of invoking the block immediately, so a client-request currently being processed can continue, rather than re-adding to the thread-pool (which may already be shutting down and unable to accept new work).

Co-authored-by: Nate Berkopec <nate.berkopec@gmail.com>
This commit is contained in:
Will Jordan 2020-10-06 06:22:53 -07:00 committed by GitHub
parent 8f9396f439
commit a76d3905d8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 229 additions and 476 deletions


@@ -7,6 +7,8 @@
* Cleanup daemonization in rc.d script (#2409)
* Refactor
* Consolidate option handling in Server, Server small refactors, doc changes (#2389)
* Refactor Reactor and Client request buffering (#2279)
* client.rb - remove JRuby specific 'finish' code (#2412)
* Consolidate fast_write calls in Server, extract early_hints assembly (#2405)
* Remove upstart from docs (#2408)


@@ -103,7 +103,12 @@ module Puma
end
def set_timeout(val)
@timeout_at = Time.now + val
@timeout_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + val
end
# Number of seconds until the timeout elapses.
def timeout
[@timeout_at - Process.clock_gettime(Process::CLOCK_MONOTONIC), 0].max
end
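The deadline pattern introduced above can be sketched in isolation (the `Deadline` class name is hypothetical, not part of Puma):

```ruby
# Minimal sketch of the monotonic-clock deadline pattern above, using only
# the Ruby stdlib. CLOCK_MONOTONIC is immune to wall-clock adjustments
# (NTP slew, DST), unlike the Time.now arithmetic it replaces.
class Deadline
  def set_timeout(val)
    @timeout_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + val
  end

  # Number of seconds until the timeout elapses, clamped at zero once
  # the deadline has passed.
  def timeout
    [@timeout_at - Process.clock_gettime(Process::CLOCK_MONOTONIC), 0].max
  end
end
```

Clamping at zero lets callers treat `timeout == 0` as "already expired" without ever passing a negative sleep value to a selector.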
def reset(fast_check=true)
@@ -195,19 +200,13 @@ module Puma
end
def finish(timeout)
return true if @ready
until try_to_finish
can_read = begin
IO.select([@to_io], nil, nil, timeout)
rescue ThreadPool::ForceShutdown
nil
end
unless can_read
write_error(408) if in_data_phase
raise ConnectionError
end
end
true
return if @ready
IO.select([@to_io], nil, nil, timeout) || timeout! until try_to_finish
end
def timeout!
write_error(408) if in_data_phase
raise ConnectionError
end
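The one-line `IO.select(...) || timeout! until try_to_finish` idiom above can be illustrated with a standalone sketch (the `read_until_done` helper is hypothetical):

```ruby
require 'timeout'

# Sketch of the compact retry-with-timeout idiom in Client#finish above:
# repeat a parse step until it succeeds, raising as soon as a blocking
# wait for readable data times out. IO.select returns nil on timeout,
# so `|| raise` converts a timed-out wait into an error.
def read_until_done(io, timeout)
  (IO.select([io], nil, nil, timeout) || raise(Timeout::Error)) until yield
end
```

Note that Ruby evaluates the `until` condition first, so when the data is already buffered the select is never entered at all, mirroring the `return if @ready` fast path.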
def write_error(status_code)

lib/puma/queue_close.rb Normal file

@@ -0,0 +1,24 @@
# Queue#close was added in Ruby 2.3.
# Add a simple implementation for earlier Ruby versions.
unless Queue.instance_methods.include?(:close)
class ClosedQueueError < StandardError; end
module Puma
module QueueClose
def initialize
@closed = false
super
end
def close
@closed = true
end
def closed?
@closed
end
def push(object)
raise ClosedQueueError if @closed
super
end
end
Queue.prepend QueueClose
end
end
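The semantics this shim backfills (native in Ruby >= 2.3) can be demonstrated directly — once a queue is closed, `push` raises `ClosedQueueError` while items queued earlier remain poppable:

```ruby
# Demonstrates the Queue#close contract the shim above replicates for
# old Rubies: push is rejected after close, but pre-close items survive.
q = Queue.new
q << :job
q.close

q.closed?        # reports the closed state => true
begin
  q.push(:late)  # rejected: the queue no longer accepts new work
rescue ClosedQueueError
  # expected on any Ruby >= 2.3 (or with the shim applied)
end
q.pop            # items queued before close can still be drained
```

This is exactly the property the refactored Reactor relies on: `Reactor#add` can detect shutdown by rescuing `ClosedQueueError`, while the select loop drains whatever was queued before the close.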


@@ -1,394 +1,106 @@
# frozen_string_literal: true
require 'puma/util'
require 'puma/minissl' if ::Puma::HAS_SSL
require 'nio'
require 'puma/queue_close' if RUBY_VERSION < '2.3'
module Puma
# Internal Docs, Not a public interface.
# Monitors a collection of IO objects, calling a block whenever
# any monitored object either receives data or times out, or when the Reactor shuts down.
#
# The Reactor object is responsible for ensuring that a request has been
# completely received before it starts to be processed. This may be known as read buffering.
# If read buffering is not done here, and no other layer performs it (such as a
# reverse proxy like nginx), then the application would be subject to a slow-client attack.
# The waiting/wake up is performed with nio4r, which will use the appropriate backend (libev,
# Java NIO or just plain IO#select). The call to `NIO::Selector#select` will
# 'wakeup' any IO object that receives data.
#
# Each Puma "worker" process has its own Reactor. For example if you start puma with `$ puma -w 5` then
# it will have 5 workers and each worker will have its own reactor.
# This class additionally tracks a timeout for every added object,
# and wakes up any object when its timeout elapses.
#
# For a graphical representation of how the reactor works see [architecture.md](https://github.com/puma/puma/blob/master/docs/architecture.md#connection-pipeline).
#
# ## Reactor Flow
#
# A connection comes into a `Puma::Server` instance, it is then passed to a `Puma::Reactor` instance,
# which stores it in an array and waits for any of the connections to be ready for reading.
#
# The waiting/wake up is performed with nio4r, which will use the appropriate backend (libev, Java NIO or
# just plain IO#select). The call to `NIO::Selector#select` will "wake up" and
# return the references to any objects that caused it to "wake". The reactor
# then loops through each of these request objects, and sees if they're complete. If they
# have a full header and body then the reactor passes the request to a thread pool.
# Once in a thread pool, a "worker thread" can run the application's Ruby code against the request.
#
# If the request is not complete, then it stays in the array, and the next time any
# data is written to that socket reference, then the loop is woken up and it is checked for completeness again.
#
# A detailed example is given in the docs for `run_internal` which is where the bulk
# of this logic lives.
# The implementation uses a Queue to synchronize adding new objects from the internal select loop.
class Reactor
DefaultSleepFor = 5
# Creates an instance of Puma::Reactor
#
# The `server` argument is an instance of `Puma::Server`
# that is used to write a response for "low level errors"
# when there is an exception inside of the reactor.
#
# The `app_pool` is an instance of `Puma::ThreadPool`.
# Once a request is fully formed (header and body are received)
# it will be passed to the `app_pool`.
def initialize(server, app_pool)
@server = server
@events = server.events
@app_pool = app_pool
# Create a new Reactor to monitor IO objects added by #add.
# The provided block will be invoked when an IO has data available to read,
# its timeout elapses, or when the Reactor shuts down.
def initialize(&block)
@selector = NIO::Selector.new
@mutex = Mutex.new
# Read / Write pipes to wake up internal while loop
@ready, @trigger = Puma::Util.pipe
@input = []
@sleep_for = DefaultSleepFor
@input = Queue.new
@timeouts = []
@block = block
end
mon = @selector.register(@ready, :r)
mon.value = @ready
# Run the internal select loop, using a background thread by default.
def run(background=true)
if background
@thread = Thread.new do
Puma.set_thread_name "reactor"
select_loop
end
else
select_loop
end
end
@monitors = [mon]
# Add a new IO object to monitor.
# The object must respond to #timeout and #timeout_at.
# Returns false if the reactor is already shut down.
def add(io)
@input << io
@selector.wakeup
true
rescue ClosedQueueError
false
end
# Shutdown the reactor, blocking until the background thread is finished.
def shutdown
@input.close
begin
@selector.wakeup
rescue IOError # Ignore if selector is already closed
end
@thread.join if @thread
end
private
# Until a request is added via the `add` method this method will internally
# loop, waiting on the `sockets` array objects. The only object in this
# array at first is the `@ready` IO object, which is the read end of a pipe
# connected to `@trigger` object. When `@trigger` is written to, then the loop
# will break on `NIO::Selector#select` and return an array.
#
# ## When a request is added:
#
# When the `add` method is called, an instance of `Puma::Client` is added to the `@input` array.
# Next the `@ready` pipe is "woken" by writing a string of `"*"` to `@trigger`.
#
# When that happens, the internal loop stops blocking at `NIO::Selector#select` and returns a reference
# to whatever "woke" it up. On the very first loop, the only thing in `sockets` is `@ready`.
# When `@trigger` is written-to, the loop "wakes" and the `ready`
# variable returns an array of arrays that looks like `[[#<IO:fd 10>], [], []]` where the
# first IO object is the `@ready` object. This first array `[#<IO:fd 10>]`
# is saved as a `reads` variable.
#
# The `reads` variable is iterated through. In the case that the object
# is the same as the `@ready` input pipe, then we know that there was a `trigger` event.
#
# If there was a trigger event, then one byte of `@ready` is read into memory. In the case of the first request,
# the reactor sees that it's a `"*"` value and the reactor adds the contents of `@input` into the `sockets` array.
# The while loop then continues to iterate, but now the `sockets` array contains a `Puma::Client` instance in addition
# to the `@ready` IO object. For example: `[#<IO:fd 10>, #<Puma::Client:0x3fdc1103bee8 @ready=false>]`.
#
# Since the `Puma::Client` in this example has data that has not been read yet,
# the `NIO::Selector#select` is immediately able to "wake" and read from the `Puma::Client`. At this point the
# `ready` output looks like this: `[[#<Puma::Client:0x3fdc1103bee8 @ready=false>], [], []]`.
#
# Each element in the first entry is iterated over. The `Puma::Client` object is not
# the `@ready` pipe, so the reactor checks to see if it has the full header and body with
# the `Puma::Client#try_to_finish` method. If the full request has been sent,
# then the request is passed off to the `@app_pool` thread pool so that a "worker thread"
# can pick up the request and begin to execute application logic. This is done
# via `@app_pool << c`. The `Puma::Client` is then removed from the `sockets` array.
#
# If the request body is not present then nothing will happen, and the loop will iterate
# again. When the client sends more data to the socket the `Puma::Client` object will
# wake up the `NIO::Selector#select` and it can again be checked to see if it's ready to be
# passed to the thread pool.
#
# ## Time Out Case
#
# In addition to being woken via a write to one of the sockets the `NIO::Selector#select` will
# periodically "time out" of the sleep. One of the functions of this is to check for
# any requests that have "timed out". At the end of the loop, the first element
# of the `@timeouts` array is checked to see whether it has exceeded its allowed time. If so,
# the client object is removed from the timeouts array, a 408 response is written,
# its connection is closed, and the object is removed from the `sockets` array
# that watches for new data.
#
# This behavior loops until all the objects that have timed out have been removed.
#
# Once all the timeouts have been processed, the next duration of the `NIO::Selector#select` sleep
# will be set to be equal to the amount of time it will take for the next timeout to occur.
# This calculation happens in `calculate_sleep`.
def run_internal
monitors = @monitors
selector = @selector
while true
begin
ready = selector.select @sleep_for
rescue IOError => e
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
if monitors.any? { |mon| mon.value.closed? }
STDERR.puts "Error in select: #{e.message} (#{e.class})"
STDERR.puts e.backtrace
monitors.reject! do |mon|
if mon.value.closed?
selector.deregister mon.value
true
end
end
retry
else
raise
end
end
if ready
ready.each do |mon|
if mon.value == @ready
@mutex.synchronize do
case @ready.read(1)
when "*"
@input.each do |c|
mon = nil
begin
begin
mon = selector.register(c, :r)
rescue ArgumentError
# There is a bug where we seem to be registering an already registered
# client. This code deals with this situation but I wish we didn't have to.
monitors.delete_if { |submon| submon.value.to_io == c.to_io }
selector.deregister(c)
mon = selector.register(c, :r)
end
rescue IOError
# Means that the io is closed, so we should ignore this request
# entirely
else
mon.value = c
@timeouts << mon if c.timeout_at
monitors << mon
end
end
@input.clear
@timeouts.sort! { |a,b| a.value.timeout_at <=> b.value.timeout_at }
calculate_sleep
when "c"
monitors.reject! do |submon|
if submon.value == @ready
false
else
if submon.value.can_close?
submon.value.close
else
# Pass remaining open client connections to the thread pool.
@app_pool << submon.value
end
begin
selector.deregister submon.value
rescue IOError
# nio4r on jruby seems to throw an IOError here if the IO is closed, so
# we need to swallow it.
end
true
end
end
when "!"
return
end
end
else
c = mon.value
# We have to be sure to remove it from the timeout
# list or we'll accidentally close the socket when
# it's in use!
if c.timeout_at
@mutex.synchronize do
@timeouts.delete mon
end
end
begin
if c.try_to_finish
@app_pool << c
clear_monitor mon
end
# Don't report these to the lowlevel_error handler, otherwise
# will be flooding them with errors when persistent connections
# are closed.
rescue ConnectionError
c.write_error(500)
c.close
clear_monitor mon
# SSL handshake failure
rescue MiniSSL::SSLError => e
@server.lowlevel_error e, c.env
@events.ssl_error e, c.io
c.close
clear_monitor mon
# The client doesn't know HTTP well
rescue HttpParserError => e
@server.lowlevel_error(e, c.env)
c.write_error(400)
c.close
clear_monitor mon
@events.parse_error e, c
rescue StandardError => e
@server.lowlevel_error(e, c.env)
c.write_error(500)
c.close
clear_monitor mon
end
end
end
end
unless @timeouts.empty?
@mutex.synchronize do
now = Time.now
while @timeouts.first.value.timeout_at < now
mon = @timeouts.shift
c = mon.value
c.write_error(408) if c.in_data_phase
c.close
clear_monitor mon
break if @timeouts.empty?
end
calculate_sleep
end
end
end
end
def clear_monitor(mon)
@selector.deregister mon.value
@monitors.delete mon
end
public
def run
run_internal
ensure
@trigger.close
@ready.close
end
def run_in_thread
@thread = Thread.new do
Puma.set_thread_name "reactor"
begin
run_internal
rescue StandardError => e
STDERR.puts "Error in reactor loop escaped: #{e.message} (#{e.class})"
STDERR.puts e.backtrace
retry
ensure
@trigger.close
@ready.close
end
end
end
# The `calculate_sleep` method sets the value that `NIO::Selector#select` will
# sleep for in the main reactor loop when no sockets are being written to.
#
# The values kept in `@timeouts` are sorted so that the first timeout
# comes first in the array. When there are no timeouts the default timeout is used.
#
# Otherwise a sleep value is set that is the same as the amount of time it
# would take for the first element to time out.
#
# If that value is in the past, then a sleep value of zero is used.
def calculate_sleep
if @timeouts.empty?
@sleep_for = DefaultSleepFor
else
diff = @timeouts.first.value.timeout_at.to_f - Time.now.to_f
if diff < 0.0
@sleep_for = 0
else
@sleep_for = diff
end
end
end
# This method adds a connection to the reactor
#
# Typically called by `Puma::Server` the value passed in
# is usually a `Puma::Client` object that responds like an IO
# object.
#
# The main body of the reactor loop is in `run_internal` and it
# will sleep on `NIO::Selector#select`. When a new connection is added to the
# reactor it cannot be added directly to the `sockets` array, because
# the `NIO::Selector#select` will not be watching for it yet.
#
# Instead what needs to happen is that `NIO::Selector#select` needs to be woken up,
# the contents of `@input` added to the `sockets` array, and then
# another call to `NIO::Selector#select` needs to happen. Since the `Puma::Client`
# object can be read immediately, it does not block, but instead returns
# right away.
#
# This behavior is accomplished by writing to `@trigger` which wakes up
# the `NIO::Selector#select` and then there is logic to detect the value of `*`,
# pull the contents from `@input` and add them to the sockets array.
#
# If the object passed in has a timeout value in `timeout_at` then
# it is added to a `@timeouts` array. This array is then re-arranged
# so that the first element to timeout will be at the front of the
# array. Then a value to sleep for is derived in the call to `calculate_sleep`
def add(c)
@mutex.synchronize do
@input << c
@trigger << "*"
end
end
# Close all watched sockets and clear them from being watched
def clear!
def select_loop
begin
@trigger << "c"
rescue IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
until @input.closed? && @input.empty?
# Wakeup any registered object that receives incoming data.
# Block until the earliest timeout or Selector#wakeup is called.
timeout = (earliest = @timeouts.first) && earliest.timeout
@selector.select(timeout) {|mon| wakeup!(mon.value)}
# Wakeup all objects that timed out.
timed_out = @timeouts.take_while {|t| t.timeout == 0}
timed_out.each(&method(:wakeup!))
unless @input.empty?
register(@input.pop) until @input.empty?
@timeouts.sort_by!(&:timeout_at)
end
end
rescue StandardError => e
STDERR.puts "Error in reactor loop escaped: #{e.message} (#{e.class})"
STDERR.puts e.backtrace
retry
end
# Wakeup all remaining objects on shutdown.
@timeouts.each(&@block.method(:call))
@selector.close
end
def shutdown
begin
@trigger << "!"
rescue IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
end
# Start monitoring the object.
def register(io)
@selector.register(io, :r).value = io
@timeouts << io
end
@thread.join
# 'Wake up' a monitored object by calling the provided block.
# Stop monitoring the object if the block returns `true`.
def wakeup!(io)
if @block.call(io)
@selector.deregister(io)
@timeouts.delete(io)
end
end
end
end
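The refactored design boils down to a `Queue`-backed handoff into a background select loop. A simplified, nio4r-free sketch of the add/run/shutdown contract (the `MiniReactor` class is hypothetical, for illustration only):

```ruby
# Simplified sketch (assumption: no nio4r, no timeouts) of the refactored
# Reactor's contract: #add returns false once the input queue is closed,
# so a caller can fall back to processing the client inline instead of
# handing it to a reactor that is shutting down.
class MiniReactor
  def initialize(&block)
    @input = Queue.new
    @block = block
  end

  # Drain the queue on a background thread, invoking the block per object.
  def run
    @thread = Thread.new do
      until @input.closed? && @input.empty?
        io = @input.pop            # returns nil once closed and empty
        @block.call(io) if io
      end
    end
  end

  def add(io)
    @input << io
    true
  rescue ClosedQueueError
    false                          # reactor already shut down
  end

  # Close the queue and block until the background thread drains it.
  def shutdown
    @input.close
    @thread.join if @thread
  end
end
```

Usage mirrors the Server integration above: `MiniReactor.new { |client| ... }`, then `run`, `add` from worker threads, and `shutdown` during stop; an `add` that returns `false` tells the worker to keep the client and finish it synchronously.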


@@ -101,8 +101,6 @@ module Puma
@precheck_closing = true
@requests_count = 0
@shutdown_mutex = Mutex.new
end
def inherit_binder(bind)
@@ -225,60 +223,19 @@
@status = :run
@thread_pool = ThreadPool.new(@min_threads,
@max_threads,
::Puma::IOBuffer) do |client, buffer|
# Advertise this server into the thread
Thread.current[ThreadLocalKey] = self
process_now = false
begin
if @queue_requests
process_now = client.eagerly_finish
else
@thread_pool.with_force_shutdown do
client.finish(@first_data_timeout)
end
process_now = true
end
rescue MiniSSL::SSLError => e
@events.ssl_error e, client.io
client.close
rescue HttpParserError => e
client.write_error(400)
client.close
@events.parse_error e, client
rescue EOFError => e
client.close
# Swallow, do not log
rescue ConnectionError, ThreadPool::ForceShutdown => e
client.close
@events.connection_error e, client
else
process_now ||= @shutdown_mutex.synchronize do
next true unless @queue_requests
client.set_timeout @first_data_timeout
@reactor.add client
false
end
process_client client, buffer if process_now
end
process_now
end
@thread_pool = ThreadPool.new(
@min_threads,
@max_threads,
::Puma::IOBuffer,
&method(:process_client)
)
@thread_pool.out_of_band_hook = @options[:out_of_band]
@thread_pool.clean_thread_locals = @options[:clean_thread_locals]
if @queue_requests
@reactor = Reactor.new self, @thread_pool
@reactor.run_in_thread
@reactor = Reactor.new(&method(:reactor_wakeup))
@reactor.run
end
if @reaping_time
@@ -302,6 +259,44 @@
end
end
# This method is called from the Reactor thread when a queued Client receives data,
# times out, or when the Reactor is shutting down.
#
# It is responsible for ensuring that a request has been completely received
# before it starts to be processed by the ThreadPool. This may be known as read buffering.
# If read buffering is not done here, and no other layer performs it (such as a
# reverse proxy like nginx), then the application would be subject to a slow-client attack.
#
# For a graphical representation of how the request buffer works see [architecture.md](https://github.com/puma/puma/blob/master/docs/architecture.md#connection-pipeline).
#
# The method checks to see if it has the full header and body with
# the `Puma::Client#try_to_finish` method. If the full request has been sent,
# then the request is passed to the ThreadPool (`@thread_pool << client`)
# so that a "worker thread" can pick up the request and begin to execute application logic.
# The Client is then removed from the reactor (return `true`).
#
# If a client object times out, a 408 response is written, its connection is closed,
# and the object is removed from the reactor (return `true`).
#
# If the Reactor is shutting down, all Clients are either timed out or passed to the
# ThreadPool, depending on their current state (#can_close?).
#
# Otherwise, if the full request is not ready then the client will remain in the reactor
# (return `false`). When the client sends more data to the socket the `Puma::Client` object
# will wake up and again be checked to see if it's ready to be passed to the thread pool.
def reactor_wakeup(client)
shutdown = !@queue_requests
if client.try_to_finish || (shutdown && !client.can_close?)
@thread_pool << client
elsif shutdown || client.timeout == 0
client.timeout!
end
rescue StandardError => e
client_error(e, client)
client.close
true
end
def handle_servers
@check, @notify = Puma::Util.pipe unless @notify
begin
@@ -353,10 +348,7 @@
@events.fire :state, @status
if queue_requests
@shutdown_mutex.synchronize do
@queue_requests = false
end
@reactor.clear!
@queue_requests = false
@reactor.shutdown
end
graceful_shutdown if @status == :stop || @status == :restart
@@ -396,27 +388,47 @@
return false
end
# Given a connection on +client+, handle the incoming requests.
# Given a connection on +client+, handle the incoming requests,
# or queue the connection in the Reactor if no request is available.
#
# This method support HTTP Keep-Alive so it may, depending on if the client
# This method is called from a ThreadPool worker thread.
#
# This method supports HTTP Keep-Alive so it may, depending on if the client
# indicates that it supports keep alive, wait for another request before
# returning.
#
# Return true if one or more requests were processed.
def process_client(client, buffer)
# Advertise this server into the thread
Thread.current[ThreadLocalKey] = self
clean_thread_locals = @options[:clean_thread_locals]
close_socket = true
requests = 0
begin
if @queue_requests &&
!client.eagerly_finish
clean_thread_locals = @options[:clean_thread_locals]
close_socket = true
client.set_timeout(@first_data_timeout)
if @reactor.add client
close_socket = false
return false
end
end
requests = 0
with_force_shutdown(client) do
client.finish(@first_data_timeout)
end
while true
case handle_request(client, buffer)
when false
return
break
when :async
close_socket = false
return
break
when true
buffer.reset
@@ -434,47 +446,25 @@
check_for_more_data = false
end
next_request_ready = @thread_pool.with_force_shutdown do
next_request_ready = with_force_shutdown(client) do
client.reset(check_for_more_data)
end
unless next_request_ready
@shutdown_mutex.synchronize do
return unless @queue_requests
break unless @queue_requests
client.set_timeout @persistent_timeout
if @reactor.add client
close_socket = false
client.set_timeout @persistent_timeout
@reactor.add client
return
break
end
end
end
end
# The client disconnected while we were reading data
rescue ConnectionError, ThreadPool::ForceShutdown
# Swallow them. The ensure tries to close +client+ down
# SSL handshake error
rescue MiniSSL::SSLError => e
lowlevel_error e, client.env
@events.ssl_error e, client.io
close_socket = true
# The client doesn't know HTTP well
rescue HttpParserError => e
lowlevel_error(e, client.env)
client.write_error(400)
@events.parse_error e, client
# Server error
true
rescue StandardError => e
lowlevel_error(e, client.env)
client.write_error(500)
@events.unknown_error e, nil, "Read"
client_error(e, client)
# The ensure tries to close +client+ down
requests > 0
ensure
buffer.reset
@@ -489,6 +479,14 @@
end
end
# Triggers a client timeout if the thread-pool shuts down
# during execution of the provided block.
def with_force_shutdown(client, &block)
@thread_pool.with_force_shutdown(&block)
rescue ThreadPool::ForceShutdown
client.timeout!
end
# Given a Hash +env+ for the request read from +client+, add
# and fixup keys to comply with Rack's env guidelines.
#
@@ -872,6 +870,24 @@
return stream
end
# Handle various error types thrown by Client I/O operations.
def client_error(e, client)
# Swallow, do not log
return if [ConnectionError, EOFError].include?(e.class)
lowlevel_error(e, client.env)
case e
when MiniSSL::SSLError
@events.ssl_error e, client.io
when HttpParserError
client.write_error(400)
@events.parse_error e, client
else
client.write_error(500)
@events.unknown_error e, nil, "Read"
end
end
# A fallback rack response if +@app+ raises an exception.
#
def lowlevel_error(e, env, status=500)