# Mirror of https://github.com/puma/puma.git (synced 2022-11-09 13:48:40 -05:00)
# Path: puma--puma/lib/puma/server.rb (Ruby, 692 lines, 20 KiB)
# frozen_string_literal: true
2011-09-27 13:53:45 -04:00
require 'stringio'
2011-09-27 12:23:03 -04:00
2011-09-22 22:24:43 -04:00
require 'puma/thread_pool'
2011-09-27 12:23:03 -04:00
require 'puma/const'
require 'puma/events'
require 'puma/null_io'
require 'puma/reactor'
require 'puma/client'
require 'puma/binder'
2013-02-05 01:31:40 -05:00
require 'puma/util'
2020-02-27 14:50:34 -05:00
require 'puma/io_buffer'
require 'puma/request'
2011-09-27 12:23:03 -04:00
require 'socket'
require 'forwardable'
2011-09-22 22:24:43 -04:00
module Puma
2011-12-01 18:23:14 -05:00
# The HTTP Server itself. Serves out a single Rack app.
#
# This class is used by the `Puma::Single` and `Puma::Cluster` classes
# to generate one or more `Puma::Server` instances capable of handling requests.
2019-05-06 10:26:13 -04:00
# Each Puma process will contain one `Puma::Server` instance.
#
# The `Puma::Server` instance pulls requests from the socket, adds them to a
# `Puma::Reactor` where they get eventually passed to a `Puma::ThreadPool`.
#
# Each `Puma::Server` will have one reactor and one thread pool.
class Server
2011-09-22 22:24:43 -04:00
include Puma::Const
include Request
extend Forwardable

# Set by #run when the server is started in the background.
attr_reader :thread
attr_reader :events
attr_reader :min_threads, :max_threads  # for #stats
attr_reader :requests_count             # @version 5.0.0

# @todo the following may be deprecated in the future
attr_reader :auto_trim_time, :early_hints, :first_data_timeout,
  :leak_stack_on_error,
  :persistent_timeout, :reaping_time

# @deprecated v6.0.0
attr_writer :auto_trim_time, :early_hints, :first_data_timeout,
  :leak_stack_on_error, :min_threads, :max_threads,
  :persistent_timeout, :reaping_time

attr_accessor :app
attr_accessor :binder

# Listener management is forwarded to the Binder held in @binder.
def_delegators :@binder, :add_tcp_listener, :add_ssl_listener,
  :add_unix_listener, :connected_ports

# Thread-local key under which a worker thread stores the Server it is
# running for (written in #process_client, read by Server.current).
ThreadLocalKey = :puma_server
2011-09-23 01:14:39 -04:00
2011-12-01 18:23:14 -05:00
# Create a server for the rack app +app+.
#
# +events+ is an object which will be called when certain error events occur
# to be handled. See Puma::Events for the list of current methods to implement.
#
2011-12-01 18:23:14 -05:00
# Server#run returns a thread that you can join on to wait for the server
# to do its work.
#
# @note Several instance variables exist so they are available for testing,
# and have default values set via +fetch+. Normally the values are set via
# `::Puma::Configuration.puma_default_options`.
#
def initialize(app, events=Events.stdio, options={})
  @app = app
  @events = events

  # Both ends of the control pipe stay nil until #run creates them.
  @check, @notify = nil
  @status = :stop

  @auto_trim_time = 30
  @reaping_time = 1

  @thread = nil
  @thread_pool = nil

  @options = options

  @early_hints        = options.fetch :early_hints, nil
  @first_data_timeout = options.fetch :first_data_timeout, FIRST_DATA_TIMEOUT
  @min_threads        = options.fetch :min_threads, 0
  @max_threads        = options.fetch :max_threads , (Puma.mri? ? 5 : 16)
  @persistent_timeout = options.fetch :persistent_timeout, PERSISTENT_TIMEOUT
  @queue_requests     = options.fetch :queue_requests, true

  # Stack traces are exposed in error responses when the environment is
  # development/test, or when no environment was configured at all.
  dev_or_test = !!(@options[:environment] =~ /\A(development|test)\z/)
  @leak_stack_on_error = @options[:environment] ? dev_or_test : true

  @binder = Binder.new(events)

  ENV['RACK_ENV'] ||= "development"

  @mode = :http

  # Cleared by #closed_socket? the first time the TCP_INFO probe fails.
  @precheck_closing = true

  @requests_count = 0
