From a76d3905d89780840b8cf4407c51c1ba384812c7 Mon Sep 17 00:00:00 2001 From: Will Jordan Date: Tue, 6 Oct 2020 06:22:53 -0700 Subject: [PATCH] Reactor refactor (#2279) * Refactor Reactor and Client request buffering Refactor Reactor into a more generic IO-with-timeout monitor, using a Queue to simplify the implementation. Move request-buffering logic into Server#reactor_wakeup. Fixes bug in managing timeouts on clients. Move, update and rewrite documentation to match updated class structure. * Fix a few concurrency bugs - In `Reactor#shutdown`, `@selector` can be closed before the call to `#wakeup`, so catch/ignore the `IOError` that may be thrown. - `Reactor#wakeup!` can delete elements from the `@timeouts` array so calling it from an `#each` block can cause the array iteration to miss elements. Call @block directly instead. - Change `Reactor#add` to return `false` if the reactor is already shut down instead of invoking the block immediately, so a client-request currently being processed can continue, rather than re-adding to the thread-pool (which may already be shutting down and unable to accept new work). 
Co-authored-by: Nate Berkopec --- History.md | 2 + lib/puma/client.rb | 27 ++- lib/puma/queue_close.rb | 24 +++ lib/puma/reactor.rb | 450 ++++++++-------------------- lib/puma/server.rb | 202 +++++++++--------- 5 files changed, 229 insertions(+), 476 deletions(-) create mode 100644 lib/puma/queue_close.rb diff --git a/History.md b/History.md index 97b17eba..10e8ae4a 100644 --- a/History.md +++ b/History.md @@ -7,6 +7,8 @@ * Cleanup daemonization in rc.d script (#2409) * Refactor + * Consolidate option handling in Server, Server small refactors, doc changes (#2389) + * Refactor Reactor and Client request buffering (#2279) * client.rb - remove JRuby specific 'finish' code (#2412) * Consolidate fast_write calls in Server, extract early_hints assembly (#2405) * Remove upstart from docs (#2408) diff --git a/lib/puma/client.rb b/lib/puma/client.rb index dff85a6e..7614a77b 100644 --- a/lib/puma/client.rb +++ b/lib/puma/client.rb @@ -103,7 +103,12 @@ module Puma end def set_timeout(val) - @timeout_at = Time.now + val + @timeout_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + val + end + + # Number of seconds until the timeout elapses. + def timeout + [@timeout_at - Process.clock_gettime(Process::CLOCK_MONOTONIC), 0].max end def reset(fast_check=true) @@ -195,19 +200,13 @@ module Puma end def finish(timeout) - return true if @ready - until try_to_finish - can_read = begin - IO.select([@to_io], nil, nil, timeout) - rescue ThreadPool::ForceShutdown - nil - end - unless can_read - write_error(408) if in_data_phase - raise ConnectionError - end - end - true + return if @ready + IO.select([@to_io], nil, nil, timeout) || timeout! until try_to_finish + end + + def timeout! 
+ write_error(408) if in_data_phase + raise ConnectionError end def write_error(status_code) diff --git a/lib/puma/queue_close.rb b/lib/puma/queue_close.rb new file mode 100644 index 00000000..a9244b67 --- /dev/null +++ b/lib/puma/queue_close.rb @@ -0,0 +1,24 @@ +# Queue#close was added in Ruby 2.3. +# Add a simple implementation for earlier Ruby versions. +unless Queue.instance_methods.include?(:close) + class ClosedQueueError < StandardError; end + module Puma + module QueueClose + def initialize + @closed = false + super + end + def close + @closed = true + end + def closed? + @closed + end + def push(object) + raise ClosedQueueError if @closed + super + end + end + Queue.prepend QueueClose + end +end diff --git a/lib/puma/reactor.rb b/lib/puma/reactor.rb index 2941bea0..a656d1f0 100644 --- a/lib/puma/reactor.rb +++ b/lib/puma/reactor.rb @@ -1,394 +1,106 @@ # frozen_string_literal: true -require 'puma/util' -require 'puma/minissl' if ::Puma::HAS_SSL - require 'nio' +require 'puma/queue_close' if RUBY_VERSION < '2.3' module Puma - # Internal Docs, Not a public interface. + # Monitors a collection of IO objects, calling a block whenever + # any monitored object either receives data or times out, or when the Reactor shuts down. # - # The Reactor object is responsible for ensuring that a request has been - # completely received before it starts to be processed. This may be known as read buffering. - # If read buffering is not done, and no other read buffering is performed (such as by an application server - # such as nginx) then the application would be subject to a slow client attack. + # The waiting/wake up is performed with nio4r, which will use the appropriate backend (libev, + # Java NIO or just plain IO#select). The call to `NIO::Selector#select` will + # 'wakeup' any IO object that receives data. # - # Each Puma "worker" process has its own Reactor. 
For example if you start puma with `$ puma -w 5` then - # it will have 5 workers and each worker will have it's own reactor. + # This class additionally tracks a timeout for every added object, + # and wakes up any object when its timeout elapses. # - # For a graphical representation of how the reactor works see [architecture.md](https://github.com/puma/puma/blob/master/docs/architecture.md#connection-pipeline). - # - # ## Reactor Flow - # - # A connection comes into a `Puma::Server` instance, it is then passed to a `Puma::Reactor` instance, - # which stores it in an array and waits for any of the connections to be ready for reading. - # - # The waiting/wake up is performed with nio4r, which will use the appropriate backend (libev, Java NIO or - # just plain IO#select). The call to `NIO::Selector#select` will "wake up" and - # return the references to any objects that caused it to "wake". The reactor - # then loops through each of these request objects, and sees if they're complete. If they - # have a full header and body then the reactor passes the request to a thread pool. - # Once in a thread pool, a "worker thread" can run the the application's Ruby code against the request. - # - # If the request is not complete, then it stays in the array, and the next time any - # data is written to that socket reference, then the loop is woken up and it is checked for completeness again. - # - # A detailed example is given in the docs for `run_internal` which is where the bulk - # of this logic lives. + # The implementation uses a Queue to synchronize adding new objects from the internal select loop. class Reactor - DefaultSleepFor = 5 - - # Creates an instance of Puma::Reactor - # - # The `server` argument is an instance of `Puma::Server` - # that is used to write a response for "low level errors" - # when there is an exception inside of the reactor. - # - # The `app_pool` is an instance of `Puma::ThreadPool`. 
- # Once a request is fully formed (header and body are received) - # it will be passed to the `app_pool`. - def initialize(server, app_pool) - @server = server - @events = server.events - @app_pool = app_pool - + # Create a new Reactor to monitor IO objects added by #add. + # The provided block will be invoked when an IO has data available to read, + # its timeout elapses, or when the Reactor shuts down. + def initialize(&block) @selector = NIO::Selector.new - - @mutex = Mutex.new - - # Read / Write pipes to wake up internal while loop - @ready, @trigger = Puma::Util.pipe - @input = [] - @sleep_for = DefaultSleepFor + @input = Queue.new @timeouts = [] + @block = block + end - mon = @selector.register(@ready, :r) - mon.value = @ready + # Run the internal select loop, using a background thread by default. + def run(background=true) + if background + @thread = Thread.new do + Puma.set_thread_name "reactor" + select_loop + end + else + select_loop + end + end - @monitors = [mon] + # Add a new IO object to monitor. + # The object must respond to #timeout and #timeout_at. + # Returns false if the reactor is already shut down. + def add(io) + @input << io + @selector.wakeup + true + rescue ClosedQueueError + false + end + + # Shutdown the reactor, blocking until the background thread is finished. + def shutdown + @input.close + begin + @selector.wakeup + rescue IOError # Ignore if selector is already closed + end + @thread.join if @thread end private - # Until a request is added via the `add` method this method will internally - # loop, waiting on the `sockets` array objects. The only object in this - # array at first is the `@ready` IO object, which is the read end of a pipe - # connected to `@trigger` object. When `@trigger` is written to, then the loop - # will break on `NIO::Selector#select` and return an array. - # - # ## When a request is added: - # - # When the `add` method is called, an instance of `Puma::Client` is added to the `@input` array. 
- # Next the `@ready` pipe is "woken" by writing a string of `"*"` to `@trigger`. - # - # When that happens, the internal loop stops blocking at `NIO::Selector#select` and returns a reference - # to whatever "woke" it up. On the very first loop, the only thing in `sockets` is `@ready`. - # When `@trigger` is written-to, the loop "wakes" and the `ready` - # variable returns an array of arrays that looks like `[[#], [], []]` where the - # first IO object is the `@ready` object. This first array `[#]` - # is saved as a `reads` variable. - # - # The `reads` variable is iterated through. In the case that the object - # is the same as the `@ready` input pipe, then we know that there was a `trigger` event. - # - # If there was a trigger event, then one byte of `@ready` is read into memory. In the case of the first request, - # the reactor sees that it's a `"*"` value and the reactor adds the contents of `@input` into the `sockets` array. - # The while then loop continues to iterate again, but now the `sockets` array contains a `Puma::Client` instance in addition - # to the `@ready` IO object. For example: `[#, #]`. - # - # Since the `Puma::Client` in this example has data that has not been read yet, - # the `NIO::Selector#select` is immediately able to "wake" and read from the `Puma::Client`. At this point the - # `ready` output looks like this: `[[#], [], []]`. - # - # Each element in the first entry is iterated over. The `Puma::Client` object is not - # the `@ready` pipe, so the reactor checks to see if it has the full header and body with - # the `Puma::Client#try_to_finish` method. If the full request has been sent, - # then the request is passed off to the `@app_pool` thread pool so that a "worker thread" - # can pick up the request and begin to execute application logic. This is done - # via `@app_pool << c`. The `Puma::Client` is then removed from the `sockets` array. 
- # - # If the request body is not present then nothing will happen, and the loop will iterate - # again. When the client sends more data to the socket the `Puma::Client` object will - # wake up the `NIO::Selector#select` and it can again be checked to see if it's ready to be - # passed to the thread pool. - # - # ## Time Out Case - # - # In addition to being woken via a write to one of the sockets the `NIO::Selector#select` will - # periodically "time out" of the sleep. One of the functions of this is to check for - # any requests that have "timed out". At the end of the loop it's checked to see if - # the first element in the `@timeout` array has exceed its allowed time. If so, - # the client object is removed from the timeout array, a 408 response is written. - # Then its connection is closed, and the object is removed from the `sockets` array - # that watches for new data. - # - # This behavior loops until all the objects that have timed out have been removed. - # - # Once all the timeouts have been processed, the next duration of the `NIO::Selector#select` sleep - # will be set to be equal to the amount of time it will take for the next timeout to occur. - # This calculation happens in `calculate_sleep`. - def run_internal - monitors = @monitors - selector = @selector - - while true - begin - ready = selector.select @sleep_for - rescue IOError => e - Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue - if monitors.any? { |mon| mon.value.closed? } - STDERR.puts "Error in select: #{e.message} (#{e.class})" - STDERR.puts e.backtrace - - monitors.reject! do |mon| - if mon.value.closed? 
- selector.deregister mon.value - true - end - end - - retry - else - raise - end - end - - if ready - ready.each do |mon| - if mon.value == @ready - @mutex.synchronize do - case @ready.read(1) - when "*" - @input.each do |c| - mon = nil - begin - begin - mon = selector.register(c, :r) - rescue ArgumentError - # There is a bug where we seem to be registering an already registered - # client. This code deals with this situation but I wish we didn't have to. - monitors.delete_if { |submon| submon.value.to_io == c.to_io } - selector.deregister(c) - mon = selector.register(c, :r) - end - rescue IOError - # Means that the io is closed, so we should ignore this request - # entirely - else - mon.value = c - @timeouts << mon if c.timeout_at - monitors << mon - end - end - @input.clear - - @timeouts.sort! { |a,b| a.value.timeout_at <=> b.value.timeout_at } - calculate_sleep - when "c" - monitors.reject! do |submon| - if submon.value == @ready - false - else - if submon.value.can_close? - submon.value.close - else - # Pass remaining open client connections to the thread pool. - @app_pool << submon.value - end - begin - selector.deregister submon.value - rescue IOError - # nio4r on jruby seems to throw an IOError here if the IO is closed, so - # we need to swallow it. - end - true - end - end - when "!" - return - end - end - else - c = mon.value - - # We have to be sure to remove it from the timeout - # list or we'll accidentally close the socket when - # it's in use! - if c.timeout_at - @mutex.synchronize do - @timeouts.delete mon - end - end - - begin - if c.try_to_finish - @app_pool << c - clear_monitor mon - end - - # Don't report these to the lowlevel_error handler, otherwise - # will be flooding them with errors when persistent connections - # are closed. 
- rescue ConnectionError - c.write_error(500) - c.close - - clear_monitor mon - - # SSL handshake failure - rescue MiniSSL::SSLError => e - @server.lowlevel_error e, c.env - @events.ssl_error e, c.io - - c.close - clear_monitor mon - - # The client doesn't know HTTP well - rescue HttpParserError => e - @server.lowlevel_error(e, c.env) - - c.write_error(400) - c.close - - clear_monitor mon - - @events.parse_error e, c - rescue StandardError => e - @server.lowlevel_error(e, c.env) - - c.write_error(500) - c.close - - clear_monitor mon - end - end - end - end - - unless @timeouts.empty? - @mutex.synchronize do - now = Time.now - - while @timeouts.first.value.timeout_at < now - mon = @timeouts.shift - c = mon.value - c.write_error(408) if c.in_data_phase - c.close - - clear_monitor mon - - break if @timeouts.empty? - end - - calculate_sleep - end - end - end - end - - def clear_monitor(mon) - @selector.deregister mon.value - @monitors.delete mon - end - - public - - def run - run_internal - ensure - @trigger.close - @ready.close - end - - def run_in_thread - @thread = Thread.new do - Puma.set_thread_name "reactor" - begin - run_internal - rescue StandardError => e - STDERR.puts "Error in reactor loop escaped: #{e.message} (#{e.class})" - STDERR.puts e.backtrace - retry - ensure - @trigger.close - @ready.close - end - end - end - - # The `calculate_sleep` sets the value that the `NIO::Selector#select` will - # sleep for in the main reactor loop when no sockets are being written to. - # - # The values kept in `@timeouts` are sorted so that the first timeout - # comes first in the array. When there are no timeouts the default timeout is used. - # - # Otherwise a sleep value is set that is the same as the amount of time it - # would take for the first element to time out. - # - # If that value is in the past, then a sleep value of zero is used. - def calculate_sleep - if @timeouts.empty? 
- @sleep_for = DefaultSleepFor - else - diff = @timeouts.first.value.timeout_at.to_f - Time.now.to_f - - if diff < 0.0 - @sleep_for = 0 - else - @sleep_for = diff - end - end - end - - # This method adds a connection to the reactor - # - # Typically called by `Puma::Server` the value passed in - # is usually a `Puma::Client` object that responds like an IO - # object. - # - # The main body of the reactor loop is in `run_internal` and it - # will sleep on `NIO::Selector#select`. When a new connection is added to the - # reactor it cannot be added directly to the `sockets` array, because - # the `NIO::Selector#select` will not be watching for it yet. - # - # Instead what needs to happen is that `NIO::Selector#select` needs to be woken up, - # the contents of `@input` added to the `sockets` array, and then - # another call to `NIO::Selector#select` needs to happen. Since the `Puma::Client` - # object can be read immediately, it does not block, but instead returns - # right away. - # - # This behavior is accomplished by writing to `@trigger` which wakes up - # the `NIO::Selector#select` and then there is logic to detect the value of `*`, - # pull the contents from `@input` and add them to the sockets array. - # - # If the object passed in has a timeout value in `timeout_at` then - # it is added to a `@timeouts` array. This array is then re-arranged - # so that the first element to timeout will be at the front of the - # array. Then a value to sleep for is derived in the call to `calculate_sleep` - def add(c) - @mutex.synchronize do - @input << c - @trigger << "*" - end - end - - # Close all watched sockets and clear them from being watched - def clear! + def select_loop begin - @trigger << "c" - rescue IOError - Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue + until @input.closed? && @input.empty? + # Wakeup any registered object that receives incoming data. + # Block until the earliest timeout or Selector#wakeup is called. 
+ timeout = (earliest = @timeouts.first) && earliest.timeout + @selector.select(timeout) {|mon| wakeup!(mon.value)} + + # Wakeup all objects that timed out. + timed_out = @timeouts.take_while {|t| t.timeout == 0} + timed_out.each(&method(:wakeup!)) + + unless @input.empty? + register(@input.pop) until @input.empty? + @timeouts.sort_by!(&:timeout_at) + end + end + rescue StandardError => e + STDERR.puts "Error in reactor loop escaped: #{e.message} (#{e.class})" + STDERR.puts e.backtrace + retry end + # Wakeup all remaining objects on shutdown. + @timeouts.each(&@block.method(:call)) + @selector.close end - def shutdown - begin - @trigger << "!" - rescue IOError - Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue - end + # Start monitoring the object. + def register(io) + @selector.register(io, :r).value = io + @timeouts << io + end - @thread.join + # 'Wake up' a monitored object by calling the provided block. + # Stop monitoring the object if the block returns `true`. 
+ def wakeup!(io) + if @block.call(io) + @selector.deregister(io) + @timeouts.delete(io) + end end end end diff --git a/lib/puma/server.rb b/lib/puma/server.rb index 0a8066b0..73295a25 100644 --- a/lib/puma/server.rb +++ b/lib/puma/server.rb @@ -101,8 +101,6 @@ module Puma @precheck_closing = true @requests_count = 0 - - @shutdown_mutex = Mutex.new end def inherit_binder(bind) @@ -225,60 +223,19 @@ module Puma @status = :run - @thread_pool = ThreadPool.new(@min_threads, - @max_threads, - ::Puma::IOBuffer) do |client, buffer| - - # Advertise this server into the thread - Thread.current[ThreadLocalKey] = self - - process_now = false - - begin - if @queue_requests - process_now = client.eagerly_finish - else - @thread_pool.with_force_shutdown do - client.finish(@first_data_timeout) - end - process_now = true - end - rescue MiniSSL::SSLError => e - @events.ssl_error e, client.io - client.close - - rescue HttpParserError => e - client.write_error(400) - client.close - - @events.parse_error e, client - rescue EOFError => e - client.close - - # Swallow, do not log - rescue ConnectionError, ThreadPool::ForceShutdown => e - client.close - - @events.connection_error e, client - else - process_now ||= @shutdown_mutex.synchronize do - next true unless @queue_requests - client.set_timeout @first_data_timeout - @reactor.add client - false - end - process_client client, buffer if process_now - end - - process_now - end + @thread_pool = ThreadPool.new( + @min_threads, + @max_threads, + ::Puma::IOBuffer, + &method(:process_client) + ) @thread_pool.out_of_band_hook = @options[:out_of_band] @thread_pool.clean_thread_locals = @options[:clean_thread_locals] if @queue_requests - @reactor = Reactor.new self, @thread_pool - @reactor.run_in_thread + @reactor = Reactor.new(&method(:reactor_wakeup)) + @reactor.run end if @reaping_time @@ -302,6 +259,44 @@ module Puma end end + # This method is called from the Reactor thread when a queued Client receives data, + # times out, or when the 
Reactor is shutting down. + # + # It is responsible for ensuring that a request has been completely received + # before it starts to be processed by the ThreadPool. This may be known as read buffering. + # If read buffering is not done, and no other read buffering is performed (such as by an application server + # such as nginx) then the application would be subject to a slow client attack. + # + # For a graphical representation of how the request buffer works see [architecture.md](https://github.com/puma/puma/blob/master/docs/architecture.md#connection-pipeline). + # + # The method checks to see if it has the full header and body with + # the `Puma::Client#try_to_finish` method. If the full request has been sent, + # then the request is passed to the ThreadPool (`@thread_pool << client`) + # so that a "worker thread" can pick up the request and begin to execute application logic. + # The Client is then removed from the reactor (return `true`). + # + # If a client object times out, a 408 response is written, its connection is closed, + # and the object is removed from the reactor (return `true`). + # + # If the Reactor is shutting down, all Clients are either timed out or passed to the + # ThreadPool, depending on their current state (#can_close?). + # + # Otherwise, if the full request is not ready then the client will remain in the reactor + # (return `false`). When the client sends more data to the socket the `Puma::Client` object + # will wake up and again be checked to see if it's ready to be passed to the thread pool. + def reactor_wakeup(client) + shutdown = !@queue_requests + if client.try_to_finish || (shutdown && !client.can_close?) + @thread_pool << client + elsif shutdown || client.timeout == 0 + client.timeout! 
+ end + rescue StandardError => e + client_error(e, client) + client.close + true + end + def handle_servers @check, @notify = Puma::Util.pipe unless @notify begin @@ -353,10 +348,7 @@ module Puma @events.fire :state, @status if queue_requests - @shutdown_mutex.synchronize do - @queue_requests = false - end - @reactor.clear! + @queue_requests = false @reactor.shutdown end graceful_shutdown if @status == :stop || @status == :restart @@ -396,27 +388,47 @@ module Puma return false end - # Given a connection on +client+, handle the incoming requests. + # Given a connection on +client+, handle the incoming requests, + # or queue the connection in the Reactor if no request is available. # - # This method support HTTP Keep-Alive so it may, depending on if the client + # This method is called from a ThreadPool worker thread. + # + # This method supports HTTP Keep-Alive so it may, depending on if the client # indicates that it supports keep alive, wait for another request before # returning. # + # Return true if one or more requests were processed. 
def process_client(client, buffer) + # Advertise this server into the thread + Thread.current[ThreadLocalKey] = self + + clean_thread_locals = @options[:clean_thread_locals] + close_socket = true + + requests = 0 + begin + if @queue_requests && + !client.eagerly_finish - clean_thread_locals = @options[:clean_thread_locals] - close_socket = true + client.set_timeout(@first_data_timeout) + if @reactor.add client + close_socket = false + return false + end + end - requests = 0 + with_force_shutdown(client) do + client.finish(@first_data_timeout) + end while true case handle_request(client, buffer) when false - return + break when :async close_socket = false - return + break when true buffer.reset @@ -434,47 +446,25 @@ module Puma check_for_more_data = false end - next_request_ready = @thread_pool.with_force_shutdown do + next_request_ready = with_force_shutdown(client) do client.reset(check_for_more_data) end unless next_request_ready - @shutdown_mutex.synchronize do - return unless @queue_requests + break unless @queue_requests + client.set_timeout @persistent_timeout + if @reactor.add client close_socket = false - client.set_timeout @persistent_timeout - @reactor.add client - return + break end end end end - - # The client disconnected while we were reading data - rescue ConnectionError, ThreadPool::ForceShutdown - # Swallow them. 
The ensure tries to close +client+ down - - # SSL handshake error - rescue MiniSSL::SSLError => e - lowlevel_error e, client.env - @events.ssl_error e, client.io - close_socket = true - - # The client doesn't know HTTP well - rescue HttpParserError => e - lowlevel_error(e, client.env) - - client.write_error(400) - - @events.parse_error e, client - - # Server error + true rescue StandardError => e - lowlevel_error(e, client.env) - - client.write_error(500) - - @events.unknown_error e, nil, "Read" + client_error(e, client) + # The ensure tries to close +client+ down + requests > 0 ensure buffer.reset @@ -489,6 +479,14 @@ module Puma end end + # Triggers a client timeout if the thread-pool shuts down + # during execution of the provided block. + def with_force_shutdown(client, &block) + @thread_pool.with_force_shutdown(&block) + rescue ThreadPool::ForceShutdown + client.timeout! + end + # Given a Hash +env+ for the request read from +client+, add # and fixup keys to comply with Rack's env guidelines. # @@ -872,6 +870,24 @@ module Puma return stream end + # Handle various error types thrown by Client I/O operations. + def client_error(e, client) + # Swallow, do not log + return if [ConnectionError, EOFError].include?(e.class) + + lowlevel_error(e, client.env) + case e + when MiniSSL::SSLError + @events.ssl_error e, client.io + when HttpParserError + client.write_error(400) + @events.parse_error e, client + else + client.write_error(500) + @events.unknown_error e, nil, "Read" + end + end + # A fallback rack response if +@app+ raises as exception. # def lowlevel_error(e, env, status=500)