From de632261ac45d7dd85230c83f6af6dd720f1cbd9 Mon Sep 17 00:00:00 2001 From: Chris LaRose Date: Tue, 29 Sep 2020 06:21:52 -0700 Subject: [PATCH] Refactor: extract worker process into separate class [changelog skip] (#2374) * Rename Puma::Cluster::{Worker => WorkerHandle} This class represents a worker from the perspective of the puma master process. It provides methods for controlling the process, but doesn't contain the logic actually executed by the worker. In preparation for creating a new class that encapsulates the worker logic, we're renaming this one to WorkerHandle. * Extract Puma::Cluster::WorkerHandle to a separate file * Move worker functionality to new class Before, all functionality of the worker processes was defined in the Cluster class. In preparation for making it possible to start worker processes outside of the context of a Cluster instance, we move the worker functionality into a new class. This has the additional benefit of delineating exactly the dependencies of the worker processes, namely the Launcher, options hash, and the pipes used for inter-process communication. * Extract nakayoshi_gc to Puma::Util * Add comment to describe Puma::Cluster::WorkerHandle * Remove options from Worker constructor The instance varaible @options can be derived from the @launcher --- lib/puma/cluster.rb | 213 +++--------------------------- lib/puma/cluster/worker.rb | 167 +++++++++++++++++++++++ lib/puma/cluster/worker_handle.rb | 83 ++++++++++++ lib/puma/util.rb | 11 ++ 4 files changed, 279 insertions(+), 195 deletions(-) create mode 100644 lib/puma/cluster/worker.rb create mode 100644 lib/puma/cluster/worker_handle.rb diff --git a/lib/puma/cluster.rb b/lib/puma/cluster.rb index 95f97deb..ff41cf0b 100644 --- a/lib/puma/cluster.rb +++ b/lib/puma/cluster.rb @@ -3,6 +3,8 @@ require 'puma/runner' require 'puma/util' require 'puma/plugin' +require 'puma/cluster/worker_handle' +require 'puma/cluster/worker' require 'time' @@ -11,10 +13,6 @@ module Puma # to boot and serve a Ruby application when puma "workers" are needed # i.e. when using multi-processes. For example `$ puma -w 5` # - # At the core of this class is running an instance of `Puma::Server` which - # gets created via the `start_server` method from the `Puma::Runner` class - # that this inherits from. - # # An instance of this class will spawn the number of processes passed in # via the `spawn_workers` method call. Each worker will have it's own # instance of a `Puma::Server`. @@ -61,79 +59,6 @@ module Puma @workers.each { |x| x.hup } end - class Worker - def initialize(idx, pid, phase, options) - @index = idx - @pid = pid - @phase = phase - @stage = :started - @signal = "TERM" - @options = options - @first_term_sent = nil - @started_at = Time.now - @last_checkin = Time.now - @last_status = {} - @term = false - end - - attr_reader :index, :pid, :phase, :signal, :last_checkin, :last_status, :started_at - - # @version 5.0.0 - attr_writer :pid, :phase - - def booted? - @stage == :booted - end - - def boot! - @last_checkin = Time.now - @stage = :booted - end - - def term? - @term - end - - def ping!(status) - @last_checkin = Time.now - require 'json' - @last_status = JSON.parse(status, symbolize_names: true) - end - - # @see Puma::Cluster#check_workers - # @version 5.0.0 - def ping_timeout - @last_checkin + - (booted? ? - @options[:worker_timeout] : - @options[:worker_boot_timeout] - ) - end - - def term - begin - if @first_term_sent && (Time.now - @first_term_sent) > @options[:worker_shutdown_timeout] - @signal = "KILL" - else - @term ||= true - @first_term_sent ||= Time.now - end - Process.kill @signal, @pid if @pid - rescue Errno::ESRCH - end - end - - def kill - @signal = 'KILL' - term - end - - def hup - Process.kill "HUP", @pid - rescue Errno::ESRCH - end - end - def spawn_workers diff = @options[:workers] - @workers.size return if diff < 1 @@ -154,7 +79,7 @@ module Puma end debug "Spawned worker: #{pid}" - @workers << Worker.new(idx, pid, @phase, @options) + @workers << WorkerHandle.new(idx, pid, @phase, @options) end if @options[:fork_worker] && @@ -249,113 +174,23 @@ module Puma end def worker(index, master) - title = "puma: cluster worker #{index}: #{master}" - title += " [#{@options[:tag]}]" if @options[:tag] && !@options[:tag].empty? - $0 = title - - Signal.trap "SIGINT", "IGNORE" - Signal.trap "SIGCHLD", "DEFAULT" - - fork_worker = @options[:fork_worker] && index == 0 - @workers = [] - if !@options[:fork_worker] || fork_worker - @master_read.close - @suicide_pipe.close - @fork_writer.close + + @master_read.close + @suicide_pipe.close + @fork_writer.close + + pipes = { check_pipe: @check_pipe, worker_write: @worker_write } + if @options[:fork_worker] + pipes[:fork_pipe] = @fork_pipe + pipes[:wakeup] = @wakeup end - Thread.new do - Puma.set_thread_name "worker check pipe" - IO.select [@check_pipe] - log "! Detected parent died, dying" - exit! 1 - end - - # If we're not running under a Bundler context, then - # report the info about the context we will be using - if !ENV['BUNDLE_GEMFILE'] - if File.exist?("Gemfile") - log "+ Gemfile in context: #{File.expand_path("Gemfile")}" - elsif File.exist?("gems.rb") - log "+ Gemfile in context: #{File.expand_path("gems.rb")}" - end - end - - # Invoke any worker boot hooks so they can get - # things in shape before booting the app. - @launcher.config.run_hooks :before_worker_boot, index, @launcher.events - - server = @server ||= start_server - restart_server = Queue.new << true << false - - if fork_worker - restart_server.clear - worker_pids = [] - Signal.trap "SIGCHLD" do - wakeup! if worker_pids.reject! do |p| - Process.wait(p, Process::WNOHANG) rescue true - end - end - - Thread.new do - Puma.set_thread_name "worker fork pipe" - while (idx = @fork_pipe.gets) - idx = idx.to_i - if idx == -1 # stop server - if restart_server.length > 0 - restart_server.clear - server.begin_restart(true) - @launcher.config.run_hooks :before_refork, nil, @launcher.events - nakayoshi_gc - end - elsif idx == 0 # restart server - restart_server << true << false - else # fork worker - worker_pids << pid = spawn_worker(idx, master) - @worker_write << "f#{pid}:#{idx}\n" rescue nil - end - end - end - end - - Signal.trap "SIGTERM" do - @worker_write << "e#{Process.pid}\n" rescue nil - server.stop - restart_server << false - end - - begin - @worker_write << "b#{Process.pid}:#{index}\n" - rescue SystemCallError, IOError - Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue - STDERR.puts "Master seems to have exited, exiting." - return - end - - Thread.new(@worker_write) do |io| - Puma.set_thread_name "stat payload" - - while true - sleep Const::WORKER_CHECK_INTERVAL - begin - require 'json' - io << "p#{Process.pid}#{server.stats.to_json}\n" - rescue IOError - Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue - break - end - end - end - - server.run.join while restart_server.pop - - # Invoke any worker shutdown hooks so they can prevent the worker - # exiting until any background operations are completed - @launcher.config.run_hooks :before_worker_shutdown, index, @launcher.events - ensure - @worker_write << "t#{Process.pid}\n" rescue nil - @worker_write.close + new_worker = Worker.new index: index, + master: master, + launcher: @launcher, + pipes: pipes + new_worker.run end def restart @@ -552,7 +387,7 @@ module Puma @master_read, @worker_write = read, @wakeup @launcher.config.run_hooks :before_fork, nil, @launcher.events - nakayoshi_gc + Puma::Util.nakayoshi_gc @events if @options[:nakayoshi_fork] spawn_workers @@ -657,17 +492,5 @@ module Puma end end end - - # @version 5.0.0 - def nakayoshi_gc - return unless @options[:nakayoshi_fork] - log "! Promoting existing objects to old generation..." - 4.times { GC.start(full_mark: false) } - if GC.respond_to?(:compact) - log "! Compacting..." - GC.compact - end - log "! Friendly fork preparation complete." - end end end diff --git a/lib/puma/cluster/worker.rb b/lib/puma/cluster/worker.rb new file mode 100644 index 00000000..c6774ab0 --- /dev/null +++ b/lib/puma/cluster/worker.rb @@ -0,0 +1,167 @@ +# frozen_string_literal: true + +module Puma + class Cluster < Puma::Runner + # This class is instantiated by the `Puma::Cluster` and represents a single + # worker process. + # + # At the core of this class is running an instance of `Puma::Server` which + # gets created via the `start_server` method from the `Puma::Runner` class + # that this inherits from. + class Worker < Puma::Runner + attr_reader :index, :master + + def initialize(index:, master:, launcher:, pipes:, server: nil) + super launcher, launcher.events + + @index = index + @master = master + @launcher = launcher + @options = launcher.options + @check_pipe = pipes[:check_pipe] + @worker_write = pipes[:worker_write] + @fork_pipe = pipes[:fork_pipe] + @wakeup = pipes[:wakeup] + @server = server + end + + def run + title = "puma: cluster worker #{index}: #{master}" + title += " [#{@options[:tag]}]" if @options[:tag] && !@options[:tag].empty? + $0 = title + + Signal.trap "SIGINT", "IGNORE" + Signal.trap "SIGCHLD", "DEFAULT" + + Thread.new do + Puma.set_thread_name "worker check pipe" + IO.select [@check_pipe] + log "! Detected parent died, dying" + exit! 1 + end + + # If we're not running under a Bundler context, then + # report the info about the context we will be using + if !ENV['BUNDLE_GEMFILE'] + if File.exist?("Gemfile") + log "+ Gemfile in context: #{File.expand_path("Gemfile")}" + elsif File.exist?("gems.rb") + log "+ Gemfile in context: #{File.expand_path("gems.rb")}" + end + end + + # Invoke any worker boot hooks so they can get + # things in shape before booting the app. + @launcher.config.run_hooks :before_worker_boot, index, @launcher.events + + server = @server ||= start_server + restart_server = Queue.new << true << false + + fork_worker = @options[:fork_worker] && index == 0 + + if fork_worker + restart_server.clear + worker_pids = [] + Signal.trap "SIGCHLD" do + wakeup! if worker_pids.reject! do |p| + Process.wait(p, Process::WNOHANG) rescue true + end + end + + Thread.new do + Puma.set_thread_name "worker fork pipe" + while (idx = @fork_pipe.gets) + idx = idx.to_i + if idx == -1 # stop server + if restart_server.length > 0 + restart_server.clear + server.begin_restart(true) + @launcher.config.run_hooks :before_refork, nil, @launcher.events + Puma::Util.nakayoshi_gc @events if @options[:nakayoshi_fork] + end + elsif idx == 0 # restart server + restart_server << true << false + else # fork worker + worker_pids << pid = spawn_worker(idx) + @worker_write << "f#{pid}:#{idx}\n" rescue nil + end + end + end + end + + Signal.trap "SIGTERM" do + @worker_write << "e#{Process.pid}\n" rescue nil + server.stop + restart_server << false + end + + begin + @worker_write << "b#{Process.pid}:#{index}\n" + rescue SystemCallError, IOError + Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue + STDERR.puts "Master seems to have exited, exiting." + return + end + + Thread.new(@worker_write) do |io| + Puma.set_thread_name "stat payload" + + while true + sleep Const::WORKER_CHECK_INTERVAL + begin + require 'json' + io << "p#{Process.pid}#{server.stats.to_json}\n" + rescue IOError + Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue + break + end + end + end + + server.run.join while restart_server.pop + + # Invoke any worker shutdown hooks so they can prevent the worker + # exiting until any background operations are completed + @launcher.config.run_hooks :before_worker_shutdown, index, @launcher.events + ensure + @worker_write << "t#{Process.pid}\n" rescue nil + @worker_write.close + end + + private + + def spawn_worker(idx) + @launcher.config.run_hooks :before_worker_fork, idx, @launcher.events + + pid = fork do + new_worker = Worker.new index: idx, + master: master, + launcher: @launcher, + pipes: { check_pipe: @check_pipe, + worker_write: @worker_write }, + server: @server + new_worker.run + end + + if !pid + log "! Complete inability to spawn new workers detected" + log "! Seppuku is the only choice." + exit! 1 + end + + @launcher.config.run_hooks :after_worker_fork, idx, @launcher.events + pid + end + + def wakeup! + return unless @wakeup + + begin + @wakeup.write "!" unless @wakeup.closed? + rescue SystemCallError, IOError + Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue + end + end + end + end +end diff --git a/lib/puma/cluster/worker_handle.rb b/lib/puma/cluster/worker_handle.rb new file mode 100644 index 00000000..eb7f3e06 --- /dev/null +++ b/lib/puma/cluster/worker_handle.rb @@ -0,0 +1,83 @@ +# frozen_string_literal: true + +module Puma + class Cluster < Runner + # This class represents a worker process from the perspective of the puma + # master process. It contains information about the process and its health + # and it exposes methods to control the process via IPC. It does not + # include the actual logic executed by the worker process itself. For that, + # see Puma::Cluster::Worker. + class WorkerHandle + def initialize(idx, pid, phase, options) + @index = idx + @pid = pid + @phase = phase + @stage = :started + @signal = "TERM" + @options = options + @first_term_sent = nil + @started_at = Time.now + @last_checkin = Time.now + @last_status = {} + @term = false + end + + attr_reader :index, :pid, :phase, :signal, :last_checkin, :last_status, :started_at + + # @version 5.0.0 + attr_writer :pid, :phase + + def booted? + @stage == :booted + end + + def boot! + @last_checkin = Time.now + @stage = :booted + end + + def term? + @term + end + + def ping!(status) + @last_checkin = Time.now + require 'json' + @last_status = JSON.parse(status, symbolize_names: true) + end + + # @see Puma::Cluster#check_workers + # @version 5.0.0 + def ping_timeout + @last_checkin + + (booted? ? + @options[:worker_timeout] : + @options[:worker_boot_timeout] + ) + end + + def term + begin + if @first_term_sent && (Time.now - @first_term_sent) > @options[:worker_shutdown_timeout] + @signal = "KILL" + else + @term ||= true + @first_term_sent ||= Time.now + end + Process.kill @signal, @pid if @pid + rescue Errno::ESRCH + end + end + + def kill + @signal = 'KILL' + term + end + + def hup + Process.kill "HUP", @pid + rescue Errno::ESRCH + end + end + end +end diff --git a/lib/puma/util.rb b/lib/puma/util.rb index ff36cc97..c2b3b412 100644 --- a/lib/puma/util.rb +++ b/lib/puma/util.rb @@ -23,6 +23,17 @@ module Puma end module_function :unescape + # @version 5.0.0 + def nakayoshi_gc(events) + events.log "! Promoting existing objects to old generation..." + 4.times { GC.start(full_mark: false) } + if GC.respond_to?(:compact) + events.log "! Compacting..." + GC.compact + end + events.log "! Friendly fork preparation complete." + end + DEFAULT_SEP = /[&;] */n # Stolen from Mongrel, with some small modifications: