mirror of
https://github.com/ruby/ruby.git
synced 2022-11-09 12:17:21 -05:00
Wake up join list within thread EC context. (#4471)
* Wake up join list within thread EC context. * Consume items from join list so that they are not re-executed. If `rb_fiber_scheduler_unblock` raises an exception, it can result in a segfault if `rb_threadptr_join_list_wakeup` is not within a valid EC. This change moves `rb_threadptr_join_list_wakeup` into the thread's top-level EC, which initially caused an infinite loop because it will retry on exception. We explicitly remove items from the thread's join list to avoid this situation. * Verify the required scheduler interface. * Test several scheduler hook methods with a broken `unblock` implementation.
This commit is contained in:
parent
626427c2e0
commit
050a895439
Notes:
git
2021-06-14 14:57:14 +09:00
Merged-By: ioquatix <samuel@codeotaku.com>
6 changed files with 175 additions and 89 deletions
24
scheduler.c
24
scheduler.c
|
@ -55,12 +55,36 @@ rb_fiber_scheduler_get(void)
|
||||||
return thread->scheduler;
|
return thread->scheduler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
verify_interface(VALUE scheduler)
|
||||||
|
{
|
||||||
|
if (!rb_respond_to(scheduler, id_block)) {
|
||||||
|
rb_raise(rb_eArgError, "Scheduler must implement #block!");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!rb_respond_to(scheduler, id_unblock)) {
|
||||||
|
rb_raise(rb_eArgError, "Scheduler must implement #unblock!");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!rb_respond_to(scheduler, id_kernel_sleep)) {
|
||||||
|
rb_raise(rb_eArgError, "Scheduler must implement #kernel_sleep!");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!rb_respond_to(scheduler, id_io_wait)) {
|
||||||
|
rb_raise(rb_eArgError, "Scheduler must implement #io_wait!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
VALUE
|
VALUE
|
||||||
rb_fiber_scheduler_set(VALUE scheduler)
|
rb_fiber_scheduler_set(VALUE scheduler)
|
||||||
{
|
{
|
||||||
rb_thread_t *thread = GET_THREAD();
|
rb_thread_t *thread = GET_THREAD();
|
||||||
VM_ASSERT(thread);
|
VM_ASSERT(thread);
|
||||||
|
|
||||||
|
if (scheduler != Qnil) {
|
||||||
|
verify_interface(scheduler);
|
||||||
|
}
|
||||||
|
|
||||||
// We invoke Scheduler#close when setting it to something else, to ensure the previous scheduler runs to completion before changing the scheduler. That way, we do not need to consider interactions, e.g., of a Fiber from the previous scheduler with the new scheduler.
|
// We invoke Scheduler#close when setting it to something else, to ensure the previous scheduler runs to completion before changing the scheduler. That way, we do not need to consider interactions, e.g., of a Fiber from the previous scheduler with the new scheduler.
|
||||||
if (thread->scheduler != Qnil) {
|
if (thread->scheduler != Qnil) {
|
||||||
rb_fiber_scheduler_close(thread->scheduler);
|
rb_fiber_scheduler_close(thread->scheduler);
|
||||||
|
|
|
@ -230,3 +230,11 @@ class Scheduler
|
||||||
end.value
|
end.value
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
class BrokenUnblockScheduler < Scheduler
|
||||||
|
def unblock(blocker, fiber)
|
||||||
|
super
|
||||||
|
|
||||||
|
raise "Broken unblock!"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
|
@ -66,9 +66,23 @@ class TestFiberScheduler < Test::Unit::TestCase
|
||||||
RUBY
|
RUBY
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_optional_close
|
def test_minimal_interface
|
||||||
|
scheduler = Object.new
|
||||||
|
|
||||||
|
def scheduler.block
|
||||||
|
end
|
||||||
|
|
||||||
|
def scheduler.unblock
|
||||||
|
end
|
||||||
|
|
||||||
|
def scheduler.io_wait
|
||||||
|
end
|
||||||
|
|
||||||
|
def scheduler.kernel_sleep
|
||||||
|
end
|
||||||
|
|
||||||
thread = Thread.new do
|
thread = Thread.new do
|
||||||
Fiber.set_scheduler Object.new
|
Fiber.set_scheduler scheduler
|
||||||
end
|
end
|
||||||
|
|
||||||
thread.join
|
thread.join
|
||||||
|
|
|
@ -43,4 +43,26 @@ class TestFiberSleep < Test::Unit::TestCase
|
||||||
|
|
||||||
assert_operator seconds, :>=, 2, "actual: %p" % seconds
|
assert_operator seconds, :>=, 2, "actual: %p" % seconds
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def test_broken_sleep
|
||||||
|
thread = Thread.new do
|
||||||
|
Thread.current.report_on_exception = false
|
||||||
|
|
||||||
|
scheduler = Scheduler.new
|
||||||
|
|
||||||
|
def scheduler.kernel_sleep(duration = nil)
|
||||||
|
raise "Broken sleep!"
|
||||||
|
end
|
||||||
|
|
||||||
|
Fiber.set_scheduler scheduler
|
||||||
|
|
||||||
|
Fiber.schedule do
|
||||||
|
sleep 0
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
assert_raise(RuntimeError) do
|
||||||
|
thread.join
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -42,4 +42,24 @@ class TestFiberThread < Test::Unit::TestCase
|
||||||
|
|
||||||
assert_equal :done, thread.value
|
assert_equal :done, thread.value
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def test_broken_unblock
|
||||||
|
thread = Thread.new do
|
||||||
|
Thread.current.report_on_exception = false
|
||||||
|
|
||||||
|
scheduler = BrokenUnblockScheduler.new
|
||||||
|
|
||||||
|
Fiber.set_scheduler scheduler
|
||||||
|
|
||||||
|
Fiber.schedule do
|
||||||
|
Thread.new{}.join
|
||||||
|
end
|
||||||
|
|
||||||
|
scheduler.run
|
||||||
|
end
|
||||||
|
|
||||||
|
assert_raise(RuntimeError) do
|
||||||
|
thread.join
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
172
thread.c
172
thread.c
|
@ -539,9 +539,12 @@ terminate_all(rb_ractor_t *r, const rb_thread_t *main_thread)
|
||||||
static void
|
static void
|
||||||
rb_threadptr_join_list_wakeup(rb_thread_t *thread)
|
rb_threadptr_join_list_wakeup(rb_thread_t *thread)
|
||||||
{
|
{
|
||||||
struct rb_waiting_list *join_list = thread->join_list;
|
while (thread->join_list) {
|
||||||
|
struct rb_waiting_list *join_list = thread->join_list;
|
||||||
|
|
||||||
|
// Consume the entry from the join list:
|
||||||
|
thread->join_list = join_list->next;
|
||||||
|
|
||||||
while (join_list) {
|
|
||||||
rb_thread_t *target_thread = join_list->thread;
|
rb_thread_t *target_thread = join_list->thread;
|
||||||
|
|
||||||
if (target_thread->scheduler != Qnil && rb_fiberptr_blocking(join_list->fiber) == 0) {
|
if (target_thread->scheduler != Qnil && rb_fiberptr_blocking(join_list->fiber) == 0) {
|
||||||
|
@ -557,25 +560,20 @@ rb_threadptr_join_list_wakeup(rb_thread_t *thread)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
join_list = join_list->next;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
|
rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
|
||||||
{
|
{
|
||||||
const char *err;
|
while (th->keeping_mutexes) {
|
||||||
rb_mutex_t *mutex;
|
rb_mutex_t *mutex = th->keeping_mutexes;
|
||||||
rb_mutex_t *mutexes = th->keeping_mutexes;
|
th->keeping_mutexes = mutex->next_mutex;
|
||||||
|
|
||||||
while (mutexes) {
|
/* rb_warn("mutex #<%p> remains to be locked by terminated thread", (void *)mutexes); */
|
||||||
mutex = mutexes;
|
|
||||||
/* rb_warn("mutex #<%p> remains to be locked by terminated thread",
|
const char *error_message = rb_mutex_unlock_th(mutex, th, mutex->fiber);
|
||||||
(void *)mutexes); */
|
if (error_message) rb_bug("invalid keeping_mutexes: %s", error_message);
|
||||||
mutexes = mutex->next_mutex;
|
|
||||||
err = rb_mutex_unlock_th(mutex, th, mutex->fiber);
|
|
||||||
if (err) rb_bug("invalid keeping_mutexes: %s", err);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -816,87 +814,87 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
|
||||||
th->ec->machine.stack_start = STACK_DIR_UPPER(vm_stack + size, vm_stack);
|
th->ec->machine.stack_start = STACK_DIR_UPPER(vm_stack + size, vm_stack);
|
||||||
th->ec->machine.stack_maxsize -= size * sizeof(VALUE);
|
th->ec->machine.stack_maxsize -= size * sizeof(VALUE);
|
||||||
|
|
||||||
{
|
thread_debug("thread start (get lock): %p\n", (void *)th);
|
||||||
thread_debug("thread start (get lock): %p\n", (void *)th);
|
|
||||||
|
|
||||||
EC_PUSH_TAG(th->ec);
|
EC_PUSH_TAG(th->ec);
|
||||||
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
|
|
||||||
SAVE_ROOT_JMPBUF(th, thread_do_start(th));
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
errinfo = th->ec->errinfo;
|
|
||||||
|
|
||||||
if (state == TAG_FATAL) {
|
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
|
||||||
if (th->invoke_type == thread_invoke_type_ractor_proc) {
|
SAVE_ROOT_JMPBUF(th, thread_do_start(th));
|
||||||
rb_ractor_atexit(th->ec, Qnil);
|
} else {
|
||||||
}
|
errinfo = th->ec->errinfo;
|
||||||
/* fatal error within this thread, need to stop whole script */
|
|
||||||
}
|
|
||||||
else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) {
|
|
||||||
/* exit on main_thread. */
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
if (th->report_on_exception) {
|
|
||||||
VALUE mesg = rb_thread_to_s(th->self);
|
|
||||||
rb_str_cat_cstr(mesg, " terminated with exception (report_on_exception is true):\n");
|
|
||||||
rb_write_error_str(mesg);
|
|
||||||
rb_ec_error_print(th->ec, errinfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (th->invoke_type == thread_invoke_type_ractor_proc) {
|
if (state == TAG_FATAL) {
|
||||||
rb_ractor_atexit_exception(th->ec);
|
if (th->invoke_type == thread_invoke_type_ractor_proc) {
|
||||||
}
|
rb_ractor_atexit(th->ec, Qnil);
|
||||||
|
}
|
||||||
if (th->vm->thread_abort_on_exception ||
|
/* fatal error within this thread, need to stop whole script */
|
||||||
th->abort_on_exception || RTEST(ruby_debug)) {
|
|
||||||
/* exit on main_thread */
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
errinfo = Qnil;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
th->value = Qnil;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (th->invoke_type == thread_invoke_type_ractor_proc) {
|
|
||||||
rb_thread_terminate_all(th);
|
|
||||||
rb_ractor_teardown(th->ec);
|
|
||||||
}
|
}
|
||||||
|
else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) {
|
||||||
|
/* exit on main_thread. */
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
if (th->report_on_exception) {
|
||||||
|
VALUE mesg = rb_thread_to_s(th->self);
|
||||||
|
rb_str_cat_cstr(mesg, " terminated with exception (report_on_exception is true):\n");
|
||||||
|
rb_write_error_str(mesg);
|
||||||
|
rb_ec_error_print(th->ec, errinfo);
|
||||||
|
}
|
||||||
|
|
||||||
th->status = THREAD_KILLED;
|
if (th->invoke_type == thread_invoke_type_ractor_proc) {
|
||||||
thread_debug("thread end: %p\n", (void *)th);
|
rb_ractor_atexit_exception(th->ec);
|
||||||
|
}
|
||||||
|
|
||||||
if (th->vm->ractor.main_thread == th) {
|
if (th->vm->thread_abort_on_exception ||
|
||||||
ruby_stop(0);
|
th->abort_on_exception || RTEST(ruby_debug)) {
|
||||||
}
|
/* exit on main_thread */
|
||||||
|
}
|
||||||
if (RB_TYPE_P(errinfo, T_OBJECT)) {
|
else {
|
||||||
/* treat with normal error object */
|
errinfo = Qnil;
|
||||||
rb_threadptr_raise(ractor_main_th, 1, &errinfo);
|
}
|
||||||
}
|
}
|
||||||
EC_POP_TAG();
|
th->value = Qnil;
|
||||||
|
|
||||||
rb_ec_clear_current_thread_trace_func(th->ec);
|
|
||||||
|
|
||||||
/* locking_mutex must be Qfalse */
|
|
||||||
if (th->locking_mutex != Qfalse) {
|
|
||||||
rb_bug("thread_start_func_2: locking_mutex must not be set (%p:%"PRIxVALUE")",
|
|
||||||
(void *)th, th->locking_mutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ractor_main_th->status == THREAD_KILLED &&
|
|
||||||
th->ractor->threads.cnt <= 2 /* main thread and this thread */) {
|
|
||||||
/* I'm last thread. wake up main thread from rb_thread_terminate_all */
|
|
||||||
rb_threadptr_interrupt(ractor_main_th);
|
|
||||||
}
|
|
||||||
|
|
||||||
rb_threadptr_join_list_wakeup(th);
|
|
||||||
rb_threadptr_unlock_all_locking_mutexes(th);
|
|
||||||
rb_check_deadlock(th->ractor);
|
|
||||||
|
|
||||||
rb_fiber_close(th->ec->fiber_ptr);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (th->invoke_type == thread_invoke_type_ractor_proc) {
|
||||||
|
rb_thread_terminate_all(th);
|
||||||
|
rb_ractor_teardown(th->ec);
|
||||||
|
}
|
||||||
|
|
||||||
|
th->status = THREAD_KILLED;
|
||||||
|
thread_debug("thread end: %p\n", (void *)th);
|
||||||
|
|
||||||
|
if (th->vm->ractor.main_thread == th) {
|
||||||
|
ruby_stop(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (RB_TYPE_P(errinfo, T_OBJECT)) {
|
||||||
|
/* treat with normal error object */
|
||||||
|
rb_threadptr_raise(ractor_main_th, 1, &errinfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
rb_threadptr_join_list_wakeup(th);
|
||||||
|
rb_threadptr_unlock_all_locking_mutexes(th);
|
||||||
|
|
||||||
|
EC_POP_TAG();
|
||||||
|
|
||||||
|
rb_ec_clear_current_thread_trace_func(th->ec);
|
||||||
|
|
||||||
|
/* locking_mutex must be Qfalse */
|
||||||
|
if (th->locking_mutex != Qfalse) {
|
||||||
|
rb_bug("thread_start_func_2: locking_mutex must not be set (%p:%"PRIxVALUE")",
|
||||||
|
(void *)th, th->locking_mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ractor_main_th->status == THREAD_KILLED &&
|
||||||
|
th->ractor->threads.cnt <= 2 /* main thread and this thread */) {
|
||||||
|
/* I'm last thread. wake up main thread from rb_thread_terminate_all */
|
||||||
|
rb_threadptr_interrupt(ractor_main_th);
|
||||||
|
}
|
||||||
|
|
||||||
|
rb_check_deadlock(th->ractor);
|
||||||
|
|
||||||
|
rb_fiber_close(th->ec->fiber_ptr);
|
||||||
|
|
||||||
thread_cleanup_func(th, FALSE);
|
thread_cleanup_func(th, FALSE);
|
||||||
VM_ASSERT(th->ec->vm_stack == NULL);
|
VM_ASSERT(th->ec->vm_stack == NULL);
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue