mirror of
https://github.com/ruby/ruby.git
synced 2022-11-09 12:17:21 -05:00
Add fiber scheduler hooks for pread
/pwrite
, and add support to IO::Buffer
.
This commit is contained in:
parent
91c5c1c132
commit
bed920f073
Notes:
git
2021-12-23 08:20:31 +09:00
Merged: https://github.com/ruby/ruby/pull/5249 Merged-By: ioquatix <samuel@codeotaku.com>
6 changed files with 288 additions and 2 deletions
|
@ -7585,6 +7585,7 @@ io_buffer.$(OBJEXT): {$(VPATH)}backward/2/stdarg.h
|
|||
io_buffer.$(OBJEXT): {$(VPATH)}config.h
|
||||
io_buffer.$(OBJEXT): {$(VPATH)}defines.h
|
||||
io_buffer.$(OBJEXT): {$(VPATH)}encoding.h
|
||||
io_buffer.$(OBJEXT): {$(VPATH)}fiber/scheduler.h
|
||||
io_buffer.$(OBJEXT): {$(VPATH)}intern.h
|
||||
io_buffer.$(OBJEXT): {$(VPATH)}internal.h
|
||||
io_buffer.$(OBJEXT): {$(VPATH)}internal/anyargs.h
|
||||
|
|
|
@ -261,6 +261,32 @@ VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t
|
|||
*/
|
||||
VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length);
|
||||
|
||||
/**
|
||||
* Nonblocking read from the passed IO at the specified offset.
|
||||
*
|
||||
* @param[in] scheduler Target scheduler.
|
||||
* @param[out] io An io object to read from.
|
||||
* @param[out] buffer Return buffer.
|
||||
* @param[in] length Requested number of bytes to read.
|
||||
* @param[in] offset The offset in the given IO to read the data from.
|
||||
* @retval RUBY_Qundef `scheduler` doesn't have `#io_read`.
|
||||
* @return otherwise What `scheduler.io_read` returns.
|
||||
*/
|
||||
VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, VALUE buffer, size_t length, off_t offset);
|
||||
|
||||
/**
|
||||
* Nonblocking write to the passed IO at the specified offset.
|
||||
*
|
||||
* @param[in] scheduler Target scheduler.
|
||||
* @param[out] io An io object to write to.
|
||||
* @param[in] buffer What to write.
|
||||
* @param[in] length Number of bytes to write.
|
||||
* @param[in] offset The offset in the given IO to write the data to.
|
||||
* @retval RUBY_Qundef `scheduler` doesn't have `#io_write`.
|
||||
* @return otherwise What `scheduler.io_write` returns.
|
||||
*/
|
||||
VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, VALUE buffer, size_t length, off_t offset);
|
||||
|
||||
/**
|
||||
* Nonblocking read from the passed IO using a native buffer.
|
||||
*
|
||||
|
|
|
@ -80,6 +80,12 @@ VALUE rb_io_buffer_transfer(VALUE self);
|
|||
void rb_io_buffer_resize(VALUE self, size_t size);
|
||||
void rb_io_buffer_clear(VALUE self, uint8_t value, size_t offset, size_t length);
|
||||
|
||||
// The length is the minimum required length.
|
||||
VALUE rb_io_buffer_read(VALUE self, VALUE io, size_t length);
|
||||
VALUE rb_io_buffer_pread(VALUE self, VALUE io, size_t length, off_t offset);
|
||||
VALUE rb_io_buffer_write(VALUE self, VALUE io, size_t length);
|
||||
VALUE rb_io_buffer_pwrite(VALUE self, VALUE io, size_t length, off_t offset);
|
||||
|
||||
RBIMPL_SYMBOL_EXPORT_END()
|
||||
|
||||
#endif /* RUBY_IO_BUFFER_T */
|
||||
|
|
173
io_buffer.c
173
io_buffer.c
|
@ -8,6 +8,7 @@
|
|||
|
||||
#include "ruby/io.h"
|
||||
#include "ruby/io/buffer.h"
|
||||
#include "ruby/fiber/scheduler.h"
|
||||
|
||||
#include "internal.h"
|
||||
#include "internal/string.h"
|
||||
|
@ -1864,6 +1865,172 @@ size_t io_buffer_default_size(size_t page_size) {
|
|||
return platform_agnostic_default_size;
|
||||
}
|
||||
|
||||
VALUE
|
||||
rb_io_buffer_read(VALUE self, VALUE io, size_t length)
|
||||
{
|
||||
VALUE scheduler = rb_fiber_scheduler_current();
|
||||
if (scheduler != Qnil) {
|
||||
VALUE result = rb_fiber_scheduler_io_read(scheduler, io, self, length);
|
||||
|
||||
if (result != Qundef) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
struct rb_io_buffer *data = NULL;
|
||||
TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data);
|
||||
|
||||
io_buffer_validate_range(data, 0, length);
|
||||
|
||||
int descriptor = rb_io_descriptor(io);
|
||||
|
||||
void * base;
|
||||
size_t size;
|
||||
io_buffer_get_bytes_for_writing(data, &base, &size);
|
||||
|
||||
ssize_t result = read(descriptor, base, size);
|
||||
|
||||
return rb_fiber_scheduler_io_result(result, errno);
|
||||
}
|
||||
|
||||
static VALUE
|
||||
io_buffer_read(VALUE self, VALUE io, VALUE length)
|
||||
{
|
||||
return rb_io_buffer_read(self, io, RB_NUM2SIZE(length));
|
||||
}
|
||||
|
||||
VALUE
|
||||
rb_io_buffer_pread(VALUE self, VALUE io, size_t length, off_t offset)
|
||||
{
|
||||
VALUE scheduler = rb_fiber_scheduler_current();
|
||||
if (scheduler != Qnil) {
|
||||
VALUE result = rb_fiber_scheduler_io_pread(scheduler, io, self, length, offset);
|
||||
|
||||
if (result != Qundef) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
struct rb_io_buffer *data = NULL;
|
||||
TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data);
|
||||
|
||||
io_buffer_validate_range(data, 0, length);
|
||||
|
||||
int descriptor = rb_io_descriptor(io);
|
||||
|
||||
void * base;
|
||||
size_t size;
|
||||
io_buffer_get_bytes_for_writing(data, &base, &size);
|
||||
|
||||
#if defined(HAVE_PREAD)
|
||||
ssize_t result = pread(descriptor, base, size, offset);
|
||||
#else
|
||||
// This emulation is not thread safe, but the GVL means it's unlikely to be a problem.
|
||||
off_t current_offset = lseek(descriptor, 0, SEEK_CUR);
|
||||
if (current_offset == (off_t)-1)
|
||||
return rb_fiber_scheduler_io_result(-1, errno);
|
||||
|
||||
if (lseek(descriptor, offset, SEEK_SET) == (off_t)-1)
|
||||
return rb_fiber_scheduler_io_result(-1, errno);
|
||||
|
||||
ssize_t result = read(descriptor, base, size);
|
||||
|
||||
if (lseek(descriptor, current_offset, SEEK_SET) == (off_t)-1)
|
||||
return rb_fiber_scheduler_io_result(-1, errno);
|
||||
#endif
|
||||
|
||||
return rb_fiber_scheduler_io_result(result, errno);
|
||||
}
|
||||
|
||||
static VALUE
|
||||
io_buffer_pread(VALUE self, VALUE io, VALUE length, VALUE offset)
|
||||
{
|
||||
return rb_io_buffer_pread(self, io, RB_NUM2SIZE(length), NUM2OFFT(offset));
|
||||
}
|
||||
|
||||
VALUE
|
||||
rb_io_buffer_write(VALUE self, VALUE io, size_t length)
|
||||
{
|
||||
VALUE scheduler = rb_fiber_scheduler_current();
|
||||
if (scheduler != Qnil) {
|
||||
VALUE result = rb_fiber_scheduler_io_write(scheduler, io, self, length);
|
||||
|
||||
if (result != Qundef) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
struct rb_io_buffer *data = NULL;
|
||||
TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data);
|
||||
|
||||
io_buffer_validate_range(data, 0, length);
|
||||
|
||||
int descriptor = rb_io_descriptor(io);
|
||||
|
||||
const void * base;
|
||||
size_t size;
|
||||
io_buffer_get_bytes_for_reading(data, &base, &size);
|
||||
|
||||
ssize_t result = write(descriptor, base, length);
|
||||
|
||||
return rb_fiber_scheduler_io_result(result, errno);
|
||||
}
|
||||
|
||||
static VALUE
|
||||
io_buffer_write(VALUE self, VALUE io, VALUE length)
|
||||
{
|
||||
return rb_io_buffer_write(self, io, RB_NUM2SIZE(length));
|
||||
}
|
||||
|
||||
VALUE
|
||||
rb_io_buffer_pwrite(VALUE self, VALUE io, size_t length, off_t offset)
|
||||
{
|
||||
VALUE scheduler = rb_fiber_scheduler_current();
|
||||
if (scheduler != Qnil) {
|
||||
VALUE result = rb_fiber_scheduler_io_pwrite(scheduler, io, self, length, OFFT2NUM(offset));
|
||||
|
||||
if (result != Qundef) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
struct rb_io_buffer *data = NULL;
|
||||
TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data);
|
||||
|
||||
io_buffer_validate_range(data, 0, length);
|
||||
|
||||
int descriptor = rb_io_descriptor(io);
|
||||
|
||||
const void * base;
|
||||
size_t size;
|
||||
io_buffer_get_bytes_for_reading(data, &base, &size);
|
||||
|
||||
#if defined(HAVE_PWRITE)
|
||||
ssize_t result = pwrite(descriptor, base, length, offset);
|
||||
#else
|
||||
// This emulation is not thread safe, but the GVL means it's unlikely to be a problem.
|
||||
off_t current_offset = lseek(descriptor, 0, SEEK_CUR);
|
||||
if (current_offset == (off_t)-1)
|
||||
return rb_fiber_scheduler_io_result(-1, errno);
|
||||
|
||||
if (lseek(descriptor, offset, SEEK_SET) == (off_t)-1)
|
||||
return rb_fiber_scheduler_io_result(-1, errno);
|
||||
|
||||
ssize_t result = write(descriptor, base, length);
|
||||
|
||||
if (lseek(descriptor, current_offset, SEEK_SET) == (off_t)-1)
|
||||
return rb_fiber_scheduler_io_result(-1, errno);
|
||||
#endif
|
||||
|
||||
return rb_fiber_scheduler_io_result(result, errno);
|
||||
}
|
||||
|
||||
static VALUE
|
||||
io_buffer_pwrite(VALUE self, VALUE io, VALUE length, VALUE offset)
|
||||
{
|
||||
return rb_io_buffer_pwrite(self, io, RB_NUM2SIZE(length), NUM2OFFT(offset));
|
||||
}
|
||||
|
||||
/*
|
||||
* Document-class: IO::Buffer
|
||||
*
|
||||
|
@ -2038,4 +2205,10 @@ Init_IO_Buffer(void)
|
|||
|
||||
rb_define_method(rb_cIOBuffer, "get_string", io_buffer_get_string, -1);
|
||||
rb_define_method(rb_cIOBuffer, "set_string", io_buffer_set_string, -1);
|
||||
|
||||
// IO operations:
|
||||
rb_define_method(rb_cIOBuffer, "read", io_buffer_read, 2);
|
||||
rb_define_method(rb_cIOBuffer, "pread", io_buffer_pread, 3);
|
||||
rb_define_method(rb_cIOBuffer, "write", io_buffer_write, 2);
|
||||
rb_define_method(rb_cIOBuffer, "pwrite", io_buffer_pwrite, 3);
|
||||
}
|
||||
|
|
27
scheduler.c
27
scheduler.c
|
@ -25,8 +25,8 @@ static ID id_timeout_after;
|
|||
static ID id_kernel_sleep;
|
||||
static ID id_process_wait;
|
||||
|
||||
static ID id_io_read;
|
||||
static ID id_io_write;
|
||||
static ID id_io_read, id_io_pread;
|
||||
static ID id_io_write, id_io_pwrite;
|
||||
static ID id_io_wait;
|
||||
static ID id_io_close;
|
||||
|
||||
|
@ -46,7 +46,10 @@ Init_Fiber_Scheduler(void)
|
|||
id_process_wait = rb_intern_const("process_wait");
|
||||
|
||||
id_io_read = rb_intern_const("io_read");
|
||||
id_io_pread = rb_intern_const("io_pread");
|
||||
id_io_write = rb_intern_const("io_write");
|
||||
id_io_pwrite = rb_intern_const("io_pwrite");
|
||||
|
||||
id_io_wait = rb_intern_const("io_wait");
|
||||
id_io_close = rb_intern_const("io_close");
|
||||
|
||||
|
@ -238,6 +241,16 @@ rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t lengt
|
|||
return rb_check_funcall(scheduler, id_io_read, 3, arguments);
|
||||
}
|
||||
|
||||
VALUE
|
||||
rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, VALUE buffer, size_t length, off_t offset)
|
||||
{
|
||||
VALUE arguments[] = {
|
||||
io, buffer, SIZET2NUM(length), OFFT2NUM(offset)
|
||||
};
|
||||
|
||||
return rb_check_funcall(scheduler, id_io_pread, 4, arguments);
|
||||
}
|
||||
|
||||
VALUE
|
||||
rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length)
|
||||
{
|
||||
|
@ -248,6 +261,16 @@ rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t leng
|
|||
return rb_check_funcall(scheduler, id_io_write, 3, arguments);
|
||||
}
|
||||
|
||||
VALUE
|
||||
rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, VALUE buffer, size_t length, off_t offset)
|
||||
{
|
||||
VALUE arguments[] = {
|
||||
io, buffer, SIZET2NUM(length), OFFT2NUM(offset)
|
||||
};
|
||||
|
||||
return rb_check_funcall(scheduler, id_io_pwrite, 4, arguments);
|
||||
}
|
||||
|
||||
VALUE
|
||||
rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length)
|
||||
{
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
# frozen_string_literal: false
|
||||
|
||||
require 'tempfile'
|
||||
|
||||
class TestIOBuffer < Test::Unit::TestCase
|
||||
experimental = Warning[:experimental]
|
||||
begin
|
||||
|
@ -270,4 +272,59 @@ class TestIOBuffer < Test::Unit::TestCase
|
|||
|
||||
input.close
|
||||
end
|
||||
|
||||
def test_read
|
||||
io = Tempfile.new
|
||||
io.write("Hello World")
|
||||
io.seek(0)
|
||||
|
||||
buffer = IO::Buffer.new(128)
|
||||
buffer.read(io, 5)
|
||||
|
||||
assert_equal "Hello", buffer.get_string(0, 5)
|
||||
ensure
|
||||
io.close!
|
||||
end
|
||||
|
||||
def test_write
|
||||
io = Tempfile.new
|
||||
|
||||
buffer = IO::Buffer.new(128)
|
||||
buffer.set_string("Hello")
|
||||
buffer.write(io, 5)
|
||||
|
||||
io.seek(0)
|
||||
assert_equal "Hello", io.read(5)
|
||||
ensure
|
||||
io.close!
|
||||
end
|
||||
|
||||
def test_pread
|
||||
io = Tempfile.new
|
||||
io.write("Hello World")
|
||||
io.seek(0)
|
||||
|
||||
buffer = IO::Buffer.new(128)
|
||||
buffer.pread(io, 5, 6)
|
||||
|
||||
assert_equal "World", buffer.get_string(0, 5)
|
||||
assert_equal 0, io.tell
|
||||
ensure
|
||||
io.close!
|
||||
end
|
||||
|
||||
def test_pwrite
|
||||
io = Tempfile.new
|
||||
|
||||
buffer = IO::Buffer.new(128)
|
||||
buffer.set_string("World")
|
||||
buffer.pwrite(io, 5, 6)
|
||||
|
||||
assert_equal 0, io.tell
|
||||
|
||||
io.seek(6)
|
||||
assert_equal "World", io.read(5)
|
||||
ensure
|
||||
io.close!
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue