diff --git a/Gemfile b/Gemfile index d0ab3265..44f6e48b 100644 --- a/Gemfile +++ b/Gemfile @@ -5,6 +5,7 @@ gemspec gem "rdoc" gem "rake-compiler" +gem "nio4r", "~> 2.0" gem "rack", "< 3.0" gem "minitest", "~> 5.11" gem "minitest-retry" diff --git a/lib/puma/client.rb b/lib/puma/client.rb index 08875b43..b8410b0c 100644 --- a/lib/puma/client.rb +++ b/lib/puma/client.rb @@ -64,6 +64,8 @@ module Puma @peerip = nil @remote_addr_header = nil + + @body_remain = 0 end attr_reader :env, :to_io, :body, :io, :timeout_at, :ready, :hijacked, @@ -102,6 +104,7 @@ module Puma @tempfile = nil @parsed_bytes = 0 @ready = false + @body_remain = 0 @peerip = nil if @buffer @@ -115,9 +118,16 @@ module Puma end return false - elsif fast_check && - IO.select([@to_io], nil, nil, FAST_TRACK_KA_TIMEOUT) - return try_to_finish + else + begin + if fast_check && + IO.select([@to_io], nil, nil, FAST_TRACK_KA_TIMEOUT) + return try_to_finish + end + rescue IOError + # swallow it + end + end end @@ -296,7 +306,7 @@ module Puma data = @io.read_nonblock(CHUNK_SIZE) rescue Errno::EAGAIN return false - rescue SystemCallError, IOError + rescue SystemCallError, IOError, EOFError raise ConnectionError, "Connection error detected during read" end diff --git a/lib/puma/reactor.rb b/lib/puma/reactor.rb index c462d86b..f4ec92ee 100644 --- a/lib/puma/reactor.rb +++ b/lib/puma/reactor.rb @@ -3,6 +3,8 @@ require 'puma/util' require 'puma/minissl' +require 'nio' + module Puma # Internal Docs, Not a public interface. # @@ -49,6 +51,8 @@ module Puma @events = server.events @app_pool = app_pool + @selector = NIO::Selector.new + @mutex = Mutex.new # Read / Write pipes to wake up internal while loop @@ -57,7 +61,10 @@ module Puma @sleep_for = DefaultSleepFor @timeouts = [] - @sockets = [@ready] + mon = @selector.register(@ready, :r) + mon.value = @ready + + @monitors = [mon] end private @@ -121,37 +128,74 @@ module Puma # will be set to be equal to the amount of time it will take for the next timeout to occur. # This calculation happens in `calculate_sleep`. def run_internal - sockets = @sockets + monitors = @monitors + selector = @selector while true begin - ready = IO.select sockets, nil, nil, @sleep_for + ready = selector.select @sleep_for rescue IOError => e Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue - if sockets.any? { |socket| socket.closed? } + if monitors.any? { |mon| mon.value.closed? } STDERR.puts "Error in select: #{e.message} (#{e.class})" STDERR.puts e.backtrace - sockets = sockets.reject { |socket| socket.closed? } + + monitors.reject! do |mon| + if mon.value.closed? + selector.deregister mon.value + true + end + end + retry else raise end end - if ready and reads = ready[0] - reads.each do |c| - if c == @ready + if ready + ready.each do |mon| + if mon.value == @ready @mutex.synchronize do case @ready.read(1) when "*" - sockets += @input + @input.each do |c| + mon = nil + begin + begin + mon = selector.register(c, :r) + rescue ArgumentError + # There is a bug where we seem to be registering an already registered + # client. This code deals with this situation but I wish we didn't have to. + monitors.delete_if { |submon| submon.value.to_io == c.to_io } + selector.deregister(c) + mon = selector.register(c, :r) + end + rescue IOError + # Means that the io is closed, so we should ignore this request + # entirely + else + mon.value = c + @timeouts << mon if c.timeout_at + monitors << mon + end + end @input.clear + + @timeouts.sort! { |a,b| a.value.timeout_at <=> b.value.timeout_at } + calculate_sleep when "c" - sockets.delete_if do |s| - if s == @ready + monitors.reject! do |submon| + if submon.value == @ready false else - s.close + submon.value.close + begin + selector.deregister submon.value + rescue IOError + # nio4r on jruby seems to throw an IOError here if the IO is closed, so + # we need to swallow it. + end true end end @@ -160,19 +204,21 @@ module Puma end end else + c = mon.value + # We have to be sure to remove it from the timeout # list or we'll accidentally close the socket when # it's in use! if c.timeout_at @mutex.synchronize do - @timeouts.delete c + @timeouts.delete mon end end begin if c.try_to_finish @app_pool << c - sockets.delete c + clear_monitor mon end # Don't report these to the lowlevel_error handler, otherwise @@ -182,18 +228,23 @@ module Puma c.write_500 c.close - sockets.delete c + clear_monitor mon # SSL handshake failure rescue MiniSSL::SSLError => e @server.lowlevel_error(e, c.env) ssl_socket = c.io - addr = ssl_socket.peeraddr.last + begin + addr = ssl_socket.peeraddr.last + rescue IOError + addr = "" + end + cert = ssl_socket.peercert c.close - sockets.delete c + clear_monitor mon @events.ssl_error @server, addr, cert, e @@ -204,7 +255,7 @@ module Puma c.write_400 c.close - sockets.delete c + clear_monitor mon @events.parse_error @server, c.env, e rescue StandardError => e @@ -213,7 +264,7 @@ module Puma c.write_500 c.close - sockets.delete c + clear_monitor mon end end end @@ -223,11 +274,13 @@ module Puma @mutex.synchronize do now = Time.now - while @timeouts.first.timeout_at < now - c = @timeouts.shift + while @timeouts.first.value.timeout_at < now + mon = @timeouts.shift + c = mon.value c.write_408 if c.in_data_phase c.close - sockets.delete c + + clear_monitor mon break if @timeouts.empty? end @@ -238,6 +291,11 @@ module Puma end end + def clear_monitor(mon) + @selector.deregister mon.value + @monitors.delete mon + end + public def run @@ -276,7 +334,7 @@ module Puma if @timeouts.empty? @sleep_for = DefaultSleepFor else - diff = @timeouts.first.timeout_at.to_f - Time.now.to_f + diff = @timeouts.first.value.timeout_at.to_f - Time.now.to_f if diff < 0.0 @sleep_for = 0 @@ -315,13 +373,6 @@ module Puma @mutex.synchronize do @input << c @trigger << "*" - - if c.timeout_at - @timeouts << c - @timeouts.sort! { |a,b| a.timeout_at <=> b.timeout_at } - - calculate_sleep - end end end diff --git a/puma.gemspec b/puma.gemspec index e1348d2c..b11ffec0 100644 --- a/puma.gemspec +++ b/puma.gemspec @@ -1,7 +1,5 @@ # -*- encoding: utf-8 -*- -# This is only used when puma is a git dep from Bundler, keep in sync with Rakefile - version = File.read(File.expand_path("../lib/puma/const.rb", __FILE__))[/VERSION = "(\d+\.\d+\.\d+)"/, 1] || raise Gem::Specification.new do |s| @@ -13,6 +11,7 @@ Gem::Specification.new do |s| s.email = ["evan@phx.io"] s.executables = ["puma", "pumactl"] s.extensions = ["ext/puma_http11/extconf.rb"] + s.add_runtime_dependency "nio4r", "~> 2.0" s.metadata["msys2_mingw_dependencies"] = "openssl" s.files = `git ls-files -- bin docs ext lib tools`.split("\n") + %w[History.md LICENSE README.md]