Add a proxy to enable or disable TCP data flow

gate.stop allows to simulate a blocking write transmission to the server.
This should make the nonblocking tests more reliable.

Fixes #424
This commit is contained in:
Lars Kanis 2022-06-08 16:07:36 +02:00
parent d5e223c1c7
commit 443e7cb53e
4 changed files with 291 additions and 105 deletions

View File

@ -7,6 +7,7 @@ require 'pg'
require 'openssl'
require_relative 'helpers/scheduler.rb'
require_relative 'helpers/tcp_gate_scheduler.rb'
require_relative 'helpers/tcp_gate_switcher.rb'
DEFAULT_TEST_DIR_STR = File.join(Dir.pwd, "tmp_test_specs")
TEST_DIR_STR = ENV['RUBY_PG_TEST_DIR'] || DEFAULT_TEST_DIR_STR
@ -528,6 +529,83 @@ EOT
end
end
end
def scheduler_setup
# Run examples with gated scheduler
sched = Helpers::TcpGateScheduler.new(external_host: 'localhost', external_port: ENV['PGPORT'].to_i, debug: ENV['PG_DEBUG']=='1')
Fiber.set_scheduler(sched)
@conninfo_gate = @conninfo.gsub(/(^| )port=\d+/, " port=#{sched.internal_port} sslmode=disable")
# Run examples with default scheduler
#Fiber.set_scheduler(Helpers::Scheduler.new)
#@conninfo_gate = @conninfo
# Run examples without scheduler
#def Fiber.schedule; yield; end
#@conninfo_gate = @conninfo
end
def scheduler_teardown
Fiber.set_scheduler(nil)
end
def scheduler_stop
if Fiber.scheduler && Fiber.scheduler.respond_to?(:finish)
Fiber.scheduler.finish
end
end
def thread_with_timeout(timeout)
th = Thread.new do
yield
end
unless th.join(timeout)
th.kill
$scheduler_timeout = true
raise("scheduler timeout in:\n#{th.backtrace.join("\n")}")
end
end
def run_with_scheduler(timeout=10)
thread_with_timeout(timeout) do
scheduler_setup
Fiber.schedule do
conn = PG.connect(@conninfo_gate)
yield conn
conn.finish
scheduler_stop
end
end
scheduler_teardown
end
def gate_setup
# Run examples with gate
gate = Helpers::TcpGateSwitcher.new(external_host: 'localhost', external_port: ENV['PGPORT'].to_i, debug: ENV['PG_DEBUG']=='1')
@conninfo_gate = @conninfo.gsub(/(^| )port=\d+/, " port=#{gate.internal_port} sslmode=disable")
# Run examples without gate
#@conninfo_gate = @conninfo
gate
end
def gate_stop(gate)
gate&.finish
end
def run_with_gate(timeout=10)
thread_with_timeout(timeout) do
gate = gate_setup
conn = PG.connect(@conninfo_gate)
yield conn, gate
conn.finish
gate_stop(gate)
end
end
end

View File

