2018-09-17 12:41:14 -04:00
# frozen_string_literal: true
2013-02-05 01:31:40 -05:00
require 'puma/util'
2015-07-17 16:00:55 -04:00
require 'puma/minissl'
2013-02-05 01:31:40 -05:00
2019-02-19 19:38:21 -05:00
require 'nio'
2012-07-23 13:26:52 -04:00
module Puma
2018-04-30 18:00:56 -04:00
# Internal Docs, Not a public interface.
#
# The Reactor object is responsible for ensuring that a request has been
# completely received before it starts to be processed. This may be known as read buffering.
2018-05-01 12:19:50 -04:00
# If read buffering is not done, and no other read buffering is performed (such as by an application server
# such as nginx) then the application would be subject to a slow client attack.
2018-04-30 18:00:56 -04:00
#
2018-05-01 16:44:08 -04:00
# Each Puma "worker" process has its own Reactor. For example if you start puma with `$ puma -w 5` then
# it will have 5 workers and each worker will have it's own reactor.
#
2018-05-01 12:19:50 -04:00
# For a graphical representation of how the reactor works see [architecture.md](https://github.com/puma/puma/blob/master/docs/architecture.md#connection-pipeline).
2018-04-30 18:00:56 -04:00
#
2018-05-01 12:19:50 -04:00
# ## Reactor Flow
#
2019-06-11 17:06:25 -04:00
# A connection comes into a `Puma::Server` instance, it is then passed to a `Puma::Reactor` instance,
# which stores it in an array and waits for any of the connections to be ready for reading.
2018-05-01 12:19:50 -04:00
#
2019-07-22 10:33:19 -04:00
# The waiting/wake up is performed with nio4r, which will use the appropriate backend (libev, Java NIO or
2019-06-11 18:54:32 -04:00
# just plain IO#select). The call to `NIO::Selector#select` will "wake up" and
2018-04-30 18:00:56 -04:00
# return the references to any objects that caused it to "wake". The reactor
2018-05-03 18:08:29 -04:00
# then loops through each of these request objects, and sees if they're complete. If they
2018-05-01 12:19:50 -04:00
# have a full header and body then the reactor passes the request to a thread pool.
# Once in a thread pool, a "worker thread" can run the the application's Ruby code against the request.
2018-04-30 18:00:56 -04:00
#
2018-05-01 12:19:50 -04:00
# If the request is not complete, then it stays in the array, and the next time any
# data is written to that socket reference, then the loop is woken up and it is checked for completeness again.
2018-04-30 18:00:56 -04:00
#
# A detailed example is given in the docs for `run_internal` which is where the bulk
# of this logic lives.
2012-07-23 13:26:52 -04:00
class Reactor
DefaultSleepFor = 5
2018-05-01 12:21:05 -04:00
# Creates an instance of Puma::Reactor
#
# The `server` argument is an instance of `Puma::Server`
2018-05-03 18:08:29 -04:00
# that is used to write a response for "low level errors"
2018-05-01 12:21:05 -04:00
# when there is an exception inside of the reactor.
#
# The `app_pool` is an instance of `Puma::ThreadPool`.
# Once a request is fully formed (header and body are received)
# it will be passed to the `app_pool`.
2012-07-23 20:00:53 -04:00
def initialize ( server , app_pool )
@server = server
@events = server . events
2012-07-23 13:26:52 -04:00
@app_pool = app_pool
2019-02-19 19:38:21 -05:00
@selector = NIO :: Selector . new
2012-07-23 13:26:52 -04:00
@mutex = Mutex . new
2018-04-30 18:00:46 -04:00
# Read / Write pipes to wake up internal while loop
2013-02-05 01:31:40 -05:00
@ready , @trigger = Puma :: Util . pipe
2012-07-23 13:26:52 -04:00
@input = [ ]
@sleep_for = DefaultSleepFor
@timeouts = [ ]
2012-08-27 13:56:43 -04:00
2019-02-19 19:38:21 -05:00
mon = @selector . register ( @ready , :r )
2019-02-20 11:42:33 -05:00
mon . value = @ready
2019-02-19 19:38:21 -05:00
2019-02-20 12:30:11 -05:00
@monitors = [ mon ]
2012-07-23 13:26:52 -04:00
end
2013-10-28 08:56:45 -04:00
private
2018-04-30 18:00:46 -04:00
# Until a request is added via the `add` method this method will internally
# loop, waiting on the `sockets` array objects. The only object in this
# array at first is the `@ready` IO object, which is the read end of a pipe
2018-05-01 12:20:28 -04:00
# connected to `@trigger` object. When `@trigger` is written to, then the loop
2019-06-11 18:54:32 -04:00
# will break on `NIO::Selector#select` and return an array.
2018-04-30 18:00:46 -04:00
#
# ## When a request is added:
#
# When the `add` method is called, an instance of `Puma::Client` is added to the `@input` array.
# Next the `@ready` pipe is "woken" by writing a string of `"*"` to `@trigger`.
#
2019-06-11 18:54:32 -04:00
# When that happens, the internal loop stops blocking at `NIO::Selector#select` and returns a reference
2018-05-01 12:20:28 -04:00
# to whatever "woke" it up. On the very first loop, the only thing in `sockets` is `@ready`.
# When `@trigger` is written-to, the loop "wakes" and the `ready`
# variable returns an array of arrays that looks like `[[#<IO:fd 10>], [], []]` where the
2018-04-30 18:00:46 -04:00
# first IO object is the `@ready` object. This first array `[#<IO:fd 10>]`
2018-05-01 12:20:28 -04:00
# is saved as a `reads` variable.
2018-04-30 18:00:46 -04:00
#
2018-05-01 12:20:28 -04:00
# The `reads` variable is iterated through. In the case that the object
2018-04-30 18:00:46 -04:00
# is the same as the `@ready` input pipe, then we know that there was a `trigger` event.
#
2018-05-01 12:20:28 -04:00
# If there was a trigger event, then one byte of `@ready` is read into memory. In the case of the first request,
# the reactor sees that it's a `"*"` value and the reactor adds the contents of `@input` into the `sockets` array.
# The while then loop continues to iterate again, but now the `sockets` array contains a `Puma::Client` instance in addition
2018-04-30 18:00:46 -04:00
# to the `@ready` IO object. For example: `[#<IO:fd 10>, #<Puma::Client:0x3fdc1103bee8 @ready=false>]`.
#
# Since the `Puma::Client` in this example has data that has not been read yet,
2019-06-11 18:54:32 -04:00
# the `NIO::Selector#select` is immediately able to "wake" and read from the `Puma::Client`. At this point the
2018-04-30 18:00:46 -04:00
# `ready` output looks like this: `[[#<Puma::Client:0x3fdc1103bee8 @ready=false>], [], []]`.
#
# Each element in the first entry is iterated over. The `Puma::Client` object is not
2018-05-03 18:08:29 -04:00
# the `@ready` pipe, so the reactor checks to see if it has the full header and body with
2018-04-30 18:00:46 -04:00
# the `Puma::Client#try_to_finish` method. If the full request has been sent,
2018-05-01 12:20:28 -04:00
# then the request is passed off to the `@app_pool` thread pool so that a "worker thread"
# can pick up the request and begin to execute application logic. This is done
2018-04-30 18:00:46 -04:00
# via `@app_pool << c`. The `Puma::Client` is then removed from the `sockets` array.
#
# If the request body is not present then nothing will happen, and the loop will iterate
# again. When the client sends more data to the socket the `Puma::Client` object will
2019-06-11 18:54:32 -04:00
# wake up the `NIO::Selector#select` and it can again be checked to see if it's ready to be
2018-04-30 18:00:46 -04:00
# passed to the thread pool.
#
2018-05-01 12:20:28 -04:00
# ## Time Out Case
#
2019-06-11 18:54:32 -04:00
# In addition to being woken via a write to one of the sockets the `NIO::Selector#select` will
2018-05-01 12:20:28 -04:00
# periodically "time out" of the sleep. One of the functions of this is to check for
# any requests that have "timed out". At the end of the loop it's checked to see if
2018-05-03 18:08:29 -04:00
# the first element in the `@timeout` array has exceed its allowed time. If so,
# the client object is removed from the timeout array, a 408 response is written.
# Then its connection is closed, and the object is removed from the `sockets` array
2018-05-01 12:20:28 -04:00
# that watches for new data.
#
# This behavior loops until all the objects that have timed out have been removed.
#
2019-06-11 18:54:32 -04:00
# Once all the timeouts have been processed, the next duration of the `NIO::Selector#select` sleep
2018-05-01 12:20:28 -04:00
# will be set to be equal to the amount of time it will take for the next timeout to occur.
# This calculation happens in `calculate_sleep`.
2013-10-28 08:56:45 -04:00
def run_internal
2019-02-20 12:30:11 -05:00
monitors = @monitors
2019-02-19 19:38:21 -05:00
selector = @selector
2012-07-23 13:26:52 -04:00
while true
2013-10-28 09:36:54 -04:00
begin
2019-02-19 19:38:21 -05:00
ready = selector . select @sleep_for
2013-10-28 09:36:54 -04:00
rescue IOError = > e
2017-07-19 14:22:36 -04:00
Thread . current . purge_interrupt_queue if Thread . current . respond_to? :purge_interrupt_queue
2019-02-20 12:30:11 -05:00
if monitors . any? { | mon | mon . value . closed? }
2013-10-28 09:36:54 -04:00
STDERR . puts " Error in select: #{ e . message } ( #{ e . class } ) "
STDERR . puts e . backtrace
2019-02-20 12:30:11 -05:00
monitors . reject! do | mon |
if mon . value . closed?
2019-02-20 23:46:28 -05:00
selector . deregister mon . value
2019-02-19 19:38:21 -05:00
true
end
end
2013-10-28 09:36:54 -04:00
retry
else
raise
end
end
2012-07-23 13:26:52 -04:00
2019-02-19 19:38:21 -05:00
if ready
2019-02-20 12:30:11 -05:00
ready . each do | mon |
if mon . value == @ready
2012-07-23 13:26:52 -04:00
@mutex . synchronize do
2012-08-10 13:10:30 -04:00
case @ready . read ( 1 )
when " * "
2019-02-20 12:30:11 -05:00
@input . each do | c |
2019-02-20 11:42:33 -05:00
mon = nil
begin
2019-02-20 12:30:11 -05:00
begin
mon = selector . register ( c , :r )
rescue ArgumentError
# There is a bug where we seem to be registering an already registered
# client. This code deals with this situation but I wish we didn't have to.
monitors . delete_if { | submon | submon . value . to_io == c . to_io }
selector . deregister ( c )
mon = selector . register ( c , :r )
end
rescue IOError
# Means that the io is closed, so we should ignore this request
# entirely
else
mon . value = c
@timeouts << mon if c . timeout_at
monitors << mon
2019-02-20 11:42:33 -05:00
end
2019-02-20 12:30:11 -05:00
end
2012-08-10 13:10:30 -04:00
@input . clear
2019-02-20 11:42:33 -05:00
@timeouts . sort! { | a , b | a . value . timeout_at < = > b . value . timeout_at }
calculate_sleep
2012-09-02 23:33:09 -04:00
when " c "
2019-02-20 12:30:11 -05:00
monitors . reject! do | submon |
if submon . value == @ready
2012-09-02 23:33:09 -04:00
false
else
2019-02-20 12:30:11 -05:00
submon . value . close
2019-02-21 00:23:54 -05:00
begin
selector . deregister submon . value
rescue IOError
# nio4r on jruby seems to throw an IOError here if the IO is closed, so
# we need to swallow it.
end
2012-09-02 23:33:09 -04:00
true
end
end
2012-08-10 13:10:30 -04:00
when " ! "
return
end
2012-07-23 13:26:52 -04:00
end
else
2019-02-20 12:30:11 -05:00
c = mon . value
2019-02-19 19:38:21 -05:00
2012-07-30 19:12:23 -04:00
# We have to be sure to remove it from the timeout
# list or we'll accidentally close the socket when
# it's in use!
if c . timeout_at
2013-03-18 19:41:59 -04:00
@mutex . synchronize do
2019-02-20 12:30:11 -05:00
@timeouts . delete mon
2013-03-18 19:41:59 -04:00
end
2012-07-30 19:12:23 -04:00
end
2012-07-23 13:26:52 -04:00
begin
if c . try_to_finish
@app_pool << c
2019-02-20 12:30:11 -05:00
clear_monitor mon
2012-07-23 13:26:52 -04:00
end
2012-08-11 01:41:35 -04:00
2016-04-22 19:55:06 -04:00
# Don't report these to the lowlevel_error handler, otherwise
# will be flooding them with errors when persistent connections
# are closed.
rescue ConnectionError
c . write_500
c . close
2019-02-20 12:30:11 -05:00
clear_monitor mon
2016-04-22 19:55:06 -04:00
2015-01-13 23:11:26 -05:00
# SSL handshake failure
rescue MiniSSL :: SSLError = > e
2016-04-07 14:07:26 -04:00
@server . lowlevel_error ( e , c . env )
2015-01-13 23:11:26 -05:00
ssl_socket = c . io
2019-02-20 11:42:33 -05:00
begin
addr = ssl_socket . peeraddr . last
rescue IOError
addr = " <unknown> "
end
2015-01-13 23:11:26 -05:00
cert = ssl_socket . peercert
c . close
2019-02-20 12:30:11 -05:00
clear_monitor mon
2015-01-13 23:11:26 -05:00
@events . ssl_error @server , addr , cert , e
2012-07-23 13:26:52 -04:00
# The client doesn't know HTTP well
rescue HttpParserError = > e
2016-04-07 14:07:26 -04:00
@server . lowlevel_error ( e , c . env )
2012-09-06 01:09:42 -04:00
c . write_400
2012-07-23 20:08:11 -04:00
c . close
2012-09-06 01:09:42 -04:00
2019-02-20 12:30:11 -05:00
clear_monitor mon
2012-07-23 20:08:11 -04:00
2012-07-23 20:00:53 -04:00
@events . parse_error @server , c . env , e
2012-08-27 13:56:43 -04:00
rescue StandardError = > e
2016-04-07 14:07:26 -04:00
@server . lowlevel_error ( e , c . env )
2012-09-06 01:09:42 -04:00
c . write_500
2012-07-23 13:26:52 -04:00
c . close
2012-09-06 01:09:42 -04:00
2019-02-20 12:30:11 -05:00
clear_monitor mon
2012-07-23 13:26:52 -04:00
end
end
end
end
unless @timeouts . empty?
2013-03-18 19:41:59 -04:00
@mutex . synchronize do
now = Time . now
2012-07-23 13:26:52 -04:00
2019-02-20 11:42:33 -05:00
while @timeouts . first . value . timeout_at < now
2019-02-20 12:30:11 -05:00
mon = @timeouts . shift
c = mon . value
2014-01-30 17:37:38 -05:00
c . write_408 if c . in_data_phase
2013-03-18 19:41:59 -04:00
c . close
2019-02-19 19:38:21 -05:00
2019-02-20 12:30:11 -05:00
clear_monitor mon
2012-07-23 13:26:52 -04:00
2013-03-18 19:41:59 -04:00
break if @timeouts . empty?
end
2012-07-30 19:12:23 -04:00
2013-03-18 19:41:59 -04:00
calculate_sleep
end
2012-07-23 13:26:52 -04:00
end
end
2013-10-28 08:56:45 -04:00
end
2019-02-20 12:30:11 -05:00
def clear_monitor ( mon )
2019-02-20 23:46:28 -05:00
@selector . deregister mon . value
2019-02-20 12:30:11 -05:00
@monitors . delete mon
end
2013-10-28 08:56:45 -04:00
public
def run
run_internal
2013-02-05 01:31:40 -05:00
ensure
@trigger . close
@ready . close
2012-07-23 13:26:52 -04:00
end
def run_in_thread
2013-10-28 09:27:30 -04:00
@thread = Thread . new do
2019-09-15 04:52:34 -04:00
Puma . set_thread_name " reactor "
2013-10-28 09:27:30 -04:00
begin
run_internal
rescue StandardError = > e
STDERR . puts " Error in reactor loop escaped: #{ e . message } ( #{ e . class } ) "
STDERR . puts e . backtrace
retry
ensure
@trigger . close
@ready . close
2012-08-09 19:54:55 -04:00
end
2013-10-28 09:27:30 -04:00
end
2012-07-23 13:26:52 -04:00
end
2019-06-11 18:54:32 -04:00
# The `calculate_sleep` sets the value that the `NIO::Selector#select` will
2018-05-01 12:21:28 -04:00
# sleep for in the main reactor loop when no sockets are being written to.
#
# The values kept in `@timeouts` are sorted so that the first timeout
# comes first in the array. When there are no timeouts the default timeout is used.
#
# Otherwise a sleep value is set that is the same as the amount of time it
# would take for the first element to time out.
#
# If that value is in the past, then a sleep value of zero is used.
2012-07-30 19:12:23 -04:00
def calculate_sleep
if @timeouts . empty?
@sleep_for = DefaultSleepFor
else
2019-02-20 11:42:33 -05:00
diff = @timeouts . first . value . timeout_at . to_f - Time . now . to_f
2012-07-30 19:12:23 -04:00
if diff < 0 . 0
@sleep_for = 0
else
@sleep_for = diff
end
end
end
2018-05-01 12:21:37 -04:00
# This method adds a connection to the reactor
#
# Typically called by `Puma::Server` the value passed in
# is usually a `Puma::Client` object that responds like an IO
# object.
#
# The main body of the reactor loop is in `run_internal` and it
2019-06-11 18:54:32 -04:00
# will sleep on `NIO::Selector#select`. When a new connection is added to the
2018-05-03 18:08:29 -04:00
# reactor it cannot be added directly to the `sockets` array, because
2019-06-11 18:54:32 -04:00
# the `NIO::Selector#select` will not be watching for it yet.
2018-05-01 12:21:37 -04:00
#
2019-06-11 18:54:32 -04:00
# Instead what needs to happen is that `NIO::Selector#select` needs to be woken up,
2018-05-01 12:21:37 -04:00
# the contents of `@input` added to the `sockets` array, and then
2019-06-11 18:54:32 -04:00
# another call to `NIO::Selector#select` needs to happen. Since the `Puma::Client`
2018-05-01 12:21:37 -04:00
# object can be read immediately, it does not block, but instead returns
# right away.
#
# This behavior is accomplished by writing to `@trigger` which wakes up
2019-06-11 18:54:32 -04:00
# the `NIO::Selector#select` and then there is logic to detect the value of `*`,
2018-05-01 12:21:37 -04:00
# pull the contents from `@input` and add them to the sockets array.
#
# If the object passed in has a timeout value in `timeout_at` then
# it is added to a `@timeouts` array. This array is then re-arranged
# so that the first element to timeout will be at the front of the
# array. Then a value to sleep for is derived in the call to `calculate_sleep`
2012-07-23 13:26:52 -04:00
def add ( c )
@mutex . synchronize do
@input << c
2012-08-10 13:10:30 -04:00
@trigger << " * "
2012-07-23 13:26:52 -04:00
end
end
2012-08-10 13:10:30 -04:00
2012-09-02 23:33:09 -04:00
# Close all watched sockets and clear them from being watched
def clear!
2013-02-05 01:31:40 -05:00
begin
@trigger << " c "
rescue IOError
2017-07-19 14:22:36 -04:00
Thread . current . purge_interrupt_queue if Thread . current . respond_to? :purge_interrupt_queue
2013-02-05 01:31:40 -05:00
end
2012-09-02 23:33:09 -04:00
end
2012-08-10 13:10:30 -04:00
def shutdown
2013-02-05 01:31:40 -05:00
begin
@trigger << " ! "
rescue IOError
2017-07-19 14:22:36 -04:00
Thread . current . purge_interrupt_queue if Thread . current . respond_to? :purge_interrupt_queue
2013-02-05 01:31:40 -05:00
end
2012-09-02 23:33:09 -04:00
@thread . join
2012-08-10 13:10:30 -04:00
end
2012-07-23 13:26:52 -04:00
end
end