end
# Replaces the Binder created in #initialize with +bind+.
#
# Returns the binder that was assigned.
def inherit_binder(bind)
  self.binder = bind
end
2020-08-30 06:50:55 -04:00
class << self
  # The Server stored in the calling thread's thread-local slot
  # (see ThreadLocalKey), or nil when none has been stored.
  # @!attribute [r] current
  def current
    Thread.current[ThreadLocalKey]
  end

  # :nodoc:
  # True only on Linux hosts whose Socket library exposes TCP_CORK.
  # @version 5.0.0
  def tcp_cork_supported?
    RbConfig::CONFIG['host_os'] =~ /linux/ &&
      Socket.const_defined?(:IPPROTO_TCP) &&
      Socket.const_defined?(:TCP_CORK)
  end

  # :nodoc:
  # True only on Linux hosts whose Socket library exposes TCP_INFO.
  # @version 5.0.0
  def closed_socket_supported?
    RbConfig::CONFIG['host_os'] =~ /linux/ &&
      Socket.const_defined?(:IPPROTO_TCP) &&
      Socket.const_defined?(:TCP_INFO)
  end

  private :tcp_cork_supported?
  private :closed_socket_supported?
end
2011-12-01 18:23:14 -05:00
# On Linux, use TCP_CORK to better control how the TCP stack
# packetizes our stream. This improves both latency and throughput.
#
2020-08-30 06:50:55 -04:00
if tcp_cork_supported?
  UNPACK_TCP_STATE_FROM_TCP_INFO = "C".freeze

  # 6 == Socket::IPPROTO_TCP
  # 3 == TCP_CORK
  # 1/0 == turn on/off
  #
  # Turns TCP_CORK on for +socket+. Non-TCP sockets are left untouched and
  # IO/system-call failures are swallowed.
  def cork_socket(socket)
    socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_CORK, 1) if socket.kind_of? TCPSocket
  rescue IOError, SystemCallError
    Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
  end

  # Turns TCP_CORK off for +socket+; same error handling as #cork_socket.
  def uncork_socket(socket)
    socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_CORK, 0) if socket.kind_of? TCPSocket
  rescue IOError, SystemCallError
    Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
  end
else
  # TCP_CORK is unavailable: both operations are no-ops.
  def cork_socket(socket)
  end

  def uncork_socket(socket)
  end
end
if closed_socket_supported?
  # Reports whether the peer of +socket+ has already closed (or is closing)
  # the connection, according to the kernel's TCP_INFO state.
  #
  # Returns false for non-TCP sockets, and for every call after the TCP_INFO
  # probe has failed once (@precheck_closing is then cleared).
  def closed_socket?(socket)
    return false unless socket.kind_of? TCPSocket
    return false unless @precheck_closing

    begin
      tcp_info = socket.getsockopt(Socket::IPPROTO_TCP, Socket::TCP_INFO)
    rescue IOError, SystemCallError
      Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
      @precheck_closing = false
      false
    else
      # The first unsigned byte of the TCP_INFO payload is read as the state.
      # The template is spelled out here instead of using
      # UNPACK_TCP_STATE_FROM_TCP_INFO, because that constant is only
      # defined when TCP_CORK is supported, which is a separate probe.
      state = tcp_info.unpack("C")[0]
      # TIME_WAIT: 6, CLOSE: 7, CLOSE_WAIT: 8, LAST_ACK: 9, CLOSING: 11
      (state >= 6 && state <= 9) || state == 11
    end
  end
else
  # TCP_INFO is unavailable: never report a socket as closed.
  def closed_socket?(socket)
    false
  end
end
# @!attribute [r] backlog
# Delegates to the thread pool's +backlog+; nil until #run has created the pool.
def backlog
  @thread_pool && @thread_pool.backlog
end
# @!attribute [r] running
# Delegates to the thread pool's +spawned+; nil until #run has created the pool.
def running
  @thread_pool && @thread_pool.spawned
end
# This number represents the number of requests that
# the server is capable of taking right now.
#
# For example if the number is 5 then it means
# there are 5 threads sitting idle ready to take
# a request. If one request comes in, then the
# value would be 4 until it finishes processing.
# @!attribute [r] pool_capacity
# Delegates to the thread pool's +pool_capacity+; nil until #run has created the pool.
def pool_capacity
  @thread_pool && @thread_pool.pool_capacity
end
# Runs the server.
2011-12-01 18:23:14 -05:00
#
# If +background+ is true (the default) then a thread is spun
# up in the background to handle requests. Otherwise requests
# are handled synchronously.
#
def run(background=true)
  BasicSocket.do_not_reverse_lookup = true

  @events.fire :state, :booting

  @status = :run

  # Every client pushed onto the pool is handed to #process_client.
  @thread_pool = ThreadPool.new(
    @min_threads,
    @max_threads,
    ::Puma::IOBuffer,
    &method(:process_client)
  )

  @thread_pool.out_of_band_hook = @options[:out_of_band]
  @thread_pool.clean_thread_locals = @options[:clean_thread_locals]

  # The reactor is only created when request queueing is enabled.
  if @queue_requests
    @reactor = Reactor.new(&method(:reactor_wakeup))
    @reactor.run
  end

  @thread_pool.auto_reap!(@reaping_time) if @reaping_time
  @thread_pool.auto_trim!(@auto_trim_time) if @auto_trim_time

  # Control pipe: #stop/#halt/#begin_restart write to @notify and the
  # accept loop reads the command from @check.
  @check, @notify = Puma::Util.pipe unless @notify

  @events.fire :state, :running

  if background
    @thread = Thread.new do
      Puma.set_thread_name "server"
      handle_servers
    end
    return @thread
  else
    handle_servers
  end
end
# This method is called from the Reactor thread when a queued Client receives data,
# times out, or when the Reactor is shutting down.
#
# It is responsible for ensuring that a request has been completely received
# before it starts to be processed by the ThreadPool. This may be known as read buffering.
# If read buffering is not done here, and no other layer performs it (for example a reverse proxy
# such as nginx), then the application would be subject to a slow client attack.
#
# For a graphical representation of how the request buffer works see [architecture.md](https://github.com/puma/puma/blob/master/docs/architecture.md#connection-pipeline).
#
# The method checks to see if it has the full header and body with
# the `Puma::Client#try_to_finish` method. If the full request has been sent,
# then the request is passed to the ThreadPool (`@thread_pool << client`)
# so that a "worker thread" can pick up the request and begin to execute application logic.
# The Client is then removed from the reactor (return `true`).
#
# If a client object times out, a 408 response is written, its connection is closed,
# and the object is removed from the reactor (return `true`).
#
# If the Reactor is shutting down, all Clients are either timed out or passed to the
# ThreadPool, depending on their current state (#can_close?).
#
# Otherwise, if the full request is not ready then the client will remain in the reactor
# (return `false`). When the client sends more data to the socket the `Puma::Client` object
# will wake up and again be checked to see if it's ready to be passed to the thread pool.
def reactor_wakeup(client)
  shutting_down = !@queue_requests
  # Hand the client to the pool when its request is complete, or when we are
  # shutting down and the client cannot simply be closed.
  hand_off = client.try_to_finish || (shutting_down && !client.can_close?)

  if hand_off
    @thread_pool << client
  elsif shutting_down || client.timeout == 0
    client.timeout!
  end
rescue StandardError => error
  client_error(error, client)
  client.close
  true
end
# The accept loop: waits on the control pipe and the listening sockets,
# wraps each accepted connection in a Client and pushes it onto the thread
# pool, until a control command changes @status. Afterwards it shuts the
# reactor down (when queueing was enabled), runs #graceful_shutdown for
# :stop/:restart and closes the control pipe.
def handle_servers
  begin
    check = @check
    sockets = [check] + @binder.ios
    pool = @thread_pool
    queue_requests = @queue_requests

    remote_addr_value = nil
    remote_addr_header = nil

    case @options[:remote_address]
    when :value
      remote_addr_value = @options[:remote_address_value]
    when :header
      remote_addr_header = @options[:remote_address_header]
    end

    while @status == :run
      begin
        ios = IO.select sockets
        ios.first.each do |sock|
          if sock == check
            # `break` only leaves the `each`; the enclosing `while` then
            # re-checks @status, which #handle_check has just updated.
            break if handle_check
          else
            pool.wait_until_not_full
            pool.wait_for_less_busy_worker(
              @options[:wait_for_less_busy_worker].to_f)

            io = begin
              sock.accept_nonblock
            rescue IO::WaitReadable
              next
            end
            client = Client.new io, @binder.env(sock)
            if remote_addr_value
              client.peerip = remote_addr_value
            elsif remote_addr_header
              client.remote_addr_header = remote_addr_header
            end
            pool << client
          end
        end
      rescue Object => e
        # Anything raised inside one iteration is reported and the loop
        # carries on.
        @events.unknown_error e, nil, "Listen loop"
      end
    end

    @events.fire :state, @status

    if queue_requests
      # Cleared before the reactor is shut down; #reactor_wakeup reads this
      # flag to detect that shutdown is in progress.
      @queue_requests = false
      @reactor.shutdown
    end
    graceful_shutdown if @status == :stop || @status == :restart
  rescue Exception => e
    @events.unknown_error e, nil, "Exception handling servers"
  ensure
    begin
      @check.close unless @check.closed?
    rescue Errno::EBADF, RuntimeError
      # RuntimeError is Ruby 2.2 issue, can't modify frozen IOError
      # Errno::EBADF is infrequently raised
    end
    @notify.close
    @notify = nil
    @check = nil
  end

  @events.fire :state, :done
end
2011-12-01 18:23:14 -05:00
# :nodoc:
# Reads one command byte from the control pipe and updates @status.
#
# Returns true when the byte was a stop, halt or restart command (the accept
# loop should end), false for anything else.
def handle_check
  requested_status =
    case @check.read(1)
    when STOP_COMMAND    then :stop
    when HALT_COMMAND    then :halt
    when RESTART_COMMAND then :restart
    end

  return false unless requested_status

  @status = requested_status
  true
end
# Given a connection on +client+, handle the incoming requests,
# or queue the connection in the Reactor if no request is available.
#
# This method is called from a ThreadPool worker thread.
2011-12-01 18:23:14 -05:00
#
# This method supports HTTP Keep-Alive so it may, depending on if the client
2011-12-01 18:23:14 -05:00
# indicates that it supports keep alive, wait for another request before
# returning.
#
# Return true if one or more requests were processed.
def process_client(client, buffer)
  # Advertise this server into the thread
  Thread.current[ThreadLocalKey] = self

  clean_thread_locals = @options[:clean_thread_locals]
  # Stays true unless the connection is handed to the reactor or the request
  # is handled asynchronously; checked in the ensure clause below.
  close_socket = true

  requests = 0

  begin
    if @queue_requests && !client.eagerly_finish
      # The request is not complete yet: park the client in the reactor.
      client.set_timeout(@first_data_timeout)
      if @reactor.add client
        close_socket = false
        return false
      end
    end

    with_force_shutdown(client) do
      client.finish(@first_data_timeout)
    end

    while true
      @requests_count += 1
      case handle_request(client, buffer)
      when false
        break
      when :async
        close_socket = false
        break
      when true
        buffer.reset

        ThreadPool.clean_thread_locals if clean_thread_locals

        requests += 1

        check_for_more_data = @status == :run

        if requests >= MAX_FAST_INLINE
          # This will mean that reset will only try to use the data it already
          # has buffered and won't try to read more data. What this means is that
          # every client, independent of their request speed, gets treated like a slow
          # one once every MAX_FAST_INLINE requests.
          check_for_more_data = false
        end

        next_request_ready = with_force_shutdown(client) do
          client.reset(check_for_more_data)
        end

        unless next_request_ready
          break unless @queue_requests
          client.set_timeout @persistent_timeout
          if @reactor.add client
            close_socket = false
            break
          end
        end
      end
    end
    true
  rescue StandardError => e
    client_error(e, client)
    # The ensure tries to close +client+ down
    requests > 0
  ensure
    buffer.reset

    begin
      client.close if close_socket
    rescue IOError, SystemCallError
      Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
      # Already closed
    rescue StandardError => e
      @events.unknown_error e, nil, "Client"
    end
  end
