mirror of
https://github.com/ruby/ruby.git
synced 2022-11-09 12:17:21 -05:00
* lib/rinda/ring.rb: Add multicast support to Rinda::RingFinger and
Rinda::RingServer. [ruby-trunk - Bug #8073] * test/rinda/test_rinda.rb: Test for the above. * NEWS: Update with Rinda multicast support git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@39895 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
parent
362208c041
commit
646bcaa717
4 changed files with 758 additions and 519 deletions
|
@ -1,3 +1,11 @@
|
|||
Sun Mar 24 05:03:36 2013 Eric Hodel <drbrain@segment7.net>
|
||||
|
||||
* lib/rinda/ring.rb: Add multicast support to Rinda::RingFinger and
|
||||
Rinda::RingServer. [ruby-trunk - Bug #8073]
|
||||
* test/rinda/test_rinda.rb: Test for the above.
|
||||
|
||||
* NEWS: Update with Rinda multicast support
|
||||
|
||||
Sun Mar 24 04:13:27 2013 Eric Hodel <drbrain@segment7.net>
|
||||
|
||||
* test/rinda/test_rinda.rb: Fixed test failures in r39890 and r39890
|
||||
|
|
2
NEWS
2
NEWS
|
@ -32,6 +32,8 @@ with all sufficient information, see the ChangeLog file.
|
|||
|
||||
* Net::SMTP
|
||||
* Added Net::SMTP#rset to implement the RSET comamnd
|
||||
* Rinda::RingServer, Rinda::RingFinger
|
||||
* Rinda now supports multicast sockets
|
||||
|
||||
=== Stdlib compatibility issues (excluding feature bug fixes)
|
||||
=== C API updates
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
require 'drb/drb'
|
||||
require 'rinda/rinda'
|
||||
require 'thread'
|
||||
require 'ipaddr'
|
||||
|
||||
module Rinda
|
||||
|
||||
|
@ -27,25 +28,90 @@ module Rinda
|
|||
include DRbUndumped
|
||||
|
||||
##
|
||||
# Advertises +ts+ on the UDP broadcast address at +port+.
|
||||
# Special renewer for the RingServer to allow shutdown
|
||||
|
||||
def initialize(ts, port=Ring_PORT)
|
||||
@ts = ts
|
||||
@soc = UDPSocket.open
|
||||
@soc.bind('', port)
|
||||
@w_service = write_service
|
||||
@r_service = reply_service
|
||||
class Renewer # :nodoc:
|
||||
include DRbUndumped
|
||||
|
||||
##
|
||||
# Set to false to shutdown future requests using this Renewer
|
||||
|
||||
attr_accessor :renew
|
||||
|
||||
def initialize # :nodoc:
|
||||
@renew = true
|
||||
end
|
||||
|
||||
def renew # :nodoc:
|
||||
@renew ? 1 : true
|
||||
end
|
||||
end
|
||||
|
||||
##
|
||||
# Creates a thread that picks up UDP packets and passes them to do_write
|
||||
# for decoding.
|
||||
# Advertises +ts+ on the UDP broadcast address at +port+.
|
||||
|
||||
def write_service
|
||||
Thread.new do
|
||||
loop do
|
||||
msg = @soc.recv(1024)
|
||||
do_write(msg)
|
||||
def initialize(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT)
|
||||
@port = port
|
||||
|
||||
if Integer === addresses then
|
||||
addresses, @port = [Socket::INADDR_ANY], addresses
|
||||
end
|
||||
|
||||
@renewer = Renewer.new
|
||||
|
||||
@ts = ts
|
||||
@sockets = addresses.map do |address|
|
||||
make_socket(address)
|
||||
end
|
||||
|
||||
@w_services = write_services
|
||||
@r_service = reply_service
|
||||
end
|
||||
|
||||
##
|
||||
# Creates a socket at +address+
|
||||
|
||||
def make_socket(address)
|
||||
addrinfo = Addrinfo.udp(address, @port)
|
||||
|
||||
socket = Socket.new(addrinfo.pfamily, addrinfo.socktype,
|
||||
addrinfo.protocol)
|
||||
|
||||
if addrinfo.ipv4_multicast? or addrinfo.ipv6_multicast? then
|
||||
if Socket.const_defined?(:SO_REUSEPORT) then
|
||||
socket.setsockopt(:SOCKET, :SO_REUSEPORT, true)
|
||||
else
|
||||
socket.setsockopt(:SOCKET, :SO_REUSEADDR, true)
|
||||
end
|
||||
|
||||
if addrinfo.ipv4_multicast? then
|
||||
mreq = IPAddr.new(addrinfo.ip_address).hton +
|
||||
IPAddr.new('0.0.0.0').hton
|
||||
|
||||
socket.setsockopt(:IPPROTO_IP, :IP_ADD_MEMBERSHIP, mreq)
|
||||
else
|
||||
mreq = IPAddr.new(addrinfo.ip_address).hton + [0].pack('I')
|
||||
|
||||
socket.setsockopt(:IPPROTO_IPV6, :IPV6_JOIN_GROUP, mreq)
|
||||
end
|
||||
end
|
||||
|
||||
socket.bind(addrinfo)
|
||||
|
||||
socket
|
||||
end
|
||||
|
||||
##
|
||||
# Creates threads that pick up UDP packets and passes them to do_write for
|
||||
# decoding.
|
||||
|
||||
def write_services
|
||||
@sockets.map do |s|
|
||||
Thread.new(s) do |socket|
|
||||
loop do
|
||||
msg = socket.recv(1024)
|
||||
do_write(msg)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -80,11 +146,28 @@ module Rinda
|
|||
# address of the local TupleSpace.
|
||||
|
||||
def do_reply
|
||||
tuple = @ts.take([:lookup_ring, DRbObject])
|
||||
tuple = @ts.take([:lookup_ring, DRbObject], @renewer)
|
||||
Thread.new { tuple[1].call(@ts) rescue nil}
|
||||
rescue
|
||||
end
|
||||
|
||||
##
|
||||
# Shuts down the RingServer
|
||||
|
||||
def shutdown
|
||||
@renewer.renew = false
|
||||
|
||||
@w_services.each do |thread|
|
||||
thread.kill
|
||||
end
|
||||
|
||||
@sockets.each do |socket|
|
||||
socket.close
|
||||
end
|
||||
|
||||
@r_service.kill
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
##
|
||||
|
@ -130,6 +213,18 @@ module Rinda
|
|||
|
||||
attr_accessor :broadcast_list
|
||||
|
||||
##
|
||||
# Maximum number of hops for sent multicast packets (if using a multicast
|
||||
# address in the broadcast list). The default is 1 (same as UDP
|
||||
# broadcast).
|
||||
|
||||
attr_accessor :multicast_hops
|
||||
|
||||
##
|
||||
# The interface index to send IPv6 multicast packets from.
|
||||
|
||||
attr_accessor :multicast_interface
|
||||
|
||||
##
|
||||
# The port that RingFinger will send query packets to.
|
||||
|
||||
|
@ -149,6 +244,9 @@ module Rinda
|
|||
@port = port
|
||||
@primary = nil
|
||||
@rings = []
|
||||
|
||||
@multicast_hops = 1
|
||||
@multicast_interface = 0
|
||||
end
|
||||
|
||||
##
|
||||
|
@ -178,15 +276,7 @@ module Rinda
|
|||
|
||||
msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout])
|
||||
@broadcast_list.each do |it|
|
||||
soc = UDPSocket.open
|
||||
begin
|
||||
soc.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true)
|
||||
soc.send(msg, 0, it, @port)
|
||||
rescue
|
||||
nil
|
||||
ensure
|
||||
soc.close
|
||||
end
|
||||
send_message(it, msg)
|
||||
end
|
||||
sleep(timeout)
|
||||
end
|
||||
|
@ -217,6 +307,44 @@ module Rinda
|
|||
@primary
|
||||
end
|
||||
|
||||
##
|
||||
# Creates a socket for +address+ with the appropriate multicast options
|
||||
# for multicast addresses.
|
||||
|
||||
def make_socket(address) # :nodoc:
|
||||
addrinfo = Addrinfo.udp(address, @port)
|
||||
|
||||
soc = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol)
|
||||
|
||||
if addrinfo.ipv4_multicast? then
|
||||
soc.setsockopt(:IPPROTO_IP, :IP_MULTICAST_LOOP, true)
|
||||
soc.setsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL,
|
||||
[@multicast_hops].pack('c'))
|
||||
elsif addrinfo.ipv6_multicast? then
|
||||
soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_LOOP, true)
|
||||
soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS,
|
||||
[@multicast_hops].pack('I'))
|
||||
soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_IF,
|
||||
[@multicast_interface].pack('I'))
|
||||
else
|
||||
soc.setsockopt(:SOL_SOCKET, :SO_BROADCAST, true)
|
||||
end
|
||||
|
||||
soc.connect(addrinfo)
|
||||
|
||||
soc
|
||||
end
|
||||
|
||||
def send_message(address, message) # :nodoc:
|
||||
soc = make_socket(address)
|
||||
|
||||
soc.send(message, 0)
|
||||
rescue
|
||||
nil
|
||||
ensure
|
||||
soc.close if soc
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
##
|
||||
|
|
File diff suppressed because it is too large
Load diff
Loading…
Reference in a new issue