1
0
Fork 0
mirror of https://github.com/ruby/ruby.git synced 2022-11-09 12:17:21 -05:00

Wake up join list within thread EC context. (#4471)

* Wake up join list within thread EC context.

* Consume items from join list so that they are not re-executed.

If `rb_fiber_scheduler_unblock` raises an exception, it can result in a
segfault if `rb_threadptr_join_list_wakeup` is not called within a valid EC. This
change moves `rb_threadptr_join_list_wakeup` into the thread's top-level EC,
which initially caused an infinite loop because, on exception, the wakeup is
retried. We now explicitly remove items from the thread's join list to avoid this situation.

* Verify the required scheduler interface.

* Test several scheduler hooks methods with broken `unblock` implementation.
This commit is contained in:
Samuel Williams 2021-06-14 17:56:53 +12:00 committed by nagachika
parent 98ac62de5c
commit 5e9ec35104
6 changed files with 175 additions and 89 deletions

View file

@ -49,12 +49,36 @@ rb_scheduler_get(void)
return thread->scheduler;
}
static void
verify_interface(VALUE scheduler)
{
    // A scheduler must provide all four core hooks before it can be
    // installed. Raising here surfaces an incomplete implementation at
    // Fiber.set_scheduler time rather than at first use. Checks run in
    // the same order as before: block, unblock, kernel_sleep, io_wait.
    const struct {
        ID id;
        const char *message;
    } required[4] = {
        {id_block, "Scheduler must implement #block!"},
        {id_unblock, "Scheduler must implement #unblock!"},
        {id_kernel_sleep, "Scheduler must implement #kernel_sleep!"},
        {id_io_wait, "Scheduler must implement #io_wait!"},
    };

    for (size_t i = 0; i < sizeof(required) / sizeof(required[0]); i++) {
        if (!rb_respond_to(scheduler, required[i].id)) {
            rb_raise(rb_eArgError, "%s", required[i].message);
        }
    }
}
VALUE
rb_scheduler_set(VALUE scheduler)
{
rb_thread_t *thread = GET_THREAD();
VM_ASSERT(thread);
if (scheduler != Qnil) {
verify_interface(scheduler);
}
// We invoke Scheduler#close when setting it to something else, to ensure the previous scheduler runs to completion before changing the scheduler. That way, we do not need to consider interactions, e.g., of a Fiber from the previous scheduler with the new scheduler.
if (thread->scheduler != Qnil) {
rb_scheduler_close(thread->scheduler);

View file

@ -188,3 +188,11 @@ class Scheduler
return fiber
end
end
# Scheduler variant whose #unblock performs the real wakeup first and then
# raises, used to exercise error propagation out of the unblock hook.
class BrokenUnblockScheduler < Scheduler
  def unblock(blocker, fiber)
    super(blocker, fiber)

    raise "Broken unblock!"
  end
end

View file

@ -66,9 +66,23 @@ class TestFiberScheduler < Test::Unit::TestCase
RUBY
end
def test_optional_close
def test_minimal_interface
scheduler = Object.new
def scheduler.block
end
def scheduler.unblock
end
def scheduler.io_wait
end
def scheduler.kernel_sleep
end
thread = Thread.new do
Fiber.set_scheduler Object.new
Fiber.set_scheduler scheduler
end
thread.join

View file

@ -43,4 +43,26 @@ class TestFiberSleep < Test::Unit::TestCase
assert_operator seconds, :>=, 2, "actual: %p" % seconds
end
def test_broken_sleep
  # A scheduler whose kernel_sleep hook raises should propagate the error
  # out of the scheduled fiber and fail the owning thread's join.
  thread = Thread.new do
    Thread.current.report_on_exception = false

    scheduler = Scheduler.new
    scheduler.define_singleton_method(:kernel_sleep) do |duration = nil|
      raise "Broken sleep!"
    end

    Fiber.set_scheduler scheduler

    Fiber.schedule { sleep 0 }
  end

  assert_raise(RuntimeError) { thread.join }
end
end

View file

@ -42,4 +42,24 @@ class TestFiberThread < Test::Unit::TestCase
assert_equal :done, thread.value
end
def test_broken_unblock
  # BrokenUnblockScheduler raises from #unblock; joining a child thread
  # exercises that hook, and the failure must surface via the outer join.
  thread = Thread.new do
    Thread.current.report_on_exception = false

    broken = BrokenUnblockScheduler.new
    Fiber.set_scheduler broken

    Fiber.schedule do
      Thread.new {}.join
    end

    broken.run
  end

  assert_raise(RuntimeError) { thread.join }
end
end

View file

@ -539,9 +539,12 @@ terminate_all(rb_ractor_t *r, const rb_thread_t *main_thread)
static void
rb_threadptr_join_list_wakeup(rb_thread_t *thread)
{
while (thread->join_list) {
struct rb_waiting_list *join_list = thread->join_list;
while (join_list) {
// Consume the entry from the join list:
thread->join_list = join_list->next;
rb_thread_t *target_thread = join_list->thread;
if (target_thread->scheduler != Qnil && rb_fiberptr_blocking(join_list->fiber) == 0) {
@ -557,25 +560,20 @@ rb_threadptr_join_list_wakeup(rb_thread_t *thread)
break;
}
}
join_list = join_list->next;
}
}
void
rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
{
const char *err;
rb_mutex_t *mutex;
rb_mutex_t *mutexes = th->keeping_mutexes;
while (th->keeping_mutexes) {
rb_mutex_t *mutex = th->keeping_mutexes;
th->keeping_mutexes = mutex->next_mutex;
while (mutexes) {
mutex = mutexes;
/* rb_warn("mutex #<%p> remains to be locked by terminated thread",
(void *)mutexes); */
mutexes = mutex->next_mutex;
err = rb_mutex_unlock_th(mutex, th, mutex->fiber);
if (err) rb_bug("invalid keeping_mutexes: %s", err);
/* rb_warn("mutex #<%p> remains to be locked by terminated thread", (void *)mutexes); */
const char *error_message = rb_mutex_unlock_th(mutex, th, mutex->fiber);
if (error_message) rb_bug("invalid keeping_mutexes: %s", error_message);
}
}
@ -816,14 +814,13 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
th->ec->machine.stack_start = STACK_DIR_UPPER(vm_stack + size, vm_stack);
th->ec->machine.stack_maxsize -= size * sizeof(VALUE);
{
thread_debug("thread start (get lock): %p\n", (void *)th);
EC_PUSH_TAG(th->ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
SAVE_ROOT_JMPBUF(th, thread_do_start(th));
}
else {
} else {
errinfo = th->ec->errinfo;
if (state == TAG_FATAL) {
@ -874,6 +871,10 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
/* treat with normal error object */
rb_threadptr_raise(ractor_main_th, 1, &errinfo);
}
rb_threadptr_join_list_wakeup(th);
rb_threadptr_unlock_all_locking_mutexes(th);
EC_POP_TAG();
rb_ec_clear_current_thread_trace_func(th->ec);
@ -890,12 +891,9 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
rb_threadptr_interrupt(ractor_main_th);
}
rb_threadptr_join_list_wakeup(th);
rb_threadptr_unlock_all_locking_mutexes(th);
rb_check_deadlock(th->ractor);
rb_fiber_close(th->ec->fiber_ptr);
}
thread_cleanup_func(th, FALSE);
VM_ASSERT(th->ec->vm_stack == NULL);