mirror of
https://github.com/puma/puma.git
synced 2022-11-09 13:48:40 -05:00
Faster phased restart and worker timeout
This commit is contained in:
parent
b16d8cb963
commit
67f9b1f3f2
5 changed files with 71 additions and 44 deletions
|
@ -9,6 +9,7 @@
|
|||
* Add `requests_count` to workers stats. (#2106)
|
||||
* Increases maximum URI path length from 2048 to 8196 bytes (#2167)
|
||||
* Force shutdown responses can be overridden by using the `lowlevel_error_handler` config (#2203)
|
||||
* Faster phased restart and worker timeout (#2121)
|
||||
|
||||
* Deprecations, Removals and Breaking API Changes
|
||||
* `Puma.stats` now returns a Hash instead of a JSON string (#2086)
|
||||
|
|
|
@ -25,9 +25,8 @@ module Puma
|
|||
|
||||
@phase = 0
|
||||
@workers = []
|
||||
@next_check = nil
|
||||
@next_check = Time.now
|
||||
|
||||
@phased_state = :idle
|
||||
@phased_restart = false
|
||||
end
|
||||
|
||||
|
@ -98,8 +97,12 @@ module Puma
|
|||
@last_status = JSON.parse(status, symbolize_names: true)
|
||||
end
|
||||
|
||||
def ping_timeout?(which)
|
||||
Time.now - @last_checkin > which
|
||||
def ping_timeout
|
||||
@last_checkin +
|
||||
(booted? ?
|
||||
@options[:worker_timeout] :
|
||||
@options[:worker_boot_timeout]
|
||||
)
|
||||
end
|
||||
|
||||
def term
|
||||
|
@ -116,8 +119,8 @@ module Puma
|
|||
end
|
||||
|
||||
def kill
|
||||
Process.kill "KILL", @pid
|
||||
rescue Errno::ESRCH
|
||||
@signal = 'KILL'
|
||||
term
|
||||
end
|
||||
|
||||
def hup
|
||||
|
@ -148,10 +151,6 @@ module Puma
|
|||
|
||||
@launcher.config.run_hooks :after_worker_fork, idx, @launcher.events
|
||||
end
|
||||
|
||||
if diff > 0
|
||||
@phased_state = :idle
|
||||
end
|
||||
end
|
||||
|
||||
def cull_workers
|
||||
|
@ -180,26 +179,12 @@ module Puma
|
|||
@workers.count { |w| !w.booted? } == 0
|
||||
end
|
||||
|
||||
def check_workers(force=false)
|
||||
return if !force && @next_check && @next_check >= Time.now
|
||||
def check_workers
|
||||
return if @next_check >= Time.now
|
||||
|
||||
@next_check = Time.now + Const::WORKER_CHECK_INTERVAL
|
||||
|
||||
any = false
|
||||
|
||||
@workers.each do |w|
|
||||
next if !w.booted? && !w.ping_timeout?(@options[:worker_boot_timeout])
|
||||
if w.ping_timeout?(@options[:worker_timeout])
|
||||
log "! Terminating timed out worker: #{w.pid}"
|
||||
w.kill
|
||||
any = true
|
||||
end
|
||||
end
|
||||
|
||||
# If we killed any timed out workers, try to catch them
|
||||
# during this loop by giving the kernel time to kill them.
|
||||
sleep 1 if any
|
||||
|
||||
timeout_workers
|
||||
wait_workers
|
||||
cull_workers
|
||||
spawn_workers
|
||||
|
@ -212,21 +197,23 @@ module Puma
|
|||
w = @workers.find { |x| x.phase != @phase }
|
||||
|
||||
if w
|
||||
if @phased_state == :idle
|
||||
@phased_state = :waiting
|
||||
log "- Stopping #{w.pid} for phased upgrade..."
|
||||
end
|
||||
|
||||
log "- Stopping #{w.pid} for phased upgrade..."
|
||||
unless w.term?
|
||||
w.term
|
||||
log "- #{w.signal} sent to #{w.pid}..."
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@next_check = [
|
||||
@workers.reject(&:term?).map(&:ping_timeout).min,
|
||||
@next_check
|
||||
].compact.min
|
||||
end
|
||||
|
||||
def wakeup!
|
||||
return unless @wakeup
|
||||
@next_check = Time.now
|
||||
|
||||
begin
|
||||
@wakeup.write "!" unless @wakeup.closed?
|
||||
|
@ -483,8 +470,6 @@ module Puma
|
|||
@launcher.events.fire_on_booted!
|
||||
|
||||
begin
|
||||
force_check = false
|
||||
|
||||
while @status == :run
|
||||
begin
|
||||
if @phased_restart
|
||||
|
@ -492,11 +477,9 @@ module Puma
|
|||
@phased_restart = false
|
||||
end
|
||||
|
||||
check_workers force_check
|
||||
check_workers
|
||||
|
||||
force_check = false
|
||||
|
||||
res = IO.select([read], nil, nil, Const::WORKER_CHECK_INTERVAL)
|
||||
res = IO.select([read], nil, nil, [0, @next_check - Time.now].max)
|
||||
|
||||
if res
|
||||
req = read.read_nonblock(1)
|
||||
|
@ -511,13 +494,12 @@ module Puma
|
|||
when "b"
|
||||
w.boot!
|
||||
log "- Worker #{w.index} (pid: #{pid}) booted, phase: #{w.phase}"
|
||||
force_check = true
|
||||
@next_check = Time.now
|
||||
when "e"
|
||||
# external term, see worker method, Signal.trap "SIGTERM"
|
||||
w.instance_variable_set :@term, true
|
||||
when "t"
|
||||
w.term unless w.term?
|
||||
force_check = true
|
||||
when "p"
|
||||
w.ping!(result.sub(/^\d+/,'').chomp)
|
||||
end
|
||||
|
@ -558,5 +540,14 @@ module Puma
|
|||
end
|
||||
end
|
||||
end
|
||||
|
||||
def timeout_workers
|
||||
@workers.each do |w|
|
||||
if !w.term? && w.ping_timeout <= Time.now
|
||||
log "! Terminating timed out worker: #{w.pid}"
|
||||
w.kill
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -126,8 +126,6 @@ class TestIntegration < Minitest::Test
|
|||
while pids.size < size
|
||||
if pid = @server.gets[re, 1]
|
||||
pids << pid
|
||||
else
|
||||
sleep 2
|
||||
end
|
||||
end
|
||||
pids.map(&:to_i)
|
||||
|
|
|
@ -133,8 +133,44 @@ class TestIntegrationCluster < TestIntegration
|
|||
worker_respawn { |phase0_worker_pids| Process.kill :USR1, @pid }
|
||||
end
|
||||
|
||||
def test_worker_boot_timeout
|
||||
timeout = 1
|
||||
worker_timeout(timeout, 2, "worker_boot_timeout #{timeout}; on_worker_boot { sleep #{timeout + 1} }")
|
||||
end
|
||||
|
||||
def test_worker_timeout
|
||||
skip 'Thread#name not available' unless Thread.current.respond_to?(:name)
|
||||
timeout = Puma::Const::WORKER_CHECK_INTERVAL + 1
|
||||
worker_timeout(timeout, 1, <<RUBY)
|
||||
worker_timeout #{timeout}
|
||||
on_worker_boot do
|
||||
Thread.new do
|
||||
sleep 1
|
||||
Thread.list.find {|t| t.name == 'puma stat payload'}.kill
|
||||
end
|
||||
end
|
||||
RUBY
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def worker_timeout(timeout, iterations, config)
|
||||
config_file = Tempfile.new(%w(worker_timeout .rb))
|
||||
config_file.write config
|
||||
config_file.close
|
||||
cli_server "-w #{WORKERS} -t 1:1 -C #{config_file.path} test/rackup/hello.ru"
|
||||
|
||||
pids = []
|
||||
Timeout.timeout(iterations * timeout + 1) do
|
||||
while pids.size < WORKERS * iterations
|
||||
(pids << @server.gets[/Terminating timed out worker: (\d+)/, 1]).compact!
|
||||
end
|
||||
pids.map!(&:to_i)
|
||||
end
|
||||
|
||||
assert_equal pids, pids.uniq
|
||||
end
|
||||
|
||||
# Send requests 10 per second. Send 10, then :TERM server, then send another 30.
|
||||
# No more than 10 should throw Errno::ECONNRESET.
|
||||
def term_closes_listeners(unix: false)
|
||||
|
|
|
@ -54,12 +54,13 @@ class TestIntegrationPumactl < TestIntegration
|
|||
|
||||
def test_phased_restart_cluster
|
||||
skip NO_FORK_MSG unless HAS_FORK
|
||||
start = Time.now
|
||||
|
||||
cli_server "-q -w #{WORKERS} test/rackup/sleep.ru --control-url unix://#{@control_path} --control-token #{TOKEN} -S #{@state_path}", unix: true
|
||||
|
||||
s = UNIXSocket.new @bind_path
|
||||
@ios_to_close << s
|
||||
s << "GET /sleep5 HTTP/1.0\r\n\r\n"
|
||||
s << "GET /sleep1 HTTP/1.0\r\n\r\n"
|
||||
|
||||
# Get the PIDs of the phase 0 workers.
|
||||
phase0_worker_pids = get_worker_pids 0
|
||||
|
@ -82,7 +83,7 @@ class TestIntegrationPumactl < TestIntegration
|
|||
|
||||
_, status = Process.wait2(@pid)
|
||||
assert_equal 0, status
|
||||
|
||||
assert_operator Time.now - start, :<, 5
|
||||
@server = nil
|
||||
end
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue