diff --git a/lib/connection_pool.rb b/lib/connection_pool.rb index 547f587..2c50f7d 100644 --- a/lib/connection_pool.rb +++ b/lib/connection_pool.rb @@ -12,30 +12,35 @@ require 'timed_queue' # redis.lpop('my-list') if redis.llen('my-list') > 0 # end # -# Example usage replacing a global connection (slower): +# Example usage replacing an existing connection (slower): # -# REDIS = ConnectionPool.new { Redis.new } +# $redis = ConnectionPool.wrap { Redis.new } # # def do_work -# REDIS.lpop('my-list') if REDIS.llen('my-list') > 0 +# $redis.lpop('my-list') if $redis.llen('my-list') > 0 # end # # Accepts the following options: # - :size - number of connections to pool, defaults to 5 # - :timeout - amount of time to wait for a connection if none currently available, defaults to 5 seconds # -class ConnectionPool < BasicObject +class ConnectionPool DEFAULTS = { :size => 5, :timeout => 5 } - def initialize(options={}, &block) - ::Kernel.raise ::ArgumentError, 'Connection pool requires a block' unless block + def self.wrap(options, &block) + Wrapper.new(options, &block) + end - @available = ::TimedQueue.new - @oid = @available.object_id - @options = DEFAULTS.merge(options) - @options[:size].times do - @available << block.call - end + def initialize(options={}, &block) + raise ArgumentError, 'Connection pool requires a block' unless block + + options = DEFAULTS.merge(options) + + @size = options[:size] || DEFAULTS[:size] + @timeout = options[:timeout] || DEFAULTS + + @available = ::TimedQueue.new(@size, &block) + @key = :"current-#{@available.object_id}" end def with(&block) @@ -45,26 +50,29 @@ class ConnectionPool < BasicObject end alias_method :with_connection, :with - def method_missing(name, *args, &block) - checkout.send(name, *args, &block) - ensure - checkin - end - private def checkout - ::Thread.current[:"current-#{@oid}"] ||= begin - @available.timed_pop(@options[:timeout]) - end + ::Thread.current[@key] ||= @available.timed_pop(@timeout) end def checkin - conn = ::Thread.current[:"current-#{@oid}"] - ::Thread.current[:"current-#{@oid}"] = nil + conn = ::Thread.current[@key] + ::Thread.current[@key] = nil return unless conn @available << conn nil end + class Wrapper < BasicObject + def initialize(options = {}, &block) + @pool = ::ConnectionPool.new(options, &block) + end + + def method_missing(name, *args, &block) + @pool.with do |connection| + connection.send(name, *args, &block) + end + end + end end diff --git a/lib/timed_queue.rb b/lib/timed_queue.rb index 442e79b..2bb3e09 100644 --- a/lib/timed_queue.rb +++ b/lib/timed_queue.rb @@ -2,8 +2,8 @@ require 'thread' require 'timeout' class TimedQueue - def initialize - @que = [] + def initialize(size = 0) + @que = Array.new(size) { yield } @mutex = Mutex.new @resource = ConditionVariable.new end diff --git a/test/test_connection_pool.rb b/test/test_connection_pool.rb index 1f88969..88295fe 100644 --- a/test/test_connection_pool.rb +++ b/test/test_connection_pool.rb @@ -50,7 +50,7 @@ class TestConnectionPool < MiniTest::Unit::TestCase end sleep 0.05 assert_raises Timeout::Error do - pool.do_something + pool.with { |net| net.do_something } end sleep 0.05 @@ -60,7 +60,7 @@ class TestConnectionPool < MiniTest::Unit::TestCase end def test_passthru - pool = ConnectionPool.new(:timeout => 0.1, :size => 1) { NetworkConnection.new } + pool = ConnectionPool.wrap(:timeout => 0.1, :size => 1) { NetworkConnection.new } assert_equal 1, pool.do_something assert_equal 2, pool.do_something assert_equal 5, pool.do_something_with_block { 3 }