From e4f91bbdbaa6ab3125f24967414ac5300bb244f5 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 7 Oct 2022 21:48:38 +1300 Subject: [PATCH] Add IO#timeout attribute and use it for blocking IO operations. (#5653) --- NEWS.md | 11 + ext/openssl/extconf.rb | 1 + ext/openssl/ossl_ssl.c | 14 +- ext/socket/ancdata.c | 4 +- ext/socket/basicsocket.c | 2 +- ext/socket/init.c | 4 +- ext/socket/socket.c | 4 + ext/socket/udpsocket.c | 2 +- gc.c | 1 + include/ruby/io.h | 50 ++- io.c | 340 ++++++++++++++---- scheduler.c | 4 +- .../library/socket/tcpsocket/shared/new.rb | 10 +- test/ruby/test_io.rb | 3 + test/ruby/test_io_timeout.rb | 64 ++++ test/socket/test_tcp.rb | 2 +- thread.c | 11 + 17 files changed, 428 insertions(+), 99 deletions(-) create mode 100644 test/ruby/test_io_timeout.rb diff --git a/NEWS.md b/NEWS.md index d1b25f9444..eb1ffbe438 100644 --- a/NEWS.md +++ b/NEWS.md @@ -102,6 +102,16 @@ Note that each entry is kept to a minimum, see links for details. Note: We're only listing outstanding class updates. +* IO + * Introduce `IO#timeout=` and `IO#timeout` which can cause + `IO::TimeoutError` to be raised if a blocking operation exceeds the + specified timeout. [[Feature #18630]] + + ```ruby + STDIN.timeout = 1 + STDIN.read # => Blocking operation timed out! (IO::TimeoutError) + ``` + * Data * New core class to represent simple immutable value object. The class is similar to `Struct` and partially shares an implementation, but has more @@ -332,3 +342,4 @@ The following deprecated APIs are removed. [Feature #19008]: https://bugs.ruby-lang.org/issues/19008 [Feature #19026]: https://bugs.ruby-lang.org/issues/19026 [Feature #16122]: https://bugs.ruby-lang.org/issues/16122 +[Feature #18630]: https://bugs.ruby-lang.org/issues/18630 diff --git a/ext/openssl/extconf.rb b/ext/openssl/extconf.rb index cc2b1f8ba2..a856646fe5 100644 --- a/ext/openssl/extconf.rb +++ b/ext/openssl/extconf.rb @@ -27,6 +27,7 @@ if with_config("debug") or enable_config("debug") end have_func("rb_io_maybe_wait") # Ruby 3.1 +have_func("rb_io_timeout") # Ruby 3.2 Logging::message "=== Checking for system dependent stuff... ===\n" have_library("nsl", "t_open") diff --git a/ext/openssl/ossl_ssl.c b/ext/openssl/ossl_ssl.c index 6e1a50fd6d..605591efe5 100644 --- a/ext/openssl/ossl_ssl.c +++ b/ext/openssl/ossl_ssl.c @@ -1641,11 +1641,21 @@ no_exception_p(VALUE opts) return 0; } +inline static +VALUE io_timeout() +{ +#ifdef HAVE_RB_IO_TIMEOUT + return Qundef; +#else + return Qnil; +#endif +} + static void io_wait_writable(rb_io_t *fptr) { #ifdef HAVE_RB_IO_MAYBE_WAIT - rb_io_maybe_wait_writable(errno, fptr->self, Qnil); + rb_io_maybe_wait_writable(errno, fptr->self, io_timeout()); #else rb_io_wait_writable(fptr->fd); #endif @@ -1655,7 +1665,7 @@ static void io_wait_readable(rb_io_t *fptr) { #ifdef HAVE_RB_IO_MAYBE_WAIT - rb_io_maybe_wait_readable(errno, fptr->self, Qnil); + rb_io_maybe_wait_readable(errno, fptr->self, io_timeout()); #else rb_io_wait_readable(fptr->fd); #endif diff --git a/ext/socket/ancdata.c b/ext/socket/ancdata.c index 071e3323bb..0ab3a8da47 100644 --- a/ext/socket/ancdata.c +++ b/ext/socket/ancdata.c @@ -1285,7 +1285,7 @@ bsock_sendmsg_internal(VALUE sock, VALUE data, VALUE vflags, if (ss == -1) { int e; - if (!nonblock && rb_io_maybe_wait_writable(errno, fptr->self, Qnil)) { + if (!nonblock && rb_io_maybe_wait_writable(errno, fptr->self, fptr->timeout)) { rb_io_check_closed(fptr); goto retry; } @@ -1557,7 +1557,7 @@ bsock_recvmsg_internal(VALUE sock, if (ss == -1) { int e; - if (!nonblock && rb_io_maybe_wait_readable(errno, fptr->self, Qnil)) { + if (!nonblock && rb_io_maybe_wait_readable(errno, fptr->self, fptr->timeout)) { rb_io_check_closed(fptr); goto retry; } diff --git a/ext/socket/basicsocket.c b/ext/socket/basicsocket.c index 93196c924d..66c2537cbb 100644 --- a/ext/socket/basicsocket.c +++ b/ext/socket/basicsocket.c @@ -601,7 +601,7 @@ rsock_bsock_send(int argc, VALUE *argv, VALUE socket) if (n >= 0) return SSIZET2NUM(n); - if (rb_io_maybe_wait_writable(errno, socket, Qnil)) { + if (rb_io_maybe_wait_writable(errno, socket, fptr->timeout)) { continue; } diff --git a/ext/socket/init.c b/ext/socket/init.c index 0cff3d6794..e60dd32264 100644 --- a/ext/socket/init.c +++ b/ext/socket/init.c @@ -189,7 +189,7 @@ rsock_s_recvfrom(VALUE socket, int argc, VALUE *argv, enum sock_recv_type from) if (slen >= 0) break; - if (!rb_io_maybe_wait_readable(errno, socket, Qnil)) + if (!rb_io_maybe_wait_readable(errno, socket, Qundef)) rb_sys_fail("recvfrom(2)"); } @@ -705,7 +705,7 @@ rsock_s_accept(VALUE klass, VALUE io, struct sockaddr *sockaddr, socklen_t *len) retry = 1; goto retry; default: - if (!rb_io_maybe_wait_readable(error, io, Qnil)) break; + if (!rb_io_maybe_wait_readable(error, io, Qundef)) break; retry = 0; goto retry; } diff --git a/ext/socket/socket.c b/ext/socket/socket.c index b1965deb9e..5cf0835062 100644 --- a/ext/socket/socket.c +++ b/ext/socket/socket.c @@ -28,6 +28,10 @@ rsock_syserr_fail_host_port(int err, const char *mesg, VALUE host, VALUE port) message = rb_sprintf("%s for %+"PRIsVALUE" port % "PRIsVALUE"", mesg, host, port); + if (err == ETIMEDOUT) { + rb_exc_raise(rb_exc_new3(rb_eIOTimeoutError, message)); + } + rb_syserr_fail_str(err, message); } diff --git a/ext/socket/udpsocket.c b/ext/socket/udpsocket.c index 3500107972..5b878b4a95 100644 --- a/ext/socket/udpsocket.c +++ b/ext/socket/udpsocket.c @@ -170,7 +170,7 @@ udp_send_internal(VALUE v) if (n >= 0) return RB_SSIZE2NUM(n); - if (rb_io_maybe_wait_writable(errno, fptr->self, Qnil)) { + if (rb_io_maybe_wait_writable(errno, fptr->self, fptr->timeout)) { goto retry; } } diff --git a/gc.c b/gc.c index c1b06a76e7..916584ed35 100644 --- a/gc.c +++ b/gc.c @@ -7303,6 +7303,7 @@ gc_mark_children(rb_objspace_t *objspace, VALUE obj) gc_mark(objspace, any->as.file.fptr->writeconv_pre_ecopts); gc_mark(objspace, any->as.file.fptr->encs.ecopts); gc_mark(objspace, any->as.file.fptr->write_lock); + gc_mark(objspace, any->as.file.fptr->timeout); } break; diff --git a/include/ruby/io.h b/include/ruby/io.h index dc4c8becf6..b91ecd00cb 100644 --- a/include/ruby/io.h +++ b/include/ruby/io.h @@ -63,6 +63,11 @@ RBIMPL_SYMBOL_EXPORT_BEGIN() struct stat; struct timeval; +/** + * Indicates that a timeout has occurred while performing an IO operation. + */ +RUBY_EXTERN VALUE rb_eIOTimeoutError; + /** * Type of events that an IO can wait. * @@ -214,6 +219,11 @@ typedef struct rb_io_t { * This of course doesn't help inter-process IO interleaves, though. */ VALUE write_lock; + + /** + * The timeout associated with this IO when performing blocking operations. + */ + VALUE timeout; } rb_io_t; /** @alias{rb_io_enc_t} */ @@ -844,11 +854,33 @@ int rb_io_wait_writable(int fd); */ int rb_wait_for_single_fd(int fd, int events, struct timeval *tv); +/** + * Get the timeout associated with the specified io object. + * + * @param[in] io An IO object. + * @retval RUBY_Qnil There is no associated timeout. + * @retval Otherwise The timeout value. + */ +VALUE rb_io_timeout(VALUE io); + +/** + * Set the timeout associated with the specified io object. This timeout is + * used as a best effort timeout to prevent operations from blocking forever. + * + * @param[in] io An IO object. + * @param[in] timeout A timeout value. Must respond to #to_f. + * @ + */ +VALUE rb_io_set_timeout(VALUE io, VALUE timeout); + /** * Blocks until the passed IO is ready for the passed events. The "events" * here is a Ruby level integer, which is an OR-ed value of `IO::READABLE`, * `IO::WRITable`, and `IO::PRIORITY`. * + * If timeout is `Qundef`, it will use the default timeout as given by + * `rb_io_timeout(io)`. + * * @param[in] io An IO object to wait. * @param[in] events See above. * @param[in] timeout Time, or numeric seconds since UNIX epoch. @@ -903,13 +935,8 @@ VALUE rb_io_maybe_wait(int error, VALUE io, VALUE events, VALUE timeout); * @exception rb_eIOError `io` is not open. * @exception rb_eRangeError `timeout` is out of range. * @exception rb_eSystemCallError `select(2)` failed for some reason. - * @exception rb_eTypeError Operation timed out. - * @return Always returns ::RUBY_IO_READABLE. - * - * @internal - * - * Because rb_io_maybe_wait() returns ::RUBY_Qfalse on timeout, this function - * fails to convert that value to `int`, and raises ::rb_eTypeError. + * @retval 0 Operation timed out. + * @retval Otherwise Always returns ::RUBY_IO_READABLE. */ int rb_io_maybe_wait_readable(int error, VALUE io, VALUE timeout); @@ -924,13 +951,8 @@ int rb_io_maybe_wait_readable(int error, VALUE io, VALUE timeout); * @exception rb_eIOError `io` is not open. * @exception rb_eRangeError `timeout` is out of range. * @exception rb_eSystemCallError `select(2)` failed for some reason. - * @exception rb_eTypeError Operation timed out. - * @return Always returns ::RUBY_IO_WRITABLE. - * - * @internal - * - * Because rb_io_maybe_wait() returns ::RUBY_Qfalse on timeout, this function - * fails to convert that value to `int`, and raises ::rb_eTypeError. + * @retval 0 Operation timed out. + * @retval Otherwise Always returns ::RUBY_IO_WRITABLE. */ int rb_io_maybe_wait_writable(int error, VALUE io, VALUE timeout); diff --git a/io.c b/io.c index 883175e0df..c41a8d4ed5 100644 --- a/io.c +++ b/io.c @@ -178,6 +178,7 @@ off_t __syscall(quad_t number, ...); VALUE rb_cIO; VALUE rb_eEOFError; VALUE rb_eIOError; +VALUE rb_eIOTimeoutError; VALUE rb_mWaitReadable; VALUE rb_mWaitWritable; @@ -493,7 +494,7 @@ rb_cloexec_fcntl_dupfd(int fd, int minfd) #if defined(_WIN32) #define WAIT_FD_IN_WIN32(fptr) \ - (rb_w32_io_cancelable_p((fptr)->fd) ? Qnil : rb_io_wait(fptr->self, RB_INT2NUM(RUBY_IO_READABLE), Qnil)) + (rb_w32_io_cancelable_p((fptr)->fd) ? Qnil : rb_io_wait(fptr->self, RB_INT2NUM(RUBY_IO_READABLE), fptr->timeout)) #else #define WAIT_FD_IN_WIN32(fptr) #endif @@ -829,6 +830,49 @@ rb_io_set_write_io(VALUE io, VALUE w) return write_io ? write_io : Qnil; } +/* + * call-seq: + * timeout -> duration or nil + * + * Get the internal timeout duration or nil if it was not set. + * + */ +VALUE +rb_io_timeout(VALUE self) +{ + rb_io_t *fptr = rb_io_get_fptr(self); + + return fptr->timeout; +} + +/* + * call-seq: + * timeout = duration -> duration + * timeout = nil -> nil + * + * Set the internal timeout to the specified duration or nil. The timeout + * applies to all blocking operations where possible. + * + * This affects the following methods (but is not limited to): #gets, #puts, + * #read, #write, #wait_readable and #wait_writable. This also affects + * blocking socket operations like Socket#accept and Socket#connect. + * + * Some operations like File#open and IO#close are not affected by the + * timeout. A timeout during a write operation may leave the IO in an + * inconsistent state, e.g. data was partially written. Generally speaking, a + * timeout is a last ditch effort to prevent an application from hanging on + * slow I/O operations, such as those that occur during a slowloris attack. + */ +VALUE +rb_io_set_timeout(VALUE self, VALUE timeout) +{ + rb_io_t *fptr = rb_io_get_fptr(self); + + fptr->timeout = timeout; + + return self; +} + /* * call-seq: * IO.try_convert(object) -> new_io or nil @@ -1000,7 +1044,7 @@ void rb_io_read_check(rb_io_t *fptr) { if (!READ_DATA_PENDING(fptr)) { - rb_io_wait(fptr->self, RB_INT2NUM(RUBY_IO_READABLE), Qnil); + rb_io_wait(fptr->self, RB_INT2NUM(RUBY_IO_READABLE), fptr->timeout); } return; } @@ -1052,56 +1096,121 @@ struct io_internal_read_struct { VALUE th; rb_io_t *fptr; int nonblock; + int fd; + void *buf; size_t capa; + struct timeval *timeout; }; struct io_internal_write_struct { + VALUE th; + rb_io_t *fptr; + int nonblock; int fd; + const void *buf; size_t capa; + struct timeval *timeout; }; #ifdef HAVE_WRITEV struct io_internal_writev_struct { + VALUE th; + rb_io_t *fptr; + int nonblock; int fd; + int iovcnt; const struct iovec *iov; + struct timeval *timeout; }; #endif -static int nogvl_wait_for(VALUE th, rb_io_t *fptr, short events); +static int nogvl_wait_for(VALUE th, rb_io_t *fptr, short events, struct timeval *timeout); + +/** + * Wait for the given events on the given file descriptor. + * Returns -1 if an error or timeout occurred. +errno+ will be set. + * Returns the event mask if an event occurred. + */ +static inline int +io_internal_wait(VALUE thread, rb_io_t *fptr, int error, int events, struct timeval *timeout) +{ + int ready = nogvl_wait_for(thread, fptr, events, timeout); + + if (ready > 0) { + return ready; + } else if (ready == 0) { + errno = ETIMEDOUT; + return -1; + } + + errno = error; + return -1; +} + static VALUE internal_read_func(void *ptr) { struct io_internal_read_struct *iis = ptr; - ssize_t r; -retry: - r = read(iis->fptr->fd, iis->buf, iis->capa); - if (r < 0 && !iis->nonblock) { - int e = errno; - if (io_again_p(e)) { - if (nogvl_wait_for(iis->th, iis->fptr, RB_WAITFD_IN) != -1) { - goto retry; - } - errno = e; + ssize_t result; + + if (iis->timeout && !iis->nonblock) { + if (io_internal_wait(iis->th, iis->fptr, 0, RB_WAITFD_IN, iis->timeout) == -1) { + return -1; } } - return r; + + retry: + result = read(iis->fd, iis->buf, iis->capa); + + if (result < 0 && !iis->nonblock) { + if (io_again_p(errno)) { + if (io_internal_wait(iis->th, iis->fptr, errno, RB_WAITFD_IN, iis->timeout) == -1) { + return -1; + } else { + goto retry; + } + } + } + + return result; } #if defined __APPLE__ -# define do_write_retry(code) do {ret = code;} while (ret == -1 && errno == EPROTOTYPE) +# define do_write_retry(code) do {result = code;} while (result == -1 && errno == EPROTOTYPE) #else -# define do_write_retry(code) ret = code +# define do_write_retry(code) result = code #endif + static VALUE internal_write_func(void *ptr) { struct io_internal_write_struct *iis = ptr; - ssize_t ret; + ssize_t result; + + if (iis->timeout && !iis->nonblock) { + if (io_internal_wait(iis->th, iis->fptr, 0, RB_WAITFD_OUT, iis->timeout) == -1) { + return -1; + } + } + + retry: do_write_retry(write(iis->fd, iis->buf, iis->capa)); - return (VALUE)ret; + + if (result < 0 && !iis->nonblock) { + int e = errno; + if (io_again_p(e)) { + if (io_internal_wait(iis->th, iis->fptr, errno, RB_WAITFD_OUT, iis->timeout) == -1) { + return -1; + } else { + goto retry; + } + } + } + + return result; } #ifdef HAVE_WRITEV @@ -1109,14 +1218,33 @@ static VALUE internal_writev_func(void *ptr) { struct io_internal_writev_struct *iis = ptr; - ssize_t ret; + ssize_t result; + + if (iis->timeout && !iis->nonblock) { + if (io_internal_wait(iis->th, iis->fptr, 0, RB_WAITFD_OUT, iis->timeout) == -1) { + return -1; + } + } + + retry: do_write_retry(writev(iis->fd, iis->iov, iis->iovcnt)); - return (VALUE)ret; + + if (result < 0 && !iis->nonblock) { + if (io_again_p(errno)) { + if (io_internal_wait(iis->th, iis->fptr, errno, RB_WAITFD_OUT, iis->timeout) == -1) { + return -1; + } else { + goto retry; + } + } + } + + return result; } #endif static ssize_t -rb_read_internal(rb_io_t *fptr, void *buf, size_t count) +rb_io_read_memory(rb_io_t *fptr, void *buf, size_t count) { VALUE scheduler = rb_fiber_scheduler_current(); if (scheduler != Qnil) { @@ -1131,15 +1259,25 @@ rb_read_internal(rb_io_t *fptr, void *buf, size_t count) .th = rb_thread_current(), .fptr = fptr, .nonblock = 0, + .fd = fptr->fd, + .buf = buf, - .capa = count + .capa = count, + .timeout = NULL, }; + struct timeval timeout_storage; + + if (fptr->timeout != Qnil) { + timeout_storage = rb_time_interval(fptr->timeout); + iis.timeout = &timeout_storage; + } + return (ssize_t)rb_thread_io_blocking_region(internal_read_func, &iis, fptr->fd); } static ssize_t -rb_write_internal(rb_io_t *fptr, const void *buf, size_t count) +rb_io_write_memory(rb_io_t *fptr, const void *buf, size_t count) { VALUE scheduler = rb_fiber_scheduler_current(); if (scheduler != Qnil) { @@ -1151,11 +1289,23 @@ rb_write_internal(rb_io_t *fptr, const void *buf, size_t count) } struct io_internal_write_struct iis = { + .th = rb_thread_current(), + .fptr = fptr, + .nonblock = 0, .fd = fptr->fd, + .buf = buf, - .capa = count + .capa = count, + .timeout = NULL }; + struct timeval timeout_storage; + + if (fptr->timeout != Qnil) { + timeout_storage = rb_time_interval(fptr->timeout); + iis.timeout = &timeout_storage; + } + return (ssize_t)rb_thread_io_blocking_region(internal_write_func, &iis, fptr->fd); } @@ -1175,11 +1325,23 @@ rb_writev_internal(rb_io_t *fptr, const struct iovec *iov, int iovcnt) } struct io_internal_writev_struct iis = { + .th = rb_thread_current(), + .fptr = fptr, + .nonblock = 0, .fd = fptr->fd, + .iov = iov, .iovcnt = iovcnt, + .timeout = NULL }; + struct timeval timeout_storage; + + if (fptr->timeout != Qnil) { + timeout_storage = rb_time_interval(fptr->timeout); + iis.timeout = &timeout_storage; + } + return (ssize_t)rb_thread_io_blocking_region(internal_writev_func, &iis, fptr->fd); } #endif @@ -1196,11 +1358,13 @@ io_flush_buffer_sync(void *arg) fptr->wbuf.len = 0; return 0; } + if (0 <= r) { fptr->wbuf.off += (int)r; fptr->wbuf.len -= (int)r; errno = EAGAIN; } + return (VALUE)-1; } @@ -1231,7 +1395,7 @@ io_fflush(rb_io_t *fptr) return 0; while (fptr->wbuf.len > 0 && io_flush_buffer(fptr) != 0) { - if (!rb_io_maybe_wait_writable(errno, fptr->self, Qnil)) + if (!rb_io_maybe_wait_writable(errno, fptr->self, fptr->timeout)) return -1; rb_io_check_closed(fptr); @@ -1255,6 +1419,10 @@ rb_io_wait(VALUE io, VALUE events, VALUE timeout) struct timeval tv_storage; struct timeval *tv = NULL; + if (timeout == Qundef) { + timeout = fptr->timeout; + } + if (timeout != Qnil) { tv_storage = rb_time_interval(timeout); tv = &tv_storage; @@ -1562,7 +1730,7 @@ io_binwrite_string_internal(rb_io_t *fptr, const char *ptr, long length) return result; } else { - return rb_write_internal(fptr, ptr, length); + return rb_io_write_memory(fptr, ptr, length); } } #else @@ -1597,7 +1765,7 @@ io_binwrite_string_internal(rb_io_t *fptr, const char *ptr, long length) } // Otherwise, we should write the data directly: - return rb_write_internal(fptr, ptr, length); + return rb_io_write_memory(fptr, ptr, length); } #endif @@ -1625,7 +1793,7 @@ io_binwrite_string(VALUE arg) remaining -= result; } // Wait for it to become writable: - else if (rb_io_maybe_wait_writable(errno, p->fptr->self, Qnil)) { + else if (rb_io_maybe_wait_writable(errno, p->fptr->self, p->fptr->timeout)) { rb_io_check_closed(p->fptr); } else { @@ -1892,7 +2060,7 @@ io_binwritev_internal(VALUE arg) iov->iov_base = (char *)iov->iov_base + result; iov->iov_len -= result; } - else if (rb_io_maybe_wait_writable(errno, fptr->self, Qnil)) { + else if (rb_io_maybe_wait_writable(errno, fptr->self, fptr->timeout)) { rb_io_check_closed(fptr); } else { @@ -2399,12 +2567,12 @@ rb_io_rewind(VALUE io) static int fptr_wait_readable(rb_io_t *fptr) { - int ret = rb_io_maybe_wait_readable(errno, fptr->self, Qnil); + int result = rb_io_maybe_wait_readable(errno, fptr->self, fptr->timeout); - if (ret) + if (result) rb_io_check_closed(fptr); - return ret; + return result; } static int @@ -2423,7 +2591,7 @@ io_fillbuf(rb_io_t *fptr) } if (fptr->rbuf.len == 0) { retry: - r = rb_read_internal(fptr, fptr->rbuf.ptr, fptr->rbuf.capa); + r = rb_io_read_memory(fptr, fptr->rbuf.ptr, fptr->rbuf.capa); if (r < 0) { if (fptr_wait_readable(fptr)) @@ -2815,7 +2983,7 @@ io_bufread(char *ptr, long len, rb_io_t *fptr) while (n > 0) { again: rb_io_check_closed(fptr); - c = rb_read_internal(fptr, ptr+offset, n); + c = rb_io_read_memory(fptr, ptr+offset, n); if (c == 0) break; if (c < 0) { if (fptr_wait_readable(fptr)) @@ -3171,7 +3339,7 @@ rb_io_set_nonblock(rb_io_t *fptr) } static VALUE -read_internal_call(VALUE arg) +io_read_memory_call(VALUE arg) { struct io_internal_read_struct *iis = (struct io_internal_read_struct *)arg; @@ -3189,9 +3357,9 @@ read_internal_call(VALUE arg) } static long -read_internal_locktmp(VALUE str, struct io_internal_read_struct *iis) +io_read_memory_locktmp(VALUE str, struct io_internal_read_struct *iis) { - return (long)rb_str_locktmp_ensure(str, read_internal_call, (VALUE)iis); + return (long)rb_str_locktmp_ensure(str, io_read_memory_call, (VALUE)iis); } #define no_exception_p(opts) !rb_opts_exception_p((opts), TRUE) @@ -3233,9 +3401,11 @@ io_getpartial(int argc, VALUE *argv, VALUE io, int no_exception, int nonblock) iis.th = rb_thread_current(); iis.fptr = fptr; iis.nonblock = nonblock; + iis.fd = fptr->fd; iis.buf = RSTRING_PTR(str); iis.capa = len; - n = read_internal_locktmp(str, &iis); + iis.timeout = NULL; + n = io_read_memory_locktmp(str, &iis); if (n < 0) { int e = errno; if (!nonblock && fptr_wait_readable(fptr)) @@ -3400,9 +3570,11 @@ io_read_nonblock(rb_execution_context_t *ec, VALUE io, VALUE length, VALUE str, shrinkable |= io_setstrbuf(&str, len); iis.fptr = fptr; iis.nonblock = 1; + iis.fd = fptr->fd; iis.buf = RSTRING_PTR(str); iis.capa = len; - n = read_internal_locktmp(str, &iis); + iis.timeout = NULL; + n = io_read_memory_locktmp(str, &iis); if (n < 0) { int e = errno; if (io_again_p(e)) { @@ -5113,13 +5285,13 @@ finish_writeconv(rb_io_t *fptr, int noalloc) res = rb_econv_convert(fptr->writeconv, NULL, NULL, &dp, de, 0); while (dp-ds) { size_t remaining = dp-ds; - long result = rb_write_internal(fptr, ds, remaining); + long result = rb_io_write_memory(fptr, ds, remaining); if (result > 0) { ds += result; if ((size_t)result == remaining) break; } - else if (rb_io_maybe_wait_writable(errno, fptr->self, Qnil)) { + else if (rb_io_maybe_wait_writable(errno, fptr->self, fptr->timeout)) { if (fptr->fd < 0) return noalloc ? Qtrue : rb_exc_new3(rb_eIOError, rb_str_new_cstr(closed_stream)); } @@ -5732,7 +5904,7 @@ rb_io_syswrite(VALUE io, VALUE str) tmp = rb_str_tmp_frozen_acquire(str); RSTRING_GETMEM(tmp, ptr, len); - n = rb_write_internal(fptr, ptr, len); + n = rb_io_write_memory(fptr, ptr, len); if (n < 0) rb_sys_fail_path(fptr->pathv); rb_str_tmp_frozen_release(str, tmp); @@ -5778,9 +5950,11 @@ rb_io_sysread(int argc, VALUE *argv, VALUE io) iis.th = rb_thread_current(); iis.fptr = fptr; iis.nonblock = 0; + iis.fd = fptr->fd; iis.buf = RSTRING_PTR(str); iis.capa = ilen; - n = read_internal_locktmp(str, &iis); + iis.timeout = NULL; + n = io_read_memory_locktmp(str, &iis); if (n < 0) { rb_sys_fail_path(fptr->pathv); @@ -8900,6 +9074,7 @@ prep_io(int fd, int fmode, VALUE klass, const char *path) fp->self = io; fp->fd = fd; fp->mode = fmode; + fp->timeout = Qnil; if (!io_check_tty(fp)) { #ifdef __CYGWIN__ fp->mode |= FMODE_BINMODE; @@ -9004,6 +9179,7 @@ rb_io_fptr_new(void) fp->encs.ecflags = 0; fp->encs.ecopts = Qnil; fp->write_lock = Qnil; + fp->timeout = Qnil; return fp; } @@ -9112,6 +9288,7 @@ rb_io_initialize(int argc, VALUE *argv, VALUE io) fp->fd = fd; fp->mode = fmode; fp->encs = convconfig; + fp->timeout = Qnil; clear_codeconv(fp); io_check_tty(fp); if (fileno(stdin) == fd) @@ -12093,7 +12270,7 @@ maygvl_copy_stream_continue_p(int has_gvl, struct copy_stream_struct *stp) return FALSE; } -struct wait_for_single_fd { +struct fiber_scheduler_wait_for_arguments { VALUE scheduler; rb_io_t *fptr; @@ -12103,11 +12280,11 @@ struct wait_for_single_fd { }; static void * -rb_thread_fiber_scheduler_wait_for(void * _args) +fiber_scheduler_wait_for(void * _arguments) { - struct wait_for_single_fd *args = (struct wait_for_single_fd *)_args; + struct fiber_scheduler_wait_for_arguments *arguments = (struct fiber_scheduler_wait_for_arguments *)_arguments; - args->result = rb_fiber_scheduler_io_wait(args->scheduler, args->fptr->self, INT2NUM(args->events), Qnil); + arguments->result = rb_fiber_scheduler_io_wait(arguments->scheduler, arguments->fptr->self, INT2NUM(arguments->events), arguments->fptr->timeout); return NULL; } @@ -12117,12 +12294,12 @@ rb_thread_fiber_scheduler_wait_for(void * _args) STATIC_ASSERT(pollin_expected, POLLIN == RB_WAITFD_IN); STATIC_ASSERT(pollout_expected, POLLOUT == RB_WAITFD_OUT); static int -nogvl_wait_for(VALUE th, rb_io_t *fptr, short events) +nogvl_wait_for(VALUE th, rb_io_t *fptr, short events, struct timeval *timeout) { VALUE scheduler = rb_fiber_scheduler_current_for_thread(th); if (scheduler != Qnil) { - struct wait_for_single_fd args = {.scheduler = scheduler, .fptr = fptr, .events = events}; - rb_thread_call_with_gvl(rb_thread_fiber_scheduler_wait_for, &args); + struct fiber_scheduler_wait_for_arguments args = {.scheduler = scheduler, .fptr = fptr, .events = events}; + rb_thread_call_with_gvl(fiber_scheduler_wait_for, &args); return RTEST(args.result); } @@ -12134,22 +12311,32 @@ nogvl_wait_for(VALUE th, rb_io_t *fptr, short events) fds.fd = fd; fds.events = events; - return poll(&fds, 1, -1); + int timeout_milliseconds = -1; + + if (timeout) { + timeout_milliseconds = (int)(timeout->tv_sec * 1000) + (int)(timeout->tv_usec / 1000); + } + + return poll(&fds, 1, timeout_milliseconds); } #else /* !USE_POLL */ # define IOWAIT_SYSCALL "select" static int -nogvl_wait_for(VALUE th, rb_io_t *fptr, short events) +nogvl_wait_for(VALUE th, rb_io_t *fptr, short events, struct timeval *timeout) { VALUE scheduler = rb_fiber_scheduler_current_for_thread(th); if (scheduler != Qnil) { - struct wait_for_single_fd args = {.scheduler = scheduler, .fptr = fptr, .events = events}; - rb_thread_call_with_gvl(rb_thread_fiber_scheduler_wait_for, &args); + struct fiber_scheduler_wait_for_arguments args = {.scheduler = scheduler, .fptr = fptr, .events = events}; + rb_thread_call_with_gvl(fiber_scheduler_wait_for, &args); return RTEST(args.result); } int fd = fptr->fd; - if (fd == -1) return 0; + + if (fd == -1) { + errno = EBADF; + return -1; + } rb_fdset_t fds; int ret; @@ -12159,16 +12346,18 @@ nogvl_wait_for(VALUE th, rb_io_t *fptr, short events) switch (events) { case RB_WAITFD_IN: - ret = rb_fd_select(fd + 1, &fds, 0, 0, 0); + ret = rb_fd_select(fd + 1, &fds, 0, 0, timeout); break; case RB_WAITFD_OUT: - ret = rb_fd_select(fd + 1, 0, &fds, 0, 0); + ret = rb_fd_select(fd + 1, 0, &fds, 0, timeout); break; default: VM_UNREACHABLE(nogvl_wait_for); } rb_fd_term(&fds); + + // On timeout, this returns 0. return ret; } #endif /* !USE_POLL */ @@ -12183,7 +12372,7 @@ maygvl_copy_stream_wait_read(int has_gvl, struct copy_stream_struct *stp) ret = RB_NUM2INT(rb_io_wait(stp->src, RB_INT2NUM(RUBY_IO_READABLE), Qnil)); } else { - ret = nogvl_wait_for(stp->th, stp->src_fptr, RB_WAITFD_IN); + ret = nogvl_wait_for(stp->th, stp->src_fptr, RB_WAITFD_IN, NULL); } } while (ret < 0 && maygvl_copy_stream_continue_p(has_gvl, stp)); @@ -12201,7 +12390,7 @@ nogvl_copy_stream_wait_write(struct copy_stream_struct *stp) int ret; do { - ret = nogvl_wait_for(stp->th, stp->dst_fptr, RB_WAITFD_OUT); + ret = nogvl_wait_for(stp->th, stp->dst_fptr, RB_WAITFD_OUT, NULL); } while (ret < 0 && maygvl_copy_stream_continue_p(0, stp)); if (ret < 0) { @@ -12552,7 +12741,7 @@ static ssize_t maygvl_read(int has_gvl, rb_io_t *fptr, void *buf, size_t count) { if (has_gvl) - return rb_read_internal(fptr, buf, count); + return rb_io_read_memory(fptr, buf, count); else return read(fptr->fd, buf, count); } @@ -14666,6 +14855,8 @@ Init_IO(void) rb_cIO = rb_define_class("IO", rb_cObject); rb_include_module(rb_cIO, rb_mEnumerable); + rb_eIOTimeoutError = rb_define_class_under(rb_cIO, "TimeoutError", rb_eIOError); + rb_define_const(rb_cIO, "READABLE", INT2NUM(RUBY_IO_READABLE)); rb_define_const(rb_cIO, "WRITABLE", INT2NUM(RUBY_IO_WRITABLE)); rb_define_const(rb_cIO, "PRIORITY", INT2NUM(RUBY_IO_PRIORITY)); @@ -14764,23 +14955,26 @@ Init_IO(void) rb_define_alias(rb_cIO, "to_i", "fileno"); rb_define_method(rb_cIO, "to_io", rb_io_to_io, 0); - rb_define_method(rb_cIO, "fsync", rb_io_fsync, 0); - rb_define_method(rb_cIO, "fdatasync", rb_io_fdatasync, 0); - rb_define_method(rb_cIO, "sync", rb_io_sync, 0); - rb_define_method(rb_cIO, "sync=", rb_io_set_sync, 1); + rb_define_method(rb_cIO, "timeout", rb_io_timeout, 0); + rb_define_method(rb_cIO, "timeout=", rb_io_set_timeout, 1); - rb_define_method(rb_cIO, "lineno", rb_io_lineno, 0); - rb_define_method(rb_cIO, "lineno=", rb_io_set_lineno, 1); + rb_define_method(rb_cIO, "fsync", rb_io_fsync, 0); + rb_define_method(rb_cIO, "fdatasync", rb_io_fdatasync, 0); + rb_define_method(rb_cIO, "sync", rb_io_sync, 0); + rb_define_method(rb_cIO, "sync=", rb_io_set_sync, 1); - rb_define_method(rb_cIO, "readlines", rb_io_readlines, -1); + rb_define_method(rb_cIO, "lineno", rb_io_lineno, 0); + rb_define_method(rb_cIO, "lineno=", rb_io_set_lineno, 1); - rb_define_method(rb_cIO, "readpartial", io_readpartial, -1); - rb_define_method(rb_cIO, "read", io_read, -1); + rb_define_method(rb_cIO, "readlines", rb_io_readlines, -1); + + rb_define_method(rb_cIO, "readpartial", io_readpartial, -1); + rb_define_method(rb_cIO, "read", io_read, -1); rb_define_method(rb_cIO, "write", io_write_m, -1); - rb_define_method(rb_cIO, "gets", rb_io_gets_m, -1); - rb_define_method(rb_cIO, "readline", rb_io_readline, -1); - rb_define_method(rb_cIO, "getc", rb_io_getc, 0); - rb_define_method(rb_cIO, "getbyte", rb_io_getbyte, 0); + rb_define_method(rb_cIO, "gets", rb_io_gets_m, -1); + rb_define_method(rb_cIO, "readline", rb_io_readline, -1); + rb_define_method(rb_cIO, "getc", rb_io_getc, 0); + rb_define_method(rb_cIO, "getbyte", rb_io_getbyte, 0); rb_define_method(rb_cIO, "readchar", rb_io_readchar, 0); rb_define_method(rb_cIO, "readbyte", rb_io_readbyte, 0); rb_define_method(rb_cIO, "ungetbyte",rb_io_ungetbyte, 1); diff --git a/scheduler.c b/scheduler.c index e13733722c..675a0a6768 100644 --- a/scheduler.c +++ b/scheduler.c @@ -222,13 +222,13 @@ rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeou VALUE rb_fiber_scheduler_io_wait_readable(VALUE scheduler, VALUE io) { - return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_READABLE), Qnil); + return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_READABLE), rb_io_timeout(io)); } VALUE rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io) { - return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_WRITABLE), Qnil); + return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_WRITABLE), rb_io_timeout(io)); } VALUE diff --git a/spec/ruby/library/socket/tcpsocket/shared/new.rb b/spec/ruby/library/socket/tcpsocket/shared/new.rb index 4189acc2f8..e7eb2f3c13 100644 --- a/spec/ruby/library/socket/tcpsocket/shared/new.rb +++ b/spec/ruby/library/socket/tcpsocket/shared/new.rb @@ -14,7 +14,7 @@ describe :tcpsocket_new, shared: true do } end - ruby_version_is "3.0" do + ruby_version_is "3.0"..."3.1" do it 'raises Errno::ETIMEDOUT with :connect_timeout when no server is listening on the given address' do -> { TCPSocket.send(@method, "192.0.2.1", 80, connect_timeout: 0) @@ -22,6 +22,14 @@ describe :tcpsocket_new, shared: true do end end + ruby_version_is "3.2" do + it 'raises IO::TimeoutError with :connect_timeout when no server is listening on the given address' do + -> { + TCPSocket.send(@method, "192.0.2.1", 80, connect_timeout: 0) + }.should raise_error(IO::TimeoutError) + end + end + describe "with a running server" do before :each do @server = SocketSpecs::SpecTCPServer.new diff --git a/test/ruby/test_io.rb b/test/ruby/test_io.rb index f791b4415d..f4ebccf9df 100644 --- a/test/ruby/test_io.rb +++ b/test/ruby/test_io.rb @@ -3960,6 +3960,9 @@ __END__ noex = Thread.new do # everything right and never see exceptions :) until sig_rd.wait_readable(0) IO.pipe do |r, w| + assert_nil r.timeout + assert_nil w.timeout + th = Thread.new { r.read(1) } w.write(dot) diff --git a/test/ruby/test_io_timeout.rb b/test/ruby/test_io_timeout.rb new file mode 100644 index 0000000000..ca4c0b833b --- /dev/null +++ b/test/ruby/test_io_timeout.rb @@ -0,0 +1,64 @@ +# frozen_string_literal: false + +require 'io/nonblock' + +class TestIOTimeout < Test::Unit::TestCase + def with_pipe + omit "UNIXSocket is not defined!" unless defined?(UNIXSocket) + + begin + i, o = UNIXSocket.pair + + unless i.nonblock? && o.nonblock? + i.close + o.close + omit "I/O is not non-blocking!" + end + + yield i, o + ensure + i.close + o.close + end + end + + def test_timeout_attribute + with_pipe do |i, o| + assert_nil i.timeout + + i.timeout = 10 + assert_equal 10, i.timeout + assert_nil o.timeout + + o.timeout = 20 + assert_equal 20, o.timeout + assert_equal 10, i.timeout + end + end + + def test_timeout_read_exception + with_pipe do |i, o| + i.timeout = 0.0001 + + assert_raise(IO::TimeoutError) {i.read} + end + end + + def test_timeout_gets_exception + with_pipe do |i, o| + i.timeout = 0.0001 + + assert_raise(IO::TimeoutError) {i.gets} + end + end + + def test_timeout_puts + with_pipe do |i, o| + i.timeout = 0.0001 + o.puts("Hello World") + o.close + + assert_equal "Hello World", i.gets.chomp + end + end +end diff --git a/test/socket/test_tcp.rb b/test/socket/test_tcp.rb index 9aa716f7ec..83ebea1b7a 100644 --- a/test/socket/test_tcp.rb +++ b/test/socket/test_tcp.rb @@ -70,7 +70,7 @@ class TestSocket_TCPSocket < Test::Unit::TestCase end def test_initialize_connect_timeout - assert_raise(Errno::ETIMEDOUT) do + assert_raise(IO::TimeoutError) do TCPSocket.new("192.0.2.1", 80, connect_timeout: 0) end end diff --git a/thread.c b/thread.c index 1364d73be2..e1b194861a 100644 --- a/thread.c +++ b/thread.c @@ -1675,6 +1675,12 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd) .th = rb_ec_thread_ptr(ec) }; + // `errno` is only valid when there is an actual error - but we can't + // extract that from the return value of `func` alone, so we clear any + // prior `errno` value here so that we can later check if it was set by + // `func` or not (as opposed to some previously set value). + errno = 0; + RB_VM_LOCK_ENTER(); { ccan_list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &waiting_fd.wfd_node); @@ -1706,6 +1712,11 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd) /* TODO: check func() */ RUBY_VM_CHECK_INTS_BLOCKING(ec); + // If the error was a timeout, we raise a specific exception for that: + if (saved_errno == ETIMEDOUT) { + rb_raise(rb_eIOTimeoutError, "Blocking operation timed out!"); + } + errno = saved_errno; return val;