1
0
Fork 0
mirror of https://github.com/ruby/ruby.git synced 2022-11-09 12:17:21 -05:00

Introduce io_result wrapper for passing [-errno, size] in VALUE.

This commit is contained in:
Samuel Williams 2021-12-18 18:19:30 +13:00 committed by GitHub
parent 922a81a994
commit 42d3231154
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
Notes: git 2021-12-18 14:19:51 +09:00
Merged: https://github.com/ruby/ruby/pull/5287

Merged-By: ioquatix <samuel@codeotaku.com>
5 changed files with 337 additions and 68 deletions

View file

@ -11,17 +11,67 @@
*/
#include "ruby/internal/config.h"
#include <errno.h>
#ifdef STDC_HEADERS
#include <stddef.h> /* size_t */
#endif
#include "ruby/ruby.h"
#include "ruby/internal/dllexport.h"
#include "ruby/internal/arithmetic.h"
RBIMPL_SYMBOL_EXPORT_BEGIN()
struct timeval;
/**
* Wrap a `ssize_t` and `int errno` into a single `VALUE`. This interface should
* be used to safely capture results from system calls like `read` and `write`.
*
* You should use `rb_fiber_scheduler_io_result_apply` to unpack the result of
* this value and update `int errno`.
*
* You should not directly try to interpret the result value as it is considered
* an opaque representation. However, the general representation is an integer
* in the range of `[-int errno, size_t size]`. Linux generally restricts the
* result of system calls like `read` and `write` to `<= 2^31` which means this
* will typically fit within a single FIXNUM.
*
* @param[in] result The result of the system call.
* @param[in] error The value of `errno`.
* @return A `VALUE` which contains the result and/or errno.
*/
static inline VALUE
rb_fiber_scheduler_io_result(ssize_t result, int error) {
if (result == -1) {
return RB_INT2NUM(-error);
} else {
return RB_SIZE2NUM(result);
}
}
/**
* Apply an io result to the local thread, returning the value of the orginal
* system call that created it and updating `int errno`.
*
* You should not directly try to interpret the result value as it is considered
* an opaque representation.
*
* @param[in] result The `VALUE` which contains an errno and/or result size.
* @post Updates `int errno` with the value if negative.
* @return The original result of the system call.
*/
static inline ssize_t
rb_fiber_scheduler_io_result_apply(VALUE result) {
if (RB_FIXNUM_P(result) && RB_NUM2INT(result) < 0) {
errno = -RB_NUM2INT(result);
return -1;
} else {
return RB_NUM2SIZE(result);
}
}
/**
* Queries the current scheduler of the current thread that is calling this
* function.
@ -195,7 +245,7 @@ VALUE rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io);
* @param[out] buffer Return buffer.
* @param[in] length Requested number of bytes to read.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_read`.
* @return otherwise What `scheduler.io_read` returns.
* @return otherwise What `scheduler.io_read` returns `[-errno, size]`.
*/
VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length);
@ -207,7 +257,7 @@ VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t
* @param[in] buffer What to write.
* @param[in] length Number of bytes to write.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_write`.
* @return otherwise What `scheduler.io_write` returns.
* @return otherwise What `scheduler.io_write` returns `[-errno, size]`.
*/
VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length);

123
io.c
View file

@ -1138,14 +1138,14 @@ rb_read_internal(rb_io_t *fptr, void *buf, size_t count)
{
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_read_memory(scheduler, fptr->self, buf, count, 1);
VALUE result = rb_fiber_scheduler_io_read_memory(scheduler, fptr->self, buf, count, count);
if (result != Qundef) {
ssize_t length = RB_NUM2SSIZE(result);
ssize_t length = rb_fiber_scheduler_io_result_apply(result);
if (length < 0) rb_sys_fail_path(fptr->pathv);
if (length < 0) rb_sys_fail_path(fptr->pathv);
return length;
return length;
}
}
@ -1165,14 +1165,10 @@ rb_write_internal(rb_io_t *fptr, const void *buf, size_t count)
{
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_write_memory(scheduler, fptr->self, buf, count, count);
VALUE result = rb_fiber_scheduler_io_write_memory(scheduler, fptr->self, buf, count, 0);
if (result != Qundef) {
ssize_t length = RB_NUM2SSIZE(result);
if (length < 0) rb_sys_fail_path(fptr->pathv);
return length;
return rb_fiber_scheduler_io_result_apply(result);
}
}
@ -1182,33 +1178,34 @@ rb_write_internal(rb_io_t *fptr, const void *buf, size_t count)
.capa = count
};
return (ssize_t)rb_thread_io_blocking_region(internal_write_func, &iis, fptr->fd);
}
static ssize_t
rb_write_internal2(rb_io_t *fptr, const void *buf, size_t count)
{
struct io_internal_write_struct iis = {
.fd = fptr->fd,
.buf = buf,
.capa = count
};
return (ssize_t)rb_thread_call_without_gvl2(internal_write_func2, &iis,
RUBY_UBF_IO, NULL);
if (fptr->write_lock && rb_mutex_owned_p(fptr->write_lock))
return (ssize_t)rb_thread_call_without_gvl2(internal_write_func2, &iis, RUBY_UBF_IO, NULL);
else
return (ssize_t)rb_thread_io_blocking_region(internal_write_func, &iis, fptr->fd);
}
#ifdef HAVE_WRITEV
static ssize_t
rb_writev_internal(int fd, const struct iovec *iov, int iovcnt)
rb_writev_internal(rb_io_t *fptr, const struct iovec *iov, int iovcnt)
{
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
for (int i = 0; i < iovcnt; i += 1) {
VALUE result = rb_fiber_scheduler_io_write_memory(scheduler, fptr->self, iov[i].iov_base, iov[i].iov_len, 0);
if (result != Qundef) {
return rb_fiber_scheduler_io_result_apply(result);
}
}
}
struct io_internal_writev_struct iis = {
.fd = fd,
.fd = fptr->fd,
.iov = iov,
.iovcnt = iovcnt,
};
return (ssize_t)rb_thread_io_blocking_region(internal_writev_func, &iis, fd);
return (ssize_t)rb_thread_io_blocking_region(internal_writev_func, &iis, fptr->fd);
}
#endif
@ -1592,7 +1589,7 @@ io_binwrite_string(VALUE arg)
iov[1].iov_base = (char *)p->ptr;
iov[1].iov_len = p->length;
r = rb_writev_internal(fptr->fd, iov, 2);
r = rb_writev_internal(fptr, iov, 2);
if (r < 0)
return r;
@ -1654,56 +1651,49 @@ io_binwrite(VALUE str, const char *ptr, long len, rb_io_t *fptr, int nosync)
if ((n = len) <= 0) return n;
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_write_memory(scheduler, fptr->self, ptr, len, len);
if (result != Qundef) {
ssize_t length = RB_NUM2SSIZE(result);
if (length < 0) rb_sys_fail_path(fptr->pathv);
return length;
}
}
if (fptr->wbuf.ptr == NULL && !(!nosync && (fptr->mode & FMODE_SYNC))) {
fptr->wbuf.off = 0;
fptr->wbuf.len = 0;
fptr->wbuf.capa = IO_WBUF_CAPA_MIN;
fptr->wbuf.ptr = ALLOC_N(char, fptr->wbuf.capa);
fptr->write_lock = rb_mutex_new();
rb_mutex_allow_trap(fptr->write_lock, 1);
rb_mutex_allow_trap(fptr->write_lock, 1);
}
if ((!nosync && (fptr->mode & (FMODE_SYNC|FMODE_TTY))) ||
(fptr->wbuf.ptr && fptr->wbuf.capa <= fptr->wbuf.len + len)) {
struct binwrite_arg arg;
struct binwrite_arg arg;
arg.fptr = fptr;
arg.str = str;
arg.fptr = fptr;
arg.str = str;
retry:
arg.ptr = ptr + offset;
arg.length = n;
if (fptr->write_lock) {
arg.ptr = ptr + offset;
arg.length = n;
if (fptr->write_lock) {
r = rb_mutex_synchronize(fptr->write_lock, io_binwrite_string, (VALUE)&arg);
}
else {
r = io_binwrite_string((VALUE)&arg);
}
/* xxx: other threads may modify given string. */
}
else {
r = io_binwrite_string((VALUE)&arg);
}
/* xxx: other threads may modify given string. */
if (r == n) return len;
if (0 <= r) {
offset += r;
n -= r;
errno = EAGAIN;
}
if (r == -2L)
return -1L;
}
if (r == -2L)
return -1L;
if (rb_io_maybe_wait_writable(errno, fptr->self, Qnil)) {
rb_io_check_closed(fptr);
if (offset < len)
goto retry;
if (offset < len)
goto retry;
}
return -1L;
}
@ -1712,8 +1702,10 @@ io_binwrite(VALUE str, const char *ptr, long len, rb_io_t *fptr, int nosync)
MEMMOVE(fptr->wbuf.ptr, fptr->wbuf.ptr+fptr->wbuf.off, char, fptr->wbuf.len);
fptr->wbuf.off = 0;
}
MEMMOVE(fptr->wbuf.ptr+fptr->wbuf.off+fptr->wbuf.len, ptr+offset, char, len);
fptr->wbuf.len += (int)len;
return len;
}
@ -1853,7 +1845,7 @@ static VALUE
call_writev_internal(VALUE arg)
{
struct binwritev_arg *p = (struct binwritev_arg *)arg;
return rb_writev_internal(p->fptr->fd, p->iov, p->iovcnt);
return rb_writev_internal(p->fptr, p->iov, p->iovcnt);
}
static long
@ -1906,7 +1898,7 @@ io_binwritev(struct iovec *iov, int iovcnt, rb_io_t *fptr)
r = rb_mutex_synchronize(fptr->write_lock, call_writev_internal, (VALUE)&arg);
}
else {
r = rb_writev_internal(fptr->fd, iov, iovcnt);
r = rb_writev_internal(fptr, iov, iovcnt);
}
if (r >= 0) {
@ -2330,6 +2322,7 @@ fptr_wait_readable(rb_io_t *fptr)
if (ret)
rb_io_check_closed(fptr);
return ret;
}
@ -3063,10 +3056,11 @@ read_internal_call(VALUE arg)
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_read_memory(scheduler, iis->fptr->self, iis->buf, iis->capa, 1);
VALUE result = rb_fiber_scheduler_io_read_memory(scheduler, iis->fptr->self, iis->buf, iis->capa, 0);
if (result != Qundef) {
return (VALUE)RB_NUM2SSIZE(result);
// This is actually returned as a pseudo-VALUE and later cast to a long:
return (VALUE)rb_fiber_scheduler_io_result_apply(result);
}
}
@ -4761,10 +4755,7 @@ finish_writeconv(rb_io_t *fptr, int noalloc)
res = rb_econv_convert(fptr->writeconv, NULL, NULL, &dp, de, 0);
while (dp-ds) {
retry:
if (fptr->write_lock && rb_mutex_owned_p(fptr->write_lock))
r = rb_write_internal2(fptr, ds, dp-ds);
else
r = rb_write_internal(fptr, ds, dp-ds);
r = rb_write_internal(fptr, ds, dp-ds);
if (r == dp-ds)
break;
if (0 <= r) {

View file

@ -192,6 +192,9 @@ class Scheduler
end
Fiber.yield
ensure
@readable.delete(io)
@writable.delete(io)
end
# Used for Kernel#sleep and Thread::Mutex#sleep
@ -257,6 +260,85 @@ class Scheduler
end
end
class IOBufferScheduler < Scheduler
EAGAIN = Errno::EAGAIN::Errno
def io_read(io, buffer, length)
offset = 0
while true
maximum_size = buffer.size - offset
result = blocking{io.read_nonblock(maximum_size, exception: false)}
# blocking{pp read: maximum_size, result: result, length: length}
case result
when :wait_readable
if length > 0
self.io_wait(io, IO::READABLE, nil)
else
return -EAGAIN
end
when :wait_writable
if length > 0
self.io_wait(io, IO::WRITABLE, nil)
else
return -EAGAIN
end
else
break unless result
buffer.copy(result, offset)
size = result.bytesize
offset += size
break if size >= length
length -= size
end
end
return offset
end
def io_write(io, buffer, length)
offset = 0
while true
maximum_size = buffer.size - offset
chunk = buffer.to_str(offset, maximum_size)
result = blocking{io.write_nonblock(chunk, exception: false)}
# blocking{pp write: maximum_size, result: result, length: length}
case result
when :wait_readable
if length > 0
self.io_wait(io, IO::READABLE, nil)
else
return -EAGAIN
end
when :wait_writable
if length > 0
self.io_wait(io, IO::WRITABLE, nil)
else
return -EAGAIN
end
else
offset += result
break if result >= length
length -= result
end
end
return offset
end
def blocking(&block)
Fiber.new(blocking: true, &block).resume
end
end
class BrokenUnblockScheduler < Scheduler
def unblock(blocker, fiber)
super

View file

@ -140,4 +140,36 @@ class TestFiberIO < Test::Unit::TestCase
server.close
th.join
end
def test_read_write_blocking
skip "UNIXSocket is not defined!" unless defined?(UNIXSocket)
i, o = UNIXSocket.pair
i.nonblock = false
o.nonblock = false
message = nil
thread = Thread.new do
# This scheduler provides non-blocking `io_read`/`io_write`:
scheduler = IOBufferScheduler.new
Fiber.set_scheduler scheduler
Fiber.schedule do
message = i.read(20)
i.close
end
Fiber.schedule do
o.write("Hello World")
o.close
end
end
thread.join
assert_equal MESSAGE, message
assert_predicate(i, :closed?)
assert_predicate(o, :closed?)
end
end

View file

@ -0,0 +1,114 @@
# frozen_string_literal: true
require 'test/unit'
require_relative 'scheduler'
require 'timeout'
class TestFiberIOBuffer < Test::Unit::TestCase
MESSAGE = "Hello World"
def test_read_write_blocking
skip "UNIXSocket is not defined!" unless defined?(UNIXSocket)
i, o = UNIXSocket.pair
i.nonblock = false
o.nonblock = false
message = nil
thread = Thread.new do
scheduler = IOBufferScheduler.new
Fiber.set_scheduler scheduler
Fiber.schedule do
message = i.read(20)
i.close
end
Fiber.schedule do
o.write(MESSAGE)
o.close
end
end
thread.join
assert_equal MESSAGE, message
assert_predicate(i, :closed?)
assert_predicate(o, :closed?)
end
def test_timeout_after
skip "UNIXSocket is not defined!" unless defined?(UNIXSocket)
i, o = UNIXSocket.pair
i.nonblock = false
o.nonblock = false
message = nil
error = nil
thread = Thread.new do
scheduler = IOBufferScheduler.new
Fiber.set_scheduler scheduler
Fiber.schedule do
Timeout.timeout(0.1) do
message = i.read(20)
end
rescue Timeout::Error => error
# Assertions below.
ensure
i.close
end
end
thread.join
assert_nil message
assert_kind_of Timeout::Error, error
end
def test_read_nonblock
skip "UNIXSocket is not defined!" unless defined?(UNIXSocket)
i, o = UNIXSocket.pair
message = nil
thread = Thread.new do
scheduler = IOBufferScheduler.new
Fiber.set_scheduler scheduler
Fiber.schedule do
message = i.read_nonblock(20, exception: false)
i.close
end
end
thread.join
assert_equal :wait_readable, message
o.close
end
def test_write_nonblock
skip "UNIXSocket is not defined!" unless defined?(UNIXSocket)
i, o = UNIXSocket.pair
thread = Thread.new do
scheduler = IOBufferScheduler.new
Fiber.set_scheduler scheduler
Fiber.schedule do
o.write_nonblock(MESSAGE, exception: false)
o.close
end
end
thread.join
assert_equal MESSAGE, i.read
end
end