Move threadpool init out of server (#2942)

* Simplify `ThreadPool` initializations code in `Puma::Server`

This commit introduces four `ThreadPool` options in `Configuration::DEFAULTS`:
`auto_trim_time`, `reaping_time`, `clean_thread_locals`, and
`out_of_band_hook` they could be configured via file/user options
`Puma::Configuration`.

The auto reap/trim methods stay in `Puma::Server` because the way we test
in `Puma::Server` tests.

Adds `slice(keys)` method to `UserFileDefaultOptions` so it acts
like a Hash and we could read `ThreadPool` options from the user-file-default options.

Adds missing require statement to `test/test_puma_server.rb`, so this test
could run individually.

Co-Authored-By: Shohei Umemoto <cafedomancer@gmail.com>
Co-Authored-By: Nate Berkopec <nate.berkopec@gmail.com>

* Fix out of band

* Fixup lib files

* Fixup tests

Co-authored-by: Juanito Fatas <me@juanitofatas.com>
Co-authored-by: Shohei Umemoto <cafedomancer@gmail.com>
Co-authored-by: MSP-Greg <Greg.mpls@gmail.com>
This commit is contained in:
Nate Berkopec 2022-09-15 10:25:39 +09:00 committed by GitHub
parent 5199ff293a
commit 317e890351
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 39 additions and 38 deletions

View File

@ -126,7 +126,9 @@ module Puma
# configuration files.
class Configuration
DEFAULTS = {
auto_trim_time: 30,
binds: ['tcp://0.0.0.0:9292'.freeze],
clean_thread_locals: false,
debug: false,
early_hints: nil,
environment: 'development'.freeze,
@ -145,11 +147,13 @@ module Puma
min_threads: 0,
mode: :http,
mutate_stdout_and_stderr_to_sync_on_write: true,
out_of_band: [],
# Number of seconds for another request within a persistent session.
persistent_timeout: 20,
queue_requests: true,
rackup: 'config.ru'.freeze,
raise_exception_on_sigterm: true,
reaping_time: 1,
remote_address: :socket,
silence_single_worker_warning: false,
tag: File.basename(Dir.getwd),

View File

@ -31,7 +31,6 @@ module Puma
#
# Each `Puma::Server` will have one reactor and one thread pool.
class Server
include Puma::Const
include Request
extend Forwardable
@ -81,9 +80,6 @@ module Puma
@check, @notify = nil
@status = :stop
@auto_trim_time = 30
@reaping_time = 1
@thread = nil
@thread_pool = nil
@ -235,29 +231,16 @@ module Puma
@status = :run
@thread_pool = ThreadPool.new(
thread_name,
@min_threads,
@max_threads,
::Puma::IOBuffer,
&method(:process_client)
)
@thread_pool.out_of_band_hook = @options[:out_of_band]
@thread_pool.clean_thread_locals = @options[:clean_thread_locals]
@thread_pool = ThreadPool.new(thread_name, @options, &method(:process_client))
if @queue_requests
@reactor = Reactor.new(@io_selector_backend, &method(:reactor_wakeup))
@reactor.run
end
if @reaping_time
@thread_pool.auto_reap!(@reaping_time)
end
if @auto_trim_time
@thread_pool.auto_trim!(@auto_trim_time)
end
@thread_pool.auto_reap! if @options[:reaping_time]
@thread_pool.auto_trim! if @options[:auto_trim_time]
@check, @notify = Puma::Util.pipe unless @notify

View File

@ -2,6 +2,8 @@
require 'thread'
require 'puma/io_buffer'
module Puma
# Internal Docs for A simple thread pool management object.
#
@ -29,7 +31,7 @@ module Puma
# The block passed is the work that will be performed in each
# thread.
#
def initialize(name, min, max, *extra, &block)
def initialize(name, options = {}, &block)
@not_empty = ConditionVariable.new
@not_full = ConditionVariable.new
@mutex = Mutex.new
@ -40,10 +42,14 @@ module Puma
@waiting = 0
@name = name
@min = Integer(min)
@max = Integer(max)
@min = Integer(options[:min_threads])
@max = Integer(options[:max_threads])
@block = block
@extra = extra
@extra = [::Puma::IOBuffer]
@out_of_band = options[:out_of_band]
@clean_thread_locals = options[:clean_thread_locals]
@reaping_time = options[:reaping_time]
@auto_trim_time = options[:auto_trim_time]
@shutdown = false
@ -62,14 +68,11 @@ module Puma
end
end
@clean_thread_locals = false
@force_shutdown = false
@shutdown_mutex = Mutex.new
end
attr_reader :spawned, :trim_requested, :waiting
attr_accessor :clean_thread_locals
attr_accessor :out_of_band_hook # @version 5.0.0
def self.clean_thread_locals
Thread.current.keys.each do |key| # rubocop: disable Style/HashEachMethods
@ -160,12 +163,12 @@ module Puma
# @version 5.0.0
def trigger_out_of_band_hook
return false unless out_of_band_hook && out_of_band_hook.any?
return false unless @out_of_band && @out_of_band.any?
# we execute on idle hook when all threads are free
return false unless @spawned == @waiting
out_of_band_hook.each(&:call)
@out_of_band.each(&:call)
true
rescue Exception => e
STDERR.puts "Exception calling out_of_band_hook: #{e.message} (#{e.class})"
@ -319,12 +322,12 @@ module Puma
end
end
def auto_trim!(timeout=30)
def auto_trim!(timeout=@auto_trim_time)
@auto_trim = Automaton.new(self, timeout, "#{@name} threadpool trimmer", :trim)
@auto_trim.start!
end
def auto_reap!(timeout=5)
def auto_reap!(timeout=@reaping_time)
@reaper = Automaton.new(self, timeout, "#{@name} threadpool reaper", :reap)
@reaper.start!
end

View File

@ -53,9 +53,10 @@ class TestBusyWorker < Minitest::Test
end
end
options[:min_threads] ||= 0
options[:max_threads] ||= 10
@server = Puma::Server.new request_handler, Puma::LogWriter.strings, Puma::Events.new, **options
@server.min_threads = options[:min_threads] || 0
@server.max_threads = options[:max_threads] || 10
@port = (@server.add_tcp_listener '127.0.0.1', 0).addr[1]
@server.run
end

View File

@ -67,9 +67,10 @@ class TestOutOfBandServer < Minitest::Test
[200, {}, [""]]
end
options[:min_threads] ||= 1
options[:max_threads] ||= 1
@server = Puma::Server.new app, Puma::LogWriter.strings, Puma::Events.new, out_of_band: [oob], **options
@server.min_threads = options[:min_threads] || 1
@server.max_threads = options[:max_threads] || 1
@port = (@server.add_tcp_listener '127.0.0.1', 0).addr[1]
@server.run
sleep 0.15 if Puma.jruby?

View File

@ -1,5 +1,6 @@
require_relative "helper"
require "puma/events"
require "puma/server"
require "net/http"
require "nio"
require "ipaddr"

View File

@ -10,12 +10,20 @@ class TestThreadPool < Minitest::Test
def new_pool(min, max, &block)
block = proc { } unless block
@pool = Puma::ThreadPool.new('tst', min, max, &block)
options = {
min_threads: min,
max_threads: max
}
@pool = Puma::ThreadPool.new('tst', options, &block)
end
def mutex_pool(min, max, &block)
block = proc { } unless block
@pool = MutexPool.new('tst', min, max, &block)
options = {
min_threads: min,
max_threads: max
}
@pool = MutexPool.new('tst', options, &block)
end
# Wraps ThreadPool work in mutex for better concurrency control.
@ -184,7 +192,7 @@ class TestThreadPool < Minitest::Test
Thread.current[:foo] = :hai
}
pool.clean_thread_locals = true
pool.instance_variable_set :@clean_thread_locals, true
pool << [1] * n