mirror of
https://github.com/puma/puma.git
synced 2022-11-09 13:48:40 -05:00
Fixes Cluster worker shutdown/restart (#1908)
* cluster.rb - fixup worker wait in Cluster, add Worker#term? 1. Cluster - fix wait in #check_workers, #stop_workers 2. Cluster - add private wait_workers method for use in above 2. Worker - add #term? method for use in above * Adds two tests for worker SIGTERM/respawn and phased-restart test_worker_spawn_external_term - sends SIGTERM to workers, checks respawn, etc test_worker_phased_restart - checking worker handling during phased-restart
This commit is contained in:
parent
d5c394ec7f
commit
f759017e35
2 changed files with 137 additions and 0 deletions
|
@ -74,6 +74,10 @@ module Puma
|
|||
@started_at = Time.now
|
||||
@last_checkin = Time.now
|
||||
@last_status = '{}'
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
@dead = false
|
||||
>>>>>>> Fixes Cluster worker shutdown/restart (#1908)
|
||||
@term = false
|
||||
end
|
||||
|
||||
|
@ -92,6 +96,10 @@ module Puma
|
|||
@term
|
||||
end
|
||||
|
||||
def term?
|
||||
@term
|
||||
end
|
||||
|
||||
def ping!(status)
|
||||
@last_checkin = Time.now
|
||||
@last_status = status
|
||||
|
|
|
@ -9,8 +9,12 @@ require "open3"
|
|||
# TODO: remove stdout logging, get everything out of my rainbow dots
|
||||
|
||||
class TestIntegration < Minitest::Test
|
||||
<<<<<<< HEAD
|
||||
HOST = "127.0.0.1"
|
||||
TOKEN = "xxyyzz"
|
||||
=======
|
||||
HOST = '127.0.0.1'
|
||||
>>>>>>> Fixes Cluster worker shutdown/restart (#1908)
|
||||
|
||||
def setup
|
||||
@state_path = "test/test_#{name}_puma.state"
|
||||
|
@ -533,4 +537,129 @@ class TestIntegration < Minitest::Test
|
|||
|
||||
[thr, launcher, @events]
|
||||
end
|
||||
|
||||
def test_worker_spawn_external_term
|
||||
worker_respawn { |l, old_pids|
|
||||
old_pids.each { |p| Process.kill :TERM, p }
|
||||
}
|
||||
end
|
||||
|
||||
def test_worker_phased_restart
|
||||
worker_respawn { |l, old_pids| l.phased_restart }
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def worker_respawn
|
||||
skip NO_FORK_MSG unless HAS_FORK
|
||||
port = UniquePort.call
|
||||
workers_booted = 0
|
||||
|
||||
conf = Puma::Configuration.new do |c|
|
||||
c.bind "tcp://#{HOST}:#{port}"
|
||||
c.threads 1, 1
|
||||
c.workers 2
|
||||
c.worker_shutdown_timeout 2
|
||||
c.app TestApps::SLEEP
|
||||
c.after_worker_fork { |idx| workers_booted += 1 }
|
||||
end
|
||||
|
||||
# start Puma via launcher
|
||||
thr, launcher, _e = run_launcher conf
|
||||
|
||||
# make sure two workers have booted
|
||||
time = 0
|
||||
until workers_booted >= 2 || time >= 10
|
||||
sleep 2
|
||||
time += 2
|
||||
end
|
||||
|
||||
cluster = launcher.instance_variable_get :@runner
|
||||
|
||||
http0 = Net::HTTP.new HOST, port
|
||||
http1 = Net::HTTP.new HOST, port
|
||||
body0 = nil
|
||||
body1 = nil
|
||||
|
||||
worker0 = Thread.new do
|
||||
begin
|
||||
req0 = Net::HTTP::Get.new "/sleep35", {}
|
||||
http0.start.request(req0) { |rep0| body0 = rep0.body }
|
||||
rescue
|
||||
end
|
||||
end
|
||||
|
||||
worker1 = Thread.new do
|
||||
begin
|
||||
req1 = Net::HTTP::Get.new "/sleep40", {}
|
||||
http1.start.request(req1) { |rep1| body1 = rep1.body }
|
||||
rescue
|
||||
end
|
||||
end
|
||||
|
||||
old_pids = cluster.instance_variable_get(:@workers).map(&:pid)
|
||||
|
||||
start_time = Time.now.to_f
|
||||
|
||||
# below should 'cancel' the phase 0 workers, either via phased_restart or
|
||||
# externally SIGTERM'ing them
|
||||
yield launcher, old_pids
|
||||
|
||||
# make sure four workers have booted
|
||||
time = 0
|
||||
until workers_booted >= 4 || time >= 45
|
||||
sleep 2
|
||||
time += 2
|
||||
end
|
||||
|
||||
new_pids = cluster.instance_variable_get(:@workers).map(&:pid)
|
||||
|
||||
# should be empty if all old workers removed
|
||||
old_waited = old_pids.map { |pid|
|
||||
begin
|
||||
Process.wait(pid, Process::WNOHANG)
|
||||
pid
|
||||
rescue Errno::ECHILD
|
||||
nil # child is already terminated
|
||||
end
|
||||
}.compact
|
||||
|
||||
Thread.kill worker0
|
||||
Thread.kill worker1
|
||||
|
||||
launcher.stop
|
||||
assert_kind_of Thread, thr.join, "server didn't stop"
|
||||
|
||||
refute_equal 'Slept 35', body0
|
||||
refute_equal 'Slept 40', body1
|
||||
|
||||
# Since 35 is the shorter of the two requests, server should restart
|
||||
# and cancel both requests
|
||||
assert_operator (Time.now.to_f - start_time).round(2), :<, 35
|
||||
|
||||
msg = "old_pids #{old_pids.inspect} new_pids #{new_pids.inspect} old_waited #{old_waited.inspect}"
|
||||
assert_equal 2, new_pids.length, msg
|
||||
assert_equal 2, old_pids.length, msg
|
||||
assert_empty new_pids & old_pids, "#{msg}\nBoth workers should be replaced with new"
|
||||
assert_empty old_waited, msg
|
||||
end
|
||||
|
||||
def run_launcher(conf)
|
||||
# below for future PR
|
||||
#@wait, @ready = IO.pipe
|
||||
# @ios_to_close << @wait << @ready
|
||||
#@events = Puma::Events.strings
|
||||
#@events.on_booted { @ready << "!" }
|
||||
|
||||
launcher = Puma::Launcher.new conf, :events => @events
|
||||
|
||||
thr = Thread.new do
|
||||
launcher.run
|
||||
end
|
||||
|
||||
# wait for boot from #@events.on_booted
|
||||
@wait.sysread 1
|
||||
|
||||
[thr, launcher, @events]
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue