From 4b8903421828cb9d4de139180563ae8d8f04e1ab Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 2 Jul 2021 22:41:16 +1200 Subject: [PATCH] IO::Buffer for scheduler interface. --- NEWS.md | 5 + benchmark/buffer_get.yml | 9 + common.mk | 178 +++++ doc/fiber.md | 24 +- file.c | 21 +- include/ruby/fiber/scheduler.h | 40 +- include/ruby/internal/intern/file.h | 2 + include/ruby/io.h | 9 + include/ruby/io/buffer.h | 71 ++ inits.c | 1 + io.c | 360 +++++----- io_buffer.c | 1024 +++++++++++++++++++++++++++ scheduler.c | 49 +- test/ruby/test_io_buffer.rb | 131 ++++ 14 files changed, 1747 insertions(+), 177 deletions(-) create mode 100644 benchmark/buffer_get.yml create mode 100644 include/ruby/io/buffer.h create mode 100644 io_buffer.c create mode 100644 test/ruby/test_io_buffer.rb diff --git a/NEWS.md b/NEWS.md index 05e2d41eb9..5c2cac23d1 100644 --- a/NEWS.md +++ b/NEWS.md @@ -204,6 +204,9 @@ Outstanding ones only. * Introduce non-blocking `Timeout.timeout` using `timeout_after` hook. [[Feature #17470]] + * Introduce new scheduler hooks `io_read` and `io_write` along with a + low level `IO::Buffer` for zero-copy read/write. [[Feature #18020]] + * IO hooks `io_wait`, `io_read`, `io_write`, receive the original IO object where possible. [[Bug #18003]] @@ -424,9 +427,11 @@ See [the repository](https://github.com/ruby/error_highlight) in detail. [Bug #18003]: https://bugs.ruby-lang.org/issues/18003 [Feature #18008]: https://bugs.ruby-lang.org/issues/18008 [Feature #18015]: https://bugs.ruby-lang.org/issues/18015 +[Feature #18020]: https://bugs.ruby-lang.org/issues/18020 [Feature #18029]: https://bugs.ruby-lang.org/issues/18029 [Feature #18172]: https://bugs.ruby-lang.org/issues/18172 [Feature #18229]: https://bugs.ruby-lang.org/issues/18229 [Feature #18290]: https://bugs.ruby-lang.org/issues/18290 [GH-1509]: https://github.com/ruby/ruby/pull/1509 [GH-4815]: https://github.com/ruby/ruby/pull/4815 + diff --git a/benchmark/buffer_get.yml b/benchmark/buffer_get.yml new file mode 100644 index 0000000000..e375dcf85d --- /dev/null +++ b/benchmark/buffer_get.yml @@ -0,0 +1,9 @@ +benchmark: + - name: buffer.get + prelude: buffer = IO::Buffer.new(32, IO::Buffer::MAPPED) + script: buffer.get(:U32, 0) + loop_count: 20000000 + - name: string.unpack + prelude: string = "\0" * 32 + script: string.unpack("C") + loop_count: 20000000 diff --git a/common.mk b/common.mk index 96e452fc6c..97a9c34b81 100644 --- a/common.mk +++ b/common.mk @@ -104,6 +104,7 @@ COMMONOBJS = array.$(OBJEXT) \ hash.$(OBJEXT) \ inits.$(OBJEXT) \ io.$(OBJEXT) \ + io_buffer.$(OBJEXT) \ iseq.$(OBJEXT) \ load.$(OBJEXT) \ marshal.$(OBJEXT) \ @@ -6972,6 +6973,7 @@ io.$(OBJEXT): {$(VPATH)}internal/xmalloc.h io.$(OBJEXT): {$(VPATH)}io.c io.$(OBJEXT): {$(VPATH)}io.h io.$(OBJEXT): {$(VPATH)}io.rbinc +io.$(OBJEXT): {$(VPATH)}io/buffer.h io.$(OBJEXT): {$(VPATH)}method.h io.$(OBJEXT): {$(VPATH)}missing.h io.$(OBJEXT): {$(VPATH)}node.h @@ -6988,6 +6990,181 @@ io.$(OBJEXT): {$(VPATH)}thread_native.h io.$(OBJEXT): {$(VPATH)}util.h io.$(OBJEXT): {$(VPATH)}vm_core.h io.$(OBJEXT): {$(VPATH)}vm_opts.h +io_buffer.$(OBJEXT): $(hdrdir)/ruby/ruby.h +io_buffer.$(OBJEXT): $(top_srcdir)/internal/bits.h +io_buffer.$(OBJEXT): $(top_srcdir)/internal/compilers.h +io_buffer.$(OBJEXT): $(top_srcdir)/internal/static_assert.h +io_buffer.$(OBJEXT): $(top_srcdir)/internal/string.h +io_buffer.$(OBJEXT): {$(VPATH)}assert.h +io_buffer.$(OBJEXT): {$(VPATH)}backward/2/assume.h +io_buffer.$(OBJEXT): {$(VPATH)}backward/2/attributes.h +io_buffer.$(OBJEXT): {$(VPATH)}backward/2/bool.h +io_buffer.$(OBJEXT): {$(VPATH)}backward/2/gcc_version_since.h +io_buffer.$(OBJEXT): {$(VPATH)}backward/2/inttypes.h +io_buffer.$(OBJEXT): {$(VPATH)}backward/2/limits.h +io_buffer.$(OBJEXT): {$(VPATH)}backward/2/long_long.h +io_buffer.$(OBJEXT): {$(VPATH)}backward/2/stdalign.h +io_buffer.$(OBJEXT): {$(VPATH)}backward/2/stdarg.h +io_buffer.$(OBJEXT): {$(VPATH)}config.h +io_buffer.$(OBJEXT): {$(VPATH)}defines.h +io_buffer.$(OBJEXT): {$(VPATH)}encoding.h +io_buffer.$(OBJEXT): {$(VPATH)}intern.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/anyargs.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/char.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/double.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/fixnum.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/gid_t.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/int.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/intptr_t.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/long.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/long_long.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/mode_t.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/off_t.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/pid_t.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/short.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/size_t.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/st_data_t.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/uid_t.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/assume.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/alloc_size.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/artificial.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/cold.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/const.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/constexpr.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/deprecated.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/diagnose_if.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/enum_extensibility.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/error.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/flag_enum.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/forceinline.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/format.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/maybe_unused.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/noalias.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/nodiscard.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/noexcept.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/noinline.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/nonnull.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/noreturn.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/pure.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/restrict.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/returns_nonnull.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/warning.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/weakref.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/cast.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/compiler_is.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/compiler_is/apple.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/compiler_is/clang.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/compiler_is/gcc.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/compiler_is/intel.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/compiler_is/msvc.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/compiler_is/sunpro.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/compiler_since.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/config.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/constant_p.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/core.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rarray.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rbasic.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rbignum.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rclass.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rdata.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rfile.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rhash.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/core/robject.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rregexp.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rstring.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rstruct.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rtypeddata.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/ctype.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/dllexport.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/dosish.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/encoding/coderange.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/encoding/ctype.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/encoding/encoding.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/encoding/pathname.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/encoding/re.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/encoding/sprintf.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/encoding/string.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/encoding/symbol.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/encoding/transcode.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/error.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/eval.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/event.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/fl_type.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/gc.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/glob.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/globals.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/has/attribute.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/has/builtin.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/has/c_attribute.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/has/cpp_attribute.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/has/declspec_attribute.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/has/extension.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/has/feature.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/has/warning.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/array.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/bignum.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/class.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/compar.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/complex.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/cont.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/dir.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/enum.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/enumerator.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/error.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/eval.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/file.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/gc.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/hash.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/io.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/load.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/marshal.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/numeric.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/object.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/parse.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/proc.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/process.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/random.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/range.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/rational.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/re.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/ruby.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/select.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/select/largesize.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/signal.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/sprintf.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/string.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/struct.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/thread.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/time.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/variable.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/vm.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/interpreter.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/iterator.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/memory.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/method.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/module.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/newobj.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/rgengc.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/scan_args.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/special_consts.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/static_assert.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/stdalign.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/stdbool.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/symbol.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/value.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/value_type.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/variable.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/warning_push.h +io_buffer.$(OBJEXT): {$(VPATH)}internal/xmalloc.h +io_buffer.$(OBJEXT): {$(VPATH)}io.h +io_buffer.$(OBJEXT): {$(VPATH)}io/buffer.h +io_buffer.$(OBJEXT): {$(VPATH)}io_buffer.c +io_buffer.$(OBJEXT): {$(VPATH)}missing.h +io_buffer.$(OBJEXT): {$(VPATH)}onigmo.h +io_buffer.$(OBJEXT): {$(VPATH)}oniguruma.h +io_buffer.$(OBJEXT): {$(VPATH)}st.h +io_buffer.$(OBJEXT): {$(VPATH)}subst.h iseq.$(OBJEXT): $(CCAN_DIR)/check_type/check_type.h iseq.$(OBJEXT): $(CCAN_DIR)/container_of/container_of.h iseq.$(OBJEXT): $(CCAN_DIR)/list/list.h @@ -12922,6 +13099,7 @@ scheduler.$(OBJEXT): {$(VPATH)}internal/variable.h scheduler.$(OBJEXT): {$(VPATH)}internal/warning_push.h scheduler.$(OBJEXT): {$(VPATH)}internal/xmalloc.h scheduler.$(OBJEXT): {$(VPATH)}io.h +scheduler.$(OBJEXT): {$(VPATH)}io/buffer.h scheduler.$(OBJEXT): {$(VPATH)}method.h scheduler.$(OBJEXT): {$(VPATH)}missing.h scheduler.$(OBJEXT): {$(VPATH)}node.h diff --git a/doc/fiber.md b/doc/fiber.md index 9baab4e4d1..f0785d8ae6 100644 --- a/doc/fiber.md +++ b/doc/fiber.md @@ -48,6 +48,14 @@ When the thread exits, there is an implicit call to `set_scheduler`: Fiber.set_scheduler(nil) ``` +### Design + +The scheduler interface is designed to be a un-opinionated light-weight layer +between user code and blocking operations. The scheduler hooks should avoid +translating or converting arguments or return values. Ideally, the exact same +arguments from the user code are provided directly to the scheduler hook with +no changes. + ### Interface This is the interface you need to implement. @@ -65,7 +73,7 @@ class Scheduler end.value end - # Wait for the given file descriptor to match the specified events within + # Wait for the given io readiness to match the specified events within # the specified timeout. # @parameter event [Integer] A bit mask of `IO::READABLE`, # `IO::WRITABLE` and `IO::PRIORITY`. @@ -74,6 +82,20 @@ class Scheduler def io_wait(io, events, timeout) end + # Read from the given io into the specified buffer. + # @parameter io [IO] The io to read from. + # @parameter buffer [IO::Buffer] The buffer to read into. + # @parameter length [Integer] The minimum amount to read. + def io_read(io, buffer, length) + end + + # Write from the given buffer into the specified IO. + # @parameter io [IO] The io to write to. + # @parameter buffer [IO::Buffer] The buffer to write from. + # @parameter length [Integer] The minimum amount to write. + def io_write(io, buffer, length) + end + # Sleep the current task for the specified duration, or forever if not # specified. # @parameter duration [Numeric] The amount of time to sleep in seconds. diff --git a/file.c b/file.c index 7a257e5e9b..4629a9aee8 100644 --- a/file.c +++ b/file.c @@ -2515,20 +2515,27 @@ rb_file_birthtime(VALUE obj) * */ -static VALUE -rb_file_size(VALUE obj) +size_t rb_file_size(VALUE file) { rb_io_t *fptr; struct stat st; - GetOpenFile(obj, fptr); + RB_IO_POINTER(file, fptr); if (fptr->mode & FMODE_WRITABLE) { - rb_io_flush_raw(obj, 0); + rb_io_flush_raw(file, 0); } + if (fstat(fptr->fd, &st) == -1) { - rb_sys_fail_path(fptr->pathv); + rb_sys_fail_path(fptr->pathv); } - return OFFT2NUM(st.st_size); + + return st.st_size; +} + +static VALUE +file_size(VALUE self) +{ + return RB_SIZE2NUM(rb_file_size(self)); } static int @@ -6780,7 +6787,7 @@ Init_File(void) rb_define_method(rb_cFile, "mtime", rb_file_mtime, 0); rb_define_method(rb_cFile, "ctime", rb_file_ctime, 0); rb_define_method(rb_cFile, "birthtime", rb_file_birthtime, 0); - rb_define_method(rb_cFile, "size", rb_file_size, 0); + rb_define_method(rb_cFile, "size", file_size, 0); rb_define_method(rb_cFile, "chmod", rb_file_chmod, 1); rb_define_method(rb_cFile, "chown", rb_file_chown, 2); diff --git a/include/ruby/fiber/scheduler.h b/include/ruby/fiber/scheduler.h index 093b936475..8294442216 100644 --- a/include/ruby/fiber/scheduler.h +++ b/include/ruby/fiber/scheduler.h @@ -193,12 +193,11 @@ VALUE rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io); * @param[in] scheduler Target scheduler. * @param[out] io An io object to read from. * @param[out] buffer Return buffer. - * @param[in] offset Offset inside of `buffer`. * @param[in] length Requested number of bytes to read. * @retval RUBY_Qundef `scheduler` doesn't have `#io_read`. * @return otherwise What `scheduler.io_read` returns. */ -VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t offset, size_t length); +VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length); /** * Nonblocking write to the passed IO. @@ -206,12 +205,45 @@ VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t * @param[in] scheduler Target scheduler. * @param[out] io An io object to write to. * @param[in] buffer What to write. - * @param[in] offset Offset inside of `buffer`. * @param[in] length Number of bytes to write. * @retval RUBY_Qundef `scheduler` doesn't have `#io_write`. * @return otherwise What `scheduler.io_write` returns. */ -VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t offset, size_t length); +VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length); + +/** + * Nonblocking read from the passed IO using a native buffer. + * + * @param[in] scheduler Target scheduler. + * @param[out] io An io object to read from. + * @param[out] buffer Return buffer. + * @param[in] length Requested number of bytes to read. + * @retval RUBY_Qundef `scheduler` doesn't have `#io_read`. + * @return otherwise What `scheduler.io_read` returns. + */ +VALUE rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *buffer, size_t size, size_t length); + +/** + * Nonblocking write to the passed IO using a native buffer. + * + * @param[in] scheduler Target scheduler. + * @param[out] io An io object to write to. + * @param[in] buffer What to write. + * @param[in] length Number of bytes to write. + * @retval RUBY_Qundef `scheduler` doesn't have `#io_write`. + * @return otherwise What `scheduler.io_write` returns. + */ +VALUE rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *buffer, size_t size, size_t length); + +/** + * Nonblocking close the given IO. + * + * @param[in] scheduler Target scheduler. + * @param[in] io An io object to close. + * @retval RUBY_Qundef `scheduler` doesn't have `#io_close`. + * @return otherwise What `scheduler.io_close` returns. + */ +VALUE rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io); /** * Nonblocking DNS lookup. diff --git a/include/ruby/internal/intern/file.h b/include/ruby/internal/intern/file.h index 8e98ba08f8..ce676bfd09 100644 --- a/include/ruby/internal/intern/file.h +++ b/include/ruby/internal/intern/file.h @@ -187,6 +187,8 @@ RBIMPL_ATTR_PURE() */ int rb_is_absolute_path(const char *path); +size_t rb_file_size(VALUE file); + RBIMPL_SYMBOL_EXPORT_END() #endif /* RBIMPL_INTERN_FILE_H */ diff --git a/include/ruby/io.h b/include/ruby/io.h index aac7846537..3e035c114d 100644 --- a/include/ruby/io.h +++ b/include/ruby/io.h @@ -670,6 +670,15 @@ VALUE rb_io_set_write_io(VALUE io, VALUE w); */ void rb_io_set_nonblock(rb_io_t *fptr); +/** + * Returns an integer representing the numeric file descriptor for + * io. + * + * @param[in] io An IO. + * @retval int A file descriptor. + */ +int rb_io_descriptor(VALUE io); + /** * This function breaks down the option hash that `IO#initialize` takes into * components. This is an implementation detail of rb_io_extract_modeenc() diff --git a/include/ruby/io/buffer.h b/include/ruby/io/buffer.h new file mode 100644 index 0000000000..073215186c --- /dev/null +++ b/include/ruby/io/buffer.h @@ -0,0 +1,71 @@ +/** + * @file + * @author Samuel Williams + * @date Fri 2 Jul 2021 16:29:01 NZST + * @copyright Copyright (C) 2021 Samuel Williams + * @copyright This file is a part of the programming language Ruby. + * Permission is hereby granted, to either redistribute and/or + * modify this file, provided that the conditions mentioned in the + * file COPYING are met. Consult the file for details. + */ + +#pragma once + +#include "ruby/ruby.h" +#include "ruby/internal/config.h" + +RUBY_SYMBOL_EXPORT_BEGIN + +RUBY_EXTERN VALUE rb_cIOBuffer; +RUBY_EXTERN size_t RUBY_IO_BUFFER_PAGE_SIZE; + +enum rb_io_buffer_flags { + // The memory in the buffer is owned by someone else. + RB_IO_BUFFER_EXTERNAL = 0, + // The memory in the buffer is allocated internally. + RB_IO_BUFFER_INTERNAL = 1, + // The memory in the buffer is mapped. + RB_IO_BUFFER_MAPPED = 2, + + // The buffer is locked and cannot be resized. + RB_IO_BUFFER_LOCKED = 16, + + // The buffer mapping is private and will not impact other processes or the underlying file. + RB_IO_BUFFER_PRIVATE = 32, + + // The buffer is read-only and cannot be modified. + RB_IO_BUFFER_IMMUTABLE = 64 +}; + +enum rb_io_buffer_endian { + RB_IO_BUFFER_LITTLE_ENDIAN = 4, + RB_IO_BUFFER_BIG_ENDIAN = 8, + +#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ + RB_IO_BUFFER_HOST_ENDIAN = RB_IO_BUFFER_LITTLE_ENDIAN, +#elif __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__ + RB_IO_BUFFER_HOST_ENDIAN = RB_IO_BUFFER_BIG_ENDIAN, +#elif REG_DWORD == REG_DWORD_LITTLE_ENDIAN + RB_IO_BUFFER_HOST_ENDIAN = RB_IO_BUFFER_LITTLE_ENDIAN, +#elif REG_DWORD == REG_DWORD_BIG_ENDIAN + RB_IO_BUFFER_HOST_ENDIAN = RB_IO_BUFFER_BIG_ENDIAN, +#endif + + RB_IO_BUFFER_NETWORK_ENDIAN = RB_IO_BUFFER_BIG_ENDIAN, +}; + +VALUE rb_io_buffer_new(void *base, size_t size, enum rb_io_buffer_flags flags); +VALUE rb_io_buffer_map(VALUE io, size_t size, off_t offset, enum rb_io_buffer_flags flags); + +VALUE rb_io_buffer_lock(VALUE self); +VALUE rb_io_buffer_unlock(VALUE self); +VALUE rb_io_buffer_free(VALUE self); + +void rb_io_buffer_get_mutable(VALUE self, void **base, size_t *size); +void rb_io_buffer_get_immutable(VALUE self, const void **base, size_t *size); + +size_t rb_io_buffer_copy(VALUE self, VALUE source, size_t offset); +void rb_io_buffer_resize(VALUE self, size_t size, size_t preserve); +void rb_io_buffer_clear(VALUE self, uint8_t value, size_t offset, size_t length); + +RUBY_SYMBOL_EXPORT_END diff --git a/inits.c b/inits.c index f702e306b0..8c230c6df0 100644 --- a/inits.c +++ b/inits.c @@ -51,6 +51,7 @@ rb_call_inits(void) CALL(marshal); CALL(Range); CALL(IO); + CALL(IO_Buffer) CALL(Dir); CALL(Time); CALL(Random); diff --git a/io.c b/io.c index ac7c7593af..81f7d8ecb6 100644 --- a/io.c +++ b/io.c @@ -14,6 +14,7 @@ #include "ruby/internal/config.h" #include "ruby/fiber/scheduler.h" +#include "ruby/io/buffer.h" #ifdef _WIN32 # include "ruby/ruby.h" @@ -131,6 +132,7 @@ #include "internal/transcode.h" #include "internal/variable.h" #include "ruby/io.h" +#include "ruby/io/buffer.h" #include "ruby/thread.h" #include "ruby/util.h" #include "ruby_atomic.h" @@ -203,7 +205,7 @@ VALUE rb_default_rs; static VALUE argf; -static ID id_write, id_read, id_getc, id_flush, id_readpartial, id_set_encoding; +static ID id_write, id_read, id_getc, id_flush, id_readpartial, id_set_encoding, id_fileno; static VALUE sym_mode, sym_perm, sym_flags, sym_extenc, sym_intenc, sym_encoding, sym_open_args; static VALUE sym_textmode, sym_binmode, sym_autoclose; static VALUE sym_SET, sym_CUR, sym_END; @@ -1060,7 +1062,7 @@ io_alloc(VALUE klass) struct io_internal_read_struct { VALUE th; - int fd; + rb_io_t *fptr; int nonblock; void *buf; size_t capa; @@ -1080,18 +1082,18 @@ struct io_internal_writev_struct { }; #endif -static int nogvl_wait_for_single_fd(VALUE th, int fd, short events); +static int nogvl_wait_for(VALUE th, rb_io_t *fptr, short events); static VALUE internal_read_func(void *ptr) { struct io_internal_read_struct *iis = ptr; ssize_t r; retry: - r = read(iis->fd, iis->buf, iis->capa); + r = read(iis->fptr->fd, iis->buf, iis->capa); if (r < 0 && !iis->nonblock) { int e = errno; if (io_again_p(e)) { - if (nogvl_wait_for_single_fd(iis->th, iis->fd, RB_WAITFD_IN) != -1) { + if (nogvl_wait_for(iis->th, iis->fptr, RB_WAITFD_IN) != -1) { goto retry; } errno = e; @@ -1132,36 +1134,62 @@ internal_writev_func(void *ptr) #endif static ssize_t -rb_read_internal(int fd, void *buf, size_t count) +rb_read_internal(rb_io_t *fptr, void *buf, size_t count) { + VALUE scheduler = rb_fiber_scheduler_current(); + if (scheduler != Qnil) { + VALUE result = rb_fiber_scheduler_io_read_memory(scheduler, fptr->self, buf, count, 1); + + if (result != Qundef) { + ssize_t length = RB_NUM2SSIZE(result); + + if (length < 0) rb_sys_fail_path(fptr->pathv); + + return length; + } + } + struct io_internal_read_struct iis = { .th = rb_thread_current(), - .fd = fd, + .fptr = fptr, .nonblock = 0, .buf = buf, .capa = count }; - return (ssize_t)rb_thread_io_blocking_region(internal_read_func, &iis, fd); + return (ssize_t)rb_thread_io_blocking_region(internal_read_func, &iis, fptr->fd); } static ssize_t -rb_write_internal(int fd, const void *buf, size_t count) +rb_write_internal(rb_io_t *fptr, const void *buf, size_t count) { + VALUE scheduler = rb_fiber_scheduler_current(); + if (scheduler != Qnil) { + VALUE result = rb_fiber_scheduler_io_write_memory(scheduler, fptr->self, buf, count, count); + + if (result != Qundef) { + ssize_t length = RB_NUM2SSIZE(result); + + if (length < 0) rb_sys_fail_path(fptr->pathv); + + return length; + } + } + struct io_internal_write_struct iis = { - .fd = fd, + .fd = fptr->fd, .buf = buf, .capa = count }; - return (ssize_t)rb_thread_io_blocking_region(internal_write_func, &iis, fd); + return (ssize_t)rb_thread_io_blocking_region(internal_write_func, &iis, fptr->fd); } static ssize_t -rb_write_internal2(int fd, const void *buf, size_t count) +rb_write_internal2(rb_io_t *fptr, const void *buf, size_t count) { struct io_internal_write_struct iis = { - .fd = fd, + .fd = fptr->fd, .buf = buf, .capa = count }; @@ -1581,7 +1609,7 @@ io_binwrite_string(VALUE arg) } } else { - r = rb_write_internal(fptr->fd, p->ptr, p->length); + r = rb_write_internal(fptr, p->ptr, p->length); } return r; @@ -1612,7 +1640,7 @@ io_binwrite_string(VALUE arg) return len; } - return rb_write_internal(p->fptr->fd, p->ptr, p->length); + return rb_write_internal(p->fptr, p->ptr, p->length); } #endif @@ -1628,7 +1656,7 @@ io_binwrite(VALUE str, const char *ptr, long len, rb_io_t *fptr, int nosync) VALUE scheduler = rb_fiber_scheduler_current(); if (scheduler != Qnil) { - VALUE result = rb_fiber_scheduler_io_write(scheduler, fptr->self, str, offset, len); + VALUE result = rb_fiber_scheduler_io_write_memory(scheduler, fptr->self, ptr, len, len); if (result != Qundef) { ssize_t length = RB_NUM2SSIZE(result); @@ -2316,27 +2344,26 @@ io_fillbuf(rb_io_t *fptr) fptr->rbuf.capa = IO_RBUF_CAPA_FOR(fptr); fptr->rbuf.ptr = ALLOC_N(char, fptr->rbuf.capa); #ifdef _WIN32 - fptr->rbuf.capa--; + fptr->rbuf.capa--; #endif } if (fptr->rbuf.len == 0) { retry: - { - r = rb_read_internal(fptr->fd, fptr->rbuf.ptr, fptr->rbuf.capa); - } + r = rb_read_internal(fptr, fptr->rbuf.ptr, fptr->rbuf.capa); + if (r < 0) { if (fptr_wait_readable(fptr)) goto retry; - { - int e = errno; - VALUE path = rb_sprintf("fd:%d ", fptr->fd); - if (!NIL_P(fptr->pathv)) { - rb_str_append(path, fptr->pathv); - } - rb_syserr_fail_path(e, path); - } + + int e = errno; + VALUE path = rb_sprintf("fd:%d ", fptr->fd); + if (!NIL_P(fptr->pathv)) { + rb_str_append(path, fptr->pathv); + } + + rb_syserr_fail_path(e, path); } - if (r > 0) rb_io_check_closed(fptr); + if (r > 0) rb_io_check_closed(fptr); fptr->rbuf.off = 0; fptr->rbuf.len = (int)r; /* r should be <= rbuf_capa */ if (r == 0) @@ -2557,6 +2584,16 @@ rb_io_fileno(VALUE io) return INT2FIX(fd); } +int rb_io_descriptor(VALUE io) +{ + if (RB_TYPE_P(io, T_FILE)) { + rb_io_t *fptr = RFILE(io)->fptr; + rb_io_check_closed(fptr); + return fptr->fd; + } else { + return RB_NUM2INT(rb_funcall(io, id_fileno, 0)); + } +} /* * call-seq: @@ -2665,7 +2702,7 @@ io_bufread(char *ptr, long len, rb_io_t *fptr) while (n > 0) { again: rb_io_check_closed(fptr); - c = rb_read_internal(fptr->fd, ptr+offset, n); + c = rb_read_internal(fptr, ptr+offset, n); if (c == 0) break; if (c < 0) { if (fptr_wait_readable(fptr)) @@ -2711,19 +2748,6 @@ bufread_call(VALUE arg) static long io_fread(VALUE str, long offset, long size, rb_io_t *fptr) { - VALUE scheduler = rb_fiber_scheduler_current(); - if (scheduler != Qnil) { - VALUE result = rb_fiber_scheduler_io_read(scheduler, fptr->self, str, offset, size); - - if (result != Qundef) { - ssize_t length = RB_NUM2SSIZE(result); - - if (length < 0) rb_sys_fail_path(fptr->pathv); - - return length; - } - } - long len; struct bufread_arg arg; @@ -3035,7 +3059,16 @@ read_internal_call(VALUE arg) { struct io_internal_read_struct *iis = (struct io_internal_read_struct *)arg; - return rb_thread_io_blocking_region(internal_read_func, iis, iis->fd); + VALUE scheduler = rb_fiber_scheduler_current(); + if (scheduler != Qnil) { + VALUE result = rb_fiber_scheduler_io_read_memory(scheduler, iis->fptr->self, iis->buf, iis->capa, 1); + + if (result != Qundef) { + return (VALUE)RB_NUM2SSIZE(result); + } + } + + return rb_thread_io_blocking_region(internal_read_func, iis, iis->fptr->fd); } static long @@ -3079,7 +3112,7 @@ io_getpartial(int argc, VALUE *argv, VALUE io, int no_exception, int nonblock) } io_setstrbuf(&str, len); iis.th = rb_thread_current(); - iis.fd = fptr->fd; + iis.fptr = fptr; iis.nonblock = nonblock; iis.buf = RSTRING_PTR(str); iis.capa = len; @@ -3217,7 +3250,7 @@ io_read_nonblock(rb_execution_context_t *ec, VALUE io, VALUE length, VALUE str, if (n <= 0) { rb_io_set_nonblock(fptr); shrinkable |= io_setstrbuf(&str, len); - iis.fd = fptr->fd; + iis.fptr = fptr; iis.nonblock = 1; iis.buf = RSTRING_PTR(str); iis.capa = len; @@ -4726,10 +4759,10 @@ finish_writeconv(rb_io_t *fptr, int noalloc) res = rb_econv_convert(fptr->writeconv, NULL, NULL, &dp, de, 0); while (dp-ds) { retry: - if (fptr->write_lock && rb_mutex_owned_p(fptr->write_lock)) - r = rb_write_internal2(fptr->fd, ds, dp-ds); - else - r = rb_write_internal(fptr->fd, ds, dp-ds); + if (fptr->write_lock && rb_mutex_owned_p(fptr->write_lock)) + r = rb_write_internal2(fptr, ds, dp-ds); + else + r = rb_write_internal(fptr, ds, dp-ds); if (r == dp-ds) break; if (0 <= r) { @@ -4796,7 +4829,7 @@ static int maygvl_close(int fd, int keepgvl) { if (keepgvl) - return close(fd); + return close(fd); /* * close() may block for certain file types (NFS, SO_LINGER sockets, @@ -4817,7 +4850,7 @@ static int maygvl_fclose(FILE *file, int keepgvl) { if (keepgvl) - return fclose(file); + return fclose(file); return (int)(intptr_t)rb_thread_call_without_gvl(nogvl_fclose, file, RUBY_UBF_IO, 0); } @@ -4835,64 +4868,77 @@ fptr_finalize_flush(rb_io_t *fptr, int noraise, int keepgvl, int mode = fptr->mode; if (fptr->writeconv) { - if (fptr->write_lock && !noraise) { + if (fptr->write_lock && !noraise) { struct finish_writeconv_arg arg; arg.fptr = fptr; arg.noalloc = noraise; err = rb_mutex_synchronize(fptr->write_lock, finish_writeconv_sync, (VALUE)&arg); - } - else { - err = finish_writeconv(fptr, noraise); - } + } + else { + err = finish_writeconv(fptr, noraise); + } } if (fptr->wbuf.len) { - if (noraise) { - io_flush_buffer_sync(fptr); - } - else { - if (io_fflush(fptr) < 0 && NIL_P(err)) - err = INT2NUM(errno); - } + if (noraise) { + io_flush_buffer_sync(fptr); + } + else { + if (io_fflush(fptr) < 0 && NIL_P(err)) + err = INT2NUM(errno); + } + } + + int done = 0; + + if (IS_PREP_STDIO(fptr) || fd <= 2) { + // Need to keep FILE objects of stdin, stdout and stderr, so we are done: + done = 1; } fptr->fd = -1; fptr->stdio_file = 0; fptr->mode &= ~(FMODE_READABLE|FMODE_WRITABLE); - /* - * ensure waiting_fd users do not hit EBADF, wait for them - * to exit before we call close(). - */ + // Ensure waiting_fd users do not hit EBADF. if (busy) { + // Wait for them to exit before we call close(). do rb_thread_schedule(); while (!list_empty(busy)); } - if (IS_PREP_STDIO(fptr) || fd <= 2) { - /* need to keep FILE objects of stdin, stdout and stderr */ - } - else if (stdio_file) { - /* stdio_file is deallocated anyway - * even if fclose failed. */ - if ((maygvl_fclose(stdio_file, noraise) < 0) && NIL_P(err)) - if (!noraise) err = INT2NUM(errno); - } - else if (0 <= fd) { - /* fptr->fd may be closed even if close fails. - * POSIX doesn't specify it. - * We assumes it is closed. */ + // Disable for now. + // if (!done && fd >= 0) { + // VALUE scheduler = rb_fiber_scheduler_current(); + // if (scheduler != Qnil) { + // VALUE result = rb_fiber_scheduler_io_close(scheduler, fptr->self); + // if (result != Qundef) done = 1; + // } + // } - /**/ - keepgvl |= !(mode & FMODE_WRITABLE); - keepgvl |= noraise; - if ((maygvl_close(fd, keepgvl) < 0) && NIL_P(err)) - if (!noraise) err = INT2NUM(errno); + if (!done && stdio_file) { + // stdio_file is deallocated anyway even if fclose failed. + if ((maygvl_fclose(stdio_file, noraise) < 0) && NIL_P(err)) + if (!noraise) err = INT2NUM(errno); + + done = 1; + } + + if (!done && fd >= 0) { + // fptr->fd may be closed even if close fails. POSIX doesn't specify it. + // We assumes it is closed. + + keepgvl |= !(mode & FMODE_WRITABLE); + keepgvl |= noraise; + if ((maygvl_close(fd, keepgvl) < 0) && NIL_P(err)) + if (!noraise) err = INT2NUM(errno); + + done = 1; } if (!NIL_P(err) && !noraise) { - if (RB_INTEGER_TYPE_P(err)) - rb_syserr_fail_path(NUM2INT(err), fptr->pathv); - else - rb_exc_raise(err); + if (RB_INTEGER_TYPE_P(err)) + rb_syserr_fail_path(NUM2INT(err), fptr->pathv); + else + rb_exc_raise(err); } } @@ -5333,7 +5379,7 @@ rb_io_syswrite(VALUE io, VALUE str) tmp = rb_str_tmp_frozen_acquire(str); RSTRING_GETMEM(tmp, ptr, len); - n = rb_write_internal(fptr->fd, ptr, len); + n = rb_write_internal(fptr, ptr, len); if (n < 0) rb_sys_fail_path(fptr->pathv); rb_str_tmp_frozen_release(str, tmp); @@ -5385,7 +5431,7 @@ rb_io_sysread(int argc, VALUE *argv, VALUE io) io_setstrbuf(&str, ilen); iis.th = rb_thread_current(); - iis.fd = fptr->fd; + iis.fptr = fptr; iis.nonblock = 0; iis.buf = RSTRING_PTR(str); iis.capa = ilen; @@ -11141,8 +11187,8 @@ struct copy_stream_struct { off_t copy_length; /* (off_t)-1 if not specified */ off_t src_offset; /* (off_t)-1 if not specified */ - int src_fd; - int dst_fd; + rb_io_t *src_fptr; + rb_io_t *dst_fptr; unsigned close_src : 1; unsigned close_dst : 1; int error_no; @@ -11192,18 +11238,18 @@ maygvl_copy_stream_continue_p(int has_gvl, struct copy_stream_struct *stp) struct wait_for_single_fd { VALUE scheduler; - int fd; + rb_io_t *fptr; short events; VALUE result; }; static void * -rb_thread_fiber_scheduler_wait_for_single_fd(void * _args) +rb_thread_fiber_scheduler_wait_for(void * _args) { struct wait_for_single_fd *args = (struct wait_for_single_fd *)_args; - args->result = rb_fiber_scheduler_io_wait(args->scheduler, io_from_fd(args->fd), INT2NUM(args->events), Qnil); + args->result = rb_fiber_scheduler_io_wait(args->scheduler, args->fptr->self, INT2NUM(args->events), Qnil); return NULL; } @@ -11213,18 +11259,18 @@ rb_thread_fiber_scheduler_wait_for_single_fd(void * _args) STATIC_ASSERT(pollin_expected, POLLIN == RB_WAITFD_IN); STATIC_ASSERT(pollout_expected, POLLOUT == RB_WAITFD_OUT); static int -nogvl_wait_for_single_fd(VALUE th, int fd, short events) +nogvl_wait_for(VALUE th, rb_io_t *fptr, short events) { VALUE scheduler = rb_fiber_scheduler_current_for_thread(th); if (scheduler != Qnil) { - struct wait_for_single_fd args = {.scheduler = scheduler, .fd = fd, .events = events}; - rb_thread_call_with_gvl(rb_thread_fiber_scheduler_wait_for_single_fd, &args); + struct wait_for_single_fd args = {.scheduler = scheduler, .fptr = fptr, .events = events}; + rb_thread_call_with_gvl(rb_thread_fiber_scheduler_wait_for, &args); return RTEST(args.result); } struct pollfd fds; - fds.fd = fd; + fds.fd = fptr->fd; fds.events = events; return poll(&fds, 1, -1); @@ -11232,12 +11278,12 @@ nogvl_wait_for_single_fd(VALUE th, int fd, short events) #else /* !USE_POLL */ # define IOWAIT_SYSCALL "select" static int -nogvl_wait_for_single_fd(VALUE th, int fd, short events) +nogvl_wait_for(VALUE th, rb_io_t *fptr, short events) { VALUE scheduler = rb_fiber_scheduler_current_for_thread(th); if (scheduler != Qnil) { - struct wait_for_single_fd args = {.scheduler = scheduler, .fd = fd, .events = events}; - rb_thread_call_with_gvl(rb_thread_fiber_scheduler_wait_for_single_fd, &args); + struct wait_for_single_fd args = {.scheduler = scheduler, .fptr = fptr, .events = events}; + rb_thread_call_with_gvl(rb_thread_fiber_scheduler_wait_for, &args); return RTEST(args.result); } @@ -11245,17 +11291,17 @@ nogvl_wait_for_single_fd(VALUE th, int fd, short events) int ret; rb_fd_init(&fds); - rb_fd_set(fd, &fds); + rb_fd_set(fptr->fd, &fds); switch (events) { case RB_WAITFD_IN: - ret = rb_fd_select(fd + 1, &fds, 0, 0, 0); + ret = rb_fd_select(fptr->fd + 1, &fds, 0, 0, 0); break; case RB_WAITFD_OUT: - ret = rb_fd_select(fd + 1, 0, &fds, 0, 0); + ret = rb_fd_select(fptr->fd + 1, 0, &fds, 0, 0); break; default: - VM_UNREACHABLE(nogvl_wait_for_single_fd); + VM_UNREACHABLE(nogvl_wait_for); } rb_fd_term(&fds); @@ -11273,7 +11319,7 @@ maygvl_copy_stream_wait_read(int has_gvl, struct copy_stream_struct *stp) ret = RB_NUM2INT(rb_io_wait(stp->src, RB_INT2NUM(RUBY_IO_READABLE), Qnil)); } else { - ret = nogvl_wait_for_single_fd(stp->th, stp->src_fd, RB_WAITFD_IN); + ret = nogvl_wait_for(stp->th, stp->src_fptr, RB_WAITFD_IN); } } while (ret < 0 && maygvl_copy_stream_continue_p(has_gvl, stp)); @@ -11291,7 +11337,7 @@ nogvl_copy_stream_wait_write(struct copy_stream_struct *stp) int ret; do { - ret = nogvl_wait_for_single_fd(stp->th, stp->dst_fd, RB_WAITFD_OUT); + ret = nogvl_wait_for(stp->th, stp->dst_fptr, RB_WAITFD_OUT); } while (ret < 0 && maygvl_copy_stream_continue_p(0, stp)); if (ret < 0) { @@ -11338,7 +11384,7 @@ nogvl_copy_file_range(struct copy_stream_struct *stp) if (src_offset < (off_t)0) { off_t current_offset; errno = 0; - current_offset = lseek(stp->src_fd, 0, SEEK_CUR); + current_offset = lseek(stp->src_fptr->fd, 0, SEEK_CUR); if (current_offset < (off_t)0 && errno) { stp->syserr = "lseek"; stp->error_no = errno; @@ -11358,7 +11404,7 @@ nogvl_copy_file_range(struct copy_stream_struct *stp) # else ss = (ssize_t)copy_length; # endif - ss = simple_copy_file_range(stp->src_fd, src_offset_ptr, stp->dst_fd, NULL, ss, 0); + ss = simple_copy_file_range(stp->src_fptr->fd, src_offset_ptr, stp->dst_fptr->fd, NULL, ss, 0); if (0 < ss) { stp->total += ss; copy_length -= ss; @@ -11393,7 +11439,7 @@ nogvl_copy_file_range(struct copy_stream_struct *stp) case EBADF: { int e = errno; - int flags = fcntl(stp->dst_fd, F_GETFL); + int flags = fcntl(stp->dst_fptr->fd, F_GETFL); if (flags != -1 && flags & O_APPEND) { return 0; @@ -11427,7 +11473,7 @@ nogvl_fcopyfile(struct copy_stream_struct *stp) if (!S_ISREG(stp->dst_stat.st_mode)) return 0; - if (lseek(stp->dst_fd, 0, SEEK_CUR) > (off_t)0) /* if dst IO was already written */ + if (lseek(stp->dst_fptr->fd, 0, SEEK_CUR) > (off_t)0) /* if dst IO was already written */ return 0; if (src_offset > (off_t)0) { @@ -11435,14 +11481,14 @@ nogvl_fcopyfile(struct copy_stream_struct *stp) /* get current offset */ errno = 0; - cur = lseek(stp->src_fd, 0, SEEK_CUR); + cur = lseek(stp->src_fptr->fd, 0, SEEK_CUR); if (cur < (off_t)0 && errno) { stp->error_no = errno; return 1; } errno = 0; - r = lseek(stp->src_fd, src_offset, SEEK_SET); + r = lseek(stp->src_fptr->fd, src_offset, SEEK_SET); if (r < (off_t)0 && errno) { stp->error_no = errno; return 1; @@ -11450,7 +11496,7 @@ nogvl_fcopyfile(struct copy_stream_struct *stp) } stp->copyfile_state = copyfile_state_alloc(); /* this will be freed by copy_stream_finalize() */ - ret = fcopyfile(stp->src_fd, stp->dst_fd, stp->copyfile_state, COPYFILE_DATA); + ret = fcopyfile(stp->src_fptr->fd, stp->dst_fptr->fd, stp->copyfile_state, COPYFILE_DATA); copyfile_state_get(stp->copyfile_state, COPYFILE_STATE_COPIED, &ss); /* get copied bytes */ if (ret == 0) { /* success */ @@ -11459,7 +11505,7 @@ nogvl_fcopyfile(struct copy_stream_struct *stp) off_t r; errno = 0; /* reset offset */ - r = lseek(stp->src_fd, cur, SEEK_SET); + r = lseek(stp->src_fptr->fd, cur, SEEK_SET); if (r < (off_t)0 && errno) { stp->error_no = errno; return 1; @@ -11557,7 +11603,7 @@ nogvl_copy_stream_sendfile(struct copy_stream_struct *stp) else { off_t cur; errno = 0; - cur = lseek(stp->src_fd, 0, SEEK_CUR); + cur = lseek(stp->src_fptr->fd, 0, SEEK_CUR); if (cur < (off_t)0 && errno) { stp->syserr = "lseek"; stp->error_no = errno; @@ -11575,10 +11621,10 @@ nogvl_copy_stream_sendfile(struct copy_stream_struct *stp) ss = (ssize_t)copy_length; # endif if (use_pread) { - ss = simple_sendfile(stp->dst_fd, stp->src_fd, &src_offset, ss); + ss = simple_sendfile(stp->dst_fptr->fd, stp->src_fptr->fd, &src_offset, ss); } else { - ss = simple_sendfile(stp->dst_fd, stp->src_fd, NULL, ss); + ss = simple_sendfile(stp->dst_fptr->fd, stp->src_fptr->fd, NULL, ss); } if (0 < ss) { stp->total += ss; @@ -11609,7 +11655,7 @@ nogvl_copy_stream_sendfile(struct copy_stream_struct *stp) int ret; #ifndef __linux__ /* - * Linux requires stp->src_fd to be a mmap-able (regular) file, + * Linux requires stp->src_fptr->fd to be a mmap-able (regular) file, * select() reports regular files to always be "ready", so * there is no need to select() on it. * Other OSes may have the same limitation for sendfile() which @@ -11632,12 +11678,12 @@ nogvl_copy_stream_sendfile(struct copy_stream_struct *stp) #endif static ssize_t -maygvl_read(int has_gvl, int fd, void *buf, size_t count) +maygvl_read(int has_gvl, rb_io_t *fptr, void *buf, size_t count) { if (has_gvl) - return rb_read_internal(fd, buf, count); + return rb_read_internal(fptr, buf, count); else - return read(fd, buf, count); + return read(fptr->fd, buf, count); } static ssize_t @@ -11646,11 +11692,11 @@ maygvl_copy_stream_read(int has_gvl, struct copy_stream_struct *stp, char *buf, ssize_t ss; retry_read: if (offset < (off_t)0) { - ss = maygvl_read(has_gvl, stp->src_fd, buf, len); + ss = maygvl_read(has_gvl, stp->src_fptr, buf, len); } else { #ifdef HAVE_PREAD - ss = pread(stp->src_fd, buf, len, offset); + ss = pread(stp->src_fptr->fd, buf, len, offset); #else stp->notimp = "pread"; return -1; @@ -11690,7 +11736,7 @@ nogvl_copy_stream_write(struct copy_stream_struct *stp, char *buf, size_t len) ssize_t ss; int off = 0; while (len) { - ss = write(stp->dst_fd, buf+off, len); + ss = write(stp->dst_fptr->fd, buf+off, len); if (ss < 0) { if (maygvl_copy_stream_continue_p(0, stp)) continue; @@ -11730,7 +11776,7 @@ nogvl_copy_stream_read_write(struct copy_stream_struct *stp) if (use_pread && stp->close_src) { off_t r; errno = 0; - r = lseek(stp->src_fd, src_offset, SEEK_SET); + r = lseek(stp->src_fptr->fd, src_offset, SEEK_SET); if (r < (off_t)0 && errno) { stp->syserr = "lseek"; stp->error_no = errno; @@ -11812,7 +11858,7 @@ copy_stream_fallback_body(VALUE arg) off_t off = stp->src_offset; ID read_method = id_readpartial; - if (stp->src_fd < 0) { + if (!stp->src_fptr) { if (!rb_respond_to(stp->src, read_method)) { read_method = id_read; } @@ -11831,7 +11877,7 @@ copy_stream_fallback_body(VALUE arg) } l = buflen < rest ? buflen : (long)rest; } - if (stp->src_fd < 0) { + if (!stp->src_fptr) { VALUE rc = rb_funcall(stp->src, read_method, 2, INT2FIX(l), buf); if (read_method == id_read && NIL_P(rc)) @@ -11864,7 +11910,7 @@ copy_stream_fallback_body(VALUE arg) static VALUE copy_stream_fallback(struct copy_stream_struct *stp) { - if (stp->src_fd < 0 && stp->src_offset >= (off_t)0) { + if (!stp->src_fptr && stp->src_offset >= (off_t)0) { rb_raise(rb_eArgError, "cannot specify src_offset for non-IO"); } rb_rescue2(copy_stream_fallback_body, (VALUE)stp, @@ -11878,8 +11924,6 @@ copy_stream_body(VALUE arg) { struct copy_stream_struct *stp = (struct copy_stream_struct *)arg; VALUE src_io = stp->src, dst_io = stp->dst; - rb_io_t *src_fptr = 0, *dst_fptr = 0; - int src_fd, dst_fd; const int common_oflags = 0 #ifdef O_NOCTTY | O_NOCTTY @@ -11894,7 +11938,7 @@ copy_stream_body(VALUE arg) !(RB_TYPE_P(src_io, T_FILE) || RB_TYPE_P(src_io, T_STRING) || rb_respond_to(src_io, rb_intern("to_path")))) { - src_fd = -1; + stp->src_fptr = NULL; } else { int stat_ret; @@ -11911,24 +11955,22 @@ copy_stream_body(VALUE arg) stp->src = src_io; stp->close_src = 1; } - GetOpenFile(src_io, src_fptr); - rb_io_check_byte_readable(src_fptr); - src_fd = src_fptr->fd; + RB_IO_POINTER(src_io, stp->src_fptr); + rb_io_check_byte_readable(stp->src_fptr); - stat_ret = fstat(src_fd, &stp->src_stat); + stat_ret = fstat(stp->src_fptr->fd, &stp->src_stat); if (stat_ret < 0) { stp->syserr = "fstat"; stp->error_no = errno; return Qnil; } } - stp->src_fd = src_fd; if (dst_io == argf || !(RB_TYPE_P(dst_io, T_FILE) || RB_TYPE_P(dst_io, T_STRING) || rb_respond_to(dst_io, rb_intern("to_path")))) { - dst_fd = -1; + stp->dst_fptr = NULL; } else { int stat_ret; @@ -11950,38 +11992,36 @@ copy_stream_body(VALUE arg) dst_io = GetWriteIO(dst_io); stp->dst = dst_io; } - GetOpenFile(dst_io, dst_fptr); - rb_io_check_writable(dst_fptr); - dst_fd = dst_fptr->fd; + RB_IO_POINTER(dst_io, stp->dst_fptr); + rb_io_check_writable(stp->dst_fptr); - stat_ret = fstat(dst_fd, &stp->dst_stat); + stat_ret = fstat(stp->dst_fptr->fd, &stp->dst_stat); if (stat_ret < 0) { stp->syserr = "fstat"; stp->error_no = errno; return Qnil; } } - stp->dst_fd = dst_fd; #ifdef O_BINARY - if (src_fptr) - SET_BINARY_MODE_WITH_SEEK_CUR(src_fptr); + if (stp->src_fptr) + SET_BINARY_MODE_WITH_SEEK_CUR(stp->src_fptr); #endif - if (dst_fptr) - io_ascii8bit_binmode(dst_fptr); + if (stp->dst_fptr) + io_ascii8bit_binmode(stp->dst_fptr); - if (stp->src_offset < (off_t)0 && src_fptr && src_fptr->rbuf.len) { - size_t len = src_fptr->rbuf.len; + if (stp->src_offset < (off_t)0 && stp->src_fptr && stp->src_fptr->rbuf.len) { + size_t len = stp->src_fptr->rbuf.len; VALUE str; if (stp->copy_length >= (off_t)0 && stp->copy_length < (off_t)len) { len = (size_t)stp->copy_length; } str = rb_str_buf_new(len); rb_str_resize(str,len); - read_buffered_data(RSTRING_PTR(str), len, src_fptr); - if (dst_fptr) { /* IO or filename */ - if (io_binwrite(str, RSTRING_PTR(str), RSTRING_LEN(str), dst_fptr, 0) < 0) - rb_sys_fail_on_write(dst_fptr); + read_buffered_data(RSTRING_PTR(str), len, stp->src_fptr); + if (stp->dst_fptr) { /* IO or filename */ + if (io_binwrite(str, RSTRING_PTR(str), RSTRING_LEN(str), stp->dst_fptr, 0) < 0) + rb_sys_fail_on_write(stp->dst_fptr); } else /* others such as StringIO */ rb_io_write(dst_io, str); @@ -11991,14 +12031,14 @@ copy_stream_body(VALUE arg) stp->copy_length -= len; } - if (dst_fptr && io_fflush(dst_fptr) < 0) { + if (stp->dst_fptr && io_fflush(stp->dst_fptr) < 0) { rb_raise(rb_eIOError, "flush failed"); } if (stp->copy_length == 0) return Qnil; - if (src_fd < 0 || dst_fd < 0) { + if (stp->src_fptr == NULL || stp->dst_fptr == NULL) { return copy_stream_fallback(stp); } @@ -12077,6 +12117,9 @@ rb_io_s_copy_stream(int argc, VALUE *argv, VALUE io) st.src = src; st.dst = dst; + st.src_fptr = NULL; + st.dst_fptr = NULL; + if (NIL_P(length)) st.copy_length = (off_t)-1; else @@ -13678,6 +13721,7 @@ Init_IO(void) id_flush = rb_intern_const("flush"); id_readpartial = rb_intern_const("readpartial"); id_set_encoding = rb_intern_const("set_encoding"); + id_fileno = rb_intern_const("fileno"); rb_define_global_function("syscall", rb_f_syscall, -1); diff --git a/io_buffer.c b/io_buffer.c new file mode 100644 index 0000000000..3455713f8d --- /dev/null +++ b/io_buffer.c @@ -0,0 +1,1024 @@ +/********************************************************************** + + io_buffer.c + + Copyright (C) 2021 Samuel Grant Dawson Williams + +**********************************************************************/ + +#include "ruby/io.h" +#include "ruby/io/buffer.h" + +#include "internal/string.h" +#include "internal/bits.h" + +VALUE rb_cIOBuffer; +size_t RUBY_IO_BUFFER_PAGE_SIZE; + +#ifdef _WIN32 +#else +#include +#include +#endif + +struct rb_io_buffer { + void *base; + size_t size; + enum rb_io_buffer_flags flags; + +#if defined(_WIN32) + HANDLE mapping; +#endif + + VALUE source; +}; + +static inline void* io_buffer_map_memory(size_t size) +{ +#if defined(_WIN32) + void * base = VirtualAlloc(0, size, MEM_COMMIT, PAGE_READWRITE); + + if (!base) { + rb_sys_fail("io_buffer_map_memory:VirtualAlloc"); + } +#else + void * base = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_ANON | MAP_PRIVATE, -1, 0); + + if (base == MAP_FAILED) { + rb_sys_fail("io_buffer_map_memory:mmap"); + } +#endif + + return base; +} + +static +void io_buffer_map_file(struct rb_io_buffer *data, int descriptor, size_t size, off_t offset, enum rb_io_buffer_flags flags) +{ +#if defined(_WIN32) + HANDLE file = (HANDLE)_get_osfhandle(descriptor); + if (!file) rb_sys_fail("io_buffer_map_descriptor:_get_osfhandle"); + + DWORD protect = PAGE_READONLY, access = FILE_MAP_READ; + + if (flags & RB_IO_BUFFER_IMMUTABLE) { + data->flags |= RB_IO_BUFFER_IMMUTABLE; + } else { + protect = PAGE_READWRITE; + access = FILE_MAP_WRITE; + } + + HANDLE mapping = CreateFileMapping(file, NULL, protect, 0, 0, NULL); + if (!mapping) rb_sys_fail("io_buffer_map_descriptor:CreateFileMapping"); + + if (flags & RB_IO_BUFFER_PRIVATE) { + access |= FILE_MAP_COPY; + data->flags |= RB_IO_BUFFER_PRIVATE; + } + + void *base = MapViewOfFile(mapping, access, (DWORD)(offset >> 32), (DWORD)(offset & 0xFFFFFFFF), size); + + if (!base) { + CloseHandle(mapping); + rb_sys_fail("io_buffer_map_file:MapViewOfFile"); + } + + data->mapping = mapping; +#else + int protect = PROT_READ, access = 0; + + if (flags & RB_IO_BUFFER_IMMUTABLE) { + data->flags |= RB_IO_BUFFER_IMMUTABLE; + } else { + protect |= PROT_WRITE; + } + + if (flags & RB_IO_BUFFER_PRIVATE) { + data->flags |= RB_IO_BUFFER_PRIVATE; + } else { + access |= MAP_SHARED; + } + + void *base = mmap(NULL, size, protect, access, descriptor, offset); + + if (base == MAP_FAILED) { + rb_sys_fail("io_buffer_map_file:mmap"); + } +#endif + + data->base = base; + data->size = size; + + data->flags |= RB_IO_BUFFER_MAPPED; +} + +static inline void io_buffer_unmap(void* base, size_t size) +{ +#ifdef _WIN32 + VirtualFree(base, 0, MEM_RELEASE); +#else + munmap(base, size); +#endif +} + +static void io_buffer_initialize(struct rb_io_buffer *data, void *base, size_t size, enum rb_io_buffer_flags flags, VALUE source) +{ + data->flags = flags; + data->size = size; + + if (base) { + data->base = base; + } else { + if (data->flags & RB_IO_BUFFER_INTERNAL) { + data->base = calloc(data->size, 1); + } else if (data->flags & RB_IO_BUFFER_MAPPED) { + data->base = io_buffer_map_memory(data->size); + } + } + + if (!data->base) { + rb_raise(rb_eRuntimeError, "Could not allocate buffer!"); + } + + data->source = source; +} + +static int io_buffer_free(struct rb_io_buffer *data) +{ + if (data->base) { + if (data->flags & RB_IO_BUFFER_INTERNAL) { + free(data->base); + } + + if (data->flags & RB_IO_BUFFER_MAPPED) { + io_buffer_unmap(data->base, data->size); + } + + data->base = NULL; + +#if defined(_WIN32) + if (data->mapping) { + CloseHandle(data->mapping); + data->mapping = NULL; + } +#endif + + return 1; + } + + return 0; +} + +void rb_io_buffer_type_mark(void *_data) +{ + struct rb_io_buffer *data = _data; + rb_gc_mark(data->source); +} + +void rb_io_buffer_type_free(void *_data) +{ + struct rb_io_buffer *data = _data; + + io_buffer_free(data); + + free(data); +} + +size_t rb_io_buffer_type_size(const void *_data) +{ + const struct rb_io_buffer *data = _data; + size_t total = sizeof(struct rb_io_buffer); + + if (data->flags) { + total += data->size; + } + + return total; +} + +static const rb_data_type_t rb_io_buffer_type = { + .wrap_struct_name = "IO::Buffer", + .function = { + .dmark = rb_io_buffer_type_mark, + .dfree = rb_io_buffer_type_free, + .dsize = rb_io_buffer_type_size, + }, + .data = NULL, + .flags = RUBY_TYPED_FREE_IMMEDIATELY, +}; + +VALUE rb_io_buffer_type_allocate(VALUE self) +{ + struct rb_io_buffer *data = NULL; + VALUE instance = TypedData_Make_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + data->base = NULL; + data->size = 0; + data->flags = 0; + data->source = Qnil; + + return instance; +} + +VALUE rb_io_buffer_new(void *base, size_t size, enum rb_io_buffer_flags flags) +{ + VALUE instance = rb_io_buffer_type_allocate(rb_cIOBuffer); + + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(instance, struct rb_io_buffer, &rb_io_buffer_type, data); + + io_buffer_initialize(data, base, size, 0, Qnil); + + return instance; +} + +VALUE rb_io_buffer_map(VALUE io, size_t size, off_t offset, enum rb_io_buffer_flags flags) +{ + VALUE instance = rb_io_buffer_type_allocate(rb_cIOBuffer); + + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(instance, struct rb_io_buffer, &rb_io_buffer_type, data); + + int descriptor = rb_io_descriptor(io); + + io_buffer_map_file(data, descriptor, size, offset, flags); + + return instance; +} + +static +VALUE io_buffer_map(int argc, VALUE *argv, VALUE klass) +{ + if (argc < 1 || argc > 4) { + rb_error_arity(argc, 2, 4); + } + + VALUE io = argv[0]; + + size_t size; + if (argc >= 2) { + size = RB_NUM2SIZE(argv[1]); + } else { + size = rb_file_size(io); + } + + off_t offset = 0; + if (argc >= 3) { + offset = NUM2OFFT(argv[2]); + } + + enum rb_io_buffer_flags flags = RB_IO_BUFFER_IMMUTABLE; + if (argc >= 4) { + flags = RB_NUM2UINT(argv[3]); + } + + return rb_io_buffer_map(io, size, offset, flags); +} + +VALUE rb_io_buffer_initialize(int argc, VALUE *argv, VALUE self) +{ + if (argc < 1 || argc > 2) { + rb_error_arity(argc, 1, 2); + } + + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + size_t size = RB_NUM2SIZE(argv[0]); + + enum rb_io_buffer_flags flags = 0; + if (argc >= 2) { + flags = RB_NUM2UINT(argv[1]); + } else { + if (size > RUBY_IO_BUFFER_PAGE_SIZE) { + flags |= RB_IO_BUFFER_MAPPED; + } else { + flags |= RB_IO_BUFFER_INTERNAL; + } + } + + io_buffer_initialize(data, NULL, size, flags, Qnil); + + return self; +} + +static int io_buffer_validate_slice(VALUE source, void *base, size_t size) +{ + const void *source_base = NULL; + size_t source_size = 0; + + if (RB_TYPE_P(source, T_STRING)) { + RSTRING_GETMEM(source, source_base, source_size); + } else { + rb_io_buffer_get_immutable(source, &source_base, &source_size); + } + + // Source is invalid: + if (source_base == NULL) return 0; + + // Base is out of range: + if (base < source_base) return 0; + + const void *source_end = (char*)source_base + source_size; + const void *end = (char*)base + size; + + // End is out of range: + if (end > source_end) return 0; + + // It seems okay: + return 1; +} + +static int io_buffer_validate(struct rb_io_buffer *data) +{ + if (data->source != Qnil) { + // Only slices incur this overhead, unfortunately... better safe than sorry! + return io_buffer_validate_slice(data->source, data->base, data->size); + } else { + return 1; + } +} + +VALUE rb_io_buffer_to_s(VALUE self) +{ + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + VALUE result = rb_str_new_cstr("#<"); + + rb_str_append(result, rb_class_name(CLASS_OF(self))); + rb_str_catf(result, " %p+%ld", data->base, data->size); + + if (data->flags & RB_IO_BUFFER_INTERNAL) { + rb_str_cat2(result, " INTERNAL"); + } + + if (data->flags & RB_IO_BUFFER_MAPPED) { + rb_str_cat2(result, " MAPPED"); + } + + if (data->flags & RB_IO_BUFFER_LOCKED) { + rb_str_cat2(result, " LOCKED"); + } + + if (data->flags & RB_IO_BUFFER_IMMUTABLE) { + rb_str_cat2(result, " IMMUTABLE"); + } + + if (data->source != Qnil) { + rb_str_cat2(result, " SLICE"); + } + + if (!io_buffer_validate(data)) { + rb_str_cat2(result, " INVALID"); + } + + return rb_str_cat2(result, ">"); + +} + +static VALUE io_buffer_hexdump(VALUE string, size_t width, char *base, size_t size) +{ + char *text = alloca(width+1); + text[width] = '\0'; + + for (size_t offset = 0; offset < size; offset += width) { + memset(text, '\0', width); + rb_str_catf(string, "\n0x%08zx ", offset); + + for (size_t i = 0; i < width; i += 1) { + if (offset+i < size) { + unsigned char value = ((unsigned char*)base)[offset+i]; + + if (value < 127 && isprint(value)) { + text[i] = (char)value; + } else { + text[i] = '.'; + } + + rb_str_catf(string, " %02x", value); + } else { + rb_str_cat2(string, " "); + } + } + + rb_str_catf(string, " %s", text); + } + + rb_str_cat2(string, "\n"); + + return string; +} + +VALUE rb_io_buffer_inspect(VALUE self) +{ + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + VALUE result = rb_io_buffer_to_s(self); + + if (io_buffer_validate(data)) { + io_buffer_hexdump(result, 16, data->base, data->size); + } + + return result; +} + +VALUE rb_io_buffer_size(VALUE self) +{ + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + return SIZET2NUM(data->size); +} + +static VALUE rb_io_buffer_external_p(VALUE self) +{ + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + return data->flags & (RB_IO_BUFFER_INTERNAL | RB_IO_BUFFER_MAPPED) ? Qfalse : Qtrue; +} + +static VALUE rb_io_buffer_internal_p(VALUE self) +{ + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + return data->flags & RB_IO_BUFFER_INTERNAL ? Qtrue : Qfalse; +} + +static VALUE rb_io_buffer_mapped_p(VALUE self) +{ + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + return data->flags & RB_IO_BUFFER_MAPPED ? Qtrue : Qfalse; +} + +static VALUE rb_io_buffer_locked_p(VALUE self) +{ + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + return data->flags & RB_IO_BUFFER_LOCKED ? Qtrue : Qfalse; +} + +static VALUE rb_io_buffer_immutable_p(VALUE self) +{ + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + return data->flags & RB_IO_BUFFER_IMMUTABLE ? Qtrue : Qfalse; +} + +VALUE rb_io_buffer_lock(VALUE self) +{ + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + if (data->flags & RB_IO_BUFFER_LOCKED) { + rb_raise(rb_eRuntimeError, "Buffer already locked!"); + } + + data->flags |= RB_IO_BUFFER_LOCKED; + + return self; +} + +VALUE rb_io_buffer_unlock(VALUE self) +{ + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + if (!(data->flags & RB_IO_BUFFER_LOCKED)) { + rb_raise(rb_eRuntimeError, "Buffer not locked!"); + } + + data->flags &= ~RB_IO_BUFFER_LOCKED; + + return self; +} + +VALUE rb_io_buffer_locked(VALUE self) +{ + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + if (data->flags & RB_IO_BUFFER_LOCKED) { + rb_raise(rb_eRuntimeError, "Buffer already locked!"); + } + + data->flags |= RB_IO_BUFFER_LOCKED; + + VALUE result = rb_yield(self); + + data->flags &= ~RB_IO_BUFFER_LOCKED; + + return result; +} + +VALUE rb_io_buffer_free(VALUE self) +{ + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + if (data->flags & RB_IO_BUFFER_LOCKED) { + rb_raise(rb_eRuntimeError, "Buffer is locked!"); + } + + io_buffer_free(data); + + return self; +} + +static inline void rb_io_buffer_validate(struct rb_io_buffer *data, size_t offset, size_t length) +{ + if (offset + length > data->size) { + rb_raise(rb_eRuntimeError, "Specified offset + length exceeds source size!"); + } +} + +VALUE rb_io_buffer_slice(VALUE self, VALUE _offset, VALUE _length) +{ + // TODO fail on negative offets/lengths. + size_t offset = NUM2SIZET(_offset); + size_t length = NUM2SIZET(_length); + + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + rb_io_buffer_validate(data, offset, length); + + VALUE instance = rb_io_buffer_type_allocate(rb_class_of(self)); + struct rb_io_buffer *slice = NULL; + TypedData_Get_Struct(instance, struct rb_io_buffer, &rb_io_buffer_type, slice); + + slice->base = (char*)data->base + offset; + slice->size = length; + + // The source should be the root buffer: + if (data->source != Qnil) + slice->source = data->source; + else + slice->source = self; + + return instance; +} + +VALUE rb_io_buffer_to_str(int argc, VALUE *argv, VALUE self) +{ + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + size_t offset = 0; + size_t length = data->size; + + if (argc == 0) { + // Defaults. + } else if (argc == 1) { + offset = NUM2SIZET(argv[0]); + length = data->size - offset; + } else if (argc == 2) { + offset = NUM2SIZET(argv[0]); + length = NUM2SIZET(argv[1]); + } else { + rb_error_arity(argc, 0, 2); + } + + rb_io_buffer_validate(data, offset, length); + + return rb_usascii_str_new((char*)data->base + offset, length); +} + +void rb_io_buffer_get_mutable(VALUE self, void **base, size_t *size) +{ + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + if (data->flags & RB_IO_BUFFER_IMMUTABLE) { + rb_raise(rb_eRuntimeError, "Buffer is immutable!"); + } + + if (!io_buffer_validate(data)) { + rb_raise(rb_eRuntimeError, "Buffer has been invalidated!"); + } + + if (data && data->base) { + *base = data->base; + *size = data->size; + + return; + } + + rb_raise(rb_eRuntimeError, "Buffer is not allocated!"); +} + +void rb_io_buffer_get_immutable(VALUE self, const void **base, size_t *size) +{ + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + if (!io_buffer_validate(data)) { + rb_raise(rb_eRuntimeError, "Buffer has been invalidated!"); + } + + if (data && data->base) { + *base = data->base; + *size = data->size; + + return; + } + + rb_raise(rb_eRuntimeError, "Buffer is not allocated!"); +} + +size_t rb_io_buffer_copy(VALUE self, VALUE source, size_t offset) +{ + const void *source_base = NULL; + size_t source_size = 0; + + if (RB_TYPE_P(source, T_STRING)) { + RSTRING_GETMEM(source, source_base, source_size); + } else { + rb_io_buffer_get_immutable(source, &source_base, &source_size); + } + + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + rb_io_buffer_validate(data, offset, source_size); + + memcpy((char*)data->base + offset, source_base, source_size); + + return source_size; +} + +static VALUE io_buffer_copy(VALUE self, VALUE source, VALUE offset) +{ + size_t size = rb_io_buffer_copy(self, source, NUM2SIZET(offset)); + + return RB_SIZE2NUM(size); +} + +static int io_buffer_external_p(enum rb_io_buffer_flags flags) +{ + return !(flags & (RB_IO_BUFFER_INTERNAL | RB_IO_BUFFER_MAPPED)); +} + +void rb_io_buffer_resize(VALUE self, size_t size, size_t preserve) +{ + struct rb_io_buffer *data = NULL, updated; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + if (preserve > data->size) { + rb_raise(rb_eRuntimeError, "Preservation size bigger than buffer size!"); + } + + if (preserve > size) { + rb_raise(rb_eRuntimeError, "Preservation size bigger than destination size!"); + } + + if (data->flags & RB_IO_BUFFER_LOCKED) { + rb_raise(rb_eRuntimeError, "Cannot resize locked buffer!"); + } + + // By virtue of this passing, we don't need to do any further validation on the buffer: + if (io_buffer_external_p(data->flags)) { + rb_raise(rb_eRuntimeError, "Cannot resize external buffer!"); + } + + io_buffer_initialize(&updated, NULL, size, data->flags, data->source); + + if (data->base && preserve > 0) { + memcpy(updated.base, data->base, preserve); + } + + io_buffer_free(data); + *data = updated; +} + +static +VALUE rb_io_buffer_compare(VALUE self, VALUE other) +{ + const void *ptr1, *ptr2; + size_t size1, size2; + + rb_io_buffer_get_immutable(self, &ptr1, &size1); + rb_io_buffer_get_immutable(other, &ptr2, &size2); + + if (size1 < size2) { + return RB_INT2NUM(-1); + } + + if (size1 > size2) { + return RB_INT2NUM(1); + } + + return RB_INT2NUM(memcmp(ptr1, ptr2, size1)); +} + +static VALUE io_buffer_resize(VALUE self, VALUE size, VALUE preserve) +{ + rb_io_buffer_resize(self, NUM2SIZET(size), NUM2SIZET(preserve)); + + return self; +} + +static void io_buffer_validate_type(size_t size, size_t offset) { + if (offset > size) { + rb_raise(rb_eRuntimeError, "Type extends beyond end of buffer!"); + } +} + +// Lower case: little endian. +// Upper case: big endian (network endian). +// +// :U8 | unsigned 8-bit integer. +// :S8 | signed 8-bit integer. +// +// :u16, :U16 | unsigned 16-bit integer. +// :s16, :S16 | signed 16-bit integer. +// +// :u32, :U32 | unsigned 32-bit integer. +// :s32, :S32 | signed 32-bit integer. +// +// :u64, :U64 | unsigned 64-bit integer. +// :s64, :S64 | signed 64-bit integer. +// +// :f32, :F32 | 32-bit floating point number. +// :f64, :F64 | 64-bit floating point number. + +#define ruby_swap8(value) value + +union swapf32 { + uint32_t integral; + float value; +}; + +static float ruby_swapf32(float value) +{ + union swapf32 swap = {.value = value}; + swap.integral = ruby_swap32(swap.integral); + return swap.value; +} + +union swapf64 { + uint64_t integral; + double value; +}; + +static double ruby_swapf64(double value) +{ + union swapf64 swap = {.value = value}; + swap.integral = ruby_swap64(swap.integral); + return swap.value; +} + +#define DECLAIR_TYPE(name, type, endian, wrap, unwrap, swap) \ +static ID RB_IO_BUFFER_TYPE_##name; \ +\ +static VALUE io_buffer_read_##name(const void* base, size_t size, size_t *offset) \ +{ \ + io_buffer_validate_type(size, *offset + sizeof(type)); \ + type value; \ + memcpy(&value, (char*)base + *offset, sizeof(type)); \ + if (endian != RB_IO_BUFFER_HOST_ENDIAN) value = swap(value); \ + *offset += sizeof(type); \ + return wrap(value); \ +} \ +\ +static void io_buffer_write_##name(const void* base, size_t size, size_t *offset, VALUE _value) \ +{ \ + io_buffer_validate_type(size, *offset + sizeof(type)); \ + type value = unwrap(_value); \ + if (endian != RB_IO_BUFFER_HOST_ENDIAN) value = swap(value); \ + memcpy((char*)base + *offset, &value, sizeof(type)); \ + *offset += sizeof(type); \ +} + +DECLAIR_TYPE(U8, uint8_t, RB_IO_BUFFER_BIG_ENDIAN, RB_UINT2NUM, RB_NUM2UINT, ruby_swap8) +DECLAIR_TYPE(S8, int8_t, RB_IO_BUFFER_BIG_ENDIAN, RB_INT2NUM, RB_NUM2INT, ruby_swap8) + +DECLAIR_TYPE(u16, uint16_t, RB_IO_BUFFER_LITTLE_ENDIAN, RB_UINT2NUM, RB_NUM2UINT, ruby_swap16) +DECLAIR_TYPE(U16, uint16_t, RB_IO_BUFFER_BIG_ENDIAN, RB_UINT2NUM, RB_NUM2UINT, ruby_swap16) +DECLAIR_TYPE(s16, int16_t, RB_IO_BUFFER_LITTLE_ENDIAN, RB_INT2NUM, RB_NUM2INT, ruby_swap16) +DECLAIR_TYPE(S16, int16_t, RB_IO_BUFFER_BIG_ENDIAN, RB_INT2NUM, RB_NUM2INT, ruby_swap16) + +DECLAIR_TYPE(u32, uint32_t, RB_IO_BUFFER_LITTLE_ENDIAN, RB_UINT2NUM, RB_NUM2UINT, ruby_swap32) +DECLAIR_TYPE(U32, uint32_t, RB_IO_BUFFER_BIG_ENDIAN, RB_UINT2NUM, RB_NUM2UINT, ruby_swap32) +DECLAIR_TYPE(s32, int32_t, RB_IO_BUFFER_LITTLE_ENDIAN, RB_INT2NUM, RB_NUM2INT, ruby_swap32) +DECLAIR_TYPE(S32, int32_t, RB_IO_BUFFER_BIG_ENDIAN, RB_INT2NUM, RB_NUM2INT, ruby_swap32) + +DECLAIR_TYPE(u64, uint64_t, RB_IO_BUFFER_LITTLE_ENDIAN, RB_ULONG2NUM, RB_NUM2ULONG, ruby_swap64) +DECLAIR_TYPE(U64, uint64_t, RB_IO_BUFFER_BIG_ENDIAN, RB_ULONG2NUM, RB_NUM2ULONG, ruby_swap64) +DECLAIR_TYPE(s64, int64_t, RB_IO_BUFFER_LITTLE_ENDIAN, RB_LONG2NUM, RB_NUM2LONG, ruby_swap64) +DECLAIR_TYPE(S64, int64_t, RB_IO_BUFFER_BIG_ENDIAN, RB_LONG2NUM, RB_NUM2LONG, ruby_swap64) + +DECLAIR_TYPE(f32, float, RB_IO_BUFFER_LITTLE_ENDIAN, DBL2NUM, NUM2DBL, ruby_swapf32) +DECLAIR_TYPE(F32, float, RB_IO_BUFFER_BIG_ENDIAN, DBL2NUM, NUM2DBL, ruby_swapf32) +DECLAIR_TYPE(f64, double, RB_IO_BUFFER_LITTLE_ENDIAN, DBL2NUM, NUM2DBL, ruby_swapf64) +DECLAIR_TYPE(F64, double, RB_IO_BUFFER_BIG_ENDIAN, DBL2NUM, NUM2DBL, ruby_swapf64) +#undef DECLAIR_TYPE + +VALUE rb_io_buffer_get(const void* base, size_t size, ID type, size_t offset) +{ +#define READ_TYPE(name) if (type == RB_IO_BUFFER_TYPE_##name) return io_buffer_read_##name(base, size, &offset); + READ_TYPE(U8) + READ_TYPE(S8) + + READ_TYPE(u16) + READ_TYPE(U16) + READ_TYPE(s16) + READ_TYPE(S16) + + READ_TYPE(u32) + READ_TYPE(U32) + READ_TYPE(s32) + READ_TYPE(S32) + + READ_TYPE(u64) + READ_TYPE(U64) + READ_TYPE(s64) + READ_TYPE(S64) + + READ_TYPE(f32) + READ_TYPE(F32) + READ_TYPE(f64) + READ_TYPE(F64) +#undef READ_TYPE + + rb_raise(rb_eArgError, "Invalid type name!"); +} + +static VALUE io_buffer_get(VALUE self, VALUE type, VALUE _offset) +{ + const void *base; + size_t size; + size_t offset = NUM2SIZET(_offset); + + rb_io_buffer_get_immutable(self, &base, &size); + + return rb_io_buffer_get(base, size, RB_SYM2ID(type), offset); +} + +void rb_io_buffer_set(const void* base, size_t size, ID type, size_t offset, VALUE value) +{ +#define WRITE_TYPE(name) if (type == RB_IO_BUFFER_TYPE_##name) {io_buffer_write_##name(base, size, &offset, value); return;} + WRITE_TYPE(U8) + WRITE_TYPE(S8) + + WRITE_TYPE(u16) + WRITE_TYPE(U16) + WRITE_TYPE(s16) + WRITE_TYPE(S16) + + WRITE_TYPE(u32) + WRITE_TYPE(U32) + WRITE_TYPE(s32) + WRITE_TYPE(S32) + + WRITE_TYPE(u64) + WRITE_TYPE(U64) + WRITE_TYPE(s64) + WRITE_TYPE(S64) + + WRITE_TYPE(f32) + WRITE_TYPE(F32) + WRITE_TYPE(f64) + WRITE_TYPE(F64) +#undef WRITE_TYPE + + rb_raise(rb_eArgError, "Invalid type name!"); +} + +static VALUE io_buffer_set(VALUE self, VALUE type, VALUE _offset, VALUE value) +{ + void *base; + size_t size; + size_t offset = NUM2SIZET(_offset); + + rb_io_buffer_get_mutable(self, &base, &size); + + rb_io_buffer_set(base, size, RB_SYM2ID(type), offset, value); + + return SIZET2NUM(offset); +} + +void rb_io_buffer_clear(VALUE self, uint8_t value, size_t offset, size_t length) +{ + void *base; + size_t size; + + rb_io_buffer_get_mutable(self, &base, &size); + + if (offset + length > size) { + rb_raise(rb_eRuntimeError, "Offset + length out of bounds!"); + } + + memset((char*)base + offset, value, length); +} + +static +VALUE io_buffer_clear(int argc, VALUE *argv, VALUE self) +{ + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + if (argc > 3) { + rb_error_arity(argc, 0, 3); + } + + uint8_t value = 0; + if (argc >= 1) { + value = NUM2UINT(argv[0]); + } + + size_t offset = 0; + if (argc >= 2) { + offset = NUM2SIZET(argv[1]); + } + + size_t length = data->size; + if (argc >= 3) { + length = NUM2SIZET(argv[2]); + } + + rb_io_buffer_clear(self, value, offset, length); + + return self; +} + +void Init_IO_Buffer(void) +{ + rb_cIOBuffer = rb_define_class_under(rb_cIO, "Buffer", rb_cObject); + + rb_define_alloc_func(rb_cIOBuffer, rb_io_buffer_type_allocate); + +#ifdef _WIN32 + SYSTEM_INFO info; + GetSystemInfo(&info); + RUBY_IO_BUFFER_PAGE_SIZE = info.dwPageSize; +#else /* not WIN32 */ + RUBY_IO_BUFFER_PAGE_SIZE = sysconf(_SC_PAGESIZE); +#endif + + // Efficient sicing of mapped buffers: + rb_define_const(rb_cIOBuffer, "PAGE_SIZE", SIZET2NUM(RUBY_IO_BUFFER_PAGE_SIZE)); + + rb_define_singleton_method(rb_cIOBuffer, "map", io_buffer_map, -1); + + // General use: + rb_define_method(rb_cIOBuffer, "initialize", rb_io_buffer_initialize, -1); + rb_define_method(rb_cIOBuffer, "inspect", rb_io_buffer_inspect, 0); + rb_define_method(rb_cIOBuffer, "to_s", rb_io_buffer_to_s, 0); + rb_define_method(rb_cIOBuffer, "size", rb_io_buffer_size, 0); + + // Flags: + rb_define_const(rb_cIOBuffer, "EXTERNAL", RB_INT2NUM(RB_IO_BUFFER_EXTERNAL)); + rb_define_const(rb_cIOBuffer, "INTERNAL", RB_INT2NUM(RB_IO_BUFFER_INTERNAL)); + rb_define_const(rb_cIOBuffer, "MAPPED", RB_INT2NUM(RB_IO_BUFFER_MAPPED)); + rb_define_const(rb_cIOBuffer, "LOCKED", RB_INT2NUM(RB_IO_BUFFER_LOCKED)); + rb_define_const(rb_cIOBuffer, "PRIVATE", RB_INT2NUM(RB_IO_BUFFER_PRIVATE)); + rb_define_const(rb_cIOBuffer, "IMMUTABLE", RB_INT2NUM(RB_IO_BUFFER_IMMUTABLE)); + + // Endian: + rb_define_const(rb_cIOBuffer, "LITTLE_ENDIAN", RB_INT2NUM(RB_IO_BUFFER_LITTLE_ENDIAN)); + rb_define_const(rb_cIOBuffer, "BIG_ENDIAN", RB_INT2NUM(RB_IO_BUFFER_BIG_ENDIAN)); + rb_define_const(rb_cIOBuffer, "HOST_ENDIAN", RB_INT2NUM(RB_IO_BUFFER_HOST_ENDIAN)); + rb_define_const(rb_cIOBuffer, "NETWORK_ENDIAN", RB_INT2NUM(RB_IO_BUFFER_NETWORK_ENDIAN)); + + rb_define_method(rb_cIOBuffer, "external?", rb_io_buffer_external_p, 0); + rb_define_method(rb_cIOBuffer, "internal?", rb_io_buffer_internal_p, 0); + rb_define_method(rb_cIOBuffer, "mapped?", rb_io_buffer_mapped_p, 0); + rb_define_method(rb_cIOBuffer, "locked?", rb_io_buffer_locked_p, 0); + rb_define_method(rb_cIOBuffer, "immutable?", rb_io_buffer_immutable_p, 0); + + // Locking to prevent changes while using pointer: + // rb_define_method(rb_cIOBuffer, "lock", rb_io_buffer_lock, 0); + // rb_define_method(rb_cIOBuffer, "unlock", rb_io_buffer_unlock, 0); + rb_define_method(rb_cIOBuffer, "locked", rb_io_buffer_locked, 0); + + // Manipulation: + rb_define_method(rb_cIOBuffer, "slice", rb_io_buffer_slice, 2); + rb_define_method(rb_cIOBuffer, "to_str", rb_io_buffer_to_str, -1); + rb_define_method(rb_cIOBuffer, "copy", io_buffer_copy, 2); + rb_define_method(rb_cIOBuffer, "<=>", rb_io_buffer_compare, 1); + rb_define_method(rb_cIOBuffer, "resize", io_buffer_resize, 2); + rb_define_method(rb_cIOBuffer, "clear", io_buffer_clear, -1); + rb_define_method(rb_cIOBuffer, "free", rb_io_buffer_free, 0); + + rb_include_module(rb_cIOBuffer, rb_mComparable); + +#define DEFINE_TYPE(name) RB_IO_BUFFER_TYPE_##name = rb_intern_const(#name) + DEFINE_TYPE(U8); DEFINE_TYPE(S8); + DEFINE_TYPE(u16); DEFINE_TYPE(U16); DEFINE_TYPE(s16); DEFINE_TYPE(S16); + DEFINE_TYPE(u32); DEFINE_TYPE(U32); DEFINE_TYPE(s32); DEFINE_TYPE(S32); + DEFINE_TYPE(u64); DEFINE_TYPE(U64); DEFINE_TYPE(s64); DEFINE_TYPE(S64); + DEFINE_TYPE(f32); DEFINE_TYPE(F32); DEFINE_TYPE(f64); DEFINE_TYPE(F64); +#undef DEFINE_TYPE + + // Data access: + rb_define_method(rb_cIOBuffer, "get", io_buffer_get, 2); + rb_define_method(rb_cIOBuffer, "set", io_buffer_set, 3); +} diff --git a/scheduler.c b/scheduler.c index 127d63383b..91abeb82e3 100644 --- a/scheduler.c +++ b/scheduler.c @@ -11,6 +11,8 @@ #include "vm_core.h" #include "ruby/fiber/scheduler.h" #include "ruby/io.h" +#include "ruby/io/buffer.h" + #include "internal/thread.h" static ID id_close; @@ -26,6 +28,7 @@ static ID id_process_wait; static ID id_io_read; static ID id_io_write; static ID id_io_wait; +static ID id_io_close; static ID id_address_resolve; @@ -45,6 +48,7 @@ Init_Fiber_Scheduler(void) id_io_read = rb_intern_const("io_read"); id_io_write = rb_intern_const("io_write"); id_io_wait = rb_intern_const("io_wait"); + id_io_close = rb_intern_const("io_close"); id_address_resolve = rb_intern_const("address_resolve"); } @@ -225,24 +229,55 @@ rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io) } VALUE -rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t offset, size_t length) +rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length) { VALUE arguments[] = { - io, buffer, SIZET2NUM(offset), SIZET2NUM(length) + io, buffer, SIZET2NUM(length) }; - return rb_check_funcall(scheduler, id_io_read, 4, arguments); + return rb_check_funcall(scheduler, id_io_read, 3, arguments); } VALUE -rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t offset, size_t length) +rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length) { VALUE arguments[] = { - io, buffer, SIZET2NUM(offset), SIZET2NUM(length) + io, buffer, SIZET2NUM(length) }; - // We should ensure string has capacity to receive data, and then resize it afterwards. - return rb_check_funcall(scheduler, id_io_write, 4, arguments); + return rb_check_funcall(scheduler, id_io_write, 3, arguments); +} + +VALUE +rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length) +{ + VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED); + + VALUE result = rb_fiber_scheduler_io_read(scheduler, io, buffer, length); + + rb_io_buffer_free(buffer); + + return result; +} + +VALUE +rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length) +{ + VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_IMMUTABLE); + + VALUE result = rb_fiber_scheduler_io_write(scheduler, io, buffer, length); + + rb_io_buffer_free(buffer); + + return result; +} + +VALUE +rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io) +{ + VALUE arguments[] = {io}; + + return rb_check_funcall(scheduler, id_io_close, 1, arguments); } VALUE diff --git a/test/ruby/test_io_buffer.rb b/test/ruby/test_io_buffer.rb new file mode 100644 index 0000000000..ec10d2e1e6 --- /dev/null +++ b/test/ruby/test_io_buffer.rb @@ -0,0 +1,131 @@ +# frozen_string_literal: false + +class TestIOBuffer < Test::Unit::TestCase + def assert_negative(value) + assert(value < 0, "Expected #{value} to be negative!") + end + + def assert_positive(value) + assert(value > 0, "Expected #{value} to be positive!") + end + + def test_flags + assert_equal 0, IO::Buffer::EXTERNAL + assert_equal 1, IO::Buffer::INTERNAL + assert_equal 2, IO::Buffer::MAPPED + + assert_equal 16, IO::Buffer::LOCKED + assert_equal 32, IO::Buffer::PRIVATE + + assert_equal 64, IO::Buffer::IMMUTABLE + end + + def test_endian + assert_equal 4, IO::Buffer::LITTLE_ENDIAN + assert_equal 8, IO::Buffer::BIG_ENDIAN + assert_equal 8, IO::Buffer::NETWORK_ENDIAN + + assert_include [IO::Buffer::LITTLE_ENDIAN, IO::Buffer::BIG_ENDIAN], IO::Buffer::HOST_ENDIAN + end + + def test_new_internal + buffer = IO::Buffer.new(1024, IO::Buffer::INTERNAL) + assert_equal 1024, buffer.size + refute buffer.external? + assert buffer.internal? + refute buffer.mapped? + end + + def test_new_mapped + buffer = IO::Buffer.new(1024, IO::Buffer::MAPPED) + assert_equal 1024, buffer.size + refute buffer.external? + refute buffer.internal? + assert buffer.mapped? + end + + def test_file_mapped + buffer = File.open(__FILE__) {|file| IO::Buffer.map(file)} + assert_include buffer.to_str, "Hello World" + end + + def test_resize + buffer = IO::Buffer.new(1024, IO::Buffer::MAPPED) + buffer.resize(2048, 0) + assert_equal 2048, buffer.size + end + + def test_resize_preserve + message = "Hello World" + buffer = IO::Buffer.new(1024, IO::Buffer::MAPPED) + buffer.copy(message, 0) + buffer.resize(2048, 1024) + assert_equal message, buffer.to_str(0, message.bytesize) + end + + def test_compare_same_size + buffer1 = IO::Buffer.new(1) + assert_equal buffer1, buffer1 + + buffer2 = IO::Buffer.new(1) + buffer1.set(:U8, 0, 0x10) + buffer2.set(:U8, 0, 0x20) + + assert_negative buffer1 <=> buffer2 + assert_positive buffer2 <=> buffer1 + end + + def test_compare_different_size + buffer1 = IO::Buffer.new(3) + buffer2 = IO::Buffer.new(5) + + assert_negative buffer1 <=> buffer2 + assert_positive buffer2 <=> buffer1 + end + + def test_slice + buffer = IO::Buffer.new(128) + slice = buffer.slice(8, 32) + slice.copy("Hello World", 0) + assert_equal("Hello World", buffer.to_str(8, 11)) + end + + def test_slice_bounds + buffer = IO::Buffer.new(128) + + # What is best exception class? + assert_raise RuntimeError do + buffer.slice(128, 10) + end + + # assert_raise RuntimeError do + # pp buffer.slice(-10, 10) + # end + end + + def test_invalidation + input, output = IO.pipe + + # (1) rb_write_internal creates IO::Buffer object, + buffer = IO::Buffer.new(128) + + # (2) it is passed to (malicious) scheduler + # (3) scheduler starts a thread which call system call with the buffer object + thread = Thread.new{buffer.locked{input.read}} + + Thread.pass until thread.stop? + + # (4) scheduler returns + # (5) rb_write_internal invalidate the buffer object + assert_raise RuntimeError do + buffer.free + end + + # (6) the system call access the memory area after invalidation + output.write("Hello World") + output.close + thread.join + + input.close + end +end