1
0
Fork 0
mirror of https://github.com/ruby/ruby.git synced 2022-11-09 12:17:21 -05:00

Rename to Fiber#set_scheduler.

This commit is contained in:
Samuel Williams 2020-10-16 14:25:58 +13:00
parent 656d4cddaf
commit a08ee8330d
Notes: git 2020-11-07 19:40:25 +09:00
17 changed files with 157 additions and 133 deletions

View file

@ -198,10 +198,10 @@ Outstanding ones only.
* Thread
* Introduce `Thread#scheduler` for intercepting blocking operations and
`Thread.scheduler` for accessing the current scheduler. See
* Introduce `Fiber.set_scheduler` for intercepting blocking operations and
`Fiber.scheduler` for accessing the current scheduler. See
doc/scheduler.md for more details. [[Feature #16786]]
* `Thread#blocking?` tells whether the current execution context is
* `Fiber.blocking?` tells whether the current execution context is
blocking. [[Feature #16786]]
* `Thread#join` invokes the scheduler hooks `block`/`unblock` in a
non-blocking execution context. [[Feature #16786]]

44
cont.c
View file

@ -25,6 +25,7 @@
#include "internal/mjit.h"
#include "internal/proc.h"
#include "internal/warnings.h"
#include "internal/scheduler.h"
#include "mjit.h"
#include "vm_core.h"
#include "id_table.h"
@ -1821,7 +1822,7 @@ static VALUE
rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat)
{
VALUE pool = Qnil;
VALUE blocking = Qtrue;
VALUE blocking = Qfalse;
if (kw_splat != RB_NO_KEYWORDS) {
VALUE options = Qnil;
@ -1830,8 +1831,13 @@ rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat)
argc = rb_scan_args_kw(kw_splat, argc, argv, ":", &options);
rb_get_kwargs(options, fiber_initialize_keywords, 0, 2, arguments);
blocking = arguments[0];
pool = arguments[1];
if (arguments[0] != Qundef) {
blocking = arguments[0];
}
if (arguments[1] != Qundef) {
pool = arguments[1];
}
}
return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking));
@ -1872,6 +1878,22 @@ rb_f_fiber(int argc, VALUE *argv, VALUE obj)
return rb_f_fiber_kw(argc, argv, rb_keyword_given_p());
}
static VALUE
rb_fiber_scheduler(VALUE klass)
{
return rb_scheduler_get();
}
static VALUE
rb_fiber_set_scheduler(VALUE klass, VALUE scheduler)
{
// if (rb_scheduler_get() != Qnil) {
// rb_raise(rb_eFiberError, "Scheduler is already defined!");
// }
return rb_scheduler_set(scheduler);
}
static void rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt);
void
@ -2178,6 +2200,18 @@ rb_fiber_blocking_p(VALUE fiber)
return (fiber_ptr(fiber)->blocking == 0) ? Qfalse : Qtrue;
}
static VALUE
rb_f_fiber_blocking_p(VALUE klass)
{
rb_thread_t *thread = GET_THREAD();
unsigned blocking = thread->blocking;
if (blocking == 0)
return Qfalse;
return INT2NUM(blocking);
}
void
rb_fiber_close(rb_fiber_t *fiber)
{
@ -2594,6 +2628,10 @@ Init_Cont(void)
rb_define_method(rb_cFiber, "to_s", fiber_to_s, 0);
rb_define_alias(rb_cFiber, "inspect", "to_s");
rb_define_singleton_method(rb_cFiber, "blocking?", rb_f_fiber_blocking_p, 0);
rb_define_singleton_method(rb_cFiber, "scheduler", rb_fiber_scheduler, 0);
rb_define_singleton_method(rb_cFiber, "set_scheduler", rb_fiber_set_scheduler, 1);
rb_define_singleton_method(rb_cFiber, "schedule", rb_f_fiber, -1);
//rb_define_global_function("Fiber", rb_f_fiber, -1);

View file

@ -76,13 +76,13 @@ Fibers can be used to create non-blocking execution contexts.
Fiber.new(blocking: false) do
puts Fiber.current.blocking? # false
# May invoke `Thread.scheduler&.io_wait`.
# May invoke `Fiber.scheduler&.io_wait`.
io.read(...)
# May invoke `Thread.scheduler&.io_wait`.
# May invoke `Fiber.scheduler&.io_wait`.
io.write(...)
# Will invoke `Thread.scheduler&.kernel_sleep`.
# Will invoke `Fiber.scheduler&.kernel_sleep`.
sleep(n)
end.resume
~~~

4
eval.c
View file

@ -30,6 +30,7 @@
#include "internal/object.h"
#include "internal/thread.h"
#include "internal/variable.h"
#include "internal/scheduler.h"
#include "iseq.h"
#include "mjit.h"
#include "probes.h"
@ -149,12 +150,11 @@ ruby_options(int argc, char **argv)
static void
rb_ec_scheduler_finalize(rb_execution_context_t *ec)
{
rb_thread_t *thread = rb_ec_thread_ptr(ec);
enum ruby_tag_type state;
EC_PUSH_TAG(ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
rb_thread_scheduler_set(thread->self, Qnil);
rb_scheduler_set(Qnil);
}
else {
state = error_handle(ec, state);

View file

@ -12,6 +12,12 @@
#include "ruby/ruby.h"
#include "ruby/intern.h"
VALUE rb_scheduler_get();
VALUE rb_scheduler_set(VALUE scheduler);
VALUE rb_scheduler_current();
VALUE rb_thread_scheduler_current(VALUE thread);
VALUE rb_scheduler_timeout(struct timeval *timeout);
VALUE rb_scheduler_close(VALUE scheduler);

View file

@ -39,12 +39,6 @@ VALUE rb_mutex_owned_p(VALUE self);
int rb_thread_wait_for_single_fd(int fd, int events, struct timeval * timeout);
VALUE rb_thread_scheduler_get(VALUE thread);
VALUE rb_thread_scheduler_set(VALUE thread, VALUE scheduler);
VALUE rb_thread_scheduler_if_nonblocking(VALUE thread);
VALUE rb_thread_current_scheduler();
RUBY_SYMBOL_EXPORT_BEGIN
/* Temporary. This API will be removed (renamed). */
VALUE rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd);

16
io.c
View file

@ -1264,7 +1264,7 @@ io_fflush(rb_io_t *fptr)
VALUE
rb_io_wait(VALUE io, VALUE events, VALUE timeout)
{
VALUE scheduler = rb_thread_current_scheduler();
VALUE scheduler = rb_scheduler_current();
if (scheduler != Qnil) {
return rb_scheduler_io_wait(scheduler, io, events, timeout);
@ -1306,7 +1306,7 @@ rb_io_from_fd(int fd)
int
rb_io_wait_readable(int f)
{
VALUE scheduler = rb_thread_current_scheduler();
VALUE scheduler = rb_scheduler_current();
if (scheduler != Qnil) {
return RTEST(
rb_scheduler_io_wait_readable(scheduler, rb_io_from_fd(f))
@ -1337,7 +1337,7 @@ rb_io_wait_readable(int f)
int
rb_io_wait_writable(int f)
{
VALUE scheduler = rb_thread_current_scheduler();
VALUE scheduler = rb_scheduler_current();
if (scheduler != Qnil) {
return RTEST(
rb_scheduler_io_wait_writable(scheduler, rb_io_from_fd(f))
@ -1377,7 +1377,7 @@ rb_io_wait_writable(int f)
int
rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
{
VALUE scheduler = rb_thread_current_scheduler();
VALUE scheduler = rb_scheduler_current();
if (scheduler != Qnil) {
return RTEST(
@ -1538,7 +1538,7 @@ io_binwrite(VALUE str, const char *ptr, long len, rb_io_t *fptr, int nosync)
if ((n = len) <= 0) return n;
VALUE scheduler = rb_thread_current_scheduler();
VALUE scheduler = rb_scheduler_current();
if (scheduler != Qnil && rb_scheduler_supports_io_write(scheduler)) {
ssize_t length = RB_NUM2SSIZE(
rb_scheduler_io_write(scheduler, fptr->self, str, offset, len)
@ -2623,7 +2623,7 @@ bufread_call(VALUE arg)
static long
io_fread(VALUE str, long offset, long size, rb_io_t *fptr)
{
VALUE scheduler = rb_thread_current_scheduler();
VALUE scheduler = rb_scheduler_current();
if (scheduler != Qnil && rb_scheduler_supports_io_read(scheduler)) {
ssize_t length = RB_NUM2SSIZE(
rb_scheduler_io_read(scheduler, fptr->self, str, offset, size)
@ -11077,7 +11077,7 @@ STATIC_ASSERT(pollout_expected, POLLOUT == RB_WAITFD_OUT);
static int
nogvl_wait_for_single_fd(VALUE th, int fd, short events)
{
VALUE scheduler = rb_thread_scheduler_if_nonblocking(th);
VALUE scheduler = rb_thread_scheduler_current(th);
if (scheduler != Qnil) {
struct wait_for_single_fd args = {.scheduler = scheduler, .fd = fd, .events = events};
rb_thread_call_with_gvl(rb_thread_scheduler_wait_for_single_fd, &args);
@ -11096,7 +11096,7 @@ nogvl_wait_for_single_fd(VALUE th, int fd, short events)
static int
nogvl_wait_for_single_fd(VALUE th, int fd, short events)
{
VALUE scheduler = rb_thread_scheduler_if_nonblocking(th);
VALUE scheduler = rb_thread_scheduler_current(th);
if (scheduler != Qnil) {
struct wait_for_single_fd args = {.scheduler = scheduler, .fd = fd, .events = events};
rb_thread_call_with_gvl(rb_thread_scheduler_wait_for_single_fd, &args);

View file

@ -4927,7 +4927,7 @@ static VALUE
rb_f_sleep(int argc, VALUE *argv, VALUE _)
{
time_t beg = time(0);
VALUE scheduler = rb_thread_current_scheduler();
VALUE scheduler = rb_scheduler_current();
if (scheduler != Qnil) {
rb_scheduler_kernel_sleepv(scheduler, argc, argv);

View file

@ -8,6 +8,7 @@
**********************************************************************/
#include "vm_core.h"
#include "internal/scheduler.h"
#include "ruby/io.h"
@ -37,6 +38,54 @@ Init_Scheduler(void)
id_io_wait = rb_intern_const("io_wait");
}
VALUE
rb_scheduler_get()
{
rb_thread_t *thread = GET_THREAD();
VM_ASSERT(thread);
return thread->scheduler;
}
VALUE
rb_scheduler_set(VALUE scheduler)
{
rb_thread_t *thread = GET_THREAD();
VM_ASSERT(thread);
// We invoke Scheduler#close when setting it to something else, to ensure the previous scheduler runs to completion before changing the scheduler. That way, we do not need to consider interactions, e.g., of a Fiber from the previous scheduler with the new scheduler.
if (thread->scheduler != Qnil) {
rb_scheduler_close(thread->scheduler);
}
thread->scheduler = scheduler;
return thread->scheduler;
}
static VALUE
rb_threadptr_scheduler_current(rb_thread_t *thread)
{
VM_ASSERT(thread);
if (thread->blocking == 0) {
return thread->scheduler;
} else {
return Qnil;
}
}
VALUE
rb_scheduler_current()
{
return rb_threadptr_scheduler_current(GET_THREAD());
}
VALUE rb_thread_scheduler_current(VALUE thread)
{
return rb_threadptr_scheduler_current(rb_thread_ptr(thread));
}
VALUE
rb_scheduler_close(VALUE scheduler)
{

View file

@ -19,7 +19,7 @@ def fetch_topics(topics)
end.resume
end
Thread.scheduler&.run
Thread.fiber_scheduler&.run
return responses
end
@ -32,7 +32,7 @@ def sweep(repeats: 3, **options)
Thread.new do
Benchmark.realtime do
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
Fiber.set_scheduler scheduler
repeats.times do
Fiber.new(**options) do

View file

@ -20,7 +20,7 @@ class TestFiberEnumerator < Test::Unit::TestCase
thread = Thread.new do
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
Fiber.set_scheduler scheduler
e = i.to_enum(:each_char)

View file

@ -20,7 +20,7 @@ class TestFiberIO < Test::Unit::TestCase
thread = Thread.new do
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
Fiber.set_scheduler scheduler
Fiber.schedule do
message = i.read(20)
@ -48,7 +48,7 @@ class TestFiberIO < Test::Unit::TestCase
i, o = UNIXSocket.pair
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
Fiber.set_scheduler scheduler
Fiber.schedule do
i.read(20)

View file

@ -8,13 +8,13 @@ class TestFiberMutex < Test::Unit::TestCase
thread = Thread.new do
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
Fiber.set_scheduler scheduler
Fiber.schedule do
assert_not_predicate Thread.current, :blocking?
assert_not_predicate Fiber, :blocking?
mutex.synchronize do
assert_not_predicate Thread.current, :blocking?
assert_not_predicate Fiber, :blocking?
end
end
end
@ -27,7 +27,7 @@ class TestFiberMutex < Test::Unit::TestCase
thread = Thread.new do
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
Fiber.set_scheduler scheduler
Fiber.schedule do
mutex.lock
@ -53,7 +53,7 @@ class TestFiberMutex < Test::Unit::TestCase
thread = Thread.new do
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
Fiber.set_scheduler scheduler
Fiber.schedule do
mutex.lock
@ -79,7 +79,7 @@ class TestFiberMutex < Test::Unit::TestCase
thread = Thread.new do
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
Fiber.set_scheduler scheduler
f = Fiber.schedule do
assert_raise_with_message(RuntimeError, "bye") do
@ -110,7 +110,7 @@ class TestFiberMutex < Test::Unit::TestCase
thread = Thread.new do
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
Fiber.set_scheduler scheduler
Fiber.schedule do
mutex.synchronize do
@ -145,7 +145,7 @@ class TestFiberMutex < Test::Unit::TestCase
thread = Thread.new do
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
Fiber.set_scheduler scheduler
Fiber.schedule do
3.times do |i|
@ -176,7 +176,7 @@ class TestFiberMutex < Test::Unit::TestCase
thread = Thread.new do
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
Fiber.set_scheduler scheduler
result = nil
Fiber.schedule do
@ -204,7 +204,7 @@ class TestFiberMutex < Test::Unit::TestCase
thread = Thread.new do
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
Fiber.set_scheduler scheduler
Fiber.schedule do
mutex.synchronize do

View file

@ -11,11 +11,27 @@ class TestFiberScheduler < Test::Unit::TestCase
end
end
def test_fiber_new
f = Fiber.new{}
refute f.blocking?
end
def test_fiber_new_with_options
f = Fiber.new(blocking: true){}
assert f.blocking?
f = Fiber.new(blocking: false){}
refute f.blocking?
f = Fiber.new(pool: nil){}
refute f.blocking?
end
def test_closed_at_thread_exit
scheduler = Scheduler.new
thread = Thread.new do
Thread.current.scheduler = scheduler
Fiber.set_scheduler scheduler
end
thread.join
@ -27,8 +43,8 @@ class TestFiberScheduler < Test::Unit::TestCase
scheduler = Scheduler.new
thread = Thread.new do
Thread.current.scheduler = scheduler
Thread.current.scheduler = nil
Fiber.set_scheduler scheduler
Fiber.set_scheduler nil
assert scheduler.closed?
end
@ -41,7 +57,7 @@ class TestFiberScheduler < Test::Unit::TestCase
require 'scheduler'
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
Fiber.set_scheduler scheduler
Fiber.schedule do
sleep(0)
@ -52,7 +68,7 @@ class TestFiberScheduler < Test::Unit::TestCase
def test_optional_close
thread = Thread.new do
Thread.current.scheduler = Object.new
Fiber.set_scheduler Object.new
end
thread.join

View file

@ -10,7 +10,7 @@ class TestFiberSleep < Test::Unit::TestCase
thread = Thread.new do
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
Fiber.set_scheduler scheduler
5.times do |i|
Fiber.schedule do
@ -33,7 +33,7 @@ class TestFiberSleep < Test::Unit::TestCase
thread = Thread.new do
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
Fiber.set_scheduler scheduler
Fiber.schedule do
seconds = sleep(2)
end

View file

@ -774,7 +774,7 @@ thread_do_start(rb_thread_t *th)
rb_bug("unreachable");
}
rb_thread_scheduler_set(th->self, Qnil);
rb_scheduler_set(Qnil);
}
void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec);
@ -1175,7 +1175,7 @@ thread_join_sleep(VALUE arg)
}
while (target_th->status != THREAD_KILLED) {
VALUE scheduler = rb_thread_current_scheduler();
VALUE scheduler = rb_scheduler_current();
if (scheduler != Qnil) {
rb_scheduler_block(scheduler, target_th->self, p->timeout);
@ -1522,7 +1522,7 @@ rb_thread_sleep_interruptible(void)
static void
rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker)
{
VALUE scheduler = rb_thread_current_scheduler();
VALUE scheduler = rb_scheduler_current();
if (scheduler != Qnil) {
rb_scheduler_block(scheduler, blocker, Qnil);
} else {
@ -3796,80 +3796,6 @@ rb_thread_variables(VALUE thread)
return ary;
}
VALUE
rb_thread_scheduler_get(VALUE thread)
{
rb_thread_t * th = rb_thread_ptr(thread);
VM_ASSERT(th);
return th->scheduler;
}
VALUE
rb_thread_scheduler_set(VALUE thread, VALUE scheduler)
{
rb_thread_t * th = rb_thread_ptr(thread);
VM_ASSERT(th);
// We invoke Scheduler#close when setting it to something else, to ensure the previous scheduler runs to completion before changing the scheduler. That way, we do not need to consider interactions, e.g., of a Fiber from the previous scheduler with the new scheduler.
if (th->scheduler != Qnil) {
rb_scheduler_close(th->scheduler);
}
th->scheduler = scheduler;
return th->scheduler;
}
#if 0 // no longer used
/*
* call-seq:
* Thread.scheduler -> scheduler or nil
*
* Returns the current scheduler if scheduling operations are permitted.
*
*/
static VALUE
rb_thread_scheduler(VALUE klass)
{
return rb_thread_scheduler_if_nonblocking(rb_thread_current());
}
#endif
VALUE
rb_thread_current_scheduler()
{
return rb_thread_scheduler_if_nonblocking(rb_thread_current());
}
VALUE
rb_thread_scheduler_if_nonblocking(VALUE thread)
{
rb_thread_t * th = rb_thread_ptr(thread);
VM_ASSERT(th);
if (th->blocking == 0) {
return th->scheduler;
} else {
return Qnil;
}
}
static VALUE
rb_thread_blocking_p(VALUE thread)
{
unsigned blocking = rb_thread_ptr(thread)->blocking;
if (blocking == 0)
return Qfalse;
return INT2NUM(blocking);
}
/*
* call-seq:
* thr.thread_variable?(key) -> true or false
@ -5558,7 +5484,6 @@ Init_Thread(void)
rb_define_method(rb_cThread, "keys", rb_thread_keys, 0);
rb_define_method(rb_cThread, "priority", rb_thread_priority, 0);
rb_define_method(rb_cThread, "priority=", rb_thread_priority_set, 1);
rb_define_method(rb_cThread, "blocking?", rb_thread_blocking_p, 0);
rb_define_method(rb_cThread, "status", rb_thread_status, 0);
rb_define_method(rb_cThread, "thread_variable_get", rb_thread_variable_get, 1);
rb_define_method(rb_cThread, "thread_variable_set", rb_thread_variable_set, 2);
@ -5574,10 +5499,6 @@ Init_Thread(void)
rb_define_method(rb_cThread, "backtrace", rb_thread_backtrace_m, -1);
rb_define_method(rb_cThread, "backtrace_locations", rb_thread_backtrace_locations_m, -1);
// rb_define_singleton_method(rb_cThread, "scheduler", rb_thread_scheduler, 0);
rb_define_method(rb_cThread, "scheduler", rb_thread_scheduler_get, 0);
rb_define_method(rb_cThread, "scheduler=", rb_thread_scheduler_set, 1);
rb_define_method(rb_cThread, "name", rb_thread_getname, 0);
rb_define_method(rb_cThread, "name=", rb_thread_setname, 1);
rb_define_method(rb_cThread, "to_s", rb_thread_to_s, 0);

View file

@ -246,7 +246,7 @@ mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
}
static VALUE call_rb_scheduler_block(VALUE mutex) {
return rb_scheduler_block(rb_thread_current_scheduler(), mutex, Qnil);
return rb_scheduler_block(rb_scheduler_current(), mutex, Qnil);
}
static VALUE remove_from_mutex_lock_waiters(VALUE arg) {
@ -281,7 +281,7 @@ do_mutex_lock(VALUE self, int interruptible_p)
}
while (mutex->fiber != fiber) {
VALUE scheduler = rb_thread_current_scheduler();
VALUE scheduler = rb_scheduler_current();
if (scheduler != Qnil) {
list_add_tail(&mutex->waitq, &w.node);
@ -516,7 +516,7 @@ rb_mutex_sleep(VALUE self, VALUE timeout)
rb_mutex_unlock(self);
time_t beg = time(0);
VALUE scheduler = rb_thread_current_scheduler();
VALUE scheduler = rb_scheduler_current();
if (scheduler != Qnil) {
rb_scheduler_kernel_sleep(scheduler, timeout);
mutex_lock_uninterruptible(self);