Initial pass at a generic connection pool

This commit is contained in:
Mike Perham 2011-05-14 12:29:51 -07:00
commit 035e7519d2
7 changed files with 148 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
*.gem
.bundle
Gemfile.lock
pkg/*

4
Gemfile Normal file
View File

@ -0,0 +1,4 @@
source "http://rubygems.org"
# Specify your gem's dependencies in connection_pool.gemspec
gemspec

2
Rakefile Normal file
View File

@ -0,0 +1,2 @@
require 'bundler'
Bundler::GemHelper.install_tasks

17
connection_pool.gemspec Normal file
View File

@ -0,0 +1,17 @@
# -*- encoding: utf-8 -*-
require "./lib/connection_pool/version"
Gem::Specification.new do |s|
s.name = "connection_pool"
s.version = ConnectionPool::VERSION
s.platform = Gem::Platform::RUBY
s.authors = ["Mike Perham"]
s.email = ["mperham@gmail.com"]
s.homepage = ""
s.description = s.summary = %q{Generic connection pool for Ruby}
s.files = `git ls-files`.split("\n")
s.test_files = `git ls-files -- {test,spec,features}/*`.split("\n")
s.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) }
s.require_paths = ["lib"]
end

70
lib/connection_pool.rb Normal file
View File

@ -0,0 +1,70 @@
require 'connection_pool/timed_queue'
# Generic connection pool class for e.g. sharing a limited number of network connections
# among many threads. Note: Connections are eager created.
#
# Example usage with block (faster):
#
# @pool = ConnectionPool.new { Redis.new }
#
# @pool.with do |redis|
# redis.lpop if redis.llen('my-list') > 0
# end
#
# Example usage replacing a global connection (slower):
#
# REDIS = ConnectionPool.new { Redis.new }
#
# def do_work
# REDIS.lpop if REDIS.llen('my-list') > 0
# end
#
# Accepts the following options:
# - :size - number of connections to pool, defaults to 5
# - :timeout - amount of time to wait for a connection if none currently available, defaults to 5 seconds
#
class ConnectionPool
DEFAULTS = { :size => 5, :timeout => 5 }
def initialize(options={})
raise ArgumentError, 'Connection pool requires a block' unless block_given?
@available = TimedQueue.new
@options = DEFAULTS.merge(options)
@options[:size].times do
@available << yield
end
@busy = []
end
def with(&block)
yield checkout
ensure
checkin
end
def method_missing(name, *args)
checkout.send(name, *args)
ensure
checkin
end
private
def checkout
Thread.current[:"current-#{self.object_id}"] ||= begin
conn = @available.timed_pop(@options[:timeout])
@busy << conn
conn
end
end
def checkin
conn = Thread.current[:"current-#{self.object_id}"]
Thread.current[:"current-#{self.object_id}"] = nil
@busy.delete(conn)
@available << conn
nil
end
end

View File

@ -0,0 +1,48 @@
require 'thread'
require 'timeout'
class TimedQueue
def initialize
@que = []
@waiting = []
@mutex = Mutex.new
@resource = ConditionVariable.new
end
def push(obj)
@mutex.synchronize do
@que.push obj
@resource.signal
end
end
alias << push
def timed_pop(timeout=0.5)
while true
@mutex.synchronize do
@waiting.delete(Thread.current)
if @que.empty?
@waiting.push Thread.current
@resource.wait(@mutex, timeout)
raise TimeoutError if @que.empty?
else
retval = @que.shift
@resource.signal
return retval
end
end
end
end
def empty?
@que.empty?
end
def clear
@que.clear
end
def length
@que.length
end
end

View File

@ -0,0 +1,3 @@
module ConnectionPool
VERSION = "0.0.1"
end