1
0
Fork 0
mirror of https://github.com/puma/puma.git synced 2022-11-09 13:48:40 -05:00

Add hot_restart_does_not_drop_connections tests

Co-authored-by: Chris LaRose <cjlarose@gmail.com>
This commit is contained in:
MSP-Greg 2020-10-10 18:55:11 -05:00
parent 5432a7cfc4
commit 67c686afa6
No known key found for this signature in database
GPG key ID: D688DA4A77D8FA18
5 changed files with 217 additions and 49 deletions

View file

@ -11,7 +11,6 @@ class TestIntegration < Minitest::Test
DARWIN = !!RUBY_PLATFORM[/darwin/]
HOST = "127.0.0.1"
TOKEN = "xxyyzz"
WORKERS = 2
BASE = defined?(Bundler) ? "bundle exec #{Gem.ruby} -Ilib" :
"#{Gem.ruby} -Ilib"
@ -129,7 +128,7 @@ class TestIntegration < Minitest::Test
end
# gets worker pids from @server output
def get_worker_pids(phase = 0, size = WORKERS)
def get_worker_pids(phase = 0, size = workers)
pids = []
re = /pid: (\d+)\) booted, phase: #{phase}/
while pids.size < size
@ -139,4 +138,172 @@ class TestIntegration < Minitest::Test
end
pids.map(&:to_i)
end
# used to define correct 'refused' errors
def thread_run_refused(unix: false)
if unix
[Errno::ENOENT, IOError]
else
DARWIN ? [Errno::ECONNREFUSED, Errno::EPIPE, EOFError] :
[Errno::ECONNREFUSED]
end
end
def cli_pumactl(argv, unix: false)
if unix
pumactl = IO.popen("#{BASE} bin/pumactl -C unix://#{@control_path} -T #{TOKEN} #{argv}", "r")
else
pumactl = IO.popen("#{BASE} bin/pumactl -C tcp://#{HOST}:#{@control_tcp_port} -T #{TOKEN} #{argv}", "r")
end
@ios_to_close << pumactl
Process.wait pumactl.pid
pumactl
end
def hot_restart_does_not_drop_connections(num_threads: 1, total_requests: 500)
skipped = true
skip_on :jruby, suffix: <<-MSG
- file descriptors are not preserved on exec on JRuby; connection reset errors are expected during restarts
MSG
skip_on :truffleruby, suffix: ' - Undiagnosed failures on TruffleRuby'
skip "Undiagnosed failures on Ruby 2.2" if RUBY_VERSION < '2.3'
args = "-w #{workers} -t 0:5 -q test/rackup/hello_with_delay.ru"
if Puma.windows?
@control_tcp_port = UniquePort.call
cli_server "#{args} --control-url tcp://#{HOST}:#{@control_tcp_port} --control-token #{TOKEN}"
else
cli_server args
end
skipped = false
replies = Hash.new 0
refused = thread_run_refused unix: false
message = 'A' * 16_256 # 2^14 - 128
mutex = Mutex.new
restart_count = 0
client_threads = []
num_requests = (total_requests/num_threads).to_i
num_threads.times do |thread|
client_threads << Thread.new do
num_requests.times do
begin
socket = TCPSocket.new HOST, @tcp_port
fast_write socket, "POST / HTTP/1.1\r\nContent-Length: #{message.bytesize}\r\n\r\n#{message}"
true until socket.gets == "\r\n"
body = read_body(socket, 10)
if body == "Hello World"
mutex.synchronize {
replies[:success] += 1
replies[:restart] += 1 if restart_count > 0
}
else
mutex.synchronize { replies[:unexpected_response] += 1 }
end
rescue Errno::ECONNRESET, Errno::EBADF
# connection was accepted but then closed
# client would see an empty response
# Errno::EBADF Windows may not be able to make a connection
mutex.synchronize { replies[:reset] += 1 }
rescue *refused
mutex.synchronize { replies[:refused] += 1 }
rescue ::Timeout::Error
mutex.synchronize { replies[:read_timeout] += 1 }
ensure
if socket.is_a?(IO) && !socket.closed?
begin
socket.close
rescue Errno::EBADF
end
end
end
end
# STDOUT.puts "#{thread} #{replies[:success]}"
end
end
run = true
restart_thread = Thread.new do
sleep 0.30 # let some connections in before 1st restart
while run
if Puma.windows?
cli_pumactl 'restart'
else
Process.kill :USR2, @pid
end
wait_for_server_to_boot
restart_count += 1
sleep 1
end
end
client_threads.each(&:join)
run = false
restart_thread.join
if Puma.windows?
cli_pumactl 'stop'
Process.wait @server.pid
@server = nil
end
msg = (" %4d unexpected_response\n" % replies.fetch(:unexpected_response,0)).dup
msg << " %4d refused\n" % replies.fetch(:refused,0)
msg << " %4d read timeout\n" % replies.fetch(:read_timeout,0)
msg << " %4d reset\n" % replies.fetch(:reset,0)
msg << " %4d success\n" % replies.fetch(:success,0)
msg << " %4d success after restart\n" % replies.fetch(:restart,0)
msg << " %4d restart count\n" % restart_count
reset = replies[:reset]
if Puma.windows?
# 5 is default thread count in Puma?
reset_max = num_threads > 1 ? restart_count * 5 : 5
assert_operator reset_max, :>=, reset, "#{msg}Expected reset_max >= reset errors"
else
assert_equal 0, reset, "#{msg}Expected no reset errors"
end
assert_equal 0, replies[:unexpected_response], "#{msg}Unexpected response"
assert_equal 0, replies[:refused], "#{msg}Expected no refused connections"
assert_equal 0, replies[:read_timeout], "#{msg}Expected no read timeouts"
if Puma.windows?
assert_equal (num_threads * num_requests) - reset, replies[:success]
else
assert_equal (num_threads * num_requests), replies[:success]
end
ensure
return if skipped
if passed?
msg = " restart_count #{restart_count}, reset #{reset}, success after restart #{replies[:restart]}"
$debugging_info << "#{full_name}\n#{msg}\n"
else
$debugging_info << "#{full_name}\n#{msg}\n"
end
end
def fast_write(io, str)
n = 0
while true
begin
n = io.syswrite str
rescue Errno::EAGAIN, Errno::EWOULDBLOCK => e
if !IO.select(nil, [io], nil, 5)
raise e
end
retry
rescue Errno::EPIPE, SystemCallError, IOError => e
raise e
end
return if n == str.bytesize
str = str.byteslice(n..-1)
end
end
end

