mirror of
https://github.com/mperham/connection_pool
synced 2023-03-27 23:22:21 -04:00
Potentially broken connections are now discarded
This commit is contained in:
parent
0099475297
commit
6a2b4b7ba4
3 changed files with 67 additions and 6 deletions
|
@ -53,11 +53,19 @@ class ConnectionPool
|
|||
end
|
||||
|
||||
def with(options = {})
|
||||
success = false # hoisted
|
||||
conn = checkout(options)
|
||||
begin
|
||||
yield conn
|
||||
(yield conn).tap do
|
||||
success = true # means the connection wasn't interrupted
|
||||
end
|
||||
ensure
|
||||
checkin
|
||||
if success
|
||||
# everything is roses, we can safely check the connection back in
|
||||
checkin
|
||||
else
|
||||
@available.discard!(pop_connection)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -117,6 +117,22 @@ class ConnectionPool::TimedStack
|
|||
@max - @created + @que.length
|
||||
end
|
||||
|
||||
##
|
||||
# Indicates that a connection isn't coming back, allowing a new one to be
|
||||
# created to replace it.
|
||||
|
||||
def discard!(obj)
|
||||
@mutex.synchronize do
|
||||
if @shutdown_block
|
||||
@shutdown_block.call(obj)
|
||||
else
|
||||
@created -= 1
|
||||
end
|
||||
|
||||
@resource.broadcast
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
##
|
||||
|
|
|
@ -47,7 +47,11 @@ class TestConnectionPool < Minitest::Test
|
|||
end
|
||||
end.each do |thread|
|
||||
Thread.pass until thread.status == 'sleep'
|
||||
end.map do |thread|
|
||||
end
|
||||
end
|
||||
|
||||
def kill_threads(threads)
|
||||
threads.each do |thread|
|
||||
thread.kill
|
||||
Thread.pass while thread.alive?
|
||||
end
|
||||
|
@ -105,6 +109,33 @@ class TestConnectionPool < Minitest::Test
|
|||
assert Thread.new { pool.checkout }.join
|
||||
end
|
||||
|
||||
def test_with_with_dangerous_timeouts
|
||||
skip('JRuby GC dislikes this test') if RUBY_ENGINE.to_sym == :jruby
|
||||
|
||||
marker_class = Class.new
|
||||
pool = ConnectionPool.new(:timeout => 0, :size => 1) { marker_class.new }
|
||||
|
||||
# no "connections" allocated yet
|
||||
assert_equal [], ObjectSpace.each_object(marker_class).to_a
|
||||
|
||||
checkin_time = 0.05
|
||||
|
||||
assert_raises Timeout::Error do
|
||||
Timeout.timeout(checkin_time) do
|
||||
pool.with do
|
||||
# a "connection" has been allocated
|
||||
refute_equal [], ObjectSpace.each_object(marker_class).to_a
|
||||
sleep 2 * checkin_time
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
GC.start
|
||||
|
||||
# no dangling references to this "connection" remain
|
||||
assert_equal [], ObjectSpace.each_object(marker_class).to_a
|
||||
end
|
||||
|
||||
def test_with_timeout_override
|
||||
pool = ConnectionPool.new(:timeout => 0, :size => 1) { NetworkConnection.new }
|
||||
|
||||
|
@ -311,12 +342,14 @@ class TestConnectionPool < Minitest::Test
|
|||
Recorder.new.tap { |r| recorders << r }
|
||||
end
|
||||
|
||||
use_pool pool, 3
|
||||
threads = use_pool pool, 3
|
||||
|
||||
pool.shutdown do |recorder|
|
||||
recorder.do_work("shutdown")
|
||||
end
|
||||
|
||||
kill_threads(threads)
|
||||
|
||||
assert_equal [["shutdown"]] * 3, recorders.map { |r| r.calls }
|
||||
end
|
||||
|
||||
|
@ -337,7 +370,7 @@ class TestConnectionPool < Minitest::Test
|
|||
Recorder.new.tap { |r| recorders << r }
|
||||
end
|
||||
|
||||
use_pool pool, 3
|
||||
threads = use_pool pool, 2
|
||||
|
||||
pool.checkout
|
||||
|
||||
|
@ -345,6 +378,8 @@ class TestConnectionPool < Minitest::Test
|
|||
recorder.do_work("shutdown")
|
||||
end
|
||||
|
||||
kill_threads(threads)
|
||||
|
||||
assert_equal [["shutdown"], ["shutdown"], []], recorders.map { |r| r.calls }
|
||||
|
||||
pool.checkin
|
||||
|
@ -367,12 +402,14 @@ class TestConnectionPool < Minitest::Test
|
|||
Recorder.new.tap { |r| recorders << r }
|
||||
end
|
||||
|
||||
use_pool wrapper, 3
|
||||
threads = use_pool wrapper, 3
|
||||
|
||||
wrapper.pool_shutdown do |recorder|
|
||||
recorder.do_work("shutdown")
|
||||
end
|
||||
|
||||
kill_threads(threads)
|
||||
|
||||
assert_equal [["shutdown"]] * 3, recorders.map { |r| r.calls }
|
||||
end
|
||||
|
||||
|
|
Loading…
Reference in a new issue