1
0
Fork 0
mirror of https://github.com/puma/puma.git synced 2022-11-09 13:48:40 -05:00
puma--puma/lib/puma/cluster.rb

672 lines
17 KiB
Ruby
Raw Normal View History

# frozen_string_literal: true
require 'puma/runner'
require 'puma/util'
require 'puma/plugin'
require 'time'
module Puma
2018-05-01 16:42:12 -04:00
# This class is instantiated by the `Puma::Launcher` and used
# to boot and serve a Ruby application when puma "workers" are needed
# i.e. when using multi-processes. For example `$ puma -w 5`
#
# At the core of this class is running an instance of `Puma::Server` which
# gets created via the `start_server` method from the `Puma::Runner` class
# that this inherits from.
#
# An instance of this class will spawn the number of processes passed in
# via the `spawn_workers` method call. Each worker will have it's own
# instance of a `Puma::Server`.
class Cluster < Runner
def initialize(cli, events)
super cli, events
@phase = 0
@workers = []
@next_check = Time.now
2013-07-06 00:13:29 -04:00
@phased_restart = false
end
def stop_workers
log "- Gracefully shutting down workers..."
@workers.each { |x| x.term }
begin
loop do
wait_workers
break if @workers.reject {|w| w.pid.nil?}.empty?
sleep 0.2
end
rescue Interrupt
log "! Cancelled waiting for workers"
end
end
def start_phased_restart
@phase += 1
log "- Starting phased worker restart, phase: #{@phase}"
# Be sure to change the directory again before loading
# the app. This way we can pick up new code.
dir = @launcher.restart_dir
log "+ Changing to #{dir}"
Dir.chdir dir
end
2014-07-27 10:01:05 -04:00
def redirect_io
super
@workers.each { |x| x.hup }
end
class Worker
2015-01-15 07:58:16 -05:00
def initialize(idx, pid, phase, options)
@index = idx
@pid = pid
@phase = phase
@stage = :started
@signal = "TERM"
2015-01-15 07:58:16 -05:00
@options = options
@first_term_sent = nil
@started_at = Time.now
@last_checkin = Time.now
@last_status = {}
@term = false
end
attr_reader :index, :pid, :phase, :signal, :last_checkin, :last_status, :started_at
# @version 5.0.0
attr_writer :pid, :phase
def booted?
@stage == :booted
end
def boot!
@last_checkin = Time.now
@stage = :booted
end
def term?
@term
end
def ping!(status)
@last_checkin = Time.now
2020-05-17 12:46:14 -04:00
require 'json'
@last_status = JSON.parse(status, symbolize_names: true)
end
# @see Puma::Cluster#check_workers
# @version 5.0.0
def ping_timeout
@last_checkin +
(booted? ?
@options[:worker_timeout] :
@options[:worker_boot_timeout]
)
end
def term
begin
2016-02-19 20:05:45 -05:00
if @first_term_sent && (Time.now - @first_term_sent) > @options[:worker_shutdown_timeout]
@signal = "KILL"
else
@term ||= true
2016-02-19 20:05:45 -05:00
@first_term_sent ||= Time.now
end
Process.kill @signal, @pid if @pid
rescue Errno::ESRCH
end
end
def kill
@signal = 'KILL'
term
end
2014-07-27 10:01:05 -04:00
def hup
Process.kill "HUP", @pid
rescue Errno::ESRCH
end
end
def spawn_workers
diff = @options[:workers] - @workers.size
return if diff < 1
2013-07-06 00:13:29 -04:00
master = Process.pid
if @options[:fork_worker]
@fork_writer << "-1\n"
end
2013-07-06 00:13:29 -04:00
diff.times do
idx = next_worker_index
if @options[:fork_worker] && idx != 0
@fork_writer << "#{idx}\n"
pid = nil
else
pid = spawn_worker(idx, master)
2016-04-07 14:41:53 -04:00
end
debug "Spawned worker: #{pid}"
2015-01-15 07:58:16 -05:00
@workers << Worker.new(idx, pid, @phase, @options)
end
2016-02-07 17:51:54 -05:00
if @options[:fork_worker] &&
@workers.all? {|x| x.phase == @phase}
@fork_writer << "0\n"
end
end
# @version 5.0.0
def spawn_worker(idx, master)
@launcher.config.run_hooks :before_worker_fork, idx, @launcher.events
pid = fork { worker(idx, master) }
if !pid
log "! Complete inability to spawn new workers detected"
log "! Seppuku is the only choice."
exit! 1
end
@launcher.config.run_hooks :after_worker_fork, idx, @launcher.events
pid
end
def cull_workers
diff = @workers.size - @options[:workers]
return if diff < 1
debug "Culling #{diff.inspect} workers"
workers_to_cull = @workers[-diff,diff]
debug "Workers to cull: #{workers_to_cull.inspect}"
workers_to_cull.each do |worker|
log "- Worker #{worker.index} (pid: #{worker.pid}) terminating"
worker.term
end
end
def next_worker_index
2014-07-27 10:01:05 -04:00
all_positions = 0...@options[:workers]
occupied_positions = @workers.map { |w| w.index }
2014-07-27 10:01:05 -04:00
available_positions = all_positions.to_a - occupied_positions
available_positions.first
end
def all_workers_booted?
@workers.count { |w| !w.booted? } == 0
end
def check_workers
return if @next_check >= Time.now
@next_check = Time.now + Const::WORKER_CHECK_INTERVAL
timeout_workers
wait_workers
cull_workers
spawn_workers
if all_workers_booted?
# If we're running at proper capacity, check to see if
# we need to phase any workers out (which will restart
# in the right phase).
#
w = @workers.find { |x| x.phase != @phase }
if w
log "- Stopping #{w.pid} for phased upgrade..."
unless w.term?
w.term
log "- #{w.signal} sent to #{w.pid}..."
end
end
end
@next_check = [
@workers.reject(&:term?).map(&:ping_timeout).min,
@next_check
].compact.min
end
def wakeup!
return unless @wakeup
begin
@wakeup.write "!" unless @wakeup.closed?
rescue SystemCallError, IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
end
end
def worker(index, master)
title = "puma: cluster worker #{index}: #{master}"
title += " [#{@options[:tag]}]" if @options[:tag] && !@options[:tag].empty?
$0 = title
Signal.trap "SIGINT", "IGNORE"
Signal.trap "SIGCHLD", "DEFAULT"
fork_worker = @options[:fork_worker] && index == 0
2014-07-27 10:01:05 -04:00
@workers = []
if !@options[:fork_worker] || fork_worker
@master_read.close
@suicide_pipe.close
@fork_writer.close
end
Thread.new do
2019-09-15 04:52:34 -04:00
Puma.set_thread_name "worker check pipe"
IO.select [@check_pipe]
log "! Detected parent died, dying"
exit! 1
end
# If we're not running under a Bundler context, then
# report the info about the context we will be using
if !ENV['BUNDLE_GEMFILE']
if File.exist?("Gemfile")
log "+ Gemfile in context: #{File.expand_path("Gemfile")}"
elsif File.exist?("gems.rb")
log "+ Gemfile in context: #{File.expand_path("gems.rb")}"
end
end
# Invoke any worker boot hooks so they can get
# things in shape before booting the app.
@launcher.config.run_hooks :before_worker_boot, index, @launcher.events
server = @server ||= start_server
restart_server = Queue.new << true << false
if fork_worker
restart_server.clear
worker_pids = []
Signal.trap "SIGCHLD" do
wakeup! if worker_pids.reject! do |p|
Process.wait(p, Process::WNOHANG) rescue true
end
end
Thread.new do
Puma.set_thread_name "worker fork pipe"
while (idx = @fork_pipe.gets)
idx = idx.to_i
if idx == -1 # stop server
if restart_server.length > 0
restart_server.clear
server.begin_restart(true)
@launcher.config.run_hooks :before_refork, nil, @launcher.events
nakayoshi_gc
end
elsif idx == 0 # restart server
restart_server << true << false
else # fork worker
worker_pids << pid = spawn_worker(idx, master)
@worker_write << "f#{pid}:#{idx}\n" rescue nil
end
end
end
end
Signal.trap "SIGTERM" do
@worker_write << "e#{Process.pid}\n" rescue nil
server.stop
restart_server << false
end
begin
@worker_write << "b#{Process.pid}:#{index}\n"
rescue SystemCallError, IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
2015-10-17 13:31:55 -04:00
STDERR.puts "Master seems to have exited, exiting."
return
end
Thread.new(@worker_write) do |io|
2019-09-15 04:52:34 -04:00
Puma.set_thread_name "stat payload"
while true
sleep Const::WORKER_CHECK_INTERVAL
begin
2020-05-17 12:46:14 -04:00
require 'json'
io << "p#{Process.pid}#{server.stats.to_json}\n"
rescue IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
break
end
end
end
server.run.join while restart_server.pop
# Invoke any worker shutdown hooks so they can prevent the worker
2014-08-01 06:28:33 -04:00
# exiting until any background operations are completed
@launcher.config.run_hooks :before_worker_shutdown, index, @launcher.events
ensure
@worker_write << "t#{Process.pid}\n" rescue nil
@worker_write.close
end
def restart
@restart = true
2013-07-06 00:13:29 -04:00
stop
end
def phased_restart
return false if @options[:preload_app]
@phased_restart = true
wakeup!
true
end
def stop
@status = :stop
wakeup!
end
def stop_blocked
@status = :stop if @status == :run
wakeup!
@control.stop(true) if @control
Process.waitall
end
def halt
@status = :halt
wakeup!
end
2014-02-25 08:52:20 -05:00
def reload_worker_directory
dir = @launcher.restart_dir
log "+ Changing to #{dir}"
Dir.chdir dir
2014-02-25 08:52:20 -05:00
end
# Inside of a child process, this will return all zeroes, as @workers is only populated in
# the master process.
def stats
old_worker_count = @workers.count { |w| w.phase != @phase }
worker_status = @workers.map do |w|
{
started_at: w.started_at.utc.iso8601,
pid: w.pid,
index: w.index,
phase: w.phase,
booted: w.booted?,
last_checkin: w.last_checkin.utc.iso8601,
last_status: w.last_status,
}
end
{
started_at: @started_at.utc.iso8601,
workers: @workers.size,
phase: @phase,
booted_workers: worker_status.count { |w| w[:booted] },
old_workers: old_worker_count,
worker_status: worker_status,
}
end
2013-07-06 00:13:29 -04:00
def preload?
@options[:preload_app]
end
# @version 5.0.0
def fork_worker!
if (worker = @workers.find { |w| w.index == 0 })
worker.phase += 1
end
phased_restart
end
2016-07-14 15:12:50 -04:00
# We do this in a separate method to keep the lambda scope
# of the signals handlers as small as possible.
def setup_signals
if @options[:fork_worker]
Signal.trap "SIGURG" do
fork_worker!
end
# Auto-fork after the specified number of requests.
if (fork_requests = @options[:fork_worker].to_i) > 0
@launcher.events.register(:ping!) do |w|
fork_worker! if w.index == 0 &&
w.phase == 0 &&
w.last_status[:requests_count] >= fork_requests
end
end
end
Signal.trap "SIGCHLD" do
wakeup!
end
Signal.trap "TTIN" do
@options[:workers] += 1
wakeup!
end
Signal.trap "TTOU" do
@options[:workers] -= 1 if @options[:workers] >= 2
wakeup!
end
master_pid = Process.pid
Signal.trap "SIGTERM" do
# The worker installs their own SIGTERM when booted.
# Until then, this is run by the worker and the worker
# should just exit if they get it.
if Process.pid != master_pid
log "Early termination of worker"
exit! 0
else
@launcher.close_binder_listeners
[close #1802] Close listeners on SIGTERM Currently when a SIGTERM is sent to a puma cluster, the signal is trapped, then sent to all children, it then waits for children to exit and then the parent process exits. The socket that accepts connections is only closed when the parent process calls `exit 0`. The problem with this flow is there is a period of time where there are no child processes to work on an incoming connection, however the socket is still open so clients can connect to it. When this happens, the client will connect, but the connection will be closed with no response. Instead, the desired behavior is for the connection from the client to be rejected. This allows the client to re-connect, or if there is a load balance between the client and the puma server, it allows the request to be routed to another node. This PR fixes the existing behavior by manually closing the socket when SIGTERM is received before shutting down the workers/children processes. When the socket is closed, any incoming requests will fail to connect and they will be rejected, this is our desired behavior. Existing requests that are in-flight can still respond. ## Test This behavior is quite difficult to test, you'll notice that the test is far longer than the code change. In this test we send an initial request to an endpoint that sleeps for 1 second. We then signal to other threads that they can continue. We send the parent process a SIGTERM, while simultaneously sending other requests. Some of these will happen after the SIGTERM is received by the server. When that happens we want none of the requests to get a `ECONNRESET` error, this would indicate the request was accepted but then closed. Instead we want `ECONNREFUSED`. I ran this test in a loop for a few hours and it passes with my patch, it fails immediately if you remove the call to close the listeners. ``` $ while m test/test_integration.rb:235; do :; done ``` ## Considerations This PR only fixes the problem for "cluster" (i.e. multi-worker) mode. When trying to reproduce the test with single mode, on (removing the `-w 2` config) it already passes. This leads us to believe that either the bug does not exist in single threaded mode, or at the very least reproducing the bug via a test in the single threaded mode requires a different approach. Co-authored-by: Danny Fallon <Danny.fallon.ie+github@gmail.com> Co-authored-by: Richard Schneeman <richard.schneeman+foo@gmail.com>
2019-05-30 14:09:17 -04:00
stop_workers
stop
raise(SignalException, "SIGTERM") if @options[:raise_exception_on_sigterm]
exit 0 # Clean exit, workers were stopped
end
end
end
def run
@status = :run
2013-07-05 20:09:18 -04:00
output_header "cluster"
log "* Process workers: #{@options[:workers]}"
before = Thread.list
2013-07-06 00:13:29 -04:00
if preload?
log "* Preloading application"
load_and_bind
after = Thread.list
if after.size > before.size
threads = (after - before)
if threads.first.respond_to? :backtrace
log "! WARNING: Detected #{after.size-before.size} Thread(s) started in app boot:"
threads.each do |t|
log "! #{t.inspect} - #{t.backtrace ? t.backtrace.first : ''}"
end
else
log "! WARNING: Detected #{after.size-before.size} Thread(s) started in app boot"
end
end
else
log "* Phased restart available"
unless @launcher.config.app_configured?
error "No application configured, nothing to run"
exit 1
end
@launcher.binder.parse @options[:binds], self
end
read, @wakeup = Puma::Util.pipe
setup_signals
# Used by the workers to detect if the master process dies.
# If select says that @check_pipe is ready, it's because the
# master has exited and @suicide_pipe has been automatically
# closed.
#
@check_pipe, @suicide_pipe = Puma::Util.pipe
# Separate pipe used by worker 0 to receive commands to
# fork new worker processes.
@fork_pipe, @fork_writer = Puma::Util.pipe
log "Use Ctrl-C to stop"
redirect_io
Plugins.fire_background
@launcher.write_state
start_control
@master_read, @worker_write = read, @wakeup
@launcher.config.run_hooks :before_fork, nil, @launcher.events
nakayoshi_gc
spawn_workers
Signal.trap "SIGINT" do
2013-07-06 00:13:29 -04:00
stop
end
@launcher.events.fire_on_booted!
begin
while @status == :run
begin
2016-03-20 17:14:35 -04:00
if @phased_restart
start_phased_restart
@phased_restart = false
end
check_workers
res = IO.select([read], nil, nil, [0, @next_check - Time.now].max)
2016-03-20 17:14:35 -04:00
if res
req = read.read_nonblock(1)
@next_check = Time.now if req == "!"
next if !req || req == "!"
result = read.gets
pid = result.to_i
if req == "b" || req == "f"
pid, idx = result.split(':').map(&:to_i)
w = @workers.find {|x| x.index == idx}
w.pid = pid if w.pid.nil?
end
if w = @workers.find { |x| x.pid == pid }
case req
when "b"
w.boot!
log "- Worker #{w.index} (pid: #{pid}) booted, phase: #{w.phase}"
@next_check = Time.now
when "e"
# external term, see worker method, Signal.trap "SIGTERM"
w.instance_variable_set :@term, true
when "t"
w.term unless w.term?
when "p"
w.ping!(result.sub(/^\d+/,'').chomp)
@launcher.events.fire(:ping!, w)
end
else
log "! Out-of-sync worker list, no #{pid} worker"
end
end
rescue Interrupt
@status = :stop
end
end
stop_workers unless @status == :halt
ensure
@check_pipe.close
@suicide_pipe.close
read.close
@wakeup.close
end
end
private
# loops thru @workers, removing workers that exited, and calling
# `#term` if needed
def wait_workers
@workers.reject! do |w|
next false if w.pid.nil?
begin
if Process.wait(w.pid, Process::WNOHANG)
true
else
w.term if w.term?
nil
end
rescue Errno::ECHILD
begin
Process.kill(0, w.pid)
false # child still alive, but has another parent
rescue Errno::ESRCH, Errno::EPERM
true # child is already terminated
end
end
end
end
# @version 5.0.0
def timeout_workers
@workers.each do |w|
if !w.term? && w.ping_timeout <= Time.now
log "! Terminating timed out worker: #{w.pid}"
w.kill
end
end
end
# @version 5.0.0
def nakayoshi_gc
return unless @options[:nakayoshi_fork]
log "! Promoting existing objects to old generation..."
4.times { GC.start(full_mark: false) }
if GC.respond_to?(:compact)
log "! Compacting..."
GC.compact
end
log "! Friendly fork preparation complete."
end
end
end