2012-12-18 13:04:53 -05:00
require_relative 'connection_pool/version'
require_relative 'timed_queue'
2011-05-14 15:29:51 -04:00
# Generic connection pool class for e.g. sharing a limited number of network connections
# among many threads. Note: Connections are eager created.
#
# Example usage with block (faster):
#
# @pool = ConnectionPool.new { Redis.new }
#
# @pool.with do |redis|
2011-05-14 18:36:17 -04:00
# redis.lpop('my-list') if redis.llen('my-list') > 0
2011-05-14 15:29:51 -04:00
# end
#
2012-03-14 09:10:09 -04:00
# Example usage replacing an existing connection (slower):
2011-05-14 15:29:51 -04:00
#
2012-03-14 09:10:09 -04:00
# $redis = ConnectionPool.wrap { Redis.new }
2011-05-14 15:29:51 -04:00
#
# def do_work
2012-03-14 09:10:09 -04:00
# $redis.lpop('my-list') if $redis.llen('my-list') > 0
2011-05-14 15:29:51 -04:00
# end
#
# Accepts the following options:
# - :size - number of connections to pool, defaults to 5
# - :timeout - amount of time to wait for a connection if none currently available, defaults to 5 seconds
#
2012-03-14 09:07:43 -04:00
class ConnectionPool
2012-12-18 13:08:06 -05:00
DEFAULTS = { size : 5 , timeout : 5 }
2011-05-14 15:29:51 -04:00
2012-03-14 09:07:43 -04:00
def self . wrap ( options , & block )
Wrapper . new ( options , & block )
end
2012-12-18 13:08:06 -05:00
def initialize ( options = { } , & block )
2012-03-14 09:07:43 -04:00
raise ArgumentError , 'Connection pool requires a block' unless block
2011-09-18 16:26:26 -04:00
2012-03-14 09:24:47 -04:00
options = DEFAULTS . merge ( options )
2012-12-18 13:08:06 -05:00
@size = options . fetch ( :size )
@timeout = options . fetch ( :timeout )
2012-03-14 09:24:47 -04:00
@available = :: TimedQueue . new ( @size , & block )
2012-03-14 09:27:21 -04:00
@key = :" current- #{ @available . object_id } "
2011-05-14 15:29:51 -04:00
end
2012-03-14 11:36:40 -04:00
def with
2012-06-18 09:10:05 -04:00
conn = checkout
begin
yield conn
ensure
checkin
end
2011-05-14 15:29:51 -04:00
end
2012-12-18 21:33:59 -05:00
def with_connection ( & block )
warn ( " ConnectionPool # with_connection is deprecated and will be removed in version 1.0. Upgrade your code to use ConnectionPool # with instead. (in #{ caller [ 0 ] } ) " )
with ( & block )
end
2011-05-14 15:29:51 -04:00
def checkout
2012-06-18 09:10:05 -04:00
stack = :: Thread . current [ @key ] || = [ ]
if stack . empty?
conn = @available . timed_pop ( @timeout )
else
conn = stack . last
end
stack . push conn
conn
2011-05-14 15:29:51 -04:00
end
def checkin
2012-06-18 09:10:05 -04:00
stack = :: Thread . current [ @key ]
conn = stack . pop
if stack . empty?
@available << conn
end
2011-05-14 15:29:51 -04:00
nil
end
2012-12-18 11:50:12 -05:00
class Wrapper < :: BasicObject
2012-12-18 21:33:59 -05:00
METHODS = [ :with ]
2012-12-18 11:50:12 -05:00
2012-03-14 09:07:43 -04:00
def initialize ( options = { } , & block )
@pool = :: ConnectionPool . new ( options , & block )
end
2012-03-14 11:36:40 -04:00
def with
yield @pool . checkout
ensure
@pool . checkin
end
2012-12-18 21:33:59 -05:00
def with_connection ( & block )
warn ( " ConnectionPool::Wrapper # with_connection is deprecated and will be removed in version 1.0. Upgrade your code to use ConnectionPool::Wrapper # with instead. (in #{ caller [ 0 ] } ) " )
with ( & block )
end
2012-03-14 11:36:40 -04:00
2012-12-18 11:41:59 -05:00
def respond_to? ( id , * args )
2012-12-18 11:50:12 -05:00
METHODS . include? ( id ) || @pool . with { | c | c . respond_to? ( id , * args ) }
2012-11-08 20:14:31 -05:00
end
2012-03-14 09:07:43 -04:00
def method_missing ( name , * args , & block )
@pool . with do | connection |
connection . send ( name , * args , & block )
end
end
end
2011-09-09 17:12:23 -04:00
end