From b9a91334c50e5a0b8ea0a4b571cd3011228cba6c Mon Sep 17 00:00:00 2001 From: normal Date: Mon, 15 Jun 2015 19:38:49 +0000 Subject: [PATCH] socket: allow exception-free nonblocking sendmsg/recvmsg As documented before, exceptions are expensive and IO::Wait*able are too common in socket applications to be the exceptional case. Datagram sockets deserve the same API which stream sockets are allowed with read_nonblock and write_nonblock. Note: this does not offer a performance advantage under optimal conditions when both ends are equally matched in speed, but it it does make debug output cleaner by avoiding exceptions whenever the receiver slows down. * ext/socket/ancdata.c (bsock_sendmsg_internal, bsock_recvmsg_internal): support "exception: false" kwarg * ext/socket/init.c (rsock_s_recvfrom_nonblock): ditto * ext/socket/init.c (rsock_s_recvfrom_nonblock): use rsock_opt_false_p * ext/socket/socket.c (sock_connect_nonblock): ditto * ext/socket/rubysocket.h (rsock_opt_false_p): new function * ext/socket/basicsocket.c (bsock_recv_nonblock): update rdoc * ext/socket/udpsocket.c (udp_recvfrom_nonblock): ditto * test/socket/test_nonblock.rb: new tests [ruby-core:69542] [Feature #11229] git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@50910 b2dd03c8-39d4-4d8f-98ff-823fe69b080e --- ChangeLog | 15 ++++++++++++ ext/socket/ancdata.c | 32 ++++++++++++++++++++++---- ext/socket/basicsocket.c | 8 +++++-- ext/socket/init.c | 11 ++++----- ext/socket/rubysocket.h | 8 +++++++ ext/socket/socket.c | 6 ++--- ext/socket/udpsocket.c | 8 +++++-- test/socket/test_nonblock.rb | 44 ++++++++++++++++++++++++++++++++++++ 8 files changed, 113 insertions(+), 19 deletions(-) diff --git a/ChangeLog b/ChangeLog index 512f3252ae..bf5e689235 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,18 @@ +Tue Jun 16 04:38:02 2015 Eric Wong + + * ext/socket/ancdata.c (bsock_sendmsg_internal, + bsock_recvmsg_internal): + support "exception: false" kwarg + * ext/socket/init.c (rsock_s_recvfrom_nonblock): + ditto + * ext/socket/init.c (rsock_s_recvfrom_nonblock): use rsock_opt_false_p + * ext/socket/socket.c (sock_connect_nonblock): ditto + * ext/socket/rubysocket.h (rsock_opt_false_p): new function + * ext/socket/basicsocket.c (bsock_recv_nonblock): update rdoc + * ext/socket/udpsocket.c (udp_recvfrom_nonblock): ditto + * test/socket/test_nonblock.rb: new tests + [ruby-core:69542] [Feature #11229] + Mon Jun 15 14:33:02 2015 Akinori MUSHA * lib/set.rb: Make Set#each and SortedSet#each generate a sized diff --git a/ext/socket/ancdata.c b/ext/socket/ancdata.c index 6e59694723..614c8f31de 100644 --- a/ext/socket/ancdata.c +++ b/ext/socket/ancdata.c @@ -3,6 +3,7 @@ #include int rsock_cmsg_cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */ +static VALUE sym_exception, sym_wait_readable, sym_wait_writable; #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) static VALUE rb_cAncillaryData; @@ -1133,6 +1134,7 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock) VALUE data, vflags, dest_sockaddr; struct msghdr mh; struct iovec iov; + VALUE opts = Qnil; VALUE controls = Qnil; int controls_num; #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) @@ -1152,7 +1154,8 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock) if (argc == 0) rb_raise(rb_eArgError, "mesg argument required"); - rb_scan_args(argc, argv, "12*", &data, &vflags, &dest_sockaddr, &controls); + rb_scan_args(argc, argv, "12*:", &data, &vflags, &dest_sockaddr, &controls, + &opts); StringValue(data); controls_num = RARRAY_LENINT(controls); @@ -1281,8 +1284,13 @@ bsock_sendmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock) rb_io_check_closed(fptr); goto retry; } - if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) - rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE, "sendmsg(2) would block"); + if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) { + if (rsock_opt_false_p(opts, sym_exception)) { + return sym_wait_writable; + } + rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE, + "sendmsg(2) would block"); + } rb_sys_fail("sendmsg(2)"); } #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) @@ -1336,7 +1344,7 @@ rsock_bsock_sendmsg(int argc, VALUE *argv, VALUE sock) #if defined(HAVE_SENDMSG) /* * call-seq: - * basicsocket.sendmsg_nonblock(mesg, flags=0, dest_sockaddr=nil, *controls) => numbytes_sent + * basicsocket.sendmsg_nonblock(mesg, flags=0, dest_sockaddr=nil, *controls, opts={}) => numbytes_sent * * sendmsg_nonblock sends a message using sendmsg(2) system call in non-blocking manner. * @@ -1344,6 +1352,9 @@ rsock_bsock_sendmsg(int argc, VALUE *argv, VALUE sock) * but the non-blocking flag is set before the system call * and it doesn't retry the system call. * + * By specifying `exception: false`, the _opts_ hash allows you to indicate + * that sendmsg_nonblock should not raise an IO::WaitWritable exception, but + * return the symbol :wait_writable instead. */ VALUE rsock_bsock_sendmsg_nonblock(int argc, VALUE *argv, VALUE sock) @@ -1602,8 +1613,12 @@ bsock_recvmsg_internal(int argc, VALUE *argv, VALUE sock, int nonblock) rb_io_check_closed(fptr); goto retry; } - if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) + if (nonblock && (errno == EWOULDBLOCK || errno == EAGAIN)) { + if (rsock_opt_false_p(vopts, sym_exception)) { + return sym_wait_readable; + } rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "recvmsg(2) would block"); + } #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) if (!gc_done && (errno == EMFILE || errno == EMSGSIZE)) { /* @@ -1788,6 +1803,9 @@ rsock_bsock_recvmsg(int argc, VALUE *argv, VALUE sock) * but non-blocking flag is set before the system call * and it doesn't retry the system call. * + * By specifying `exception: false`, the _opts_ hash allows you to indicate + * that recvmsg_nonblock should not raise an IO::WaitWritable exception, but + * return the symbol :wait_writable instead. */ VALUE rsock_bsock_recvmsg_nonblock(int argc, VALUE *argv, VALUE sock) @@ -1833,4 +1851,8 @@ rsock_init_ancdata(void) rb_define_method(rb_cAncillaryData, "ipv6_pktinfo_addr", ancillary_ipv6_pktinfo_addr, 0); rb_define_method(rb_cAncillaryData, "ipv6_pktinfo_ifindex", ancillary_ipv6_pktinfo_ifindex, 0); #endif +#undef rb_intern + sym_exception = ID2SYM(rb_intern("exception")); + sym_wait_readable = ID2SYM(rb_intern("wait_readable")); + sym_wait_writable = ID2SYM(rb_intern("wait_writable")); } diff --git a/ext/socket/basicsocket.c b/ext/socket/basicsocket.c index 54559779cf..4c793263ef 100644 --- a/ext/socket/basicsocket.c +++ b/ext/socket/basicsocket.c @@ -640,8 +640,7 @@ bsock_recv(int argc, VALUE *argv, VALUE sock) /* * call-seq: - * basicsocket.recv_nonblock(maxlen) => mesg - * basicsocket.recv_nonblock(maxlen, flags) => mesg + * basicsocket.recv_nonblock(maxlen [, flags [, options ]) => mesg * * Receives up to _maxlen_ bytes from +socket+ using recvfrom(2) after * O_NONBLOCK is set for the underlying file descriptor. @@ -655,6 +654,7 @@ bsock_recv(int argc, VALUE *argv, VALUE sock) * === Parameters * * +maxlen+ - the number of bytes to receive from the socket * * +flags+ - zero or more of the +MSG_+ options + * * +options+ - keyword hash, supporting `exception: false` * * === Example * serv = TCPServer.new("127.0.0.1", 0) @@ -679,6 +679,10 @@ bsock_recv(int argc, VALUE *argv, VALUE sock) * it is extended by IO::WaitReadable. * So IO::WaitReadable can be used to rescue the exceptions for retrying recv_nonblock. * + * By specifying `exception: false`, the options hash allows you to indicate + * that recv_nonblock should not raise an IO::WaitWritable exception, but + * return the symbol :wait_writable instead. + * * === See * * Socket#recvfrom */ diff --git a/ext/socket/init.c b/ext/socket/init.c index 455652d082..5f0d445c70 100644 --- a/ext/socket/init.c +++ b/ext/socket/init.c @@ -188,9 +188,10 @@ rsock_s_recvfrom_nonblock(VALUE sock, int argc, VALUE *argv, enum sock_recv_type long slen; int fd, flags; VALUE addr = Qnil; + VALUE opts = Qnil; socklen_t len0; - rb_scan_args(argc, argv, "11", &len, &flg); + rb_scan_args(argc, argv, "11:", &len, &flg, &opts); if (flg == Qnil) flags = 0; else flags = NUM2INT(flg); @@ -226,6 +227,8 @@ rsock_s_recvfrom_nonblock(VALUE sock, int argc, VALUE *argv, enum sock_recv_type #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN case EWOULDBLOCK: #endif + if (rsock_opt_false_p(opts, sym_exception)) + return sym_wait_readable; rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "recvfrom(2) would block"); } rb_sys_fail("recvfrom(2)"); @@ -528,14 +531,10 @@ rsock_s_accept_nonblock(int argc, VALUE *argv, VALUE klass, rb_io_t *fptr, struct sockaddr *sockaddr, socklen_t *len) { int fd2; - int ex = 1; VALUE opts = Qnil; rb_scan_args(argc, argv, "0:", &opts); - if (!NIL_P(opts) && Qfalse == rb_hash_lookup2(opts, sym_exception, Qundef)) - ex = 0; - rb_secure(3); rb_io_set_nonblock(fptr); fd2 = cloexec_accept(fptr->fd, (struct sockaddr*)sockaddr, len, 1); @@ -549,7 +548,7 @@ rsock_s_accept_nonblock(int argc, VALUE *argv, VALUE klass, rb_io_t *fptr, #if defined EPROTO case EPROTO: #endif - if (!ex) + if (rsock_opt_false_p(opts, sym_exception)) return sym_wait_readable; rb_readwrite_sys_fail(RB_IO_WAIT_READABLE, "accept(2) would block"); } diff --git a/ext/socket/rubysocket.h b/ext/socket/rubysocket.h index 359d28ea3d..d03b1c5e0b 100644 --- a/ext/socket/rubysocket.h +++ b/ext/socket/rubysocket.h @@ -423,4 +423,12 @@ static inline void rsock_maybe_wait_fd(int fd) { } # define MSG_DONTWAIT_RELIABLE 0 #endif +static inline int +rsock_opt_false_p(VALUE opt, VALUE sym) +{ + if (!NIL_P(opt) && Qfalse == rb_hash_lookup2(opt, sym, Qundef)) + return 1; + return 0; +} + #endif diff --git a/ext/socket/socket.c b/ext/socket/socket.c index bb23703706..f2d4323a89 100644 --- a/ext/socket/socket.c +++ b/ext/socket/socket.c @@ -503,15 +503,13 @@ sock_connect_nonblock(int argc, VALUE *argv, VALUE sock) n = connect(fptr->fd, (struct sockaddr*)RSTRING_PTR(addr), RSTRING_SOCKLEN(addr)); if (n < 0) { if (errno == EINPROGRESS) { - if (!NIL_P(opts) && - Qfalse == rb_hash_lookup2(opts, sym_exception, Qundef)) { + if (rsock_opt_false_p(opts, sym_exception)) { return sym_wait_writable; } rb_readwrite_sys_fail(RB_IO_WAIT_WRITABLE, "connect(2) would block"); } if (errno == EISCONN) { - if (!NIL_P(opts) && - Qfalse == rb_hash_lookup2(opts, sym_exception, Qundef)) { + if (rsock_opt_false_p(opts, sym_exception)) { return INT2FIX(0); } } diff --git a/ext/socket/udpsocket.c b/ext/socket/udpsocket.c index eb605cafc1..86d18f4fae 100644 --- a/ext/socket/udpsocket.c +++ b/ext/socket/udpsocket.c @@ -194,8 +194,7 @@ udp_send(int argc, VALUE *argv, VALUE sock) /* * call-seq: - * udpsocket.recvfrom_nonblock(maxlen) => [mesg, sender_inet_addr] - * udpsocket.recvfrom_nonblock(maxlen, flags) => [mesg, sender_inet_addr] + * udpsocket.recvfrom_nonblock(maxlen [, flags [, options]) => [mesg, sender_inet_addr] * * Receives up to _maxlen_ bytes from +udpsocket+ using recvfrom(2) after * O_NONBLOCK is set for the underlying file descriptor. @@ -211,6 +210,7 @@ udp_send(int argc, VALUE *argv, VALUE sock) * === Parameters * * +maxlen+ - the number of bytes to receive from the socket * * +flags+ - zero or more of the +MSG_+ options + * * +options+ - keyword hash, supporting `exception: false` * * === Example * require 'socket' @@ -238,6 +238,10 @@ udp_send(int argc, VALUE *argv, VALUE sock) * it is extended by IO::WaitReadable. * So IO::WaitReadable can be used to rescue the exceptions for retrying recvfrom_nonblock. * + * By specifying `exception: false`, the options hash allows you to indicate + * that recvmsg_nonblock should not raise an IO::WaitWritable exception, but + * return the symbol :wait_writable instead. + * * === See * * Socket#recvfrom */ diff --git a/test/socket/test_nonblock.rb b/test/socket/test_nonblock.rb index f0fba3af74..4e14c9355a 100644 --- a/test/socket/test_nonblock.rb +++ b/test/socket/test_nonblock.rb @@ -1,6 +1,7 @@ begin require "socket" require "io/nonblock" + require "io/wait" rescue LoadError end @@ -275,6 +276,17 @@ class TestSocketNonblock < Test::Unit::TestCase } end + def test_recvfrom_nonblock_no_exception + udp_pair do |s1, s2| + assert_equal :wait_readable, s1.recvfrom_nonblock(100, exception: false) + s2.send("aaa", 0, s1.getsockname) + assert s1.wait_readable + mesg, inet_addr = s1.recvfrom_nonblock(100, exception: false) + assert_equal(4, inet_addr.length) + assert_equal("aaa", mesg) + end + end + if defined?(UNIXSocket) && defined?(Socket::SOCK_SEQPACKET) def test_sendmsg_nonblock_seqpacket buf = '*' * 8192 @@ -286,6 +298,27 @@ class TestSocketNonblock < Test::Unit::TestCase rescue NotImplementedError, Errno::ENOSYS, Errno::EPROTONOSUPPORT skip "UNIXSocket.pair(:SEQPACKET) not implemented on this platform: #{$!}" end + + def test_sendmsg_nonblock_no_exception + buf = '*' * 128 + UNIXSocket.pair(:SEQPACKET) do |s1, s2| + n = 0 + Timeout.timeout(60) do + case rv = s1.sendmsg_nonblock(buf, exception: false) + when Integer + n += rv + when :wait_writable + break + else + flunk "unexpected return value: #{rv.inspect}" + end while true + assert_equal :wait_writable, rv + assert_operator n, :>, 0 + end + end + rescue NotImplementedError, Errno::ENOSYS, Errno::EPROTONOSUPPORT + skip "UNIXSocket.pair(:SEQPACKET) not implemented on this platform: #{$!}" + end end def test_recvmsg_nonblock_error @@ -297,6 +330,7 @@ class TestSocketNonblock < Test::Unit::TestCase rescue Errno::EWOULDBLOCK assert_kind_of(IO::WaitReadable, $!) end + assert_equal :wait_readable, s1.recvmsg_nonblock(11, exception: false) } end @@ -310,6 +344,16 @@ class TestSocketNonblock < Test::Unit::TestCase } end + def test_recv_nonblock_no_exception + tcp_pair {|c, s| + assert_equal :wait_readable, c.recv_nonblock(11, exception: false) + s.write('HI') + assert c.wait_readable + assert_equal 'HI', c.recv_nonblock(11, exception: false) + assert_equal :wait_readable, c.recv_nonblock(11, exception: false) + } + end + def test_connect_nonblock_error serv = TCPServer.new("127.0.0.1", 0) _, port, _, _ = serv.addr