mirror of
https://github.com/ruby/ruby.git
synced 2022-11-09 12:17:21 -05:00
When setting current thread scheduler to nil, invoke #close
.
This commit is contained in:
parent
b6d599d76e
commit
501fff14c7
Notes:
git
2020-09-21 06:52:08 +09:00
7 changed files with 75 additions and 6 deletions
|
@ -5214,6 +5214,7 @@ eval.$(OBJEXT): $(top_srcdir)/internal/object.h
|
||||||
eval.$(OBJEXT): $(top_srcdir)/internal/serial.h
|
eval.$(OBJEXT): $(top_srcdir)/internal/serial.h
|
||||||
eval.$(OBJEXT): $(top_srcdir)/internal/static_assert.h
|
eval.$(OBJEXT): $(top_srcdir)/internal/static_assert.h
|
||||||
eval.$(OBJEXT): $(top_srcdir)/internal/string.h
|
eval.$(OBJEXT): $(top_srcdir)/internal/string.h
|
||||||
|
eval.$(OBJEXT): $(top_srcdir)/internal/thread.h
|
||||||
eval.$(OBJEXT): $(top_srcdir)/internal/variable.h
|
eval.$(OBJEXT): $(top_srcdir)/internal/variable.h
|
||||||
eval.$(OBJEXT): $(top_srcdir)/internal/vm.h
|
eval.$(OBJEXT): $(top_srcdir)/internal/vm.h
|
||||||
eval.$(OBJEXT): $(top_srcdir)/internal/warnings.h
|
eval.$(OBJEXT): $(top_srcdir)/internal/warnings.h
|
||||||
|
|
11
eval.c
11
eval.c
|
@ -28,6 +28,7 @@
|
||||||
#include "internal/io.h"
|
#include "internal/io.h"
|
||||||
#include "internal/mjit.h"
|
#include "internal/mjit.h"
|
||||||
#include "internal/object.h"
|
#include "internal/object.h"
|
||||||
|
#include "internal/thread.h"
|
||||||
#include "internal/variable.h"
|
#include "internal/variable.h"
|
||||||
#include "iseq.h"
|
#include "iseq.h"
|
||||||
#include "mjit.h"
|
#include "mjit.h"
|
||||||
|
@ -157,6 +158,13 @@ rb_ec_teardown(rb_execution_context_t *ec)
|
||||||
rb_ec_clear_all_trace_func(ec);
|
rb_ec_clear_all_trace_func(ec);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
rb_ec_scheduler_finalize(rb_execution_context_t *ec)
|
||||||
|
{
|
||||||
|
rb_thread_t *thread = rb_ec_thread_ptr(ec);
|
||||||
|
rb_thread_scheduler_set(thread->self, Qnil);
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
rb_ec_finalize(rb_execution_context_t *ec)
|
rb_ec_finalize(rb_execution_context_t *ec)
|
||||||
{
|
{
|
||||||
|
@ -270,6 +278,9 @@ rb_ec_cleanup(rb_execution_context_t *ec, volatile int ex)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If the user code defined a scheduler for the top level thread, run it:
|
||||||
|
rb_ec_scheduler_finalize(ec);
|
||||||
|
|
||||||
mjit_finish(true); // We still need ISeqs here.
|
mjit_finish(true); // We still need ISeqs here.
|
||||||
|
|
||||||
rb_ec_finalize(ec);
|
rb_ec_finalize(ec);
|
||||||
|
|
|
@ -14,6 +14,8 @@
|
||||||
|
|
||||||
VALUE rb_scheduler_timeout(struct timeval *timeout);
|
VALUE rb_scheduler_timeout(struct timeval *timeout);
|
||||||
|
|
||||||
|
VALUE rb_scheduler_close(VALUE scheduler);
|
||||||
|
|
||||||
VALUE rb_scheduler_kernel_sleep(VALUE scheduler, VALUE duration);
|
VALUE rb_scheduler_kernel_sleep(VALUE scheduler, VALUE duration);
|
||||||
VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv);
|
VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv);
|
||||||
|
|
||||||
|
|
17
scheduler.c
17
scheduler.c
|
@ -11,9 +11,13 @@
|
||||||
#include "internal/scheduler.h"
|
#include "internal/scheduler.h"
|
||||||
#include "ruby/io.h"
|
#include "ruby/io.h"
|
||||||
|
|
||||||
static ID id_kernel_sleep;
|
static ID id_close;
|
||||||
|
|
||||||
static ID id_block;
|
static ID id_block;
|
||||||
static ID id_unblock;
|
static ID id_unblock;
|
||||||
|
|
||||||
|
static ID id_kernel_sleep;
|
||||||
|
|
||||||
static ID id_io_read;
|
static ID id_io_read;
|
||||||
static ID id_io_write;
|
static ID id_io_write;
|
||||||
static ID id_io_wait;
|
static ID id_io_wait;
|
||||||
|
@ -21,14 +25,23 @@ static ID id_io_wait;
|
||||||
void
|
void
|
||||||
Init_Scheduler(void)
|
Init_Scheduler(void)
|
||||||
{
|
{
|
||||||
id_kernel_sleep = rb_intern_const("kernel_sleep");
|
id_close = rb_intern_const("close");
|
||||||
|
|
||||||
id_block = rb_intern_const("block");
|
id_block = rb_intern_const("block");
|
||||||
id_unblock = rb_intern_const("unblock");
|
id_unblock = rb_intern_const("unblock");
|
||||||
|
|
||||||
|
id_kernel_sleep = rb_intern_const("kernel_sleep");
|
||||||
|
|
||||||
id_io_read = rb_intern_const("io_read");
|
id_io_read = rb_intern_const("io_read");
|
||||||
id_io_write = rb_intern_const("io_write");
|
id_io_write = rb_intern_const("io_write");
|
||||||
id_io_wait = rb_intern_const("io_wait");
|
id_io_wait = rb_intern_const("io_wait");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
VALUE rb_scheduler_close(VALUE scheduler)
|
||||||
|
{
|
||||||
|
return rb_funcall(scheduler, id_close, 0);
|
||||||
|
}
|
||||||
|
|
||||||
VALUE
|
VALUE
|
||||||
rb_scheduler_timeout(struct timeval *timeout) {
|
rb_scheduler_timeout(struct timeval *timeout) {
|
||||||
if (timeout) {
|
if (timeout) {
|
||||||
|
|
|
@ -19,6 +19,8 @@ class Scheduler
|
||||||
@writable = {}
|
@writable = {}
|
||||||
@waiting = {}
|
@waiting = {}
|
||||||
|
|
||||||
|
@closed = false
|
||||||
|
|
||||||
@lock = Mutex.new
|
@lock = Mutex.new
|
||||||
@locking = 0
|
@locking = 0
|
||||||
@ready = []
|
@ready = []
|
||||||
|
@ -96,6 +98,19 @@ class Scheduler
|
||||||
@urgent = nil
|
@urgent = nil
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def close
|
||||||
|
self.run
|
||||||
|
ensure
|
||||||
|
@closed = true
|
||||||
|
|
||||||
|
# We freeze to detect any inadvertant modifications after the scheduler is closed:
|
||||||
|
self.freeze
|
||||||
|
end
|
||||||
|
|
||||||
|
def closed?
|
||||||
|
@closed
|
||||||
|
end
|
||||||
|
|
||||||
def current_time
|
def current_time
|
||||||
Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
||||||
end
|
end
|
||||||
|
|
|
@ -10,4 +10,29 @@ class TestFiberScheduler < Test::Unit::TestCase
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def test_closed_at_thread_exit
|
||||||
|
scheduler = Scheduler.new
|
||||||
|
|
||||||
|
thread = Thread.new do
|
||||||
|
Thread.current.scheduler = scheduler
|
||||||
|
end
|
||||||
|
|
||||||
|
thread.join
|
||||||
|
|
||||||
|
assert scheduler.closed?
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_closed_when_set_to_nil
|
||||||
|
scheduler = Scheduler.new
|
||||||
|
|
||||||
|
thread = Thread.new do
|
||||||
|
Thread.current.scheduler = scheduler
|
||||||
|
Thread.current.scheduler = nil
|
||||||
|
|
||||||
|
assert scheduler.closed?
|
||||||
|
end
|
||||||
|
|
||||||
|
thread.join
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
10
thread.c
10
thread.c
|
@ -748,10 +748,7 @@ thread_do_start(rb_thread_t *th)
|
||||||
rb_bug("unreachable");
|
rb_bug("unreachable");
|
||||||
}
|
}
|
||||||
|
|
||||||
VALUE scheduler = th->scheduler;
|
rb_thread_scheduler_set(th->self, Qnil);
|
||||||
if (scheduler != Qnil) {
|
|
||||||
rb_funcall(scheduler, rb_intern("run"), 0);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec);
|
void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec);
|
||||||
|
@ -3732,6 +3729,11 @@ rb_thread_scheduler_set(VALUE thread, VALUE scheduler)
|
||||||
|
|
||||||
VM_ASSERT(th);
|
VM_ASSERT(th);
|
||||||
|
|
||||||
|
// We invoke Scheduler#close when setting it to something else, to ensure the previous scheduler runs to completion before changing the scheduler. That way, we do not need to consider interactions, e.g., of a Fiber from the previous scheduler with the new scheduler.
|
||||||
|
if (th->scheduler != Qnil) {
|
||||||
|
rb_scheduler_close(th->scheduler);
|
||||||
|
}
|
||||||
|
|
||||||
th->scheduler = scheduler;
|
th->scheduler = scheduler;
|
||||||
|
|
||||||
return th->scheduler;
|
return th->scheduler;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue