1
0
Fork 0
mirror of https://github.com/ruby/ruby.git synced 2022-11-09 12:17:21 -05:00

* ext/socket/lib/socket.rb (Socket.udp_server_sockets): new method.

(Socket.udp_server_loop_on): new method.
  (Socket.udp_server_loop): new method
  (Socket.ip_sockets_port0): extracted from tcp_server_sockets_port0.
  (Socket::UDPSource): new class.


git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@22212 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
akr 2009-02-10 12:38:16 +00:00
parent a038fab649
commit 1463f1dfe7
3 changed files with 219 additions and 10 deletions

View file

@ -1,3 +1,11 @@
Tue Feb 10 21:26:33 2009 Tanaka Akira <akr@fsij.org>
* ext/socket/lib/socket.rb (Socket.udp_server_sockets): new method.
(Socket.udp_server_loop_on): new method.
(Socket.udp_server_loop): new method
(Socket.ip_sockets_port0): extracted from tcp_server_sockets_port0.
(Socket::UDPSource): new class.
Tue Feb 10 21:14:43 2009 Tanaka Akira <akr@fsij.org>
* ext/socket/socket.c (sockaddr_obj): fill pfamily.

View file

@ -226,8 +226,7 @@ class Socket
end
end
def self.tcp_server_sockets_port0(host)
ai_list = Addrinfo.getaddrinfo(host, 0, nil, :STREAM, nil, Socket::AI_PASSIVE)
def self.ip_sockets_port0(ai_list, reuseaddr)
begin
sockets = []
port = nil
@ -239,14 +238,15 @@ class Socket
end
sockets << s
s.ipv6only! if ai.ipv6?
s.setsockopt(:SOCKET, :REUSEADDR, 1)
if reuseaddr
s.setsockopt(:SOCKET, :REUSEADDR, 1)
end
if !port
s.bind(ai)
port = s.local_address.ip_port
else
s.bind(Addrinfo.tcp(ai.ip_address, port))
s.bind(ai.family_addrinfo(ai.ip_address, port))
end
s.listen(5)
}
rescue Errno::EADDRINUSE
sockets.each {|s|
@ -256,11 +256,21 @@ class Socket
end
sockets
ensure
if $!
sockets.each {|s|
s.close if !s.closed?
}
end
sockets.each {|s| s.close if !s.closed? } if $!
end
class << self
private :ip_sockets_port0
end
def self.tcp_server_sockets_port0(host)
ai_list = Addrinfo.getaddrinfo(host, 0, nil, :STREAM, nil, Socket::AI_PASSIVE)
sockets = ip_sockets_port0(ai_list, true)
sockets.each {|s|
s.listen(5)
}
sockets
ensure
sockets.each {|s| s.close if !s.closed? } if $!
end
class << self
private :tcp_server_sockets_port0
@ -395,6 +405,163 @@ class Socket
end
end
# :call-seq:
# Socket.udp_server_sockets([host, ] port)
#
# Creates UDP sockets for a UDP server.
# It returns an array of sockets.
#
# If _port_ is zero, some port is choosen.
# But the choosen port is used for the all sockets.
#
# # UDP echo server
# sockets = Socket.udp_server_sockets(0)
# p sockets.first.local_address.ip_port #=> 32963
# Socket.udp_server_loop_on(sockets) {|msg, msg_src|
# msg_src.reply msg
# }
#
def self.udp_server_sockets(host=nil, port)
last_error = nil
sockets = []
addr_hash = {}
ipv6_recvpktinfo = nil
if defined? Socket::AncillaryData
if defined? Socket::IPV6_RECVPKTINFO # RFC 3542
ipv6_recvpktinfo = Socket::IPV6_RECVPKTINFO
elsif defined? Socket::IPV6_PKTINFO # RFC 2292
ipv6_recvpktinfo = Socket::IPV6_PKTINFO
end
end
local_addrs = Socket.ip_address_list
ip_list = []
Addrinfo.foreach(host, port, nil, :DGRAM, nil, Socket::AI_PASSIVE) {|ai|
if ai.ipv4? && ai.ip_address == "0.0.0.0"
local_addrs.each {|a|
next if !a.ipv4?
ip_list << Addrinfo.new(a.to_sockaddr, :INET, :DGRAM, 0);
}
elsif ai.ipv6? && ai.ip_address == "::" && !ipv6_recvpktinfo
local_addrs.each {|a|
next if !a.ipv6?
ip_list << Addrinfo.new(a.to_sockaddr, :INET6, :DGRAM, 0);
}
else
ip_list << ai
end
}
if port == 0
sockets = ip_sockets_port0(ip_list, false)
else
ip_list.each {|ip|
ai = Addrinfo.udp(ip.ip_address, port)
begin
s = ai.bind
rescue SystemCallError
last_error = $!
next
end
sockets << s
}
if sockets.empty?
raise last_error
end
end
pktinfo_sockets = {}
sockets.each {|s|
ai = s.local_address
if ipv6_recvpktinfo && ai.ipv6? && ai.ip_address == "::"
s.setsockopt(:IPV6, ipv6_recvpktinfo, 1)
pktinfo_sockets[s] = true
end
}
sockets
end
# :call-seq:
# Socket.udp_server_loop_on(sockets) {|msg, msg_src| ... }
#
# Run UDP server loop on the given sockets.
#
# The return value of Socket.udp_server_sockets is appropriate for the argument.
#
# It calls the block for each message received.
#
def self.udp_server_loop_on(sockets) # :yield: msg, msg_src
loop {
readable, _, _ = IO.select(sockets)
readable.each {|r|
begin
msg, sender_addrinfo, rflags, *controls = r.recvmsg_nonblock
rescue Errno::EWOULDBLOCK
next
end
ai = r.local_address
if ai.ipv6? and pktinfo = controls.find {|c| c.cmsg_is?(:IPV6, :PKTINFO) }
ai = Addrinfo.udp(pktinfo.ipv6_pktinfo_addr.ip_address, ai.ip_port)
yield msg, UDPSource.new(sender_addrinfo, ai) {|reply_msg|
r.sendmsg reply_msg, 0, sender_addrinfo, pktinfo
}
else
yield msg, UDPSource.new(sender_addrinfo, ai) {|reply_msg|
r.send reply_msg, 0, sender_addrinfo
}
end
}
}
end
# :call-seq:
# Socket.udp_server_loop(port) {|msg, msg_src| ... }
# Socket.udp_server_loop(host, port) {|msg, msg_src| ... }
#
# creates a UDP server on _port_ and calls the block for each message arrived.
# The block is called with the message and its source information.
#
# This method allocates sockets internally using _port_.
# If _host_ is specified, it is used conjunction with _port_ to determine the server addresses.
#
# The _msg_ is a string.
#
# The _msg_src_ is a Socket::UDPSource object.
# It is used for reply.
#
# # UDP echo server.
# Socket.udp_server_loop(9261) {|msg, msg_src|
# msg_src.reply msg
# }
#
def self.udp_server_loop(host=nil, port, &b) # :yield: message, message_source
sockets = udp_server_sockets(host, port)
udp_server_loop_on(sockets, &b)
ensure
sockets.each {|s| s.close if !s.closed? } if sockets
end
# UDP address information used by Socket.udp_server_loop.
class UDPSource
def initialize(remote_address, local_address, &reply_proc)
@remote_address = remote_address
@local_address = local_address
@reply_proc = reply_proc
end
attr_reader :remote_address, :local_address
def inspect
"\#<#{self.class}: #{@sender.inspect_sockaddr} to #{@receiver.inspect_sockaddr}>"
end
def reply(msg)
@reply_proc.call msg
end
end
# creates a new socket connected to path using UNIX socket socket.
#
# If a block is given, the block is called with the socket.

View file

@ -225,4 +225,38 @@ class TestSocket < Test::Unit::TestCase
end
def test_udp_server
begin
ip_addrs = Socket.ip_address_list
rescue NotImplementedError
end
sockets = Socket.udp_server_sockets(0)
port = sockets.first.local_address.ip_port
th = Thread.new {
Socket.udp_server_loop_on(sockets) {|msg, msg_src|
break if msg == "exit"
rmsg = Marshal.dump([msg, msg_src.remote_address, msg_src.local_address])
msg_src.reply rmsg
}
}
ip_addrs.each {|ai|
Addrinfo.udp(ai.ip_address, port).connect {|s|
msg1 = "<<<#{ai.inspect}>>>"
s.sendmsg msg1
msg2, addr = s.recvmsg
msg2, remote_address, local_address = Marshal.load(msg2)
assert_equal(msg1, msg2)
assert_equal(ai.ip_address, addr.ip_address)
}
}
ensure
if th
Addrinfo.udp("127.0.0.1", port).connect {|s| s.sendmsg "exit" }
th.join
end
end
end if defined?(Socket)