end
# Triggers a client timeout if the thread-pool shuts down
# during execution of the provided block.
def with_force_shutdown(client, &block)
  begin
    @thread_pool.with_force_shutdown(&block)
  rescue ThreadPool::ForceShutdown
    # The pool is being forcibly shut down: time the client out instead.
    client.timeout!
  end
end
# :nocov:
2016-04-07 14:22:15 -04:00
# Given the request +env+ from +client+ and the partial body +body+
2011-12-01 18:23:14 -05:00
# plus a potential Content-Length value +cl+, finish reading
# the body and return it.
#
# If the body is larger than MAX_BODY, a Tempfile object is used
# for the body, otherwise a StringIO is used.
# @deprecated 6.0.0
2011-12-01 18:23:14 -05:00
#
def read_body(env, client, body, cl)
  remain = cl.to_i - body.bytesize

  # Everything announced by Content-Length is already in +body+.
  return StringIO.new(body) if remain <= 0

  stream =
    if remain > MAX_BODY
      # Use a Tempfile if there is a lot of data left
      Tempfile.new(Const::PUMA_TMP_BASE).tap(&:binmode)
    else
      # The body[0,0] trick is to get an empty string in the same
      # encoding as body.
      StringIO.new(body[0, 0])
    end

  stream.write body

  # Read an odd sized chunk first so that every later read is a full
  # CHUNK_SIZE.
  read_size = remain % CHUNK_SIZE

  loop do
    chunk = client.readpartial(read_size)

    # No chunk means a closed socket
    unless chunk
      stream.close
      return nil
    end

    remain -= stream.write(chunk)
    break unless remain > 0

    read_size = CHUNK_SIZE
  end

  stream.rewind
  stream
end
# :nocov:
# Handle various error types thrown by Client I/O operations.
def client_error(e, client)
  # Swallow, do not log. Only these exact classes match, not subclasses.
  return if e.instance_of?(ConnectionError) || e.instance_of?(EOFError)

  lowlevel_error(e, client.env)

  if MiniSSL::SSLError === e
    @events.ssl_error e, client.io
  elsif HttpParserError === e
    client.write_error(400)
    @events.parse_error e, client
  else
    client.write_error(500)
    @events.unknown_error e, nil, "Read"
  end
end
2011-12-01 18:23:14 -05:00
# A fallback rack response if +@app+ raises an exception.
#
# Builds the response for error +e+.
#
# When the :lowlevel_error_handler option is set, its result is returned.
# The handler is called with (e), (e, env) or (e, env, status) depending on
# its arity. Otherwise a plain rack triplet [status, headers, body] is
# returned; the body contains the error message and backtrace only when
# @leak_stack_on_error is set.
def lowlevel_error(e, env, status=500)
  if handler = @options[:lowlevel_error_handler]
    case handler.arity
    when 1 then return handler.call(e)
    when 2 then return handler.call(e, env)
    else        return handler.call(e, env, status)
    end
  end

  if @leak_stack_on_error
    # An exception that was instantiated but never raised has no backtrace.
    backtrace = e.backtrace.nil? ? '<no backtrace available>' : e.backtrace.join("\n")
    [status, {}, ["Puma caught this error: #{e.message} (#{e.class})\n#{backtrace}"]]
  else
    [status, {}, ["An unhandled lowlevel error occurred. The application logs may have details.\n"]]
  end
end
# Wait for all outstanding requests to finish.
2011-12-01 18:23:14 -05:00
#
def graceful_shutdown
  # Optional diagnostics: dump the backtrace of every thread to stdout.
  if @options[:shutdown_debug]
    threads = Thread.list
    total = threads.size

    pid = Process.pid

    $stdout.syswrite "#{pid}: === Begin thread backtrace dump ===\n"

    threads.each_with_index do |t,i|
      $stdout.syswrite "#{pid}: Thread #{i+1}/#{total}: #{t.inspect}\n"
      $stdout.syswrite "#{pid}: #{t.backtrace.join("\n#{pid}: ")}\n\n"
    end
    $stdout.syswrite "#{pid}: === End thread backtrace dump ===\n"
  end

  # Optionally accept the connections that are already pending on the
  # listeners and queue them before shutting down.
  if @options[:drain_on_shutdown]
    count = 0

    while true
      # Zero timeout: poll without blocking; nil means nothing is pending.
      ios = IO.select @binder.ios, nil, nil, 0
      break unless ios

      ios.first.each do |sock|
        begin
          if io = sock.accept_nonblock
            count += 1
            client = Client.new io, @binder.env(sock)
            @thread_pool << client
          end
        rescue SystemCallError
        end
      end
    end

    @events.debug "Drained #{count} additional connections."
  end

  # The binder is left open when restarting.
  if @status != :restart
    @binder.close
  end

  if @thread_pool
    if timeout = @options[:force_shutdown_after]
      @thread_pool.shutdown timeout.to_f
    else
      @thread_pool.shutdown
    end
  end
end
2011-09-23 01:14:39 -04:00
# Writes +message+ to the control pipe, ignoring the errors that occur when
# the pipe is already closed or was never created.
def notify_safely(message)
  @notify << message
rescue IOError, NoMethodError, Errno::EPIPE
  # The server, in another thread, is shutting down
  Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
rescue RuntimeError => e
  # Temporary workaround for https://bugs.ruby-lang.org/issues/13239
  raise e unless e.message.include?('IOError')

  Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
end
private :notify_safely
2011-09-23 01:14:39 -04:00
# Stops the acceptor thread and then causes the worker threads to finish
# off the request queue before finally exiting.
def stop(sync=false)
  notify_safely(STOP_COMMAND)
  # Only wait when asked to and when the server runs in a background thread.
  @thread.join if @thread && sync
end
# Sends the halt command to the accept loop. With +sync+ set, also waits for
# the server thread (if there is one) to finish.
def halt(sync=false)
  notify_safely(HALT_COMMAND)
  return unless @thread && sync

  @thread.join
end
# Sends the restart command to the accept loop. With +sync+ set, also waits
# for the server thread (if there is one) to finish.
def begin_restart(sync=false)
  notify_safely(RESTART_COMMAND)
  return unless @thread && sync

  @thread.join
end
# True once a stop or restart command has been processed (a halt does not count).
def shutting_down?
  [:stop, :restart].include?(@status)
end
# List of methods invoked by #stats.
# @version 5.0.0
STAT_METHODS = %i[backlog running pool_capacity max_threads requests_count].freeze

# Returns a hash of stats about the running server for reporting purposes.
# Each STAT_METHODS entry becomes a key; nil/false readings are reported as 0.
# @version 5.0.0
# @!attribute [r] stats
def stats
  STAT_METHODS.each_with_object({}) do |stat, report|
    report[stat] = send(stat) || 0
  end
end
end
end