View file

@ -0,0 +1,4 @@
run lambda { |env|
sleep 0.001
[200, {"Content-Type" => "text/plain"}, ["Hello World"]]
}

View file

@ -4,6 +4,8 @@ require_relative "helpers/integration"
class TestIntegrationCluster < TestIntegration
parallelize_me!
def workers ; 2 ; end
def setup
skip NO_FORK_MSG unless HAS_FORK
super
@ -14,12 +16,20 @@ class TestIntegrationCluster < TestIntegration
super
end
def test_hot_restart_does_not_drop_connections_threads
hot_restart_does_not_drop_connections num_threads: 10, total_requests: 3_000
end
def test_hot_restart_does_not_drop_connections
hot_restart_does_not_drop_connections num_threads: 1, total_requests: 1_000
end
def test_pre_existing_unix
skip UNIX_SKT_MSG unless UNIX_SKT_EXIST
File.open(@bind_path, mode: 'wb') { |f| f.puts 'pre existing' }
cli_server "-w #{WORKERS} -q test/rackup/sleep_step.ru", unix: :unix
cli_server "-w #{workers} -q test/rackup/sleep_step.ru", unix: :unix
stop_server
@ -34,7 +44,7 @@ class TestIntegrationCluster < TestIntegration
def test_siginfo_thread_print
skip_unless_signal_exist? :INFO
cli_server "-w #{WORKERS} -q test/rackup/hello.ru"
cli_server "-w #{workers} -q test/rackup/hello.ru"
worker_pids = get_worker_pids
output = []
t = Thread.new { output << @server.readlines }
@ -46,7 +56,7 @@ class TestIntegrationCluster < TestIntegration
end
def test_usr2_restart
_, new_reply = restart_server_and_listen("-q -w #{WORKERS} test/rackup/hello.ru")
_, new_reply = restart_server_and_listen("-q -w #{workers} test/rackup/hello.ru")
assert_equal "Hello World", new_reply
end
@ -84,14 +94,14 @@ class TestIntegrationCluster < TestIntegration
end
def test_term_exit_code
cli_server "-w #{WORKERS} test/rackup/hello.ru"
cli_server "-w #{workers} test/rackup/hello.ru"
_, status = stop_server
assert_equal 15, status
end
def test_term_suppress
cli_server "-w #{WORKERS} -C test/config/suppress_exception.rb test/rackup/hello.ru"
cli_server "-w #{workers} -C test/config/suppress_exception.rb test/rackup/hello.ru"
_, status = stop_server
@ -101,7 +111,7 @@ class TestIntegrationCluster < TestIntegration
def test_term_worker_clean_exit
skip "Intermittent failure on Ruby 2.2" if RUBY_VERSION < '2.3'
cli_server "-w #{WORKERS} test/rackup/hello.ru"
cli_server "-w #{workers} test/rackup/hello.ru"
# Get the PIDs of the child workers.
worker_pids = get_worker_pids
@ -182,13 +192,13 @@ RUBY
def test_refork
refork = Tempfile.new('refork')
cli_server "-w #{WORKERS} test/rackup/sleep.ru", config: <<RUBY
cli_server "-w #{workers} test/rackup/sleep.ru", config: <<RUBY
fork_worker 1
on_refork {File.write('#{refork.path}', 'Reforked')}
RUBY
pids = get_worker_pids
read_body(connect('sleep1')) until refork.read == 'Reforked'
refute_includes pids, get_worker_pids(1, WORKERS - 1)
refute_includes pids, get_worker_pids(1, workers - 1)
end
def test_fork_worker_spawn
@ -206,7 +216,7 @@ RUBY
end
def test_nakayoshi
cli_server "-w #{WORKERS} test/rackup/hello.ru", config: <<RUBY
cli_server "-w #{workers} test/rackup/hello.ru", config: <<RUBY
nakayoshi_fork true
RUBY
@ -229,7 +239,7 @@ RUBY
end
def test_load_path_includes_extra_deps
cli_server "-w #{WORKERS} -C test/config/prune_bundler_with_deps.rb test/rackup/hello-last-load-path.ru"
cli_server "-w #{workers} -C test/config/prune_bundler_with_deps.rb test/rackup/hello-last-load-path.ru"
last_load_path = read_body(connect)
assert_match(%r{gems/rdoc-[\d.]+/lib$}, last_load_path)
@ -238,11 +248,11 @@ RUBY
private
def worker_timeout(timeout, iterations, config)
cli_server "-w #{WORKERS} -t 1:1 test/rackup/hello.ru", config: config
cli_server "-w #{workers} -t 1:1 test/rackup/hello.ru", config: config
pids = []
Timeout.timeout(iterations * timeout + 1) do
(pids << @server.gets[/Terminating timed out worker: (\d+)/, 1]).compact! while pids.size < WORKERS * iterations
(pids << @server.gets[/Terminating timed out worker: (\d+)/, 1]).compact! while pids.size < workers * iterations
pids.map!(&:to_i)
end
@ -254,7 +264,7 @@ RUBY
def term_closes_listeners(unix: false)
skip_unless_signal_exist? :TERM
cli_server "-w #{WORKERS} -t 0:6 -q test/rackup/sleep_step.ru", unix: unix
cli_server "-w #{workers} -t 0:6 -q test/rackup/sleep_step.ru", unix: unix
threads = []
replies = []
mutex = Mutex.new
@ -318,7 +328,7 @@ RUBY
# Send requests 1 per second. Send 1, then :USR1 server, then send another 24.
# All should be responded to, and at least three workers should be used
def usr1_all_respond(unix: false, config: '')
cli_server "-w #{WORKERS} -t 0:5 -q test/rackup/sleep_pid.ru #{config}", unix: unix
cli_server "-w #{workers} -t 0:5 -q test/rackup/sleep_pid.ru #{config}", unix: unix
threads = []
replies = []
mutex = Mutex.new
@ -361,10 +371,10 @@ RUBY
end
end
def worker_respawn(phase = 1, size = WORKERS)
def worker_respawn(phase = 1, size = workers)
threads = []
cli_server "-w #{WORKERS} -t 1:1 -C test/config/worker_shutdown_timeout_2.rb test/rackup/sleep_pid.ru"
cli_server "-w #{workers} -t 1:1 -C test/config/worker_shutdown_timeout_2.rb test/rackup/sleep_pid.ru"
# make sure two workers have booted
phase0_worker_pids = get_worker_pids
@ -397,9 +407,9 @@ RUBY
assert_operator (Time.now.to_f - @start_time).round(2), :<, 35
msg = "phase0_worker_pids #{phase0_worker_pids.inspect} phase1_worker_pids #{phase1_worker_pids.inspect} phase0_exited #{phase0_exited.inspect}"
assert_equal WORKERS, phase0_worker_pids.length, msg
assert_equal workers, phase0_worker_pids.length, msg
assert_equal WORKERS, phase1_worker_pids.length, msg
assert_equal workers, phase1_worker_pids.length, msg
assert_empty phase0_worker_pids & phase1_worker_pids, "#{msg}\nBoth workers should be replaced with new"
assert_empty phase0_exited, msg
@ -421,16 +431,6 @@ RUBY
end.compact
end
# used with thread_run to define correct 'refused' errors
def thread_run_refused(unix: false)
if unix
[Errno::ENOENT, IOError]
else
DARWIN ? [Errno::ECONNREFUSED, Errno::EPIPE, EOFError] :
[Errno::ECONNREFUSED]
end
end
# used in loop to create several 'requests'
def thread_run_pid(replies, delay, sleep_time, mutex, refused, unix: false)
begin
@ -466,6 +466,4 @@ RUBY
mutex.synchronize { replies[step] = :refused }
end
end
end
end if ::Process.respond_to?(:fork)

