mirror of
https://github.com/ruby/ruby.git
synced 2022-11-09 12:17:21 -05:00
Fix potential hang when joining threads.
If thread termination invokes user code after `th->status` becomes
`THREAD_KILLED`, and the user's unblock function causes `th->status` to
become something else (e.g. `THREAD_RUNNING`), threads waiting in
`thread_join_sleep` will hang forever. We move the unblock function call
to before the thread status is updated, and allow threads to join as soon
as `th->value` becomes defined.
This reverts commit 6505c77501.
This commit is contained in:
parent
785c70e764
commit
2d4f29e77e
Notes:
git
2021-08-03 19:24:09 +09:00
4 changed files with 111 additions and 36 deletions
|
@ -112,8 +112,10 @@ class Scheduler
|
||||||
|
|
||||||
self.run
|
self.run
|
||||||
ensure
|
ensure
|
||||||
|
if @urgent
|
||||||
@urgent.each(&:close)
|
@urgent.each(&:close)
|
||||||
@urgent = nil
|
@urgent = nil
|
||||||
|
end
|
||||||
|
|
||||||
@closed = true
|
@closed = true
|
||||||
|
|
||||||
|
@ -240,3 +242,13 @@ class BrokenUnblockScheduler < Scheduler
|
||||||
raise "Broken unblock!"
|
raise "Broken unblock!"
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
class SleepingUnblockScheduler < Scheduler
|
||||||
|
# This method is invoked when the thread is exiting.
|
||||||
|
def unblock(blocker, fiber)
|
||||||
|
super
|
||||||
|
|
||||||
|
# This changes the current thread state to `THREAD_RUNNING` which causes `thread_join_sleep` to hang.
|
||||||
|
sleep(0.1)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
|
@ -20,6 +20,31 @@ class TestFiberThread < Test::Unit::TestCase
|
||||||
assert_equal :done, thread.value
|
assert_equal :done, thread.value
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def test_thread_join_implicit
|
||||||
|
sleeping = false
|
||||||
|
finished = false
|
||||||
|
|
||||||
|
thread = Thread.new do
|
||||||
|
scheduler = Scheduler.new
|
||||||
|
Fiber.set_scheduler scheduler
|
||||||
|
|
||||||
|
Fiber.schedule do
|
||||||
|
sleeping = true
|
||||||
|
sleep(0.1)
|
||||||
|
finished = true
|
||||||
|
end
|
||||||
|
|
||||||
|
:done
|
||||||
|
end
|
||||||
|
|
||||||
|
Thread.pass until sleeping
|
||||||
|
|
||||||
|
thread.join
|
||||||
|
|
||||||
|
assert_equal :done, thread.value
|
||||||
|
assert finished, "Scheduler thread's task should be finished!"
|
||||||
|
end
|
||||||
|
|
||||||
def test_thread_join_blocking
|
def test_thread_join_blocking
|
||||||
thread = Thread.new do
|
thread = Thread.new do
|
||||||
scheduler = Scheduler.new
|
scheduler = Scheduler.new
|
||||||
|
@ -66,4 +91,18 @@ class TestFiberThread < Test::Unit::TestCase
|
||||||
thread.join
|
thread.join
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def test_thread_join_hang
|
||||||
|
thread = Thread.new do
|
||||||
|
scheduler = SleepingUnblockScheduler.new
|
||||||
|
|
||||||
|
Fiber.set_scheduler scheduler
|
||||||
|
|
||||||
|
Fiber.schedule do
|
||||||
|
Thread.new{sleep(0.01)}.value
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
thread.join
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
63
thread.c
63
thread.c
|
@ -632,6 +632,7 @@ thread_cleanup_func_before_exec(void *th_ptr)
|
||||||
{
|
{
|
||||||
rb_thread_t *th = th_ptr;
|
rb_thread_t *th = th_ptr;
|
||||||
th->status = THREAD_KILLED;
|
th->status = THREAD_KILLED;
|
||||||
|
|
||||||
// The thread stack doesn't exist in the forked process:
|
// The thread stack doesn't exist in the forked process:
|
||||||
th->ec->machine.stack_start = th->ec->machine.stack_end = NULL;
|
th->ec->machine.stack_start = th->ec->machine.stack_end = NULL;
|
||||||
|
|
||||||
|
@ -688,7 +689,7 @@ rb_vm_proc_local_ep(VALUE proc)
|
||||||
VALUE rb_vm_invoke_proc_with_self(rb_execution_context_t *ec, rb_proc_t *proc, VALUE self,
|
VALUE rb_vm_invoke_proc_with_self(rb_execution_context_t *ec, rb_proc_t *proc, VALUE self,
|
||||||
int argc, const VALUE *argv, int kw_splat, VALUE passed_block_handler);
|
int argc, const VALUE *argv, int kw_splat, VALUE passed_block_handler);
|
||||||
|
|
||||||
static void
|
static VALUE
|
||||||
thread_do_start_proc(rb_thread_t *th)
|
thread_do_start_proc(rb_thread_t *th)
|
||||||
{
|
{
|
||||||
VALUE args = th->invoke_arg.proc.args;
|
VALUE args = th->invoke_arg.proc.args;
|
||||||
|
@ -702,7 +703,6 @@ thread_do_start_proc(rb_thread_t *th)
|
||||||
th->ec->root_lep = rb_vm_proc_local_ep(procval);
|
th->ec->root_lep = rb_vm_proc_local_ep(procval);
|
||||||
th->ec->root_svar = Qfalse;
|
th->ec->root_svar = Qfalse;
|
||||||
|
|
||||||
EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef);
|
|
||||||
vm_check_ints_blocking(th->ec);
|
vm_check_ints_blocking(th->ec);
|
||||||
|
|
||||||
if (th->invoke_type == thread_invoke_type_ractor_proc) {
|
if (th->invoke_type == thread_invoke_type_ractor_proc) {
|
||||||
|
@ -713,11 +713,12 @@ thread_do_start_proc(rb_thread_t *th)
|
||||||
rb_ractor_receive_parameters(th->ec, th->ractor, args_len, (VALUE *)args_ptr);
|
rb_ractor_receive_parameters(th->ec, th->ractor, args_len, (VALUE *)args_ptr);
|
||||||
vm_check_ints_blocking(th->ec);
|
vm_check_ints_blocking(th->ec);
|
||||||
|
|
||||||
// kick thread
|
return rb_vm_invoke_proc_with_self(
|
||||||
th->value = rb_vm_invoke_proc_with_self(th->ec, proc, self,
|
th->ec, proc, self,
|
||||||
args_len, args_ptr,
|
args_len, args_ptr,
|
||||||
th->invoke_arg.proc.kw_splat,
|
th->invoke_arg.proc.kw_splat,
|
||||||
VM_BLOCK_HANDLER_NONE);
|
VM_BLOCK_HANDLER_NONE
|
||||||
|
);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
args_len = RARRAY_LENINT(args);
|
args_len = RARRAY_LENINT(args);
|
||||||
|
@ -733,17 +734,12 @@ thread_do_start_proc(rb_thread_t *th)
|
||||||
|
|
||||||
vm_check_ints_blocking(th->ec);
|
vm_check_ints_blocking(th->ec);
|
||||||
|
|
||||||
// kick thread
|
return rb_vm_invoke_proc(
|
||||||
th->value = rb_vm_invoke_proc(th->ec, proc,
|
th->ec, proc,
|
||||||
args_len, args_ptr,
|
args_len, args_ptr,
|
||||||
th->invoke_arg.proc.kw_splat,
|
th->invoke_arg.proc.kw_splat,
|
||||||
VM_BLOCK_HANDLER_NONE);
|
VM_BLOCK_HANDLER_NONE
|
||||||
}
|
);
|
||||||
|
|
||||||
EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef);
|
|
||||||
|
|
||||||
if (th->invoke_type == thread_invoke_type_ractor_proc) {
|
|
||||||
rb_ractor_atexit(th->ec, th->value);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -751,20 +747,33 @@ static void
|
||||||
thread_do_start(rb_thread_t *th)
|
thread_do_start(rb_thread_t *th)
|
||||||
{
|
{
|
||||||
native_set_thread_name(th);
|
native_set_thread_name(th);
|
||||||
|
VALUE result = Qundef;
|
||||||
|
|
||||||
|
EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef);
|
||||||
|
|
||||||
switch (th->invoke_type) {
|
switch (th->invoke_type) {
|
||||||
case thread_invoke_type_proc:
|
case thread_invoke_type_proc:
|
||||||
|
result = thread_do_start_proc(th);
|
||||||
|
break;
|
||||||
|
|
||||||
case thread_invoke_type_ractor_proc:
|
case thread_invoke_type_ractor_proc:
|
||||||
thread_do_start_proc(th);
|
result = thread_do_start_proc(th);
|
||||||
|
rb_ractor_atexit(th->ec, result);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case thread_invoke_type_func:
|
case thread_invoke_type_func:
|
||||||
th->value = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg);
|
result = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case thread_invoke_type_none:
|
case thread_invoke_type_none:
|
||||||
rb_bug("unreachable");
|
rb_bug("unreachable");
|
||||||
}
|
}
|
||||||
|
|
||||||
rb_fiber_scheduler_set(Qnil);
|
rb_fiber_scheduler_set(Qnil);
|
||||||
|
|
||||||
|
th->value = result;
|
||||||
|
|
||||||
|
EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef);
|
||||||
}
|
}
|
||||||
|
|
||||||
void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec);
|
void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec);
|
||||||
|
@ -817,6 +826,9 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
|
||||||
|
|
||||||
thread_debug("thread start (get lock): %p\n", (void *)th);
|
thread_debug("thread start (get lock): %p\n", (void *)th);
|
||||||
|
|
||||||
|
// Ensure that we are not joinable.
|
||||||
|
VM_ASSERT(th->value == Qundef);
|
||||||
|
|
||||||
EC_PUSH_TAG(th->ec);
|
EC_PUSH_TAG(th->ec);
|
||||||
|
|
||||||
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
|
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
|
||||||
|
@ -857,6 +869,12 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
|
||||||
th->value = Qnil;
|
th->value = Qnil;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The thread is effectively finished and can be joined.
|
||||||
|
VM_ASSERT(th->value != Qundef);
|
||||||
|
|
||||||
|
rb_threadptr_join_list_wakeup(th);
|
||||||
|
rb_threadptr_unlock_all_locking_mutexes(th);
|
||||||
|
|
||||||
if (th->invoke_type == thread_invoke_type_ractor_proc) {
|
if (th->invoke_type == thread_invoke_type_ractor_proc) {
|
||||||
rb_thread_terminate_all(th);
|
rb_thread_terminate_all(th);
|
||||||
rb_ractor_teardown(th->ec);
|
rb_ractor_teardown(th->ec);
|
||||||
|
@ -874,9 +892,6 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
|
||||||
rb_threadptr_raise(ractor_main_th, 1, &errinfo);
|
rb_threadptr_raise(ractor_main_th, 1, &errinfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
rb_threadptr_join_list_wakeup(th);
|
|
||||||
rb_threadptr_unlock_all_locking_mutexes(th);
|
|
||||||
|
|
||||||
EC_POP_TAG();
|
EC_POP_TAG();
|
||||||
|
|
||||||
rb_ec_clear_current_thread_trace_func(th->ec);
|
rb_ec_clear_current_thread_trace_func(th->ec);
|
||||||
|
@ -1153,6 +1168,12 @@ remove_from_join_list(VALUE arg)
|
||||||
|
|
||||||
static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double);
|
static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double);
|
||||||
|
|
||||||
|
static int
|
||||||
|
thread_finished(rb_thread_t *th)
|
||||||
|
{
|
||||||
|
return th->status == THREAD_KILLED || th->value != Qundef;
|
||||||
|
}
|
||||||
|
|
||||||
static VALUE
|
static VALUE
|
||||||
thread_join_sleep(VALUE arg)
|
thread_join_sleep(VALUE arg)
|
||||||
{
|
{
|
||||||
|
@ -1179,7 +1200,7 @@ thread_join_sleep(VALUE arg)
|
||||||
end = rb_hrtime_add(*limit, rb_hrtime_now());
|
end = rb_hrtime_add(*limit, rb_hrtime_now());
|
||||||
}
|
}
|
||||||
|
|
||||||
while (target_th->status != THREAD_KILLED) {
|
while (!thread_finished(target_th)) {
|
||||||
VALUE scheduler = rb_fiber_scheduler_current();
|
VALUE scheduler = rb_fiber_scheduler_current();
|
||||||
|
|
||||||
if (scheduler != Qnil) {
|
if (scheduler != Qnil) {
|
||||||
|
@ -3319,7 +3340,7 @@ rb_thread_status(VALUE thread)
|
||||||
static VALUE
|
static VALUE
|
||||||
rb_thread_alive_p(VALUE thread)
|
rb_thread_alive_p(VALUE thread)
|
||||||
{
|
{
|
||||||
if (rb_threadptr_dead(rb_thread_ptr(thread))) {
|
if (thread_finished(rb_thread_ptr(thread))) {
|
||||||
return Qfalse;
|
return Qfalse;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
|
17
vm.c
17
vm.c
|
@ -3075,6 +3075,8 @@ th_init(rb_thread_t *th, VALUE self)
|
||||||
th->thread_id_string[0] = '\0';
|
th->thread_id_string[0] = '\0';
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
th->value = Qundef;
|
||||||
|
|
||||||
#if OPT_CALL_THREADED_CODE
|
#if OPT_CALL_THREADED_CODE
|
||||||
th->retval = Qundef;
|
th->retval = Qundef;
|
||||||
#endif
|
#endif
|
||||||
|
@ -3087,16 +3089,17 @@ static VALUE
|
||||||
ruby_thread_init(VALUE self)
|
ruby_thread_init(VALUE self)
|
||||||
{
|
{
|
||||||
rb_thread_t *th = GET_THREAD();
|
rb_thread_t *th = GET_THREAD();
|
||||||
rb_thread_t *targe_th = rb_thread_ptr(self);
|
rb_thread_t *target_th = rb_thread_ptr(self);
|
||||||
rb_vm_t *vm = th->vm;
|
rb_vm_t *vm = th->vm;
|
||||||
|
|
||||||
targe_th->vm = vm;
|
target_th->vm = vm;
|
||||||
th_init(targe_th, self);
|
th_init(target_th, self);
|
||||||
|
|
||||||
|
target_th->top_wrapper = 0;
|
||||||
|
target_th->top_self = rb_vm_top_self();
|
||||||
|
target_th->ec->root_svar = Qfalse;
|
||||||
|
target_th->ractor = th->ractor;
|
||||||
|
|
||||||
targe_th->top_wrapper = 0;
|
|
||||||
targe_th->top_self = rb_vm_top_self();
|
|
||||||
targe_th->ec->root_svar = Qfalse;
|
|
||||||
targe_th->ractor = th->ractor;
|
|
||||||
return self;
|
return self;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue