From e83a4954e4c0214d18beb594ddf598fafdf058d7 Mon Sep 17 00:00:00 2001 From: Evan Phoenix Date: Tue, 19 Feb 2019 16:38:21 -0800 Subject: [PATCH 1/6] Switch IO reactor to nio4r This moves away from IO.select to using nio4r to allow the reactor to scale beyond 1024 active clients. This happens when folks are using websockets usually. --- Gemfile | 1 + lib/puma/reactor.rb | 65 ++++++++++++++++++++++++++++++++------------- puma.gemspec | 3 +-- 3 files changed, 49 insertions(+), 20 deletions(-) diff --git a/Gemfile b/Gemfile index d0ab3265..44f6e48b 100644 --- a/Gemfile +++ b/Gemfile @@ -5,6 +5,7 @@ gemspec gem "rdoc" gem "rake-compiler" +gem "nio4r", "~> 2.0" gem "rack", "< 3.0" gem "minitest", "~> 5.11" gem "minitest-retry" diff --git a/lib/puma/reactor.rb b/lib/puma/reactor.rb index 98a7e824..b4ce3a9d 100644 --- a/lib/puma/reactor.rb +++ b/lib/puma/reactor.rb @@ -3,6 +3,8 @@ require 'puma/util' require 'puma/minissl' +require 'nio' + module Puma # Internal Docs, Not a public interface. # @@ -49,6 +51,8 @@ module Puma @events = server.events @app_pool = app_pool + @selector = NIO::Selector.new + @mutex = Mutex.new # Read / Write pipes to wake up internal while loop @@ -57,7 +61,10 @@ module Puma @sleep_for = DefaultSleepFor @timeouts = [] - @sockets = [@ready] + mon = @selector.register(@ready, :r) + mon.value = :wakeup + + @sockets = [mon] end private @@ -122,36 +129,48 @@ module Puma # This calculation happens in `calculate_sleep`. def run_internal sockets = @sockets + selector = @selector while true begin - ready = IO.select sockets, nil, nil, @sleep_for + ready = selector.select @sleep_for rescue IOError => e Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue - if sockets.any? { |socket| socket.closed? } + if sockets.any? { |socket| m.value.closed? } STDERR.puts "Error in select: #{e.message} (#{e.class})" STDERR.puts e.backtrace - sockets = sockets.reject { |socket| socket.closed? } + sockets = sockets.reject do |socket| + if m.value.closed? + selector.deregister(socket) + true + end + end + retry else raise end end - if ready and reads = ready[0] - reads.each do |c| - if c == @ready + if ready + ready.each do |m| + if m.value == :wakeup @mutex.synchronize do case @ready.read(1) when "*" - sockets += @input + sockets += @input.map { |i| + mon = selector.register(i, :r) + mon.value = i + mon + } @input.clear when "c" - sockets.delete_if do |s| - if s == @ready + sockets.delete_if do |sm| + if sm.value == :wakeup false else - s.close + sm.value.close + selector.deregister sm true end end @@ -160,6 +179,8 @@ module Puma end end else + c = m.value + # We have to be sure to remove it from the timeout # list or we'll accidentally close the socket when # it's in use! @@ -172,7 +193,8 @@ module Puma begin if c.try_to_finish @app_pool << c - sockets.delete c + selector.deregister m + sockets.delete m end # Don't report these to the lowlevel_error handler, otherwise @@ -182,7 +204,8 @@ module Puma c.write_500 c.close - sockets.delete c + selector.deregister m + sockets.delete m # SSL handshake failure rescue MiniSSL::SSLError => e @@ -193,7 +216,8 @@ module Puma cert = ssl_socket.peercert c.close - sockets.delete c + selector.deregister m + sockets.delete m @events.ssl_error @server, addr, cert, e @@ -204,7 +228,8 @@ module Puma c.write_400 c.close - sockets.delete c + selector.deregister m + sockets.delete m @events.parse_error @server, c.env, e rescue StandardError => e @@ -213,7 +238,8 @@ module Puma c.write_500 c.close - sockets.delete c + selector.deregister m + sockets.delete m end end end @@ -224,10 +250,13 @@ module Puma now = Time.now while @timeouts.first.timeout_at < now - c = @timeouts.shift + m = @timeouts.shift + c = m.value c.write_408 if c.in_data_phase c.close - sockets.delete c + + selector.deregister m + sockets.delete m break if @timeouts.empty? end diff --git a/puma.gemspec b/puma.gemspec index e1348d2c..b11ffec0 100644 --- a/puma.gemspec +++ b/puma.gemspec @@ -1,7 +1,5 @@ # -*- encoding: utf-8 -*- -# This is only used when puma is a git dep from Bundler, keep in sync with Rakefile - version = File.read(File.expand_path("../lib/puma/const.rb", __FILE__))[/VERSION = "(\d+\.\d+\.\d+)"/, 1] || raise Gem::Specification.new do |s| @@ -13,6 +11,7 @@ Gem::Specification.new do |s| s.email = ["evan@phx.io"] s.executables = ["puma", "pumactl"] s.extensions = ["ext/puma_http11/extconf.rb"] + s.add_runtime_dependency "nio4r", "~> 2.0" s.metadata["msys2_mingw_dependencies"] = "openssl" s.files = `git ls-files -- bin docs ext lib tools`.split("\n") + %w[History.md LICENSE README.md] From d68f4b49335c490bb765f7e1dc5dcfde06bf68ab Mon Sep 17 00:00:00 2001 From: Evan Phoenix Date: Wed, 20 Feb 2019 08:42:33 -0800 Subject: [PATCH 2/6] Move around some code and deal with test failures --- lib/puma/client.rb | 18 +++++++++++++---- lib/puma/reactor.rb | 47 ++++++++++++++++++++++++++++----------------- 2 files changed, 43 insertions(+), 22 deletions(-) diff --git a/lib/puma/client.rb b/lib/puma/client.rb index 04241f6d..3638f751 100644 --- a/lib/puma/client.rb +++ b/lib/puma/client.rb @@ -64,6 +64,8 @@ module Puma @peerip = nil @remote_addr_header = nil + + @body_remain = 0 end attr_reader :env, :to_io, :body, :io, :timeout_at, :ready, :hijacked, @@ -102,6 +104,7 @@ module Puma @tempfile = nil @parsed_bytes = 0 @ready = false + @body_remain = 0 if @buffer @parsed_bytes = @parser.execute(@env, @buffer, @parsed_bytes) @@ -114,9 +117,16 @@ module Puma end return false - elsif fast_check && - IO.select([@to_io], nil, nil, FAST_TRACK_KA_TIMEOUT) - return try_to_finish + else + begin + if fast_check && + IO.select([@to_io], nil, nil, FAST_TRACK_KA_TIMEOUT) + return try_to_finish + end + rescue IOError + # swallow it + end + end end @@ -294,7 +304,7 @@ module Puma data = @io.read_nonblock(CHUNK_SIZE) rescue Errno::EAGAIN return false - rescue SystemCallError, IOError + rescue SystemCallError, IOError, EOFError raise ConnectionError, "Connection error detected during read" end diff --git a/lib/puma/reactor.rb b/lib/puma/reactor.rb index b4ce3a9d..c2e77494 100644 --- a/lib/puma/reactor.rb +++ b/lib/puma/reactor.rb @@ -62,7 +62,7 @@ module Puma @timeouts = [] mon = @selector.register(@ready, :r) - mon.value = :wakeup + mon.value = @ready @sockets = [mon] end @@ -154,19 +154,32 @@ module Puma if ready ready.each do |m| - if m.value == :wakeup + if m.value == @ready @mutex.synchronize do case @ready.read(1) when "*" - sockets += @input.map { |i| - mon = selector.register(i, :r) - mon.value = i + sockets.concat(@input.map { |c| + mon = nil + begin + mon = selector.register(c, :r) + rescue ArgumentError + sockets.delete_if { |sm| sm.value.to_io == c.to_io } + selector.deregister(c) + mon = selector.register(c, :r) + end + + mon.value = c + @timeouts << mon if c.timeout_at + sockets << mon mon - } + }) @input.clear + + @timeouts.sort! { |a,b| a.value.timeout_at <=> b.value.timeout_at } + calculate_sleep when "c" sockets.delete_if do |sm| - if sm.value == :wakeup + if sm.value == @ready false else sm.value.close @@ -186,7 +199,7 @@ module Puma # it's in use! if c.timeout_at @mutex.synchronize do - @timeouts.delete c + @timeouts.delete m end end @@ -212,7 +225,12 @@ module Puma @server.lowlevel_error(e, c.env) ssl_socket = c.io - addr = ssl_socket.peeraddr.last + begin + addr = ssl_socket.peeraddr.last + rescue IOError + addr = "" + end + cert = ssl_socket.peercert c.close @@ -249,7 +267,7 @@ module Puma @mutex.synchronize do now = Time.now - while @timeouts.first.timeout_at < now + while @timeouts.first.value.timeout_at < now m = @timeouts.shift c = m.value c.write_408 if c.in_data_phase @@ -305,7 +323,7 @@ module Puma if @timeouts.empty? @sleep_for = DefaultSleepFor else - diff = @timeouts.first.timeout_at.to_f - Time.now.to_f + diff = @timeouts.first.value.timeout_at.to_f - Time.now.to_f if diff < 0.0 @sleep_for = 0 @@ -344,13 +362,6 @@ module Puma @mutex.synchronize do @input << c @trigger << "*" - - if c.timeout_at - @timeouts << c - @timeouts.sort! { |a,b| a.timeout_at <=> b.timeout_at } - - calculate_sleep - end end end From 34aa2c6cd0c63ed7fd64b3cee0243e96d1c43eae Mon Sep 17 00:00:00 2001 From: Evan Phoenix Date: Wed, 20 Feb 2019 09:30:11 -0800 Subject: [PATCH 3/6] Naming clarifications --- lib/puma/reactor.rb | 86 ++++++++++++++++++++++++--------------------- 1 file changed, 46 insertions(+), 40 deletions(-) diff --git a/lib/puma/reactor.rb b/lib/puma/reactor.rb index c2e77494..19c0eb18 100644 --- a/lib/puma/reactor.rb +++ b/lib/puma/reactor.rb @@ -64,7 +64,7 @@ module Puma mon = @selector.register(@ready, :r) mon.value = @ready - @sockets = [mon] + @monitors = [mon] end private @@ -128,7 +128,7 @@ module Puma # will be set to be equal to the amount of time it will take for the next timeout to occur. # This calculation happens in `calculate_sleep`. def run_internal - sockets = @sockets + monitors = @monitors selector = @selector while true @@ -136,12 +136,13 @@ module Puma ready = selector.select @sleep_for rescue IOError => e Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue - if sockets.any? { |socket| m.value.closed? } + if monitors.any? { |mon| mon.value.closed? } STDERR.puts "Error in select: #{e.message} (#{e.class})" STDERR.puts e.backtrace - sockets = sockets.reject do |socket| - if m.value.closed? - selector.deregister(socket) + + monitors.reject! do |mon| + if mon.value.closed? + selector.deregister mon true end end @@ -153,37 +154,43 @@ module Puma end if ready - ready.each do |m| - if m.value == @ready + ready.each do |mon| + if mon.value == @ready @mutex.synchronize do case @ready.read(1) when "*" - sockets.concat(@input.map { |c| + @input.each do |c| mon = nil begin - mon = selector.register(c, :r) - rescue ArgumentError - sockets.delete_if { |sm| sm.value.to_io == c.to_io } - selector.deregister(c) - mon = selector.register(c, :r) + begin + mon = selector.register(c, :r) + rescue ArgumentError + # There is a bug where we seem to be registering an already registered + # client. This code deals with this situation but I wish we didn't have to. + monitors.delete_if { |submon| submon.value.to_io == c.to_io } + selector.deregister(c) + mon = selector.register(c, :r) + end + rescue IOError + # Means that the io is closed, so we should ignore this request + # entirely + else + mon.value = c + @timeouts << mon if c.timeout_at + monitors << mon end - - mon.value = c - @timeouts << mon if c.timeout_at - sockets << mon - mon - }) + end @input.clear @timeouts.sort! { |a,b| a.value.timeout_at <=> b.value.timeout_at } calculate_sleep when "c" - sockets.delete_if do |sm| - if sm.value == @ready + monitors.reject! do |submon| + if submon.value == @ready false else - sm.value.close - selector.deregister sm + submon.value.close + selector.deregister submon true end end @@ -192,22 +199,21 @@ module Puma end end else - c = m.value + c = mon.value # We have to be sure to remove it from the timeout # list or we'll accidentally close the socket when # it's in use! if c.timeout_at @mutex.synchronize do - @timeouts.delete m + @timeouts.delete mon end end begin if c.try_to_finish @app_pool << c - selector.deregister m - sockets.delete m + clear_monitor mon end # Don't report these to the lowlevel_error handler, otherwise @@ -217,8 +223,7 @@ module Puma c.write_500 c.close - selector.deregister m - sockets.delete m + clear_monitor mon # SSL handshake failure rescue MiniSSL::SSLError => e @@ -234,8 +239,7 @@ module Puma cert = ssl_socket.peercert c.close - selector.deregister m - sockets.delete m + clear_monitor mon @events.ssl_error @server, addr, cert, e @@ -246,8 +250,7 @@ module Puma c.write_400 c.close - selector.deregister m - sockets.delete m + clear_monitor mon @events.parse_error @server, c.env, e rescue StandardError => e @@ -256,8 +259,7 @@ module Puma c.write_500 c.close - selector.deregister m - sockets.delete m + clear_monitor mon end end end @@ -268,13 +270,12 @@ module Puma now = Time.now while @timeouts.first.value.timeout_at < now - m = @timeouts.shift - c = m.value + mon = @timeouts.shift + c = mon.value c.write_408 if c.in_data_phase c.close - selector.deregister m - sockets.delete m + clear_monitor mon break if @timeouts.empty? end @@ -285,6 +286,11 @@ module Puma end end + def clear_monitor(mon) + @selector.deregister mon + @monitors.delete mon + end + public def run From 37c9c1b5be199aa6005eac23b019fece979f0db1 Mon Sep 17 00:00:00 2001 From: Evan Phoenix Date: Wed, 20 Feb 2019 10:19:50 -0800 Subject: [PATCH 4/6] Fixup some busted testsn --- test/test_cli.rb | 2 +- test/test_config.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_cli.rb b/test/test_cli.rb index 31a7d445..d2710d9e 100644 --- a/test/test_cli.rb +++ b/test/test_cli.rb @@ -197,7 +197,7 @@ class TestCLI < Minitest::Test gc_count_after = gc_stats["count"].to_i # Hitting the /gc route should increment the count by 1 - assert_equal gc_count_before + 1, gc_count_after + assert(gc_count_before < gc_count_after, "make sure a gc has happened") cli.launcher.stop t.join diff --git a/test/test_config.rb b/test/test_config.rb index 4dd2c6e9..41c0f828 100644 --- a/test/test_config.rb +++ b/test/test_config.rb @@ -56,7 +56,7 @@ class TestConfigFile < Minitest::Test conf.load - ssl_binding = "ssl://0.0.0.0:9292?cert=/path/to/cert&key=/path/to/key&verify_mode=the_verify_mode" + ssl_binding = "ssl://0.0.0.0:9292?cert=/path/to/cert&key=/path/to/key&verify_mode=the_verify_mode&no_tlsv1=false" assert_equal [ssl_binding], conf.options[:binds] end From f5674699024cc9ce82c143350fa0ed993c61a531 Mon Sep 17 00:00:00 2001 From: Evan Phoenix Date: Wed, 20 Feb 2019 20:46:28 -0800 Subject: [PATCH 5/6] Send deregister the IO, not the monitor --- lib/puma/reactor.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/puma/reactor.rb b/lib/puma/reactor.rb index c30537d0..1401c893 100644 --- a/lib/puma/reactor.rb +++ b/lib/puma/reactor.rb @@ -142,7 +142,7 @@ module Puma monitors.reject! do |mon| if mon.value.closed? - selector.deregister mon + selector.deregister mon.value true end end @@ -190,7 +190,7 @@ module Puma false else submon.value.close - selector.deregister submon + selector.deregister submon.value true end end @@ -287,7 +287,7 @@ module Puma end def clear_monitor(mon) - @selector.deregister mon + @selector.deregister mon.value @monitors.delete mon end From 5b2668ab27f10b3dae0ddc1a112cb0bc070ee376 Mon Sep 17 00:00:00 2001 From: Evan Phoenix Date: Wed, 20 Feb 2019 21:23:54 -0800 Subject: [PATCH 6/6] More more jruby bits --- lib/puma/reactor.rb | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/puma/reactor.rb b/lib/puma/reactor.rb index 1401c893..f4ec92ee 100644 --- a/lib/puma/reactor.rb +++ b/lib/puma/reactor.rb @@ -190,7 +190,12 @@ module Puma false else submon.value.close - selector.deregister submon.value + begin + selector.deregister submon.value + rescue IOError + # nio4r on jruby seems to throw an IOError here if the IO is closed, so + # we need to swallow it. + end true end end