1
0
Fork 0
mirror of https://github.com/puma/puma.git synced 2022-11-09 13:48:40 -05:00
puma--puma/lib/puma/server.rb
Evan Phoenix 455c8740bd Add ability to save state and start the status server
The state saving works like saving the pidfile, but it contains much
more rich information about the running server.

Additionally, added the --status option to start the status server so
that the server can be controlled over HTTP.

The state file contains the information about where the status server is
located.
2011-12-05 11:15:44 -08:00

528 lines
14 KiB
Ruby

require 'rubygems'
require 'rack'
require 'stringio'
require 'puma/thread_pool'
require 'puma/const'
require 'puma/events'
require 'puma/null_io'
require 'puma_http11'
require 'socket'
module Puma
# The HTTP Server itself. Serves out a single Rack app.
class Server
include Puma::Const
attr_reader :thread
attr_reader :events
attr_accessor :app
attr_accessor :min_threads
attr_accessor :max_threads
attr_accessor :persistent_timeout
attr_accessor :auto_trim_time
# Create a server for the rack app +app+.
#
# +events+ is an object which will be called when certain error events occur
# to be handled. See Puma::Events for the list of current methods to implement.
#
# Server#run returns a thread that you can join on to wait for the server
# to do it's work.
#
def initialize(app, events=Events::DEFAULT)
@app = app
@events = events
@check, @notify = IO.pipe
@ios = [@check]
@status = :stop
@min_threads = 0
@max_threads = 16
@auto_trim_time = 1
@thread = nil
@thread_pool = nil
@persistent_timeout = PERSISTENT_TIMEOUT
@proto_env = {
"rack.version".freeze => Rack::VERSION,
"rack.errors".freeze => events.stderr,
"rack.multithread".freeze => true,
"rack.multiprocess".freeze => false,
"rack.run_once".freeze => true,
"SCRIPT_NAME".freeze => "",
"CONTENT_TYPE".freeze => "",
"QUERY_STRING".freeze => "",
SERVER_PROTOCOL => HTTP_11,
SERVER_SOFTWARE => PUMA_VERSION,
GATEWAY_INTERFACE => CGI_VER
}
end
# On Linux, use TCP_CORK to better control how the TCP stack
# packetizes our stream. This improves both latency and throughput.
#
if RUBY_PLATFORM =~ /linux/
# 6 == Socket::IPPROTO_TCP
# 3 == TCP_CORK
# 1/0 == turn on/off
def cork_socket(socket)
socket.setsockopt(6, 3, 1) if socket.kind_of? TCPSocket
end
def uncork_socket(socket)
socket.setsockopt(6, 3, 0) if socket.kind_of? TCPSocket
end
else
def cork_socket(socket)
end
def uncork_socket(socket)
end
end
# Tell the server to listen on host +host+, port +port+.
# If optimize_for_latency is true (the default) then clients connecting
# will be optimized for latency over throughput.
#
def add_tcp_listener(host, port, optimize_for_latency=true)
s = TCPServer.new(host, port)
if optimize_for_latency
s.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
end
@ios << s
end
# Tell the server to listen on +path+ as a UNIX domain socket.
#
def add_unix_listener(path)
@ios << UNIXServer.new(path)
end
def backlog
@thread_pool and @thread_pool.backlog
end
def running
@thread_pool and @thread_pool.spawned
end
# Runs the server. It returns the thread used so you can join it.
# The thread is always available via #thread to be join'd
#
def run
BasicSocket.do_not_reverse_lookup = true
@status = :run
@thread_pool = ThreadPool.new(@min_threads, @max_threads) do |client|
process_client(client)
end
if @auto_trim_time
@thread_pool.auto_trim!(@auto_trim_time)
end
@thread = Thread.new do
begin
check = @check
sockets = @ios
pool = @thread_pool
while @status == :run
begin
ios = IO.select sockets
ios.first.each do |sock|
if sock == check
break if handle_check
else
pool << sock.accept
end
end
rescue Errno::ECONNABORTED
# client closed the socket even before accept
client.close rescue nil
rescue Object => e
@events.unknown_error self, env, e, "Listen loop"
end
end
graceful_shutdown if @status == :stop
ensure
@ios.each { |i| i.close }
end
end
return @thread
end
# :nodoc:
def handle_check
cmd = @check.read(1)
case cmd
when STOP_COMMAND
@status = :stop
return true
when HALT_COMMAND
@status = :halt
return true
end
return false
end
# Given a connection on +client+, handle the incoming requests.
#
# This method support HTTP Keep-Alive so it may, depending on if the client
# indicates that it supports keep alive, wait for another request before
# returning.
#
def process_client(client)
parser = HttpParser.new
begin
while true
parser.reset
env = @proto_env.dup
data = client.readpartial(CHUNK_SIZE)
nparsed = 0
# Assumption: nparsed will always be less since data will get filled
# with more after each parsing. If it doesn't get more then there was
# a problem with the read operation on the client socket.
# Effect is to stop processing when the socket can't fill the buffer
# for further parsing.
while nparsed < data.length
nparsed = parser.execute(env, data, nparsed)
if parser.finished?
cl = env[CONTENT_LENGTH]
return unless handle_request(env, client, parser.body, cl)
nparsed += parser.body.size if cl
if data.size > nparsed
data.slice!(0, nparsed)
parser.reset
env = @proto_env.dup
nparsed = 0
else
unless IO.select([client], nil, nil, @persistent_timeout)
raise EOFError, "Timed out persistent connection"
end
end
else
# Parser is not done, queue up more data to read and continue parsing
chunk = client.readpartial(CHUNK_SIZE)
return if !chunk or chunk.length == 0 # read failed, stop processing
data << chunk
if data.length >= MAX_HEADER
raise HttpParserError,
"HEADER is longer than allowed, aborting client early."
end
end
end
end
# The client disconnected while we were reading data
rescue EOFError, SystemCallError
# Swallow them. The ensure tries to close +client+ down
# The client doesn't know HTTP well
rescue HttpParserError => e
@events.parse_error self, env, e
# Server error
rescue StandardError => e
@events.unknown_error self, env, e, "Read"
ensure
begin
client.close
rescue IOError, SystemCallError
# Already closed
rescue StandardError => e
@events.unknown_error self, env, e, "Client"
end
end
end
# Given a Hash +env+ for the request read from +client+, add
# and fixup keys to comply with Rack's env guidelines.
#
def normalize_env(env, client)
if host = env[HTTP_HOST]
if colon = host.index(":")
env[SERVER_NAME] = host[0, colon]
env[SERVER_PORT] = host[colon+1, host.size]
else
env[SERVER_NAME] = host
env[SERVER_PORT] = PORT_80
end
end
unless env[REQUEST_PATH]
# it might be a dumbass full host request header
uri = URI.parse(env[REQUEST_URI])
env[REQUEST_PATH] = uri.path
raise "No REQUEST PATH" unless env[REQUEST_PATH]
end
env[PATH_INFO] = env[REQUEST_PATH]
# From http://www.ietf.org/rfc/rfc3875 :
# "Script authors should be aware that the REMOTE_ADDR and
# REMOTE_HOST meta-variables (see sections 4.1.8 and 4.1.9)
# may not identify the ultimate source of the request.
# They identify the client for the immediate request to the
# server; that client may be a proxy, gateway, or other
# intermediary acting on behalf of the actual source client."
#
env[REMOTE_ADDR] = client.peeraddr.last
end
# The object used for a request with no body. All requests with
# no body share this one object since it has no state.
EmptyBody = NullIO.new
# Given the request +env+ from +client+ and a partial request body
# in +body+, finish reading the body if there is one and invoke
# the rack app. Then construct the response and write it back to
# +client+
#
# +cl+ is the previously fetched Content-Length header if there
# was one. This is an optimization to keep from having to look
# it up again.
#
def handle_request(env, client, body, cl)
normalize_env env, client
if cl
body = read_body env, client, body, cl
return false unless body
else
body = EmptyBody
end
env[RACK_INPUT] = body
env[RACK_URL_SCHEME] = env[HTTPS_KEY] ? HTTPS : HTTP
# A rack extension. If the app writes #call'ables to this
# array, we will invoke them when the request is done.
#
after_reply = env[RACK_AFTER_REPLY] = []
begin
begin
status, headers, res_body = @app.call(env)
rescue => e
status, headers, res_body = lowlevel_error(e)
end
content_length = nil
if res_body.kind_of? Array and res_body.size == 1
content_length = res_body[0].size
end
cork_socket client
if env[HTTP_VERSION] == HTTP_11
allow_chunked = true
keep_alive = env[HTTP_CONNECTION] != CLOSE
include_keepalive_header = false
# An optimization. The most common response is 200, so we can
# reply with the proper 200 status without having to compute
# the response header.
#
if status == 200
client.write HTTP_11_200
else
client.write "HTTP/1.1 "
client.write status.to_s
client.write " "
client.write HTTP_STATUS_CODES[status]
client.write "\r\n"
end
else
allow_chunked = false
keep_alive = env[HTTP_CONNECTION] == KEEP_ALIVE
include_keepalive_header = keep_alive
# Same optimization as above for HTTP/1.1
#
if status == 200
client.write HTTP_10_200
else
client.write "HTTP/1.1 "
client.write status.to_s
client.write " "
client.write HTTP_STATUS_CODES[status]
client.write "\r\n"
end
end
colon = COLON
line_ending = LINE_END
headers.each do |k, vs|
case k
when CONTENT_LENGTH2
content_length = vs
next
when TRANSFER_ENCODING
allow_chunked = false
content_length = nil
end
vs.split(NEWLINE).each do |v|
client.write k
client.write colon
client.write v
client.write line_ending
end
end
if include_keepalive_header
client.write CONNECTION_KEEP_ALIVE
elsif !keep_alive
client.write CONNECTION_CLOSE
end
if content_length
client.write CONTENT_LENGTH_S
client.write content_length.to_s
client.write line_ending
chunked = false
elsif allow_chunked
client.write TRANSFER_ENCODING_CHUNKED
chunked = true
end
client.write line_ending
res_body.each do |part|
if chunked
client.write part.size.to_s(16)
client.write line_ending
client.write part
client.write line_ending
else
client.write part
end
client.flush
end
if chunked
client.write CLOSE_CHUNKED
client.flush
end
ensure
uncork_socket client
body.close
res_body.close if res_body.respond_to? :close
after_reply.each { |o| o.call }
end
return keep_alive
end
# Given the requset +env+ from +client+ and the partial body +body+
# plus a potential Content-Length value +cl+, finish reading
# the body and return it.
#
# If the body is larger than MAX_BODY, a Tempfile object is used
# for the body, otherwise a StringIO is used.
#
def read_body(env, client, body, cl)
content_length = cl.to_i
remain = content_length - body.size
return StringIO.new(body) if remain <= 0
# Use a Tempfile if there is a lot of data left
if remain > MAX_BODY
stream = Tempfile.new(Const::PUMA_TMP_BASE)
stream.binmode
else
stream = StringIO.new
end
stream.write body
# Read an odd sized chunk so we can read even sized ones
# after this
chunk = client.readpartial(remain % CHUNK_SIZE)
# No chunk means a closed socket
unless chunk
stream.close
return nil
end
remain -= stream.write(chunk)
# Raed the rest of the chunks
while remain > 0
chunk = client.readpartial(CHUNK_SIZE)
unless chunk
stream.close
return nil
end
remain -= stream.write(chunk)
end
stream.rewind
return stream
end
# A fallback rack response if +@app+ raises as exception.
#
def lowlevel_error(e)
[500, {}, ["No application configured"]]
end
# Wait for all outstanding requests to finish.
#
def graceful_shutdown
@thread_pool.shutdown if @thread_pool
end
# Stops the acceptor thread and then causes the worker threads to finish
# off the request queue before finally exiting.
#
def stop(sync=false)
@notify << STOP_COMMAND
@thread.join if @thread && sync
end
def halt(sync=false)
@notify << HALT_COMMAND
@thread.join if @thread && sync
end
end
end