1
0
Fork 0
mirror of https://github.com/mperham/connection_pool synced 2023-03-27 23:22:21 -04:00

Merge pull request #55 from drbrain/hooked_timed_stack

Extract resource management in TimedStack
This commit is contained in:
Mike Perham 2014-03-24 18:40:33 -07:00
commit ebc9485ed9
3 changed files with 125 additions and 13 deletions

View file

@ -66,7 +66,7 @@ class ConnectionPool
if stack.empty? if stack.empty?
timeout = options[:timeout] || @timeout timeout = options[:timeout] || @timeout
conn = @available.pop(timeout) conn = @available.pop(timeout: timeout)
else else
conn = stack.last conn = stack.last
end end

View file

@ -1,10 +1,37 @@
require 'thread' require 'thread'
require 'timeout' require 'timeout'
##
# Raised when you attempt to retrieve a connection from a pool that has been
# shut down.
#
# Examples:
#
# ts = TimedStack.new(1) { MyConnection.new }
#
# # fetch a connection
# conn = ts.pop
#
# # return a connection
# ts.push conn
#
# conn = ts.pop
# ts.pop timeout: 5
# #=> raises Timeout::Error after 5 seconds
class ConnectionPool::PoolShuttingDownError < RuntimeError; end class ConnectionPool::PoolShuttingDownError < RuntimeError; end
##
# The TimedStack manages a pool of homogeneous connections (or any resource
# you wish to manage). Connections are created lazily up to a given maximum
# number.
class ConnectionPool::TimedStack class ConnectionPool::TimedStack
##
# Creates a new pool with +size+ connections that are created from the given
# +block+.
def initialize(size = 0, &block) def initialize(size = 0, &block)
@create_block = block @create_block = block
@created = 0 @created = 0
@ -15,12 +42,16 @@ class ConnectionPool::TimedStack
@shutdown_block = nil @shutdown_block = nil
end end
def push(obj) ##
# Returns +obj+ to the stack. +options+ is ignored in TimedStack but may be
# used by subclasses that extend TimedStack.
def push(obj, options = {})
@mutex.synchronize do @mutex.synchronize do
if @shutdown_block if @shutdown_block
@shutdown_block.call(obj) @shutdown_block.call(obj)
else else
@que.push obj store_connection obj, options
end end
@resource.broadcast @resource.broadcast
@ -28,16 +59,28 @@ class ConnectionPool::TimedStack
end end
alias_method :<<, :push alias_method :<<, :push
def pop(timeout=0.5) ##
# Retrieves a connection from the stack. If a connection is available it is
# immediately returned. If no connection is available within the given
# timeout a Timeout::Error is raised.
#
# +:timeout+ is the only checked entry in +options+ and is preferred over
# the +timeout+ argument (which will be removed in a future release). Other
# options may be used by subclasses that extend TimedStack.
def pop(timeout = 0.5, options = {})
options, timeout = timeout, 0.5 if Hash === timeout
timeout = options.fetch :timeout, timeout
deadline = Time.now + timeout deadline = Time.now + timeout
@mutex.synchronize do @mutex.synchronize do
loop do loop do
raise ConnectionPool::PoolShuttingDownError if @shutdown_block raise ConnectionPool::PoolShuttingDownError if @shutdown_block
return @que.pop unless @que.empty? return fetch_connection(options) if connection_stored?(options)
unless @created == @max
@created += 1 connection = try_create(options)
return @create_block.call return connection if connection
end
to_wait = deadline - Time.now to_wait = deadline - Time.now
raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0 raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0
@resource.wait(@mutex, to_wait) @resource.wait(@mutex, to_wait)
@ -45,6 +88,10 @@ class ConnectionPool::TimedStack
end end
end end
##
# Shuts down the TimedStack which prevents connections from being checked
# out. The +block+ is called once for each connection on the stack.
def shutdown(&block) def shutdown(&block)
raise ArgumentError, "shutdown must receive a block" unless block_given? raise ArgumentError, "shutdown must receive a block" unless block_given?
@ -52,18 +99,75 @@ class ConnectionPool::TimedStack
@shutdown_block = block @shutdown_block = block
@resource.broadcast @resource.broadcast
@que.size.times do shutdown_connections
conn = @que.pop
block.call(conn)
end
end end
end end
##
# Returns +true+ if there are no available connections.
def empty? def empty?
(@created - @que.length) >= @max (@created - @que.length) >= @max
end end
##
# The number of connections available on the stack.
def length def length
@max - @created + @que.length @max - @created + @que.length
end end
private
##
# This is an extension point for TimedStack and is called with a mutex.
#
# This method must returns true if a connection is available on the stack.
def connection_stored?(options = nil)
!@que.empty?
end
##
# This is an extension point for TimedStack and is called with a mutex.
#
# This method must return a connection from the stack.
def fetch_connection(options = nil)
@que.pop
end
##
# This is an extension point for TimedStack and is called with a mutex.
#
# This method must shut down all connections on the stack.
def shutdown_connections(options = nil)
while connection_stored?(options)
conn = fetch_connection(options)
@shutdown_block.call(conn)
end
end
##
# This is an extension point for TimedStack and is called with a mutex.
#
# This method must return +obj+ to the stack.
def store_connection(obj, options = nil)
@que.push obj
end
##
# This is an extension point for TimedStack and is called with a mutex.
#
# This method must create a connection if and only if the total number of
# connections allowed has not been met.
def try_create(options = nil)
unless @created == @max
@created += 1
@create_block.call
end
end
end end

View file

@ -44,6 +44,14 @@ class TestConnectionPoolTimedStack < Minitest::Test
end end
def test_pop_empty def test_pop_empty
e = assert_raises Timeout::Error do
@stack.pop timeout: 0
end
assert_equal 'Waited 0 sec', e.message
end
def test_pop_empty_2_0_compatibility
e = assert_raises Timeout::Error do e = assert_raises Timeout::Error do
@stack.pop 0 @stack.pop 0
end end