View file

@ -5,6 +5,8 @@ class TestIntegrationPumactl < TestIntegration
include TmpPath
parallelize_me!
def workers ; 2 ; end
def setup
super
@ -58,7 +60,7 @@ class TestIntegrationPumactl < TestIntegration
def test_phased_restart_cluster
skip NO_FORK_MSG unless HAS_FORK
cli_server "-q -w #{WORKERS} test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}", unix: true
cli_server "-q -w #{workers} test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}", unix: true
start = Time.now
@ -78,8 +80,8 @@ class TestIntegrationPumactl < TestIntegration
msg = "phase 0 pids #{phase0_worker_pids.inspect} phase 1 pids #{phase1_worker_pids.inspect}"
assert_equal WORKERS, phase0_worker_pids.length, msg
assert_equal WORKERS, phase1_worker_pids.length, msg
assert_equal workers, phase0_worker_pids.length, msg
assert_equal workers, phase1_worker_pids.length, msg
assert_empty phase0_worker_pids & phase1_worker_pids, "#{msg}\nBoth workers should be replaced with new"
assert File.exist?(@bind_path), "Bind path must exist after phased restart"
@ -130,17 +132,4 @@ class TestIntegrationPumactl < TestIntegration
assert_match(/No pid '\d+' found|bad URI\(is not URI\?\)/, sout.readlines.join(""))
assert_equal(1, e.status)
end
private
def cli_pumactl(argv, unix: false)
if unix
pumactl = IO.popen("#{BASE} bin/pumactl -C unix://#{@control_path} -T #{TOKEN} #{argv}", "r")
else
pumactl = IO.popen("#{BASE} bin/pumactl -C tcp://#{HOST}:#{@control_tcp_port} -T #{TOKEN} #{argv}", "r")
end
@ios_to_close << pumactl
Process.wait pumactl.pid
pumactl
end
end

View file

@ -4,6 +4,16 @@ require_relative "helpers/integration"
class TestIntegrationSingle < TestIntegration
parallelize_me!
def workers ; 0 ; end
def test_hot_restart_does_not_drop_connections_threads
hot_restart_does_not_drop_connections num_threads: 5, total_requests: 1_000
end
def test_hot_restart_does_not_drop_connections
hot_restart_does_not_drop_connections
end
def test_usr2_restart
skip_unless_signal_exist? :USR2
_, new_reply = restart_server_and_listen("-q test/rackup/hello.ru")