mirror of
https://github.com/ruby/ruby.git
synced 2022-11-09 12:17:21 -05:00
Commit miss
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@39891 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
parent
ebca3dd3c3
commit
90d9413391
2 changed files with 87 additions and 3 deletions
|
@ -206,6 +206,50 @@ module Rinda
|
||||||
# TupleSpaceProxy allows a remote Tuplespace to appear as local.
|
# TupleSpaceProxy allows a remote Tuplespace to appear as local.
|
||||||
|
|
||||||
class TupleSpaceProxy
|
class TupleSpaceProxy
|
||||||
|
##
|
||||||
|
# A Port ensures that a moved tuple arrives properly at its destination
|
||||||
|
# and does not get lost.
|
||||||
|
#
|
||||||
|
# See https://bugs.ruby-lang.org/issues/8125
|
||||||
|
|
||||||
|
class Port # :nodoc:
|
||||||
|
attr_reader :value
|
||||||
|
|
||||||
|
def self.deliver
|
||||||
|
port = new
|
||||||
|
|
||||||
|
begin
|
||||||
|
yield(port)
|
||||||
|
ensure
|
||||||
|
port.close
|
||||||
|
end
|
||||||
|
|
||||||
|
port.value
|
||||||
|
end
|
||||||
|
|
||||||
|
def initialize
|
||||||
|
@open = true
|
||||||
|
@value = nil
|
||||||
|
end
|
||||||
|
|
||||||
|
##
|
||||||
|
# Don't let the DRb thread push to it when remote sends tuple
|
||||||
|
|
||||||
|
def close
|
||||||
|
@open = false
|
||||||
|
end
|
||||||
|
|
||||||
|
##
|
||||||
|
# Stores +value+ and ensure it does not get marshaled multiple times.
|
||||||
|
|
||||||
|
def push value
|
||||||
|
raise 'port closed' unless @open
|
||||||
|
|
||||||
|
@value = value
|
||||||
|
|
||||||
|
nil # avoid Marshal
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
##
|
##
|
||||||
# Creates a new TupleSpaceProxy to wrap +ts+.
|
# Creates a new TupleSpaceProxy to wrap +ts+.
|
||||||
|
@ -225,9 +269,9 @@ module Rinda
|
||||||
# Takes +tuple+ from the proxied TupleSpace. See TupleSpace#take.
|
# Takes +tuple+ from the proxied TupleSpace. See TupleSpace#take.
|
||||||
|
|
||||||
def take(tuple, sec=nil, &block)
|
def take(tuple, sec=nil, &block)
|
||||||
port = []
|
Port.deliver do |port|
|
||||||
@ts.move(DRbObject.new(port), tuple, sec, &block)
|
@ts.move(DRbObject.new(port), tuple, sec, &block)
|
||||||
port[0]
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
##
|
##
|
||||||
|
|
|
@ -477,6 +477,46 @@ class TupleSpaceProxyTest < Test::Unit::TestCase
|
||||||
@ts.take({'head' => 1, 'tail' => 2}, 0))
|
@ts.take({'head' => 1, 'tail' => 2}, 0))
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def test_take_bug_8215
|
||||||
|
DRb.stop_service
|
||||||
|
service = DRb.start_service(nil, @ts_base)
|
||||||
|
|
||||||
|
uri = service.uri
|
||||||
|
|
||||||
|
take = fork do
|
||||||
|
DRb.stop_service
|
||||||
|
DRb.start_service
|
||||||
|
ro = DRbObject.new_with_uri(uri)
|
||||||
|
ts = Rinda::TupleSpaceProxy.new(ro)
|
||||||
|
th = Thread.new do
|
||||||
|
ts.take([:test_take, nil])
|
||||||
|
end
|
||||||
|
Kernel.sleep(0.1)
|
||||||
|
th.raise(Interrupt) # causes loss of the taken tuple
|
||||||
|
ts.write([:barrier, :continue])
|
||||||
|
Kernel.sleep
|
||||||
|
end
|
||||||
|
|
||||||
|
@ts_base.take([:barrier, :continue])
|
||||||
|
|
||||||
|
write = fork do
|
||||||
|
DRb.stop_service
|
||||||
|
DRb.start_service
|
||||||
|
ro = DRbObject.new_with_uri(uri)
|
||||||
|
ts = Rinda::TupleSpaceProxy.new(ro)
|
||||||
|
ts.write([:test_take, 42])
|
||||||
|
end
|
||||||
|
|
||||||
|
status = Process.wait(write)
|
||||||
|
|
||||||
|
assert_equal([[:test_take, 42]], @ts_base.read_all([:test_take, nil]),
|
||||||
|
'[bug:8215] tuple lost')
|
||||||
|
ensure
|
||||||
|
Process.kill("TERM", write) if write && status.nil?
|
||||||
|
Process.kill("TERM", take) if take
|
||||||
|
service.stop_service
|
||||||
|
end
|
||||||
|
|
||||||
@server = DRb.primary_server || DRb.start_service
|
@server = DRb.primary_server || DRb.start_service
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue