mirror of
https://github.com/ruby/ruby.git
synced 2022-11-09 12:17:21 -05:00
Redocument non-blocking Fibers and scheduler
* Document Fiber's method related to scheduling; * Extend Fiber's class docs with concepts of non-blocking fibers; * Introduce "imaginary" (documentation-only) class Fiber::SchedulerInterface to properly document how scheduler's methods should look.
This commit is contained in:
parent
1729fd8c0a
commit
1415653c84
Notes:
git
2020-12-24 17:04:15 +09:00
1 changed files with 339 additions and 1 deletions
340
cont.c
340
cont.c
|
@ -1734,6 +1734,28 @@ rb_cont_call(int argc, VALUE *argv, VALUE contval)
|
||||||
* 1000000
|
* 1000000
|
||||||
* FiberError: dead fiber called
|
* FiberError: dead fiber called
|
||||||
*
|
*
|
||||||
|
* == Non-blocking Fibers
|
||||||
|
*
|
||||||
|
* Since Ruby 3.0, the concept of <em>non-blocking fiber</em> was introduced.
|
||||||
|
* Non-blocking fiber, when reaching any potentially blocking operation (like
|
||||||
|
* sleep, wait for another process, wait for I/O data to be ready), instead
|
||||||
|
* of just freezing itself and all execution in the thread, yields control
|
||||||
|
* to other fibers, and allows the <em>scheduler</em> to handle waiting and waking
|
||||||
|
* (resuming) the fiber when it can proceed.
|
||||||
|
*
|
||||||
|
* For Fiber to behave as non-blocking, it should be created in Fiber.new with
|
||||||
|
* <tt>blocking: false</tt> (which is the default now), and Fiber.scheduler
|
||||||
|
* should be set with Fiber.set_scheduler. If Fiber.scheduler is not set in
|
||||||
|
* the current thread, the behavior of blocking and non-blocking fibers is identical.
|
||||||
|
*
|
||||||
|
* Ruby doesn't provide a scheduler class: it is expected to be implemented by
|
||||||
|
* the user and correspond to Fiber::SchedulerInterface.
|
||||||
|
*
|
||||||
|
* There is also Fiber.schedule method, which is expected to immediately perform
|
||||||
|
* passed block in a non-blocking manner (but its actual implementation is up to
|
||||||
|
* the scheduler).
|
||||||
|
*
|
||||||
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
static const rb_data_type_t fiber_data_type = {
|
static const rb_data_type_t fiber_data_type = {
|
||||||
|
@ -1842,7 +1864,29 @@ rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat)
|
||||||
return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking));
|
return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* :nodoc: */
|
/*
|
||||||
|
* call-seq:
|
||||||
|
* Fiber.new(blocking: false) { |*args| ... } -> fiber
|
||||||
|
*
|
||||||
|
* Creates new Fiber. Initially, fiber is not running, but can be resumed with
|
||||||
|
* #resume. Arguments to the first #resume call will be passed to the block:
|
||||||
|
*
|
||||||
|
* f = Fiber.new do |initial|
|
||||||
|
* current = initial
|
||||||
|
* loop do
|
||||||
|
* puts "current: #{current.inspect}"
|
||||||
|
* current = Fiber.yield
|
||||||
|
* end
|
||||||
|
* end
|
||||||
|
* f.resume(100) # prints: current: 100
|
||||||
|
* f.resume(1, 2, 3) # prints: current: [1, 2, 3]
|
||||||
|
* f.resume # prints: current: nil
|
||||||
|
* # ... and so on ...
|
||||||
|
*
|
||||||
|
* If <tt>blocking: false</tt> is passed to <tt>Fiber.new</tt>, _and_ the current thread
|
||||||
|
* has Fiber.scheduler defined, the Fiber becomes non-blocking (see "Non-blocking
|
||||||
|
* fibers" section in class docs).
|
||||||
|
*/
|
||||||
static VALUE
|
static VALUE
|
||||||
rb_fiber_initialize(int argc, VALUE* argv, VALUE self)
|
rb_fiber_initialize(int argc, VALUE* argv, VALUE self)
|
||||||
{
|
{
|
||||||
|
@ -1871,18 +1915,84 @@ rb_f_fiber_kw(int argc, VALUE* argv, int kw_splat)
|
||||||
return fiber;
|
return fiber;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* call-seq:
|
||||||
|
* Fiber.schedule { |*args| ... } -> fiber
|
||||||
|
*
|
||||||
|
* The method is <em>expected</em> to immediately run the provided block of code in a
|
||||||
|
* separate non-blocking fiber.
|
||||||
|
*
|
||||||
|
* puts "Go to sleep!"
|
||||||
|
*
|
||||||
|
* Fiber.set_scheduler(MyScheduler.new)
|
||||||
|
*
|
||||||
|
* Fiber.schedule do
|
||||||
|
* puts "Going to sleep"
|
||||||
|
* sleep(1)
|
||||||
|
* puts "I slept well"
|
||||||
|
* end
|
||||||
|
*
|
||||||
|
* puts "Wakey-wakey, sleepyhead"
|
||||||
|
*
|
||||||
|
* Assuming MyScheduler is properly implemented, this program will produce:
|
||||||
|
*
|
||||||
|
* Go to sleep!
|
||||||
|
* Going to sleep
|
||||||
|
* Wakey-wakey, sleepyhead
|
||||||
|
* ...1 sec pause here...
|
||||||
|
* I slept well
|
||||||
|
*
|
||||||
|
* ...i.e. on the first blocking operation inside the Fiber (<tt>sleep(1)</tt>),
|
||||||
|
* the control is yielded to the outside code (main fiber), and <em>at the end
|
||||||
|
* of the execution</em>, the scheduler takes care of properly resuming all the
|
||||||
|
* blocked fibers.
|
||||||
|
*
|
||||||
|
* Note that the behavior described above is how the method is <em>expected</em>
|
||||||
|
* to behave, actual behavior is up to the current scheduler's implementation of
|
||||||
|
* Fiber::SchedulerInterface#fiber method. Ruby doesn't enforce this method to
|
||||||
|
* behave in any particular way.
|
||||||
|
*
|
||||||
|
* If the scheduler is not set, the method raises
|
||||||
|
* <tt>RuntimeError (No scheduler is available!)</tt>.
|
||||||
|
*
|
||||||
|
*/
|
||||||
static VALUE
|
static VALUE
|
||||||
rb_f_fiber(int argc, VALUE *argv, VALUE obj)
|
rb_f_fiber(int argc, VALUE *argv, VALUE obj)
|
||||||
{
|
{
|
||||||
return rb_f_fiber_kw(argc, argv, rb_keyword_given_p());
|
return rb_f_fiber_kw(argc, argv, rb_keyword_given_p());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* call-seq:
|
||||||
|
* Fiber.scheduler -> obj or nil
|
||||||
|
*
|
||||||
|
* Fiber scheduler, set in the current thread with Fiber.set_scheduler. If the scheduler
|
||||||
|
* is +nil+ (which is the default), non-blocking fibers behave the same as blocking ones.
|
||||||
|
* (see "Non-blocking fibers" section in class docs for details about the scheduler concept).
|
||||||
|
*
|
||||||
|
*/
|
||||||
static VALUE
|
static VALUE
|
||||||
rb_fiber_scheduler(VALUE klass)
|
rb_fiber_scheduler(VALUE klass)
|
||||||
{
|
{
|
||||||
return rb_scheduler_get();
|
return rb_scheduler_get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* call-seq:
|
||||||
|
* Fiber.set_scheduler(scheduler) -> scheduler
|
||||||
|
*
|
||||||
|
* Sets Fiber scheduler for the current thread. If the scheduler is set, non-blocking
|
||||||
|
* fibers (created by Fiber.new with <tt>blocking: false</tt>, or by Fiber.schedule)
|
||||||
|
* call that scheduler's hook methods on potentially blocking operations, and the current
|
||||||
|
* thread will call scheduler's +close+ method on finalization (allowing the scheduler to
|
||||||
|
* properly manage all non-finished fibers).
|
||||||
|
*
|
||||||
|
* +scheduler+ can be an object of any class corresponding to Fiber::SchedulerInterface. Its
|
||||||
|
* implementation is up to the user.
|
||||||
|
*
|
||||||
|
* See also the "Non-blocking fibers" section in class docs.
|
||||||
|
*
|
||||||
|
*/
|
||||||
static VALUE
|
static VALUE
|
||||||
rb_fiber_set_scheduler(VALUE klass, VALUE scheduler)
|
rb_fiber_set_scheduler(VALUE klass, VALUE scheduler)
|
||||||
{
|
{
|
||||||
|
@ -2196,12 +2306,44 @@ rb_fiber_transfer(VALUE fiber_value, int argc, const VALUE *argv)
|
||||||
return fiber_switch(fiber_ptr(fiber_value), argc, argv, RB_NO_KEYWORDS, Qfalse, false);
|
return fiber_switch(fiber_ptr(fiber_value), argc, argv, RB_NO_KEYWORDS, Qfalse, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* call-seq:
|
||||||
|
* fiber.blocking? -> true or false
|
||||||
|
*
|
||||||
|
* Returns +true+ if +fiber+ is blocking and +false+ otherwise.
|
||||||
|
* Fiber is non-blocking if it was created via passing <tt>blocking: false</tt>
|
||||||
|
* to Fiber.new, or via Fiber.schedule.
|
||||||
|
*
|
||||||
|
* Note that even if the method returns +false+, Fiber behaves differently
|
||||||
|
* only if Fiber.scheduler is set in the current thread.
|
||||||
|
*
|
||||||
|
* See the "Non-blocking fibers" section in class docs for details.
|
||||||
|
*
|
||||||
|
*/
|
||||||
VALUE
|
VALUE
|
||||||
rb_fiber_blocking_p(VALUE fiber)
|
rb_fiber_blocking_p(VALUE fiber)
|
||||||
{
|
{
|
||||||
return (fiber_ptr(fiber)->blocking == 0) ? Qfalse : Qtrue;
|
return (fiber_ptr(fiber)->blocking == 0) ? Qfalse : Qtrue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* call-seq:
|
||||||
|
* Fiber.blocking? -> false or number
|
||||||
|
*
|
||||||
|
* Returns +false+ if the current fiber is non-blocking.
|
||||||
|
* Fiber is non-blocking if it was created via passing <tt>blocking: false</tt>
|
||||||
|
* to Fiber.new, or via Fiber.schedule.
|
||||||
|
*
|
||||||
|
* If the current Fiber is blocking, the method, unlike usual
|
||||||
|
* predicate methods, returns a *number* of blocking fibers currently
|
||||||
|
* running (TBD: always 1?).
|
||||||
|
*
|
||||||
|
* Note that even if the method returns +false+, Fiber behaves differently
|
||||||
|
* only if Fiber.scheduler is set in the current thread.
|
||||||
|
*
|
||||||
|
* See the "Non-blocking fibers" section in class docs for details.
|
||||||
|
*
|
||||||
|
*/
|
||||||
static VALUE
|
static VALUE
|
||||||
rb_f_fiber_blocking_p(VALUE klass)
|
rb_f_fiber_blocking_p(VALUE klass)
|
||||||
{
|
{
|
||||||
|
@ -2707,6 +2849,191 @@ rb_fiber_pool_initialize(int argc, VALUE* argv, VALUE self)
|
||||||
* fiber.resume #=> FiberError: dead fiber called
|
* fiber.resume #=> FiberError: dead fiber called
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Document-class: Fiber::SchedulerInterface
|
||||||
|
*
|
||||||
|
* This is not an existing class, but documentation of the interface that Scheduler
|
||||||
|
* object should comply with in order to be used as Fiber.scheduler and handle non-blocking
|
||||||
|
* fibers. See also the "Non-blocking fibers" section in Fiber class docs for explanations
|
||||||
|
* of some concepts.
|
||||||
|
*
|
||||||
|
* Scheduler's behavior and usage are expected to be as follows:
|
||||||
|
*
|
||||||
|
* * When the execution in the non-blocking Fiber reaches some blocking operation (like
|
||||||
|
* sleep, wait for a process, or a non-ready I/O), it calls some of the scheduler's
|
||||||
|
* hook methods, listed below.
|
||||||
|
* * Scheduler somehow registers what the current fiber is waiting for, and yields control
|
||||||
|
* to other fibers with Fiber.yield (so the fiber would be suspended while expecting its
|
||||||
|
* wait to end, and other fibers in the same thread can perform)
|
||||||
|
* * At the end of the current thread execution, the scheduler's method #close is called
|
||||||
|
* * The scheduler runs into a wait loop, checking all the blocked fibers (which it has
|
||||||
|
* registered on hook calls) and resuming them when the awaited resource is ready (I/O
|
||||||
|
* ready, sleep time passed).
|
||||||
|
*
|
||||||
|
* A typical implementation would probably rely for this closing loop on a gem like
|
||||||
|
* EventMachine[https://github.com/eventmachine/eventmachine] or
|
||||||
|
* Async[https://github.com/socketry/async].
|
||||||
|
*
|
||||||
|
* This way concurrent execution will be achieved in a way that is transparent for every
|
||||||
|
* individual Fiber's code.
|
||||||
|
*
|
||||||
|
* Hook methods are:
|
||||||
|
*
|
||||||
|
* * #io_wait
|
||||||
|
* * #process_wait
|
||||||
|
* * #kernel_sleep
|
||||||
|
* * #block and #unblock
|
||||||
|
* * (the list is expanded as Ruby developers make more methods having non-blocking calls)
|
||||||
|
*
|
||||||
|
* When not specified otherwise, the hook implementations are mandatory: if they are not
|
||||||
|
* implemented, the methods trying to call the hook will fail. To provide backward compatibility,
|
||||||
|
* in the future hooks will be optional (if they are not implemented, due to the scheduler
|
||||||
|
* being created for the older Ruby version, the code which needs this hook will not fail,
|
||||||
|
* and will just behave in a blocking fashion).
|
||||||
|
*
|
||||||
|
* It is also strongly suggested that the scheduler implement the #fiber method, which is
|
||||||
|
* delegated to by Fiber.schedule.
|
||||||
|
*
|
||||||
|
* Sample _toy_ implementation of the scheduler can be found in Ruby's code, in
|
||||||
|
* <tt>test/fiber/scheduler.rb</tt>
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
#if 0 /* for RDoc */
|
||||||
|
/*
|
||||||
|
*
|
||||||
|
* Document-method: Fiber::SchedulerInterface#close
|
||||||
|
*
|
||||||
|
* Called when the current thread exits. The scheduler is expected to implement this
|
||||||
|
* method in order to allow all waiting fibers to finalize their execution.
|
||||||
|
*
|
||||||
|
* The suggested pattern is to implement the main event loop in the #close method.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
static VALUE
|
||||||
|
rb_fiber_scheduler_interface_close(VALUE self)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Document-method: SchedulerInterface#process_wait
|
||||||
|
* call-seq: process_wait(pid, flags)
|
||||||
|
*
|
||||||
|
* Invoked by Process::Status.wait in order to wait for a specified process.
|
||||||
|
* See that method description for arguments description.
|
||||||
|
*
|
||||||
|
* Suggested minimal implementation:
|
||||||
|
*
|
||||||
|
* Thread.new do
|
||||||
|
* Process::Status.wait(pid, flags)
|
||||||
|
* end.value
|
||||||
|
*
|
||||||
|
* This hook is optional: if it is not present in the current scheduler,
|
||||||
|
* Process::Status.wait will behave as a blocking method.
|
||||||
|
*
|
||||||
|
* Expected to return a Process::Status instance.
|
||||||
|
*/
|
||||||
|
static VALUE
|
||||||
|
rb_fiber_scheduler_interface_process_wait(VALUE self)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Document-method: SchedulerInterface#io_wait
|
||||||
|
* call-seq: io_wait(io, events, timeout)
|
||||||
|
*
|
||||||
|
* Invoked by IO#wait, IO#wait_readable, IO#wait_writable to ask whether the
|
||||||
|
* specified descriptor is ready for specified events within
|
||||||
|
* the specified +timeout+.
|
||||||
|
*
|
||||||
|
* +events+ is a bit mask of <tt>IO::READABLE</tt>, <tt>IO::WRITABLE</tt>, and
|
||||||
|
* <tt>IO::PRIORITY</tt>.
|
||||||
|
*
|
||||||
|
* Suggested implementation should register which Fiber is waiting for which
|
||||||
|
* resources and immediately call Fiber.yield to pass control to other
|
||||||
|
* fibers. Then, in the #close method, the scheduler might dispatch all the
|
||||||
|
* I/O resources to fibers waiting for them.
|
||||||
|
*
|
||||||
|
* Expected to return the subset of events that are ready immediately.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
static VALUE
|
||||||
|
rb_fiber_scheduler_interface_io_wait(VALUE self)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Document-method: SchedulerInterface#kernel_sleep
|
||||||
|
* call-seq: kernel_sleep(duration = nil)
|
||||||
|
*
|
||||||
|
* Invoked by Kernel#sleep and Mutex#sleep and is expected to provide
|
||||||
|
* an implementation of sleeping in a non-blocking way. Implementation might
|
||||||
|
* register the current fiber in some list of "what fiber waits till what
|
||||||
|
* moment", call Fiber.yield to pass control, and then in #close resume
|
||||||
|
* the fibers whose wait period has ended.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
static VALUE
|
||||||
|
rb_fiber_scheduler_interface_kernel_sleep(VALUE self)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Document-method: SchedulerInterface#block
|
||||||
|
* call-seq: block(blocker, timeout = nil)
|
||||||
|
*
|
||||||
|
* Invoked by methods like Thread.join, and by Mutex, to signify that current
|
||||||
|
* Fiber is blocked till further notice (e.g. #unblock) or till +timeout+ has
|
||||||
|
* elapsed.
|
||||||
|
*
|
||||||
|
* +blocker+ is what we are waiting on, informational only (for debugging and
|
||||||
|
* logging). There are no guarantees about its value.
|
||||||
|
*
|
||||||
|
* Expected to return boolean, specifying whether the blocking operation was
|
||||||
|
* successful or not.
|
||||||
|
*/
|
||||||
|
static VALUE
|
||||||
|
rb_fiber_scheduler_interface_block(VALUE self)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Document-method: SchedulerInterface#unblock
|
||||||
|
* call-seq: unblock(blocker, fiber)
|
||||||
|
*
|
||||||
|
* Invoked to wake up Fiber previously blocked with #block (for example, Mutex#lock
|
||||||
|
* calls #block and Mutex#unlock calls #unblock). The scheduler should use
|
||||||
|
* the +fiber+ parameter to understand which fiber is unblocked.
|
||||||
|
*
|
||||||
|
* +blocker+ is what was awaited, but it is informational only (for debugging
|
||||||
|
* and logging), and it is not guaranteed to be the same value as the +blocker+ for
|
||||||
|
* #block.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
static VALUE
|
||||||
|
rb_fiber_scheduler_interface_unblock(VALUE self)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Document-method: SchedulerInterface#fiber
|
||||||
|
* call-seq: fiber(&block)
|
||||||
|
*
|
||||||
|
* Implementation of the Fiber.schedule. The method is <em>expected</em> to immediately
|
||||||
|
* run passed block of code in a separate non-blocking fiber, and to return that Fiber.
|
||||||
|
*
|
||||||
|
* Minimal suggested implementation is:
|
||||||
|
*
|
||||||
|
* def fiber(&block)
|
||||||
|
* Fiber.new(blocking: false, &block).tap(&:resume)
|
||||||
|
* end
|
||||||
|
*/
|
||||||
|
static VALUE
|
||||||
|
rb_fiber_scheduler_interface_fiber(VALUE self)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
void
|
void
|
||||||
Init_Cont(void)
|
Init_Cont(void)
|
||||||
{
|
{
|
||||||
|
@ -2754,6 +3081,17 @@ Init_Cont(void)
|
||||||
rb_define_singleton_method(rb_cFiber, "schedule", rb_f_fiber, -1);
|
rb_define_singleton_method(rb_cFiber, "schedule", rb_f_fiber, -1);
|
||||||
//rb_define_global_function("Fiber", rb_f_fiber, -1);
|
//rb_define_global_function("Fiber", rb_f_fiber, -1);
|
||||||
|
|
||||||
|
#if 0 /* for RDoc */
|
||||||
|
rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "SchedulerInterface", rb_cObject);
|
||||||
|
rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_interface_close, 0);
|
||||||
|
rb_define_method(rb_cFiberScheduler, "process_wait", rb_fiber_scheduler_interface_process_wait, 0);
|
||||||
|
rb_define_method(rb_cFiberScheduler, "io_wait", rb_fiber_scheduler_interface_io_wait, 0);
|
||||||
|
rb_define_method(rb_cFiberScheduler, "kernel_sleep", rb_fiber_scheduler_interface_kernel_sleep, 0);
|
||||||
|
rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_interface_block, 0);
|
||||||
|
rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_interface_unblock, 0);
|
||||||
|
rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler_interface_fiber, 0);
|
||||||
|
#endif
|
||||||
|
|
||||||
#ifdef RB_EXPERIMENTAL_FIBER_POOL
|
#ifdef RB_EXPERIMENTAL_FIBER_POOL
|
||||||
rb_cFiberPool = rb_define_class("Pool", rb_cFiber);
|
rb_cFiberPool = rb_define_class("Pool", rb_cFiber);
|
||||||
rb_define_alloc_func(rb_cFiberPool, fiber_pool_alloc);
|
rb_define_alloc_func(rb_cFiberPool, fiber_pool_alloc);
|
||||||
|
|
Loading…
Reference in a new issue