From 6fca4ecdd1576d61e12f99f24248f5160a1bff33 Mon Sep 17 00:00:00 2001 From: seki Date: Mon, 28 Feb 2005 15:41:14 +0000 Subject: [PATCH] improved keeper thread git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@8040 b2dd03c8-39d4-4d8f-98ff-823fe69b080e --- ChangeLog | 7 ++++++ lib/rinda/tuplespace.rb | 54 +++++++++++++++++++++++++++------------- test/rinda/test_rinda.rb | 10 ++++++++ 3 files changed, 54 insertions(+), 17 deletions(-) diff --git a/ChangeLog b/ChangeLog index 01406c0b6e..6eb3e36004 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,10 @@ +Tue Mar 1 00:40:35 2005 Masatoshi SEKI + + * lib/rinda/tuplespace.rb (Rinda::TupleSpace): improved keeper thread. + + * test/rinda/test_rinda.rb: ditto. + + Mon Feb 28 23:10:13 2005 Hirokazu Yamamoto * ext/socket/socket.c (Init_socket): IPv6 is not supported although diff --git a/lib/rinda/tuplespace.rb b/lib/rinda/tuplespace.rb index 6986e49b3e..0490e0ee00 100644 --- a/lib/rinda/tuplespace.rb +++ b/lib/rinda/tuplespace.rb @@ -64,7 +64,7 @@ module Rinda sec, @renewer = get_renewer(sec_or_renewer) @expires = make_expires(sec) end - + # Create an expiry time. Called with: # # +true+:: the expiry time is the start of 1970 (i.e. expired). @@ -97,12 +97,12 @@ module Rinda def size @ary.size end - + # Create a new tuple from the supplied object (array-like). def make_tuple(ary) Rinda::Tuple.new(ary) end - + private # Given +true+, +nil+, or +Numeric+, returns that (suitable input to # make_expires) and +nil+ (no actual +renewer+), else it return the @@ -165,12 +165,12 @@ module Rinda def wait @cond.wait end - + def read(tuple) @found = tuple signal end - + def signal @place.synchronize do @cond.signal @@ -188,7 +188,7 @@ module Rinda @queue = Queue.new @done = false end - + def notify(ev) @queue.push(ev) end @@ -199,7 +199,7 @@ module Rinda @done = true if it[0] == 'close' return it end - + def each while !@done it = pop @@ -219,14 +219,23 @@ module Rinda def initialize @hash = {} end - + + def has_expires? + @hash.each do |k, v| + v.each do |tuple| + return true if tuple.expires + end + end + false + end + # Add the object to the TupleBag. def push(ary) size = ary.size @hash[size] ||= [] @hash[size].push(ary) end - + # Remove the object from the TupleBag. def delete(ary) size = ary.size @@ -288,12 +297,13 @@ module Rinda @take_waiter = TupleBag.new @notify_waiter = TupleBag.new @period = period - @keeper = keeper + @keeper = nil end # Put a tuple into the tuplespace. def write(tuple, sec=nil) entry = TupleEntry.new(tuple, sec) + start_keeper synchronize do if entry.expired? @read_waiter.find_all_template(entry).each do |template| @@ -323,6 +333,7 @@ module Rinda def move(port, tuple, sec=nil) template = WaitTemplateEntry.new(self, tuple, sec) yield(template) if block_given? + start_keeper synchronize do entry = @bag.find(template) if entry @@ -356,6 +367,7 @@ module Rinda def read(tuple, sec=nil) template = WaitTemplateEntry.new(self, tuple, sec) yield(template) if block_given? + start_keeper synchronize do entry = @bag.find(template) return entry.value if entry @@ -390,7 +402,7 @@ module Rinda end template end - + private def keep_clean synchronize do @@ -408,7 +420,7 @@ module Rinda end end end - + def notify_event(event, tuple) ev = [event, tuple] @notify_waiter.find_all_template(ev).each do |template| @@ -416,13 +428,21 @@ module Rinda end end - def keeper - Thread.new do - loop do - sleep(@period) - keep_clean + def start_keeper + return if @keeper && @keeper.alive? + @keeper = Thread.new do + while need_keeper? + keep_clean + sleep(@period) end end end + + def need_keeper? + return true if @bag.has_expires? + return true if @read_waiter.has_expires? + return true if @take_waiter.has_expires? + return true if @notify_waiter.has_expires? + end end end diff --git a/test/rinda/test_rinda.rb b/test/rinda/test_rinda.rb index d2b2c7f17e..7ac6918819 100644 --- a/test/rinda/test_rinda.rb +++ b/test/rinda/test_rinda.rb @@ -6,6 +6,8 @@ require 'rinda/tuplespace' require 'singleton' +require 'weakref' + module Rinda class MockClock @@ -499,6 +501,14 @@ class TupleSpaceTest < Test::Unit::TestCase ThreadGroup.new.add(Thread.current) @ts = Rinda::TupleSpace.new(1) end + + def test_gc + w = WeakRef.new(Rinda::TupleSpace.new) + GC.start + assert_raises(WeakRef::RefError) do + w.__getobj__ + end + end end class TupleSpaceProxyTest < Test::Unit::TestCase