diff --git a/History.md b/History.md index 525ae45c..1e7b58d3 100644 --- a/History.md +++ b/History.md @@ -31,6 +31,7 @@ * Set `Connection: closed` header when queue requests is disabled (#2216) * Pass queued requests to thread pool on server shutdown (#2122) * Fixed a few minor concurrency bugs in ThreadPool that may have affected non-GVL Rubies (#2220) + * Fix `out_of_band` hook never executed if the number of worker threads is > 1 (#2177) * Refactor * Remove unused loader argument from Plugin initializer (#2095) diff --git a/lib/puma/server.rb b/lib/puma/server.rb index f590d528..b5269f0d 100644 --- a/lib/puma/server.rb +++ b/lib/puma/server.rb @@ -288,10 +288,7 @@ module Puma end pool << client - busy_threads = pool.wait_until_not_full - if busy_threads == 0 - @options[:out_of_band].each(&:call) if @options[:out_of_band] - end + pool.wait_until_not_full end rescue SystemCallError # nothing @@ -439,6 +436,12 @@ module Puma rescue StandardError => e @events.unknown_error self, e, "Client" end + + if @options[:out_of_band] + @thread_pool.with_mutex do + @options[:out_of_band].each(&:call) if @thread_pool.busy_threads == 1 + end + end end end diff --git a/lib/puma/thread_pool.rb b/lib/puma/thread_pool.rb index c0d9c543..54b1407a 100644 --- a/lib/puma/thread_pool.rb +++ b/lib/puma/thread_pool.rb @@ -82,6 +82,10 @@ module Puma waiting + (@max - spawned) end + def busy_threads + with_mutex { @spawned - @waiting + @todo.size } + end + # :nodoc: # # Must be called with @mutex held! @@ -188,9 +192,6 @@ module Puma # method would not block and another request would be added into the reactor # by the server. This would continue until a fully bufferend request # makes it through the reactor and can then be processed by the thread pool. - # - # Returns the current number of busy threads, or +nil+ if shutting down. - # def wait_until_not_full with_mutex do while true @@ -200,8 +201,7 @@ module Puma # is work queued that cannot be handled by waiting # threads, then accept more work until we would # spin up the max number of threads. - busy_threads = @spawned - @waiting + @todo.size - return busy_threads if @max > busy_threads + return if busy_threads < @max @not_full.wait @mutex end diff --git a/test/test_puma_server.rb b/test/test_puma_server.rb index b97c6196..f392ae32 100644 --- a/test/test_puma_server.rb +++ b/test/test_puma_server.rb @@ -984,4 +984,85 @@ EOF assert_equal ["HTTP/1.0 200 OK", "Content-Length: 0"], header(sock) sock.close end + + def oob_server(**options) + @request_count = 0 + @oob_count = 0 + @start = Time.now + in_oob = Mutex.new + @mutex = Mutex.new + @oob_finished = ConditionVariable.new + oob_wait = options.delete(:oob_wait) + oob = -> do + in_oob.synchronize do + @mutex.synchronize do + @oob_count += 1 + @oob_finished.signal + @oob_finished.wait(@mutex, 1) if oob_wait + end + end + end + @server = Puma::Server.new @app, @events, out_of_band: [oob], **options + @server.min_threads = 5 + @server.max_threads = 5 + server_run app: ->(_) do + raise 'OOB conflict' if in_oob.locked? + @mutex.synchronize {@request_count += 1} + [200, {}, [""]] + end + end + + # Sequential requests should trigger out_of_band hooks after every request. + def test_out_of_band + n = 100 + oob_server queue_requests: false + n.times do + @mutex.synchronize do + send_http "GET / HTTP/1.0\r\n\r\n" + @oob_finished.wait(@mutex, 1) + end + end + assert_equal n, @request_count + assert_equal n, @oob_count + end + + # Streaming requests on parallel connections without delay should trigger + # out_of_band hooks only once after the final request. + def test_out_of_band_stream + n = 100 + threads = 10 + oob_server + req = "GET / HTTP/1.1\r\n" + @mutex.synchronize do + Array.new(threads) do + Thread.new do + send_http "#{req}\r\n" * (n/threads-1) + "#{req}Connection: close\r\n\r\n" + end + end.each(&:join) + @oob_finished.wait(@mutex, 1) + end + assert_equal n, @request_count + assert_equal 1, @oob_count + end + + def test_out_of_band_overlapping_requests + oob_server oob_wait: true + sock = nil + sock2 = nil + @mutex.synchronize do + sock2 = send_http "GET / HTTP/1.0\r\n" + sleep 0.01 + sock = send_http "GET / HTTP/1.0\r\n\r\n" + # Request 1 processed + @oob_finished.wait(@mutex) # enter oob + sock2 << "\r\n" + sleep 0.01 + @oob_finished.signal # exit oob + # Request 2 processed + @oob_finished.wait(@mutex) # enter oob + @oob_finished.signal # exit oob + end + assert_match(/200/, sock.read) + assert_match(/200/, sock2.read) + end end