mirror of
https://github.com/ruby/ruby.git
synced 2022-11-09 12:17:21 -05:00
Add IO#timeout attribute and use it for blocking IO operations. (#5653)
This commit is contained in:
parent
e76217a7f3
commit
e4f91bbdba
Notes:
git
2022-10-07 17:49:02 +09:00
Merged-By: ioquatix <samuel@codeotaku.com>
17 changed files with 428 additions and 99 deletions
11
NEWS.md
11
NEWS.md
|
@ -102,6 +102,16 @@ Note that each entry is kept to a minimum, see links for details.
|
||||||
|
|
||||||
Note: We're only listing outstanding class updates.
|
Note: We're only listing outstanding class updates.
|
||||||
|
|
||||||
|
* IO
|
||||||
|
* Introduce `IO#timeout=` and `IO#timeout` which can cause
|
||||||
|
`IO::TimeoutError` to be raised if a blocking operation exceeds the
|
||||||
|
specified timeout. [[Feature #18630]]
|
||||||
|
|
||||||
|
```ruby
|
||||||
|
STDIN.timeout = 1
|
||||||
|
STDIN.read # => Blocking operation timed out! (IO::TimeoutError)
|
||||||
|
```
|
||||||
|
|
||||||
* Data
|
* Data
|
||||||
* New core class to represent simple immutable value object. The class is
|
* New core class to represent simple immutable value object. The class is
|
||||||
similar to `Struct` and partially shares an implementation, but has more
|
similar to `Struct` and partially shares an implementation, but has more
|
||||||
|
@ -332,3 +342,4 @@ The following deprecated APIs are removed.
|
||||||
[Feature #19008]: https://bugs.ruby-lang.org/issues/19008
|
[Feature #19008]: https://bugs.ruby-lang.org/issues/19008
|
||||||
[Feature #19026]: https://bugs.ruby-lang.org/issues/19026
|
[Feature #19026]: https://bugs.ruby-lang.org/issues/19026
|
||||||
[Feature #16122]: https://bugs.ruby-lang.org/issues/16122
|
[Feature #16122]: https://bugs.ruby-lang.org/issues/16122
|
||||||
|
[Feature #18630]: https://bugs.ruby-lang.org/issues/18630
|
||||||
|
|
|
@ -27,6 +27,7 @@ if with_config("debug") or enable_config("debug")
|
||||||
end
|
end
|
||||||
|
|
||||||
have_func("rb_io_maybe_wait") # Ruby 3.1
|
have_func("rb_io_maybe_wait") # Ruby 3.1
|
||||||
|
have_func("rb_io_timeout") # Ruby 3.2
|
||||||
|
|
||||||
Logging::message "=== Checking for system dependent stuff... ===\n"
|
Logging::message "=== Checking for system dependent stuff... ===\n"
|
||||||
have_library("nsl", "t_open")
|
have_library("nsl", "t_open")
|
||||||
|
|
|
@ -1641,11 +1641,21 @@ no_exception_p(VALUE opts)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline static
|
||||||
|
VALUE io_timeout()
|
||||||
|
{
|
||||||
|
#ifdef HAVE_RB_IO_TIMEOUT
|
||||||
|
return Qundef;
|
||||||
|
#else
|
||||||
|
return Qnil;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
io_wait_writable(rb_io_t *fptr)
|
io_wait_writable(rb_io_t *fptr)
|
||||||
{
|
{
|
||||||
#ifdef HAVE_RB_IO_MAYBE_WAIT
|
#ifdef HAVE_RB_IO_MAYBE_WAIT
|
||||||
rb_io_maybe_wait_writable(errno, fptr->self, Qnil);
|
rb_io_maybe_wait_writable(errno, fptr->self, io_timeout());
|
||||||
#else
|
#else
|
||||||
rb_io_wait_writable(fptr->fd);
|
rb_io_wait_writable(fptr->fd);
|
||||||
#endif
|
#endif
|
||||||
|
@ -1655,7 +1665,7 @@ static void
|
||||||
io_wait_readable(rb_io_t *fptr)
|
io_wait_readable(rb_io_t *fptr)
|
||||||
{
|
{
|
||||||
#ifdef HAVE_RB_IO_MAYBE_WAIT
|
#ifdef HAVE_RB_IO_MAYBE_WAIT
|
||||||
rb_io_maybe_wait_readable(errno, fptr->self, Qnil);
|
rb_io_maybe_wait_readable(errno, fptr->self, io_timeout());
|
||||||
#else
|
#else
|
||||||
rb_io_wait_readable(fptr->fd);
|
rb_io_wait_readable(fptr->fd);
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -1285,7 +1285,7 @@ bsock_sendmsg_internal(VALUE sock, VALUE data, VALUE vflags,
|
||||||
|
|
||||||
if (ss == -1) {
|
if (ss == -1) {
|
||||||
int e;
|
int e;
|
||||||
if (!nonblock && rb_io_maybe_wait_writable(errno, fptr->self, Qnil)) {
|
if (!nonblock && rb_io_maybe_wait_writable(errno, fptr->self, fptr->timeout)) {
|
||||||
rb_io_check_closed(fptr);
|
rb_io_check_closed(fptr);
|
||||||
goto retry;
|
goto retry;
|
||||||
}
|
}
|
||||||
|
@ -1557,7 +1557,7 @@ bsock_recvmsg_internal(VALUE sock,
|
||||||
|
|
||||||
if (ss == -1) {
|
if (ss == -1) {
|
||||||
int e;
|
int e;
|
||||||
if (!nonblock && rb_io_maybe_wait_readable(errno, fptr->self, Qnil)) {
|
if (!nonblock && rb_io_maybe_wait_readable(errno, fptr->self, fptr->timeout)) {
|
||||||
rb_io_check_closed(fptr);
|
rb_io_check_closed(fptr);
|
||||||
goto retry;
|
goto retry;
|
||||||
}
|
}
|
||||||
|
|
|
@ -601,7 +601,7 @@ rsock_bsock_send(int argc, VALUE *argv, VALUE socket)
|
||||||
|
|
||||||
if (n >= 0) return SSIZET2NUM(n);
|
if (n >= 0) return SSIZET2NUM(n);
|
||||||
|
|
||||||
if (rb_io_maybe_wait_writable(errno, socket, Qnil)) {
|
if (rb_io_maybe_wait_writable(errno, socket, fptr->timeout)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -189,7 +189,7 @@ rsock_s_recvfrom(VALUE socket, int argc, VALUE *argv, enum sock_recv_type from)
|
||||||
|
|
||||||
if (slen >= 0) break;
|
if (slen >= 0) break;
|
||||||
|
|
||||||
if (!rb_io_maybe_wait_readable(errno, socket, Qnil))
|
if (!rb_io_maybe_wait_readable(errno, socket, Qundef))
|
||||||
rb_sys_fail("recvfrom(2)");
|
rb_sys_fail("recvfrom(2)");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -705,7 +705,7 @@ rsock_s_accept(VALUE klass, VALUE io, struct sockaddr *sockaddr, socklen_t *len)
|
||||||
retry = 1;
|
retry = 1;
|
||||||
goto retry;
|
goto retry;
|
||||||
default:
|
default:
|
||||||
if (!rb_io_maybe_wait_readable(error, io, Qnil)) break;
|
if (!rb_io_maybe_wait_readable(error, io, Qundef)) break;
|
||||||
retry = 0;
|
retry = 0;
|
||||||
goto retry;
|
goto retry;
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,10 @@ rsock_syserr_fail_host_port(int err, const char *mesg, VALUE host, VALUE port)
|
||||||
message = rb_sprintf("%s for %+"PRIsVALUE" port % "PRIsVALUE"",
|
message = rb_sprintf("%s for %+"PRIsVALUE" port % "PRIsVALUE"",
|
||||||
mesg, host, port);
|
mesg, host, port);
|
||||||
|
|
||||||
|
if (err == ETIMEDOUT) {
|
||||||
|
rb_exc_raise(rb_exc_new3(rb_eIOTimeoutError, message));
|
||||||
|
}
|
||||||
|
|
||||||
rb_syserr_fail_str(err, message);
|
rb_syserr_fail_str(err, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -170,7 +170,7 @@ udp_send_internal(VALUE v)
|
||||||
|
|
||||||
if (n >= 0) return RB_SSIZE2NUM(n);
|
if (n >= 0) return RB_SSIZE2NUM(n);
|
||||||
|
|
||||||
if (rb_io_maybe_wait_writable(errno, fptr->self, Qnil)) {
|
if (rb_io_maybe_wait_writable(errno, fptr->self, fptr->timeout)) {
|
||||||
goto retry;
|
goto retry;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
1
gc.c
1
gc.c
|
@ -7303,6 +7303,7 @@ gc_mark_children(rb_objspace_t *objspace, VALUE obj)
|
||||||
gc_mark(objspace, any->as.file.fptr->writeconv_pre_ecopts);
|
gc_mark(objspace, any->as.file.fptr->writeconv_pre_ecopts);
|
||||||
gc_mark(objspace, any->as.file.fptr->encs.ecopts);
|
gc_mark(objspace, any->as.file.fptr->encs.ecopts);
|
||||||
gc_mark(objspace, any->as.file.fptr->write_lock);
|
gc_mark(objspace, any->as.file.fptr->write_lock);
|
||||||
|
gc_mark(objspace, any->as.file.fptr->timeout);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
|
|
@ -63,6 +63,11 @@ RBIMPL_SYMBOL_EXPORT_BEGIN()
|
||||||
struct stat;
|
struct stat;
|
||||||
struct timeval;
|
struct timeval;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicates that a timeout has occurred while performing an IO operation.
|
||||||
|
*/
|
||||||
|
RUBY_EXTERN VALUE rb_eIOTimeoutError;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Type of events that an IO can wait.
|
* Type of events that an IO can wait.
|
||||||
*
|
*
|
||||||
|
@ -214,6 +219,11 @@ typedef struct rb_io_t {
|
||||||
* This of course doesn't help inter-process IO interleaves, though.
|
* This of course doesn't help inter-process IO interleaves, though.
|
||||||
*/
|
*/
|
||||||
VALUE write_lock;
|
VALUE write_lock;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The timeout associated with this IO when performing blocking operations.
|
||||||
|
*/
|
||||||
|
VALUE timeout;
|
||||||
} rb_io_t;
|
} rb_io_t;
|
||||||
|
|
||||||
/** @alias{rb_io_enc_t} */
|
/** @alias{rb_io_enc_t} */
|
||||||
|
@ -844,11 +854,33 @@ int rb_io_wait_writable(int fd);
|
||||||
*/
|
*/
|
||||||
int rb_wait_for_single_fd(int fd, int events, struct timeval *tv);
|
int rb_wait_for_single_fd(int fd, int events, struct timeval *tv);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the timeout associated with the specified io object.
|
||||||
|
*
|
||||||
|
* @param[in] io An IO object.
|
||||||
|
* @retval RUBY_Qnil There is no associated timeout.
|
||||||
|
* @retval Otherwise The timeout value.
|
||||||
|
*/
|
||||||
|
VALUE rb_io_timeout(VALUE io);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the timeout associated with the specified io object. This timeout is
|
||||||
|
* used as a best effort timeout to prevent operations from blocking forever.
|
||||||
|
*
|
||||||
|
* @param[in] io An IO object.
|
||||||
|
* @param[in] timeout A timeout value. Must respond to #to_f.
|
||||||
|
* @
|
||||||
|
*/
|
||||||
|
VALUE rb_io_set_timeout(VALUE io, VALUE timeout);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Blocks until the passed IO is ready for the passed events. The "events"
|
* Blocks until the passed IO is ready for the passed events. The "events"
|
||||||
* here is a Ruby level integer, which is an OR-ed value of `IO::READABLE`,
|
* here is a Ruby level integer, which is an OR-ed value of `IO::READABLE`,
|
||||||
* `IO::WRITable`, and `IO::PRIORITY`.
|
* `IO::WRITable`, and `IO::PRIORITY`.
|
||||||
*
|
*
|
||||||
|
* If timeout is `Qundef`, it will use the default timeout as given by
|
||||||
|
* `rb_io_timeout(io)`.
|
||||||
|
*
|
||||||
* @param[in] io An IO object to wait.
|
* @param[in] io An IO object to wait.
|
||||||
* @param[in] events See above.
|
* @param[in] events See above.
|
||||||
* @param[in] timeout Time, or numeric seconds since UNIX epoch.
|
* @param[in] timeout Time, or numeric seconds since UNIX epoch.
|
||||||
|
@ -903,13 +935,8 @@ VALUE rb_io_maybe_wait(int error, VALUE io, VALUE events, VALUE timeout);
|
||||||
* @exception rb_eIOError `io` is not open.
|
* @exception rb_eIOError `io` is not open.
|
||||||
* @exception rb_eRangeError `timeout` is out of range.
|
* @exception rb_eRangeError `timeout` is out of range.
|
||||||
* @exception rb_eSystemCallError `select(2)` failed for some reason.
|
* @exception rb_eSystemCallError `select(2)` failed for some reason.
|
||||||
* @exception rb_eTypeError Operation timed out.
|
* @retval 0 Operation timed out.
|
||||||
* @return Always returns ::RUBY_IO_READABLE.
|
* @retval Otherwise Always returns ::RUBY_IO_READABLE.
|
||||||
*
|
|
||||||
* @internal
|
|
||||||
*
|
|
||||||
* Because rb_io_maybe_wait() returns ::RUBY_Qfalse on timeout, this function
|
|
||||||
* fails to convert that value to `int`, and raises ::rb_eTypeError.
|
|
||||||
*/
|
*/
|
||||||
int rb_io_maybe_wait_readable(int error, VALUE io, VALUE timeout);
|
int rb_io_maybe_wait_readable(int error, VALUE io, VALUE timeout);
|
||||||
|
|
||||||
|
@ -924,13 +951,8 @@ int rb_io_maybe_wait_readable(int error, VALUE io, VALUE timeout);
|
||||||
* @exception rb_eIOError `io` is not open.
|
* @exception rb_eIOError `io` is not open.
|
||||||
* @exception rb_eRangeError `timeout` is out of range.
|
* @exception rb_eRangeError `timeout` is out of range.
|
||||||
* @exception rb_eSystemCallError `select(2)` failed for some reason.
|
* @exception rb_eSystemCallError `select(2)` failed for some reason.
|
||||||
* @exception rb_eTypeError Operation timed out.
|
* @retval 0 Operation timed out.
|
||||||
* @return Always returns ::RUBY_IO_WRITABLE.
|
* @retval Otherwise Always returns ::RUBY_IO_WRITABLE.
|
||||||
*
|
|
||||||
* @internal
|
|
||||||
*
|
|
||||||
* Because rb_io_maybe_wait() returns ::RUBY_Qfalse on timeout, this function
|
|
||||||
* fails to convert that value to `int`, and raises ::rb_eTypeError.
|
|
||||||
*/
|
*/
|
||||||
int rb_io_maybe_wait_writable(int error, VALUE io, VALUE timeout);
|
int rb_io_maybe_wait_writable(int error, VALUE io, VALUE timeout);
|
||||||
|
|
||||||
|
|
308
io.c
308
io.c
|
@ -178,6 +178,7 @@ off_t __syscall(quad_t number, ...);
|
||||||
VALUE rb_cIO;
|
VALUE rb_cIO;
|
||||||
VALUE rb_eEOFError;
|
VALUE rb_eEOFError;
|
||||||
VALUE rb_eIOError;
|
VALUE rb_eIOError;
|
||||||
|
VALUE rb_eIOTimeoutError;
|
||||||
VALUE rb_mWaitReadable;
|
VALUE rb_mWaitReadable;
|
||||||
VALUE rb_mWaitWritable;
|
VALUE rb_mWaitWritable;
|
||||||
|
|
||||||
|
@ -493,7 +494,7 @@ rb_cloexec_fcntl_dupfd(int fd, int minfd)
|
||||||
|
|
||||||
#if defined(_WIN32)
|
#if defined(_WIN32)
|
||||||
#define WAIT_FD_IN_WIN32(fptr) \
|
#define WAIT_FD_IN_WIN32(fptr) \
|
||||||
(rb_w32_io_cancelable_p((fptr)->fd) ? Qnil : rb_io_wait(fptr->self, RB_INT2NUM(RUBY_IO_READABLE), Qnil))
|
(rb_w32_io_cancelable_p((fptr)->fd) ? Qnil : rb_io_wait(fptr->self, RB_INT2NUM(RUBY_IO_READABLE), fptr->timeout))
|
||||||
#else
|
#else
|
||||||
#define WAIT_FD_IN_WIN32(fptr)
|
#define WAIT_FD_IN_WIN32(fptr)
|
||||||
#endif
|
#endif
|
||||||
|
@ -829,6 +830,49 @@ rb_io_set_write_io(VALUE io, VALUE w)
|
||||||
return write_io ? write_io : Qnil;
|
return write_io ? write_io : Qnil;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* call-seq:
|
||||||
|
* timeout -> duration or nil
|
||||||
|
*
|
||||||
|
* Get the internal timeout duration or nil if it was not set.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
VALUE
|
||||||
|
rb_io_timeout(VALUE self)
|
||||||
|
{
|
||||||
|
rb_io_t *fptr = rb_io_get_fptr(self);
|
||||||
|
|
||||||
|
return fptr->timeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* call-seq:
|
||||||
|
* timeout = duration -> duration
|
||||||
|
* timeout = nil -> nil
|
||||||
|
*
|
||||||
|
* Set the internal timeout to the specified duration or nil. The timeout
|
||||||
|
* applies to all blocking operations where possible.
|
||||||
|
*
|
||||||
|
* This affects the following methods (but is not limited to): #gets, #puts,
|
||||||
|
* #read, #write, #wait_readable and #wait_writable. This also affects
|
||||||
|
* blocking socket operations like Socket#accept and Socket#connect.
|
||||||
|
*
|
||||||
|
* Some operations like File#open and IO#close are not affected by the
|
||||||
|
* timeout. A timeout during a write operation may leave the IO in an
|
||||||
|
* inconsistent state, e.g. data was partially written. Generally speaking, a
|
||||||
|
* timeout is a last ditch effort to prevent an application from hanging on
|
||||||
|
* slow I/O operations, such as those that occur during a slowloris attack.
|
||||||
|
*/
|
||||||
|
VALUE
|
||||||
|
rb_io_set_timeout(VALUE self, VALUE timeout)
|
||||||
|
{
|
||||||
|
rb_io_t *fptr = rb_io_get_fptr(self);
|
||||||
|
|
||||||
|
fptr->timeout = timeout;
|
||||||
|
|
||||||
|
return self;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* call-seq:
|
* call-seq:
|
||||||
* IO.try_convert(object) -> new_io or nil
|
* IO.try_convert(object) -> new_io or nil
|
||||||
|
@ -1000,7 +1044,7 @@ void
|
||||||
rb_io_read_check(rb_io_t *fptr)
|
rb_io_read_check(rb_io_t *fptr)
|
||||||
{
|
{
|
||||||
if (!READ_DATA_PENDING(fptr)) {
|
if (!READ_DATA_PENDING(fptr)) {
|
||||||
rb_io_wait(fptr->self, RB_INT2NUM(RUBY_IO_READABLE), Qnil);
|
rb_io_wait(fptr->self, RB_INT2NUM(RUBY_IO_READABLE), fptr->timeout);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1052,56 +1096,121 @@ struct io_internal_read_struct {
|
||||||
VALUE th;
|
VALUE th;
|
||||||
rb_io_t *fptr;
|
rb_io_t *fptr;
|
||||||
int nonblock;
|
int nonblock;
|
||||||
|
int fd;
|
||||||
|
|
||||||
void *buf;
|
void *buf;
|
||||||
size_t capa;
|
size_t capa;
|
||||||
|
struct timeval *timeout;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct io_internal_write_struct {
|
struct io_internal_write_struct {
|
||||||
|
VALUE th;
|
||||||
|
rb_io_t *fptr;
|
||||||
|
int nonblock;
|
||||||
int fd;
|
int fd;
|
||||||
|
|
||||||
const void *buf;
|
const void *buf;
|
||||||
size_t capa;
|
size_t capa;
|
||||||
|
struct timeval *timeout;
|
||||||
};
|
};
|
||||||
|
|
||||||
#ifdef HAVE_WRITEV
|
#ifdef HAVE_WRITEV
|
||||||
struct io_internal_writev_struct {
|
struct io_internal_writev_struct {
|
||||||
|
VALUE th;
|
||||||
|
rb_io_t *fptr;
|
||||||
|
int nonblock;
|
||||||
int fd;
|
int fd;
|
||||||
|
|
||||||
int iovcnt;
|
int iovcnt;
|
||||||
const struct iovec *iov;
|
const struct iovec *iov;
|
||||||
|
struct timeval *timeout;
|
||||||
};
|
};
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
static int nogvl_wait_for(VALUE th, rb_io_t *fptr, short events);
|
static int nogvl_wait_for(VALUE th, rb_io_t *fptr, short events, struct timeval *timeout);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait for the given events on the given file descriptor.
|
||||||
|
* Returns -1 if an error or timeout occurred. +errno+ will be set.
|
||||||
|
* Returns the event mask if an event occurred.
|
||||||
|
*/
|
||||||
|
static inline int
|
||||||
|
io_internal_wait(VALUE thread, rb_io_t *fptr, int error, int events, struct timeval *timeout)
|
||||||
|
{
|
||||||
|
int ready = nogvl_wait_for(thread, fptr, events, timeout);
|
||||||
|
|
||||||
|
if (ready > 0) {
|
||||||
|
return ready;
|
||||||
|
} else if (ready == 0) {
|
||||||
|
errno = ETIMEDOUT;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
errno = error;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
static VALUE
|
static VALUE
|
||||||
internal_read_func(void *ptr)
|
internal_read_func(void *ptr)
|
||||||
{
|
{
|
||||||
struct io_internal_read_struct *iis = ptr;
|
struct io_internal_read_struct *iis = ptr;
|
||||||
ssize_t r;
|
ssize_t result;
|
||||||
|
|
||||||
|
if (iis->timeout && !iis->nonblock) {
|
||||||
|
if (io_internal_wait(iis->th, iis->fptr, 0, RB_WAITFD_IN, iis->timeout) == -1) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
retry:
|
retry:
|
||||||
r = read(iis->fptr->fd, iis->buf, iis->capa);
|
result = read(iis->fd, iis->buf, iis->capa);
|
||||||
if (r < 0 && !iis->nonblock) {
|
|
||||||
int e = errno;
|
if (result < 0 && !iis->nonblock) {
|
||||||
if (io_again_p(e)) {
|
if (io_again_p(errno)) {
|
||||||
if (nogvl_wait_for(iis->th, iis->fptr, RB_WAITFD_IN) != -1) {
|
if (io_internal_wait(iis->th, iis->fptr, errno, RB_WAITFD_IN, iis->timeout) == -1) {
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
goto retry;
|
goto retry;
|
||||||
}
|
}
|
||||||
errno = e;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return r;
|
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if defined __APPLE__
|
#if defined __APPLE__
|
||||||
# define do_write_retry(code) do {ret = code;} while (ret == -1 && errno == EPROTOTYPE)
|
# define do_write_retry(code) do {result = code;} while (result == -1 && errno == EPROTOTYPE)
|
||||||
#else
|
#else
|
||||||
# define do_write_retry(code) ret = code
|
# define do_write_retry(code) result = code
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
static VALUE
|
static VALUE
|
||||||
internal_write_func(void *ptr)
|
internal_write_func(void *ptr)
|
||||||
{
|
{
|
||||||
struct io_internal_write_struct *iis = ptr;
|
struct io_internal_write_struct *iis = ptr;
|
||||||
ssize_t ret;
|
ssize_t result;
|
||||||
|
|
||||||
|
if (iis->timeout && !iis->nonblock) {
|
||||||
|
if (io_internal_wait(iis->th, iis->fptr, 0, RB_WAITFD_OUT, iis->timeout) == -1) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
retry:
|
||||||
do_write_retry(write(iis->fd, iis->buf, iis->capa));
|
do_write_retry(write(iis->fd, iis->buf, iis->capa));
|
||||||
return (VALUE)ret;
|
|
||||||
|
if (result < 0 && !iis->nonblock) {
|
||||||
|
int e = errno;
|
||||||
|
if (io_again_p(e)) {
|
||||||
|
if (io_internal_wait(iis->th, iis->fptr, errno, RB_WAITFD_OUT, iis->timeout) == -1) {
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
goto retry;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef HAVE_WRITEV
|
#ifdef HAVE_WRITEV
|
||||||
|
@ -1109,14 +1218,33 @@ static VALUE
|
||||||
internal_writev_func(void *ptr)
|
internal_writev_func(void *ptr)
|
||||||
{
|
{
|
||||||
struct io_internal_writev_struct *iis = ptr;
|
struct io_internal_writev_struct *iis = ptr;
|
||||||
ssize_t ret;
|
ssize_t result;
|
||||||
|
|
||||||
|
if (iis->timeout && !iis->nonblock) {
|
||||||
|
if (io_internal_wait(iis->th, iis->fptr, 0, RB_WAITFD_OUT, iis->timeout) == -1) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
retry:
|
||||||
do_write_retry(writev(iis->fd, iis->iov, iis->iovcnt));
|
do_write_retry(writev(iis->fd, iis->iov, iis->iovcnt));
|
||||||
return (VALUE)ret;
|
|
||||||
|
if (result < 0 && !iis->nonblock) {
|
||||||
|
if (io_again_p(errno)) {
|
||||||
|
if (io_internal_wait(iis->th, iis->fptr, errno, RB_WAITFD_OUT, iis->timeout) == -1) {
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
goto retry;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
static ssize_t
|
static ssize_t
|
||||||
rb_read_internal(rb_io_t *fptr, void *buf, size_t count)
|
rb_io_read_memory(rb_io_t *fptr, void *buf, size_t count)
|
||||||
{
|
{
|
||||||
VALUE scheduler = rb_fiber_scheduler_current();
|
VALUE scheduler = rb_fiber_scheduler_current();
|
||||||
if (scheduler != Qnil) {
|
if (scheduler != Qnil) {
|
||||||
|
@ -1131,15 +1259,25 @@ rb_read_internal(rb_io_t *fptr, void *buf, size_t count)
|
||||||
.th = rb_thread_current(),
|
.th = rb_thread_current(),
|
||||||
.fptr = fptr,
|
.fptr = fptr,
|
||||||
.nonblock = 0,
|
.nonblock = 0,
|
||||||
|
.fd = fptr->fd,
|
||||||
|
|
||||||
.buf = buf,
|
.buf = buf,
|
||||||
.capa = count
|
.capa = count,
|
||||||
|
.timeout = NULL,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct timeval timeout_storage;
|
||||||
|
|
||||||
|
if (fptr->timeout != Qnil) {
|
||||||
|
timeout_storage = rb_time_interval(fptr->timeout);
|
||||||
|
iis.timeout = &timeout_storage;
|
||||||
|
}
|
||||||
|
|
||||||
return (ssize_t)rb_thread_io_blocking_region(internal_read_func, &iis, fptr->fd);
|
return (ssize_t)rb_thread_io_blocking_region(internal_read_func, &iis, fptr->fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
static ssize_t
|
static ssize_t
|
||||||
rb_write_internal(rb_io_t *fptr, const void *buf, size_t count)
|
rb_io_write_memory(rb_io_t *fptr, const void *buf, size_t count)
|
||||||
{
|
{
|
||||||
VALUE scheduler = rb_fiber_scheduler_current();
|
VALUE scheduler = rb_fiber_scheduler_current();
|
||||||
if (scheduler != Qnil) {
|
if (scheduler != Qnil) {
|
||||||
|
@ -1151,11 +1289,23 @@ rb_write_internal(rb_io_t *fptr, const void *buf, size_t count)
|
||||||
}
|
}
|
||||||
|
|
||||||
struct io_internal_write_struct iis = {
|
struct io_internal_write_struct iis = {
|
||||||
|
.th = rb_thread_current(),
|
||||||
|
.fptr = fptr,
|
||||||
|
.nonblock = 0,
|
||||||
.fd = fptr->fd,
|
.fd = fptr->fd,
|
||||||
|
|
||||||
.buf = buf,
|
.buf = buf,
|
||||||
.capa = count
|
.capa = count,
|
||||||
|
.timeout = NULL
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct timeval timeout_storage;
|
||||||
|
|
||||||
|
if (fptr->timeout != Qnil) {
|
||||||
|
timeout_storage = rb_time_interval(fptr->timeout);
|
||||||
|
iis.timeout = &timeout_storage;
|
||||||
|
}
|
||||||
|
|
||||||
return (ssize_t)rb_thread_io_blocking_region(internal_write_func, &iis, fptr->fd);
|
return (ssize_t)rb_thread_io_blocking_region(internal_write_func, &iis, fptr->fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1175,11 +1325,23 @@ rb_writev_internal(rb_io_t *fptr, const struct iovec *iov, int iovcnt)
|
||||||
}
|
}
|
||||||
|
|
||||||
struct io_internal_writev_struct iis = {
|
struct io_internal_writev_struct iis = {
|
||||||
|
.th = rb_thread_current(),
|
||||||
|
.fptr = fptr,
|
||||||
|
.nonblock = 0,
|
||||||
.fd = fptr->fd,
|
.fd = fptr->fd,
|
||||||
|
|
||||||
.iov = iov,
|
.iov = iov,
|
||||||
.iovcnt = iovcnt,
|
.iovcnt = iovcnt,
|
||||||
|
.timeout = NULL
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct timeval timeout_storage;
|
||||||
|
|
||||||
|
if (fptr->timeout != Qnil) {
|
||||||
|
timeout_storage = rb_time_interval(fptr->timeout);
|
||||||
|
iis.timeout = &timeout_storage;
|
||||||
|
}
|
||||||
|
|
||||||
return (ssize_t)rb_thread_io_blocking_region(internal_writev_func, &iis, fptr->fd);
|
return (ssize_t)rb_thread_io_blocking_region(internal_writev_func, &iis, fptr->fd);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
@ -1196,11 +1358,13 @@ io_flush_buffer_sync(void *arg)
|
||||||
fptr->wbuf.len = 0;
|
fptr->wbuf.len = 0;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (0 <= r) {
|
if (0 <= r) {
|
||||||
fptr->wbuf.off += (int)r;
|
fptr->wbuf.off += (int)r;
|
||||||
fptr->wbuf.len -= (int)r;
|
fptr->wbuf.len -= (int)r;
|
||||||
errno = EAGAIN;
|
errno = EAGAIN;
|
||||||
}
|
}
|
||||||
|
|
||||||
return (VALUE)-1;
|
return (VALUE)-1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1231,7 +1395,7 @@ io_fflush(rb_io_t *fptr)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
while (fptr->wbuf.len > 0 && io_flush_buffer(fptr) != 0) {
|
while (fptr->wbuf.len > 0 && io_flush_buffer(fptr) != 0) {
|
||||||
if (!rb_io_maybe_wait_writable(errno, fptr->self, Qnil))
|
if (!rb_io_maybe_wait_writable(errno, fptr->self, fptr->timeout))
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
rb_io_check_closed(fptr);
|
rb_io_check_closed(fptr);
|
||||||
|
@ -1255,6 +1419,10 @@ rb_io_wait(VALUE io, VALUE events, VALUE timeout)
|
||||||
struct timeval tv_storage;
|
struct timeval tv_storage;
|
||||||
struct timeval *tv = NULL;
|
struct timeval *tv = NULL;
|
||||||
|
|
||||||
|
if (timeout == Qundef) {
|
||||||
|
timeout = fptr->timeout;
|
||||||
|
}
|
||||||
|
|
||||||
if (timeout != Qnil) {
|
if (timeout != Qnil) {
|
||||||
tv_storage = rb_time_interval(timeout);
|
tv_storage = rb_time_interval(timeout);
|
||||||
tv = &tv_storage;
|
tv = &tv_storage;
|
||||||
|
@ -1562,7 +1730,7 @@ io_binwrite_string_internal(rb_io_t *fptr, const char *ptr, long length)
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
return rb_write_internal(fptr, ptr, length);
|
return rb_io_write_memory(fptr, ptr, length);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
|
@ -1597,7 +1765,7 @@ io_binwrite_string_internal(rb_io_t *fptr, const char *ptr, long length)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Otherwise, we should write the data directly:
|
// Otherwise, we should write the data directly:
|
||||||
return rb_write_internal(fptr, ptr, length);
|
return rb_io_write_memory(fptr, ptr, length);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
@ -1625,7 +1793,7 @@ io_binwrite_string(VALUE arg)
|
||||||
remaining -= result;
|
remaining -= result;
|
||||||
}
|
}
|
||||||
// Wait for it to become writable:
|
// Wait for it to become writable:
|
||||||
else if (rb_io_maybe_wait_writable(errno, p->fptr->self, Qnil)) {
|
else if (rb_io_maybe_wait_writable(errno, p->fptr->self, p->fptr->timeout)) {
|
||||||
rb_io_check_closed(p->fptr);
|
rb_io_check_closed(p->fptr);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -1892,7 +2060,7 @@ io_binwritev_internal(VALUE arg)
|
||||||
iov->iov_base = (char *)iov->iov_base + result;
|
iov->iov_base = (char *)iov->iov_base + result;
|
||||||
iov->iov_len -= result;
|
iov->iov_len -= result;
|
||||||
}
|
}
|
||||||
else if (rb_io_maybe_wait_writable(errno, fptr->self, Qnil)) {
|
else if (rb_io_maybe_wait_writable(errno, fptr->self, fptr->timeout)) {
|
||||||
rb_io_check_closed(fptr);
|
rb_io_check_closed(fptr);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -2399,12 +2567,12 @@ rb_io_rewind(VALUE io)
|
||||||
static int
|
static int
|
||||||
fptr_wait_readable(rb_io_t *fptr)
|
fptr_wait_readable(rb_io_t *fptr)
|
||||||
{
|
{
|
||||||
int ret = rb_io_maybe_wait_readable(errno, fptr->self, Qnil);
|
int result = rb_io_maybe_wait_readable(errno, fptr->self, fptr->timeout);
|
||||||
|
|
||||||
if (ret)
|
if (result)
|
||||||
rb_io_check_closed(fptr);
|
rb_io_check_closed(fptr);
|
||||||
|
|
||||||
return ret;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
|
@ -2423,7 +2591,7 @@ io_fillbuf(rb_io_t *fptr)
|
||||||
}
|
}
|
||||||
if (fptr->rbuf.len == 0) {
|
if (fptr->rbuf.len == 0) {
|
||||||
retry:
|
retry:
|
||||||
r = rb_read_internal(fptr, fptr->rbuf.ptr, fptr->rbuf.capa);
|
r = rb_io_read_memory(fptr, fptr->rbuf.ptr, fptr->rbuf.capa);
|
||||||
|
|
||||||
if (r < 0) {
|
if (r < 0) {
|
||||||
if (fptr_wait_readable(fptr))
|
if (fptr_wait_readable(fptr))
|
||||||
|
@ -2815,7 +2983,7 @@ io_bufread(char *ptr, long len, rb_io_t *fptr)
|
||||||
while (n > 0) {
|
while (n > 0) {
|
||||||
again:
|
again:
|
||||||
rb_io_check_closed(fptr);
|
rb_io_check_closed(fptr);
|
||||||
c = rb_read_internal(fptr, ptr+offset, n);
|
c = rb_io_read_memory(fptr, ptr+offset, n);
|
||||||
if (c == 0) break;
|
if (c == 0) break;
|
||||||
if (c < 0) {
|
if (c < 0) {
|
||||||
if (fptr_wait_readable(fptr))
|
if (fptr_wait_readable(fptr))
|
||||||
|
@ -3171,7 +3339,7 @@ rb_io_set_nonblock(rb_io_t *fptr)
|
||||||
}
|
}
|
||||||
|
|
||||||
static VALUE
|
static VALUE
|
||||||
read_internal_call(VALUE arg)
|
io_read_memory_call(VALUE arg)
|
||||||
{
|
{
|
||||||
struct io_internal_read_struct *iis = (struct io_internal_read_struct *)arg;
|
struct io_internal_read_struct *iis = (struct io_internal_read_struct *)arg;
|
||||||
|
|
||||||
|
@ -3189,9 +3357,9 @@ read_internal_call(VALUE arg)
|
||||||
}
|
}
|
||||||
|
|
||||||
static long
|
static long
|
||||||
read_internal_locktmp(VALUE str, struct io_internal_read_struct *iis)
|
io_read_memory_locktmp(VALUE str, struct io_internal_read_struct *iis)
|
||||||
{
|
{
|
||||||
return (long)rb_str_locktmp_ensure(str, read_internal_call, (VALUE)iis);
|
return (long)rb_str_locktmp_ensure(str, io_read_memory_call, (VALUE)iis);
|
||||||
}
|
}
|
||||||
|
|
||||||
#define no_exception_p(opts) !rb_opts_exception_p((opts), TRUE)
|
#define no_exception_p(opts) !rb_opts_exception_p((opts), TRUE)
|
||||||
|
@ -3233,9 +3401,11 @@ io_getpartial(int argc, VALUE *argv, VALUE io, int no_exception, int nonblock)
|
||||||
iis.th = rb_thread_current();
|
iis.th = rb_thread_current();
|
||||||
iis.fptr = fptr;
|
iis.fptr = fptr;
|
||||||
iis.nonblock = nonblock;
|
iis.nonblock = nonblock;
|
||||||
|
iis.fd = fptr->fd;
|
||||||
iis.buf = RSTRING_PTR(str);
|
iis.buf = RSTRING_PTR(str);
|
||||||
iis.capa = len;
|
iis.capa = len;
|
||||||
n = read_internal_locktmp(str, &iis);
|
iis.timeout = NULL;
|
||||||
|
n = io_read_memory_locktmp(str, &iis);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
int e = errno;
|
int e = errno;
|
||||||
if (!nonblock && fptr_wait_readable(fptr))
|
if (!nonblock && fptr_wait_readable(fptr))
|
||||||
|
@ -3400,9 +3570,11 @@ io_read_nonblock(rb_execution_context_t *ec, VALUE io, VALUE length, VALUE str,
|
||||||
shrinkable |= io_setstrbuf(&str, len);
|
shrinkable |= io_setstrbuf(&str, len);
|
||||||
iis.fptr = fptr;
|
iis.fptr = fptr;
|
||||||
iis.nonblock = 1;
|
iis.nonblock = 1;
|
||||||
|
iis.fd = fptr->fd;
|
||||||
iis.buf = RSTRING_PTR(str);
|
iis.buf = RSTRING_PTR(str);
|
||||||
iis.capa = len;
|
iis.capa = len;
|
||||||
n = read_internal_locktmp(str, &iis);
|
iis.timeout = NULL;
|
||||||
|
n = io_read_memory_locktmp(str, &iis);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
int e = errno;
|
int e = errno;
|
||||||
if (io_again_p(e)) {
|
if (io_again_p(e)) {
|
||||||
|
@ -5113,13 +5285,13 @@ finish_writeconv(rb_io_t *fptr, int noalloc)
|
||||||
res = rb_econv_convert(fptr->writeconv, NULL, NULL, &dp, de, 0);
|
res = rb_econv_convert(fptr->writeconv, NULL, NULL, &dp, de, 0);
|
||||||
while (dp-ds) {
|
while (dp-ds) {
|
||||||
size_t remaining = dp-ds;
|
size_t remaining = dp-ds;
|
||||||
long result = rb_write_internal(fptr, ds, remaining);
|
long result = rb_io_write_memory(fptr, ds, remaining);
|
||||||
|
|
||||||
if (result > 0) {
|
if (result > 0) {
|
||||||
ds += result;
|
ds += result;
|
||||||
if ((size_t)result == remaining) break;
|
if ((size_t)result == remaining) break;
|
||||||
}
|
}
|
||||||
else if (rb_io_maybe_wait_writable(errno, fptr->self, Qnil)) {
|
else if (rb_io_maybe_wait_writable(errno, fptr->self, fptr->timeout)) {
|
||||||
if (fptr->fd < 0)
|
if (fptr->fd < 0)
|
||||||
return noalloc ? Qtrue : rb_exc_new3(rb_eIOError, rb_str_new_cstr(closed_stream));
|
return noalloc ? Qtrue : rb_exc_new3(rb_eIOError, rb_str_new_cstr(closed_stream));
|
||||||
}
|
}
|
||||||
|
@ -5732,7 +5904,7 @@ rb_io_syswrite(VALUE io, VALUE str)
|
||||||
|
|
||||||
tmp = rb_str_tmp_frozen_acquire(str);
|
tmp = rb_str_tmp_frozen_acquire(str);
|
||||||
RSTRING_GETMEM(tmp, ptr, len);
|
RSTRING_GETMEM(tmp, ptr, len);
|
||||||
n = rb_write_internal(fptr, ptr, len);
|
n = rb_io_write_memory(fptr, ptr, len);
|
||||||
if (n < 0) rb_sys_fail_path(fptr->pathv);
|
if (n < 0) rb_sys_fail_path(fptr->pathv);
|
||||||
rb_str_tmp_frozen_release(str, tmp);
|
rb_str_tmp_frozen_release(str, tmp);
|
||||||
|
|
||||||
|
@ -5778,9 +5950,11 @@ rb_io_sysread(int argc, VALUE *argv, VALUE io)
|
||||||
iis.th = rb_thread_current();
|
iis.th = rb_thread_current();
|
||||||
iis.fptr = fptr;
|
iis.fptr = fptr;
|
||||||
iis.nonblock = 0;
|
iis.nonblock = 0;
|
||||||
|
iis.fd = fptr->fd;
|
||||||
iis.buf = RSTRING_PTR(str);
|
iis.buf = RSTRING_PTR(str);
|
||||||
iis.capa = ilen;
|
iis.capa = ilen;
|
||||||
n = read_internal_locktmp(str, &iis);
|
iis.timeout = NULL;
|
||||||
|
n = io_read_memory_locktmp(str, &iis);
|
||||||
|
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
rb_sys_fail_path(fptr->pathv);
|
rb_sys_fail_path(fptr->pathv);
|
||||||
|
@ -8900,6 +9074,7 @@ prep_io(int fd, int fmode, VALUE klass, const char *path)
|
||||||
fp->self = io;
|
fp->self = io;
|
||||||
fp->fd = fd;
|
fp->fd = fd;
|
||||||
fp->mode = fmode;
|
fp->mode = fmode;
|
||||||
|
fp->timeout = Qnil;
|
||||||
if (!io_check_tty(fp)) {
|
if (!io_check_tty(fp)) {
|
||||||
#ifdef __CYGWIN__
|
#ifdef __CYGWIN__
|
||||||
fp->mode |= FMODE_BINMODE;
|
fp->mode |= FMODE_BINMODE;
|
||||||
|
@ -9004,6 +9179,7 @@ rb_io_fptr_new(void)
|
||||||
fp->encs.ecflags = 0;
|
fp->encs.ecflags = 0;
|
||||||
fp->encs.ecopts = Qnil;
|
fp->encs.ecopts = Qnil;
|
||||||
fp->write_lock = Qnil;
|
fp->write_lock = Qnil;
|
||||||
|
fp->timeout = Qnil;
|
||||||
return fp;
|
return fp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -9112,6 +9288,7 @@ rb_io_initialize(int argc, VALUE *argv, VALUE io)
|
||||||
fp->fd = fd;
|
fp->fd = fd;
|
||||||
fp->mode = fmode;
|
fp->mode = fmode;
|
||||||
fp->encs = convconfig;
|
fp->encs = convconfig;
|
||||||
|
fp->timeout = Qnil;
|
||||||
clear_codeconv(fp);
|
clear_codeconv(fp);
|
||||||
io_check_tty(fp);
|
io_check_tty(fp);
|
||||||
if (fileno(stdin) == fd)
|
if (fileno(stdin) == fd)
|
||||||
|
@ -12093,7 +12270,7 @@ maygvl_copy_stream_continue_p(int has_gvl, struct copy_stream_struct *stp)
|
||||||
return FALSE;
|
return FALSE;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct wait_for_single_fd {
|
struct fiber_scheduler_wait_for_arguments {
|
||||||
VALUE scheduler;
|
VALUE scheduler;
|
||||||
|
|
||||||
rb_io_t *fptr;
|
rb_io_t *fptr;
|
||||||
|
@ -12103,11 +12280,11 @@ struct wait_for_single_fd {
|
||||||
};
|
};
|
||||||
|
|
||||||
static void *
|
static void *
|
||||||
rb_thread_fiber_scheduler_wait_for(void * _args)
|
fiber_scheduler_wait_for(void * _arguments)
|
||||||
{
|
{
|
||||||
struct wait_for_single_fd *args = (struct wait_for_single_fd *)_args;
|
struct fiber_scheduler_wait_for_arguments *arguments = (struct fiber_scheduler_wait_for_arguments *)_arguments;
|
||||||
|
|
||||||
args->result = rb_fiber_scheduler_io_wait(args->scheduler, args->fptr->self, INT2NUM(args->events), Qnil);
|
arguments->result = rb_fiber_scheduler_io_wait(arguments->scheduler, arguments->fptr->self, INT2NUM(arguments->events), arguments->fptr->timeout);
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -12117,12 +12294,12 @@ rb_thread_fiber_scheduler_wait_for(void * _args)
|
||||||
STATIC_ASSERT(pollin_expected, POLLIN == RB_WAITFD_IN);
|
STATIC_ASSERT(pollin_expected, POLLIN == RB_WAITFD_IN);
|
||||||
STATIC_ASSERT(pollout_expected, POLLOUT == RB_WAITFD_OUT);
|
STATIC_ASSERT(pollout_expected, POLLOUT == RB_WAITFD_OUT);
|
||||||
static int
|
static int
|
||||||
nogvl_wait_for(VALUE th, rb_io_t *fptr, short events)
|
nogvl_wait_for(VALUE th, rb_io_t *fptr, short events, struct timeval *timeout)
|
||||||
{
|
{
|
||||||
VALUE scheduler = rb_fiber_scheduler_current_for_thread(th);
|
VALUE scheduler = rb_fiber_scheduler_current_for_thread(th);
|
||||||
if (scheduler != Qnil) {
|
if (scheduler != Qnil) {
|
||||||
struct wait_for_single_fd args = {.scheduler = scheduler, .fptr = fptr, .events = events};
|
struct fiber_scheduler_wait_for_arguments args = {.scheduler = scheduler, .fptr = fptr, .events = events};
|
||||||
rb_thread_call_with_gvl(rb_thread_fiber_scheduler_wait_for, &args);
|
rb_thread_call_with_gvl(fiber_scheduler_wait_for, &args);
|
||||||
return RTEST(args.result);
|
return RTEST(args.result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -12134,22 +12311,32 @@ nogvl_wait_for(VALUE th, rb_io_t *fptr, short events)
|
||||||
fds.fd = fd;
|
fds.fd = fd;
|
||||||
fds.events = events;
|
fds.events = events;
|
||||||
|
|
||||||
return poll(&fds, 1, -1);
|
int timeout_milliseconds = -1;
|
||||||
|
|
||||||
|
if (timeout) {
|
||||||
|
timeout_milliseconds = (int)(timeout->tv_sec * 1000) + (int)(timeout->tv_usec / 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
return poll(&fds, 1, timeout_milliseconds);
|
||||||
}
|
}
|
||||||
#else /* !USE_POLL */
|
#else /* !USE_POLL */
|
||||||
# define IOWAIT_SYSCALL "select"
|
# define IOWAIT_SYSCALL "select"
|
||||||
static int
|
static int
|
||||||
nogvl_wait_for(VALUE th, rb_io_t *fptr, short events)
|
nogvl_wait_for(VALUE th, rb_io_t *fptr, short events, struct timeval *timeout)
|
||||||
{
|
{
|
||||||
VALUE scheduler = rb_fiber_scheduler_current_for_thread(th);
|
VALUE scheduler = rb_fiber_scheduler_current_for_thread(th);
|
||||||
if (scheduler != Qnil) {
|
if (scheduler != Qnil) {
|
||||||
struct wait_for_single_fd args = {.scheduler = scheduler, .fptr = fptr, .events = events};
|
struct fiber_scheduler_wait_for_arguments args = {.scheduler = scheduler, .fptr = fptr, .events = events};
|
||||||
rb_thread_call_with_gvl(rb_thread_fiber_scheduler_wait_for, &args);
|
rb_thread_call_with_gvl(fiber_scheduler_wait_for, &args);
|
||||||
return RTEST(args.result);
|
return RTEST(args.result);
|
||||||
}
|
}
|
||||||
|
|
||||||
int fd = fptr->fd;
|
int fd = fptr->fd;
|
||||||
if (fd == -1) return 0;
|
|
||||||
|
if (fd == -1) {
|
||||||
|
errno = EBADF;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
rb_fdset_t fds;
|
rb_fdset_t fds;
|
||||||
int ret;
|
int ret;
|
||||||
|
@ -12159,16 +12346,18 @@ nogvl_wait_for(VALUE th, rb_io_t *fptr, short events)
|
||||||
|
|
||||||
switch (events) {
|
switch (events) {
|
||||||
case RB_WAITFD_IN:
|
case RB_WAITFD_IN:
|
||||||
ret = rb_fd_select(fd + 1, &fds, 0, 0, 0);
|
ret = rb_fd_select(fd + 1, &fds, 0, 0, timeout);
|
||||||
break;
|
break;
|
||||||
case RB_WAITFD_OUT:
|
case RB_WAITFD_OUT:
|
||||||
ret = rb_fd_select(fd + 1, 0, &fds, 0, 0);
|
ret = rb_fd_select(fd + 1, 0, &fds, 0, timeout);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
VM_UNREACHABLE(nogvl_wait_for);
|
VM_UNREACHABLE(nogvl_wait_for);
|
||||||
}
|
}
|
||||||
|
|
||||||
rb_fd_term(&fds);
|
rb_fd_term(&fds);
|
||||||
|
|
||||||
|
// On timeout, this returns 0.
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
#endif /* !USE_POLL */
|
#endif /* !USE_POLL */
|
||||||
|
@ -12183,7 +12372,7 @@ maygvl_copy_stream_wait_read(int has_gvl, struct copy_stream_struct *stp)
|
||||||
ret = RB_NUM2INT(rb_io_wait(stp->src, RB_INT2NUM(RUBY_IO_READABLE), Qnil));
|
ret = RB_NUM2INT(rb_io_wait(stp->src, RB_INT2NUM(RUBY_IO_READABLE), Qnil));
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
ret = nogvl_wait_for(stp->th, stp->src_fptr, RB_WAITFD_IN);
|
ret = nogvl_wait_for(stp->th, stp->src_fptr, RB_WAITFD_IN, NULL);
|
||||||
}
|
}
|
||||||
} while (ret < 0 && maygvl_copy_stream_continue_p(has_gvl, stp));
|
} while (ret < 0 && maygvl_copy_stream_continue_p(has_gvl, stp));
|
||||||
|
|
||||||
|
@ -12201,7 +12390,7 @@ nogvl_copy_stream_wait_write(struct copy_stream_struct *stp)
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
ret = nogvl_wait_for(stp->th, stp->dst_fptr, RB_WAITFD_OUT);
|
ret = nogvl_wait_for(stp->th, stp->dst_fptr, RB_WAITFD_OUT, NULL);
|
||||||
} while (ret < 0 && maygvl_copy_stream_continue_p(0, stp));
|
} while (ret < 0 && maygvl_copy_stream_continue_p(0, stp));
|
||||||
|
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
|
@ -12552,7 +12741,7 @@ static ssize_t
|
||||||
maygvl_read(int has_gvl, rb_io_t *fptr, void *buf, size_t count)
|
maygvl_read(int has_gvl, rb_io_t *fptr, void *buf, size_t count)
|
||||||
{
|
{
|
||||||
if (has_gvl)
|
if (has_gvl)
|
||||||
return rb_read_internal(fptr, buf, count);
|
return rb_io_read_memory(fptr, buf, count);
|
||||||
else
|
else
|
||||||
return read(fptr->fd, buf, count);
|
return read(fptr->fd, buf, count);
|
||||||
}
|
}
|
||||||
|
@ -14666,6 +14855,8 @@ Init_IO(void)
|
||||||
rb_cIO = rb_define_class("IO", rb_cObject);
|
rb_cIO = rb_define_class("IO", rb_cObject);
|
||||||
rb_include_module(rb_cIO, rb_mEnumerable);
|
rb_include_module(rb_cIO, rb_mEnumerable);
|
||||||
|
|
||||||
|
rb_eIOTimeoutError = rb_define_class_under(rb_cIO, "TimeoutError", rb_eIOError);
|
||||||
|
|
||||||
rb_define_const(rb_cIO, "READABLE", INT2NUM(RUBY_IO_READABLE));
|
rb_define_const(rb_cIO, "READABLE", INT2NUM(RUBY_IO_READABLE));
|
||||||
rb_define_const(rb_cIO, "WRITABLE", INT2NUM(RUBY_IO_WRITABLE));
|
rb_define_const(rb_cIO, "WRITABLE", INT2NUM(RUBY_IO_WRITABLE));
|
||||||
rb_define_const(rb_cIO, "PRIORITY", INT2NUM(RUBY_IO_PRIORITY));
|
rb_define_const(rb_cIO, "PRIORITY", INT2NUM(RUBY_IO_PRIORITY));
|
||||||
|
@ -14764,6 +14955,9 @@ Init_IO(void)
|
||||||
rb_define_alias(rb_cIO, "to_i", "fileno");
|
rb_define_alias(rb_cIO, "to_i", "fileno");
|
||||||
rb_define_method(rb_cIO, "to_io", rb_io_to_io, 0);
|
rb_define_method(rb_cIO, "to_io", rb_io_to_io, 0);
|
||||||
|
|
||||||
|
rb_define_method(rb_cIO, "timeout", rb_io_timeout, 0);
|
||||||
|
rb_define_method(rb_cIO, "timeout=", rb_io_set_timeout, 1);
|
||||||
|
|
||||||
rb_define_method(rb_cIO, "fsync", rb_io_fsync, 0);
|
rb_define_method(rb_cIO, "fsync", rb_io_fsync, 0);
|
||||||
rb_define_method(rb_cIO, "fdatasync", rb_io_fdatasync, 0);
|
rb_define_method(rb_cIO, "fdatasync", rb_io_fdatasync, 0);
|
||||||
rb_define_method(rb_cIO, "sync", rb_io_sync, 0);
|
rb_define_method(rb_cIO, "sync", rb_io_sync, 0);
|
||||||
|
|
|
@ -222,13 +222,13 @@ rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeou
|
||||||
VALUE
|
VALUE
|
||||||
rb_fiber_scheduler_io_wait_readable(VALUE scheduler, VALUE io)
|
rb_fiber_scheduler_io_wait_readable(VALUE scheduler, VALUE io)
|
||||||
{
|
{
|
||||||
return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_READABLE), Qnil);
|
return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_READABLE), rb_io_timeout(io));
|
||||||
}
|
}
|
||||||
|
|
||||||
VALUE
|
VALUE
|
||||||
rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io)
|
rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io)
|
||||||
{
|
{
|
||||||
return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_WRITABLE), Qnil);
|
return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_WRITABLE), rb_io_timeout(io));
|
||||||
}
|
}
|
||||||
|
|
||||||
VALUE
|
VALUE
|
||||||
|
|
|
@ -14,7 +14,7 @@ describe :tcpsocket_new, shared: true do
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
|
|
||||||
ruby_version_is "3.0" do
|
ruby_version_is "3.0"..."3.1" do
|
||||||
it 'raises Errno::ETIMEDOUT with :connect_timeout when no server is listening on the given address' do
|
it 'raises Errno::ETIMEDOUT with :connect_timeout when no server is listening on the given address' do
|
||||||
-> {
|
-> {
|
||||||
TCPSocket.send(@method, "192.0.2.1", 80, connect_timeout: 0)
|
TCPSocket.send(@method, "192.0.2.1", 80, connect_timeout: 0)
|
||||||
|
@ -22,6 +22,14 @@ describe :tcpsocket_new, shared: true do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
ruby_version_is "3.2" do
|
||||||
|
it 'raises IO::TimeoutError with :connect_timeout when no server is listening on the given address' do
|
||||||
|
-> {
|
||||||
|
TCPSocket.send(@method, "192.0.2.1", 80, connect_timeout: 0)
|
||||||
|
}.should raise_error(IO::TimeoutError)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
describe "with a running server" do
|
describe "with a running server" do
|
||||||
before :each do
|
before :each do
|
||||||
@server = SocketSpecs::SpecTCPServer.new
|
@server = SocketSpecs::SpecTCPServer.new
|
||||||
|
|
|
@ -3960,6 +3960,9 @@ __END__
|
||||||
noex = Thread.new do # everything right and never see exceptions :)
|
noex = Thread.new do # everything right and never see exceptions :)
|
||||||
until sig_rd.wait_readable(0)
|
until sig_rd.wait_readable(0)
|
||||||
IO.pipe do |r, w|
|
IO.pipe do |r, w|
|
||||||
|
assert_nil r.timeout
|
||||||
|
assert_nil w.timeout
|
||||||
|
|
||||||
th = Thread.new { r.read(1) }
|
th = Thread.new { r.read(1) }
|
||||||
w.write(dot)
|
w.write(dot)
|
||||||
|
|
||||||
|
|
64
test/ruby/test_io_timeout.rb
Normal file
64
test/ruby/test_io_timeout.rb
Normal file
|
@ -0,0 +1,64 @@
|
||||||
|
# frozen_string_literal: false
|
||||||
|
|
||||||
|
require 'io/nonblock'
|
||||||
|
|
||||||
|
class TestIOTimeout < Test::Unit::TestCase
|
||||||
|
def with_pipe
|
||||||
|
omit "UNIXSocket is not defined!" unless defined?(UNIXSocket)
|
||||||
|
|
||||||
|
begin
|
||||||
|
i, o = UNIXSocket.pair
|
||||||
|
|
||||||
|
unless i.nonblock? && o.nonblock?
|
||||||
|
i.close
|
||||||
|
o.close
|
||||||
|
omit "I/O is not non-blocking!"
|
||||||
|
end
|
||||||
|
|
||||||
|
yield i, o
|
||||||
|
ensure
|
||||||
|
i.close
|
||||||
|
o.close
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_timeout_attribute
|
||||||
|
with_pipe do |i, o|
|
||||||
|
assert_nil i.timeout
|
||||||
|
|
||||||
|
i.timeout = 10
|
||||||
|
assert_equal 10, i.timeout
|
||||||
|
assert_nil o.timeout
|
||||||
|
|
||||||
|
o.timeout = 20
|
||||||
|
assert_equal 20, o.timeout
|
||||||
|
assert_equal 10, i.timeout
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_timeout_read_exception
|
||||||
|
with_pipe do |i, o|
|
||||||
|
i.timeout = 0.0001
|
||||||
|
|
||||||
|
assert_raise(IO::TimeoutError) {i.read}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_timeout_gets_exception
|
||||||
|
with_pipe do |i, o|
|
||||||
|
i.timeout = 0.0001
|
||||||
|
|
||||||
|
assert_raise(IO::TimeoutError) {i.gets}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_timeout_puts
|
||||||
|
with_pipe do |i, o|
|
||||||
|
i.timeout = 0.0001
|
||||||
|
o.puts("Hello World")
|
||||||
|
o.close
|
||||||
|
|
||||||
|
assert_equal "Hello World", i.gets.chomp
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -70,7 +70,7 @@ class TestSocket_TCPSocket < Test::Unit::TestCase
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_initialize_connect_timeout
|
def test_initialize_connect_timeout
|
||||||
assert_raise(Errno::ETIMEDOUT) do
|
assert_raise(IO::TimeoutError) do
|
||||||
TCPSocket.new("192.0.2.1", 80, connect_timeout: 0)
|
TCPSocket.new("192.0.2.1", 80, connect_timeout: 0)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
11
thread.c
11
thread.c
|
@ -1675,6 +1675,12 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
|
||||||
.th = rb_ec_thread_ptr(ec)
|
.th = rb_ec_thread_ptr(ec)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// `errno` is only valid when there is an actual error - but we can't
|
||||||
|
// extract that from the return value of `func` alone, so we clear any
|
||||||
|
// prior `errno` value here so that we can later check if it was set by
|
||||||
|
// `func` or not (as opposed to some previously set value).
|
||||||
|
errno = 0;
|
||||||
|
|
||||||
RB_VM_LOCK_ENTER();
|
RB_VM_LOCK_ENTER();
|
||||||
{
|
{
|
||||||
ccan_list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &waiting_fd.wfd_node);
|
ccan_list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &waiting_fd.wfd_node);
|
||||||
|
@ -1706,6 +1712,11 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
|
||||||
/* TODO: check func() */
|
/* TODO: check func() */
|
||||||
RUBY_VM_CHECK_INTS_BLOCKING(ec);
|
RUBY_VM_CHECK_INTS_BLOCKING(ec);
|
||||||
|
|
||||||
|
// If the error was a timeout, we raise a specific exception for that:
|
||||||
|
if (saved_errno == ETIMEDOUT) {
|
||||||
|
rb_raise(rb_eIOTimeoutError, "Blocking operation timed out!");
|
||||||
|
}
|
||||||
|
|
||||||
errno = saved_errno;
|
errno = saved_errno;
|
||||||
|
|
||||||
return val;
|
return val;
|
||||||
|
|
Loading…
Reference in a new issue