1
0
Fork 0
mirror of https://github.com/mperham/connection_pool synced 2023-03-27 23:22:21 -04:00

Merge pull request #9 from djanowski/wrap

Separate wrapper
This commit is contained in:
Mike Perham 2012-03-14 06:36:43 -07:00
commit 0c80d7b59c
3 changed files with 35 additions and 27 deletions

View file

@ -12,30 +12,35 @@ require 'timed_queue'
# redis.lpop('my-list') if redis.llen('my-list') > 0
# end
#
# Example usage replacing a global connection (slower):
# Example usage replacing an existing connection (slower):
#
# REDIS = ConnectionPool.new { Redis.new }
# $redis = ConnectionPool.wrap { Redis.new }
#
# def do_work
# REDIS.lpop('my-list') if REDIS.llen('my-list') > 0
# $redis.lpop('my-list') if $redis.llen('my-list') > 0
# end
#
# Accepts the following options:
# - :size - number of connections to pool, defaults to 5
# - :timeout - amount of time to wait for a connection if none currently available, defaults to 5 seconds
#
class ConnectionPool < BasicObject
class ConnectionPool
DEFAULTS = { :size => 5, :timeout => 5 }
def initialize(options={}, &block)
::Kernel.raise ::ArgumentError, 'Connection pool requires a block' unless block
def self.wrap(options, &block)
Wrapper.new(options, &block)
end
@available = ::TimedQueue.new
@oid = @available.object_id
@options = DEFAULTS.merge(options)
@options[:size].times do
@available << block.call
end
def initialize(options={}, &block)
raise ArgumentError, 'Connection pool requires a block' unless block
options = DEFAULTS.merge(options)
@size = options[:size] || DEFAULTS[:size]
@timeout = options[:timeout] || DEFAULTS
@available = ::TimedQueue.new(@size, &block)
@key = :"current-#{@available.object_id}"
end
def with(&block)
@ -45,26 +50,29 @@ class ConnectionPool < BasicObject
end
alias_method :with_connection, :with
def method_missing(name, *args, &block)
checkout.send(name, *args, &block)
ensure
checkin
end
private
def checkout
::Thread.current[:"current-#{@oid}"] ||= begin
@available.timed_pop(@options[:timeout])
end
::Thread.current[@key] ||= @available.timed_pop(@timeout)
end
def checkin
conn = ::Thread.current[:"current-#{@oid}"]
::Thread.current[:"current-#{@oid}"] = nil
conn = ::Thread.current[@key]
::Thread.current[@key] = nil
return unless conn
@available << conn
nil
end
class Wrapper < BasicObject
def initialize(options = {}, &block)
@pool = ::ConnectionPool.new(options, &block)
end
def method_missing(name, *args, &block)
@pool.with do |connection|
connection.send(name, *args, &block)
end
end
end
end

View file

@ -2,8 +2,8 @@ require 'thread'
require 'timeout'
class TimedQueue
def initialize
@que = []
def initialize(size = 0)
@que = Array.new(size) { yield }
@mutex = Mutex.new
@resource = ConditionVariable.new
end

View file

@ -50,7 +50,7 @@ class TestConnectionPool < MiniTest::Unit::TestCase
end
sleep 0.05
assert_raises Timeout::Error do
pool.do_something
pool.with { |net| net.do_something }
end
sleep 0.05
@ -60,7 +60,7 @@ class TestConnectionPool < MiniTest::Unit::TestCase
end
def test_passthru
pool = ConnectionPool.new(:timeout => 0.1, :size => 1) { NetworkConnection.new }
pool = ConnectionPool.wrap(:timeout => 0.1, :size => 1) { NetworkConnection.new }
assert_equal 1, pool.do_something
assert_equal 2, pool.do_something
assert_equal 5, pool.do_something_with_block { 3 }