require_relative "helper" require_relative "helpers/integration" class TestIntegrationCluster < TestIntegration parallelize_me! if ::Puma.mri? def workers ; 2 ; end def setup skip NO_FORK_MSG unless HAS_FORK super end def teardown return if skipped? super end def test_hot_restart_does_not_drop_connections_threads hot_restart_does_not_drop_connections num_threads: 10, total_requests: 3_000 end def test_hot_restart_does_not_drop_connections hot_restart_does_not_drop_connections num_threads: 1, total_requests: 1_000 end def test_pre_existing_unix skip UNIX_SKT_MSG unless UNIX_SKT_EXIST File.open(@bind_path, mode: 'wb') { |f| f.puts 'pre existing' } cli_server "-w #{workers} -q test/rackup/sleep_step.ru", unix: :unix stop_server assert File.exist?(@bind_path) ensure if UNIX_SKT_EXIST File.unlink @bind_path if File.exist? @bind_path end end def test_siginfo_thread_print skip_unless_signal_exist? :INFO cli_server "-w #{workers} -q test/rackup/hello.ru" worker_pids = get_worker_pids output = [] t = Thread.new { output << @server.readlines } Process.kill :INFO, worker_pids.first Process.kill :INT , @pid t.join assert_match "Thread: TID", output.join end def test_usr2_restart _, new_reply = restart_server_and_listen("-q -w #{workers} test/rackup/hello.ru") assert_equal "Hello World", new_reply end # Next two tests, one tcp, one unix # Send requests 10 per second. Send 10, then :TERM server, then send another 30. # No more than 10 should throw Errno::ECONNRESET. def test_term_closes_listeners_tcp skip_unless_signal_exist? :TERM term_closes_listeners unix: false end def test_term_closes_listeners_unix skip_unless_signal_exist? :TERM term_closes_listeners unix: true end # Next two tests, one tcp, one unix # Send requests 1 per second. Send 1, then :USR1 server, then send another 24. # All should be responded to, and at least three workers should be used def test_usr1_all_respond_tcp skip_unless_signal_exist? :USR1 usr1_all_respond unix: false end def test_usr1_fork_worker skip_unless_signal_exist? :USR1 usr1_all_respond config: '--fork-worker' end def test_usr1_all_respond_unix skip_unless_signal_exist? :USR1 usr1_all_respond unix: true end def test_term_exit_code cli_server "-w #{workers} test/rackup/hello.ru" _, status = stop_server assert_equal 15, status end def test_term_suppress cli_server "-w #{workers} -C test/config/suppress_exception.rb test/rackup/hello.ru" _, status = stop_server assert_equal 0, status end def test_term_worker_clean_exit skip "Intermittent failure on Ruby 2.2" if RUBY_VERSION < '2.3' cli_server "-w #{workers} test/rackup/hello.ru" # Get the PIDs of the child workers. worker_pids = get_worker_pids # Signal the workers to terminate, and wait for them to die. Process.kill :TERM, @pid Process.wait @pid zombies = bad_exit_pids worker_pids assert_empty zombies, "Process ids #{zombies} became zombies" end # mimicking stuck workers, test respawn with external TERM def test_stuck_external_term_spawn skip_unless_signal_exist? :TERM worker_respawn(0) do |phase0_worker_pids| last = phase0_worker_pids.last # test is tricky if only one worker is TERM'd, so kill all but # spread out, so all aren't killed at once phase0_worker_pids.each do |pid| Process.kill :TERM, pid sleep 4 unless pid == last end end end # mimicking stuck workers, test restart def test_stuck_phased_restart skip_unless_signal_exist? :USR1 worker_respawn { |phase0_worker_pids| Process.kill :USR1, @pid } end def test_worker_boot_timeout timeout = 1 worker_timeout(timeout, 2, "worker_boot_timeout #{timeout}; on_worker_boot { sleep #{timeout + 1} }") end def test_worker_timeout skip 'Thread#name not available' unless Thread.current.respond_to?(:name) timeout = Puma::Const::WORKER_CHECK_INTERVAL + 1 worker_timeout(timeout, 1, <'/dev/null') sleep 0.01 exitstatus = Process.detach(pid).value.exitstatus [200, {}, [exitstatus.to_s]] end RUBY assert_equal '0', read_body(connect) end def test_nakayoshi cli_server "-w #{workers} test/rackup/hello.ru", config: <=, resets , msg assert_operator 20, :<=, refused , msg # Interleaved asserts # UNIX binders do not generate :reset items if l_reset assert_operator r_success, :<, l_reset , "Interleaved success and reset" assert_operator r_reset , :<, l_refused, "Interleaved reset and refused" else assert_operator r_success, :<, l_refused, "Interleaved success and refused" end ensure if passed? $debugging_info << "#{full_name}\n #{msg}\n" else $debugging_info << "#{full_name}\n #{msg}\n#{replies.inspect}\n" end end # Send requests 1 per second. Send 1, then :USR1 server, then send another 24. # All should be responded to, and at least three workers should be used def usr1_all_respond(unix: false, config: '') cli_server "-w #{workers} -t 0:5 -q test/rackup/sleep_pid.ru #{config}", unix: unix threads = [] replies = [] mutex = Mutex.new s = connect "sleep1", unix: unix replies << read_body(s) Process.kill :USR1, @pid refused = thread_run_refused unix: unix 24.times do |delay| threads << Thread.new do thread_run_pid replies, delay, 1, mutex, refused, unix: unix end end threads.each(&:join) responses = replies.count { |r| r[/\ASlept 1/] } resets = replies.count { |r| r == :reset } refused = replies.count { |r| r == :refused } # get pids from replies, generate uniq array qty_pids = replies.map { |body| body[/\d+\z/] }.uniq.compact.length msg = "#{responses} responses, #{qty_pids} uniq pids" assert_equal 25, responses, msg assert_operator qty_pids, :>, 2, msg msg = "#{responses} responses, #{resets} resets, #{refused} refused" refute_includes replies, :refused, msg refute_includes replies, :reset , msg ensure unless passed? $debugging_info << "#{full_name}\n #{msg}\n#{replies.inspect}\n" end end def worker_respawn(phase = 1, size = workers) threads = [] cli_server "-w #{workers} -t 1:1 -C test/config/worker_shutdown_timeout_2.rb test/rackup/sleep_pid.ru" # make sure two workers have booted phase0_worker_pids = get_worker_pids [35, 40].each do |sleep_time| threads << Thread.new do begin connect "sleep#{sleep_time}" # stuck connections will raise IOError or Errno::ECONNRESET # when shutdown rescue IOError, Errno::ECONNRESET end end end @start_time = Time.now.to_f # below should 'cancel' the phase 0 workers, either via phased_restart or # externally TERM'ing them yield phase0_worker_pids # wait for new workers to boot phase1_worker_pids = get_worker_pids phase # should be empty if all phase 0 workers cleanly exited phase0_exited = bad_exit_pids phase0_worker_pids # Since 35 is the shorter of the two requests, server should restart # and cancel both requests assert_operator (Time.now.to_f - @start_time).round(2), :<, 35 msg = "phase0_worker_pids #{phase0_worker_pids.inspect} phase1_worker_pids #{phase1_worker_pids.inspect} phase0_exited #{phase0_exited.inspect}" assert_equal workers, phase0_worker_pids.length, msg assert_equal workers, phase1_worker_pids.length, msg assert_empty phase0_worker_pids & phase1_worker_pids, "#{msg}\nBoth workers should be replaced with new" assert_empty phase0_exited, msg threads.each { |th| Thread.kill th } end # Returns an array of pids still in the process table, so it should # be empty for a clean exit. # Process.kill should raise the Errno::ESRCH exception, indicating the # process is dead and has been reaped. def bad_exit_pids(pids) pids.map do |pid| begin pid if Process.kill 0, pid rescue Errno::ESRCH nil end end.compact end # used in loop to create several 'requests' def thread_run_pid(replies, delay, sleep_time, mutex, refused, unix: false) begin sleep delay s = connect "sleep#{sleep_time}", unix: unix body = read_body(s, 20) mutex.synchronize { replies << body } rescue Errno::ECONNRESET # connection was accepted but then closed # client would see an empty response mutex.synchronize { replies << :reset } rescue *refused mutex.synchronize { replies << :refused } end end # used in loop to create several 'requests' def thread_run_step(replies, delay, sleep_time, step, mutex, refused, unix: false) begin sleep delay s = connect "sleep#{sleep_time}-#{step}", unix: unix body = read_body(s) if body[/\ASlept /] mutex.synchronize { replies[step] = :success } else mutex.synchronize { replies[step] = :failure } end rescue Errno::ECONNRESET # connection was accepted but then closed # client would see an empty response mutex.synchronize { replies[step] = :reset } rescue *refused mutex.synchronize { replies[step] = :refused } end end end if ::Process.respond_to?(:fork)