@ -0,0 +1,190 @@
# frozen_string_literal: true
# This is a transparent TCP proxy for testing blocking behaviour in a time insensitive way.
#
# It works as a gate between the client and the server, which is enabled or disabled by the spec.
# Data transfer can be blocked per API.
# The TCP communication in a C extension can be verified in a (mostly) timing insensitive way.
# If a call does IO but doesn't handle non-blocking state, the test will block and can be caught by an external timeout.
#
# PG.connect
# port:5444 TcpGateSwitcher DB
# ------------- ---------------------------------------- --------
# | non- | | TCPServer TCPSocket | | |
# | blocking |----->| port 5444 port 5432|----->|Server|
# | specs | | | | port |
# '------|----' |,--> stop_read : <-send data-- | | 5432 |
# '---------------> stop_write: --send data-> | '------'
# '--------------------------------------'
module Helpers
class TcpGateSwitcher
class Connection
attr_reader :internal_io
attr_reader :external_io
def initialize(internal_io, external_host, external_port, debug: false)
@internal_io = internal_io
@external_host = external_host
@external_port = external_port
@external_io = nil
@mutex = Mutex.new
@debug = debug
@wait = nil
Thread.new do
read
end
Thread.new do
write
end
end
def print_data(desc, data)
return unless @debug
if data.bytesize >= 70
sdata = data[0..70]
puts "#{desc}: #{sdata.inspect} (... #{data.bytesize} bytes)"
else
puts "#{desc}: #{data.inspect} (#{data.bytesize} bytes)"
end
end
def puts(*args)
return unless @debug
super
end
def connect
# Not yet connected?
@mutex.synchronize do
if !@external_io
@external_io = TCPSocket.new(@external_host, @external_port)
puts "connected ext:#{@external_io.inspect} (belongs to int:#{@internal_io.fileno})"
end
end
end
# transfer data in read direction
def read
connect
loop do
@wait&.deq
begin
read_str = @external_io.read_nonblock(65536)
print_data("read-transfer #{read_fds}", read_str)
@internal_io.write(read_str)
rescue IO::WaitReadable, Errno::EINTR
@external_io.wait_readable
rescue EOFError, Errno::ECONNRESET
puts "read_eof from #{read_fds}"
@internal_io.close_write
break
end
end
end
# transfer data in write direction
def write
connect
# transfer data blocks of up to 65536 bytes
loop do
@wait&.deq
begin
read_str = @internal_io.read_nonblock(65536)
print_data("write-transfer #{write_fds}", read_str)
@external_io.write(read_str)
rescue IO::WaitReadable, Errno::EINTR
@internal_io.wait_readable
rescue EOFError, Errno::ECONNRESET
puts "write_eof from #{write_fds}"
@external_io.close_write
break
end
end
end
def read_fds
"ext:#{@external_io&.fileno || '-'}->int:#{@internal_io.fileno}"
end
def write_fds
"int:#{@internal_io.fileno}->ext:#{@external_io&.fileno || '-'}"
end
# Make sure all data is transferred and both connections are closed.
def finish
puts "finish transfers #{write_fds} and #{read_fds}"
write
read
end
def start
@wait&.close
@wait = nil
end
def stop
@wait ||= Queue.new
end
end
UnknownConnection = Struct.new :fileno, :events
def initialize(external_host:, external_port:, internal_host: 'localhost', internal_port: 0, debug: false)
super()
@connections = []
@server_io = TCPServer.new(internal_host, internal_port)
@external_host = external_host
@external_port = external_port
@finish = false
@debug = debug
puts "TcpGate server listening: #{@server_io.inspect}"
run
end
def finish
@finish = true
TCPSocket.new('localhost', internal_port).close
end
def internal_port
@server_io.local_address.ip_port
end
def start
@connections.each(&:start)
end
def stop
@connections.each(&:stop)
end
def run
Thread.new do
# Wait for new connections to the TCP gate
while client=@server_io.accept
if @finish
@connections.each(&:finish)
break
else
conn = Connection.new(client, @external_host, @external_port, debug: @debug)
puts "accept new int:#{conn.internal_io.inspect} from #{conn.internal_io.remote_address.inspect} server fd:#{@server_io.fileno}"
@connections << conn
# Handle the reading and writing in a separate thread
conn.start
end
# Remove old connections
@connections.reject! do |conn|
conn.internal_io.closed? || conn.external_io&.closed?
end
end
end
end
end
end

View File

@ -517,87 +517,55 @@ describe PG::Connection do
end
it "rejects to send lots of COPY data" do
skip("takes around an hour to succeed on Windows") if RUBY_PLATFORM=~/mingw|mswin/
unless RUBY_PLATFORM =~ /i386-mingw|x86_64-darwin|x86_64-linux/
skip "this spec depends on out-of-memory condition in put_copy_data, which is not reliable on all platforms"
end
conn = described_class.new(@conninfo)
conn.setnonblocking(true)
run_with_gate(60) do |conn, gate|
conn.setnonblocking(true)
res = nil
begin
Timeout.timeout(60) do
conn.exec <<-EOSQL
CREATE TEMP TABLE copytable (col1 TEXT);
res = nil
conn.exec <<-EOSQL
CREATE TEMP TABLE copytable (col1 TEXT);
EOSQL
CREATE OR REPLACE FUNCTION delay_input() RETURNS trigger AS $x$
BEGIN
PERFORM pg_sleep(1);
RETURN NEW;
END;
$x$ LANGUAGE plpgsql;
conn.exec( "COPY copytable FROM STDOUT CSV" )
gate.stop
CREATE TRIGGER delay_input BEFORE INSERT ON copytable
FOR EACH ROW EXECUTE PROCEDURE delay_input();
EOSQL
conn.exec( "COPY copytable FROM STDOUT CSV" )
data = "x" * 1000 * 1000
data << "\n"
20000.times do
res = conn.put_copy_data(data)
break if res == false
end
data = "x" * 1000 * 1000
data << "\n"
20000.times do |idx|
res = conn.put_copy_data(data)
break if res == false
end
expect( res ).to be_falsey
rescue Timeout::Error
skip <<-EOT
Unfortunately this test is not reliable.
It is timing dependent, since it assumes that the ruby process
sends data faster than the PostgreSQL server can process it.
This assumption is wrong in some environments.
EOT
ensure
gate.start
conn.cancel
conn.discard_results
conn.finish
end
end
it "needs to flush data after send_query" do
retries = 3
begin
conn = described_class.new(@conninfo)
run_with_gate(60) do |conn, gate|
conn.setnonblocking(true)
data = "x" * 1000 * 1000 * 100
gate.stop
data = "x" * 1000 * 1000 * 30
res = conn.send_query_params("SELECT LENGTH($1)", [data])
expect( res ).to be_nil
res = conn.flush
expect( res ).to be_falsey
rescue RSpec::Expectations::ExpectationNotMetError
if (retries-=1) >= 0
until conn.flush()
IO.select(nil, [conn.socket_io], nil, 10)
end
conn.get_last_result
conn.finish
retry
end
raise
ensure
until conn.flush()
IO.select(nil, [conn.socket_io], nil, 10)
gate.start
until conn.flush
IO.select(nil, [conn.socket_io], [conn.socket_io], 10)
end
expect( conn.flush ).to be_truthy
res = conn.get_last_result
expect( res.values ).to eq( [[data.length.to_s]] )
conn.finish
end
end

View File

@ -7,56 +7,6 @@ $scheduler_timeout = false
context "with a Fiber scheduler", :scheduler do
def setup
# Run examples with gated scheduler
sched = Helpers::TcpGateScheduler.new(external_host: 'localhost', external_port: ENV['PGPORT'].to_i, debug: ENV['PG_DEBUG']=='1')
Fiber.set_scheduler(sched)
@conninfo_gate = @conninfo.gsub(/(^| )port=\d+/, " port=#{sched.internal_port} sslmode=disable")
# Run examples with default scheduler
#Fiber.set_scheduler(Helpers::Scheduler.new)
#@conninfo_gate = @conninfo
# Run examples without scheduler
#def Fiber.schedule; yield; end
#@conninfo_gate = @conninfo
end
def teardown
Fiber.set_scheduler(nil)
end
def stop_scheduler
if Fiber.scheduler && Fiber.scheduler.respond_to?(:finish)
Fiber.scheduler.finish
end
end
def thread_with_timeout(timeout)
th = Thread.new do
yield
end
unless th.join(timeout)
th.kill
$scheduler_timeout = true
raise("scheduler timeout in:\n#{th.backtrace.join("\n")}")
end
end
def run_with_scheduler(timeout=10)
thread_with_timeout(timeout) do
setup
Fiber.schedule do
conn = PG.connect(@conninfo_gate)
yield conn
conn.finish
stop_scheduler
end
end
end
it "connects to a server" do
run_with_scheduler do |conn|
res = conn.exec_params("SELECT 7", [])