mirror of
https://github.com/puma/puma.git
synced 2022-11-09 13:48:40 -05:00
Add separate IO reactor to defeat slow clients
Previously, the app thread would be in charge of reading the request directly from the client. This resulted in a set of slow clients being able to completely starve the app thread pool and prevent any further connections from being handled. This new organization uses a seperate reactor thread that is in charge of responding when a client has more data, buffering the data and attempting to parse the data. When the data represents a fully realized request, only then is it handed to the app thread pool. This means we trust apps to not starve the pool, but don't trust clients.
This commit is contained in:
parent
5b11c5e23c
commit
6777c771d8
4 changed files with 259 additions and 64 deletions
142
lib/puma/client.rb
Normal file
142
lib/puma/client.rb
Normal file
|
@ -0,0 +1,142 @@
|
|||
module Puma
|
||||
class Client
|
||||
include Puma::Const
|
||||
|
||||
def initialize(io, env)
|
||||
@io = io
|
||||
@to_io = io.to_io
|
||||
@proto_env = env
|
||||
@env = env.dup
|
||||
|
||||
@parser = HttpParser.new
|
||||
@parsed_bytes = 0
|
||||
@read_header = true
|
||||
@body = nil
|
||||
@buffer = nil
|
||||
|
||||
@timeout_at = nil
|
||||
end
|
||||
|
||||
attr_reader :env, :to_io, :body, :io, :timeout_at
|
||||
|
||||
def set_timeout(val)
|
||||
@timeout_at = Time.now + val
|
||||
end
|
||||
|
||||
def reset
|
||||
@parser.reset
|
||||
@read_header = true
|
||||
@env = @proto_env.dup
|
||||
@body = nil
|
||||
@parsed_bytes = 0
|
||||
|
||||
if @buffer
|
||||
@parsed_bytes = @parser.execute(@env, @buffer, @parsed_bytes)
|
||||
|
||||
if @parser.finished?
|
||||
return setup_body
|
||||
elsif @parsed_bytes >= MAX_HEADER
|
||||
raise HttpParserError,
|
||||
"HEADER is longer than allowed, aborting client early."
|
||||
end
|
||||
|
||||
return false
|
||||
end
|
||||
end
|
||||
|
||||
def close
|
||||
@io.close
|
||||
end
|
||||
|
||||
EmptyBody = NullIO.new
|
||||
|
||||
def setup_body
|
||||
body = @parser.body
|
||||
cl = @env[CONTENT_LENGTH]
|
||||
|
||||
unless cl
|
||||
@buffer = body.empty? ? nil : body
|
||||
@body = EmptyBody
|
||||
return true
|
||||
end
|
||||
|
||||
remain = cl.to_i - body.bytesize
|
||||
|
||||
if remain <= 0
|
||||
@body = StringIO.new(body)
|
||||
return true
|
||||
end
|
||||
|
||||
if remain > MAX_BODY
|
||||
@body = Tempfile.new(Const::PUMA_TMP_BASE)
|
||||
@body.binmode
|
||||
else
|
||||
# The body[0,0] trick is to get an empty string in the same
|
||||
# encoding as body.
|
||||
@body = StringIO.new body[0,0]
|
||||
end
|
||||
|
||||
@body.write body
|
||||
|
||||
@body_remain = remain
|
||||
|
||||
@read_header = false
|
||||
|
||||
return false
|
||||
end
|
||||
|
||||
def try_to_finish
|
||||
return read_body unless @read_header
|
||||
|
||||
data = @io.readpartial(CHUNK_SIZE)
|
||||
|
||||
if @buffer
|
||||
@buffer << data
|
||||
else
|
||||
@buffer = data
|
||||
end
|
||||
|
||||
@parsed_bytes = @parser.execute(@env, @buffer, @parsed_bytes)
|
||||
|
||||
if @parser.finished?
|
||||
return setup_body
|
||||
elsif @parsed_bytes >= MAX_HEADER
|
||||
raise HttpParserError,
|
||||
"HEADER is longer than allowed, aborting client early."
|
||||
end
|
||||
|
||||
false
|
||||
end
|
||||
|
||||
def read_body
|
||||
# Read an odd sized chunk so we can read even sized ones
|
||||
# after this
|
||||
remain = @body_remain
|
||||
|
||||
if remain > CHUNK_SIZE
|
||||
want = CHUNK_SIZE
|
||||
else
|
||||
want = remain
|
||||
end
|
||||
|
||||
chunk = @io.readpartial(want)
|
||||
|
||||
# No chunk means a closed socket
|
||||
unless chunk
|
||||
@body.close
|
||||
raise EOFError
|
||||
end
|
||||
|
||||
remain -= @body.write(chunk)
|
||||
|
||||
if remain <= 0
|
||||
@body.rewind
|
||||
return true
|
||||
end
|
||||
|
||||
@body_remain = remain
|
||||
|
||||
false
|
||||
end
|
||||
end
|
||||
end
|
82
lib/puma/reactor.rb
Normal file
82
lib/puma/reactor.rb
Normal file
|
@ -0,0 +1,82 @@
|
|||
module Puma
|
||||
class Reactor
|
||||
DefaultSleepFor = 5
|
||||
|
||||
def initialize(events, app_pool)
|
||||
@events = events
|
||||
@app_pool = app_pool
|
||||
|
||||
@mutex = Mutex.new
|
||||
@ready, @trigger = IO.pipe
|
||||
@input = []
|
||||
@sleep_for = DefaultSleepFor
|
||||
@timeouts = []
|
||||
end
|
||||
|
||||
def run
|
||||
sockets = [@ready]
|
||||
|
||||
while true
|
||||
ready = IO.select sockets, nil, nil, @sleep_for
|
||||
|
||||
if ready and reads = ready[0]
|
||||
reads.each do |c|
|
||||
if c == @ready
|
||||
@mutex.synchronize do
|
||||
@ready.read(1) # drain
|
||||
sockets += @input
|
||||
@input.clear
|
||||
end
|
||||
else
|
||||
begin
|
||||
if c.try_to_finish
|
||||
@app_pool << c
|
||||
sockets.delete c
|
||||
end
|
||||
# The client doesn't know HTTP well
|
||||
rescue HttpParserError => e
|
||||
@events.parse_error self, c.env, e
|
||||
|
||||
rescue EOFError
|
||||
c.close
|
||||
sockets.delete c
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
unless @timeouts.empty?
|
||||
now = Time.now
|
||||
|
||||
while @timeouts.first.timeout_at < now
|
||||
c = @timeouts.shift
|
||||
sockets.delete c
|
||||
c.close
|
||||
|
||||
if @timeouts.empty?
|
||||
@sleep_for = DefaultSleepFor
|
||||
break
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def run_in_thread
|
||||
@thread = Thread.new { run }
|
||||
end
|
||||
|
||||
def add(c)
|
||||
@mutex.synchronize do
|
||||
@input << c
|
||||
@trigger << "!"
|
||||
|
||||
if c.timeout_at
|
||||
@timeouts << c
|
||||
@timeouts.sort! { |a,b| a.timeout_at <=> b.timeout_at }
|
||||
@sleep_for = @timeouts.first.timeout_at.to_f - Time.now.to_f
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -6,6 +6,8 @@ require 'puma/const'
|
|||
require 'puma/events'
|
||||
require 'puma/null_io'
|
||||
require 'puma/compat'
|
||||
require 'puma/reactor'
|
||||
require 'puma/client'
|
||||
|
||||
require 'puma/puma_http11'
|
||||
|
||||
|
@ -196,10 +198,14 @@ module Puma
|
|||
|
||||
@status = :run
|
||||
|
||||
@thread_pool = ThreadPool.new(@min_threads, @max_threads) do |client, env|
|
||||
process_client(client, env)
|
||||
@thread_pool = ThreadPool.new(@min_threads, @max_threads) do |client|
|
||||
process_client(client)
|
||||
end
|
||||
|
||||
@reactor = Reactor.new @events, @thread_pool
|
||||
|
||||
@reactor.run_in_thread
|
||||
|
||||
if @auto_trim_time
|
||||
@thread_pool.auto_trim!(@auto_trim_time)
|
||||
end
|
||||
|
@ -225,7 +231,8 @@ module Puma
|
|||
if sock == check
|
||||
break if handle_check
|
||||
else
|
||||
pool << [sock.accept, @envs.fetch(sock, @proto_env)]
|
||||
c = Client.new sock.accept, @envs.fetch(sock, @proto_env)
|
||||
@reactor.add c
|
||||
end
|
||||
end
|
||||
rescue Errno::ECONNABORTED
|
||||
|
@ -270,61 +277,23 @@ module Puma
|
|||
# indicates that it supports keep alive, wait for another request before
|
||||
# returning.
|
||||
#
|
||||
def process_client(client, proto_env)
|
||||
parser = HttpParser.new
|
||||
close_socket = true
|
||||
|
||||
def process_client(client)
|
||||
begin
|
||||
close_socket = true
|
||||
|
||||
while true
|
||||
parser.reset
|
||||
|
||||
env = proto_env.dup
|
||||
data = client.readpartial(CHUNK_SIZE)
|
||||
nparsed = 0
|
||||
|
||||
# Assumption: nparsed will always be less since data will get filled
|
||||
# with more after each parsing. If it doesn't get more then there was
|
||||
# a problem with the read operation on the client socket.
|
||||
# Effect is to stop processing when the socket can't fill the buffer
|
||||
# for further parsing.
|
||||
while nparsed < data.bytesize
|
||||
nparsed = parser.execute(env, data, nparsed)
|
||||
|
||||
if parser.finished?
|
||||
cl = env[CONTENT_LENGTH]
|
||||
|
||||
case handle_request(env, client, parser.body, cl)
|
||||
when false
|
||||
return
|
||||
when :async
|
||||
close_socket = false
|
||||
return
|
||||
end
|
||||
|
||||
nparsed += parser.body.bytesize if cl
|
||||
|
||||
if data.bytesize > nparsed
|
||||
data.slice!(0, nparsed)
|
||||
parser.reset
|
||||
env = @proto_env.dup
|
||||
nparsed = 0
|
||||
else
|
||||
unless ret = IO.select([client, @persistent_check], nil, nil, @persistent_timeout)
|
||||
raise EOFError, "Timed out persistent connection"
|
||||
end
|
||||
|
||||
return if ret.first.include? @persistent_check
|
||||
end
|
||||
else
|
||||
# Parser is not done, queue up more data to read and continue parsing
|
||||
chunk = client.readpartial(CHUNK_SIZE)
|
||||
return if !chunk or chunk.length == 0 # read failed, stop processing
|
||||
|
||||
data << chunk
|
||||
if data.bytesize >= MAX_HEADER
|
||||
raise HttpParserError,
|
||||
"HEADER is longer than allowed, aborting client early."
|
||||
end
|
||||
case handle_request(client)
|
||||
when false
|
||||
return
|
||||
when :async
|
||||
close_socket = false
|
||||
return
|
||||
when true
|
||||
unless client.reset
|
||||
close_socket = false
|
||||
client.set_timeout @persistent_timeout
|
||||
@reactor.add client
|
||||
return
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -335,7 +304,7 @@ module Puma
|
|||
|
||||
# The client doesn't know HTTP well
|
||||
rescue HttpParserError => e
|
||||
@events.parse_error self, env, e
|
||||
@events.parse_error self, client.env, e
|
||||
|
||||
# Server error
|
||||
rescue StandardError => e
|
||||
|
@ -403,17 +372,15 @@ module Puma
|
|||
# was one. This is an optimization to keep from having to look
|
||||
# it up again.
|
||||
#
|
||||
def handle_request(env, client, body, cl)
|
||||
def handle_request(req)
|
||||
env = req.env
|
||||
client = req.io
|
||||
|
||||
normalize_env env, client
|
||||
|
||||
env[PUMA_SOCKET] = client
|
||||
|
||||
if cl
|
||||
body = read_body env, client, body, cl
|
||||
return false unless body
|
||||
else
|
||||
body = EmptyBody
|
||||
end
|
||||
body = req.body
|
||||
|
||||
env[RACK_INPUT] = body
|
||||
env[RACK_URL_SCHEME] = env[HTTPS_KEY] ? HTTPS : HTTP
|
||||
|
|
4
test/hello-post.ru
Normal file
4
test/hello-post.ru
Normal file
|
@ -0,0 +1,4 @@
|
|||
run lambda { |env|
|
||||
p :body => env['rack.input'].read
|
||||
[200, {"Content-Type" => "text/plain"}, ["Hello World"]]
|
||||
}
|
Loading…
Add table
Reference in a new issue