1
0
Fork 0
mirror of https://github.com/ruby/ruby.git synced 2022-11-09 12:17:21 -05:00

Automatically close fds on fork (and GC). The connection pools are maintained at thread scope.

This commit is contained in:
Masatoshi SEKI 2019-10-14 20:30:22 +09:00
parent a4cf11c10f
commit 8488d5b5b6

View file

@ -1211,6 +1211,49 @@ module DRb
end
end
class ThreadObject
include MonitorMixin
def initialize(&blk)
super()
@wait_ev = new_cond
@req_ev = new_cond
@res_ev = new_cond
@status = :wait
@req = nil
@res = nil
@thread = Thread.new(self, &blk)
end
def alive?
@thread.alive?
end
def method_missing(msg, *arg, &blk)
synchronize do
@wait_ev.wait_until { @status == :wait }
@req = [msg] + arg
@status = :req
@req_ev.broadcast
@res_ev.wait_until { @status == :res }
value = @res
@req = @res = nil
@status = :wait
@wait_ev.broadcast
return value
end
end
def _execute()
synchronize do
@req_ev.wait_until { @status == :req }
@res = yield(@req)
@status = :res
@res_ev.signal
end
end
end
# Class handling the connection between a DRbObject and the
# server the real object lives on.
#
@ -1222,31 +1265,45 @@ module DRb
# not normally need to deal with it directly.
class DRbConn
POOL_SIZE = 16 # :nodoc:
@mutex = Thread::Mutex.new
@pool = []
def self.make_pool
ThreadObject.new do |queue|
pool = []
while true
queue._execute do |message|
case(message[0])
when :take then
remote_uri = message[1]
conn = nil
new_pool = []
pool.each do |c|
if conn.nil? and c.uri == remote_uri
conn = c if c.alive?
else
new_pool.push c
end
end
pool = new_pool
conn
when :store then
conn = message[1]
pool.unshift(conn)
pool.pop.close while pool.size > POOL_SIZE
conn
else
nil
end
end
end
end
end
@pool_proxy = make_pool
def self.open(remote_uri) # :nodoc:
begin
conn = nil
pid = $$
@mutex.synchronize do
#FIXME
new_pool = []
@pool.each do |c|
if c.pid == pid
if conn.nil? and c.uri == remote_uri
conn = c if c.alive?
else
new_pool.push c
end
else
c.close
end
end
@pool = new_pool
end
@pool_proxy = make_pool unless @pool_proxy.alive?
conn = @pool_proxy.take(remote_uri)
conn = self.new(remote_uri) unless conn
succ, result = yield(conn)
return succ, result
@ -1254,10 +1311,7 @@ module DRb
ensure
if conn
if succ
@mutex.synchronize do
@pool.unshift(conn)
@pool.pop.close while @pool.size > POOL_SIZE
end
@pool_proxy.store(conn)
else
conn.close
end
@ -1267,11 +1321,9 @@ module DRb
def initialize(remote_uri) # :nodoc:
@uri = remote_uri
@pid = $$
@protocol = DRbProtocol.open(remote_uri, DRb.config)
end
attr_reader :uri # :nodoc:
attr_reader :pid # :nodoc:
def send_message(ref, msg_id, arg, block) # :nodoc:
@protocol.send_request(ref, msg_id, arg, block)