1
0
Fork 0
mirror of https://github.com/puma/puma.git synced 2022-11-09 13:48:40 -05:00
puma--puma/lib/puma/server.rb

692 lines
20 KiB
Ruby

# frozen_string_literal: true
require 'stringio'
require 'puma/thread_pool'
require 'puma/const'
require 'puma/events'
require 'puma/null_io'
require 'puma/reactor'
require 'puma/client'
require 'puma/binder'
require 'puma/util'
require 'puma/io_buffer'
require 'puma/request'
require 'socket'
require 'forwardable'
module Puma
# The HTTP Server itself. Serves out a single Rack app.
#
# This class is used by the `Puma::Single` and `Puma::Cluster` classes
# to generate one or more `Puma::Server` instances capable of handling requests.
# Each Puma process will contain one `Puma::Server` instance.
#
# The `Puma::Server` instance pulls requests from the socket, adds them to a
# `Puma::Reactor` where they get eventually passed to a `Puma::ThreadPool`.
#
# Each `Puma::Server` will have one reactor and one thread pool.
class Server
include Puma::Const
include Request
extend Forwardable
attr_reader :thread
attr_reader :events
attr_reader :min_threads, :max_threads # for #stats
attr_reader :requests_count # @version 5.0.0
# @todo the following may be deprecated in the future
attr_reader :auto_trim_time, :early_hints, :first_data_timeout,
:leak_stack_on_error,
:persistent_timeout, :reaping_time
# @deprecated v6.0.0
attr_writer :auto_trim_time, :early_hints, :first_data_timeout,
:leak_stack_on_error, :min_threads, :max_threads,
:persistent_timeout, :reaping_time
attr_accessor :app
attr_accessor :binder
def_delegators :@binder, :add_tcp_listener, :add_ssl_listener,
:add_unix_listener, :connected_ports
ThreadLocalKey = :puma_server
# Create a server for the rack app +app+.
#
# +events+ is an object which will be called when certain error events occur
# to be handled. See Puma::Events for the list of current methods to implement.
#
# Server#run returns a thread that you can join on to wait for the server
# to do its work.
#
# @note Several instance variables exist so they are available for testing,
# and have default values set via +fetch+. Normally the values are set via
# `::Puma::Configuration.puma_default_options`.
#
def initialize(app, events=Events.stdio, options={})
@app = app
@events = events
@check, @notify = nil
@status = :stop
@auto_trim_time = 30
@reaping_time = 1
@thread = nil
@thread_pool = nil
@options = options
@early_hints = options.fetch :early_hints, nil
@first_data_timeout = options.fetch :first_data_timeout, FIRST_DATA_TIMEOUT
@min_threads = options.fetch :min_threads, 0
@max_threads = options.fetch :max_threads , (Puma.mri? ? 5 : 16)
@persistent_timeout = options.fetch :persistent_timeout, PERSISTENT_TIMEOUT
@queue_requests = options.fetch :queue_requests, true
@max_fast_inline = options.fetch :max_fast_inline, MAX_FAST_INLINE
temp = !!(@options[:environment] =~ /\A(development|test)\z/)
@leak_stack_on_error = @options[:environment] ? temp : true
@binder = Binder.new(events)
ENV['RACK_ENV'] ||= "development"
@mode = :http
@precheck_closing = true
@requests_count = 0
end
def inherit_binder(bind)
@binder = bind
end
class << self
# @!attribute [r] current
def current
Thread.current[ThreadLocalKey]
end
# :nodoc:
# @version 5.0.0
def tcp_cork_supported?
RbConfig::CONFIG['host_os'] =~ /linux/ &&
Socket.const_defined?(:IPPROTO_TCP) &&
Socket.const_defined?(:TCP_CORK)
end
# :nodoc:
# @version 5.0.0
def closed_socket_supported?
RbConfig::CONFIG['host_os'] =~ /linux/ &&
Socket.const_defined?(:IPPROTO_TCP) &&
Socket.const_defined?(:TCP_INFO)
end
private :tcp_cork_supported?
private :closed_socket_supported?
end
# On Linux, use TCP_CORK to better control how the TCP stack
# packetizes our stream. This improves both latency and throughput.
#
if tcp_cork_supported?
UNPACK_TCP_STATE_FROM_TCP_INFO = "C".freeze
# 6 == Socket::IPPROTO_TCP
# 3 == TCP_CORK
# 1/0 == turn on/off
def cork_socket(socket)
begin
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_CORK, 1) if socket.kind_of? TCPSocket
rescue IOError, SystemCallError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
end
end
def uncork_socket(socket)
begin
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_CORK, 0) if socket.kind_of? TCPSocket
rescue IOError, SystemCallError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
end
end
else
def cork_socket(socket)
end
def uncork_socket(socket)
end
end
if closed_socket_supported?
def closed_socket?(socket)
return false unless socket.kind_of? TCPSocket
return false unless @precheck_closing
begin
tcp_info = socket.getsockopt(Socket::IPPROTO_TCP, Socket::TCP_INFO)
rescue IOError, SystemCallError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
@precheck_closing = false
false
else
state = tcp_info.unpack(UNPACK_TCP_STATE_FROM_TCP_INFO)[0]
# TIME_WAIT: 6, CLOSE: 7, CLOSE_WAIT: 8, LAST_ACK: 9, CLOSING: 11
(state >= 6 && state <= 9) || state == 11
end
end
else
def closed_socket?(socket)
false
end
end
# @!attribute [r] backlog
def backlog
@thread_pool and @thread_pool.backlog
end
# @!attribute [r] running
def running
@thread_pool and @thread_pool.spawned
end
# This number represents the number of requests that
# the server is capable of taking right now.
#
# For example if the number is 5 then it means
# there are 5 threads sitting idle ready to take
# a request. If one request comes in, then the
# value would be 4 until it finishes processing.
# @!attribute [r] pool_capacity
def pool_capacity
@thread_pool and @thread_pool.pool_capacity
end
# Runs the server.
#
# If +background+ is true (the default) then a thread is spun
# up in the background to handle requests. Otherwise requests
# are handled synchronously.
#
def run(background=true, thread_name: 'server')
BasicSocket.do_not_reverse_lookup = true
@events.fire :state, :booting
@status = :run
@thread_pool = ThreadPool.new(
@min_threads,
@max_threads,
::Puma::IOBuffer,
&method(:process_client)
)
@thread_pool.out_of_band_hook = @options[:out_of_band]
@thread_pool.clean_thread_locals = @options[:clean_thread_locals]
if @queue_requests
@reactor = Reactor.new(&method(:reactor_wakeup))
@reactor.run
end
if @reaping_time
@thread_pool.auto_reap!(@reaping_time)
end
if @auto_trim_time
@thread_pool.auto_trim!(@auto_trim_time)
end
@check, @notify = Puma::Util.pipe unless @notify
@events.fire :state, :running
if background
@thread = Thread.new do
Puma.set_thread_name thread_name
handle_servers
end
return @thread
else
handle_servers
end
end
# This method is called from the Reactor thread when a queued Client receives data,
# times out, or when the Reactor is shutting down.
#
# It is responsible for ensuring that a request has been completely received
# before it starts to be processed by the ThreadPool. This may be known as read buffering.
# If read buffering is not done, and no other read buffering is performed (such as by an application server
# such as nginx) then the application would be subject to a slow client attack.
#
# For a graphical representation of how the request buffer works see [architecture.md](https://github.com/puma/puma/blob/master/docs/architecture.md#connection-pipeline).
#
# The method checks to see if it has the full header and body with
# the `Puma::Client#try_to_finish` method. If the full request has been sent,
# then the request is passed to the ThreadPool (`@thread_pool << client`)
# so that a "worker thread" can pick up the request and begin to execute application logic.
# The Client is then removed from the reactor (return `true`).
#
# If a client object times out, a 408 response is written, its connection is closed,
# and the object is removed from the reactor (return `true`).
#
# If the Reactor is shutting down, all Clients are either timed out or passed to the
# ThreadPool, depending on their current state (#can_close?).
#
# Otherwise, if the full request is not ready then the client will remain in the reactor
# (return `false`). When the client sends more data to the socket the `Puma::Client` object
# will wake up and again be checked to see if it's ready to be passed to the thread pool.
def reactor_wakeup(client)
shutdown = !@queue_requests
if client.try_to_finish || (shutdown && !client.can_close?)
@thread_pool << client
elsif shutdown || client.timeout == 0
client.timeout!
end
rescue StandardError => e
client_error(e, client)
client.close
true
end
def handle_servers
begin
check = @check
sockets = [check] + @binder.ios
pool = @thread_pool
queue_requests = @queue_requests
remote_addr_value = nil
remote_addr_header = nil
case @options[:remote_address]
when :value
remote_addr_value = @options[:remote_address_value]
when :header
remote_addr_header = @options[:remote_address_header]
end
while @status == :run
begin
ios = IO.select sockets
ios.first.each do |sock|
if sock == check
break if handle_check
else
pool.wait_until_not_full
pool.wait_for_less_busy_worker(
@options[:wait_for_less_busy_worker].to_f)
io = begin
sock.accept_nonblock
rescue IO::WaitReadable
next
end
client = Client.new io, @binder.env(sock)
if remote_addr_value
client.peerip = remote_addr_value
elsif remote_addr_header
client.remote_addr_header = remote_addr_header
end
pool << client
end
end
rescue Object => e
@events.unknown_error e, nil, "Listen loop"
end
end
@events.fire :state, @status
if queue_requests
@queue_requests = false
@reactor.shutdown
end
graceful_shutdown if @status == :stop || @status == :restart
rescue Exception => e
@events.unknown_error e, nil, "Exception handling servers"
ensure
begin
@check.close unless @check.closed?
rescue Errno::EBADF, RuntimeError
# RuntimeError is Ruby 2.2 issue, can't modify frozen IOError
# Errno::EBADF is infrequently raised
end
@notify.close
@notify = nil
@check = nil
end
@events.fire :state, :done
end
# :nodoc:
def handle_check
cmd = @check.read(1)
case cmd
when STOP_COMMAND
@status = :stop
return true
when HALT_COMMAND
@status = :halt
return true
when RESTART_COMMAND
@status = :restart
return true
end
return false
end
# Given a connection on +client+, handle the incoming requests,
# or queue the connection in the Reactor if no request is available.
#
# This method is called from a ThreadPool worker thread.
#
# This method supports HTTP Keep-Alive so it may, depending on if the client
# indicates that it supports keep alive, wait for another request before
# returning.
#
# Return true if one or more requests were processed.
def process_client(client, buffer)
# Advertise this server into the thread
Thread.current[ThreadLocalKey] = self
clean_thread_locals = @options[:clean_thread_locals]
close_socket = true
requests = 0
begin
if @queue_requests &&
!client.eagerly_finish
client.set_timeout(@first_data_timeout)
if @reactor.add client
close_socket = false
return false
end
end
with_force_shutdown(client) do
client.finish(@first_data_timeout)
end
while true
@requests_count += 1
case handle_request(client, buffer)
when false
break
when :async
close_socket = false
break
when true
buffer.reset
ThreadPool.clean_thread_locals if clean_thread_locals
requests += 1
check_for_more_data = @status == :run
if requests >= @max_fast_inline
# This will mean that reset will only try to use the data it already
# has buffered and won't try to read more data. What this means is that
# every client, independent of their request speed, gets treated like a slow
# one once every max_fast_inline requests.
check_for_more_data = false
end
next_request_ready = with_force_shutdown(client) do
client.reset(check_for_more_data)
end
unless next_request_ready
break unless @queue_requests
client.set_timeout @persistent_timeout
if @reactor.add client
close_socket = false
break
end
end
end
end
true
rescue StandardError => e
client_error(e, client)
# The ensure tries to close +client+ down
requests > 0
ensure
buffer.reset
begin
client.close if close_socket
rescue IOError, SystemCallError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
# Already closed
rescue StandardError => e
@events.unknown_error e, nil, "Client"
end
end
end
# Triggers a client timeout if the thread-pool shuts down
# during execution of the provided block.
def with_force_shutdown(client, &block)
@thread_pool.with_force_shutdown(&block)
rescue ThreadPool::ForceShutdown
client.timeout!
end
# :nocov:
# Given the request +env+ from +client+ and the partial body +body+
# plus a potential Content-Length value +cl+, finish reading
# the body and return it.
#
# If the body is larger than MAX_BODY, a Tempfile object is used
# for the body, otherwise a StringIO is used.
# @deprecated 6.0.0
#
def read_body(env, client, body, cl)
content_length = cl.to_i
remain = content_length - body.bytesize
return StringIO.new(body) if remain <= 0
# Use a Tempfile if there is a lot of data left
if remain > MAX_BODY
stream = Tempfile.new(Const::PUMA_TMP_BASE)
stream.binmode
else
# The body[0,0] trick is to get an empty string in the same
# encoding as body.
stream = StringIO.new body[0,0]
end
stream.write body
# Read an odd sized chunk so we can read even sized ones
# after this
chunk = client.readpartial(remain % CHUNK_SIZE)
# No chunk means a closed socket
unless chunk
stream.close
return nil
end
remain -= stream.write(chunk)
# Read the rest of the chunks
while remain > 0
chunk = client.readpartial(CHUNK_SIZE)
unless chunk
stream.close
return nil
end
remain -= stream.write(chunk)
end
stream.rewind
return stream
end
# :nocov:
# Handle various error types thrown by Client I/O operations.
def client_error(e, client)
# Swallow, do not log
return if [ConnectionError, EOFError].include?(e.class)
lowlevel_error(e, client.env)
case e
when MiniSSL::SSLError
@events.ssl_error e, client.io
when HttpParserError
client.write_error(400)
@events.parse_error e, client
else
client.write_error(500)
@events.unknown_error e, nil, "Read"
end
end
# A fallback rack response if +@app+ raises as exception.
#
def lowlevel_error(e, env, status=500)
if handler = @options[:lowlevel_error_handler]
if handler.arity == 1
return handler.call(e)
elsif handler.arity == 2
return handler.call(e, env)
else
return handler.call(e, env, status)
end
end
if @leak_stack_on_error
[status, {}, ["Puma caught this error: #{e.message} (#{e.class})\n#{e.backtrace.join("\n")}"]]
else
[status, {}, ["An unhandled lowlevel error occurred. The application logs may have details.\n"]]
end
end
# Wait for all outstanding requests to finish.
#
def graceful_shutdown
if @options[:shutdown_debug]
threads = Thread.list
total = threads.size
pid = Process.pid
$stdout.syswrite "#{pid}: === Begin thread backtrace dump ===\n"
threads.each_with_index do |t,i|
$stdout.syswrite "#{pid}: Thread #{i+1}/#{total}: #{t.inspect}\n"
$stdout.syswrite "#{pid}: #{t.backtrace.join("\n#{pid}: ")}\n\n"
end
$stdout.syswrite "#{pid}: === End thread backtrace dump ===\n"
end
if @options[:drain_on_shutdown]
count = 0
while true
ios = IO.select @binder.ios, nil, nil, 0
break unless ios
ios.first.each do |sock|
begin
if io = sock.accept_nonblock
count += 1
client = Client.new io, @binder.env(sock)
@thread_pool << client
end
rescue SystemCallError
end
end
end
@events.debug "Drained #{count} additional connections."
end
if @status != :restart
@binder.close
end
if @thread_pool
if timeout = @options[:force_shutdown_after]
@thread_pool.shutdown timeout.to_f
else
@thread_pool.shutdown
end
end
end
def notify_safely(message)
@notify << message
rescue IOError, NoMethodError, Errno::EPIPE
# The server, in another thread, is shutting down
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
rescue RuntimeError => e
# Temporary workaround for https://bugs.ruby-lang.org/issues/13239
if e.message.include?('IOError')
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
else
raise e
end
end
private :notify_safely
# Stops the acceptor thread and then causes the worker threads to finish
# off the request queue before finally exiting.
def stop(sync=false)
notify_safely(STOP_COMMAND)
@thread.join if @thread && sync
end
def halt(sync=false)
notify_safely(HALT_COMMAND)
@thread.join if @thread && sync
end
def begin_restart(sync=false)
notify_safely(RESTART_COMMAND)
@thread.join if @thread && sync
end
def shutting_down?
@status == :stop || @status == :restart
end
# List of methods invoked by #stats.
# @version 5.0.0
STAT_METHODS = [:backlog, :running, :pool_capacity, :max_threads, :requests_count].freeze
# Returns a hash of stats about the running server for reporting purposes.
# @version 5.0.0
# @!attribute [r] stats
def stats
STAT_METHODS.map {|name| [name, send(name) || 0]}.to_h
end
end
end