mirror of
https://github.com/puma/puma.git
synced 2022-11-09 13:48:40 -05:00
Add fork_worker option and refork command
Trigger refork for improved copy-on-write performance.
This commit is contained in:
parent
774c460e60
commit
613ac8e803
10 changed files with 187 additions and 23 deletions
|
@ -10,6 +10,7 @@
|
|||
* Increases maximum URI path length from 2048 to 8196 bytes (#2167)
|
||||
* Force shutdown responses can be overridden by using the `lowlevel_error_handler` config (#2203)
|
||||
* Faster phased restart and worker timeout (#2121)
|
||||
* Add `fork_worker` option and `refork` command for improved copy-on-write performance (#2099)
|
||||
|
||||
* Deprecations, Removals and Breaking API Changes
|
||||
* `Puma.stats` now returns a Hash instead of a JSON string (#2086)
|
||||
|
|
31
docs/fork_worker.md
Normal file
31
docs/fork_worker.md
Normal file
|
@ -0,0 +1,31 @@
|
|||
# Fork-Worker Cluster Mode [Experimental]
|
||||
|
||||
Puma 5 introduces an experimental new cluster-mode configuration option, `fork_worker` (`--fork-worker` from the CLI). This mode causes Puma to fork additional workers from worker 0, instead of directly from the master process:
|
||||
|
||||
```
|
||||
10000 \_ puma 4.3.3 (tcp://0.0.0.0:9292) [puma]
|
||||
10001 \_ puma: cluster worker 0: 10000 [puma]
|
||||
10002 \_ puma: cluster worker 1: 10000 [puma]
|
||||
10003 \_ puma: cluster worker 2: 10000 [puma]
|
||||
10004 \_ puma: cluster worker 3: 10000 [puma]
|
||||
```
|
||||
|
||||
Similar to the `preload_app!` option, the `fork_worker` option allows your application to be initialized only once for copy-on-write memory savings, and it has two additional advantages:
|
||||
|
||||
1. **Compatible with phased restart.** Because the master process itself doesn't preload the application, this mode works with phased restart (`SIGUSR1` or `pumactl phased-restart`). When worker 0 reloads as part of a phased restart, it initializes a new copy of your application first, then the other workers reload by forking from this new worker already containing the new preloaded application.
|
||||
|
||||
This allows a phased restart to complete as quickly as a hot restart (`SIGUSR2` or `pumactl restart`), while still minimizing downtime by staggering the restart across cluster workers.
|
||||
|
||||
2. **'Refork' for additional copy-on-write improvements in running applications.** Fork-worker mode introduces a new `refork` command that re-loads all nonzero workers by re-forking them from worker 0.
|
||||
|
||||
This command can potentially improve memory utilization in large or complex applications that don't fully pre-initialize on startup, because the re-forked workers can share copy-on-write memory with a worker that has been running for a while and serving requests.
|
||||
|
||||
You can trigger a refork by sending the cluster the `SIGURG` signal or running the `pumactl refork` command at any time. A refork will also automatically trigger once, after a certain number of requests have been processed by worker 0 (default 1000). To configure the number of requests before the auto-refork, pass a positive integer argument to `fork_workers` (e.g., `fork_workers 1000`), or `0` to disable.
|
||||
|
||||
### Limitations
|
||||
|
||||
- This mode is still very experimental so there may be bugs or edge-cases, particularly around expected behavior of existing hooks. Please open a [bug report](https://github.com/puma/puma/issues/new?template=bug_report.md) if you encounter any issues.
|
||||
|
||||
- In order to fork new workers cleanly, worker 0 shuts down its server and stops serving requests so there are no open file descriptors or other kinds of shared global state between processes, and to maximize copy-on-write efficiency across the newly-forked workers. This may temporarily reduce total capacity of the cluster during a phased restart / refork.
|
||||
|
||||
In a cluster with `n` workers, a normal phased restart stops and restarts workers one by one while the application is loaded in each process, so `n-1` workers are available serving requests during the restart. In a phased restart in fork-worker mode, the application is first loaded in worker 0 while `n-1` workers are available, then worker 0 remains stopped while the rest of the workers are reloaded one by one, leaving only `n-2` workers to be available for a brief period of time. Reloading the rest of the workers should be quick because the application is preloaded at that point, but there may be situations where it can take longer (slow clients, long-running application code, slow worker-fork hooks, etc).
|
|
@ -41,6 +41,7 @@ Puma cluster responds to these signals:
|
|||
- `HUP` reopen log files defined in stdout_redirect configuration parameter. If there is no stdout_redirect option provided it will behave like `INT`
|
||||
- `INT` equivalent of sending Ctrl-C to cluster. Will attempt to finish then exit.
|
||||
- `CHLD`
|
||||
- `URG` refork workers in phases from worker 0, if `fork_workers` option is enabled.
|
||||
|
||||
## Callbacks order in case of different signals
|
||||
|
||||
|
|
|
@ -63,6 +63,11 @@ module Puma
|
|||
end
|
||||
|
||||
rack_response(200, backtraces.to_json)
|
||||
|
||||
when /\/refork$/
|
||||
Process.kill "SIGURG", $$
|
||||
rack_response(200, OK_STATUS)
|
||||
|
||||
else
|
||||
rack_response 404, "Unsupported action", 'text/plain'
|
||||
end
|
||||
|
|
|
@ -130,6 +130,12 @@ module Puma
|
|||
user_config.environment arg
|
||||
end
|
||||
|
||||
o.on "-f", "--fork-worker=[REQUESTS]", OptionParser::DecimalInteger,
|
||||
"Fork new workers from existing worker. Cluster mode only",
|
||||
"Auto-refork after REQUESTS (default 1000)" do |*args|
|
||||
user_config.fork_worker *args.compact
|
||||
end
|
||||
|
||||
o.on "-I", "--include PATH", "Specify $LOAD_PATH directories" do |arg|
|
||||
$LOAD_PATH.unshift(*arg.split(':'))
|
||||
end
|
||||
|
|
|
@ -37,7 +37,7 @@ module Puma
|
|||
begin
|
||||
loop do
|
||||
wait_workers
|
||||
break if @workers.empty?
|
||||
break if @workers.reject {|w| w.pid.nil?}.empty?
|
||||
sleep 0.2
|
||||
end
|
||||
rescue Interrupt
|
||||
|
@ -78,6 +78,7 @@ module Puma
|
|||
end
|
||||
|
||||
attr_reader :index, :pid, :phase, :signal, :last_checkin, :last_status, :started_at
|
||||
attr_writer :pid, :phase
|
||||
|
||||
def booted?
|
||||
@stage == :booted
|
||||
|
@ -113,7 +114,7 @@ module Puma
|
|||
@term ||= true
|
||||
@first_term_sent ||= Time.now
|
||||
end
|
||||
Process.kill @signal, @pid
|
||||
Process.kill @signal, @pid if @pid
|
||||
rescue Errno::ESRCH
|
||||
end
|
||||
end
|
||||
|
@ -134,23 +135,43 @@ module Puma
|
|||
return if diff < 1
|
||||
|
||||
master = Process.pid
|
||||
if @options[:fork_worker]
|
||||
@fork_writer << "-1\n"
|
||||
end
|
||||
|
||||
diff.times do
|
||||
idx = next_worker_index
|
||||
@launcher.config.run_hooks :before_worker_fork, idx, @launcher.events
|
||||
|
||||
pid = fork { worker(idx, master) }
|
||||
if !pid
|
||||
log "! Complete inability to spawn new workers detected"
|
||||
log "! Seppuku is the only choice."
|
||||
exit! 1
|
||||
if @options[:fork_worker] && idx != 0
|
||||
@fork_writer << "#{idx}\n"
|
||||
pid = nil
|
||||
else
|
||||
pid = spawn_worker(idx, master)
|
||||
end
|
||||
|
||||
debug "Spawned worker: #{pid}"
|
||||
@workers << Worker.new(idx, pid, @phase, @options)
|
||||
|
||||
@launcher.config.run_hooks :after_worker_fork, idx, @launcher.events
|
||||
end
|
||||
|
||||
if @options[:fork_worker] &&
|
||||
@workers.all? {|x| x.phase == @phase}
|
||||
|
||||
@fork_writer << "0\n"
|
||||
end
|
||||
end
|
||||
|
||||
def spawn_worker(idx, master)
|
||||
@launcher.config.run_hooks :before_worker_fork, idx, @launcher.events
|
||||
|
||||
pid = fork { worker(idx, master) }
|
||||
if !pid
|
||||
log "! Complete inability to spawn new workers detected"
|
||||
log "! Seppuku is the only choice."
|
||||
exit! 1
|
||||
end
|
||||
|
||||
@launcher.config.run_hooks :after_worker_fork, idx, @launcher.events
|
||||
pid
|
||||
end
|
||||
|
||||
def cull_workers
|
||||
|
@ -213,7 +234,6 @@ module Puma
|
|||
|
||||
def wakeup!
|
||||
return unless @wakeup
|
||||
@next_check = Time.now
|
||||
|
||||
begin
|
||||
@wakeup.write "!" unless @wakeup.closed?
|
||||
|
@ -229,9 +249,14 @@ module Puma
|
|||
|
||||
Signal.trap "SIGINT", "IGNORE"
|
||||
|
||||
fork_worker = @options[:fork_worker] && index == 0
|
||||
|
||||
@workers = []
|
||||
@master_read.close
|
||||
@suicide_pipe.close
|
||||
if !@options[:fork_worker] || fork_worker
|
||||
@master_read.close
|
||||
@suicide_pipe.close
|
||||
@fork_writer.close
|
||||
end
|
||||
|
||||
Thread.new do
|
||||
Puma.set_thread_name "worker check pipe"
|
||||
|
@ -254,15 +279,45 @@ module Puma
|
|||
# things in shape before booting the app.
|
||||
@launcher.config.run_hooks :before_worker_boot, index, @launcher.events
|
||||
|
||||
server = start_server
|
||||
server = @server ||= start_server
|
||||
restart_server = Queue.new << true << false
|
||||
|
||||
if fork_worker
|
||||
restart_server.clear
|
||||
Signal.trap "SIGCHLD" do
|
||||
Process.wait(-1, Process::WNOHANG) rescue nil
|
||||
wakeup!
|
||||
end
|
||||
|
||||
Thread.new do
|
||||
Puma.set_thread_name "worker fork pipe"
|
||||
while (idx = @fork_pipe.gets)
|
||||
idx = idx.to_i
|
||||
if idx == -1 # stop server
|
||||
if restart_server.length > 0
|
||||
restart_server.clear
|
||||
server.begin_restart(true)
|
||||
@launcher.config.run_hooks :before_refork, nil, @launcher.events
|
||||
GC.compact if GC.respond_to?(:compact)
|
||||
end
|
||||
elsif idx == 0 # restart server
|
||||
restart_server << true << false
|
||||
else # fork worker
|
||||
pid = spawn_worker(idx, master)
|
||||
@worker_write << "f#{pid}:#{idx}\n" rescue nil
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
Signal.trap "SIGTERM" do
|
||||
@worker_write << "e#{Process.pid}\n" rescue nil
|
||||
server.stop
|
||||
restart_server << false
|
||||
end
|
||||
|
||||
begin
|
||||
@worker_write << "b#{Process.pid}\n"
|
||||
@worker_write << "b#{Process.pid}:#{index}\n"
|
||||
rescue SystemCallError, IOError
|
||||
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
|
||||
STDERR.puts "Master seems to have exited, exiting."
|
||||
|
@ -283,7 +338,7 @@ module Puma
|
|||
end
|
||||
end
|
||||
|
||||
server.run.join
|
||||
server.run.join while restart_server.pop
|
||||
|
||||
# Invoke any worker shutdown hooks so they can prevent the worker
|
||||
# exiting until any background operations are completed
|
||||
|
@ -360,9 +415,31 @@ module Puma
|
|||
@options[:preload_app]
|
||||
end
|
||||
|
||||
def fork_worker!
|
||||
if (worker = @workers.find { |w| w.index == 0 })
|
||||
worker.phase += 1
|
||||
end
|
||||
phased_restart
|
||||
end
|
||||
|
||||
# We do this in a separate method to keep the lambda scope
|
||||
# of the signals handlers as small as possible.
|
||||
def setup_signals
|
||||
if @options[:fork_worker]
|
||||
Signal.trap "SIGURG" do
|
||||
fork_worker!
|
||||
end
|
||||
|
||||
# Auto-fork after the specified number of requests.
|
||||
if (fork_requests = @options[:fork_worker].to_i) > 0
|
||||
@launcher.events.register(:ping!) do |w|
|
||||
fork_worker! if w.index == 0 &&
|
||||
w.phase == 0 &&
|
||||
w.last_status[:requests_count] >= fork_requests
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
Signal.trap "SIGCHLD" do
|
||||
wakeup!
|
||||
end
|
||||
|
@ -446,6 +523,10 @@ module Puma
|
|||
#
|
||||
@check_pipe, @suicide_pipe = Puma::Util.pipe
|
||||
|
||||
# Separate pipe used by worker 0 to receive commands to
|
||||
# fork new worker processes.
|
||||
@fork_pipe, @fork_writer = Puma::Util.pipe
|
||||
|
||||
log "Use Ctrl-C to stop"
|
||||
|
||||
redirect_io
|
||||
|
@ -484,11 +565,18 @@ module Puma
|
|||
if res
|
||||
req = read.read_nonblock(1)
|
||||
|
||||
@next_check = Time.now if req == "!"
|
||||
next if !req || req == "!"
|
||||
|
||||
result = read.gets
|
||||
pid = result.to_i
|
||||
|
||||
if req == "b" || req == "f"
|
||||
pid, idx = result.split(':').map(&:to_i)
|
||||
w = @workers.find {|x| x.index == idx}
|
||||
w.pid = pid if w.pid.nil?
|
||||
end
|
||||
|
||||
if w = @workers.find { |x| x.pid == pid }
|
||||
case req
|
||||
when "b"
|
||||
|
@ -502,6 +590,7 @@ module Puma
|
|||
w.term unless w.term?
|
||||
when "p"
|
||||
w.ping!(result.sub(/^\d+/,'').chomp)
|
||||
@launcher.events.fire(:ping!, w)
|
||||
end
|
||||
else
|
||||
log "! Out-of-sync worker list, no #{pid} worker"
|
||||
|
@ -528,6 +617,7 @@ module Puma
|
|||
# `#term` if needed
|
||||
def wait_workers
|
||||
@workers.reject! do |w|
|
||||
next false if w.pid.nil?
|
||||
begin
|
||||
if Process.wait(w.pid, Process::WNOHANG)
|
||||
true
|
||||
|
@ -536,7 +626,12 @@ module Puma
|
|||
nil
|
||||
end
|
||||
rescue Errno::ECHILD
|
||||
true # child is already terminated
|
||||
begin
|
||||
Process.kill(0, w.pid)
|
||||
false # child still alive, but has another parent
|
||||
rescue Errno::ESRCH, Errno::EPERM
|
||||
true # child is already terminated
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -11,7 +11,7 @@ require 'socket'
|
|||
module Puma
|
||||
class ControlCLI
|
||||
|
||||
COMMANDS = %w{halt restart phased-restart start stats status stop reload-worker-directory gc gc-stats thread-backtraces}
|
||||
COMMANDS = %w{halt restart phased-restart start stats status stop reload-worker-directory gc gc-stats thread-backtraces refork}
|
||||
PRINTABLE_COMMANDS = %w{gc-stats stats thread-backtraces}
|
||||
|
||||
def initialize(argv, stdout=STDOUT, stderr=STDERR)
|
||||
|
@ -239,6 +239,9 @@ module Puma
|
|||
|
||||
return
|
||||
|
||||
when "refork"
|
||||
Process.kill "SIGURG", @pid
|
||||
|
||||
else
|
||||
return
|
||||
end
|
||||
|
|
|
@ -705,5 +705,19 @@ module Puma
|
|||
end
|
||||
end
|
||||
|
||||
# When enabled, workers will be forked from worker 0 instead of from the master process.
|
||||
# This option is similar to `preload_app` because the app is preloaded before forking,
|
||||
# but it is compatible with phased restart.
|
||||
#
|
||||
# This option also enables the `refork` command (SIGURG), which optimizes copy-on-write performance
|
||||
# in a running app.
|
||||
#
|
||||
# A refork will automatically trigger once after the specified number of requests
|
||||
# (default 1000), or pass 0 to disable auto refork.
|
||||
#
|
||||
# @note Cluster mode only.
|
||||
def fork_worker(after_requests=1000)
|
||||
@options[:fork_worker] = Integer(after_requests)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -59,8 +59,6 @@ module Puma
|
|||
@app = app
|
||||
@events = events
|
||||
|
||||
@check, @notify = Puma::Util.pipe
|
||||
|
||||
@status = :stop
|
||||
|
||||
@min_threads = 0
|
||||
|
@ -258,6 +256,7 @@ module Puma
|
|||
end
|
||||
|
||||
def handle_servers
|
||||
@check, @notify = Puma::Util.pipe unless @notify
|
||||
begin
|
||||
check = @check
|
||||
sockets = [check] + @binder.ios
|
||||
|
@ -324,6 +323,8 @@ module Puma
|
|||
ensure
|
||||
@check.close unless @check.closed? # Ruby 2.2 issue
|
||||
@notify.close
|
||||
@notify = nil
|
||||
@check = nil
|
||||
end
|
||||
|
||||
@events.fire :state, :done
|
||||
|
@ -881,6 +882,7 @@ module Puma
|
|||
end
|
||||
|
||||
def notify_safely(message)
|
||||
@check, @notify = Puma::Util.pipe unless @notify
|
||||
begin
|
||||
@notify << message
|
||||
rescue IOError
|
||||
|
@ -910,8 +912,9 @@ module Puma
|
|||
@thread.join if @thread && sync
|
||||
end
|
||||
|
||||
def begin_restart
|
||||
def begin_restart(sync=false)
|
||||
notify_safely(RESTART_COMMAND)
|
||||
@thread.join if @thread && sync
|
||||
end
|
||||
|
||||
def fast_write(io, str)
|
||||
|
|
|
@ -75,6 +75,11 @@ class TestIntegrationCluster < TestIntegration
|
|||
usr1_all_respond unix: false
|
||||
end
|
||||
|
||||
def test_usr1_fork_worker
|
||||
skip_unless_signal_exist? :USR1
|
||||
usr1_all_respond config: '--fork-worker'
|
||||
end
|
||||
|
||||
def test_usr1_all_respond_unix
|
||||
skip_unless_signal_exist? :USR1
|
||||
usr1_all_respond unix: true
|
||||
|
@ -239,8 +244,8 @@ RUBY
|
|||
|
||||
# Send requests 1 per second. Send 1, then :USR1 server, then send another 24.
|
||||
# All should be responded to, and at least three workers should be used
|
||||
def usr1_all_respond(unix: false)
|
||||
cli_server "-w #{WORKERS} -t 0:5 -q test/rackup/sleep_pid.ru", unix: unix
|
||||
def usr1_all_respond(unix: false, config: '')
|
||||
cli_server "-w #{WORKERS} -t 0:5 -q test/rackup/sleep_pid.ru #{config}", unix: unix
|
||||
threads = []
|
||||
replies = []
|
||||
mutex = Mutex.new
|
||||
|
|
Loading…
Reference in a new issue