1
0
Fork 0
mirror of https://github.com/puma/puma.git synced 2022-11-09 13:48:40 -05:00

Inject small delay to improve requests distribution

When running on Ruby MRI, at most a single thread can execute at a time due to the GVL. This results in over-utilisation of some workers when an unfavourable distribution of connections occurs.

This change tries to prefer less-busy workers (i.e. those faster to accept
the connection) in order to improve worker utilisation.
This commit is contained in:
Kamil Trzciński 2019-11-20 15:39:37 +01:00 committed by Kamil Trzcinski
parent 774c460e60
commit 7af9807778
8 changed files with 261 additions and 0 deletions

View file

@ -10,6 +10,7 @@
* Increases maximum URI path length from 2048 to 8196 bytes (#2167)
* Force shutdown responses can be overridden by using the `lowlevel_error_handler` config (#2203)
* Faster phased restart and worker timeout (#2121)
* Inject small delay for busy workers to improve requests distribution (#2079)
* Deprecations, Removals and Breaking API Changes
* `Puma.stats` now returns a Hash instead of a JSON string (#2086)

102
benchmarks/wrk/cpu_spin.sh Executable file
View file

@ -0,0 +1,102 @@
#!/bin/bash
set -eo pipefail
ITERATIONS=400000
HOST=127.0.0.1:9292
URL="http://$HOST/cpu/$ITERATIONS"
MIN_WORKERS=1
MAX_WORKERS=4
MIN_THREADS=4
MAX_THREADS=4
DURATION=2
MIN_CONCURRENT=1
MAX_CONCURRENT=8
retry() {
local tries="$1"
local sleep="$2"
shift 2
for i in $(seq 1 $tries); do
if eval "$@"; then
return 0
fi
sleep "$sleep"
done
return 1
}
ms() {
VALUE=$(cat)
FRAC=${VALUE%%[ums]*}
case "$VALUE" in
*us)
echo "scale=1; ${FRAC}/1000" | bc
;;
*ms)
echo "scale=1; ${FRAC}/1" | bc
;;
*s)
echo "scale=1; ${FRAC}*1000/1" | bc
;;
esac
}
run_wrk() {
result=$(wrk -H "Connection: Close" -c "$wrk_c" -t "$wrk_t" -d "$DURATION" --latency "$@" | tee -a wrk.txt)
req_sec=$(echo "$result" | grep "^Requests/sec:" | awk '{print $2}')
latency_avg=$(echo "$result" | grep "^\s*Latency.*%" | awk '{print $2}' | ms)
latency_stddev=$(echo "$result" | grep "^\s*Latency.*%" | awk '{print $3}' | ms)
latency_50=$(echo "$result" | grep "^\s*50%" | awk '{print $2}' | ms)
latency_75=$(echo "$result" | grep "^\s*75%" | awk '{print $2}' | ms)
latency_90=$(echo "$result" | grep "^\s*90%" | awk '{print $2}' | ms)
latency_99=$(echo "$result" | grep "^\s*99%" | awk '{print $2}' | ms)
echo -e "$workers\t$threads\t$wrk_c\t$wrk_t\t$req_sec\t$latency_avg\t$latency_stddev\t$latency_50\t$latency_75\t$latency_90\t$latency_99"
}
run_concurrency_tests() {
echo
echo -e "PUMA_W\tPUMA_T\tWRK_C\tWRK_T\tREQ_SEC\tL_AVG\tL_DEV\tL_50%\tL_75%\tL_90%\tL_99%"
for wrk_c in $(seq $MIN_CONCURRENT $MAX_CONCURRENT); do
wrk_t="$wrk_c"
eval "$@"
sleep 1
done
echo
}
with_puma() {
# start puma and wait for 10s for it to start
bundle exec bin/puma -w "$workers" -t "$threads" -b "tcp://$HOST" -C test/config/cpu_spin.rb &
local puma_pid=$!
trap "kill $puma_pid" EXIT
# wait for Puma to be up
if ! retry 10 1s curl --fail "$URL" &>/dev/null; then
echo "Failed to connect to $URL."
return 1
fi
# execute testing command
eval "$@"
kill "$puma_pid" || true
trap - EXIT
wait
}
for workers in $(seq $MIN_WORKERS $MAX_WORKERS); do
for threads in $(seq $MIN_THREADS $MAX_THREADS); do
with_puma \
run_concurrency_tests \
run_wrk "$URL"
done
done

View file

@ -664,6 +664,18 @@ module Puma
@options[:shutdown_debug] = val
end
# Controls an injected delay (in seconds) before
# accepting new socket
# if we are already processing requests,
# it gives a time for less busy workers
# to pick workbefore us
#
# This only affects Ruby MRI implementation
# The best value is between 0.001 (1ms) to 0.010 (10ms)
def wait_for_less_busy_worker(val)
@options[:wait_for_less_busy_worker] = val.to_f
end
# Control how the remote address of the connection is set. This
# is configurable because to calculate the true socket peer address
# a kernel syscall is required which for very fast rack handlers

View file

@ -283,6 +283,9 @@ module Puma
else
begin
pool.wait_until_not_full
pool.wait_for_less_busy_worker(
@options[:wait_for_less_busy_worker].to_f)
if io = sock.accept_nonblock
client = Client.new io, @binder.env(sock)
if remote_addr_value

View file

@ -228,6 +228,25 @@ module Puma
end
end
def wait_for_less_busy_worker(delay_s)
# Ruby MRI does GVL, this can result
# in processing contention when multiple threads
# (requests) are running concurrently
return unless Puma.mri?
return unless delay_s > 0
with_mutex do
return if @shutdown
# do not delay, if we are not busy
return unless busy_threads > 0
# this will be signaled once a request finishes,
# which can happen earlier than delay
@not_full.wait @mutex, delay_s
end
end
# If there are any free threads in the pool, tell one to go ahead
# and exit. If +force+ is true, then a trim request is requested
# even if all threads are being utilized.

17
test/config/cpu_spin.rb Normal file
View file

@ -0,0 +1,17 @@
# call with "GET /cpu/<d> HTTP/1.1\r\n\r\n",
# where <d> is the number of iterations
require 'benchmark'
# configure `wait_for_less_busy_workers` based on ENV, default `true`
wait_for_less_busy_worker ENV.fetch('WAIT_FOR_LESS_BUSY_WORKERS', '0.005').to_f
app do |env|
iterations = (env['REQUEST_PATH'][/\/cpu\/(\d.*)/,1] || '1000').to_i
duration = Benchmark.measure do
iterations.times { rand }
end
[200, {"Content-Type" => "text/plain"}, ["Run for #{duration.total} #{Process.pid}"]]
end

View file

@ -116,6 +116,7 @@ module TestSkips
when :darwin then "Skip unless darwin" unless RUBY_PLATFORM[/darwin/]
when :jruby then "Skip unless JRuby" unless Puma.jruby?
when :windows then "Skip unless Windows" unless Puma.windows?
when :mri then "Skip unless MRI" unless Puma.mri?
else false
end
skip skip_msg, bt if skip_msg

106
test/test_busy_worker.rb Normal file
View file

@ -0,0 +1,106 @@
require_relative "helper"
require "puma/events"
class TestBusyWorker < Minitest::Test
parallelize_me!
def setup
@ios = []
@server = nil
end
def teardown
@server.stop(true) if @server
@ios.each {|i| i.close unless i.closed?}
end
def new_connection
TCPSocket.new('127.0.0.1', @server.connected_ports[0]).tap {|s| @ios << s}
rescue IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
retry
end
def send_http(req)
new_connection << req
end
def send_http_and_read(req)
send_http(req).read
end
def with_server(**options, &app)
@requests_count = 0 # number of requests processed
@requests_running = 0 # current number of requests running
@requests_max_running = 0 # max number of requests running in parallel
@mutex = Mutex.new
request_handler = ->(env) do
@mutex.synchronize do
@requests_count += 1
@requests_running += 1
if @requests_running > @requests_max_running
@requests_max_running = @requests_running
end
end
begin
yield(env)
ensure
@mutex.synchronize do
@requests_running -= 1
end
end
end
@server = Puma::Server.new request_handler, Puma::Events.strings, **options
@server.min_threads = options[:min_threads] || 0
@server.max_threads = options[:max_threads] || 10
@server.add_tcp_listener '127.0.0.1', 0
@server.run
end
# Multiple concurrent requests are not processed
# sequentially as a small delay is introduced
def test_multiple_requests_waiting_on_less_busy_worker
skip_unless :mri
with_server(wait_for_less_busy_worker: 1.0) do |_|
sleep(0.1)
[200, {}, [""]]
end
n = 2
Array.new(n) do
Thread.new { send_http_and_read "GET / HTTP/1.0\r\n\r\n" }
end.each(&:join)
assert_equal n, @requests_count, "number of requests needs to match"
assert_equal 0, @requests_running, "none of requests needs to be running"
assert_equal 1, @requests_max_running, "maximum number of concurrent requests needs to be 1"
end
# Multiple concurrent requests are processed
# in parallel as a delay is disabled
def test_multiple_requests_processing_in_parallel
skip_unless :mri
with_server(wait_for_less_busy_worker: 0.0) do |_|
sleep(0.1)
[200, {}, [""]]
end
n = 4
Array.new(n) do
Thread.new { send_http_and_read "GET / HTTP/1.0\r\n\r\n" }
end.each(&:join)
assert_equal n, @requests_count, "number of requests needs to match"
assert_equal 0, @requests_running, "none of requests needs to be running"
assert_equal n, @requests_max_running, "maximum number of concurrent requests needs to match"
end
end