From ced1d172804b6dfe39aa31a323ffab80a25223b9 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 12 Oct 2022 12:59:05 +1300 Subject: [PATCH] Improvements to IO::Buffer implementation and documentation. (#6525) --- common.mk | 5 + cont.c | 37 ++- include/ruby/fiber/scheduler.h | 22 +- include/ruby/io/buffer.h | 10 +- io_buffer.c | 441 ++++++++++++++++++++++----------- scheduler.c | 28 +-- test/fiber/scheduler.rb | 111 ++++----- test/ruby/test_io_buffer.rb | 22 +- vm_core.h | 2 +- 9 files changed, 436 insertions(+), 242 deletions(-) diff --git a/common.mk b/common.mk index 0fe5217566..464f64cb76 100644 --- a/common.mk +++ b/common.mk @@ -7717,12 +7717,17 @@ io.$(OBJEXT): {$(VPATH)}vm_core.h io.$(OBJEXT): {$(VPATH)}vm_opts.h io_buffer.$(OBJEXT): $(hdrdir)/ruby/ruby.h io_buffer.$(OBJEXT): $(top_srcdir)/internal/array.h +io_buffer.$(OBJEXT): $(top_srcdir)/internal/bignum.h io_buffer.$(OBJEXT): $(top_srcdir)/internal/bits.h io_buffer.$(OBJEXT): $(top_srcdir)/internal/compilers.h io_buffer.$(OBJEXT): $(top_srcdir)/internal/error.h +io_buffer.$(OBJEXT): $(top_srcdir)/internal/fixnum.h +io_buffer.$(OBJEXT): $(top_srcdir)/internal/numeric.h +io_buffer.$(OBJEXT): $(top_srcdir)/internal/serial.h io_buffer.$(OBJEXT): $(top_srcdir)/internal/static_assert.h io_buffer.$(OBJEXT): $(top_srcdir)/internal/string.h io_buffer.$(OBJEXT): $(top_srcdir)/internal/thread.h +io_buffer.$(OBJEXT): $(top_srcdir)/internal/vm.h io_buffer.$(OBJEXT): {$(VPATH)}assert.h io_buffer.$(OBJEXT): {$(VPATH)}backward/2/assume.h io_buffer.$(OBJEXT): {$(VPATH)}backward/2/attributes.h diff --git a/cont.c b/cont.c index 499e1e7910..39f0fc8171 100644 --- a/cont.c +++ b/cont.c @@ -2410,20 +2410,34 @@ rb_fiber_transfer(VALUE fiber_value, int argc, const VALUE *argv) VALUE rb_fiber_blocking_p(VALUE fiber) { - return RBOOL(fiber_ptr(fiber)->blocking != 0); + return RBOOL(fiber_ptr(fiber)->blocking); } static VALUE -fiber_blocking_yield(VALUE fiber) +fiber_blocking_yield(VALUE fiber_value) { - fiber_ptr(fiber)->blocking += 1; - return rb_yield(fiber); + rb_fiber_t *fiber = fiber_ptr(fiber_value); + rb_thread_t * volatile th = fiber->cont.saved_ec.thread_ptr; + + // fiber->blocking is `unsigned int : 1`, so we use it as a boolean: + fiber->blocking = 1; + + // Once the fiber is blocking, and current, we increment the thread blocking state: + th->blocking += 1; + + return rb_yield(fiber_value); } static VALUE -fiber_blocking_ensure(VALUE fiber) +fiber_blocking_ensure(VALUE fiber_value) { - fiber_ptr(fiber)->blocking -= 1; + rb_fiber_t *fiber = fiber_ptr(fiber_value); + rb_thread_t * volatile th = fiber->cont.saved_ec.thread_ptr; + + // We are no longer blocking: + fiber->blocking = 0; + th->blocking -= 1; + return Qnil; } @@ -2440,8 +2454,15 @@ fiber_blocking_ensure(VALUE fiber) VALUE rb_fiber_blocking(VALUE class) { - VALUE fiber = rb_fiber_current(); - return rb_ensure(fiber_blocking_yield, fiber, fiber_blocking_ensure, fiber); + VALUE fiber_value = rb_fiber_current(); + rb_fiber_t *fiber = fiber_ptr(fiber_value); + + // If we are already blocking, this is essentially a no-op: + if (fiber->blocking) { + return rb_yield(fiber_value); + } else { + return rb_ensure(fiber_blocking_yield, fiber_value, fiber_blocking_ensure, fiber_value); + } } /* diff --git a/include/ruby/fiber/scheduler.h b/include/ruby/fiber/scheduler.h index d38651da5c..37985e1285 100644 --- a/include/ruby/fiber/scheduler.h +++ b/include/ruby/fiber/scheduler.h @@ -23,6 +23,8 @@ RBIMPL_SYMBOL_EXPORT_BEGIN() +#define RUBY_FIBER_SCHEDULER_VERSION 2 + struct timeval; /** @@ -248,10 +250,11 @@ VALUE rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io); * @param[out] io An io object to read from. * @param[out] buffer Return buffer. * @param[in] length Requested number of bytes to read. + * @param[in] offset The offset in the buffer to read to. * @retval RUBY_Qundef `scheduler` doesn't have `#io_read`. * @return otherwise What `scheduler.io_read` returns `[-errno, size]`. */ -VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length); +VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset); /** * Nonblocking write to the passed IO. @@ -260,36 +263,39 @@ VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t * @param[out] io An io object to write to. * @param[in] buffer What to write. * @param[in] length Number of bytes to write. + * @param[in] offset The offset in the buffer to write from. * @retval RUBY_Qundef `scheduler` doesn't have `#io_write`. * @return otherwise What `scheduler.io_write` returns `[-errno, size]`. */ -VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length); +VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset); /** * Nonblocking read from the passed IO at the specified offset. * * @param[in] scheduler Target scheduler. * @param[out] io An io object to read from. - * @param[out] buffer Return buffer. + * @param[in] from The offset in the given IO to read the data from. + * @param[out] buffer The buffer to read the data to. * @param[in] length Requested number of bytes to read. - * @param[in] offset The offset in the given IO to read the data from. + * @param[in] offset The offset in the buffer to read to. * @retval RUBY_Qundef `scheduler` doesn't have `#io_read`. * @return otherwise What `scheduler.io_read` returns. */ -VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, VALUE buffer, size_t length, rb_off_t offset); +VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset); /** * Nonblocking write to the passed IO at the specified offset. * * @param[in] scheduler Target scheduler. * @param[out] io An io object to write to. - * @param[in] buffer What to write. + * @param[in] from The offset in the given IO to write the data to. + * @param[in] buffer The buffer to write the data from. * @param[in] length Number of bytes to write. - * @param[in] offset The offset in the given IO to write the data to. + * @param[in] offset The offset in the buffer to write from. * @retval RUBY_Qundef `scheduler` doesn't have `#io_write`. * @return otherwise What `scheduler.io_write` returns. */ -VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, VALUE buffer, size_t length, rb_off_t offset); +VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset); /** * Nonblocking read from the passed IO using a native buffer. diff --git a/include/ruby/io/buffer.h b/include/ruby/io/buffer.h index 16b23ec629..dd92db5bbe 100644 --- a/include/ruby/io/buffer.h +++ b/include/ruby/io/buffer.h @@ -21,6 +21,8 @@ RBIMPL_SYMBOL_EXPORT_BEGIN() // WARNING: This entire interface is experimental and may change in the future! #define RB_IO_BUFFER_EXPERIMENTAL 1 +#define RUBY_IO_BUFFER_VERSION 2 + RUBY_EXTERN VALUE rb_cIOBuffer; RUBY_EXTERN size_t RUBY_IO_BUFFER_PAGE_SIZE; RUBY_EXTERN size_t RUBY_IO_BUFFER_DEFAULT_SIZE; @@ -81,10 +83,10 @@ void rb_io_buffer_resize(VALUE self, size_t size); void rb_io_buffer_clear(VALUE self, uint8_t value, size_t offset, size_t length); // The length is the minimum required length. -VALUE rb_io_buffer_read(VALUE self, VALUE io, size_t length); -VALUE rb_io_buffer_pread(VALUE self, VALUE io, size_t length, rb_off_t offset); -VALUE rb_io_buffer_write(VALUE self, VALUE io, size_t length); -VALUE rb_io_buffer_pwrite(VALUE self, VALUE io, size_t length, rb_off_t offset); +VALUE rb_io_buffer_read(VALUE self, VALUE io, size_t length, size_t offset); +VALUE rb_io_buffer_pread(VALUE self, VALUE io, rb_off_t from, size_t length, size_t offset); +VALUE rb_io_buffer_write(VALUE self, VALUE io, size_t length, size_t offset); +VALUE rb_io_buffer_pwrite(VALUE self, VALUE io, rb_off_t from, size_t length, size_t offset); RBIMPL_SYMBOL_EXPORT_END() diff --git a/io_buffer.c b/io_buffer.c index 4326d21def..bc5fac8118 100644 --- a/io_buffer.c +++ b/io_buffer.c @@ -14,6 +14,7 @@ #include "internal/array.h" #include "internal/bits.h" #include "internal/error.h" +#include "internal/numeric.h" #include "internal/string.h" #include "internal/thread.h" @@ -439,27 +440,29 @@ rb_io_buffer_map(VALUE io, size_t size, rb_off_t offset, enum rb_io_buffer_flags * mapping, you need to open a file in read-write mode, and explicitly pass * +flags+ argument without IO::Buffer::IMMUTABLE. * - * File.write('test.txt', 'test') + * Example: * - * buffer = IO::Buffer.map(File.open('test.txt'), nil, 0, IO::Buffer::READONLY) - * # => # + * File.write('test.txt', 'test') * - * buffer.readonly? # => true + * buffer = IO::Buffer.map(File.open('test.txt'), nil, 0, IO::Buffer::READONLY) + * # => # * - * buffer.get_string - * # => "test" + * buffer.readonly? # => true * - * buffer.set_string('b', 0) - * # `set_string': Buffer is not writable! (IO::Buffer::AccessError) + * buffer.get_string + * # => "test" * - * # create read/write mapping: length 4 bytes, offset 0, flags 0 - * buffer = IO::Buffer.map(File.open('test.txt', 'r+'), 4, 0) - * buffer.set_string('b', 0) - * # => 1 + * buffer.set_string('b', 0) + * # `set_string': Buffer is not writable! (IO::Buffer::AccessError) * - * # Check it - * File.read('test.txt') - * # => "best" + * # create read/write mapping: length 4 bytes, offset 0, flags 0 + * buffer = IO::Buffer.map(File.open('test.txt', 'r+'), 4, 0) + * buffer.set_string('b', 0) + * # => 1 + * + * # Check it + * File.read('test.txt') + * # => "best" * * Note that some operating systems may not have cache coherency between mapped * buffers and file reads. @@ -467,9 +470,7 @@ rb_io_buffer_map(VALUE io, size_t size, rb_off_t offset, enum rb_io_buffer_flags static VALUE io_buffer_map(int argc, VALUE *argv, VALUE klass) { - if (argc < 1 || argc > 4) { - rb_error_arity(argc, 2, 4); - } + rb_check_arity(argc, 1, 4); // We might like to handle a string path? VALUE io = argv[0]; @@ -534,14 +535,14 @@ io_flags_for_size(size_t size) * * buffer = IO::Buffer.new(4) * # => - * # # - * # 0x00000000 00 00 00 00 .... + * # # + * # 0x00000000 00 00 00 00 .... * * buffer.get_string(0, 1) # => "\x00" * * buffer.set_string("test") * buffer - * # => + * # => * # # * # 0x00000000 74 65 73 74 test */ @@ -550,9 +551,7 @@ rb_io_buffer_initialize(int argc, VALUE *argv, VALUE self) { io_buffer_experimental(); - if (argc < 0 || argc > 2) { - rb_error_arity(argc, 0, 2); - } + rb_check_arity(argc, 0, 2); struct rb_io_buffer *data = NULL; TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); @@ -890,6 +889,8 @@ rb_io_buffer_mapped_p(VALUE self) * Locking is not thread safe, but is a semantic used to ensure buffers don't * move while being used by a system call. * + * Example: + * * buffer.locked do * buffer.write(io) # theoretical system call interface * end @@ -978,6 +979,14 @@ rb_io_buffer_try_unlock(VALUE self) * can enter the lock. Also, locked buffer can't be changed with #resize or * #free. * + * The following operations acquire a lock: #resize, #free. + * + * Locking is not thread safe. It is designed as a safety net around + * non-blocking system calls. You can only share a buffer between threads with + * appropriate synchronisation techniques. + * + * Example: + * * buffer = IO::Buffer.new(4) * buffer.locked? #=> false * @@ -993,12 +1002,6 @@ rb_io_buffer_try_unlock(VALUE self) * buffer.set_string("test", 0) * end * end - * - * The following operations acquire a lock: #resize, #free. - * - * Locking is not thread safe. It is designed as a safety net around - * non-blocking system calls. You can only share a buffer between threads with - * appropriate synchronisation techniques. */ VALUE rb_io_buffer_locked(VALUE self) @@ -1029,20 +1032,22 @@ rb_io_buffer_locked(VALUE self) * * After the buffer is freed, no further operations can't be performed on it. * - * buffer = IO::Buffer.for('test') - * buffer.free - * # => # - * - * buffer.get_value(:U8, 0) - * # in `get_value': The buffer is not allocated! (IO::Buffer::AllocationError) - * - * buffer.get_string - * # in `get_string': The buffer is not allocated! (IO::Buffer::AllocationError) - * - * buffer.null? - * # => true - * * You can resize a freed buffer to re-allocate it. + * + * Example: + * + * buffer = IO::Buffer.for('test') + * buffer.free + * # => # + * + * buffer.get_value(:U8, 0) + * # in `get_value': The buffer is not allocated! (IO::Buffer::AllocationError) + * + * buffer.get_string + * # in `get_string': The buffer is not allocated! (IO::Buffer::AllocationError) + * + * buffer.null? + * # => true */ VALUE rb_io_buffer_free(VALUE self) @@ -1068,7 +1073,7 @@ io_buffer_validate_range(struct rb_io_buffer *data, size_t offset, size_t length } /* - * call-seq: slice(offset, length) -> io_buffer + * call-seq: slice([offset = 0, [length]]) -> io_buffer * * Produce another IO::Buffer which is a slice (or view into) the current one * starting at +offset+ bytes and going for +length+ bytes. @@ -1076,45 +1081,56 @@ io_buffer_validate_range(struct rb_io_buffer *data, size_t offset, size_t length * The slicing happens without copying of memory, and the slice keeps being * associated with the original buffer's source (string, or file), if any. * - * Raises RuntimeError if the offset+length is out of the current + * If the offset is not given, it will be zero. If the offset is negative, it + * will raise an ArgumentError. + * + * If the length is not given, the slice will be as long as the original + * buffer minus the specified offset. If the length is negative, it will raise + * an ArgumentError. + * + * Raises RuntimeError if the offset+length is out of the current * buffer's bounds. * - * string = 'test' - * buffer = IO::Buffer.for(string) + * Example: * - * slice = buffer.slice(1, 2) - * # => - * # # - * # 0x00000000 65 73 es + * string = 'test' + * buffer = IO::Buffer.for(string) * - * # Put "o" into 0s position of the slice - * slice.set_string('o', 0) - * slice - * # => - * # # - * # 0x00000000 6f 73 os + * slice = buffer.slice + * # => + * # # + * # 0x00000000 74 65 73 74 test * + * buffer.slice(2) + * # => + * # # + * # 0x00000000 73 74 st * - * # it is also visible at position 1 of the original buffer - * buffer - * # => - * # # - * # 0x00000000 74 6f 73 74 tost + * slice = buffer.slice(1, 2) + * # => + * # # + * # 0x00000000 65 73 es * - * # ...and original string - * string - * # => tost + * # Put "o" into 0s position of the slice + * slice.set_string('o', 0) + * slice + * # => + * # # + * # 0x00000000 6f 73 os + * + * # it is also visible at position 1 of the original buffer + * buffer + * # => + * # # + * # 0x00000000 74 6f 73 74 tost + * + * # ...and original string + * string + * # => tost */ -VALUE -rb_io_buffer_slice(VALUE self, VALUE _offset, VALUE _length) +static VALUE +rb_io_buffer_slice(struct rb_io_buffer *data, VALUE self, size_t offset, size_t length) { - // TODO fail on negative offets/lengths. - size_t offset = NUM2SIZET(_offset); - size_t length = NUM2SIZET(_length); - - struct rb_io_buffer *data = NULL; - TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); - io_buffer_validate_range(data, offset, length); VALUE instance = rb_io_buffer_type_allocate(rb_class_of(self)); @@ -1133,6 +1149,37 @@ rb_io_buffer_slice(VALUE self, VALUE _offset, VALUE _length) return instance; } +static VALUE +io_buffer_slice(int argc, VALUE *argv, VALUE self) +{ + rb_check_arity(argc, 0, 2); + + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + size_t offset = 0, length = 0; + + if (argc > 0) { + if (rb_int_negative_p(argv[0])) { + rb_raise(rb_eArgError, "Offset can't be negative!"); + } + + offset = NUM2SIZET(argv[0]); + } + + if (argc > 1) { + if (rb_int_negative_p(argv[1])) { + rb_raise(rb_eArgError, "Length can't be negative!"); + } + + length = NUM2SIZET(argv[1]); + } else { + length = data->size - offset; + } + + return rb_io_buffer_slice(data, self, offset, length); +} + int rb_io_buffer_get_bytes(VALUE self, void **base, size_t *size) { @@ -1154,7 +1201,7 @@ rb_io_buffer_get_bytes(VALUE self, void **base, size_t *size) return 0; } -inline static void +static inline void io_buffer_get_bytes_for_writing(struct rb_io_buffer *data, void **base, size_t *size) { if (data->flags & RB_IO_BUFFER_READONLY) { @@ -1215,17 +1262,19 @@ rb_io_buffer_get_bytes_for_reading(VALUE self, const void **base, size_t *size) * * Transfers ownership to a new buffer, deallocating the current one. * - * buffer = IO::Buffer.new('test') - * other = buffer.transfer - * other - * # => - * # # - * # 0x00000000 74 65 73 74 test - * buffer - * # => - * # # - * buffer.null? - * # => true + * Example: + * + * buffer = IO::Buffer.new('test') + * other = buffer.transfer + * other + * # => + * # # + * # 0x00000000 74 65 73 74 test + * buffer + * # => + * # # + * buffer.null? + * # => true */ VALUE rb_io_buffer_transfer(VALUE self) @@ -1339,7 +1388,7 @@ rb_io_buffer_resize(VALUE self, size_t size) * buffer = IO::Buffer.new(4) * buffer.set_string("test", 0) * buffer.resize(8) # resize to 8 bytes - * # => + * # => * # # * # 0x00000000 74 65 73 74 00 00 00 00 test.... * @@ -1811,7 +1860,7 @@ io_buffer_each_byte(int argc, VALUE *argv, VALUE self) return self; } -inline static void +static inline void rb_io_buffer_set_value(const void* base, size_t size, ID data_type, size_t *offset, VALUE value) { #define IO_BUFFER_SET_VALUE(name) if (data_type == RB_IO_BUFFER_DATA_TYPE_##name) {io_buffer_write_##name(base, size, offset, value); return;} @@ -1849,13 +1898,15 @@ rb_io_buffer_set_value(const void* base, size_t size, ID data_type, size_t *offs * symbols described in #get_value. * * buffer = IO::Buffer.new(8) - * # => + * # => * # # * # 0x00000000 00 00 00 00 00 00 00 00 + * * buffer.set_value(:U8, 1, 111) * # => 1 + * * buffer - * # => + * # => * # # * # 0x00000000 00 6f 00 00 00 00 00 00 .o...... * @@ -1863,11 +1914,12 @@ rb_io_buffer_set_value(const void* base, size_t size, ID data_type, size_t *offs * * buffer = IO::Buffer.new(8) * buffer.set_value(:U32, 0, 2.5) + * * buffer - * # => - * # # - * # 0x00000000 00 00 00 02 00 00 00 00 - * # ^^ the same as if we'd pass just integer 2 + * # => + * # # + * # 0x00000000 00 00 00 02 00 00 00 00 + * # ^^ the same as if we'd pass just integer 2 */ static VALUE io_buffer_set_value(VALUE self, VALUE type, VALUE _offset, VALUE value) @@ -1895,7 +1947,7 @@ io_buffer_set_value(VALUE self, VALUE type, VALUE _offset, VALUE value) * buffer = IO::Buffer.new(8) * buffer.set_values([:U8, :U16], 0, [1, 2]) * buffer - * # => + * # => * # # * # 0x00000000 01 00 02 00 00 00 00 00 ........ */ @@ -2028,7 +2080,7 @@ rb_io_buffer_initialize_copy(VALUE self, VALUE source) * at +offset+ using +memcpy+. For copying String instances, see #set_string. * * buffer = IO::Buffer.new(32) - * # => + * # => * # # * # 0x00000000 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ................ * # 0x00000010 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ................ * @@ -2036,7 +2088,7 @@ rb_io_buffer_initialize_copy(VALUE self, VALUE source) * buffer.copy(IO::Buffer.for("test"), 8) * # => 4 -- size of data copied * buffer - * # => + * # => * # # * # 0x00000000 00 00 00 00 00 00 00 00 74 65 73 74 00 00 00 00 ........test.... * # 0x00000010 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ................ * @@ -2077,7 +2129,7 @@ rb_io_buffer_initialize_copy(VALUE self, VALUE source) static VALUE io_buffer_copy(int argc, VALUE *argv, VALUE self) { - if (argc < 1 || argc > 4) rb_error_arity(argc, 1, 4); + rb_check_arity(argc, 1, 4); struct rb_io_buffer *data = NULL; TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); @@ -2097,18 +2149,18 @@ io_buffer_copy(int argc, VALUE *argv, VALUE self) * Read a chunk or all of the buffer into a string, in the specified * +encoding+. If no encoding is provided +Encoding::BINARY+ is used. * - * buffer = IO::Buffer.for('test') - * buffer.get_string - * # => "test" - * buffer.get_string(2) - * # => "st" - * buffer.get_string(2, 1) - * # => "s" + * buffer = IO::Buffer.for('test') + * buffer.get_string + * # => "test" + * buffer.get_string(2) + * # => "st" + * buffer.get_string(2, 1) + * # => "s" */ static VALUE io_buffer_get_string(int argc, VALUE *argv, VALUE self) { - if (argc > 3) rb_error_arity(argc, 0, 3); + rb_check_arity(argc, 0, 3); struct rb_io_buffer *data = NULL; TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); @@ -2167,7 +2219,7 @@ io_buffer_get_string(int argc, VALUE *argv, VALUE self) static VALUE io_buffer_set_string(int argc, VALUE *argv, VALUE self) { - if (argc < 1 || argc > 4) rb_error_arity(argc, 1, 4); + rb_check_arity(argc, 1, 4); struct rb_io_buffer *data = NULL; TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); @@ -2229,7 +2281,7 @@ rb_io_buffer_clear(VALUE self, uint8_t value, size_t offset, size_t length) static VALUE io_buffer_clear(int argc, VALUE *argv, VALUE self) { - if (argc > 3) rb_error_arity(argc, 0, 3); + rb_check_arity(argc, 0, 3); struct rb_io_buffer *data = NULL; TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); @@ -2297,11 +2349,11 @@ io_buffer_read_internal(void *_argument) } VALUE -rb_io_buffer_read(VALUE self, VALUE io, size_t length) +rb_io_buffer_read(VALUE self, VALUE io, size_t length, size_t offset) { VALUE scheduler = rb_fiber_scheduler_current(); if (scheduler != Qnil) { - VALUE result = rb_fiber_scheduler_io_read(scheduler, io, self, length); + VALUE result = rb_fiber_scheduler_io_read(scheduler, io, self, SIZET2NUM(length), SIZET2NUM(offset)); if (result != Qundef) { return result; @@ -2311,7 +2363,7 @@ rb_io_buffer_read(VALUE self, VALUE io, size_t length) struct rb_io_buffer *data = NULL; TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); - io_buffer_validate_range(data, 0, length); + io_buffer_validate_range(data, offset, length); int descriptor = rb_io_descriptor(io); @@ -2319,6 +2371,8 @@ rb_io_buffer_read(VALUE self, VALUE io, size_t length) size_t size; io_buffer_get_bytes_for_writing(data, &base, &size); + base = (unsigned char*)base + offset; + struct io_buffer_read_internal_argument argument = { .descriptor = descriptor, .base = base, @@ -2328,10 +2382,55 @@ rb_io_buffer_read(VALUE self, VALUE io, size_t length) return rb_thread_io_blocking_region(io_buffer_read_internal, &argument, descriptor); } +/* + * call-seq: read(io, [length, [offset]]) -> self + * + * Read at most +length+ bytes from +io+ into the buffer, starting at + * +offset+. + * + * If +length+ is not given, read until the end of the buffer. + * + * If +offset+ is not given, read from the beginning of the buffer. + * + * If +length+ is 0, read nothing. + * + * Example: + * + * buffer = IO::Buffer.for('test') + * # => + * # + * # 0x00000000 74 65 73 74 test + * buffer.read(File.open('/dev/urandom', 'rb'), 4) + * # => + * # + * # 0x00000000 2a 0e 0e 0e *... + */ static VALUE -io_buffer_read(VALUE self, VALUE io, VALUE length) +io_buffer_read(int argc, VALUE *argv, VALUE self) { - return rb_io_buffer_read(self, io, RB_NUM2SIZE(length)); + rb_check_arity(argc, 2, 3); + + VALUE io = argv[0]; + + size_t length; + if (argc >= 2) { + if (rb_int_negative_p(argv[1])) { + rb_raise(rb_eArgError, "Length can't be negative!"); + } + + length = NUM2SIZET(argv[1]); + } + + size_t offset = 0; + if (argc >= 3) { + if (rb_int_negative_p(argv[2])) { + rb_raise(rb_eArgError, "Offset can't be negative!"); + } + + offset = NUM2SIZET(argv[2]); + } + + return rb_io_buffer_read(self, io, length, offset); } struct io_buffer_pread_internal_argument { @@ -2367,11 +2466,11 @@ io_buffer_pread_internal(void *_argument) } VALUE -rb_io_buffer_pread(VALUE self, VALUE io, size_t length, rb_off_t offset) +rb_io_buffer_pread(VALUE self, VALUE io, rb_off_t from, size_t length, size_t offset) { VALUE scheduler = rb_fiber_scheduler_current(); if (scheduler != Qnil) { - VALUE result = rb_fiber_scheduler_io_pread(scheduler, io, self, length, offset); + VALUE result = rb_fiber_scheduler_io_pread(scheduler, io, OFFT2NUM(from), self, SIZET2NUM(length), SIZET2NUM(offset)); if (result != Qundef) { return result; @@ -2393,16 +2492,36 @@ rb_io_buffer_pread(VALUE self, VALUE io, size_t length, rb_off_t offset) .descriptor = descriptor, .base = base, .size = length, - .offset = offset, + .offset = from, }; return rb_thread_io_blocking_region(io_buffer_pread_internal, &argument, descriptor); } static VALUE -io_buffer_pread(VALUE self, VALUE io, VALUE length, VALUE offset) +io_buffer_pread(int argc, VALUE *argv, VALUE self) { - return rb_io_buffer_pread(self, io, RB_NUM2SIZE(length), NUM2OFFT(offset)); + rb_check_arity(argc, 3, 4); + + VALUE io = argv[0]; + rb_off_t from = NUM2OFFT(argv[1]); + + size_t length; + if (rb_int_negative_p(argv[2])) { + rb_raise(rb_eArgError, "Length can't be negative!"); + } + length = NUM2SIZET(argv[2]); + + size_t offset = 0; + if (argc >= 4) { + if (rb_int_negative_p(argv[3])) { + rb_raise(rb_eArgError, "Offset can't be negative!"); + } + + offset = NUM2SIZET(argv[3]); + } + + return rb_io_buffer_pread(self, io, from, length, offset); } struct io_buffer_write_internal_argument { @@ -2420,11 +2539,11 @@ io_buffer_write_internal(void *_argument) } VALUE -rb_io_buffer_write(VALUE self, VALUE io, size_t length) +rb_io_buffer_write(VALUE self, VALUE io, size_t length, size_t offset) { VALUE scheduler = rb_fiber_scheduler_current(); if (scheduler != Qnil) { - VALUE result = rb_fiber_scheduler_io_write(scheduler, io, self, length); + VALUE result = rb_fiber_scheduler_io_write(scheduler, io, self, SIZET2NUM(length), SIZET2NUM(offset)); if (result != Qundef) { return result; @@ -2434,7 +2553,7 @@ rb_io_buffer_write(VALUE self, VALUE io, size_t length) struct rb_io_buffer *data = NULL; TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); - io_buffer_validate_range(data, 0, length); + io_buffer_validate_range(data, offset, length); int descriptor = rb_io_descriptor(io); @@ -2442,6 +2561,8 @@ rb_io_buffer_write(VALUE self, VALUE io, size_t length) size_t size; io_buffer_get_bytes_for_reading(data, &base, &size); + base = (unsigned char *)base + offset; + struct io_buffer_write_internal_argument argument = { .descriptor = descriptor, .base = base, @@ -2452,9 +2573,31 @@ rb_io_buffer_write(VALUE self, VALUE io, size_t length) } static VALUE -io_buffer_write(VALUE self, VALUE io, VALUE length) +io_buffer_write(int argc, VALUE *argv, VALUE self) { - return rb_io_buffer_write(self, io, RB_NUM2SIZE(length)); + rb_check_arity(argc, 2, 3); + + VALUE io = argv[0]; + + size_t length; + if (argc >= 2) { + if (rb_int_negative_p(argv[1])) { + rb_raise(rb_eArgError, "Length can't be negative!"); + } + + length = NUM2SIZET(argv[1]); + } + + size_t offset = 0; + if (argc >= 3) { + if (rb_int_negative_p(argv[2])) { + rb_raise(rb_eArgError, "Offset can't be negative!"); + } + + offset = NUM2SIZET(argv[2]); + } + + return rb_io_buffer_write(self, io, length, offset); } struct io_buffer_pwrite_internal_argument { @@ -2490,11 +2633,11 @@ io_buffer_pwrite_internal(void *_argument) } VALUE -rb_io_buffer_pwrite(VALUE self, VALUE io, size_t length, rb_off_t offset) +rb_io_buffer_pwrite(VALUE self, VALUE io, rb_off_t from, size_t length, size_t offset) { VALUE scheduler = rb_fiber_scheduler_current(); if (scheduler != Qnil) { - VALUE result = rb_fiber_scheduler_io_pwrite(scheduler, io, self, length, OFFT2NUM(offset)); + VALUE result = rb_fiber_scheduler_io_pwrite(scheduler, io, OFFT2NUM(from), self, SIZET2NUM(length), SIZET2NUM(offset)); if (result != Qundef) { return result; @@ -2516,16 +2659,36 @@ rb_io_buffer_pwrite(VALUE self, VALUE io, size_t length, rb_off_t offset) .descriptor = descriptor, .base = base, .size = length, - .offset = offset, + .offset = from, }; return rb_thread_io_blocking_region(io_buffer_pwrite_internal, &argument, descriptor); } static VALUE -io_buffer_pwrite(VALUE self, VALUE io, VALUE length, VALUE offset) +io_buffer_pwrite(int argc, VALUE *argv, VALUE self) { - return rb_io_buffer_pwrite(self, io, RB_NUM2SIZE(length), NUM2OFFT(offset)); + rb_check_arity(argc, 3, 4); + + VALUE io = argv[0]; + rb_off_t from = NUM2OFFT(argv[1]); + + size_t length; + if (rb_int_negative_p(argv[2])) { + rb_raise(rb_eArgError, "Length can't be negative!"); + } + length = NUM2SIZET(argv[2]); + + size_t offset = 0; + if (argc >= 4) { + if (rb_int_negative_p(argv[3])) { + rb_raise(rb_eArgError, "Offset can't be negative!"); + } + + offset = NUM2SIZET(argv[3]); + } + + return rb_io_buffer_pwrite(self, io, from, length, offset); } static inline void @@ -2910,11 +3073,11 @@ io_buffer_not_inplace(VALUE self) * Empty buffer: * * buffer = IO::Buffer.new(8) # create empty 8-byte buffer - * # => + * # => * # # * # ... * buffer - * # => + * # => * # * # 0x00000000 00 00 00 00 00 00 00 00 * buffer.set_string('test', 2) # put there bytes of the "test" string, starting from offset 2 @@ -2926,11 +3089,11 @@ io_buffer_not_inplace(VALUE self) * * string = 'data' * buffer = IO::Buffer.for(string) - * # => + * # => * # # * # ... * buffer - * # => + * # => * # # * # 0x00000000 64 61 74 61 data * @@ -2939,7 +3102,7 @@ io_buffer_not_inplace(VALUE self) * buffer.set_string('---', 1) # write content, starting from offset 1 * # => 3 * buffer - * # => + * # => * # # * # 0x00000000 64 2d 2d 2d d--- * string # original string changed, too @@ -2950,7 +3113,7 @@ io_buffer_not_inplace(VALUE self) * File.write('test.txt', 'test data') * # => 9 * buffer = IO::Buffer.map(File.open('test.txt')) - * # => + * # => * # # * # ... * buffer.get_string(5, 2) # read 2 bytes, starting from offset 5 @@ -3037,7 +3200,7 @@ Init_IO_Buffer(void) rb_define_method(rb_cIOBuffer, "locked", rb_io_buffer_locked, 0); // Manipulation: - rb_define_method(rb_cIOBuffer, "slice", rb_io_buffer_slice, 2); + rb_define_method(rb_cIOBuffer, "slice", io_buffer_slice, -1); rb_define_method(rb_cIOBuffer, "<=>", rb_io_buffer_compare, 1); rb_define_method(rb_cIOBuffer, "resize", io_buffer_resize, 1); rb_define_method(rb_cIOBuffer, "clear", io_buffer_clear, -1); @@ -3098,8 +3261,8 @@ Init_IO_Buffer(void) rb_define_method(rb_cIOBuffer, "not!", io_buffer_not_inplace, 0); // IO operations: - rb_define_method(rb_cIOBuffer, "read", io_buffer_read, 2); - rb_define_method(rb_cIOBuffer, "pread", io_buffer_pread, 3); - rb_define_method(rb_cIOBuffer, "write", io_buffer_write, 2); - rb_define_method(rb_cIOBuffer, "pwrite", io_buffer_pwrite, 3); + rb_define_method(rb_cIOBuffer, "read", io_buffer_read, -1); + rb_define_method(rb_cIOBuffer, "pread", io_buffer_pread, -1); + rb_define_method(rb_cIOBuffer, "write", io_buffer_write, -1); + rb_define_method(rb_cIOBuffer, "pwrite", io_buffer_pwrite, -1); } diff --git a/scheduler.c b/scheduler.c index 675a0a6768..785ad06f19 100644 --- a/scheduler.c +++ b/scheduler.c @@ -232,43 +232,43 @@ rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io) } VALUE -rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length) +rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset) { VALUE arguments[] = { - io, buffer, SIZET2NUM(length) + io, buffer, SIZET2NUM(length), SIZET2NUM(offset) }; - return rb_check_funcall(scheduler, id_io_read, 3, arguments); + return rb_check_funcall(scheduler, id_io_read, 4, arguments); } VALUE -rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, VALUE buffer, size_t length, rb_off_t offset) +rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset) { VALUE arguments[] = { - io, buffer, SIZET2NUM(length), OFFT2NUM(offset) + io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset) }; - return rb_check_funcall(scheduler, id_io_pread, 4, arguments); + return rb_check_funcall(scheduler, id_io_pread, 5, arguments); } VALUE -rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length) +rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset) { VALUE arguments[] = { - io, buffer, SIZET2NUM(length) + io, buffer, SIZET2NUM(length), SIZET2NUM(offset) }; - return rb_check_funcall(scheduler, id_io_write, 3, arguments); + return rb_check_funcall(scheduler, id_io_write, 4, arguments); } VALUE -rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, VALUE buffer, size_t length, rb_off_t offset) +rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset) { VALUE arguments[] = { - io, buffer, SIZET2NUM(length), OFFT2NUM(offset) + io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset) }; - return rb_check_funcall(scheduler, id_io_pwrite, 4, arguments); + return rb_check_funcall(scheduler, id_io_pwrite, 5, arguments); } VALUE @@ -276,7 +276,7 @@ rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t { VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED); - VALUE result = rb_fiber_scheduler_io_read(scheduler, io, buffer, length); + VALUE result = rb_fiber_scheduler_io_read(scheduler, io, buffer, length, 0); rb_io_buffer_unlock(buffer); rb_io_buffer_free(buffer); @@ -289,7 +289,7 @@ rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, { VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY); - VALUE result = rb_fiber_scheduler_io_write(scheduler, io, buffer, length); + VALUE result = rb_fiber_scheduler_io_write(scheduler, io, buffer, length, 0); rb_io_buffer_unlock(buffer); rb_io_buffer_free(buffer); diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb index 96b22856d1..ceed606338 100644 --- a/test/fiber/scheduler.rb +++ b/test/fiber/scheduler.rb @@ -263,81 +263,68 @@ class Scheduler end class IOBufferScheduler < Scheduler - EAGAIN = Errno::EAGAIN::Errno + EAGAIN = -Errno::EAGAIN::Errno - def io_read(io, buffer, length) - offset = 0 + def io_read(io, buffer, length, offset) + total = 0 + io.nonblock = true - while true - maximum_size = buffer.size - offset - result = blocking{io.read_nonblock(maximum_size, exception: false)} - - # blocking{pp read: maximum_size, result: result, length: length} - - case result - when :wait_readable - if length > 0 - self.io_wait(io, IO::READABLE, nil) - else - return -EAGAIN - end - when :wait_writable - if length > 0 - self.io_wait(io, IO::WRITABLE, nil) - else - return -EAGAIN - end - else - break unless result - - buffer.set_string(result, offset) - - size = result.bytesize - offset += size - break if size >= length - length -= size - end - end - - return offset - end - - def io_write(io, buffer, length) - offset = 0 - - while true + while length >= 0 maximum_size = buffer.size - offset - chunk = buffer.get_string(offset, maximum_size) - result = blocking{io.write_nonblock(chunk, exception: false)} + result = blocking{buffer.read(io, maximum_size, offset)} - # blocking{pp write: maximum_size, result: result, length: length} - - case result - when :wait_readable - if length > 0 - self.io_wait(io, IO::READABLE, nil) - else - return -EAGAIN - end - when :wait_writable - if length > 0 - self.io_wait(io, IO::WRITABLE, nil) - else - return -EAGAIN - end - else + if result > 0 + total += result offset += result break if result >= length - length -= result + elsif result == 0 + break + elsif result == EAGAIN + if length > 0 + self.io_wait(io, IO::READABLE, nil) + else + return result + end + elsif result < 0 + return result end end - return offset + return total + end + + def io_write(io, buffer, length, offset) + total = 0 + io.nonblock = true + + while length >= 0 + maximum_size = buffer.size - offset + + result = blocking{buffer.write(io, maximum_size, offset)} + + if result > 0 + total += result + offset += result + break if result >= length + elsif result == 0 + break + elsif result == EAGAIN + if length > 0 + self.io_wait(io, IO::WRITABLE, nil) + else + return result + end + elsif result < 0 + return result + end + end + + return total end def blocking(&block) - Fiber.new(blocking: true, &block).resume + Fiber.blocking(&block) end end diff --git a/test/ruby/test_io_buffer.rb b/test/ruby/test_io_buffer.rb index 88b0a0280a..95ed98e1f4 100644 --- a/test/ruby/test_io_buffer.rb +++ b/test/ruby/test_io_buffer.rb @@ -169,16 +169,26 @@ class TestIOBuffer < Test::Unit::TestCase assert_equal("Hello World", buffer.get_string(8, 11)) end - def test_slice_bounds + def test_slice_arguments + buffer = IO::Buffer.for("Hello World") + + slice = buffer.slice + assert_equal "Hello World", slice.get_string + + slice = buffer.slice(2) + assert_equal("llo World", slice.get_string) + end + + def test_slice_bounds_error buffer = IO::Buffer.new(128) assert_raise ArgumentError do buffer.slice(128, 10) end - # assert_raise RuntimeError do - # pp buffer.slice(-10, 10) - # end + assert_raise ArgumentError do + buffer.slice(-10, 10) + end end def test_locked @@ -351,7 +361,7 @@ class TestIOBuffer < Test::Unit::TestCase io.seek(0) buffer = IO::Buffer.new(128) - buffer.pread(io, 5, 6) + buffer.pread(io, 6, 5) assert_equal "World", buffer.get_string(0, 5) assert_equal 0, io.tell @@ -364,7 +374,7 @@ class TestIOBuffer < Test::Unit::TestCase buffer = IO::Buffer.new(128) buffer.set_string("World") - buffer.pwrite(io, 5, 6) + buffer.pwrite(io, 6, 5) assert_equal 0, io.tell diff --git a/vm_core.h b/vm_core.h index aed15114e5..9aee345210 100644 --- a/vm_core.h +++ b/vm_core.h @@ -1099,7 +1099,7 @@ typedef struct rb_thread_struct { rb_fiber_t *root_fiber; VALUE scheduler; - unsigned blocking; + unsigned int blocking; /* misc */ VALUE name;