mirror of
https://github.com/puma/puma.git
synced 2022-11-09 13:48:40 -05:00
Improvements to out_of_band hook (#2234)
* Improve OOB hook This moves the hook to be executed outside of busy loop considering the thread to be truly idle at a time of execution of the hook * Further improvements to out-of-band hook * Don't trigger OOB on partially-queued requests * Don't accept new connections during OOB * Move tests to `TestOutOfBandServer` Co-authored-by: Kamil Trzcinski <ayufan@ayufan.eu> Co-authored-by: Nate Berkopec <nate.berkopec@gmail.com>
This commit is contained in:
parent
e5f1655e47
commit
774c460e60
5 changed files with 185 additions and 89 deletions
|
@ -39,6 +39,7 @@
|
|||
* Fix `out_of_band` hook never executed if the number of worker threads is > 1 (#2177)
|
||||
* Fix ThreadPool#shutdown timeout accuracy (#2221)
|
||||
* Fix `UserFileDefaultOptions#fetch` to properly use `default` (#2233)
|
||||
* Improvements to `out_of_band` hook (#2234)
|
||||
* Prefer the rackup file specified by the CLI (#2225)
|
||||
|
||||
* Refactor
|
||||
|
|
|
@ -224,8 +224,11 @@ module Puma
|
|||
@reactor.add client
|
||||
end
|
||||
end
|
||||
|
||||
process_now
|
||||
end
|
||||
|
||||
@thread_pool.out_of_band_hook = @options[:out_of_band]
|
||||
@thread_pool.clean_thread_locals = @options[:clean_thread_locals]
|
||||
|
||||
if @queue_requests
|
||||
|
@ -279,6 +282,7 @@ module Puma
|
|||
break if handle_check
|
||||
else
|
||||
begin
|
||||
pool.wait_until_not_full
|
||||
if io = sock.accept_nonblock
|
||||
client = Client.new io, @binder.env(sock)
|
||||
if remote_addr_value
|
||||
|
@ -288,7 +292,6 @@ module Puma
|
|||
end
|
||||
|
||||
pool << client
|
||||
pool.wait_until_not_full
|
||||
end
|
||||
rescue SystemCallError
|
||||
# nothing
|
||||
|
@ -436,12 +439,6 @@ module Puma
|
|||
rescue StandardError => e
|
||||
@events.unknown_error self, e, "Client"
|
||||
end
|
||||
|
||||
if @options[:out_of_band]
|
||||
@thread_pool.with_mutex do
|
||||
@options[:out_of_band].each(&:call) if @thread_pool.busy_threads == 1
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -47,6 +47,7 @@ module Puma
|
|||
@shutdown = false
|
||||
|
||||
@trim_requested = 0
|
||||
@out_of_band_pending = false
|
||||
|
||||
@workers = []
|
||||
|
||||
|
@ -65,6 +66,7 @@ module Puma
|
|||
|
||||
attr_reader :spawned, :trim_requested, :waiting
|
||||
attr_accessor :clean_thread_locals
|
||||
attr_accessor :out_of_band_hook
|
||||
|
||||
def self.clean_thread_locals
|
||||
Thread.current.keys.each do |key| # rubocop: disable Performance/HashEachMethods
|
||||
|
@ -116,6 +118,9 @@ module Puma
|
|||
end
|
||||
|
||||
@waiting += 1
|
||||
if @out_of_band_pending && trigger_out_of_band_hook
|
||||
@out_of_band_pending = false
|
||||
end
|
||||
not_full.signal
|
||||
not_empty.wait mutex
|
||||
@waiting -= 1
|
||||
|
@ -129,7 +134,7 @@ module Puma
|
|||
end
|
||||
|
||||
begin
|
||||
block.call(work, *extra)
|
||||
@out_of_band_pending = true if block.call(work, *extra)
|
||||
rescue Exception => e
|
||||
STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})"
|
||||
end
|
||||
|
@ -143,6 +148,21 @@ module Puma
|
|||
|
||||
private :spawn_thread
|
||||
|
||||
def trigger_out_of_band_hook
|
||||
return false unless out_of_band_hook && out_of_band_hook.any?
|
||||
|
||||
# we execute on idle hook when all threads are free
|
||||
return false unless @spawned == @waiting
|
||||
|
||||
out_of_band_hook.each(&:call)
|
||||
true
|
||||
rescue Exception => e
|
||||
STDERR.puts "Exception calling out_of_band_hook: #{e.message} (#{e.class})"
|
||||
true
|
||||
end
|
||||
|
||||
private :trigger_out_of_band_hook
|
||||
|
||||
def with_mutex(&block)
|
||||
@mutex.owned? ?
|
||||
yield :
|
||||
|
|
159
test/test_out_of_band_server.rb
Normal file
159
test/test_out_of_band_server.rb
Normal file
|
@ -0,0 +1,159 @@
|
|||
require_relative "helper"
|
||||
require "puma/events"
|
||||
|
||||
class TestOutOfBandServer < Minitest::Test
|
||||
parallelize_me!
|
||||
|
||||
def setup
|
||||
@ios = []
|
||||
@server = nil
|
||||
@oob_finished = ConditionVariable.new
|
||||
@app_finished = ConditionVariable.new
|
||||
end
|
||||
|
||||
def teardown
|
||||
@oob_finished.broadcast
|
||||
@app_finished.broadcast
|
||||
@server.stop(true) if @server
|
||||
@ios.each {|i| i.close unless i.closed?}
|
||||
end
|
||||
|
||||
def new_connection
|
||||
TCPSocket.new('127.0.0.1', @server.connected_ports[0]).tap {|s| @ios << s}
|
||||
rescue IOError
|
||||
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
|
||||
retry
|
||||
end
|
||||
|
||||
def send_http(req)
|
||||
new_connection << req
|
||||
end
|
||||
|
||||
def send_http_and_read(req)
|
||||
send_http(req).read
|
||||
end
|
||||
|
||||
def oob_server(**options)
|
||||
@request_count = 0
|
||||
@oob_count = 0
|
||||
in_oob = Mutex.new
|
||||
@mutex = Mutex.new
|
||||
oob_wait = options.delete(:oob_wait)
|
||||
oob = -> do
|
||||
in_oob.synchronize do
|
||||
@mutex.synchronize do
|
||||
@oob_count += 1
|
||||
@oob_finished.signal
|
||||
@oob_finished.wait(@mutex, 1) if oob_wait
|
||||
end
|
||||
end
|
||||
end
|
||||
app_wait = options.delete(:app_wait)
|
||||
app = ->(_) do
|
||||
raise 'OOB conflict' if in_oob.locked?
|
||||
@mutex.synchronize do
|
||||
@request_count += 1
|
||||
@app_finished.signal
|
||||
@app_finished.wait(@mutex, 1) if app_wait
|
||||
end
|
||||
[200, {}, [""]]
|
||||
end
|
||||
|
||||
@server = Puma::Server.new app, Puma::Events.strings, out_of_band: [oob], **options
|
||||
@server.min_threads = options[:min_threads] || 1
|
||||
@server.max_threads = options[:max_threads] || 1
|
||||
@server.add_tcp_listener '127.0.0.1', 0
|
||||
@server.run
|
||||
end
|
||||
|
||||
# Sequential requests should trigger out_of_band after every request.
|
||||
def test_sequential
|
||||
n = 100
|
||||
oob_server
|
||||
n.times do
|
||||
@mutex.synchronize do
|
||||
send_http "GET / HTTP/1.0\r\n\r\n"
|
||||
@oob_finished.wait(@mutex, 1)
|
||||
end
|
||||
end
|
||||
assert_equal n, @request_count
|
||||
assert_equal n, @oob_count
|
||||
end
|
||||
|
||||
# Stream of requests on concurrent connections should trigger
|
||||
# out_of_band hooks only once after the final request.
|
||||
def test_stream
|
||||
oob_server app_wait: true, max_threads: 2
|
||||
n = 100
|
||||
Array.new(n) {send_http("GET / HTTP/1.0\r\n\r\n")}
|
||||
Thread.pass until @request_count == n
|
||||
@mutex.synchronize do
|
||||
@app_finished.signal
|
||||
@oob_finished.wait(@mutex, 1)
|
||||
end
|
||||
assert_equal n, @request_count
|
||||
assert_equal 1, @oob_count
|
||||
end
|
||||
|
||||
# New requests should not get processed while OOB is running.
|
||||
def test_request_overlapping_hook
|
||||
oob_server oob_wait: true, max_threads: 2
|
||||
|
||||
# Establish connection for Req2 before OOB
|
||||
req2 = new_connection
|
||||
sleep 0.01
|
||||
|
||||
@mutex.synchronize do
|
||||
send_http "GET / HTTP/1.0\r\n\r\n"
|
||||
@oob_finished.wait(@mutex) # enter OOB
|
||||
|
||||
# Send Req2
|
||||
req2 << "GET / HTTP/1.0\r\n\r\n"
|
||||
# If Req2 is processed now it raises 'OOB Conflict' in the response.
|
||||
sleep 0.01
|
||||
|
||||
@oob_finished.signal # exit OOB
|
||||
# Req2 should be processed now.
|
||||
@oob_finished.wait(@mutex, 1) # enter OOB
|
||||
@oob_finished.signal # exit OOB
|
||||
end
|
||||
|
||||
refute_match(/OOB conflict/, req2.read)
|
||||
end
|
||||
|
||||
# Partial requests should not trigger OOB.
|
||||
def test_partial_request
|
||||
oob_server
|
||||
new_connection.close
|
||||
sleep 0.01
|
||||
assert_equal 0, @oob_count
|
||||
end
|
||||
|
||||
# OOB should be triggered following a completed request
|
||||
# concurrent with other partial requests.
|
||||
def test_partial_concurrent
|
||||
oob_server max_threads: 2
|
||||
@mutex.synchronize do
|
||||
send_http("GET / HTTP/1.0\r\n\r\n")
|
||||
100.times {new_connection.close}
|
||||
@oob_finished.wait(@mutex, 1)
|
||||
end
|
||||
assert_equal 1, @oob_count
|
||||
end
|
||||
|
||||
# OOB should block new connections from being accepted.
|
||||
def test_blocks_new_connection
|
||||
oob_server oob_wait: true, max_threads: 2
|
||||
@mutex.synchronize do
|
||||
send_http("GET / HTTP/1.0\r\n\r\n")
|
||||
@oob_finished.wait(@mutex)
|
||||
end
|
||||
accepted = false
|
||||
io = @server.binder.ios.last
|
||||
io.stub(:accept_nonblock, -> {accepted = true; new_connection}) do
|
||||
new_connection.close
|
||||
sleep 0.01
|
||||
end
|
||||
refute accepted, 'New connection accepted during out of band'
|
||||
end
|
||||
end
|
|
@ -984,85 +984,4 @@ EOF
|
|||
assert_equal ["HTTP/1.0 200 OK", "Content-Length: 0"], header(sock)
|
||||
sock.close
|
||||
end
|
||||
|
||||
def oob_server(**options)
|
||||
@request_count = 0
|
||||
@oob_count = 0
|
||||
@start = Time.now
|
||||
in_oob = Mutex.new
|
||||
@mutex = Mutex.new
|
||||
@oob_finished = ConditionVariable.new
|
||||
oob_wait = options.delete(:oob_wait)
|
||||
oob = -> do
|
||||
in_oob.synchronize do
|
||||
@mutex.synchronize do
|
||||
@oob_count += 1
|
||||
@oob_finished.signal
|
||||
@oob_finished.wait(@mutex, 1) if oob_wait
|
||||
end
|
||||
end
|
||||
end
|
||||
@server = Puma::Server.new @app, @events, out_of_band: [oob], **options
|
||||
@server.min_threads = 5
|
||||
@server.max_threads = 5
|
||||
server_run app: ->(_) do
|
||||
raise 'OOB conflict' if in_oob.locked?
|
||||
@mutex.synchronize {@request_count += 1}
|
||||
[200, {}, [""]]
|
||||
end
|
||||
end
|
||||
|
||||
# Sequential requests should trigger out_of_band hooks after every request.
|
||||
def test_out_of_band
|
||||
n = 100
|
||||
oob_server queue_requests: false
|
||||
n.times do
|
||||
@mutex.synchronize do
|
||||
send_http "GET / HTTP/1.0\r\n\r\n"
|
||||
@oob_finished.wait(@mutex, 1)
|
||||
end
|
||||
end
|
||||
assert_equal n, @request_count
|
||||
assert_equal n, @oob_count
|
||||
end
|
||||
|
||||
# Streaming requests on parallel connections without delay should trigger
|
||||
# out_of_band hooks only once after the final request.
|
||||
def test_out_of_band_stream
|
||||
n = 100
|
||||
threads = 10
|
||||
oob_server
|
||||
req = "GET / HTTP/1.1\r\n"
|
||||
@mutex.synchronize do
|
||||
Array.new(threads) do
|
||||
Thread.new do
|
||||
send_http "#{req}\r\n" * (n/threads-1) + "#{req}Connection: close\r\n\r\n"
|
||||
end
|
||||
end.each(&:join)
|
||||
@oob_finished.wait(@mutex, 1)
|
||||
end
|
||||
assert_equal n, @request_count
|
||||
assert_equal 1, @oob_count
|
||||
end
|
||||
|
||||
def test_out_of_band_overlapping_requests
|
||||
oob_server oob_wait: true
|
||||
sock = nil
|
||||
sock2 = nil
|
||||
@mutex.synchronize do
|
||||
sock2 = send_http "GET / HTTP/1.0\r\n"
|
||||
sleep 0.01
|
||||
sock = send_http "GET / HTTP/1.0\r\n\r\n"
|
||||
# Request 1 processed
|
||||
@oob_finished.wait(@mutex) # enter oob
|
||||
sock2 << "\r\n"
|
||||
sleep 0.01
|
||||
@oob_finished.signal # exit oob
|
||||
# Request 2 processed
|
||||
@oob_finished.wait(@mutex) # enter oob
|
||||
@oob_finished.signal # exit oob
|
||||
end
|
||||
assert_match(/200/, sock.read)
|
||||
assert_match(/200/, sock2.read)
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue