1
0
Fork 0
mirror of https://github.com/ruby/ruby.git synced 2022-11-09 12:17:21 -05:00

merged from 1.8

git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@18228 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
seki 2008-07-27 00:04:38 +00:00
parent 232e659214
commit 0098caf510
3 changed files with 156 additions and 91 deletions

View file

@ -1,3 +1,9 @@
Sun Jul 27 09:02:32 2008 Masatoshi SEKI <m_seki@mva.biglobe.ne.jp>
* lib/rinda/tuplespace.rb: merged from 1.8.
* test/rinda/test_rinda.rb: merged from 1.8.
Sat Jul 26 22:45:18 2008 Yuki Sonoda (Yugui) <yugui@yugui.jp> Sat Jul 26 22:45:18 2008 Yuki Sonoda (Yugui) <yugui@yugui.jp>
* sample/exyacc.rb: fixed NoMethodError(Kernel#sub!). * sample/exyacc.rb: fixed NoMethodError(Kernel#sub!).

View file

@ -2,6 +2,8 @@ require 'monitor'
require 'thread' require 'thread'
require 'drb/drb' require 'drb/drb'
require 'rinda/rinda' require 'rinda/rinda'
require 'enumerator'
require 'forwardable'
module Rinda module Rinda
@ -286,45 +288,70 @@ module Rinda
# of Tuplespace. # of Tuplespace.
class TupleBag class TupleBag
class TupleBin
extend Forwardable
def_delegators '@bin', :find_all, :delete_if, :each, :empty?
def initialize
@bin = []
end
def add(tuple)
@bin.push(tuple)
end
def delete(tuple)
idx = @bin.rindex(tuple)
@bin.delete_at(idx) if idx
end
def find(&blk)
@bin.reverse_each do |x|
return x if yield(x)
end
nil
end
end
def initialize # :nodoc: def initialize # :nodoc:
@hash = {} @hash = {}
@enum = Enumerable::Enumerator.new(self, :each_entry)
end end
## ##
# +true+ if the TupleBag to see if it has any expired entries. # +true+ if the TupleBag to see if it has any expired entries.
def has_expires? def has_expires?
@hash.each do |k, v| @enum.find do |tuple|
v.each do |tuple| tuple.expires
return true if tuple.expires
end end
end end
false
end
## ##
# Add +ary+ to the TupleBag. # Add +tuple+ to the TupleBag.
def push(ary) def push(tuple)
size = ary.size key = bin_key(tuple)
@hash[size] ||= [] @hash[key] ||= TupleBin.new
@hash[size].push(ary) @hash[key].add(tuple)
end end
## ##
# Removes +ary+ from the TupleBag. # Removes +tuple+ from the TupleBag.
def delete(ary) def delete(tuple)
size = ary.size key = bin_key(tuple)
@hash.fetch(size, []).delete(ary) bin = @hash[key]
return nil unless bin
bin.delete(tuple)
@hash.delete(key) if bin.empty?
tuple
end end
## ##
# Finds all live tuples that match +template+. # Finds all live tuples that match +template+.
def find_all(template) def find_all(template)
@hash.fetch(template.size, []).find_all do |tuple| bin_for_find(template).find_all do |tuple|
tuple.alive? && template.match(tuple) tuple.alive? && template.match(tuple)
end end
end end
@ -333,7 +360,7 @@ module Rinda
# Finds a live tuple that matches +template+. # Finds a live tuple that matches +template+.
def find(template) def find(template)
@hash.fetch(template.size, []).find do |tuple| bin_for_find(template).find do |tuple|
tuple.alive? && template.match(tuple) tuple.alive? && template.match(tuple)
end end
end end
@ -343,7 +370,7 @@ module Rinda
# +tuple+ and are alive. # +tuple+ and are alive.
def find_all_template(tuple) def find_all_template(tuple)
@hash.fetch(tuple.size, []).find_all do |template| @enum.find_all do |template|
template.alive? && template.match(tuple) template.alive? && template.match(tuple)
end end
end end
@ -354,20 +381,39 @@ module Rinda
def delete_unless_alive def delete_unless_alive
deleted = [] deleted = []
@hash.keys.each do |size| @hash.each do |key, bin|
ary = [] bin.delete_if do |tuple|
@hash[size].each do |tuple|
if tuple.alive? if tuple.alive?
ary.push(tuple) false
else else
deleted.push(tuple) deleted.push(tuple)
true
end end
end end
@hash[size] = ary
end end
deleted deleted
end end
private
def each_entry(&blk)
@hash.each do |k, v|
v.each(&blk)
end
end
def bin_key(tuple)
head = tuple[0]
if head.class == Symbol
return head
else
false
end
end
def bin_for_find(template)
key = bin_key(template)
key ? @hash.fetch(key, []) : @enum
end
end end
## ##
@ -403,8 +449,7 @@ module Rinda
# Adds +tuple+ # Adds +tuple+
def write(tuple, sec=nil) def write(tuple, sec=nil)
entry = TupleEntry.new(tuple, sec) entry = create_entry(tuple, sec)
start_keeper
synchronize do synchronize do
if entry.expired? if entry.expired?
@read_waiter.find_all_template(entry).each do |template| @read_waiter.find_all_template(entry).each do |template|
@ -414,6 +459,7 @@ module Rinda
notify_event('delete', entry.value) notify_event('delete', entry.value)
else else
@bag.push(entry) @bag.push(entry)
start_keeper if entry.expires
@read_waiter.find_all_template(entry).each do |template| @read_waiter.find_all_template(entry).each do |template|
template.read(tuple) template.read(tuple)
end end
@ -439,7 +485,6 @@ module Rinda
def move(port, tuple, sec=nil) def move(port, tuple, sec=nil)
template = WaitTemplateEntry.new(self, tuple, sec) template = WaitTemplateEntry.new(self, tuple, sec)
yield(template) if block_given? yield(template) if block_given?
start_keeper
synchronize do synchronize do
entry = @bag.find(template) entry = @bag.find(template)
if entry if entry
@ -452,6 +497,7 @@ module Rinda
begin begin
@take_waiter.push(template) @take_waiter.push(template)
start_keeper if template.expires
while true while true
raise RequestCanceledError if template.canceled? raise RequestCanceledError if template.canceled?
raise RequestExpiredError if template.expired? raise RequestExpiredError if template.expired?
@ -476,7 +522,6 @@ module Rinda
def read(tuple, sec=nil) def read(tuple, sec=nil)
template = WaitTemplateEntry.new(self, tuple, sec) template = WaitTemplateEntry.new(self, tuple, sec)
yield(template) if block_given? yield(template) if block_given?
start_keeper
synchronize do synchronize do
entry = @bag.find(template) entry = @bag.find(template)
return entry.value if entry return entry.value if entry
@ -484,6 +529,7 @@ module Rinda
begin begin
@read_waiter.push(template) @read_waiter.push(template)
start_keeper if template.expires
template.wait template.wait
raise RequestCanceledError if template.canceled? raise RequestCanceledError if template.canceled?
raise RequestExpiredError if template.expired? raise RequestExpiredError if template.expired?
@ -529,6 +575,10 @@ module Rinda
private private
def create_entry(tuple, sec)
TupleEntry.new(tuple, sec)
end
## ##
# Removes dead tuples. # Removes dead tuples.
@ -566,9 +616,12 @@ module Rinda
def start_keeper def start_keeper
return if @keeper && @keeper.alive? return if @keeper && @keeper.alive?
@keeper = Thread.new do @keeper = Thread.new do
while need_keeper? while true
keep_clean
sleep(@period) sleep(@period)
synchronize do
break unless need_keeper?
keep_clean
end
end end
end end
end end

View file

@ -12,14 +12,14 @@ class MockClock
include Singleton include Singleton
class MyTS < Rinda::TupleSpace class MyTS < Rinda::TupleSpace
def keeper def keeper_thread
nil nil
end end
end end
def initialize def initialize
@now = 2 @now = 2
@reso = 0.1 @reso = 1
@ts = MyTS.new @ts = MyTS.new
@ts.write([2, :now]) @ts.write([2, :now])
@inf = 2**31 - 1 @inf = 2**31 - 1
@ -33,17 +33,18 @@ class MockClock
n n
end end
def _forward(n=@reso) def _forward(n=nil)
now ,= @ts.take([nil, :now]) now ,= @ts.take([nil, :now])
@now = now + n @now = now + n
n = @reso if n.nil? n = @reso if n.nil?
@ts.write([@now, :now]) @ts.write([@now, :now])
end end
def forward(n=@reso) def forward(n)
while n > 0 while n > 0
_forward(@reso) _forward(@reso)
n -= @reso n -= @reso
Thread.pass
end end
end end
@ -55,21 +56,11 @@ class MockClock
@ts.write([2, :now]) @ts.write([2, :now])
end end
def sleep(n=@reso) def sleep(n=nil)
while will_deadlock?
n -= @reso
forward
return 0 if n <= 0
end
now ,= @ts.read([nil, :now]) now ,= @ts.read([nil, :now])
@ts.read([(now + n)..@inf, :now]) @ts.read([(now + n)..@inf, :now])
0 0
end end
def will_deadlock?
sz = Thread.current.group.list.find_all {|x| x.status != 'sleep'}.size
sz <= 1
end
end end
module Time module Time
@ -116,6 +107,14 @@ module TupleSpaceTestModule
end end
end end
def thread_join(th)
while th.alive?
Kernel.sleep(0.1)
sleep(1)
end
th.value
end
def test_00_tuple def test_00_tuple
tuple = Rinda::TupleEntry.new([1,2,3]) tuple = Rinda::TupleEntry.new([1,2,3])
assert(!tuple.canceled?) assert(!tuple.canceled?)
@ -240,6 +239,28 @@ module TupleSpaceTestModule
end end
end end
def test_ruby_talk_264062
th = Thread.new { @ts.take([:empty], 1) }
sleep(10)
assert_raises(Rinda::RequestExpiredError) do
thread_join(th)
end
th = Thread.new { @ts.read([:empty], 1) }
sleep(10)
assert_raises(Rinda::RequestExpiredError) do
thread_join(th)
end
end
def test_symbol_tuple
@ts.write([:symbol, :symbol])
@ts.write(['string', :string])
assert_equal([[:symbol, :symbol]], @ts.read_all([:symbol, nil]))
assert_equal([[:symbol, :symbol]], @ts.read_all([Symbol, nil]))
assert_equal([], @ts.read_all([:nil, nil]))
end
def test_core_01 def test_core_01
5.times do |n| 5.times do |n|
@ts.write([:req, 2]) @ts.write([:req, 2])
@ -252,7 +273,7 @@ module TupleSpaceTestModule
s = 0 s = 0
while true while true
begin begin
tuple = @ts.take([:req, Integer], 0.5) tuple = @ts.take([:req, Integer], 1)
assert_equal(2, tuple[1]) assert_equal(2, tuple[1])
s += tuple[1] s += tuple[1]
rescue Rinda::RequestExpiredError rescue Rinda::RequestExpiredError
@ -263,10 +284,9 @@ module TupleSpaceTestModule
s s
end end
sleep(20) assert_equal(10, thread_join(taker))
tuple = @ts.take([:ans, nil]) tuple = @ts.take([:ans, nil])
assert_equal(10, tuple[1]) assert_equal(10, tuple[1])
assert_equal(10, taker.value)
end end
def test_core_02 def test_core_02
@ -274,7 +294,7 @@ module TupleSpaceTestModule
s = 0 s = 0
while true while true
begin begin
tuple = @ts.take([:req, Integer], 1.0) tuple = @ts.take([:req, Integer], 1)
assert_equal(2, tuple[1]) assert_equal(2, tuple[1])
s += tuple[1] s += tuple[1]
rescue Rinda::RequestExpiredError rescue Rinda::RequestExpiredError
@ -289,10 +309,9 @@ module TupleSpaceTestModule
@ts.write([:req, 2]) @ts.write([:req, 2])
end end
sleep(20) assert_equal(10, thread_join(taker))
tuple = @ts.take([:ans, nil]) tuple = @ts.take([:ans, nil])
assert_equal(10, tuple[1]) assert_equal(10, tuple[1])
assert_equal(10, taker.value)
assert_equal([], @ts.read_all([nil, nil])) assert_equal([], @ts.read_all([nil, nil]))
end end
@ -349,7 +368,7 @@ module TupleSpaceTestModule
s = 0 s = 0
while true while true
begin begin
tuple = @ts.take([:req, Integer], 1.0) tuple = @ts.take([:req, Integer], 1)
s += tuple[1] s += tuple[1]
rescue Rinda::RequestExpiredError rescue Rinda::RequestExpiredError
break break
@ -359,26 +378,23 @@ module TupleSpaceTestModule
s s
end end
writer = Thread.new do
5.times do |n| 5.times do |n|
@ts.write([:req, 2]) @ts.write([:req, 2])
sleep 0.1
end
end end
@ts.take({"message"=>"first", "name"=>"3"}) @ts.take({"message"=>"first", "name"=>"3"})
sleep(4) sleep(4)
assert_equal(10, thread_join(taker))
tuple = @ts.take([:ans, nil]) tuple = @ts.take([:ans, nil])
assert_equal(10, tuple[1]) assert_equal(10, tuple[1])
assert_equal(10, taker.value)
assert_equal([], @ts.read_all([nil, nil])) assert_equal([], @ts.read_all([nil, nil]))
notify1.cancel notify1.cancel
sleep(3) # notify2 expired sleep(3) # notify2 expired
assert_equal([0, 11], listener1.value) assert_equal([0, 11], thread_join(listener1))
assert_equal([0, 3], listener2.value) assert_equal([0, 3], thread_join(listener2))
ary = [] ary = []
ary.push(["write", {"message"=>"first", "name"=>"3"}]) ary.push(["write", {"message"=>"first", "name"=>"3"}])
@ -403,23 +419,24 @@ module TupleSpaceTestModule
template = nil template = nil
taker = Thread.new do taker = Thread.new do
@ts.take([:take, nil], 10) do |template| @ts.take([:take, nil], 10) do |t|
template = t
Thread.new do Thread.new do
sleep 0.2
template.cancel template.cancel
end end
end end
end end
sleep(1) sleep(2)
assert_raises(Rinda::RequestCanceledError) do
assert_nil(thread_join(taker))
end
assert(template.canceled?) assert(template.canceled?)
@ts.write([:take, 1]) @ts.write([:take, 1])
assert_raises(Rinda::RequestCanceledError) do
assert_nil(taker.value)
end
assert_equal([[:take, 1]], @ts.read_all([nil, nil])) assert_equal([[:take, 1]], @ts.read_all([nil, nil]))
end end
@ -431,23 +448,24 @@ module TupleSpaceTestModule
template = nil template = nil
reader = Thread.new do reader = Thread.new do
@ts.read([:take, nil], 10) do |template| @ts.read([:take, nil], 10) do |t|
template = t
Thread.new do Thread.new do
sleep 0.2
template.cancel template.cancel
end end
end end
end end
sleep(1) sleep(2)
assert_raises(Rinda::RequestCanceledError) do
assert_nil(thread_join(reader))
end
assert(template.canceled?) assert(template.canceled?)
@ts.write([:take, 1]) @ts.write([:take, 1])
assert_raises(Rinda::RequestCanceledError) do
assert_nil(reader.value)
end
assert_equal([[:take, 1]], @ts.read_all([nil, nil])) assert_equal([[:take, 1]], @ts.read_all([nil, nil]))
end end
@ -478,29 +496,21 @@ module TupleSpaceTestModule
assert(tuple.expired?) assert(tuple.expired?)
assert(!tuple.alive?) assert(!tuple.alive?)
tuple = Rinda::TupleEntry.new([1,2,3], SimpleRenewer.new(1,2)) @renewer = SimpleRenewer.new(1,2)
tuple = Rinda::TupleEntry.new([1,2,3], @renewer)
assert(!tuple.canceled?) assert(!tuple.canceled?)
assert(!tuple.expired?) assert(!tuple.expired?)
assert(tuple.alive?) assert(tuple.alive?)
sleep(1.5) sleep(1)
assert(!tuple.canceled?) assert(!tuple.canceled?)
assert(!tuple.expired?) assert(!tuple.expired?)
assert(tuple.alive?) assert(tuple.alive?)
sleep(1.5) sleep(2)
assert(tuple.expired?) assert(tuple.expired?)
assert(!tuple.alive?) assert(!tuple.alive?)
end end
end end
class TupleSpaceTest < Test::Unit::TestCase
def test_message
flunk("YARV doesn't support Rinda")
end
end
end
__END__
class TupleSpaceTest < Test::Unit::TestCase class TupleSpaceTest < Test::Unit::TestCase
include TupleSpaceTestModule include TupleSpaceTestModule
@ -520,13 +530,9 @@ class TupleSpaceProxyTest < Test::Unit::TestCase
end end
def test_remote_array_and_hash def test_remote_array_and_hash
ary = [1, 2, 3] @ts.write(DRbObject.new([1, 2, 3]))
@ts.write(DRbObject.new(ary))
GC.start
assert_equal([1, 2, 3], @ts.take([1, 2, 3], 0)) assert_equal([1, 2, 3], @ts.take([1, 2, 3], 0))
hash = {'head' => 1, 'tail' => 2} @ts.write(DRbObject.new({'head' => 1, 'tail' => 2}))
@ts.write(DRbObject.new(hash))
GC.start
assert_equal({'head' => 1, 'tail' => 2}, assert_equal({'head' => 1, 'tail' => 2},
@ts.take({'head' => 1, 'tail' => 2}, 0)) @ts.take({'head' => 1, 'tail' => 2}, 0))
end end