mirror of
https://github.com/mperham/connection_pool
synced 2023-03-27 23:22:21 -04:00
Merge pull request #75 from mperham/rollback_timeout_handling
Non-local return forces disconnect
This commit is contained in:
commit
6007d32b62
6 changed files with 72 additions and 62 deletions
10
Changes.md
10
Changes.md
|
@ -1,3 +1,13 @@
|
|||
connection\_pool changelog
|
||||
---------------------------
|
||||
|
||||
2.2.0
|
||||
------
|
||||
|
||||
- Rollback `Timeout` handling introduced in 2.1.1 and 2.1.2. It seems
|
||||
impossible to safely work around the issue. Please never, ever use
|
||||
`Timeout.timeout` in your code or you will see rare but mysterious bugs. [#75]
|
||||
|
||||
2.1.3
|
||||
------
|
||||
|
||||
|
|
|
@ -8,6 +8,11 @@ MongoDB has its own connection pool. ActiveRecord has its own connection pool.
|
|||
This is a generic connection pool that can be used with anything, e.g. Redis,
|
||||
Dalli and other Ruby network clients.
|
||||
|
||||
**WARNING**: Don't ever use `Timeout.timeout` in your Ruby code or you will see
|
||||
occasional silent corruption and mysterious errors. The Timeout API is unsafe
|
||||
and cannot be used correctly, ever. Use proper socket timeout options as
|
||||
exposed by Net::HTTP, Redis, Dalli, etc.
|
||||
|
||||
|
||||
Usage
|
||||
-----
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
require_relative 'connection_pool/version'
|
||||
require_relative 'connection_pool/timed_stack'
|
||||
|
||||
|
||||
# Generic connection pool class for e.g. sharing a limited number of network connections
|
||||
# among many threads. Note: Connections are lazily created.
|
||||
#
|
||||
|
@ -52,26 +53,36 @@ class ConnectionPool
|
|||
@key = :"current-#{@available.object_id}"
|
||||
end
|
||||
|
||||
if Thread.respond_to?(:handle_interrupt)
|
||||
|
||||
# MRI
|
||||
def with(options = {})
|
||||
# Connections can become corrupted via Timeout::Error. Discard
|
||||
# any connection whose usage after checkout does not finish as expected.
|
||||
# See #67
|
||||
success = false
|
||||
conn = checkout(options)
|
||||
begin
|
||||
result = yield conn
|
||||
success = true # means the connection wasn't interrupted
|
||||
result
|
||||
ensure
|
||||
if success
|
||||
# everything is roses, we can safely check the connection back in
|
||||
Thread.handle_interrupt(Exception => :never) do
|
||||
conn = checkout(options)
|
||||
begin
|
||||
Thread.handle_interrupt(Exception => :immediate) do
|
||||
yield conn
|
||||
end
|
||||
ensure
|
||||
checkin
|
||||
else
|
||||
@available.discard!(pop_connection)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
else
|
||||
|
||||
# jruby 1.7.x
|
||||
def with(options = {})
|
||||
conn = checkout(options)
|
||||
begin
|
||||
yield conn
|
||||
ensure
|
||||
checkin
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
def checkout(options = {})
|
||||
conn = if stack.empty?
|
||||
timeout = options[:timeout] || @timeout
|
||||
|
|
|
@ -117,28 +117,6 @@ class ConnectionPool::TimedStack
|
|||
@max - @created + @que.length
|
||||
end
|
||||
|
||||
##
|
||||
# Indicates that a connection isn't coming back, allowing a new one to be
|
||||
# created to replace it.
|
||||
|
||||
def discard!(obj)
|
||||
@mutex.synchronize do
|
||||
if @shutdown_block
|
||||
@shutdown_block.call(obj)
|
||||
else
|
||||
# try to shut down the connection before throwing it away
|
||||
if obj.respond_to?(:close) # Dalli::Client
|
||||
obj.close
|
||||
elsif obj.respond_to?(:disconnect!) # Redis
|
||||
obj.disconnect!
|
||||
end
|
||||
@created -= 1
|
||||
end
|
||||
|
||||
@resource.broadcast
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
##
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
class ConnectionPool
|
||||
VERSION = "2.1.3"
|
||||
VERSION = "2.2.0"
|
||||
end
|
||||
|
|
|
@ -109,38 +109,44 @@ class TestConnectionPool < Minitest::Test
|
|||
assert Thread.new { pool.checkout }.join
|
||||
end
|
||||
|
||||
def test_with_with_dangerous_timeouts
|
||||
case RUBY_ENGINE.to_sym
|
||||
when :jruby
|
||||
skip('JRuby GC dislikes this test')
|
||||
when :ruby
|
||||
if RUBY_VERSION == '2.0.0' && RUBY_PATCHLEVEL == 598
|
||||
skip("#{RUBY_VERSION}p#{RUBY_PATCHLEVEL} GC dislikes this test")
|
||||
end
|
||||
end
|
||||
|
||||
marker_class = Class.new
|
||||
pool = ConnectionPool.new(:timeout => 0, :size => 1) { marker_class.new }
|
||||
|
||||
# no "connections" allocated yet
|
||||
assert_equal [], ObjectSpace.each_object(marker_class).to_a
|
||||
|
||||
checkin_time = 0.05
|
||||
def test_with_timeout
|
||||
pool = ConnectionPool.new(:timeout => 0, :size => 1) { Object.new }
|
||||
|
||||
assert_raises Timeout::Error do
|
||||
Timeout.timeout(checkin_time) do
|
||||
pool.with do
|
||||
# a "connection" has been allocated
|
||||
refute_equal [], ObjectSpace.each_object(marker_class).to_a
|
||||
sleep 2 * checkin_time
|
||||
Timeout.timeout(0.01) do
|
||||
pool.with do |obj|
|
||||
assert_equal 0, pool.instance_variable_get(:@available).instance_variable_get(:@que).size
|
||||
sleep 0.015
|
||||
end
|
||||
end
|
||||
end
|
||||
assert_equal 1, pool.instance_variable_get(:@available).instance_variable_get(:@que).size
|
||||
end
|
||||
|
||||
GC.start
|
||||
def test_checkout_ignores_timeout
|
||||
skip("Thread.handle_interrupt not available") unless Thread.respond_to?(:handle_interrupt)
|
||||
|
||||
# no dangling references to this "connection" remain
|
||||
assert_equal [], ObjectSpace.each_object(marker_class).to_a
|
||||
pool = ConnectionPool.new(:timeout => 0, :size => 1) { Object.new }
|
||||
def pool.checkout(options)
|
||||
sleep 0.015
|
||||
super
|
||||
end
|
||||
|
||||
did_something = false
|
||||
assert_raises Timeout::Error do
|
||||
Timeout.timeout(0.01) do
|
||||
pool.with do |obj|
|
||||
did_something = true
|
||||
# Timeout::Error will be triggered by any non-trivial Ruby code
|
||||
# executed here since it couldn't be raised during checkout.
|
||||
# It looks like setting the local variable above does not trigger
|
||||
# the Timeout check in MRI 2.2.1.
|
||||
obj.tap { obj.hash }
|
||||
end
|
||||
end
|
||||
end
|
||||
assert did_something
|
||||
assert_equal 1, pool.instance_variable_get(:@available).instance_variable_get(:@que).size
|
||||
end
|
||||
|
||||
def test_explicit_return
|
||||
|
|
Loading…
Reference in a new issue