From ab8dbfeb96e50863fb78bfa110ee24db8f69bd1e Mon Sep 17 00:00:00 2001 From: Evan Phoenix Date: Sat, 11 Aug 2012 15:09:09 -0700 Subject: [PATCH] Vastly improve IO perf on 1.9.3 Puma::IOBuffer is a very simple memory buffer that allows for fast append without additional object overhead. Additionally, turns out that IO#write on 1.9.3 is extremely non-performant because it allows a Hash object on every invocation. Avoid calling IO#write in a loop on 1.9.3! Use IO#syswrite if you can (for instance when you don't care about the encoding of the output (sockets)). --- ext/puma_http11/io_buffer.c | 146 ++++++++++++++++++++++++++++++++++ ext/puma_http11/puma_http11.c | 4 + lib/puma/accept_nonblock.rb | 4 +- lib/puma/cli.rb | 1 - lib/puma/server.rb | 75 ++++++++--------- lib/puma/thread_pool.rb | 7 +- 6 files changed, 192 insertions(+), 45 deletions(-) create mode 100644 ext/puma_http11/io_buffer.c diff --git a/ext/puma_http11/io_buffer.c b/ext/puma_http11/io_buffer.c new file mode 100644 index 00000000..a7df4289 --- /dev/null +++ b/ext/puma_http11/io_buffer.c @@ -0,0 +1,146 @@ +#include "ruby.h" + +#include + +struct buf_int { + uint8_t* top; + uint8_t* cur; + + size_t size; +}; + +#define BUF_DEFAULT_SIZE 4096 +#define BUF_TOLERANCE 32 + +static void buf_free(struct buf_int* internal) { + free(internal->top); + free(internal); +} + +static VALUE buf_alloc(VALUE self) { + VALUE buf; + struct buf_int* internal; + + buf = Data_Make_Struct(self, struct buf_int, 0, buf_free, internal); + + internal->size = BUF_DEFAULT_SIZE; + internal->top = malloc(BUF_DEFAULT_SIZE); + internal->cur = internal->top; + + return buf; +} + +static VALUE buf_append(VALUE self, VALUE str) { + struct buf_int* b; + size_t used, str_len, new_size; + + Data_Get_Struct(self, struct buf_int, b); + + used = b->cur - b->top; + + StringValue(str); + str_len = RSTRING_LEN(str); + + new_size = used + str_len; + + if(new_size > b->size) { + size_t n = b->size + (b->size / 2); + uint8_t* top; + + new_size = (n > new_size ? n : new_size + BUF_TOLERANCE); + + top = malloc(new_size); + memcpy(top, b->top, used); + b->top = top; + b->cur = top + used; + b->size = new_size; + } + + memcpy(b->cur, RSTRING_PTR(str), str_len); + b->cur += str_len; + + return self; +} + +static VALUE buf_append2(int argc, VALUE* argv, VALUE self) { + struct buf_int* b; + size_t used, new_size; + int i; + VALUE str; + + Data_Get_Struct(self, struct buf_int, b); + + used = b->cur - b->top; + new_size = used; + + for(i = 0; i < argc; i++) { + StringValue(argv[i]); + + str = argv[i]; + + new_size += RSTRING_LEN(str); + } + + if(new_size > b->size) { + size_t n = b->size + (b->size / 2); + uint8_t* top; + + new_size = (n > new_size ? n : new_size + BUF_TOLERANCE); + + top = malloc(new_size); + memcpy(top, b->top, used); + b->top = top; + b->cur = top + used; + b->size = new_size; + } + + for(i = 0; i < argc; i++) { + long str_len; + str = argv[i]; + str_len = RSTRING_LEN(str); + memcpy(b->cur, RSTRING_PTR(str), str_len); + b->cur += str_len; + } +} + +static VALUE buf_to_str(VALUE self) { + struct buf_int* b; + Data_Get_Struct(self, struct buf_int, b); + + return rb_str_new(b->top, b->cur - b->top); +} + +static VALUE buf_used(VALUE self) { + struct buf_int* b; + Data_Get_Struct(self, struct buf_int, b); + + return INT2FIX(b->cur - b->top); +} + +static VALUE buf_capa(VALUE self) { + struct buf_int* b; + Data_Get_Struct(self, struct buf_int, b); + + return INT2FIX(b->size); +} + +static VALUE buf_reset(VALUE self) { + struct buf_int* b; + Data_Get_Struct(self, struct buf_int, b); + + b->cur = b->top; + return self; +} + +void Init_io_buffer(VALUE puma) { + VALUE buf = rb_define_class_under(puma, "IOBuffer", rb_cObject); + + rb_define_alloc_func(buf, buf_alloc); + rb_define_method(buf, "<<", buf_append, 1); + rb_define_method(buf, "append", buf_append2, -1); + rb_define_method(buf, "to_str", buf_to_str, 0); + rb_define_method(buf, "to_s", buf_to_str, 0); + rb_define_method(buf, "used", buf_used, 0); + rb_define_method(buf, "capacity", buf_capa, 0); + rb_define_method(buf, "reset", buf_reset, 0); +} diff --git a/ext/puma_http11/puma_http11.c b/ext/puma_http11/puma_http11.c index caf97633..4b6d74fe 100644 --- a/ext/puma_http11/puma_http11.c +++ b/ext/puma_http11/puma_http11.c @@ -456,6 +456,8 @@ VALUE HttpParser_body(VALUE self) { return http->body; } +void Init_io_buffer(VALUE puma); + void Init_puma_http11() { @@ -482,4 +484,6 @@ void Init_puma_http11() rb_define_method(cHttpParser, "nread", HttpParser_nread, 0); rb_define_method(cHttpParser, "body", HttpParser_body, 0); init_common_fields(); + + Init_io_buffer(mPuma); } diff --git a/lib/puma/accept_nonblock.rb b/lib/puma/accept_nonblock.rb index dce51d3c..a8d67673 100644 --- a/lib/puma/accept_nonblock.rb +++ b/lib/puma/accept_nonblock.rb @@ -2,8 +2,8 @@ require 'openssl' module OpenSSL module SSL - if RUBY_VERSION < "1.9" - class SSLServer + class SSLServer + unless public_method_defined? :accept_nonblock def accept_nonblock sock = @svr.accept_nonblock diff --git a/lib/puma/cli.rb b/lib/puma/cli.rb index eacb85cf..0db7bb81 100644 --- a/lib/puma/cli.rb +++ b/lib/puma/cli.rb @@ -209,7 +209,6 @@ module Puma o.on "-w", "--workers COUNT", "Activate cluster mode: How many worker processes to create" do |arg| - p :here => arg @options[:workers] = arg.to_i end diff --git a/lib/puma/server.rb b/lib/puma/server.rb index 9d41314f..5a3e7274 100644 --- a/lib/puma/server.rb +++ b/lib/puma/server.rb @@ -112,7 +112,9 @@ module Puma @status = :run - @thread_pool = ThreadPool.new(@min_threads, @max_threads) do |client| + @thread_pool = ThreadPool.new(@min_threads, + @max_threads, + IOBuffer) do |client, buffer| process_now = false begin @@ -124,7 +126,7 @@ module Puma client.close else if process_now - process_client client + process_client client, buffer else client.set_timeout @first_data_timeout @reactor.add client @@ -166,7 +168,7 @@ module Puma c = Client.new io, @binder.env(sock) pool << c end - rescue SystemCallError => e + rescue SystemCallError end end end @@ -213,18 +215,20 @@ module Puma # indicates that it supports keep alive, wait for another request before # returning. # - def process_client(client) + def process_client(client, buffer) begin close_socket = true while true - case handle_request(client) + case handle_request(client, buffer) when false return when :async close_socket = false return when true + buffer.reset + unless client.reset close_socket = false client.set_timeout @persistent_timeout @@ -304,7 +308,7 @@ module Puma # was one. This is an optimization to keep from having to look # it up again. # - def handle_request(req) + def handle_request(req, lines) env = req.env client = req.io @@ -349,6 +353,9 @@ module Puma cork_socket client + line_ending = LINE_END + colon = COLON + if env[HTTP_VERSION] == HTTP_11 allow_chunked = true keep_alive = env[HTTP_CONNECTION] != CLOSE @@ -359,13 +366,10 @@ module Puma # the response header. # if status == 200 - client.write HTTP_11_200 + lines << HTTP_11_200 else - client.write "HTTP/1.1 " - client.write status.to_s - client.write " " - client.write HTTP_STATUS_CODES[status] - client.write "\r\n" + lines.append "HTTP/1.1 ", status.to_s, " ", + HTTP_STATUS_CODES[status], line_ending no_body = status < 200 || STATUS_WITH_NO_ENTITY_BODY[status] end @@ -377,23 +381,15 @@ module Puma # Same optimization as above for HTTP/1.1 # if status == 200 - client.write HTTP_10_200 + lines << HTTP_10_200 else - client.write "HTTP/1.0 " - client.write status.to_s - client.write " " - client.write HTTP_STATUS_CODES[status] - client.write "\r\n" + lines.append "HTTP/1.0 ", status.to_s, " ", + HTTP_STATUS_CODES[status], line_ending no_body = status < 200 || STATUS_WITH_NO_ENTITY_BODY[status] end end - colon = COLON - line_ending = LINE_END - - lines = [] - headers.each do |k, vs| case k when CONTENT_LENGTH2 @@ -407,50 +403,49 @@ module Puma end vs.split(NEWLINE).each do |v| - lines << (k + colon + v + line_ending) + lines.append k, colon, v, line_ending end end - client.write lines.join - if no_body - client.write line_ending + lines << line_ending + client.syswrite lines.to_s return keep_alive end if include_keepalive_header - client.write CONNECTION_KEEP_ALIVE + lines << CONNECTION_KEEP_ALIVE elsif !keep_alive - client.write CONNECTION_CLOSE + lines << CONNECTION_CLOSE end if content_length - client.write CONTENT_LENGTH_S - client.write content_length.to_s - client.write line_ending + lines.append CONTENT_LENGTH_S, content_length.to_s, line_ending chunked = false elsif allow_chunked - client.write TRANSFER_ENCODING_CHUNKED + lines << TRANSFER_ENCODING_CHUNKED chunked = true end - client.write line_ending + lines << line_ending + + client.syswrite lines.to_s res_body.each do |part| if chunked - client.write part.bytesize.to_s(16) - client.write line_ending - client.write part - client.write line_ending + client.syswrite part.bytesize.to_s(16) + client.syswrite line_ending + client.syswrite part + client.syswrite line_ending else - client.write part + client.syswrite part end client.flush end if chunked - client.write CLOSE_CHUNKED + client.syswrite CLOSE_CHUNKED client.flush end diff --git a/lib/puma/thread_pool.rb b/lib/puma/thread_pool.rb index 6bf566c7..844c9bf5 100644 --- a/lib/puma/thread_pool.rb +++ b/lib/puma/thread_pool.rb @@ -11,7 +11,7 @@ module Puma # The block passed is the work that will be performed in each # thread. # - def initialize(min, max, &blk) + def initialize(min, max, *extra, &blk) @cond = ConditionVariable.new @mutex = Mutex.new @@ -23,6 +23,7 @@ module Puma @min = min @max = max @block = blk + @extra = extra @shutdown = false @@ -58,6 +59,8 @@ module Puma mutex = @mutex cond = @cond + extra = @extra.map { |i| i.new } + while true work = nil @@ -91,7 +94,7 @@ module Puma break unless continue - block.call work + block.call(work, *extra) end mutex.synchronize do