mirror of
https://github.com/mperham/connection_pool
synced 2023-03-27 23:22:21 -04:00
New shutdown
method
This commit is contained in:
parent
971b016329
commit
6caf9d5227
5 changed files with 104 additions and 14 deletions
10
Changes.md
10
Changes.md
|
@ -1,3 +1,13 @@
|
|||
1.0.1
|
||||
-----
|
||||
|
||||
- New `#shutdown` method
|
||||
|
||||
This method accepts a block and calls the block for each
|
||||
connection in the pool. After calling this method, trying to get a
|
||||
connection from the pool raises a `PullShuttingDownError`
|
||||
exception.
|
||||
|
||||
1.0.0
|
||||
-----
|
||||
|
||||
|
|
|
@ -73,6 +73,10 @@ class ConnectionPool
|
|||
nil
|
||||
end
|
||||
|
||||
def shutdown(&block)
|
||||
@available.shutdown(&block)
|
||||
end
|
||||
|
||||
class Wrapper < ::BasicObject
|
||||
METHODS = [:with]
|
||||
|
||||
|
|
|
@ -1,16 +1,25 @@
|
|||
require 'thread'
|
||||
require 'timeout'
|
||||
|
||||
class ConnectionPool::PoolShuttingDownError < RuntimeError
|
||||
end
|
||||
|
||||
class ConnectionPool::TimedStack
|
||||
def initialize(size = 0)
|
||||
@que = Array.new(size) { yield }
|
||||
@mutex = Mutex.new
|
||||
@resource = ConditionVariable.new
|
||||
@shutdown_block = nil
|
||||
end
|
||||
|
||||
def push(obj)
|
||||
@mutex.synchronize do
|
||||
@que.push obj
|
||||
if @shutdown_block
|
||||
@shutdown_block.call(obj)
|
||||
else
|
||||
@que.push obj
|
||||
end
|
||||
|
||||
@resource.broadcast
|
||||
end
|
||||
end
|
||||
|
@ -20,6 +29,7 @@ class ConnectionPool::TimedStack
|
|||
deadline = Time.now + timeout
|
||||
@mutex.synchronize do
|
||||
loop do
|
||||
raise ConnectionPool::PoolShuttingDownError if @shutdown_block
|
||||
return @que.pop unless @que.empty?
|
||||
to_wait = deadline - Time.now
|
||||
raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0
|
||||
|
@ -28,6 +38,20 @@ class ConnectionPool::TimedStack
|
|||
end
|
||||
end
|
||||
|
||||
def shutdown(&block)
|
||||
raise ArgumentError, "shutdown must receive a block" unless block_given?
|
||||
|
||||
@mutex.synchronize do
|
||||
@shutdown_block = block
|
||||
@resource.broadcast
|
||||
|
||||
@que.size.times do
|
||||
conn = @que.pop
|
||||
block.call(conn)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def empty?
|
||||
@que.empty?
|
||||
end
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
class ConnectionPool
|
||||
VERSION = "1.0.0"
|
||||
VERSION = "1.0.1"
|
||||
end
|
||||
|
|
|
@ -29,6 +29,18 @@ class TestConnectionPool < MiniTest::Unit::TestCase
|
|||
end
|
||||
end
|
||||
|
||||
class Recorder
|
||||
def initialize
|
||||
@calls = []
|
||||
end
|
||||
|
||||
attr_reader :calls
|
||||
|
||||
def do_work(label)
|
||||
@calls << label
|
||||
end
|
||||
end
|
||||
|
||||
def test_basic_multithreaded_usage
|
||||
pool = ConnectionPool.new(:size => 5) { NetworkConnection.new }
|
||||
threads = []
|
||||
|
@ -112,18 +124,6 @@ class TestConnectionPool < MiniTest::Unit::TestCase
|
|||
assert_equal 1, ids.uniq.size
|
||||
end
|
||||
|
||||
class Recorder
|
||||
def initialize
|
||||
@calls = []
|
||||
end
|
||||
|
||||
attr_reader :calls
|
||||
|
||||
def do_work(label)
|
||||
@calls << label
|
||||
end
|
||||
end
|
||||
|
||||
def test_nested_checkout
|
||||
recorder = Recorder.new
|
||||
pool = ConnectionPool.new(:size => 1) { recorder }
|
||||
|
@ -147,4 +147,56 @@ class TestConnectionPool < MiniTest::Unit::TestCase
|
|||
|
||||
assert_equal ['inner', 'outer', 'other'], recorder.calls
|
||||
end
|
||||
|
||||
def test_shutdown_is_executed_for_all_connections
|
||||
recorders = []
|
||||
|
||||
pool = ConnectionPool.new(:size => 3) do
|
||||
Recorder.new.tap { |r| recorders << r }
|
||||
end
|
||||
|
||||
pool.shutdown do |recorder|
|
||||
recorder.do_work("shutdown")
|
||||
end
|
||||
|
||||
assert_equal [["shutdown"]] * 3, recorders.map { |r| r.calls }
|
||||
end
|
||||
|
||||
def test_raises_error_after_shutting_down
|
||||
pool = ConnectionPool.new(:size => 1) { true }
|
||||
|
||||
pool.shutdown { }
|
||||
|
||||
assert_raises ConnectionPool::PoolShuttingDownError do
|
||||
pool.checkout
|
||||
end
|
||||
end
|
||||
|
||||
def test_runs_shutdown_block_asynchronously_if_connection_was_in_use
|
||||
recorders = []
|
||||
|
||||
pool = ConnectionPool.new(:size => 3) do
|
||||
Recorder.new.tap { |r| recorders << r }
|
||||
end
|
||||
|
||||
pool.checkout
|
||||
|
||||
pool.shutdown do |recorder|
|
||||
recorder.do_work("shutdown")
|
||||
end
|
||||
|
||||
assert_equal [["shutdown"], ["shutdown"], []], recorders.map { |r| r.calls }
|
||||
|
||||
pool.checkin
|
||||
|
||||
assert_equal [["shutdown"], ["shutdown"], ["shutdown"]], recorders.map { |r| r.calls }
|
||||
end
|
||||
|
||||
def test_raises_an_error_if_shutdown_is_called_without_a_block
|
||||
pool = ConnectionPool.new(:size => 1) { }
|
||||
|
||||
assert_raises ArgumentError do
|
||||
pool.shutdown
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue