diff --git a/cont.c b/cont.c index 39f0fc8171..b6d26d716e 100644 --- a/cont.c +++ b/cont.c @@ -1991,7 +1991,7 @@ rb_fiber_s_schedule_kw(int argc, VALUE* argv, int kw_splat) VALUE fiber = Qnil; if (scheduler != Qnil) { - fiber = rb_funcall_passing_block_kw(scheduler, rb_intern("fiber"), argc, argv, kw_splat); + fiber = rb_fiber_scheduler_fiber(scheduler, argc, argv, kw_splat); } else { rb_raise(rb_eRuntimeError, "No scheduler is available!"); @@ -3000,329 +3000,6 @@ rb_fiber_pool_initialize(int argc, VALUE* argv, VALUE self) * fiber.resume #=> FiberError: dead fiber called */ -/* - * Document-class: Fiber::SchedulerInterface - * - * This is not an existing class, but documentation of the interface that Scheduler - * object should comply to in order to be used as argument to Fiber.scheduler and handle non-blocking - * fibers. See also the "Non-blocking fibers" section in Fiber class docs for explanations - * of some concepts. - * - * Scheduler's behavior and usage are expected to be as follows: - * - * * When the execution in the non-blocking Fiber reaches some blocking operation (like - * sleep, wait for a process, or a non-ready I/O), it calls some of the scheduler's - * hook methods, listed below. - * * Scheduler somehow registers what the current fiber is waiting on, and yields control - * to other fibers with Fiber.yield (so the fiber would be suspended while expecting its - * wait to end, and other fibers in the same thread can perform) - * * At the end of the current thread execution, the scheduler's method #close is called - * * The scheduler runs into a wait loop, checking all the blocked fibers (which it has - * registered on hook calls) and resuming them when the awaited resource is ready - * (e.g. I/O ready or sleep time elapsed). - * - * A typical implementation would probably rely for this closing loop on a gem like - * EventMachine[https://github.com/eventmachine/eventmachine] or - * Async[https://github.com/socketry/async]. - * - * This way concurrent execution will be achieved transparently for every - * individual Fiber's code. - * - * Hook methods are: - * - * * #io_wait, #io_read, and #io_write - * * #process_wait - * * #kernel_sleep - * * #timeout_after - * * #address_resolve - * * #block and #unblock - * * (the list is expanded as Ruby developers make more methods having non-blocking calls) - * - * When not specified otherwise, the hook implementations are mandatory: if they are not - * implemented, the methods trying to call hook will fail. To provide backward compatibility, - * in the future hooks will be optional (if they are not implemented, due to the scheduler - * being created for the older Ruby version, the code which needs this hook will not fail, - * and will just behave in a blocking fashion). - * - * It is also strongly recommended that the scheduler implements the #fiber method, which is - * delegated to by Fiber.schedule. - * - * Sample _toy_ implementation of the scheduler can be found in Ruby's code, in - * test/fiber/scheduler.rb - * - */ - -#if 0 /* for RDoc */ -/* - * - * Document-method: Fiber::SchedulerInterface#close - * - * Called when the current thread exits. The scheduler is expected to implement this - * method in order to allow all waiting fibers to finalize their execution. - * - * The suggested pattern is to implement the main event loop in the #close method. - * - */ -static VALUE -rb_fiber_scheduler_interface_close(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#process_wait - * call-seq: process_wait(pid, flags) - * - * Invoked by Process::Status.wait in order to wait for a specified process. - * See that method description for arguments description. - * - * Suggested minimal implementation: - * - * Thread.new do - * Process::Status.wait(pid, flags) - * end.value - * - * This hook is optional: if it is not present in the current scheduler, - * Process::Status.wait will behave as a blocking method. - * - * Expected to return a Process::Status instance. - */ -static VALUE -rb_fiber_scheduler_interface_process_wait(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#io_wait - * call-seq: io_wait(io, events, timeout) - * - * Invoked by IO#wait, IO#wait_readable, IO#wait_writable to ask whether the - * specified descriptor is ready for specified events within - * the specified +timeout+. - * - * +events+ is a bit mask of IO::READABLE, IO::WRITABLE, and - * IO::PRIORITY. - * - * Suggested implementation should register which Fiber is waiting for which - * resources and immediately calling Fiber.yield to pass control to other - * fibers. Then, in the #close method, the scheduler might dispatch all the - * I/O resources to fibers waiting for it. - * - * Expected to return the subset of events that are ready immediately. - * - */ -static VALUE -rb_fiber_scheduler_interface_io_wait(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#io_read - * call-seq: io_read(io, buffer, length) -> read length or -errno - * - * Invoked by IO#read to read +length+ bytes from +io+ into a specified - * +buffer+ (see IO::Buffer). - * - * The +length+ argument is the "minimum length to be read". - * If the IO buffer size is 8KiB, but the +length+ is +1024+ (1KiB), up to - * 8KiB might be read, but at least 1KiB will be. - * Generally, the only case where less data than +length+ will be read is if - * there is an error reading the data. - * - * Specifying a +length+ of 0 is valid and means try reading at least once - * and return any available data. - * - * Suggested implementation should try to read from +io+ in a non-blocking - * manner and call #io_wait if the +io+ is not ready (which will yield control - * to other fibers). - * - * See IO::Buffer for an interface available to return data. - * - * Expected to return number of bytes read, or, in case of an error, -errno - * (negated number corresponding to system's error code). - * - * The method should be considered _experimental_. - */ -static VALUE -rb_fiber_scheduler_interface_io_read(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#io_write - * call-seq: io_write(io, buffer, length) -> written length or -errno - * - * Invoked by IO#write to write +length+ bytes to +io+ from - * from a specified +buffer+ (see IO::Buffer). - * - * The +length+ argument is the "(minimum) length to be written". - * If the IO buffer size is 8KiB, but the +length+ specified is 1024 (1KiB), - * at most 8KiB will be written, but at least 1KiB will be. - * Generally, the only case where less data than +length+ will be written is if - * there is an error writing the data. - * - * Specifying a +length+ of 0 is valid and means try writing at least once, - * as much data as possible. - * - * Suggested implementation should try to write to +io+ in a non-blocking - * manner and call #io_wait if the +io+ is not ready (which will yield control - * to other fibers). - * - * See IO::Buffer for an interface available to get data from buffer efficiently. - * - * Expected to return number of bytes written, or, in case of an error, -errno - * (negated number corresponding to system's error code). - * - * The method should be considered _experimental_. - */ -static VALUE -rb_fiber_scheduler_interface_io_write(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#kernel_sleep - * call-seq: kernel_sleep(duration = nil) - * - * Invoked by Kernel#sleep and Mutex#sleep and is expected to provide - * an implementation of sleeping in a non-blocking way. Implementation might - * register the current fiber in some list of "which fiber wait until what - * moment", call Fiber.yield to pass control, and then in #close resume - * the fibers whose wait period has elapsed. - * - */ -static VALUE -rb_fiber_scheduler_interface_kernel_sleep(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#address_resolve - * call-seq: address_resolve(hostname) -> array_of_strings or nil - * - * Invoked by any method that performs a non-reverse DNS lookup. The most - * notable method is Addrinfo.getaddrinfo, but there are many other. - * - * The method is expected to return an array of strings corresponding to ip - * addresses the +hostname+ is resolved to, or +nil+ if it can not be resolved. - * - * Fairly exhaustive list of all possible call-sites: - * - * - Addrinfo.getaddrinfo - * - Addrinfo.tcp - * - Addrinfo.udp - * - Addrinfo.ip - * - Addrinfo.new - * - Addrinfo.marshal_load - * - SOCKSSocket.new - * - TCPServer.new - * - TCPSocket.new - * - IPSocket.getaddress - * - TCPSocket.gethostbyname - * - UDPSocket#connect - * - UDPSocket#bind - * - UDPSocket#send - * - Socket.getaddrinfo - * - Socket.gethostbyname - * - Socket.pack_sockaddr_in - * - Socket.sockaddr_in - * - Socket.unpack_sockaddr_in - */ -static VALUE -rb_fiber_scheduler_interface_address_resolve(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#timeout_after - * call-seq: timeout_after(duration, exception_class, *exception_arguments, &block) -> result of block - * - * Invoked by Timeout.timeout to execute the given +block+ within the given - * +duration+. It can also be invoked directly by the scheduler or user code. - * - * Attempt to limit the execution time of a given +block+ to the given - * +duration+ if possible. When a non-blocking operation causes the +block+'s - * execution time to exceed the specified +duration+, that non-blocking - * operation should be interrupted by raising the specified +exception_class+ - * constructed with the given +exception_arguments+. - * - * General execution timeouts are often considered risky. This implementation - * will only interrupt non-blocking operations. This is by design because it's - * expected that non-blocking operations can fail for a variety of - * unpredictable reasons, so applications should already be robust in handling - * these conditions and by implication timeouts. - * - * However, as a result of this design, if the +block+ does not invoke any - * non-blocking operations, it will be impossible to interrupt it. If you - * desire to provide predictable points for timeouts, consider adding - * +sleep(0)+. - * - * If the block is executed successfully, its result will be returned. - * - * The exception will typically be raised using Fiber#raise. - */ -static VALUE -rb_fiber_scheduler_interface_timeout_after(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#block - * call-seq: block(blocker, timeout = nil) - * - * Invoked by methods like Thread.join, and by Mutex, to signify that current - * Fiber is blocked until further notice (e.g. #unblock) or until +timeout+ has - * elapsed. - * - * +blocker+ is what we are waiting on, informational only (for debugging and - * logging). There are no guarantee about its value. - * - * Expected to return boolean, specifying whether the blocking operation was - * successful or not. - */ -static VALUE -rb_fiber_scheduler_interface_block(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#unblock - * call-seq: unblock(blocker, fiber) - * - * Invoked to wake up Fiber previously blocked with #block (for example, Mutex#lock - * calls #block and Mutex#unlock calls #unblock). The scheduler should use - * the +fiber+ parameter to understand which fiber is unblocked. - * - * +blocker+ is what was awaited for, but it is informational only (for debugging - * and logging), and it is not guaranteed to be the same value as the +blocker+ for - * #block. - * - */ -static VALUE -rb_fiber_scheduler_interface_unblock(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#fiber - * call-seq: fiber(&block) - * - * Implementation of the Fiber.schedule. The method is expected to immediately - * run the given block of code in a separate non-blocking fiber, and to return that Fiber. - * - * Minimal suggested implementation is: - * - * def fiber(&block) - * fiber = Fiber.new(blocking: false, &block) - * fiber.resume - * fiber - * end - */ -static VALUE -rb_fiber_scheduler_interface_fiber(VALUE self) -{ -} -#endif - void Init_Cont(void) { @@ -3374,21 +3051,6 @@ Init_Cont(void) rb_define_singleton_method(rb_cFiber, "schedule", rb_fiber_s_schedule, -1); -#if 0 /* for RDoc */ - rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "SchedulerInterface", rb_cObject); - rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_interface_close, 0); - rb_define_method(rb_cFiberScheduler, "process_wait", rb_fiber_scheduler_interface_process_wait, 0); - rb_define_method(rb_cFiberScheduler, "io_wait", rb_fiber_scheduler_interface_io_wait, 0); - rb_define_method(rb_cFiberScheduler, "io_read", rb_fiber_scheduler_interface_io_read, 0); - rb_define_method(rb_cFiberScheduler, "io_write", rb_fiber_scheduler_interface_io_write, 0); - rb_define_method(rb_cFiberScheduler, "kernel_sleep", rb_fiber_scheduler_interface_kernel_sleep, 0); - rb_define_method(rb_cFiberScheduler, "address_resolve", rb_fiber_scheduler_interface_address_resolve, 0); - rb_define_method(rb_cFiberScheduler, "timeout_after", rb_fiber_scheduler_interface_timeout_after, 0); - rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_interface_block, 0); - rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_interface_unblock, 0); - rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler_interface_fiber, 0); -#endif - #ifdef RB_EXPERIMENTAL_FIBER_POOL rb_cFiberPool = rb_define_class_under(rb_cFiber, "Pool", rb_cObject); rb_define_alloc_func(rb_cFiberPool, fiber_pool_alloc); diff --git a/include/ruby/fiber/scheduler.h b/include/ruby/fiber/scheduler.h index d6a033d322..250b39b6df 100644 --- a/include/ruby/fiber/scheduler.h +++ b/include/ruby/fiber/scheduler.h @@ -146,7 +146,7 @@ VALUE rb_fiber_scheduler_make_timeout(struct timeval *timeout); VALUE rb_fiber_scheduler_close(VALUE scheduler); /** - * Nonblocking `sleep`. Depending on scheduler implementation, this for + * Non-blocking `sleep`. Depending on scheduler implementation, this for * instance switches to another fiber etc. * * @param[in] scheduler Target scheduler. @@ -174,7 +174,7 @@ int rb_fiber_scheduler_supports_process_wait(VALUE scheduler); #endif /** - * Nonblocking `waitpid`. Depending on scheduler implementation, this for + * Non-blocking `waitpid`. Depending on scheduler implementation, this for * instance switches to another fiber etc. * * @param[in] scheduler Target scheduler. @@ -185,7 +185,7 @@ int rb_fiber_scheduler_supports_process_wait(VALUE scheduler); VALUE rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags); /** - * Nonblocking wait for the passed "blocker", which is for instance + * Non-blocking wait for the passed "blocker", which is for instance * `Thread.join` or `Mutex.lock`. Depending on scheduler implementation, this * for instance switches to another fiber etc. * @@ -207,8 +207,8 @@ VALUE rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout); VALUE rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber); /** - * Nonblocking version of rb_io_wait(). Depending on scheduler implementation, - * this for instance switches to another fiber etc. + * Non-blocking version of rb_io_wait(). Depending on scheduler + * implementation, this for instance switches to another fiber etc. * * The "events" here is a Ruby level integer, which is an OR-ed value of * `IO::READABLE`, `IO::WRITABLE`, and `IO::PRIORITY`. @@ -222,7 +222,7 @@ VALUE rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber); VALUE rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout); /** - * Nonblocking wait until the passed IO is ready for reading. This is a + * Non-blocking wait until the passed IO is ready for reading. This is a * special case of rb_fiber_scheduler_io_wait(), where the interest is * `IO::READABLE` and timeout is never. * @@ -233,7 +233,7 @@ VALUE rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE VALUE rb_fiber_scheduler_io_wait_readable(VALUE scheduler, VALUE io); /** - * Nonblocking wait until the passed IO is ready for writing. This is a + * Non-blocking wait until the passed IO is ready for writing. This is a * special case of rb_fiber_scheduler_io_wait(), where the interest is * `IO::WRITABLE` and timeout is never. * @@ -277,7 +277,7 @@ VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv); VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset); /** - * Nonblocking write to the passed IO. + * Non-blocking write to the passed IO. * * @param[in] scheduler Target scheduler. * @param[out] io An io object to write to. @@ -290,7 +290,7 @@ VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset); /** - * Nonblocking read from the passed IO at the specified offset. + * Non-blocking read from the passed IO at the specified offset. * * @param[in] scheduler Target scheduler. * @param[out] io An io object to read from. @@ -304,7 +304,7 @@ VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_ VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset); /** - * Nonblocking write to the passed IO at the specified offset. + * Non-blocking write to the passed IO at the specified offset. * * @param[in] scheduler Target scheduler. * @param[out] io An io object to write to. @@ -318,7 +318,7 @@ VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALU VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset); /** - * Nonblocking read from the passed IO using a native buffer. + * Non-blocking read from the passed IO using a native buffer. * * @param[in] scheduler Target scheduler. * @param[out] io An io object to read from. @@ -331,7 +331,7 @@ VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VAL VALUE rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *buffer, size_t size, size_t length); /** - * Nonblocking write to the passed IO using a native buffer. + * Non-blocking write to the passed IO using a native buffer. * * @param[in] scheduler Target scheduler. * @param[out] io An io object to write to. @@ -344,7 +344,7 @@ VALUE rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *buffer, VALUE rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *buffer, size_t size, size_t length); /** - * Nonblocking close the given IO. + * Non-blocking close the given IO. * * @param[in] scheduler Target scheduler. * @param[in] io An io object to close. @@ -354,7 +354,7 @@ VALUE rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void * VALUE rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io); /** - * Nonblocking DNS lookup. + * Non-blocking DNS lookup. * * @param[in] scheduler Target scheduler. * @param[in] hostname A host name to query. @@ -363,6 +363,12 @@ VALUE rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io); */ VALUE rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname); +/** + * Create and schedule a non-blocking fiber. + * + */ +VALUE rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat); + RBIMPL_SYMBOL_EXPORT_END() #endif /* RUBY_FIBER_SCHEDULER_H */ diff --git a/io.c b/io.c index a1b3f27611..a7da551a6a 100644 --- a/io.c +++ b/io.c @@ -10855,9 +10855,7 @@ rb_f_select(int argc, VALUE *argv, VALUE obj) if (scheduler != Qnil) { // It's optionally supported. VALUE result = rb_fiber_scheduler_io_selectv(scheduler, argc, argv); - if (result != Qundef) { - return result; - } + if (result != Qundef) return result; } VALUE timeout; diff --git a/scheduler.c b/scheduler.c index 09fc921c88..611182741c 100644 --- a/scheduler.c +++ b/scheduler.c @@ -33,6 +33,58 @@ static ID id_io_close; static ID id_address_resolve; +static ID id_fiber_schedule; + +/* + * Document-class: Fiber::Scheduler + * + * This is not an existing class, but documentation of the interface that Scheduler + * object should comply to in order to be used as argument to Fiber.scheduler and handle non-blocking + * fibers. See also the "Non-blocking fibers" section in Fiber class docs for explanations + * of some concepts. + * + * Scheduler's behavior and usage are expected to be as follows: + * + * * When the execution in the non-blocking Fiber reaches some blocking operation (like + * sleep, wait for a process, or a non-ready I/O), it calls some of the scheduler's + * hook methods, listed below. + * * Scheduler somehow registers what the current fiber is waiting on, and yields control + * to other fibers with Fiber.yield (so the fiber would be suspended while expecting its + * wait to end, and other fibers in the same thread can perform) + * * At the end of the current thread execution, the scheduler's method #scheduler_close is called + * * The scheduler runs into a wait loop, checking all the blocked fibers (which it has + * registered on hook calls) and resuming them when the awaited resource is ready + * (e.g. I/O ready or sleep time elapsed). + * + * This way concurrent execution will be achieved transparently for every + * individual Fiber's code. + * + * Scheduler implementations are provided by gems, like + * Async[https://github.com/socketry/async]. + * + * Hook methods are: + * + * * #io_wait, #io_read, #io_write, #io_pread, #io_pwrite, and #io_select, #io_close + * * #process_wait + * * #kernel_sleep + * * #timeout_after + * * #address_resolve + * * #block and #unblock + * * (the list is expanded as Ruby developers make more methods having non-blocking calls) + * + * When not specified otherwise, the hook implementations are mandatory: if they are not + * implemented, the methods trying to call hook will fail. To provide backward compatibility, + * in the future hooks will be optional (if they are not implemented, due to the scheduler + * being created for the older Ruby version, the code which needs this hook will not fail, + * and will just behave in a blocking fashion). + * + * It is also strongly recommended that the scheduler implements the #fiber method, which is + * delegated to by Fiber.schedule. + * + * Sample _toy_ implementation of the scheduler can be found in Ruby's code, in + * test/fiber/scheduler.rb + * + */ void Init_Fiber_Scheduler(void) { @@ -56,6 +108,23 @@ Init_Fiber_Scheduler(void) id_io_close = rb_intern_const("io_close"); id_address_resolve = rb_intern_const("address_resolve"); + + id_fiber_schedule = rb_intern_const("fiber"); + +#if 0 /* for RDoc */ + rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "Scheduler", rb_cObject); + rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_close, 0); + rb_define_method(rb_cFiberScheduler, "process_wait", rb_fiber_scheduler_process_wait, 2); + rb_define_method(rb_cFiberScheduler, "io_wait", rb_fiber_scheduler_io_wait, 3); + rb_define_method(rb_cFiberScheduler, "io_read", rb_fiber_scheduler_io_read, 4); + rb_define_method(rb_cFiberScheduler, "io_write", rb_fiber_scheduler_io_write, 4); + rb_define_method(rb_cFiberScheduler, "kernel_sleep", rb_fiber_scheduler_kernel_sleep, 1); + rb_define_method(rb_cFiberScheduler, "address_resolve", rb_fiber_scheduler_address_resolve, 1); + rb_define_method(rb_cFiberScheduler, "timeout_after", rb_fiber_scheduler_timeout_after, 3); + rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_block, 2); + rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_unblock, 2); + rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler, -2); +#endif } VALUE @@ -101,7 +170,10 @@ rb_fiber_scheduler_set(VALUE scheduler) verify_interface(scheduler); } - // We invoke Scheduler#close when setting it to something else, to ensure the previous scheduler runs to completion before changing the scheduler. That way, we do not need to consider interactions, e.g., of a Fiber from the previous scheduler with the new scheduler. + // We invoke Scheduler#close when setting it to something else, to ensure + // the previous scheduler runs to completion before changing the scheduler. + // That way, we do not need to consider interactions, e.g., of a Fiber from + // the previous scheduler with the new scheduler. if (thread->scheduler != Qnil) { rb_fiber_scheduler_close(thread->scheduler); } @@ -135,6 +207,16 @@ VALUE rb_fiber_scheduler_current_for_thread(VALUE thread) return rb_fiber_scheduler_current_for_threadptr(rb_thread_ptr(thread)); } +/* + * + * Document-method: Fiber::Scheduler#close + * + * Called when the current thread exits. The scheduler is expected to implement this + * method in order to allow all waiting fibers to finalize their execution. + * + * The suggested pattern is to implement the main event loop in the #close method. + * + */ VALUE rb_fiber_scheduler_close(VALUE scheduler) { @@ -142,6 +224,12 @@ rb_fiber_scheduler_close(VALUE scheduler) VALUE result; + // The reason for calling `scheduler_close` before calling `close` is for + // legacy schedulers which implement `close` and expect the user to call + // it. Subsequently, that method would call `Fiber.set_scheduler(nil)` + // which should call `scheduler_close`. If it were to call `close`, it + // would create an infinite loop. + result = rb_check_funcall(scheduler, id_scheduler_close, 0, NULL); if (result != Qundef) return result; @@ -161,6 +249,17 @@ rb_fiber_scheduler_make_timeout(struct timeval *timeout) return Qnil; } +/* + * Document-method: Fiber::Scheduler#kernel_sleep + * call-seq: kernel_sleep(duration = nil) + * + * Invoked by Kernel#sleep and Mutex#sleep and is expected to provide + * an implementation of sleeping in a non-blocking way. Implementation might + * register the current fiber in some list of "which fiber wait until what + * moment", call Fiber.yield to pass control, and then in #close resume + * the fibers whose wait period has elapsed. + * + */ VALUE rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE timeout) { @@ -174,6 +273,34 @@ rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv) } #if 0 +/* + * Document-method: Fiber::Scheduler#timeout_after + * call-seq: timeout_after(duration, exception_class, *exception_arguments, &block) -> result of block + * + * Invoked by Timeout.timeout to execute the given +block+ within the given + * +duration+. It can also be invoked directly by the scheduler or user code. + * + * Attempt to limit the execution time of a given +block+ to the given + * +duration+ if possible. When a non-blocking operation causes the +block+'s + * execution time to exceed the specified +duration+, that non-blocking + * operation should be interrupted by raising the specified +exception_class+ + * constructed with the given +exception_arguments+. + * + * General execution timeouts are often considered risky. This implementation + * will only interrupt non-blocking operations. This is by design because it's + * expected that non-blocking operations can fail for a variety of + * unpredictable reasons, so applications should already be robust in handling + * these conditions and by implication timeouts. + * + * However, as a result of this design, if the +block+ does not invoke any + * non-blocking operations, it will be impossible to interrupt it. If you + * desire to provide predictable points for timeouts, consider adding + * +sleep(0)+. + * + * If the block is executed successfully, its result will be returned. + * + * The exception will typically be raised using Fiber#raise. + */ VALUE rb_fiber_scheduler_timeout_after(VALUE scheduler, VALUE timeout, VALUE exception, VALUE message) { @@ -191,6 +318,24 @@ rb_fiber_scheduler_timeout_afterv(VALUE scheduler, int argc, VALUE * argv) } #endif +/* + * Document-method: Fiber::Scheduler#process_wait + * call-seq: process_wait(pid, flags) + * + * Invoked by Process::Status.wait in order to wait for a specified process. + * See that method description for arguments description. + * + * Suggested minimal implementation: + * + * Thread.new do + * Process::Status.wait(pid, flags) + * end.value + * + * This hook is optional: if it is not present in the current scheduler, + * Process::Status.wait will behave as a blocking method. + * + * Expected to return a Process::Status instance. + */ VALUE rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags) { @@ -201,12 +346,39 @@ rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags) return rb_check_funcall(scheduler, id_process_wait, 2, arguments); } +/* + * Document-method: Fiber::Scheduler#block + * call-seq: block(blocker, timeout = nil) + * + * Invoked by methods like Thread.join, and by Mutex, to signify that current + * Fiber is blocked until further notice (e.g. #unblock) or until +timeout+ has + * elapsed. + * + * +blocker+ is what we are waiting on, informational only (for debugging and + * logging). There are no guarantee about its value. + * + * Expected to return boolean, specifying whether the blocking operation was + * successful or not. + */ VALUE rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout) { return rb_funcall(scheduler, id_block, 2, blocker, timeout); } +/* + * Document-method: Fiber::Scheduler#unblock + * call-seq: unblock(blocker, fiber) + * + * Invoked to wake up Fiber previously blocked with #block (for example, Mutex#lock + * calls #block and Mutex#unlock calls #unblock). The scheduler should use + * the +fiber+ parameter to understand which fiber is unblocked. + * + * +blocker+ is what was awaited for, but it is informational only (for debugging + * and logging), and it is not guaranteed to be the same value as the +blocker+ for + * #block. + * + */ VALUE rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber) { @@ -215,6 +387,25 @@ rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber) return rb_funcall(scheduler, id_unblock, 2, blocker, fiber); } +/* + * Document-method: Fiber::Scheduler#io_wait + * call-seq: io_wait(io, events, timeout) + * + * Invoked by IO#wait, IO#wait_readable, IO#wait_writable to ask whether the + * specified descriptor is ready for specified events within + * the specified +timeout+. + * + * +events+ is a bit mask of IO::READABLE, IO::WRITABLE, and + * IO::PRIORITY. + * + * Suggested implementation should register which Fiber is waiting for which + * resources and immediately calling Fiber.yield to pass control to other + * fibers. Then, in the #close method, the scheduler might dispatch all the + * I/O resources to fibers waiting for it. + * + * Expected to return the subset of events that are ready immediately. + * + */ VALUE rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout) { @@ -233,6 +424,16 @@ rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io) return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_WRITABLE), rb_io_timeout(io)); } +/* + * Document-method: Fiber::Scheduler#io_select + * call-seq: io_select(readables, writables, exceptables, timeout) + * + * Invoked by IO.select to ask whether the specified descriptors are ready for + * specified events within the specified +timeout+. + * + * Expected to return the 3-tuple of Array of IOs that are ready. + * + */ VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout) { VALUE arguments[] = { @@ -252,6 +453,33 @@ VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv) return rb_check_funcall(scheduler, id_io_select, argc, argv); } +/* + * Document-method: Fiber::Scheduler#io_read + * call-seq: io_read(io, buffer, length) -> read length or -errno + * + * Invoked by IO#read to read +length+ bytes from +io+ into a specified + * +buffer+ (see IO::Buffer). + * + * The +length+ argument is the "minimum length to be read". + * If the IO buffer size is 8KiB, but the +length+ is +1024+ (1KiB), up to + * 8KiB might be read, but at least 1KiB will be. + * Generally, the only case where less data than +length+ will be read is if + * there is an error reading the data. + * + * Specifying a +length+ of 0 is valid and means try reading at least once + * and return any available data. + * + * Suggested implementation should try to read from +io+ in a non-blocking + * manner and call #io_wait if the +io+ is not ready (which will yield control + * to other fibers). + * + * See IO::Buffer for an interface available to return data. + * + * Expected to return number of bytes read, or, in case of an error, -errno + * (negated number corresponding to system's error code). + * + * The method should be considered _experimental_. + */ VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset) { @@ -272,6 +500,33 @@ rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buff return rb_check_funcall(scheduler, id_io_pread, 5, arguments); } +/* + * Document-method: Scheduler#io_write + * call-seq: io_write(io, buffer, length) -> written length or -errno + * + * Invoked by IO#write to write +length+ bytes to +io+ from + * from a specified +buffer+ (see IO::Buffer). + * + * The +length+ argument is the "(minimum) length to be written". + * If the IO buffer size is 8KiB, but the +length+ specified is 1024 (1KiB), + * at most 8KiB will be written, but at least 1KiB will be. + * Generally, the only case where less data than +length+ will be written is if + * there is an error writing the data. + * + * Specifying a +length+ of 0 is valid and means try writing at least once, + * as much data as possible. + * + * Suggested implementation should try to write to +io+ in a non-blocking + * manner and call #io_wait if the +io+ is not ready (which will yield control + * to other fibers). + * + * See IO::Buffer for an interface available to get data from buffer efficiently. + * + * Expected to return number of bytes written, or, in case of an error, -errno + * (negated number corresponding to system's error code). + * + * The method should be considered _experimental_. + */ VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset) { @@ -326,6 +581,38 @@ rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io) return rb_check_funcall(scheduler, id_io_close, 1, arguments); } +/* + * Document-method: Fiber::Scheduler#address_resolve + * call-seq: address_resolve(hostname) -> array_of_strings or nil + * + * Invoked by any method that performs a non-reverse DNS lookup. The most + * notable method is Addrinfo.getaddrinfo, but there are many other. + * + * The method is expected to return an array of strings corresponding to ip + * addresses the +hostname+ is resolved to, or +nil+ if it can not be resolved. + * + * Fairly exhaustive list of all possible call-sites: + * + * - Addrinfo.getaddrinfo + * - Addrinfo.tcp + * - Addrinfo.udp + * - Addrinfo.ip + * - Addrinfo.new + * - Addrinfo.marshal_load + * - SOCKSSocket.new + * - TCPServer.new + * - TCPSocket.new + * - IPSocket.getaddress + * - TCPSocket.gethostbyname + * - UDPSocket#connect + * - UDPSocket#bind + * - UDPSocket#send + * - Socket.getaddrinfo + * - Socket.gethostbyname + * - Socket.pack_sockaddr_in + * - Socket.sockaddr_in + * - Socket.unpack_sockaddr_in + */ VALUE rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname) { @@ -335,3 +622,24 @@ rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname) return rb_check_funcall(scheduler, id_address_resolve, 1, arguments); } + +/* + * Document-method: Fiber::Scheduler#fiber + * call-seq: fiber(&block) + * + * Implementation of the Fiber.schedule. The method is expected to immediately + * run the given block of code in a separate non-blocking fiber, and to return that Fiber. + * + * Minimal suggested implementation is: + * + * def fiber(&block) + * fiber = Fiber.new(blocking: false, &block) + * fiber.resume + * fiber + * end + */ +VALUE +rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat) +{ + return rb_funcall_passing_block_kw(scheduler, id_fiber_schedule, argc, argv, kw_splat); +}