mirror of
https://github.com/puma/puma.git
synced 2022-11-09 13:48:40 -05:00
Switch IO reactor to nio4r
This moves away from IO.select to using nio4r to allow the reactor to scale beyond 1024 active clients. This happens when folks are using websockets usually.
This commit is contained in:
parent
cee9a8e553
commit
e83a4954e4
3 changed files with 49 additions and 20 deletions
1
Gemfile
1
Gemfile
|
@ -5,6 +5,7 @@ gemspec
|
||||||
gem "rdoc"
|
gem "rdoc"
|
||||||
gem "rake-compiler"
|
gem "rake-compiler"
|
||||||
|
|
||||||
|
gem "nio4r", "~> 2.0"
|
||||||
gem "rack", "< 3.0"
|
gem "rack", "< 3.0"
|
||||||
gem "minitest", "~> 5.11"
|
gem "minitest", "~> 5.11"
|
||||||
gem "minitest-retry"
|
gem "minitest-retry"
|
||||||
|
|
|
@ -3,6 +3,8 @@
|
||||||
require 'puma/util'
|
require 'puma/util'
|
||||||
require 'puma/minissl'
|
require 'puma/minissl'
|
||||||
|
|
||||||
|
require 'nio'
|
||||||
|
|
||||||
module Puma
|
module Puma
|
||||||
# Internal Docs, Not a public interface.
|
# Internal Docs, Not a public interface.
|
||||||
#
|
#
|
||||||
|
@ -49,6 +51,8 @@ module Puma
|
||||||
@events = server.events
|
@events = server.events
|
||||||
@app_pool = app_pool
|
@app_pool = app_pool
|
||||||
|
|
||||||
|
@selector = NIO::Selector.new
|
||||||
|
|
||||||
@mutex = Mutex.new
|
@mutex = Mutex.new
|
||||||
|
|
||||||
# Read / Write pipes to wake up internal while loop
|
# Read / Write pipes to wake up internal while loop
|
||||||
|
@ -57,7 +61,10 @@ module Puma
|
||||||
@sleep_for = DefaultSleepFor
|
@sleep_for = DefaultSleepFor
|
||||||
@timeouts = []
|
@timeouts = []
|
||||||
|
|
||||||
@sockets = [@ready]
|
mon = @selector.register(@ready, :r)
|
||||||
|
mon.value = :wakeup
|
||||||
|
|
||||||
|
@sockets = [mon]
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
@ -122,36 +129,48 @@ module Puma
|
||||||
# This calculation happens in `calculate_sleep`.
|
# This calculation happens in `calculate_sleep`.
|
||||||
def run_internal
|
def run_internal
|
||||||
sockets = @sockets
|
sockets = @sockets
|
||||||
|
selector = @selector
|
||||||
|
|
||||||
while true
|
while true
|
||||||
begin
|
begin
|
||||||
ready = IO.select sockets, nil, nil, @sleep_for
|
ready = selector.select @sleep_for
|
||||||
rescue IOError => e
|
rescue IOError => e
|
||||||
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
|
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
|
||||||
if sockets.any? { |socket| socket.closed? }
|
if sockets.any? { |socket| m.value.closed? }
|
||||||
STDERR.puts "Error in select: #{e.message} (#{e.class})"
|
STDERR.puts "Error in select: #{e.message} (#{e.class})"
|
||||||
STDERR.puts e.backtrace
|
STDERR.puts e.backtrace
|
||||||
sockets = sockets.reject { |socket| socket.closed? }
|
sockets = sockets.reject do |socket|
|
||||||
|
if m.value.closed?
|
||||||
|
selector.deregister(socket)
|
||||||
|
true
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
retry
|
retry
|
||||||
else
|
else
|
||||||
raise
|
raise
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
if ready and reads = ready[0]
|
if ready
|
||||||
reads.each do |c|
|
ready.each do |m|
|
||||||
if c == @ready
|
if m.value == :wakeup
|
||||||
@mutex.synchronize do
|
@mutex.synchronize do
|
||||||
case @ready.read(1)
|
case @ready.read(1)
|
||||||
when "*"
|
when "*"
|
||||||
sockets += @input
|
sockets += @input.map { |i|
|
||||||
|
mon = selector.register(i, :r)
|
||||||
|
mon.value = i
|
||||||
|
mon
|
||||||
|
}
|
||||||
@input.clear
|
@input.clear
|
||||||
when "c"
|
when "c"
|
||||||
sockets.delete_if do |s|
|
sockets.delete_if do |sm|
|
||||||
if s == @ready
|
if sm.value == :wakeup
|
||||||
false
|
false
|
||||||
else
|
else
|
||||||
s.close
|
sm.value.close
|
||||||
|
selector.deregister sm
|
||||||
true
|
true
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -160,6 +179,8 @@ module Puma
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
else
|
else
|
||||||
|
c = m.value
|
||||||
|
|
||||||
# We have to be sure to remove it from the timeout
|
# We have to be sure to remove it from the timeout
|
||||||
# list or we'll accidentally close the socket when
|
# list or we'll accidentally close the socket when
|
||||||
# it's in use!
|
# it's in use!
|
||||||
|
@ -172,7 +193,8 @@ module Puma
|
||||||
begin
|
begin
|
||||||
if c.try_to_finish
|
if c.try_to_finish
|
||||||
@app_pool << c
|
@app_pool << c
|
||||||
sockets.delete c
|
selector.deregister m
|
||||||
|
sockets.delete m
|
||||||
end
|
end
|
||||||
|
|
||||||
# Don't report these to the lowlevel_error handler, otherwise
|
# Don't report these to the lowlevel_error handler, otherwise
|
||||||
|
@ -182,7 +204,8 @@ module Puma
|
||||||
c.write_500
|
c.write_500
|
||||||
c.close
|
c.close
|
||||||
|
|
||||||
sockets.delete c
|
selector.deregister m
|
||||||
|
sockets.delete m
|
||||||
|
|
||||||
# SSL handshake failure
|
# SSL handshake failure
|
||||||
rescue MiniSSL::SSLError => e
|
rescue MiniSSL::SSLError => e
|
||||||
|
@ -193,7 +216,8 @@ module Puma
|
||||||
cert = ssl_socket.peercert
|
cert = ssl_socket.peercert
|
||||||
|
|
||||||
c.close
|
c.close
|
||||||
sockets.delete c
|
selector.deregister m
|
||||||
|
sockets.delete m
|
||||||
|
|
||||||
@events.ssl_error @server, addr, cert, e
|
@events.ssl_error @server, addr, cert, e
|
||||||
|
|
||||||
|
@ -204,7 +228,8 @@ module Puma
|
||||||
c.write_400
|
c.write_400
|
||||||
c.close
|
c.close
|
||||||
|
|
||||||
sockets.delete c
|
selector.deregister m
|
||||||
|
sockets.delete m
|
||||||
|
|
||||||
@events.parse_error @server, c.env, e
|
@events.parse_error @server, c.env, e
|
||||||
rescue StandardError => e
|
rescue StandardError => e
|
||||||
|
@ -213,7 +238,8 @@ module Puma
|
||||||
c.write_500
|
c.write_500
|
||||||
c.close
|
c.close
|
||||||
|
|
||||||
sockets.delete c
|
selector.deregister m
|
||||||
|
sockets.delete m
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -224,10 +250,13 @@ module Puma
|
||||||
now = Time.now
|
now = Time.now
|
||||||
|
|
||||||
while @timeouts.first.timeout_at < now
|
while @timeouts.first.timeout_at < now
|
||||||
c = @timeouts.shift
|
m = @timeouts.shift
|
||||||
|
c = m.value
|
||||||
c.write_408 if c.in_data_phase
|
c.write_408 if c.in_data_phase
|
||||||
c.close
|
c.close
|
||||||
sockets.delete c
|
|
||||||
|
selector.deregister m
|
||||||
|
sockets.delete m
|
||||||
|
|
||||||
break if @timeouts.empty?
|
break if @timeouts.empty?
|
||||||
end
|
end
|
||||||
|
|
|
@ -1,7 +1,5 @@
|
||||||
# -*- encoding: utf-8 -*-
|
# -*- encoding: utf-8 -*-
|
||||||
|
|
||||||
# This is only used when puma is a git dep from Bundler, keep in sync with Rakefile
|
|
||||||
|
|
||||||
version = File.read(File.expand_path("../lib/puma/const.rb", __FILE__))[/VERSION = "(\d+\.\d+\.\d+)"/, 1] || raise
|
version = File.read(File.expand_path("../lib/puma/const.rb", __FILE__))[/VERSION = "(\d+\.\d+\.\d+)"/, 1] || raise
|
||||||
|
|
||||||
Gem::Specification.new do |s|
|
Gem::Specification.new do |s|
|
||||||
|
@ -13,6 +11,7 @@ Gem::Specification.new do |s|
|
||||||
s.email = ["evan@phx.io"]
|
s.email = ["evan@phx.io"]
|
||||||
s.executables = ["puma", "pumactl"]
|
s.executables = ["puma", "pumactl"]
|
||||||
s.extensions = ["ext/puma_http11/extconf.rb"]
|
s.extensions = ["ext/puma_http11/extconf.rb"]
|
||||||
|
s.add_runtime_dependency "nio4r", "~> 2.0"
|
||||||
s.metadata["msys2_mingw_dependencies"] = "openssl"
|
s.metadata["msys2_mingw_dependencies"] = "openssl"
|
||||||
s.files = `git ls-files -- bin docs ext lib tools`.split("\n") +
|
s.files = `git ls-files -- bin docs ext lib tools`.split("\n") +
|
||||||
%w[History.md LICENSE README.md]
|
%w[History.md LICENSE README.md]
|
||||||
|
|
Loading…
Reference in a new issue