mirror of
https://github.com/puma/puma.git
synced 2022-11-09 13:48:40 -05:00
Vastly improve IO perf on 1.9.3
Puma::IOBuffer is a very simple memory buffer that allows for fast append without additional object overhead. Additionally, turns out that IO#write on 1.9.3 is extremely non-performant because it allows a Hash object on every invocation. Avoid calling IO#write in a loop on 1.9.3! Use IO#syswrite if you can (for instance when you don't care about the encoding of the output (sockets)).
This commit is contained in:
parent
e8d7979c36
commit
ab8dbfeb96
6 changed files with 192 additions and 45 deletions
146
ext/puma_http11/io_buffer.c
Normal file
146
ext/puma_http11/io_buffer.c
Normal file
|
@ -0,0 +1,146 @@
|
|||
#include "ruby.h"
|
||||
|
||||
#include <sys/types.h>
|
||||
|
||||
struct buf_int {
|
||||
uint8_t* top;
|
||||
uint8_t* cur;
|
||||
|
||||
size_t size;
|
||||
};
|
||||
|
||||
#define BUF_DEFAULT_SIZE 4096
|
||||
#define BUF_TOLERANCE 32
|
||||
|
||||
static void buf_free(struct buf_int* internal) {
|
||||
free(internal->top);
|
||||
free(internal);
|
||||
}
|
||||
|
||||
static VALUE buf_alloc(VALUE self) {
|
||||
VALUE buf;
|
||||
struct buf_int* internal;
|
||||
|
||||
buf = Data_Make_Struct(self, struct buf_int, 0, buf_free, internal);
|
||||
|
||||
internal->size = BUF_DEFAULT_SIZE;
|
||||
internal->top = malloc(BUF_DEFAULT_SIZE);
|
||||
internal->cur = internal->top;
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
static VALUE buf_append(VALUE self, VALUE str) {
|
||||
struct buf_int* b;
|
||||
size_t used, str_len, new_size;
|
||||
|
||||
Data_Get_Struct(self, struct buf_int, b);
|
||||
|
||||
used = b->cur - b->top;
|
||||
|
||||
StringValue(str);
|
||||
str_len = RSTRING_LEN(str);
|
||||
|
||||
new_size = used + str_len;
|
||||
|
||||
if(new_size > b->size) {
|
||||
size_t n = b->size + (b->size / 2);
|
||||
uint8_t* top;
|
||||
|
||||
new_size = (n > new_size ? n : new_size + BUF_TOLERANCE);
|
||||
|
||||
top = malloc(new_size);
|
||||
memcpy(top, b->top, used);
|
||||
b->top = top;
|
||||
b->cur = top + used;
|
||||
b->size = new_size;
|
||||
}
|
||||
|
||||
memcpy(b->cur, RSTRING_PTR(str), str_len);
|
||||
b->cur += str_len;
|
||||
|
||||
return self;
|
||||
}
|
||||
|
||||
static VALUE buf_append2(int argc, VALUE* argv, VALUE self) {
|
||||
struct buf_int* b;
|
||||
size_t used, new_size;
|
||||
int i;
|
||||
VALUE str;
|
||||
|
||||
Data_Get_Struct(self, struct buf_int, b);
|
||||
|
||||
used = b->cur - b->top;
|
||||
new_size = used;
|
||||
|
||||
for(i = 0; i < argc; i++) {
|
||||
StringValue(argv[i]);
|
||||
|
||||
str = argv[i];
|
||||
|
||||
new_size += RSTRING_LEN(str);
|
||||
}
|
||||
|
||||
if(new_size > b->size) {
|
||||
size_t n = b->size + (b->size / 2);
|
||||
uint8_t* top;
|
||||
|
||||
new_size = (n > new_size ? n : new_size + BUF_TOLERANCE);
|
||||
|
||||
top = malloc(new_size);
|
||||
memcpy(top, b->top, used);
|
||||
b->top = top;
|
||||
b->cur = top + used;
|
||||
b->size = new_size;
|
||||
}
|
||||
|
||||
for(i = 0; i < argc; i++) {
|
||||
long str_len;
|
||||
str = argv[i];
|
||||
str_len = RSTRING_LEN(str);
|
||||
memcpy(b->cur, RSTRING_PTR(str), str_len);
|
||||
b->cur += str_len;
|
||||
}
|
||||
}
|
||||
|
||||
static VALUE buf_to_str(VALUE self) {
|
||||
struct buf_int* b;
|
||||
Data_Get_Struct(self, struct buf_int, b);
|
||||
|
||||
return rb_str_new(b->top, b->cur - b->top);
|
||||
}
|
||||
|
||||
static VALUE buf_used(VALUE self) {
|
||||
struct buf_int* b;
|
||||
Data_Get_Struct(self, struct buf_int, b);
|
||||
|
||||
return INT2FIX(b->cur - b->top);
|
||||
}
|
||||
|
||||
static VALUE buf_capa(VALUE self) {
|
||||
struct buf_int* b;
|
||||
Data_Get_Struct(self, struct buf_int, b);
|
||||
|
||||
return INT2FIX(b->size);
|
||||
}
|
||||
|
||||
static VALUE buf_reset(VALUE self) {
|
||||
struct buf_int* b;
|
||||
Data_Get_Struct(self, struct buf_int, b);
|
||||
|
||||
b->cur = b->top;
|
||||
return self;
|
||||
}
|
||||
|
||||
void Init_io_buffer(VALUE puma) {
|
||||
VALUE buf = rb_define_class_under(puma, "IOBuffer", rb_cObject);
|
||||
|
||||
rb_define_alloc_func(buf, buf_alloc);
|
||||
rb_define_method(buf, "<<", buf_append, 1);
|
||||
rb_define_method(buf, "append", buf_append2, -1);
|
||||
rb_define_method(buf, "to_str", buf_to_str, 0);
|
||||
rb_define_method(buf, "to_s", buf_to_str, 0);
|
||||
rb_define_method(buf, "used", buf_used, 0);
|
||||
rb_define_method(buf, "capacity", buf_capa, 0);
|
||||
rb_define_method(buf, "reset", buf_reset, 0);
|
||||
}
|
|
@ -456,6 +456,8 @@ VALUE HttpParser_body(VALUE self) {
|
|||
return http->body;
|
||||
}
|
||||
|
||||
void Init_io_buffer(VALUE puma);
|
||||
|
||||
void Init_puma_http11()
|
||||
{
|
||||
|
||||
|
@ -482,4 +484,6 @@ void Init_puma_http11()
|
|||
rb_define_method(cHttpParser, "nread", HttpParser_nread, 0);
|
||||
rb_define_method(cHttpParser, "body", HttpParser_body, 0);
|
||||
init_common_fields();
|
||||
|
||||
Init_io_buffer(mPuma);
|
||||
}
|
||||
|
|
|
@ -2,8 +2,8 @@ require 'openssl'
|
|||
|
||||
module OpenSSL
|
||||
module SSL
|
||||
if RUBY_VERSION < "1.9"
|
||||
class SSLServer
|
||||
class SSLServer
|
||||
unless public_method_defined? :accept_nonblock
|
||||
def accept_nonblock
|
||||
sock = @svr.accept_nonblock
|
||||
|
||||
|
|
|
@ -209,7 +209,6 @@ module Puma
|
|||
|
||||
o.on "-w", "--workers COUNT",
|
||||
"Activate cluster mode: How many worker processes to create" do |arg|
|
||||
p :here => arg
|
||||
@options[:workers] = arg.to_i
|
||||
end
|
||||
|
||||
|
|
|
@ -112,7 +112,9 @@ module Puma
|
|||
|
||||
@status = :run
|
||||
|
||||
@thread_pool = ThreadPool.new(@min_threads, @max_threads) do |client|
|
||||
@thread_pool = ThreadPool.new(@min_threads,
|
||||
@max_threads,
|
||||
IOBuffer) do |client, buffer|
|
||||
process_now = false
|
||||
|
||||
begin
|
||||
|
@ -124,7 +126,7 @@ module Puma
|
|||
client.close
|
||||
else
|
||||
if process_now
|
||||
process_client client
|
||||
process_client client, buffer
|
||||
else
|
||||
client.set_timeout @first_data_timeout
|
||||
@reactor.add client
|
||||
|
@ -166,7 +168,7 @@ module Puma
|
|||
c = Client.new io, @binder.env(sock)
|
||||
pool << c
|
||||
end
|
||||
rescue SystemCallError => e
|
||||
rescue SystemCallError
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -213,18 +215,20 @@ module Puma
|
|||
# indicates that it supports keep alive, wait for another request before
|
||||
# returning.
|
||||
#
|
||||
def process_client(client)
|
||||
def process_client(client, buffer)
|
||||
begin
|
||||
close_socket = true
|
||||
|
||||
while true
|
||||
case handle_request(client)
|
||||
case handle_request(client, buffer)
|
||||
when false
|
||||
return
|
||||
when :async
|
||||
close_socket = false
|
||||
return
|
||||
when true
|
||||
buffer.reset
|
||||
|
||||
unless client.reset
|
||||
close_socket = false
|
||||
client.set_timeout @persistent_timeout
|
||||
|
@ -304,7 +308,7 @@ module Puma
|
|||
# was one. This is an optimization to keep from having to look
|
||||
# it up again.
|
||||
#
|
||||
def handle_request(req)
|
||||
def handle_request(req, lines)
|
||||
env = req.env
|
||||
client = req.io
|
||||
|
||||
|
@ -349,6 +353,9 @@ module Puma
|
|||
|
||||
cork_socket client
|
||||
|
||||
line_ending = LINE_END
|
||||
colon = COLON
|
||||
|
||||
if env[HTTP_VERSION] == HTTP_11
|
||||
allow_chunked = true
|
||||
keep_alive = env[HTTP_CONNECTION] != CLOSE
|
||||
|
@ -359,13 +366,10 @@ module Puma
|
|||
# the response header.
|
||||
#
|
||||
if status == 200
|
||||
client.write HTTP_11_200
|
||||
lines << HTTP_11_200
|
||||
else
|
||||
client.write "HTTP/1.1 "
|
||||
client.write status.to_s
|
||||
client.write " "
|
||||
client.write HTTP_STATUS_CODES[status]
|
||||
client.write "\r\n"
|
||||
lines.append "HTTP/1.1 ", status.to_s, " ",
|
||||
HTTP_STATUS_CODES[status], line_ending
|
||||
|
||||
no_body = status < 200 || STATUS_WITH_NO_ENTITY_BODY[status]
|
||||
end
|
||||
|
@ -377,23 +381,15 @@ module Puma
|
|||
# Same optimization as above for HTTP/1.1
|
||||
#
|
||||
if status == 200
|
||||
client.write HTTP_10_200
|
||||
lines << HTTP_10_200
|
||||
else
|
||||
client.write "HTTP/1.0 "
|
||||
client.write status.to_s
|
||||
client.write " "
|
||||
client.write HTTP_STATUS_CODES[status]
|
||||
client.write "\r\n"
|
||||
lines.append "HTTP/1.0 ", status.to_s, " ",
|
||||
HTTP_STATUS_CODES[status], line_ending
|
||||
|
||||
no_body = status < 200 || STATUS_WITH_NO_ENTITY_BODY[status]
|
||||
end
|
||||
end
|
||||
|
||||
colon = COLON
|
||||
line_ending = LINE_END
|
||||
|
||||
lines = []
|
||||
|
||||
headers.each do |k, vs|
|
||||
case k
|
||||
when CONTENT_LENGTH2
|
||||
|
@ -407,50 +403,49 @@ module Puma
|
|||
end
|
||||
|
||||
vs.split(NEWLINE).each do |v|
|
||||
lines << (k + colon + v + line_ending)
|
||||
lines.append k, colon, v, line_ending
|
||||
end
|
||||
end
|
||||
|
||||
client.write lines.join
|
||||
|
||||
if no_body
|
||||
client.write line_ending
|
||||
lines << line_ending
|
||||
client.syswrite lines.to_s
|
||||
return keep_alive
|
||||
end
|
||||
|
||||
if include_keepalive_header
|
||||
client.write CONNECTION_KEEP_ALIVE
|
||||
lines << CONNECTION_KEEP_ALIVE
|
||||
elsif !keep_alive
|
||||
client.write CONNECTION_CLOSE
|
||||
lines << CONNECTION_CLOSE
|
||||
end
|
||||
|
||||
if content_length
|
||||
client.write CONTENT_LENGTH_S
|
||||
client.write content_length.to_s
|
||||
client.write line_ending
|
||||
lines.append CONTENT_LENGTH_S, content_length.to_s, line_ending
|
||||
chunked = false
|
||||
elsif allow_chunked
|
||||
client.write TRANSFER_ENCODING_CHUNKED
|
||||
lines << TRANSFER_ENCODING_CHUNKED
|
||||
chunked = true
|
||||
end
|
||||
|
||||
client.write line_ending
|
||||
lines << line_ending
|
||||
|
||||
client.syswrite lines.to_s
|
||||
|
||||
res_body.each do |part|
|
||||
if chunked
|
||||
client.write part.bytesize.to_s(16)
|
||||
client.write line_ending
|
||||
client.write part
|
||||
client.write line_ending
|
||||
client.syswrite part.bytesize.to_s(16)
|
||||
client.syswrite line_ending
|
||||
client.syswrite part
|
||||
client.syswrite line_ending
|
||||
else
|
||||
client.write part
|
||||
client.syswrite part
|
||||
end
|
||||
|
||||
client.flush
|
||||
end
|
||||
|
||||
if chunked
|
||||
client.write CLOSE_CHUNKED
|
||||
client.syswrite CLOSE_CHUNKED
|
||||
client.flush
|
||||
end
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ module Puma
|
|||
# The block passed is the work that will be performed in each
|
||||
# thread.
|
||||
#
|
||||
def initialize(min, max, &blk)
|
||||
def initialize(min, max, *extra, &blk)
|
||||
@cond = ConditionVariable.new
|
||||
@mutex = Mutex.new
|
||||
|
||||
|
@ -23,6 +23,7 @@ module Puma
|
|||
@min = min
|
||||
@max = max
|
||||
@block = blk
|
||||
@extra = extra
|
||||
|
||||
@shutdown = false
|
||||
|
||||
|
@ -58,6 +59,8 @@ module Puma
|
|||
mutex = @mutex
|
||||
cond = @cond
|
||||
|
||||
extra = @extra.map { |i| i.new }
|
||||
|
||||
while true
|
||||
work = nil
|
||||
|
||||
|
@ -91,7 +94,7 @@ module Puma
|
|||
|
||||
break unless continue
|
||||
|
||||
block.call work
|
||||
block.call(work, *extra)
|
||||
end
|
||||
|
||||
mutex.synchronize do
|
||||
|
|
Loading…
Reference in a new issue