mirror of
https://github.com/puma/puma.git
synced 2022-11-09 13:48:40 -05:00
Better error handling during force shutdown (#2271)
Only allow `ForceShutdown` to be raised in a thread during specific areas of the connection-processing cycle (marked by `with_force_shutdown` blocks), to ensure that the raised error is always rescued and handled cleanly. Fixes an issue where the `force_shutdown_after: 0` option throws uncaught exceptions from the threadpool on shutdown.
This commit is contained in:
parent
7d4df2931a
commit
f4c59a0d22
5 changed files with 76 additions and 46 deletions
|
@ -5,6 +5,7 @@
|
|||
|
||||
* Bugfixes
|
||||
* Prevent connections from entering Reactor after shutdown begins (#2377)
|
||||
* Better error handling during force shutdown (#2271)
|
||||
|
||||
* Refactor
|
||||
* Change Events#ssl_error signature from (error, peeraddr, peercert) to (error, ssl_socket) (#2375)
|
||||
|
|
|
@ -232,7 +232,9 @@ module Puma
|
|||
if @queue_requests
|
||||
process_now = client.eagerly_finish
|
||||
else
|
||||
client.finish(@first_data_timeout)
|
||||
@thread_pool.with_force_shutdown do
|
||||
client.finish(@first_data_timeout)
|
||||
end
|
||||
process_now = true
|
||||
end
|
||||
rescue MiniSSL::SSLError => e
|
||||
|
@ -244,7 +246,7 @@ module Puma
|
|||
client.close
|
||||
|
||||
@events.parse_error e, client
|
||||
rescue ConnectionError, EOFError => e
|
||||
rescue ConnectionError, EOFError, ThreadPool::ForceShutdown => e
|
||||
client.close
|
||||
|
||||
@events.connection_error e, client
|
||||
|
@ -422,7 +424,11 @@ module Puma
|
|||
check_for_more_data = false
|
||||
end
|
||||
|
||||
unless client.reset(check_for_more_data)
|
||||
next_request_ready = @thread_pool.with_force_shutdown do
|
||||
client.reset(check_for_more_data)
|
||||
end
|
||||
|
||||
unless next_request_ready
|
||||
@shutdown_mutex.synchronize do
|
||||
return unless @queue_requests
|
||||
close_socket = false
|
||||
|
@ -435,7 +441,7 @@ module Puma
|
|||
end
|
||||
|
||||
# The client disconnected while we were reading data
|
||||
rescue ConnectionError
|
||||
rescue ConnectionError, ThreadPool::ForceShutdown
|
||||
# Swallow them. The ensure tries to close +client+ down
|
||||
|
||||
# SSL handshake error
|
||||
|
@ -638,7 +644,9 @@ module Puma
|
|||
|
||||
begin
|
||||
begin
|
||||
status, headers, res_body = @app.call(env)
|
||||
status, headers, res_body = @thread_pool.with_force_shutdown do
|
||||
@app.call(env)
|
||||
end
|
||||
|
||||
return :async if req.hijacked
|
||||
|
||||
|
@ -936,7 +944,7 @@ module Puma
|
|||
|
||||
if @thread_pool
|
||||
if timeout = @options[:force_shutdown_after]
|
||||
@thread_pool.shutdown timeout.to_i
|
||||
@thread_pool.shutdown timeout.to_f
|
||||
else
|
||||
@thread_pool.shutdown
|
||||
end
|
||||
|
|
|
@ -62,6 +62,8 @@ module Puma
|
|||
end
|
||||
|
||||
@clean_thread_locals = false
|
||||
@force_shutdown = false
|
||||
@shutdown_mutex = Mutex.new
|
||||
end
|
||||
|
||||
attr_reader :spawned, :trim_requested, :waiting
|
||||
|
@ -322,6 +324,19 @@ module Puma
|
|||
@reaper.start!
|
||||
end
|
||||
|
||||
# Allows ThreadPool::ForceShutdown to be raised within the
|
||||
# provided block if the thread is forced to shutdown during execution.
|
||||
def with_force_shutdown
|
||||
t = Thread.current
|
||||
@shutdown_mutex.synchronize do
|
||||
raise ForceShutdown if @force_shutdown
|
||||
t[:with_force_shutdown] = true
|
||||
end
|
||||
yield
|
||||
ensure
|
||||
t[:with_force_shutdown] = false
|
||||
end
|
||||
|
||||
# Tell all threads in the pool to exit and wait for them to finish.
|
||||
# Wait +timeout+ seconds then raise +ForceShutdown+ in remaining threads.
|
||||
# Next, wait an extra +grace+ seconds then force-kill remaining threads.
|
||||
|
@ -356,8 +371,11 @@ module Puma
|
|||
join.call(timeout)
|
||||
|
||||
# If threads are still running, raise ForceShutdown and wait to finish.
|
||||
threads.each do |t|
|
||||
t.raise ForceShutdown
|
||||
@shutdown_mutex.synchronize do
|
||||
@force_shutdown = true
|
||||
threads.each do |t|
|
||||
t.raise ForceShutdown if t[:with_force_shutdown]
|
||||
end
|
||||
end
|
||||
join.call(SHUTDOWN_GRACE_TIME)
|
||||
|
||||
|
|
|
@ -949,27 +949,37 @@ EOF
|
|||
end
|
||||
|
||||
# Perform a server shutdown while requests are pending (one in app-server response, one still sending client request).
|
||||
def shutdown_requests(app_delay: 2, request_delay: 1, post: false, response:, **options)
|
||||
def shutdown_requests(s1_complete: true, s1_response: nil, post: false, s2_response: nil, **options)
|
||||
@server = Puma::Server.new @app, @events, options
|
||||
server_run app: ->(_) {
|
||||
sleep app_delay
|
||||
mutex = Mutex.new
|
||||
app_finished = ConditionVariable.new
|
||||
server_run app: ->(env) {
|
||||
path = env['REQUEST_PATH']
|
||||
mutex.synchronize do
|
||||
app_finished.signal
|
||||
app_finished.wait(mutex) if path == '/s1'
|
||||
end
|
||||
[204, {}, []]
|
||||
}
|
||||
|
||||
s1 = send_http "GET / HTTP/1.1\r\n\r\n"
|
||||
s1 = nil
|
||||
s2 = send_http post ?
|
||||
"POST / HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\nContent-Length: 5\r\n\r\nhi!" :
|
||||
"GET / HTTP/1.1\r\n"
|
||||
sleep 0.1
|
||||
|
||||
"POST /s2 HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\nContent-Length: 5\r\n\r\nhi!" :
|
||||
"GET /s2 HTTP/1.1\r\n"
|
||||
mutex.synchronize do
|
||||
s1 = send_http "GET /s1 HTTP/1.1\r\n\r\n"
|
||||
app_finished.wait(mutex)
|
||||
app_finished.signal if s1_complete
|
||||
end
|
||||
@server.stop
|
||||
sleep request_delay
|
||||
Thread.pass until @server.instance_variable_get(:@thread_pool).instance_variable_get(:@shutdown)
|
||||
|
||||
s2 << "\r\n"
|
||||
assert_match(s1_response, s1.gets) if s1_response
|
||||
|
||||
assert_match(/204/, s1.gets)
|
||||
# Send s2 after shutdown begins
|
||||
s2 << "\r\n" unless IO.select([s2], nil, nil, 0.1)
|
||||
|
||||
assert IO.select([s2], nil, nil, app_delay), 'timeout waiting for response'
|
||||
assert IO.select([s2], nil, nil, 10), 'timeout waiting for response'
|
||||
s2_result = begin
|
||||
s2.gets
|
||||
rescue Errno::ECONNABORTED, Errno::ECONNRESET
|
||||
|
@ -977,38 +987,27 @@ EOF
|
|||
post ? '408' : nil
|
||||
end
|
||||
|
||||
if response
|
||||
assert_match response, s2_result
|
||||
if s2_response
|
||||
assert_match s2_response, s2_result
|
||||
else
|
||||
assert_nil s2_result
|
||||
end
|
||||
end
|
||||
|
||||
# Shutdown should allow pending requests to complete.
|
||||
# Shutdown should allow pending requests and app-responses to complete.
|
||||
def test_shutdown_requests
|
||||
shutdown_requests response: /204/
|
||||
shutdown_requests response: /204/, queue_requests: false
|
||||
end
|
||||
|
||||
# Requests stuck longer than `first_data_timeout` should have connection closed (408 w/pending POST body).
|
||||
def test_shutdown_data_timeout
|
||||
shutdown_requests request_delay: 3, first_data_timeout: 2, response: nil
|
||||
shutdown_requests request_delay: 3, first_data_timeout: 2, response: nil, queue_requests: false
|
||||
shutdown_requests request_delay: 3, first_data_timeout: 2, response: /408/, post: true
|
||||
opts = {s1_response: /204/, s2_response: /204/}
|
||||
shutdown_requests(**opts)
|
||||
shutdown_requests(**opts, queue_requests: false)
|
||||
end
|
||||
|
||||
# Requests still pending after `force_shutdown_after` should have connection closed (408 w/pending POST body).
|
||||
# App-responses still pending should return 503 (uncaught Puma::ThreadPool::ForceShutdown exception).
|
||||
def test_force_shutdown
|
||||
shutdown_requests request_delay: 4, response: nil, force_shutdown_after: 3
|
||||
shutdown_requests request_delay: 4, response: nil, force_shutdown_after: 3, queue_requests: false
|
||||
shutdown_requests request_delay: 4, response: /408/, force_shutdown_after: 3, post: true
|
||||
end
|
||||
|
||||
# App-responses still pending during `force_shutdown_after` should return 503
|
||||
# (uncaught Puma::ThreadPool::ForceShutdown exception).
|
||||
def test_force_shutdown_app
|
||||
shutdown_requests app_delay: 3, response: /503/, force_shutdown_after: 3
|
||||
shutdown_requests app_delay: 3, response: /503/, force_shutdown_after: 3, queue_requests: false
|
||||
opts = {s1_complete: false, s1_response: /503/, s2_response: nil, force_shutdown_after: 0}
|
||||
shutdown_requests(**opts)
|
||||
shutdown_requests(**opts, queue_requests: false)
|
||||
shutdown_requests(**opts, post: true, s2_response: /408/)
|
||||
end
|
||||
|
||||
def test_http11_connection_header_queue
|
||||
|
|
|
@ -222,8 +222,10 @@ class TestThreadPool < Minitest::Test
|
|||
|
||||
pool = mutex_pool(0, 1) do
|
||||
begin
|
||||
pool.signal
|
||||
sleep
|
||||
pool.with_force_shutdown do
|
||||
pool.signal
|
||||
sleep
|
||||
end
|
||||
rescue Puma::ThreadPool::ForceShutdown
|
||||
rescued = true
|
||||
end
|
||||
|
@ -248,8 +250,10 @@ class TestThreadPool < Minitest::Test
|
|||
rescued = []
|
||||
pool = mutex_pool(2, 2) do
|
||||
begin
|
||||
pool.signal
|
||||
sleep
|
||||
pool.with_force_shutdown do
|
||||
pool.signal
|
||||
sleep
|
||||
end
|
||||
rescue Puma::ThreadPool::ForceShutdown
|
||||
rescued << Thread.current
|
||||
sleep
|
||||
|
|
Loading…
Add table
Reference in a new issue