1
0
Fork 0
mirror of https://github.com/ruby/ruby.git synced 2022-11-09 12:17:21 -05:00

IO::Buffer for scheduler interface.

This commit is contained in:
Samuel Williams 2021-07-02 22:41:16 +12:00
parent 56b90cf944
commit 4b89034218
Notes: git 2021-11-10 15:21:29 +09:00
14 changed files with 1747 additions and 177 deletions

View file

@ -204,6 +204,9 @@ Outstanding ones only.
* Introduce non-blocking `Timeout.timeout` using `timeout_after` hook.
[[Feature #17470]]
* Introduce new scheduler hooks `io_read` and `io_write` along with a
low-level `IO::Buffer` for zero-copy read/write. [[Feature #18020]]
* IO hooks `io_wait`, `io_read`, and `io_write` receive the original IO object
where possible. [[Bug #18003]]
@ -424,9 +427,11 @@ See [the repository](https://github.com/ruby/error_highlight) in detail.
[Bug #18003]: https://bugs.ruby-lang.org/issues/18003
[Feature #18008]: https://bugs.ruby-lang.org/issues/18008
[Feature #18015]: https://bugs.ruby-lang.org/issues/18015
[Feature #18020]: https://bugs.ruby-lang.org/issues/18020
[Feature #18029]: https://bugs.ruby-lang.org/issues/18029
[Feature #18172]: https://bugs.ruby-lang.org/issues/18172
[Feature #18229]: https://bugs.ruby-lang.org/issues/18229
[Feature #18290]: https://bugs.ruby-lang.org/issues/18290
[GH-1509]: https://github.com/ruby/ruby/pull/1509
[GH-4815]: https://github.com/ruby/ruby/pull/4815

9
benchmark/buffer_get.yml Normal file
View file

@ -0,0 +1,9 @@
# Compares decoding one unsigned 32-bit integer from the start of a 32-byte
# zero-filled region via IO::Buffer#get and via String#unpack.
benchmark:
  - name: buffer.get
    prelude: buffer = IO::Buffer.new(32, IO::Buffer::MAPPED)
    script: buffer.get(:U32, 0)
    loop_count: 20000000
  - name: string.unpack
    prelude: string = "\0" * 32
    # Decode 4 bytes so both cases do the same amount of work; the previous
    # "C" directive decoded a single byte.
    # NOTE(review): "N" is 32-bit unsigned big-endian -- confirm it matches
    # the byte order IO::Buffer uses for :U32.
    script: string.unpack("N")
    loop_count: 20000000

178
common.mk
View file

@ -104,6 +104,7 @@ COMMONOBJS = array.$(OBJEXT) \
hash.$(OBJEXT) \
inits.$(OBJEXT) \
io.$(OBJEXT) \
io_buffer.$(OBJEXT) \
iseq.$(OBJEXT) \
load.$(OBJEXT) \
marshal.$(OBJEXT) \
@ -6972,6 +6973,7 @@ io.$(OBJEXT): {$(VPATH)}internal/xmalloc.h
io.$(OBJEXT): {$(VPATH)}io.c
io.$(OBJEXT): {$(VPATH)}io.h
io.$(OBJEXT): {$(VPATH)}io.rbinc
io.$(OBJEXT): {$(VPATH)}io/buffer.h
io.$(OBJEXT): {$(VPATH)}method.h
io.$(OBJEXT): {$(VPATH)}missing.h
io.$(OBJEXT): {$(VPATH)}node.h
@ -6988,6 +6990,181 @@ io.$(OBJEXT): {$(VPATH)}thread_native.h
io.$(OBJEXT): {$(VPATH)}util.h
io.$(OBJEXT): {$(VPATH)}vm_core.h
io.$(OBJEXT): {$(VPATH)}vm_opts.h
io_buffer.$(OBJEXT): $(hdrdir)/ruby/ruby.h
io_buffer.$(OBJEXT): $(top_srcdir)/internal/bits.h
io_buffer.$(OBJEXT): $(top_srcdir)/internal/compilers.h
io_buffer.$(OBJEXT): $(top_srcdir)/internal/static_assert.h
io_buffer.$(OBJEXT): $(top_srcdir)/internal/string.h
io_buffer.$(OBJEXT): {$(VPATH)}assert.h
io_buffer.$(OBJEXT): {$(VPATH)}backward/2/assume.h
io_buffer.$(OBJEXT): {$(VPATH)}backward/2/attributes.h
io_buffer.$(OBJEXT): {$(VPATH)}backward/2/bool.h
io_buffer.$(OBJEXT): {$(VPATH)}backward/2/gcc_version_since.h
io_buffer.$(OBJEXT): {$(VPATH)}backward/2/inttypes.h
io_buffer.$(OBJEXT): {$(VPATH)}backward/2/limits.h
io_buffer.$(OBJEXT): {$(VPATH)}backward/2/long_long.h
io_buffer.$(OBJEXT): {$(VPATH)}backward/2/stdalign.h
io_buffer.$(OBJEXT): {$(VPATH)}backward/2/stdarg.h
io_buffer.$(OBJEXT): {$(VPATH)}config.h
io_buffer.$(OBJEXT): {$(VPATH)}defines.h
io_buffer.$(OBJEXT): {$(VPATH)}encoding.h
io_buffer.$(OBJEXT): {$(VPATH)}intern.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/anyargs.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/char.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/double.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/fixnum.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/gid_t.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/int.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/intptr_t.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/long.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/long_long.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/mode_t.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/off_t.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/pid_t.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/short.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/size_t.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/st_data_t.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/arithmetic/uid_t.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/assume.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/alloc_size.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/artificial.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/cold.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/const.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/constexpr.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/deprecated.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/diagnose_if.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/enum_extensibility.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/error.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/flag_enum.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/forceinline.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/format.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/maybe_unused.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/noalias.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/nodiscard.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/noexcept.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/noinline.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/nonnull.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/noreturn.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/pure.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/restrict.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/returns_nonnull.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/warning.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/attr/weakref.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/cast.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/compiler_is.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/compiler_is/apple.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/compiler_is/clang.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/compiler_is/gcc.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/compiler_is/intel.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/compiler_is/msvc.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/compiler_is/sunpro.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/compiler_since.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/config.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/constant_p.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/core.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rarray.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rbasic.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rbignum.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rclass.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rdata.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rfile.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rhash.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/core/robject.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rregexp.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rstring.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rstruct.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/core/rtypeddata.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/ctype.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/dllexport.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/dosish.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/encoding/coderange.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/encoding/ctype.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/encoding/encoding.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/encoding/pathname.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/encoding/re.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/encoding/sprintf.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/encoding/string.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/encoding/symbol.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/encoding/transcode.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/error.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/eval.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/event.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/fl_type.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/gc.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/glob.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/globals.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/has/attribute.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/has/builtin.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/has/c_attribute.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/has/cpp_attribute.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/has/declspec_attribute.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/has/extension.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/has/feature.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/has/warning.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/array.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/bignum.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/class.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/compar.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/complex.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/cont.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/dir.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/enum.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/enumerator.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/error.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/eval.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/file.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/gc.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/hash.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/io.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/load.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/marshal.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/numeric.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/object.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/parse.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/proc.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/process.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/random.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/range.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/rational.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/re.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/ruby.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/select.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/select/largesize.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/signal.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/sprintf.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/string.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/struct.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/thread.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/time.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/variable.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/intern/vm.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/interpreter.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/iterator.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/memory.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/method.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/module.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/newobj.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/rgengc.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/scan_args.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/special_consts.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/static_assert.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/stdalign.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/stdbool.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/symbol.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/value.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/value_type.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/variable.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/warning_push.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/xmalloc.h
io_buffer.$(OBJEXT): {$(VPATH)}io.h
io_buffer.$(OBJEXT): {$(VPATH)}io/buffer.h
io_buffer.$(OBJEXT): {$(VPATH)}io_buffer.c
io_buffer.$(OBJEXT): {$(VPATH)}missing.h
io_buffer.$(OBJEXT): {$(VPATH)}onigmo.h
io_buffer.$(OBJEXT): {$(VPATH)}oniguruma.h
io_buffer.$(OBJEXT): {$(VPATH)}st.h
io_buffer.$(OBJEXT): {$(VPATH)}subst.h
iseq.$(OBJEXT): $(CCAN_DIR)/check_type/check_type.h
iseq.$(OBJEXT): $(CCAN_DIR)/container_of/container_of.h
iseq.$(OBJEXT): $(CCAN_DIR)/list/list.h
@ -12922,6 +13099,7 @@ scheduler.$(OBJEXT): {$(VPATH)}internal/variable.h
scheduler.$(OBJEXT): {$(VPATH)}internal/warning_push.h
scheduler.$(OBJEXT): {$(VPATH)}internal/xmalloc.h
scheduler.$(OBJEXT): {$(VPATH)}io.h
scheduler.$(OBJEXT): {$(VPATH)}io/buffer.h
scheduler.$(OBJEXT): {$(VPATH)}method.h
scheduler.$(OBJEXT): {$(VPATH)}missing.h
scheduler.$(OBJEXT): {$(VPATH)}node.h

View file

@ -48,6 +48,14 @@ When the thread exits, there is an implicit call to `set_scheduler`:
Fiber.set_scheduler(nil)
```
### Design
The scheduler interface is designed to be an unopinionated, lightweight layer
between user code and blocking operations. The scheduler hooks should avoid
translating or converting arguments or return values. Ideally, the exact same
arguments from the user code are provided directly to the scheduler hook with
no changes.
### Interface
This is the interface you need to implement.
@ -65,7 +73,7 @@ class Scheduler
end.value
end
# Wait for the given file descriptor to match the specified events within
# Wait for the given io to become ready for the specified events within
# the specified timeout.
# @parameter events [Integer] A bit mask of `IO::READABLE`,
# `IO::WRITABLE` and `IO::PRIORITY`.
@ -74,6 +82,20 @@ class Scheduler
def io_wait(io, events, timeout)
end
# Read from the given io into the specified buffer.
# @parameter io [IO] The io to read from.
# @parameter buffer [IO::Buffer] The buffer to read into.
# @parameter length [Integer] The minimum amount to read.
# @returns [Integer] The number of bytes read.
#   NOTE(review): the callers in io.c convert the result to a signed size and
#   raise a system error when it is negative -- confirm the exact error
#   convention a scheduler is expected to follow.
def io_read(io, buffer, length)
end
# Write from the given buffer into the specified IO.
# @parameter io [IO] The io to write to.
# @parameter buffer [IO::Buffer] The buffer to write from.
# @parameter length [Integer] The minimum amount to write.
# @returns [Integer] The number of bytes written.
#   NOTE(review): the callers in io.c convert the result to a signed size and
#   raise a system error when it is negative -- confirm the exact error
#   convention a scheduler is expected to follow.
def io_write(io, buffer, length)
end
# Sleep the current task for the specified duration, or forever if not
# specified.
# @parameter duration [Numeric] The amount of time to sleep in seconds.

21
file.c
View file

@ -2515,20 +2515,27 @@ rb_file_birthtime(VALUE obj)
*
*/
static VALUE
rb_file_size(VALUE obj)
size_t rb_file_size(VALUE file)
{
rb_io_t *fptr;
struct stat st;
GetOpenFile(obj, fptr);
RB_IO_POINTER(file, fptr);
if (fptr->mode & FMODE_WRITABLE) {
rb_io_flush_raw(obj, 0);
rb_io_flush_raw(file, 0);
}
if (fstat(fptr->fd, &st) == -1) {
rb_sys_fail_path(fptr->pathv);
rb_sys_fail_path(fptr->pathv);
}
return OFFT2NUM(st.st_size);
return st.st_size;
}
// Method body registered as File#size: obtains the size through
// rb_file_size() and boxes it as a Ruby Integer.
// NOTE(review): rb_file_size() returns st.st_size as size_t; confirm this
// cannot truncate where off_t is wider than size_t.
static VALUE
file_size(VALUE self)
{
    const size_t size = rb_file_size(self);

    return RB_SIZE2NUM(size);
}
static int
@ -6780,7 +6787,7 @@ Init_File(void)
rb_define_method(rb_cFile, "mtime", rb_file_mtime, 0);
rb_define_method(rb_cFile, "ctime", rb_file_ctime, 0);
rb_define_method(rb_cFile, "birthtime", rb_file_birthtime, 0);
rb_define_method(rb_cFile, "size", rb_file_size, 0);
rb_define_method(rb_cFile, "size", file_size, 0);
rb_define_method(rb_cFile, "chmod", rb_file_chmod, 1);
rb_define_method(rb_cFile, "chown", rb_file_chown, 2);

View file

@ -193,12 +193,11 @@ VALUE rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io);
* @param[in] scheduler Target scheduler.
* @param[out] io An io object to read from.
* @param[out] buffer Return buffer.
* @param[in] offset Offset inside of `buffer`.
* @param[in] length Requested number of bytes to read.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_read`.
* @return otherwise What `scheduler.io_read` returns.
*/
VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t offset, size_t length);
VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length);
/**
* Nonblocking write to the passed IO.
@ -206,12 +205,45 @@ VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t
* @param[in] scheduler Target scheduler.
* @param[out] io An io object to write to.
* @param[in] buffer What to write.
* @param[in] offset Offset inside of `buffer`.
* @param[in] length Number of bytes to write.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_write`.
* @return otherwise What `scheduler.io_write` returns.
*/
VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t offset, size_t length);
VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length);
/**
* Nonblocking read from the passed IO using a native buffer.
*
* @param[in] scheduler Target scheduler.
* @param[out] io An io object to read from.
* @param[out] buffer Return buffer (raw memory to read into).
* @param[in] size Capacity of `buffer` in bytes.
* @param[in] length Minimum number of bytes to read.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_read`.
* @return otherwise What `scheduler.io_read` returns.
*/
VALUE rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *buffer, size_t size, size_t length);
/**
* Nonblocking write to the passed IO using a native buffer.
*
* @param[in] scheduler Target scheduler.
* @param[out] io An io object to write to.
* @param[in] buffer What to write (raw memory to write from).
* @param[in] size Size of `buffer` in bytes.
* @param[in] length Minimum number of bytes to write.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_write`.
* @return otherwise What `scheduler.io_write` returns.
*/
VALUE rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *buffer, size_t size, size_t length);
/**
* Nonblocking close of the given IO.
*
* @param[in] scheduler Target scheduler.
* @param[in] io An io object to close.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_close`.
* @return otherwise What `scheduler.io_close` returns.
*/
VALUE rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io);
/**
* Nonblocking DNS lookup.

View file

@ -187,6 +187,8 @@ RBIMPL_ATTR_PURE()
*/
int rb_is_absolute_path(const char *path);
size_t rb_file_size(VALUE file);
RBIMPL_SYMBOL_EXPORT_END()
#endif /* RBIMPL_INTERN_FILE_H */

View file

@ -670,6 +670,15 @@ VALUE rb_io_set_write_io(VALUE io, VALUE w);
*/
void rb_io_set_nonblock(rb_io_t *fptr);
/**
* Returns an integer representing the numeric file descriptor for
* <em>io</em>.
*
* @param[in] io An IO, or an object that is not a `T_FILE`, in which case
* the descriptor is obtained by calling a method on it
* (presumably `#fileno` -- confirm against the definition).
* @return The file descriptor of `io`.
*/
int rb_io_descriptor(VALUE io);
/**
* This function breaks down the option hash that `IO#initialize` takes into
* components. This is an implementation detail of rb_io_extract_modeenc()

71
include/ruby/io/buffer.h Normal file
View file

@ -0,0 +1,71 @@
/**
* @file
* @author Samuel Williams
* @date Fri 2 Jul 2021 16:29:01 NZST
* @copyright Copyright (C) 2021 Samuel Williams
* @copyright This file is a part of the programming language Ruby.
* Permission is hereby granted, to either redistribute and/or
* modify this file, provided that the conditions mentioned in the
* file COPYING are met. Consult the file for details.
*/
#pragma once
#include "ruby/ruby.h"
#include "ruby/internal/config.h"
RUBY_SYMBOL_EXPORT_BEGIN
RUBY_EXTERN VALUE rb_cIOBuffer;
RUBY_EXTERN size_t RUBY_IO_BUFFER_PAGE_SIZE;
// Flags describing how the memory behind an IO::Buffer is owned and what may
// be done with it. The values 4 and 8 are not used by this enum; in this
// header they are assigned to the members of enum rb_io_buffer_endian.
enum rb_io_buffer_flags {
    // No flag bits set: the memory belongs to someone else.
    RB_IO_BUFFER_EXTERNAL = 0,

    // The buffer allocated its memory internally.
    RB_IO_BUFFER_INTERNAL = 1 << 0,

    // The buffer's memory is a mapping.
    RB_IO_BUFFER_MAPPED = 1 << 1,

    // The buffer is locked; resizing is not permitted.
    RB_IO_BUFFER_LOCKED = 1 << 4,

    // Private mapping: changes do not affect other processes or the
    // underlying file.
    RB_IO_BUFFER_PRIVATE = 1 << 5,

    // Read-only buffer; its contents cannot be modified.
    RB_IO_BUFFER_IMMUTABLE = 1 << 6
};
// Byte-order selectors for reading and writing multi-byte values.
//
// RB_IO_BUFFER_HOST_ENDIAN aliases the byte order of the machine being
// compiled for, and RB_IO_BUFFER_NETWORK_ENDIAN is always big-endian.
enum rb_io_buffer_endian {
    RB_IO_BUFFER_LITTLE_ENDIAN = 4,
    RB_IO_BUFFER_BIG_ENDIAN = 8,

    // Every macro is tested with defined() first: an undefined identifier in
    // an #if expression evaluates to 0, so a bare `A == B` comparison between
    // two undefined macros is `0 == 0` and would silently select the first
    // branch regardless of the real byte order.
    // WORDS_BIGENDIAN is the configure-provided (AC_C_BIGENDIAN) macro;
    // the compiler-provided __BYTE_ORDER__ is honoured as well when present.
#if defined(WORDS_BIGENDIAN)
    RB_IO_BUFFER_HOST_ENDIAN = RB_IO_BUFFER_BIG_ENDIAN,
#elif defined(__BYTE_ORDER__) && defined(__ORDER_BIG_ENDIAN__) && (__BYTE_ORDER__ == __ORDER_BIG_ENDIAN__)
    RB_IO_BUFFER_HOST_ENDIAN = RB_IO_BUFFER_BIG_ENDIAN,
#else
    RB_IO_BUFFER_HOST_ENDIAN = RB_IO_BUFFER_LITTLE_ENDIAN,
#endif

    RB_IO_BUFFER_NETWORK_ENDIAN = RB_IO_BUFFER_BIG_ENDIAN
};
VALUE rb_io_buffer_new(void *base, size_t size, enum rb_io_buffer_flags flags);
VALUE rb_io_buffer_map(VALUE io, size_t size, off_t offset, enum rb_io_buffer_flags flags);
VALUE rb_io_buffer_lock(VALUE self);
VALUE rb_io_buffer_unlock(VALUE self);
VALUE rb_io_buffer_free(VALUE self);
void rb_io_buffer_get_mutable(VALUE self, void **base, size_t *size);
void rb_io_buffer_get_immutable(VALUE self, const void **base, size_t *size);
size_t rb_io_buffer_copy(VALUE self, VALUE source, size_t offset);
void rb_io_buffer_resize(VALUE self, size_t size, size_t preserve);
void rb_io_buffer_clear(VALUE self, uint8_t value, size_t offset, size_t length);
RUBY_SYMBOL_EXPORT_END

View file

@ -51,6 +51,7 @@ rb_call_inits(void)
CALL(marshal);
CALL(Range);
CALL(IO);
CALL(IO_Buffer)
CALL(Dir);
CALL(Time);
CALL(Random);

360
io.c
View file

@ -14,6 +14,7 @@
#include "ruby/internal/config.h"
#include "ruby/fiber/scheduler.h"
#include "ruby/io/buffer.h"
#ifdef _WIN32
# include "ruby/ruby.h"
@ -131,6 +132,7 @@
#include "internal/transcode.h"
#include "internal/variable.h"
#include "ruby/io.h"
#include "ruby/io/buffer.h"
#include "ruby/thread.h"
#include "ruby/util.h"
#include "ruby_atomic.h"
@ -203,7 +205,7 @@ VALUE rb_default_rs;
static VALUE argf;
static ID id_write, id_read, id_getc, id_flush, id_readpartial, id_set_encoding;
static ID id_write, id_read, id_getc, id_flush, id_readpartial, id_set_encoding, id_fileno;
static VALUE sym_mode, sym_perm, sym_flags, sym_extenc, sym_intenc, sym_encoding, sym_open_args;
static VALUE sym_textmode, sym_binmode, sym_autoclose;
static VALUE sym_SET, sym_CUR, sym_END;
@ -1060,7 +1062,7 @@ io_alloc(VALUE klass)
struct io_internal_read_struct {
VALUE th;
int fd;
rb_io_t *fptr;
int nonblock;
void *buf;
size_t capa;
@ -1080,18 +1082,18 @@ struct io_internal_writev_struct {
};
#endif
static int nogvl_wait_for_single_fd(VALUE th, int fd, short events);
static int nogvl_wait_for(VALUE th, rb_io_t *fptr, short events);
static VALUE
internal_read_func(void *ptr)
{
struct io_internal_read_struct *iis = ptr;
ssize_t r;
retry:
r = read(iis->fd, iis->buf, iis->capa);
r = read(iis->fptr->fd, iis->buf, iis->capa);
if (r < 0 && !iis->nonblock) {
int e = errno;
if (io_again_p(e)) {
if (nogvl_wait_for_single_fd(iis->th, iis->fd, RB_WAITFD_IN) != -1) {
if (nogvl_wait_for(iis->th, iis->fptr, RB_WAITFD_IN) != -1) {
goto retry;
}
errno = e;
@ -1132,36 +1134,62 @@ internal_writev_func(void *ptr)
#endif
static ssize_t
rb_read_internal(int fd, void *buf, size_t count)
rb_read_internal(rb_io_t *fptr, void *buf, size_t count)
{
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_read_memory(scheduler, fptr->self, buf, count, 1);
if (result != Qundef) {
ssize_t length = RB_NUM2SSIZE(result);
if (length < 0) rb_sys_fail_path(fptr->pathv);
return length;
}
}
struct io_internal_read_struct iis = {
.th = rb_thread_current(),
.fd = fd,
.fptr = fptr,
.nonblock = 0,
.buf = buf,
.capa = count
};
return (ssize_t)rb_thread_io_blocking_region(internal_read_func, &iis, fd);
return (ssize_t)rb_thread_io_blocking_region(internal_read_func, &iis, fptr->fd);
}
static ssize_t
rb_write_internal(int fd, const void *buf, size_t count)
rb_write_internal(rb_io_t *fptr, const void *buf, size_t count)
{
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_write_memory(scheduler, fptr->self, buf, count, count);
if (result != Qundef) {
ssize_t length = RB_NUM2SSIZE(result);
if (length < 0) rb_sys_fail_path(fptr->pathv);
return length;
}
}
struct io_internal_write_struct iis = {
.fd = fd,
.fd = fptr->fd,
.buf = buf,
.capa = count
};
return (ssize_t)rb_thread_io_blocking_region(internal_write_func, &iis, fd);
return (ssize_t)rb_thread_io_blocking_region(internal_write_func, &iis, fptr->fd);
}
static ssize_t
rb_write_internal2(int fd, const void *buf, size_t count)
rb_write_internal2(rb_io_t *fptr, const void *buf, size_t count)
{
struct io_internal_write_struct iis = {
.fd = fd,
.fd = fptr->fd,
.buf = buf,
.capa = count
};
@ -1581,7 +1609,7 @@ io_binwrite_string(VALUE arg)
}
}
else {
r = rb_write_internal(fptr->fd, p->ptr, p->length);
r = rb_write_internal(fptr, p->ptr, p->length);
}
return r;
@ -1612,7 +1640,7 @@ io_binwrite_string(VALUE arg)
return len;
}
return rb_write_internal(p->fptr->fd, p->ptr, p->length);
return rb_write_internal(p->fptr, p->ptr, p->length);
}
#endif
@ -1628,7 +1656,7 @@ io_binwrite(VALUE str, const char *ptr, long len, rb_io_t *fptr, int nosync)
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_write(scheduler, fptr->self, str, offset, len);
VALUE result = rb_fiber_scheduler_io_write_memory(scheduler, fptr->self, ptr, len, len);
if (result != Qundef) {
ssize_t length = RB_NUM2SSIZE(result);
@ -2316,27 +2344,26 @@ io_fillbuf(rb_io_t *fptr)
fptr->rbuf.capa = IO_RBUF_CAPA_FOR(fptr);
fptr->rbuf.ptr = ALLOC_N(char, fptr->rbuf.capa);
#ifdef _WIN32
fptr->rbuf.capa--;
fptr->rbuf.capa--;
#endif
}
if (fptr->rbuf.len == 0) {
retry:
{
r = rb_read_internal(fptr->fd, fptr->rbuf.ptr, fptr->rbuf.capa);
}
r = rb_read_internal(fptr, fptr->rbuf.ptr, fptr->rbuf.capa);
if (r < 0) {
if (fptr_wait_readable(fptr))
goto retry;
{
int e = errno;
VALUE path = rb_sprintf("fd:%d ", fptr->fd);
if (!NIL_P(fptr->pathv)) {
rb_str_append(path, fptr->pathv);
}
rb_syserr_fail_path(e, path);
}
int e = errno;
VALUE path = rb_sprintf("fd:%d ", fptr->fd);
if (!NIL_P(fptr->pathv)) {
rb_str_append(path, fptr->pathv);
}
rb_syserr_fail_path(e, path);
}
if (r > 0) rb_io_check_closed(fptr);
if (r > 0) rb_io_check_closed(fptr);
fptr->rbuf.off = 0;
fptr->rbuf.len = (int)r; /* r should be <= rbuf_capa */
if (r == 0)
@ -2557,6 +2584,16 @@ rb_io_fileno(VALUE io)
return INT2FIX(fd);
}
// Returns the numeric file descriptor for `io`.
//
// A T_FILE object is read directly from its rb_io_t after
// rb_io_check_closed() has been applied to it. Any other object is asked for
// its descriptor by calling the method named by id_fileno (presumably
// `fileno`; the id is initialised outside this block) and converting the
// result to int.
int
rb_io_descriptor(VALUE io)
{
    if (!RB_TYPE_P(io, T_FILE)) {
        // Not a native IO object: delegate to the object itself.
        return RB_NUM2INT(rb_funcall(io, id_fileno, 0));
    }

    rb_io_t *fptr = RFILE(io)->fptr;
    rb_io_check_closed(fptr);

    return fptr->fd;
}
/*
* call-seq:
@ -2665,7 +2702,7 @@ io_bufread(char *ptr, long len, rb_io_t *fptr)
while (n > 0) {
again:
rb_io_check_closed(fptr);
c = rb_read_internal(fptr->fd, ptr+offset, n);
c = rb_read_internal(fptr, ptr+offset, n);
if (c == 0) break;
if (c < 0) {
if (fptr_wait_readable(fptr))
@ -2711,19 +2748,6 @@ bufread_call(VALUE arg)
static long
io_fread(VALUE str, long offset, long size, rb_io_t *fptr)
{
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_read(scheduler, fptr->self, str, offset, size);
if (result != Qundef) {
ssize_t length = RB_NUM2SSIZE(result);
if (length < 0) rb_sys_fail_path(fptr->pathv);
return length;
}
}
long len;
struct bufread_arg arg;
@ -3035,7 +3059,16 @@ read_internal_call(VALUE arg)
{
struct io_internal_read_struct *iis = (struct io_internal_read_struct *)arg;
return rb_thread_io_blocking_region(internal_read_func, iis, iis->fd);
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_read_memory(scheduler, iis->fptr->self, iis->buf, iis->capa, 1);
if (result != Qundef) {
return (VALUE)RB_NUM2SSIZE(result);
}
}
return rb_thread_io_blocking_region(internal_read_func, iis, iis->fptr->fd);
}
static long
@ -3079,7 +3112,7 @@ io_getpartial(int argc, VALUE *argv, VALUE io, int no_exception, int nonblock)
}
io_setstrbuf(&str, len);
iis.th = rb_thread_current();
iis.fd = fptr->fd;
iis.fptr = fptr;
iis.nonblock = nonblock;
iis.buf = RSTRING_PTR(str);
iis.capa = len;
@ -3217,7 +3250,7 @@ io_read_nonblock(rb_execution_context_t *ec, VALUE io, VALUE length, VALUE str,
if (n <= 0) {
rb_io_set_nonblock(fptr);
shrinkable |= io_setstrbuf(&str, len);
iis.fd = fptr->fd;
iis.fptr = fptr;
iis.nonblock = 1;
iis.buf = RSTRING_PTR(str);
iis.capa = len;
@ -4726,10 +4759,10 @@ finish_writeconv(rb_io_t *fptr, int noalloc)
res = rb_econv_convert(fptr->writeconv, NULL, NULL, &dp, de, 0);
while (dp-ds) {
retry:
if (fptr->write_lock && rb_mutex_owned_p(fptr->write_lock))
r = rb_write_internal2(fptr->fd, ds, dp-ds);
else
r = rb_write_internal(fptr->fd, ds, dp-ds);
if (fptr->write_lock && rb_mutex_owned_p(fptr->write_lock))
r = rb_write_internal2(fptr, ds, dp-ds);
else
r = rb_write_internal(fptr, ds, dp-ds);
if (r == dp-ds)
break;
if (0 <= r) {
@ -4796,7 +4829,7 @@ static int
maygvl_close(int fd, int keepgvl)
{
if (keepgvl)
return close(fd);
return close(fd);
/*
* close() may block for certain file types (NFS, SO_LINGER sockets,
@ -4817,7 +4850,7 @@ static int
maygvl_fclose(FILE *file, int keepgvl)
{
if (keepgvl)
return fclose(file);
return fclose(file);
return (int)(intptr_t)rb_thread_call_without_gvl(nogvl_fclose, file, RUBY_UBF_IO, 0);
}
@ -4835,64 +4868,77 @@ fptr_finalize_flush(rb_io_t *fptr, int noraise, int keepgvl,
int mode = fptr->mode;
if (fptr->writeconv) {
if (fptr->write_lock && !noraise) {
if (fptr->write_lock && !noraise) {
struct finish_writeconv_arg arg;
arg.fptr = fptr;
arg.noalloc = noraise;
err = rb_mutex_synchronize(fptr->write_lock, finish_writeconv_sync, (VALUE)&arg);
}
else {
err = finish_writeconv(fptr, noraise);
}
}
else {
err = finish_writeconv(fptr, noraise);
}
}
if (fptr->wbuf.len) {
if (noraise) {
io_flush_buffer_sync(fptr);
}
else {
if (io_fflush(fptr) < 0 && NIL_P(err))
err = INT2NUM(errno);
}
if (noraise) {
io_flush_buffer_sync(fptr);
}
else {
if (io_fflush(fptr) < 0 && NIL_P(err))
err = INT2NUM(errno);
}
}
int done = 0;
if (IS_PREP_STDIO(fptr) || fd <= 2) {
// Need to keep FILE objects of stdin, stdout and stderr, so we are done:
done = 1;
}
fptr->fd = -1;
fptr->stdio_file = 0;
fptr->mode &= ~(FMODE_READABLE|FMODE_WRITABLE);
/*
* ensure waiting_fd users do not hit EBADF, wait for them
* to exit before we call close().
*/
// Ensure waiting_fd users do not hit EBADF.
if (busy) {
// Wait for them to exit before we call close().
do rb_thread_schedule(); while (!list_empty(busy));
}
if (IS_PREP_STDIO(fptr) || fd <= 2) {
/* need to keep FILE objects of stdin, stdout and stderr */
}
else if (stdio_file) {
/* stdio_file is deallocated anyway
* even if fclose failed. */
if ((maygvl_fclose(stdio_file, noraise) < 0) && NIL_P(err))
if (!noraise) err = INT2NUM(errno);
}
else if (0 <= fd) {
/* fptr->fd may be closed even if close fails.
* POSIX doesn't specify it.
* We assumes it is closed. */
// Disable for now.
// if (!done && fd >= 0) {
// VALUE scheduler = rb_fiber_scheduler_current();
// if (scheduler != Qnil) {
// VALUE result = rb_fiber_scheduler_io_close(scheduler, fptr->self);
// if (result != Qundef) done = 1;
// }
// }
/**/
keepgvl |= !(mode & FMODE_WRITABLE);
keepgvl |= noraise;
if ((maygvl_close(fd, keepgvl) < 0) && NIL_P(err))
if (!noraise) err = INT2NUM(errno);
if (!done && stdio_file) {
// stdio_file is deallocated anyway even if fclose failed.
if ((maygvl_fclose(stdio_file, noraise) < 0) && NIL_P(err))
if (!noraise) err = INT2NUM(errno);
done = 1;
}
if (!done && fd >= 0) {
// fptr->fd may be closed even if close fails. POSIX doesn't specify it.
// We assumes it is closed.
keepgvl |= !(mode & FMODE_WRITABLE);
keepgvl |= noraise;
if ((maygvl_close(fd, keepgvl) < 0) && NIL_P(err))
if (!noraise) err = INT2NUM(errno);
done = 1;
}
if (!NIL_P(err) && !noraise) {
if (RB_INTEGER_TYPE_P(err))
rb_syserr_fail_path(NUM2INT(err), fptr->pathv);
else
rb_exc_raise(err);
if (RB_INTEGER_TYPE_P(err))
rb_syserr_fail_path(NUM2INT(err), fptr->pathv);
else
rb_exc_raise(err);
}
}
@ -5333,7 +5379,7 @@ rb_io_syswrite(VALUE io, VALUE str)
tmp = rb_str_tmp_frozen_acquire(str);
RSTRING_GETMEM(tmp, ptr, len);
n = rb_write_internal(fptr->fd, ptr, len);
n = rb_write_internal(fptr, ptr, len);
if (n < 0) rb_sys_fail_path(fptr->pathv);
rb_str_tmp_frozen_release(str, tmp);
@ -5385,7 +5431,7 @@ rb_io_sysread(int argc, VALUE *argv, VALUE io)
io_setstrbuf(&str, ilen);
iis.th = rb_thread_current();
iis.fd = fptr->fd;
iis.fptr = fptr;
iis.nonblock = 0;
iis.buf = RSTRING_PTR(str);
iis.capa = ilen;
@ -11141,8 +11187,8 @@ struct copy_stream_struct {
off_t copy_length; /* (off_t)-1 if not specified */
off_t src_offset; /* (off_t)-1 if not specified */
int src_fd;
int dst_fd;
rb_io_t *src_fptr;
rb_io_t *dst_fptr;
unsigned close_src : 1;
unsigned close_dst : 1;
int error_no;
@ -11192,18 +11238,18 @@ maygvl_copy_stream_continue_p(int has_gvl, struct copy_stream_struct *stp)
struct wait_for_single_fd {
VALUE scheduler;
int fd;
rb_io_t *fptr;
short events;
VALUE result;
};
static void *
rb_thread_fiber_scheduler_wait_for_single_fd(void * _args)
rb_thread_fiber_scheduler_wait_for(void * _args)
{
struct wait_for_single_fd *args = (struct wait_for_single_fd *)_args;
args->result = rb_fiber_scheduler_io_wait(args->scheduler, io_from_fd(args->fd), INT2NUM(args->events), Qnil);
args->result = rb_fiber_scheduler_io_wait(args->scheduler, args->fptr->self, INT2NUM(args->events), Qnil);
return NULL;
}
@ -11213,18 +11259,18 @@ rb_thread_fiber_scheduler_wait_for_single_fd(void * _args)
STATIC_ASSERT(pollin_expected, POLLIN == RB_WAITFD_IN);
STATIC_ASSERT(pollout_expected, POLLOUT == RB_WAITFD_OUT);
static int
nogvl_wait_for_single_fd(VALUE th, int fd, short events)
nogvl_wait_for(VALUE th, rb_io_t *fptr, short events)
{
VALUE scheduler = rb_fiber_scheduler_current_for_thread(th);
if (scheduler != Qnil) {
struct wait_for_single_fd args = {.scheduler = scheduler, .fd = fd, .events = events};
rb_thread_call_with_gvl(rb_thread_fiber_scheduler_wait_for_single_fd, &args);
struct wait_for_single_fd args = {.scheduler = scheduler, .fptr = fptr, .events = events};
rb_thread_call_with_gvl(rb_thread_fiber_scheduler_wait_for, &args);
return RTEST(args.result);
}
struct pollfd fds;
fds.fd = fd;
fds.fd = fptr->fd;
fds.events = events;
return poll(&fds, 1, -1);
@ -11232,12 +11278,12 @@ nogvl_wait_for_single_fd(VALUE th, int fd, short events)
#else /* !USE_POLL */
# define IOWAIT_SYSCALL "select"
static int
nogvl_wait_for_single_fd(VALUE th, int fd, short events)
nogvl_wait_for(VALUE th, rb_io_t *fptr, short events)
{
VALUE scheduler = rb_fiber_scheduler_current_for_thread(th);
if (scheduler != Qnil) {
struct wait_for_single_fd args = {.scheduler = scheduler, .fd = fd, .events = events};
rb_thread_call_with_gvl(rb_thread_fiber_scheduler_wait_for_single_fd, &args);
struct wait_for_single_fd args = {.scheduler = scheduler, .fptr = fptr, .events = events};
rb_thread_call_with_gvl(rb_thread_fiber_scheduler_wait_for, &args);
return RTEST(args.result);
}
@ -11245,17 +11291,17 @@ nogvl_wait_for_single_fd(VALUE th, int fd, short events)
int ret;
rb_fd_init(&fds);
rb_fd_set(fd, &fds);
rb_fd_set(fptr->fd, &fds);
switch (events) {
case RB_WAITFD_IN:
ret = rb_fd_select(fd + 1, &fds, 0, 0, 0);
ret = rb_fd_select(fptr->fd + 1, &fds, 0, 0, 0);
break;
case RB_WAITFD_OUT:
ret = rb_fd_select(fd + 1, 0, &fds, 0, 0);
ret = rb_fd_select(fptr->fd + 1, 0, &fds, 0, 0);
break;
default:
VM_UNREACHABLE(nogvl_wait_for_single_fd);
VM_UNREACHABLE(nogvl_wait_for);
}
rb_fd_term(&fds);
@ -11273,7 +11319,7 @@ maygvl_copy_stream_wait_read(int has_gvl, struct copy_stream_struct *stp)
ret = RB_NUM2INT(rb_io_wait(stp->src, RB_INT2NUM(RUBY_IO_READABLE), Qnil));
}
else {
ret = nogvl_wait_for_single_fd(stp->th, stp->src_fd, RB_WAITFD_IN);
ret = nogvl_wait_for(stp->th, stp->src_fptr, RB_WAITFD_IN);
}
} while (ret < 0 && maygvl_copy_stream_continue_p(has_gvl, stp));
@ -11291,7 +11337,7 @@ nogvl_copy_stream_wait_write(struct copy_stream_struct *stp)
int ret;
do {
ret = nogvl_wait_for_single_fd(stp->th, stp->dst_fd, RB_WAITFD_OUT);
ret = nogvl_wait_for(stp->th, stp->dst_fptr, RB_WAITFD_OUT);
} while (ret < 0 && maygvl_copy_stream_continue_p(0, stp));
if (ret < 0) {
@ -11338,7 +11384,7 @@ nogvl_copy_file_range(struct copy_stream_struct *stp)
if (src_offset < (off_t)0) {
off_t current_offset;
errno = 0;
current_offset = lseek(stp->src_fd, 0, SEEK_CUR);
current_offset = lseek(stp->src_fptr->fd, 0, SEEK_CUR);
if (current_offset < (off_t)0 && errno) {
stp->syserr = "lseek";
stp->error_no = errno;
@ -11358,7 +11404,7 @@ nogvl_copy_file_range(struct copy_stream_struct *stp)
# else
ss = (ssize_t)copy_length;
# endif
ss = simple_copy_file_range(stp->src_fd, src_offset_ptr, stp->dst_fd, NULL, ss, 0);
ss = simple_copy_file_range(stp->src_fptr->fd, src_offset_ptr, stp->dst_fptr->fd, NULL, ss, 0);
if (0 < ss) {
stp->total += ss;
copy_length -= ss;
@ -11393,7 +11439,7 @@ nogvl_copy_file_range(struct copy_stream_struct *stp)
case EBADF:
{
int e = errno;
int flags = fcntl(stp->dst_fd, F_GETFL);
int flags = fcntl(stp->dst_fptr->fd, F_GETFL);
if (flags != -1 && flags & O_APPEND) {
return 0;
@ -11427,7 +11473,7 @@ nogvl_fcopyfile(struct copy_stream_struct *stp)
if (!S_ISREG(stp->dst_stat.st_mode))
return 0;
if (lseek(stp->dst_fd, 0, SEEK_CUR) > (off_t)0) /* if dst IO was already written */
if (lseek(stp->dst_fptr->fd, 0, SEEK_CUR) > (off_t)0) /* if dst IO was already written */
return 0;
if (src_offset > (off_t)0) {
@ -11435,14 +11481,14 @@ nogvl_fcopyfile(struct copy_stream_struct *stp)
/* get current offset */
errno = 0;
cur = lseek(stp->src_fd, 0, SEEK_CUR);
cur = lseek(stp->src_fptr->fd, 0, SEEK_CUR);
if (cur < (off_t)0 && errno) {
stp->error_no = errno;
return 1;
}
errno = 0;
r = lseek(stp->src_fd, src_offset, SEEK_SET);
r = lseek(stp->src_fptr->fd, src_offset, SEEK_SET);
if (r < (off_t)0 && errno) {
stp->error_no = errno;
return 1;
@ -11450,7 +11496,7 @@ nogvl_fcopyfile(struct copy_stream_struct *stp)
}
stp->copyfile_state = copyfile_state_alloc(); /* this will be freed by copy_stream_finalize() */
ret = fcopyfile(stp->src_fd, stp->dst_fd, stp->copyfile_state, COPYFILE_DATA);
ret = fcopyfile(stp->src_fptr->fd, stp->dst_fptr->fd, stp->copyfile_state, COPYFILE_DATA);
copyfile_state_get(stp->copyfile_state, COPYFILE_STATE_COPIED, &ss); /* get copied bytes */
if (ret == 0) { /* success */
@ -11459,7 +11505,7 @@ nogvl_fcopyfile(struct copy_stream_struct *stp)
off_t r;
errno = 0;
/* reset offset */
r = lseek(stp->src_fd, cur, SEEK_SET);
r = lseek(stp->src_fptr->fd, cur, SEEK_SET);
if (r < (off_t)0 && errno) {
stp->error_no = errno;
return 1;
@ -11557,7 +11603,7 @@ nogvl_copy_stream_sendfile(struct copy_stream_struct *stp)
else {
off_t cur;
errno = 0;
cur = lseek(stp->src_fd, 0, SEEK_CUR);
cur = lseek(stp->src_fptr->fd, 0, SEEK_CUR);
if (cur < (off_t)0 && errno) {
stp->syserr = "lseek";
stp->error_no = errno;
@ -11575,10 +11621,10 @@ nogvl_copy_stream_sendfile(struct copy_stream_struct *stp)
ss = (ssize_t)copy_length;
# endif
if (use_pread) {
ss = simple_sendfile(stp->dst_fd, stp->src_fd, &src_offset, ss);
ss = simple_sendfile(stp->dst_fptr->fd, stp->src_fptr->fd, &src_offset, ss);
}
else {
ss = simple_sendfile(stp->dst_fd, stp->src_fd, NULL, ss);
ss = simple_sendfile(stp->dst_fptr->fd, stp->src_fptr->fd, NULL, ss);
}
if (0 < ss) {
stp->total += ss;
@ -11609,7 +11655,7 @@ nogvl_copy_stream_sendfile(struct copy_stream_struct *stp)
int ret;
#ifndef __linux__
/*
* Linux requires stp->src_fd to be a mmap-able (regular) file,
* Linux requires stp->src_fptr->fd to be a mmap-able (regular) file,
* select() reports regular files to always be "ready", so
* there is no need to select() on it.
* Other OSes may have the same limitation for sendfile() which
@ -11632,12 +11678,12 @@ nogvl_copy_stream_sendfile(struct copy_stream_struct *stp)
#endif
static ssize_t
maygvl_read(int has_gvl, int fd, void *buf, size_t count)
maygvl_read(int has_gvl, rb_io_t *fptr, void *buf, size_t count)
{
if (has_gvl)
return rb_read_internal(fd, buf, count);
return rb_read_internal(fptr, buf, count);
else
return read(fd, buf, count);
return read(fptr->fd, buf, count);
}
static ssize_t
@ -11646,11 +11692,11 @@ maygvl_copy_stream_read(int has_gvl, struct copy_stream_struct *stp, char *buf,
ssize_t ss;
retry_read:
if (offset < (off_t)0) {
ss = maygvl_read(has_gvl, stp->src_fd, buf, len);
ss = maygvl_read(has_gvl, stp->src_fptr, buf, len);
}
else {
#ifdef HAVE_PREAD
ss = pread(stp->src_fd, buf, len, offset);
ss = pread(stp->src_fptr->fd, buf, len, offset);
#else
stp->notimp = "pread";
return -1;
@ -11690,7 +11736,7 @@ nogvl_copy_stream_write(struct copy_stream_struct *stp, char *buf, size_t len)
ssize_t ss;
int off = 0;
while (len) {
ss = write(stp->dst_fd, buf+off, len);
ss = write(stp->dst_fptr->fd, buf+off, len);
if (ss < 0) {
if (maygvl_copy_stream_continue_p(0, stp))
continue;
@ -11730,7 +11776,7 @@ nogvl_copy_stream_read_write(struct copy_stream_struct *stp)
if (use_pread && stp->close_src) {
off_t r;
errno = 0;
r = lseek(stp->src_fd, src_offset, SEEK_SET);
r = lseek(stp->src_fptr->fd, src_offset, SEEK_SET);
if (r < (off_t)0 && errno) {
stp->syserr = "lseek";
stp->error_no = errno;
@ -11812,7 +11858,7 @@ copy_stream_fallback_body(VALUE arg)
off_t off = stp->src_offset;
ID read_method = id_readpartial;
if (stp->src_fd < 0) {
if (!stp->src_fptr) {
if (!rb_respond_to(stp->src, read_method)) {
read_method = id_read;
}
@ -11831,7 +11877,7 @@ copy_stream_fallback_body(VALUE arg)
}
l = buflen < rest ? buflen : (long)rest;
}
if (stp->src_fd < 0) {
if (!stp->src_fptr) {
VALUE rc = rb_funcall(stp->src, read_method, 2, INT2FIX(l), buf);
if (read_method == id_read && NIL_P(rc))
@ -11864,7 +11910,7 @@ copy_stream_fallback_body(VALUE arg)
static VALUE
copy_stream_fallback(struct copy_stream_struct *stp)
{
if (stp->src_fd < 0 && stp->src_offset >= (off_t)0) {
if (!stp->src_fptr && stp->src_offset >= (off_t)0) {
rb_raise(rb_eArgError, "cannot specify src_offset for non-IO");
}
rb_rescue2(copy_stream_fallback_body, (VALUE)stp,
@ -11878,8 +11924,6 @@ copy_stream_body(VALUE arg)
{
struct copy_stream_struct *stp = (struct copy_stream_struct *)arg;
VALUE src_io = stp->src, dst_io = stp->dst;
rb_io_t *src_fptr = 0, *dst_fptr = 0;
int src_fd, dst_fd;
const int common_oflags = 0
#ifdef O_NOCTTY
| O_NOCTTY
@ -11894,7 +11938,7 @@ copy_stream_body(VALUE arg)
!(RB_TYPE_P(src_io, T_FILE) ||
RB_TYPE_P(src_io, T_STRING) ||
rb_respond_to(src_io, rb_intern("to_path")))) {
src_fd = -1;
stp->src_fptr = NULL;
}
else {
int stat_ret;
@ -11911,24 +11955,22 @@ copy_stream_body(VALUE arg)
stp->src = src_io;
stp->close_src = 1;
}
GetOpenFile(src_io, src_fptr);
rb_io_check_byte_readable(src_fptr);
src_fd = src_fptr->fd;
RB_IO_POINTER(src_io, stp->src_fptr);
rb_io_check_byte_readable(stp->src_fptr);
stat_ret = fstat(src_fd, &stp->src_stat);
stat_ret = fstat(stp->src_fptr->fd, &stp->src_stat);
if (stat_ret < 0) {
stp->syserr = "fstat";
stp->error_no = errno;
return Qnil;
}
}
stp->src_fd = src_fd;
if (dst_io == argf ||
!(RB_TYPE_P(dst_io, T_FILE) ||
RB_TYPE_P(dst_io, T_STRING) ||
rb_respond_to(dst_io, rb_intern("to_path")))) {
dst_fd = -1;
stp->dst_fptr = NULL;
}
else {
int stat_ret;
@ -11950,38 +11992,36 @@ copy_stream_body(VALUE arg)
dst_io = GetWriteIO(dst_io);
stp->dst = dst_io;
}
GetOpenFile(dst_io, dst_fptr);
rb_io_check_writable(dst_fptr);
dst_fd = dst_fptr->fd;
RB_IO_POINTER(dst_io, stp->dst_fptr);
rb_io_check_writable(stp->dst_fptr);
stat_ret = fstat(dst_fd, &stp->dst_stat);
stat_ret = fstat(stp->dst_fptr->fd, &stp->dst_stat);
if (stat_ret < 0) {
stp->syserr = "fstat";
stp->error_no = errno;
return Qnil;
}
}
stp->dst_fd = dst_fd;
#ifdef O_BINARY
if (src_fptr)
SET_BINARY_MODE_WITH_SEEK_CUR(src_fptr);
if (stp->src_fptr)
SET_BINARY_MODE_WITH_SEEK_CUR(stp->src_fptr);
#endif
if (dst_fptr)
io_ascii8bit_binmode(dst_fptr);
if (stp->dst_fptr)
io_ascii8bit_binmode(stp->dst_fptr);
if (stp->src_offset < (off_t)0 && src_fptr && src_fptr->rbuf.len) {
size_t len = src_fptr->rbuf.len;
if (stp->src_offset < (off_t)0 && stp->src_fptr && stp->src_fptr->rbuf.len) {
size_t len = stp->src_fptr->rbuf.len;
VALUE str;
if (stp->copy_length >= (off_t)0 && stp->copy_length < (off_t)len) {
len = (size_t)stp->copy_length;
}
str = rb_str_buf_new(len);
rb_str_resize(str,len);
read_buffered_data(RSTRING_PTR(str), len, src_fptr);
if (dst_fptr) { /* IO or filename */
if (io_binwrite(str, RSTRING_PTR(str), RSTRING_LEN(str), dst_fptr, 0) < 0)
rb_sys_fail_on_write(dst_fptr);
read_buffered_data(RSTRING_PTR(str), len, stp->src_fptr);
if (stp->dst_fptr) { /* IO or filename */
if (io_binwrite(str, RSTRING_PTR(str), RSTRING_LEN(str), stp->dst_fptr, 0) < 0)
rb_sys_fail_on_write(stp->dst_fptr);
}
else /* others such as StringIO */
rb_io_write(dst_io, str);
@ -11991,14 +12031,14 @@ copy_stream_body(VALUE arg)
stp->copy_length -= len;
}
if (dst_fptr && io_fflush(dst_fptr) < 0) {
if (stp->dst_fptr && io_fflush(stp->dst_fptr) < 0) {
rb_raise(rb_eIOError, "flush failed");
}
if (stp->copy_length == 0)
return Qnil;
if (src_fd < 0 || dst_fd < 0) {
if (stp->src_fptr == NULL || stp->dst_fptr == NULL) {
return copy_stream_fallback(stp);
}
@ -12077,6 +12117,9 @@ rb_io_s_copy_stream(int argc, VALUE *argv, VALUE io)
st.src = src;
st.dst = dst;
st.src_fptr = NULL;
st.dst_fptr = NULL;
if (NIL_P(length))
st.copy_length = (off_t)-1;
else
@ -13678,6 +13721,7 @@ Init_IO(void)
id_flush = rb_intern_const("flush");
id_readpartial = rb_intern_const("readpartial");
id_set_encoding = rb_intern_const("set_encoding");
id_fileno = rb_intern_const("fileno");
rb_define_global_function("syscall", rb_f_syscall, -1);

1024
io_buffer.c Normal file

File diff suppressed because it is too large Load diff

View file

@ -11,6 +11,8 @@
#include "vm_core.h"
#include "ruby/fiber/scheduler.h"
#include "ruby/io.h"
#include "ruby/io/buffer.h"
#include "internal/thread.h"
static ID id_close;
@ -26,6 +28,7 @@ static ID id_process_wait;
static ID id_io_read;
static ID id_io_write;
static ID id_io_wait;
static ID id_io_close;
static ID id_address_resolve;
@ -45,6 +48,7 @@ Init_Fiber_Scheduler(void)
id_io_read = rb_intern_const("io_read");
id_io_write = rb_intern_const("io_write");
id_io_wait = rb_intern_const("io_wait");
id_io_close = rb_intern_const("io_close");
id_address_resolve = rb_intern_const("address_resolve");
}
@ -225,24 +229,55 @@ rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io)
}
VALUE
rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t offset, size_t length)
rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length)
{
VALUE arguments[] = {
io, buffer, SIZET2NUM(offset), SIZET2NUM(length)
io, buffer, SIZET2NUM(length)
};
return rb_check_funcall(scheduler, id_io_read, 4, arguments);
return rb_check_funcall(scheduler, id_io_read, 3, arguments);
}
VALUE
rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t offset, size_t length)
rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length)
{
VALUE arguments[] = {
io, buffer, SIZET2NUM(offset), SIZET2NUM(length)
io, buffer, SIZET2NUM(length)
};
// We should ensure string has capacity to receive data, and then resize it afterwards.
return rb_check_funcall(scheduler, id_io_write, 4, arguments);
return rb_check_funcall(scheduler, id_io_write, 3, arguments);
}
/*
 * Invoke the scheduler's io_read hook on a raw memory region instead of an
 * existing buffer object.
 *
 * base and size are handed to rb_io_buffer_new() together with the
 * RB_IO_BUFFER_LOCKED flag to obtain a temporary buffer object; length is
 * forwarded unchanged to rb_fiber_scheduler_io_read().
 *
 * Returns whatever rb_fiber_scheduler_io_read() returns.
 */
VALUE
rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length)
{
// Temporary buffer object built from the caller's pointer and size.
VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
VALUE result = rb_fiber_scheduler_io_read(scheduler, io, buffer, length);
// Release the temporary buffer once the hook has returned.
// NOTE(review): a non-local exit from the hook (e.g. an exception) would skip
// this call -- confirm that is acceptable for a LOCKED buffer.
rb_io_buffer_free(buffer);
return result;
}
/*
 * Invoke the scheduler's io_write hook on a raw, read-only memory region
 * instead of an existing buffer object.
 *
 * base and size are handed to rb_io_buffer_new() together with the
 * RB_IO_BUFFER_LOCKED and RB_IO_BUFFER_IMMUTABLE flags to obtain a temporary
 * buffer object; length is forwarded unchanged to
 * rb_fiber_scheduler_io_write().
 *
 * Returns whatever rb_fiber_scheduler_io_write() returns.
 */
VALUE
rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length)
{
// The const qualifier is cast away because rb_io_buffer_new() is called with a
// plain void*; the IMMUTABLE flag is passed alongside -- presumably so the
// buffer rejects writes; verify against io_buffer.c.
VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_IMMUTABLE);
VALUE result = rb_fiber_scheduler_io_write(scheduler, io, buffer, length);
// Release the temporary buffer once the hook has returned.
// NOTE(review): a non-local exit from the hook (e.g. an exception) would skip
// this call -- confirm that is acceptable for a LOCKED buffer.
rb_io_buffer_free(buffer);
return result;
}
/*
 * Invoke the scheduler's io_close hook with io as its single argument.
 *
 * The call goes through rb_check_funcall(), and its return value is passed
 * back to the caller unchanged.
 */
VALUE
rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io)
{
VALUE arguments[] = {io};
return rb_check_funcall(scheduler, id_io_close, 1, arguments);
}
VALUE

131
test/ruby/test_io_buffer.rb Normal file
View file

@ -0,0 +1,131 @@
# frozen_string_literal: false
# Unit tests for IO::Buffer: flag and endian constants, allocation modes,
# file mapping, resizing, comparison, slicing, and the guarantee that a locked
# buffer cannot be freed.
class TestIOBuffer < Test::Unit::TestCase
  # Asserts that +value+ is strictly less than zero.
  def assert_negative(value)
    assert(value < 0, "Expected #{value} to be negative!")
  end

  # Asserts that +value+ is strictly greater than zero.
  def assert_positive(value)
    assert(value > 0, "Expected #{value} to be positive!")
  end

  # The numeric values of the flag constants are pinned by this test.
  def test_flags
    assert_equal 0, IO::Buffer::EXTERNAL
    assert_equal 1, IO::Buffer::INTERNAL
    assert_equal 2, IO::Buffer::MAPPED

    assert_equal 16, IO::Buffer::LOCKED
    assert_equal 32, IO::Buffer::PRIVATE

    assert_equal 64, IO::Buffer::IMMUTABLE
  end

  # NETWORK_ENDIAN equals BIG_ENDIAN; HOST_ENDIAN must be one of the two
  # concrete byte orders.
  def test_endian
    assert_equal 4, IO::Buffer::LITTLE_ENDIAN
    assert_equal 8, IO::Buffer::BIG_ENDIAN
    assert_equal 8, IO::Buffer::NETWORK_ENDIAN

    assert_include [IO::Buffer::LITTLE_ENDIAN, IO::Buffer::BIG_ENDIAN], IO::Buffer::HOST_ENDIAN
  end

  def test_new_internal
    buffer = IO::Buffer.new(1024, IO::Buffer::INTERNAL)
    assert_equal 1024, buffer.size
    refute buffer.external?
    assert buffer.internal?
    refute buffer.mapped?
  end

  def test_new_mapped
    buffer = IO::Buffer.new(1024, IO::Buffer::MAPPED)
    assert_equal 1024, buffer.size
    refute buffer.external?
    refute buffer.internal?
    assert buffer.mapped?
  end

  # Maps this very source file; the "Hello World" literals used elsewhere in
  # the file are what the assertion finds.
  def test_file_mapped
    buffer = File.open(__FILE__) {|file| IO::Buffer.map(file)}
    assert_include buffer.to_str, "Hello World"
  end

  def test_resize
    buffer = IO::Buffer.new(1024, IO::Buffer::MAPPED)
    buffer.resize(2048, 0)
    assert_equal 2048, buffer.size
  end

  # Data copied in before the resize must still be readable afterwards.
  def test_resize_preserve
    message = "Hello World"
    buffer = IO::Buffer.new(1024, IO::Buffer::MAPPED)
    buffer.copy(message, 0)
    buffer.resize(2048, 1024)
    assert_equal message, buffer.to_str(0, message.bytesize)
  end

  def test_compare_same_size
    buffer1 = IO::Buffer.new(1)
    assert_equal buffer1, buffer1

    buffer2 = IO::Buffer.new(1)
    buffer1.set(:U8, 0, 0x10)
    buffer2.set(:U8, 0, 0x20)

    assert_negative buffer1 <=> buffer2
    assert_positive buffer2 <=> buffer1
  end

  def test_compare_different_size
    buffer1 = IO::Buffer.new(3)
    buffer2 = IO::Buffer.new(5)

    assert_negative buffer1 <=> buffer2
    assert_positive buffer2 <=> buffer1
  end

  # Writing through a slice must be visible in the parent buffer at the
  # slice's offset.
  def test_slice
    buffer = IO::Buffer.new(128)
    slice = buffer.slice(8, 32)
    slice.copy("Hello World", 0)
    assert_equal("Hello World", buffer.to_str(8, 11))
  end

  def test_slice_bounds
    buffer = IO::Buffer.new(128)

    # What is best exception class?
    assert_raise RuntimeError do
      buffer.slice(128, 10)
    end

    # assert_raise RuntimeError do
    #   pp buffer.slice(-10, 10)
    # end
  end

  # A buffer that another thread holds locked must refuse to be freed.
  #
  # Cleanup lives in +ensure+ so that a failing assertion does not leave the
  # pipe open or the reader thread blocked in +input.read+ forever.
  def test_invalidation
    input, output = IO.pipe

    # (1) rb_write_internal creates IO::Buffer object,
    buffer = IO::Buffer.new(128)

    # (2) it is passed to (malicious) scheduler
    # (3) scheduler starts a thread which call system call with the buffer object
    thread = Thread.new{buffer.locked{input.read}}

    Thread.pass until thread.stop?

    # (4) scheduler returns

    # (5) rb_write_internal invalidate the buffer object
    assert_raise RuntimeError do
      buffer.free
    end

    # (6) the system call access the memory area after invalidation
    output.write("Hello World")
  ensure
    # Closing the write end delivers EOF to the reader, which lets the thread
    # finish so it can be joined before the read end is closed.
    output&.close
    thread&.join
    input&.close
  end
end