From 3deb5d7113e1fd6e4b468e09464d524d390d811e Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sun, 9 May 2021 00:13:47 +1200 Subject: [PATCH] Direct io for accept, send, sendmsg, recvfrom, and related methods. --- ext/socket/basicsocket.c | 2 +- ext/socket/init.c | 97 +++++++++++++++++++++++----------------- ext/socket/rubysocket.h | 2 +- ext/socket/socket.c | 26 +++++------ ext/socket/tcpserver.c | 22 ++++----- ext/socket/unixserver.c | 23 ++++------ include/ruby/io.h | 3 ++ io.c | 36 +++++++++++++++ 8 files changed, 124 insertions(+), 87 deletions(-) diff --git a/ext/socket/basicsocket.c b/ext/socket/basicsocket.c index fb5beed81a..6168698df1 100644 --- a/ext/socket/basicsocket.c +++ b/ext/socket/basicsocket.c @@ -566,7 +566,7 @@ rsock_bsock_send(int argc, VALUE *argv, VALUE sock) arg.flags = NUM2INT(flags); while (rsock_maybe_fd_writable(arg.fd), (n = (ssize_t)BLOCKING_REGION_FD(func, &arg)) < 0) { - if (rb_io_wait_writable(arg.fd)) { + if (rb_io_maybe_wait_writable(errno, sock, Qnil)) { continue; } rb_sys_fail(funcname); diff --git a/ext/socket/init.c b/ext/socket/init.c index af46b8edaa..8eb8c8e901 100644 --- a/ext/socket/init.c +++ b/ext/socket/init.c @@ -166,7 +166,7 @@ recvfrom_locktmp(VALUE v) } VALUE -rsock_s_recvfrom(VALUE sock, int argc, VALUE *argv, enum sock_recv_type from) +rsock_s_recvfrom(VALUE socket, int argc, VALUE *argv, enum sock_recv_type from) { rb_io_t *fptr; VALUE str; @@ -177,27 +177,35 @@ rsock_s_recvfrom(VALUE sock, int argc, VALUE *argv, enum sock_recv_type from) rb_scan_args(argc, argv, "12", &len, &flg, &str); - if (flg == Qnil) arg.flags = 0; - else arg.flags = NUM2INT(flg); + if (flg == Qnil) + arg.flags = 0; + else + arg.flags = NUM2INT(flg); + buflen = NUM2INT(len); str = rsock_strbuf(str, buflen); - GetOpenFile(sock, fptr); + RB_IO_POINTER(socket, fptr); + if (rb_io_read_pending(fptr)) { - rb_raise(rb_eIOError, "recv for buffered IO"); + rb_raise(rb_eIOError, "recv for buffered IO"); } + arg.fd = fptr->fd; arg.alen = (socklen_t)sizeof(arg.buf); arg.str = str; arg.length = buflen; - while (rb_io_check_closed(fptr), - rsock_maybe_wait_fd(arg.fd), - (slen = (long)rb_str_locktmp_ensure(str, recvfrom_locktmp, - (VALUE)&arg)) < 0) { - if (!rb_io_wait_readable(fptr->fd)) { + while (true) { + rb_io_check_closed(fptr); + rsock_maybe_wait_fd(arg.fd); + + slen = (long)rb_str_locktmp_ensure(str, recvfrom_locktmp, (VALUE)&arg); + + if (slen >= 0) break; + + if (!rb_io_maybe_wait_readable(errno, socket, Qnil)) rb_sys_fail("recvfrom(2)"); - } } /* Resize the string to the amount of data received */ @@ -221,7 +229,7 @@ rsock_s_recvfrom(VALUE sock, int argc, VALUE *argv, enum sock_recv_type from) return rb_assoc_new(str, rsock_unixaddr(&arg.buf.un, arg.alen)); #endif case RECV_SOCKET: - return rb_assoc_new(str, rsock_io_socket_addrinfo(sock, &arg.buf.addr, arg.alen)); + return rb_assoc_new(str, rsock_io_socket_addrinfo(socket, &arg.buf.addr, arg.alen)); default: rb_bug("rsock_s_recvfrom called with bad value"); } @@ -682,38 +690,47 @@ accept_blocking(void *data) } VALUE -rsock_s_accept(VALUE klass, int fd, struct sockaddr *sockaddr, socklen_t *len) +rsock_s_accept(VALUE klass, VALUE io, struct sockaddr *sockaddr, socklen_t *len) { - int fd2; - int retry = 0; - struct accept_arg arg; + rb_io_t *fptr = NULL; + RB_IO_POINTER(io, fptr); + + struct accept_arg accept_arg = { + .fd = fptr->fd, + .sockaddr = sockaddr, + .len = len + }; + + int retry = 0; - arg.fd = fd; - arg.sockaddr = sockaddr; - arg.len = len; retry: - rsock_maybe_wait_fd(fd); - fd2 = (int)BLOCKING_REGION_FD(accept_blocking, &arg); - if (fd2 < 0) { - int e = errno; - switch (e) { - case EMFILE: - case ENFILE: - case ENOMEM: - if (retry) break; - rb_gc(); - retry = 1; - goto retry; - default: - if (!rb_io_wait_readable(fd)) break; - retry = 0; - goto retry; - } - rb_syserr_fail(e, "accept(2)"); + rsock_maybe_wait_fd(accept_arg.fd); + int peer = (int)BLOCKING_REGION_FD(accept_blocking, &accept_arg); + if (peer < 0) { + int error = errno; + + switch (error) { + case EMFILE: + case ENFILE: + case ENOMEM: + if (retry) break; + rb_gc(); + retry = 1; + goto retry; + default: + if (!rb_io_maybe_wait_readable(error, io, Qnil)) break; + retry = 0; + goto retry; + } + + rb_syserr_fail(error, "accept(2)"); } - rb_update_max_fd(fd2); - if (!klass) return INT2NUM(fd2); - return rsock_init_sock(rb_obj_alloc(klass), fd2); + + rb_update_max_fd(peer); + + if (!klass) return INT2NUM(peer); + + return rsock_init_sock(rb_obj_alloc(klass), peer); } int diff --git a/ext/socket/rubysocket.h b/ext/socket/rubysocket.h index 2a4c6e136a..a7755660e9 100644 --- a/ext/socket/rubysocket.h +++ b/ext/socket/rubysocket.h @@ -373,7 +373,7 @@ VALUE rsock_s_recvfrom(VALUE sock, int argc, VALUE *argv, enum sock_recv_type fr int rsock_connect(int fd, const struct sockaddr *sockaddr, int len, int socks, struct timeval *timeout); -VALUE rsock_s_accept(VALUE klass, int fd, struct sockaddr *sockaddr, socklen_t *len); +VALUE rsock_s_accept(VALUE klass, VALUE io, struct sockaddr *sockaddr, socklen_t *len); VALUE rsock_s_accept_nonblock(VALUE klass, VALUE ex, rb_io_t *fptr, struct sockaddr *sockaddr, socklen_t *len); VALUE rsock_sock_listen(VALUE sock, VALUE log); diff --git a/ext/socket/socket.c b/ext/socket/socket.c index 617cca2092..ccf990d11f 100644 --- a/ext/socket/socket.c +++ b/ext/socket/socket.c @@ -750,17 +750,14 @@ sock_recvfrom_nonblock(VALUE sock, VALUE len, VALUE flg, VALUE str, VALUE ex) * */ static VALUE -sock_accept(VALUE sock) +sock_accept(VALUE server) { - rb_io_t *fptr; - VALUE sock2; - union_sockaddr buf; - socklen_t len = (socklen_t)sizeof buf; + union_sockaddr buffer; + socklen_t length = (socklen_t)sizeof(buffer); - GetOpenFile(sock, fptr); - sock2 = rsock_s_accept(rb_cSocket,fptr->fd,&buf.addr,&len); + VALUE peer = rsock_s_accept(rb_cSocket, server, &buffer.addr, &length); - return rb_assoc_new(sock2, rsock_io_socket_addrinfo(sock2, &buf.addr, len)); + return rb_assoc_new(peer, rsock_io_socket_addrinfo(peer, &buffer.addr, length)); } /* :nodoc: */ @@ -820,17 +817,14 @@ sock_accept_nonblock(VALUE sock, VALUE ex) * * Socket#accept */ static VALUE -sock_sysaccept(VALUE sock) +sock_sysaccept(VALUE server) { - rb_io_t *fptr; - VALUE sock2; - union_sockaddr buf; - socklen_t len = (socklen_t)sizeof buf; + union_sockaddr buffer; + socklen_t length = (socklen_t)sizeof(buffer); - GetOpenFile(sock, fptr); - sock2 = rsock_s_accept(0,fptr->fd,&buf.addr,&len); + VALUE peer = rsock_s_accept(0, server, &buffer.addr, &length); - return rb_assoc_new(sock2, rsock_io_socket_addrinfo(sock2, &buf.addr, len)); + return rb_assoc_new(peer, rsock_io_socket_addrinfo(peer, &buffer.addr, length)); } #ifdef HAVE_GETHOSTNAME diff --git a/ext/socket/tcpserver.c b/ext/socket/tcpserver.c index 7634420e38..675733c6f9 100644 --- a/ext/socket/tcpserver.c +++ b/ext/socket/tcpserver.c @@ -53,15 +53,12 @@ tcp_svr_init(int argc, VALUE *argv, VALUE sock) * */ static VALUE -tcp_accept(VALUE sock) +tcp_accept(VALUE server) { - rb_io_t *fptr; - union_sockaddr from; - socklen_t fromlen; + union_sockaddr buffer; + socklen_t length = sizeof(buffer); - GetOpenFile(sock, fptr); - fromlen = (socklen_t)sizeof(from); - return rsock_s_accept(rb_cTCPSocket, fptr->fd, &from.addr, &fromlen); + return rsock_s_accept(rb_cTCPSocket, server, &buffer.addr, &length); } /* :nodoc: */ @@ -91,15 +88,12 @@ tcp_accept_nonblock(VALUE sock, VALUE ex) * */ static VALUE -tcp_sysaccept(VALUE sock) +tcp_sysaccept(VALUE server) { - rb_io_t *fptr; - union_sockaddr from; - socklen_t fromlen; + union_sockaddr buffer; + socklen_t length = sizeof(buffer); - GetOpenFile(sock, fptr); - fromlen = (socklen_t)sizeof(from); - return rsock_s_accept(0, fptr->fd, &from.addr, &fromlen); + return rsock_s_accept(0, server, &buffer.addr, &length); } void diff --git a/ext/socket/unixserver.c b/ext/socket/unixserver.c index b1f2a38547..890f9d3fae 100644 --- a/ext/socket/unixserver.c +++ b/ext/socket/unixserver.c @@ -47,16 +47,12 @@ unix_svr_init(VALUE sock, VALUE path) * */ static VALUE -unix_accept(VALUE sock) +unix_accept(VALUE server) { - rb_io_t *fptr; - struct sockaddr_un from; - socklen_t fromlen; + struct sockaddr_un buffer; + socklen_t length = sizeof(buffer); - GetOpenFile(sock, fptr); - fromlen = (socklen_t)sizeof(struct sockaddr_un); - return rsock_s_accept(rb_cUNIXSocket, fptr->fd, - (struct sockaddr*)&from, &fromlen); + return rsock_s_accept(rb_cUNIXSocket, server, (struct sockaddr*)&buffer, &length); } /* :nodoc: */ @@ -92,15 +88,12 @@ unix_accept_nonblock(VALUE sock, VALUE ex) * */ static VALUE -unix_sysaccept(VALUE sock) +unix_sysaccept(VALUE server) { - rb_io_t *fptr; - struct sockaddr_un from; - socklen_t fromlen; + struct sockaddr_un buffer; + socklen_t length = sizeof(buffer); - GetOpenFile(sock, fptr); - fromlen = (socklen_t)sizeof(struct sockaddr_un); - return rsock_s_accept(0, fptr->fd, (struct sockaddr*)&from, &fromlen); + return rsock_s_accept(0, server, (struct sockaddr*)&buffer, &length); } #endif diff --git a/include/ruby/io.h b/include/ruby/io.h index a3de95f281..bf916a5f8e 100644 --- a/include/ruby/io.h +++ b/include/ruby/io.h @@ -159,6 +159,9 @@ int rb_io_wait_writable(int fd); int rb_wait_for_single_fd(int fd, int events, struct timeval *tv); VALUE rb_io_wait(VALUE io, VALUE events, VALUE timeout); +VALUE rb_io_maybe_wait(int error, VALUE io, VALUE events, VALUE timeout); +int rb_io_maybe_wait_readable(int error, VALUE io, VALUE timeout); +int rb_io_maybe_wait_writable(int error, VALUE io, VALUE timeout); /* compatibility for ruby 1.8 and older */ #define rb_io_mode_flags(modestr) [<"rb_io_mode_flags() is obsolete; use rb_io_modestr_fmode()">] diff --git a/io.c b/io.c index d11f4076bb..c5cd348f4d 100644 --- a/io.c +++ b/io.c @@ -1392,6 +1392,42 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) return rb_thread_wait_for_single_fd(fd, events, timeout); } +VALUE rb_io_maybe_wait(int error, VALUE io, VALUE events, VALUE timeout) +{ + switch (error) { + case EINTR: +#if defined(ERESTART) + case ERESTART: +#endif + // We might have pending interrupts since the previous syscall was interrupted: + rb_thread_check_ints(); + + // The operation was interrupted, so retry it immediately: + return events; + + case EAGAIN: +#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif + // The operation would block, so wait for the specified events: + return rb_io_wait(io, events, timeout); + + default: + // Non-specific error, no event is ready: + return RB_INT2NUM(0); + } +} + +int rb_io_maybe_wait_readable(int error, VALUE io, VALUE timeout) +{ + return RB_NUM2INT(rb_io_maybe_wait(error, io, RB_INT2NUM(RUBY_IO_READABLE), timeout)); +} + +int rb_io_maybe_wait_writable(int error, VALUE io, VALUE timeout) +{ + return RB_NUM2INT(rb_io_maybe_wait(error, io, RB_INT2NUM(RUBY_IO_WRITABLE), timeout)); +} + static void make_writeconv(rb_io_t *fptr) {