1
0
Fork 0
mirror of https://github.com/ruby/ruby.git synced 2022-11-09 12:17:21 -05:00
ruby--ruby/lib/rinda/ring.rb

165 lines
3 KiB
Ruby
Raw Normal View History

#
# Note: Rinda::Ring API is unstable.
#
require 'drb/drb'
require 'rinda/rinda'
require 'thread'
module Rinda
Ring_PORT = 7647
#
# RingServer exposes a tuple space to clients that discover it over UDP.
#
# On construction it binds a UDP socket and starts two background threads:
# one that turns every received datagram into a write on the tuple space,
# and one that answers +[:lookup_ring, callback]+ tuples by invoking the
# callback with the tuple space.
#
class RingServer
  include DRbUndumped

  # ts::   the tuple space to serve; it must respond to +write+ and +take+.
  # port:: UDP port to bind on all interfaces (defaults to Ring_PORT).
  def initialize(ts, port=Ring_PORT)
    @ts = ts
    @soc = UDPSocket.open
    @soc.bind('', port)
    @w_service = write_service
    @r_service = reply_service
  end

  # Starts and returns the thread that reads datagrams (at most 1024 bytes
  # each) from the socket and hands each one to #do_write.
  def write_service
    Thread.new do
      loop { do_write(@soc.recv(1024)) }
    end
  end

  # Decodes +msg+ as a marshalled +[tuple, sec]+ pair and writes it to the
  # tuple space from a new thread, which is returned. Any StandardError
  # raised while decoding or writing is swallowed.
  #
  # NOTE(review): +msg+ comes straight from the UDP socket and is passed to
  # Marshal.load, which is unsafe on untrusted input. Confirm this service
  # is only reachable from trusted networks.
  def do_write(msg)
    Thread.new do
      begin
        request = Marshal.load(msg)
        tuple, sec = request
        @ts.write(tuple, sec)
      rescue
      end
    end
  end

  # Starts and returns the thread that calls #do_reply forever.
  def reply_service
    Thread.new do
      loop { do_reply }
    end
  end

  # Takes one +[:lookup_ring, DRbObject]+ tuple from the tuple space and, in
  # a new thread, calls its second element with the tuple space. Returns
  # that thread, or nil when taking the tuple raised a StandardError.
  # Errors raised by the callback itself are ignored.
  def do_reply
    request = @ts.take([:lookup_ring, DRbObject])
    Thread.new do
      begin
        request[1].call(@ts)
      rescue
        nil
      end
    end
  rescue
    nil
  end
end
#
# RingFinger looks for ring servers by sending a marshalled
# +[:lookup_ring, callback]+ request over UDP to every address in its
# broadcast list. The first tuple space that answers becomes #primary; any
# further answers are collected in #to_a.
#
class RingFinger
  @@finger = nil

  # Returns the shared RingFinger, creating it and running #lookup_ring_any
  # the first time it is requested.
  def self.finger
    unless @@finger
      @@finger = self.new
      @@finger.lookup_ring_any
    end
    @@finger
  end

  # Primary tuple space of the shared finger.
  def self.primary
    finger.primary
  end

  # Additional tuple spaces found by the shared finger.
  def self.to_a
    finger.to_a
  end

  @@broadcast_list = ['<broadcast>', 'localhost']

  # broadcast_list:: addresses the lookup request is sent to; +nil+ means
  #                  <tt>['localhost']</tt>.
  # port::           UDP port the request is sent to (defaults to Ring_PORT).
  def initialize(broadcast_list=@@broadcast_list, port=Ring_PORT)
    @broadcast_list = broadcast_list || ['localhost']
    @port = port
    @primary = nil
    @rings = []
  end

  attr_accessor :broadcast_list, :port, :primary

  # Tuple spaces that answered after the primary one.
  def to_a
    @rings
  end

  # Yields the primary tuple space followed by every other one found,
  # running #lookup_ring_any first when no primary is known yet.
  def each
    lookup_ring_any unless @primary
    return unless @primary
    yield(@primary)
    @rings.each { |x| yield(x) }
  end

  # Sends a lookup request carrying +block+ (wrapped in a DRbObject) to
  # every address in the broadcast list, then sleeps for +timeout+ seconds
  # (presumably to give servers time to call the block back). Send errors
  # for individual addresses are ignored. Without a block this delegates to
  # #lookup_ring_any.
  def lookup_ring(timeout=5, &block)
    return lookup_ring_any(timeout) unless block_given?

    msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout])
    @broadcast_list.each do |addr|
      soc = UDPSocket.open
      begin
        soc.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true)
        soc.send(msg, 0, addr, @port)
      rescue
        nil
      ensure
        soc.close
      end
    end
    sleep(timeout)
  end

  # Runs a lookup and returns the first tuple space that answers, storing it
  # as #primary. Later answers are appended to #to_a in the background.
  # Raises RuntimeError ('RingNotFound') when nothing answered.
  def lookup_ring_any(timeout=5)
    queue = Queue.new

    Thread.new do
      begin
        self.lookup_ring(timeout) { |ts| queue.push(ts) }
      ensure
        # Always deliver the end-of-lookup sentinel, even when the lookup
        # itself failed, so the caller below can never block forever.
        queue.push(nil)
      end
    end

    @primary = queue.pop
    raise('RingNotFound') if @primary.nil?

    # Drain the remaining answers only after the primary has been taken, so
    # this consumer can never steal the sentinel the caller is waiting for.
    Thread.new do
      while (ts = queue.pop)
        @rings.push(ts)
      end
    end

    @primary
  end
end
#
# RingProvider advertises a service by writing a
# +[:name, klass, front, desc]+ tuple to the primary tuple space found by
# Rinda::RingFinger.
#
class RingProvider
  # klass, front, desc:: stored verbatim in the advertised tuple.
  # renewer::            passed as the second argument to +write+; a new
  #                      Rinda::SimpleRenewer is used when none is given.
  def initialize(klass, front, desc, renewer = nil)
    @renewer = renewer ? renewer : Rinda::SimpleRenewer.new
    @tuple = [:name, klass, front, desc]
  end

  # Writes the tuple to Rinda::RingFinger.primary and returns whatever that
  # +write+ call returns.
  def provide
    Rinda::RingFinger.primary.write(@tuple, @renewer)
  end
end
end
# Command-line demo, run only when this file is executed directly.
# The first argument selects the mode:
#   s - serve a new TupleSpace through a RingServer until a line is read
#       from standard input
#   w - look up ring servers and write [:hello, :world] to each one that
#       answers
#   r - look up ring servers and take any two-element tuple from each one
#       that answers
if $PROGRAM_NAME == __FILE__
  DRb.start_service
  mode = ARGV.shift

  if mode == 's'
    require 'rinda/tuplespace'
    space = Rinda::TupleSpace.new
    server = Rinda::RingServer.new(space)
    # Block here so the server threads keep running until the user hits Enter.
    $stdin.gets
  elsif mode == 'w'
    writer = Rinda::RingFinger.new(nil)
    writer.lookup_ring do |ts|
      p ts
      ts.write([:hello, :world])
    end
  elsif mode == 'r'
    reader = Rinda::RingFinger.new(nil)
    reader.lookup_ring do |ts|
      p ts
      p ts.take([nil, nil])
    end
  end
end