# frozen_string_literal: true
require 'puma/runner'
require 'puma/util'
require 'puma/plugin'
require 'time'
module Puma
# This class is instantiated by the `Puma::Launcher` and used
# to boot and serve a Ruby application when puma "workers" are needed,
# i.e. when running in multi-process (cluster) mode, for example `$ puma -w 5`.
#
# At the core of this class is running an instance of `Puma::Server` which
# gets created via the `start_server` method from the `Puma::Runner` class
# that this inherits from.
#
# An instance of this class will spawn the number of processes passed in
# via the `spawn_workers` method call. Each worker will have its own
# instance of a `Puma::Server`.
class Cluster < Runner
def initialize(cli, events)
super cli, events
@phase = 0
@workers = []
@next_check = nil
@phased_state = :idle
@phased_restart = false
end
def stop_workers
log "- Gracefully shutting down workers..."
@workers.each { |x| x.term }
begin
loop do
wait_workers
break if @workers.empty?
sleep 0.2
end
rescue Interrupt
log "! Cancelled waiting for workers"
end
end
def start_phased_restart
@phase += 1
log "- Starting phased worker restart, phase: #{@phase}"
# Be sure to change the directory again before loading
# the app. This way we can pick up new code.
dir = @launcher.restart_dir
log "+ Changing to #{dir}"
Dir.chdir dir
end
def redirect_io
super
@workers.each { |x| x.hup }
end
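# Bookkeeping object the master process keeps for each forked worker.
# It records the child's pid, restart phase, boot state, last status
# payload and last check-in time, and wraps the signals the master
# sends to the child (TERM, KILL, HUP).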
class Worker
def initialize(idx, pid, phase, options)
@index = idx
@pid = pid
@phase = phase
@stage = :started
@signal = "TERM"
@options = options
@first_term_sent = nil
@started_at = Time.now
@last_checkin = Time.now
@last_status = '{}'
@term = false
end
attr_reader :index, :pid, :phase, :signal, :last_checkin, :last_status, :started_at
def booted?
@stage == :booted
end
def boot!
@last_checkin = Time.now
@stage = :booted
end
def term?
@term
end
def ping!(status)
@last_checkin = Time.now
@last_status = status
end
def ping_timeout?(which)
Time.now - @last_checkin > which
end
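# Request that the worker shut down. The first call sends TERM; if the
# worker is still around more than options[:worker_shutdown_timeout]
# seconds after that, subsequent calls escalate to KILL.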
def term
begin
if @first_term_sent && (Time.now - @first_term_sent) > @options[:worker_shutdown_timeout]
@signal = "KILL"
else
@term ||= true
@first_term_sent ||= Time.now
end
Process.kill @signal, @pid
rescue Errno::ESRCH
end
end
def kill
Process.kill "KILL", @pid
rescue Errno::ESRCH
end
def hup
Process.kill "HUP", @pid
rescue Errno::ESRCH
end
end
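# Fork new workers until options[:workers] processes exist, running the
# :before_worker_fork and :after_worker_fork hooks around each fork.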
def spawn_workers
diff = @options[:workers] - @workers.size
return if diff < 1
master = Process.pid
diff.times do
idx = next_worker_index
@launcher.config.run_hooks :before_worker_fork, idx
pid = fork { worker(idx, master) }
if !pid
log "! Complete inability to spawn new workers detected"
log "! Seppuku is the only choice."
exit! 1
end
debug "Spawned worker: #{pid}"
@workers << Worker.new(idx, pid, @phase, @options)
@launcher.config.run_hooks :after_worker_fork, idx
end
if diff > 0
@phased_state = :idle
end
end
def cull_workers
diff = @workers.size - @options[:workers]
return if diff < 1
debug "Culling #{diff.inspect} workers"
workers_to_cull = @workers[-diff,diff]
debug "Workers to cull: #{workers_to_cull.inspect}"
workers_to_cull.each do |worker|
log "- Worker #{worker.index} (pid: #{worker.pid}) terminating"
worker.term
end
end
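# Returns the lowest worker index not currently in use. For example,
# with options[:workers] == 4 and live workers at indexes [0, 2, 3],
# the next spawned worker gets index 1.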
def next_worker_index
all_positions = 0...@options[:workers]
occupied_positions = @workers.map { |w| w.index }
available_positions = all_positions.to_a - occupied_positions
available_positions.first
end
def all_workers_booted?
@workers.count { |w| !w.booted? } == 0
end
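# Periodic health check, run at most once per WORKER_CHECK_INTERVAL
# unless forced: kill workers whose pings have timed out, reap exited
# children, cull any surplus workers, spawn any missing ones, and, once
# every worker has booted, phase out one worker at a time whose phase
# lags behind the current one.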
def check_workers(force=false)
return if !force && @next_check && @next_check >= Time.now
@next_check = Time.now + Const::WORKER_CHECK_INTERVAL
any = false
@workers.each do |w|
next if !w.booted? && !w.ping_timeout?(@options[:worker_boot_timeout])
if w.ping_timeout?(@options[:worker_timeout])
log "! Terminating timed out worker: #{w.pid}"
w.kill
any = true
end
end
# If we killed any timed-out workers, give the kernel a moment to
# finish killing them so they can be reaped in this same pass.
sleep 1 if any
wait_workers
cull_workers
spawn_workers
if all_workers_booted?
# If we're running at proper capacity, check to see if
# we need to phase any workers out (which will restart
# in the right phase).
#
w = @workers.find { |x| x.phase != @phase }
if w
if @phased_state == :idle
@phased_state = :waiting
log "- Stopping #{w.pid} for phased upgrade..."
end
w.term
log "- #{w.signal} sent to #{w.pid}..."
end
end
end
def wakeup!
return unless @wakeup
begin
@wakeup.write "!" unless @wakeup.closed?
rescue SystemCallError, IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
end
end
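# Body of a forked worker process. A worker reports back to the master
# over @worker_write using single-character message types:
#   "b#{pid}\n"        - the worker has booted
#   "p#{pid}#{json}\n" - periodic ping carrying backlog/thread-pool stats
#   "t#{pid}\n"        - the worker is shutting down
# It also watches @check_pipe: when the master process dies, the pipe
# reaches EOF and the worker exits on its own.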
def worker(index, master)
title = "puma: cluster worker #{index}: #{master}"
title += " [#{@options[:tag]}]" if @options[:tag] && !@options[:tag].empty?
$0 = title
Signal.trap "SIGINT", "IGNORE"
@workers = []
@master_read.close
@suicide_pipe.close
Thread.new do
IO.select [@check_pipe]
log "! Detected parent died, dying"
exit! 1
end
# If we're not already running under a Bundler context,
# report which Gemfile will be used.
if !ENV['BUNDLE_GEMFILE']
if File.exist?("Gemfile")
log "+ Gemfile in context: #{File.expand_path("Gemfile")}"
elsif File.exist?("gems.rb")
log "+ Gemfile in context: #{File.expand_path("gems.rb")}"
end
end
# Invoke any worker boot hooks so they can get
# things in shape before booting the app.
@launcher.config.run_hooks :before_worker_boot, index
server = start_server
Signal.trap "SIGTERM" do
server.stop
end
begin
@worker_write << "b#{Process.pid}\n"
rescue SystemCallError, IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
STDERR.puts "Master seems to have exited, exiting."
return
end
Thread.new(@worker_write) do |io|
base_payload = "p#{Process.pid}"
while true
sleep Const::WORKER_CHECK_INTERVAL
begin
b = server.backlog || 0
r = server.running || 0
t = server.pool_capacity || 0
m = server.max_threads || 0
payload = %Q!#{base_payload}{ "backlog":#{b}, "running":#{r}, "pool_capacity":#{t}, "max_threads": #{m} }\n!
io << payload
rescue IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
break
end
end
end
server.run.join
# Invoke any worker shutdown hooks so they can prevent the worker
# exiting until any background operations are completed
@launcher.config.run_hooks :before_worker_shutdown, index
ensure
@worker_write << "t#{Process.pid}\n" rescue nil
@worker_write.close
end
def restart
@restart = true
stop
end
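# Request a phased restart: workers are replaced one at a time (see
# check_workers) so requests keep being served throughout. Returns
# false when the app is preloaded into the master, since per-worker
# reloading cannot pick up new code in that case.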
def phased_restart
return false if @options[:preload_app]
@phased_restart = true
wakeup!
true
end
def stop
@status = :stop
wakeup!
end
def stop_blocked
@status = :stop if @status == :run
wakeup!
@control.stop(true) if @control
Process.waitall
end
def halt
@status = :halt
wakeup!
end
def reload_worker_directory
dir = @launcher.restart_dir
log "+ Changing to #{dir}"
Dir.chdir dir
end
# Inside a child process this will return all zeroes, as @workers is
# only populated in the master process.
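# Example of the returned JSON (shape only; field values are illustrative):
#   { "started_at": "2019-01-01T00:00:00Z", "workers": 2, "phase": 0,
#     "booted_workers": 2, "old_workers": 0, "worker_status": [
#       { "started_at": "2019-01-01T00:00:01Z", "pid": 1234, "index": 0,
#         "phase": 0, "booted": true, "last_checkin": "2019-01-01T00:00:10Z",
#         "last_status": { "backlog":0, "running":5, "pool_capacity":5, "max_threads": 5 } },
#       ... ] }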
def stats
old_worker_count = @workers.count { |w| w.phase != @phase }
booted_worker_count = @workers.count { |w| w.booted? }
worker_status = '[' + @workers.map { |w| %Q!{ "started_at": "#{w.started_at.utc.iso8601}", "pid": #{w.pid}, "index": #{w.index}, "phase": #{w.phase}, "booted": #{w.booted?}, "last_checkin": "#{w.last_checkin.utc.iso8601}", "last_status": #{w.last_status} }!}.join(",") + ']'
%Q!{ "started_at": "#{@started_at.utc.iso8601}", "workers": #{@workers.size}, "phase": #{@phase}, "booted_workers": #{booted_worker_count}, "old_workers": #{old_worker_count}, "worker_status": #{worker_status} }!
end
def preload?
@options[:preload_app]
end
# We do this in a separate method to keep the lambda scope
# of the signal handlers as small as possible.
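# SIGCHLD wakes the master loop so exited workers are reaped promptly,
# TTIN adds a worker, TTOU removes one (never dropping below one), and
# SIGTERM closes the listeners, gracefully stops the workers and exits.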
def setup_signals
Signal.trap "SIGCHLD" do
wakeup!
end
Signal.trap "TTIN" do
@options[:workers] += 1
wakeup!
end
Signal.trap "TTOU" do
@options[:workers] -= 1 if @options[:workers] >= 2
wakeup!
end
master_pid = Process.pid
Signal.trap "SIGTERM" do
# Each worker installs its own SIGTERM handler when booted.
# Until then, this handler also runs in the worker, and the
# worker should simply exit if it receives SIGTERM.
if Process.pid != master_pid
log "Early termination of worker"
exit! 0
else
# Close the listeners before stopping the workers so that incoming
# connections are refused outright (and can be retried or routed to
# another node) instead of being accepted and then dropped while no
# workers remain to serve them (see #1802).
@launcher.close_binder_listeners
stop_workers
stop
raise(SignalException, "SIGTERM") if @options[:raise_exception_on_sigterm]
exit 0 # Clean exit, workers were stopped
end
end
end
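# Master entry point: preload or bind the application, set up pipes and
# signal handlers, fork the initial workers, then loop handling phased
# restarts, running check_workers, and processing the "b"/"p"/"t"
# messages workers send over the pipe, until @status leaves :run.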
def run
@status = :run
output_header "cluster"
log "* Process workers: #{@options[:workers]}"
before = Thread.list
if preload?
log "* Preloading application"
load_and_bind
after = Thread.list
if after.size > before.size
threads = (after - before)
if threads.first.respond_to? :backtrace
log "! WARNING: Detected #{after.size-before.size} Thread(s) started in app boot:"
threads.each do |t|
log "! #{t.inspect} - #{t.backtrace ? t.backtrace.first : ''}"
end
else
log "! WARNING: Detected #{after.size-before.size} Thread(s) started in app boot"
end
end
else
log "* Phased restart available"
unless @launcher.config.app_configured?
error "No application configured, nothing to run"
exit 1
end
@launcher.binder.parse @options[:binds], self
end
read, @wakeup = Puma::Util.pipe
setup_signals
# Used by the workers to detect if the master process dies.
# If select says that @check_pipe is ready, it's because the
# master has exited and @suicide_pipe has been automatically
# closed.
#
@check_pipe, @suicide_pipe = Puma::Util.pipe
if daemon?
log "* Daemonizing..."
Process.daemon(true)
else
log "Use Ctrl-C to stop"
end
redirect_io
Plugins.fire_background
@launcher.write_state
start_control
@master_read, @worker_write = read, @wakeup
@launcher.config.run_hooks :before_fork, nil
spawn_workers
Signal.trap "SIGINT" do
stop
end
@launcher.events.fire_on_booted!
begin
force_check = false
while @status == :run
begin
if @phased_restart
start_phased_restart
@phased_restart = false
end
check_workers force_check
force_check = false
res = IO.select([read], nil, nil, Const::WORKER_CHECK_INTERVAL)
if res
req = read.read_nonblock(1)
next if !req || req == "!"
result = read.gets
pid = result.to_i
if w = @workers.find { |x| x.pid == pid }
case req
when "b"
w.boot!
log "- Worker #{w.index} (pid: #{pid}) booted, phase: #{w.phase}"
force_check = true
when "t"
w.term
force_check = true
when "p"
w.ping!(result.sub(/^\d+/,'').chomp)
end
else
log "! Out-of-sync worker list, no #{pid} worker"
end
end
rescue Interrupt
@status = :stop
end
end
stop_workers unless @status == :halt
ensure
@check_pipe.close
@suicide_pipe.close
read.close
@wakeup.close
end
end
private
# Loops through @workers, removing workers that have exited and
# re-sending TERM (via `#term`) to any still flagged for termination.
def wait_workers
@workers.reject! do |w|
begin
if Process.wait(w.pid, Process::WNOHANG)
true
else
w.term if w.term?
nil
end
rescue Errno::ECHILD
true # child is already terminated
end
end
end
end
end