1
0
Fork 0
mirror of https://github.com/ruby/ruby.git synced 2022-11-09 12:17:21 -05:00

Make Thread#join non-blocking.

This commit is contained in:
Samuel Williams 2020-09-21 09:54:08 +12:00
parent 596173155a
commit 70f08f1eed
Notes: git 2020-09-21 08:49:08 +09:00
7 changed files with 168 additions and 127 deletions

View file

@ -19,7 +19,7 @@ VALUE rb_scheduler_close(VALUE scheduler);
VALUE rb_scheduler_kernel_sleep(VALUE scheduler, VALUE duration); VALUE rb_scheduler_kernel_sleep(VALUE scheduler, VALUE duration);
VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv); VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv);
VALUE rb_scheduler_block(VALUE scheduler, VALUE blocker); VALUE rb_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout);
VALUE rb_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber); VALUE rb_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber);
VALUE rb_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout); VALUE rb_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout);

View file

@ -61,9 +61,9 @@ VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv)
return rb_funcallv(scheduler, id_kernel_sleep, argc, argv); return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
} }
VALUE rb_scheduler_block(VALUE scheduler, VALUE blocker) VALUE rb_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
{ {
return rb_funcall(scheduler, id_block, 1, blocker); return rb_funcall(scheduler, id_block, 2, blocker, timeout);
} }
VALUE rb_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber) VALUE rb_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)

View file

@ -19,8 +19,13 @@ describe "Thread#join" do
t.join(0).should equal(t) t.join(0).should equal(t)
t.join(0.0).should equal(t) t.join(0.0).should equal(t)
t.join(nil).should equal(t) t.join(nil).should equal(t)
end
it "raises TypeError if the argument is not a valid timeout" do
t = Thread.new {sleep}
-> { t.join(:foo) }.should raise_error TypeError -> { t.join(:foo) }.should raise_error TypeError
-> { t.join("bar") }.should raise_error TypeError -> { t.join("bar") }.should raise_error TypeError
t.kill
end end
it "returns nil if it is not finished when given a timeout" do it "returns nil if it is not finished when given a timeout" do

View file

@ -22,7 +22,7 @@ class Scheduler
@closed = false @closed = false
@lock = Mutex.new @lock = Mutex.new
@locking = 0 @blocking = 0
@ready = [] @ready = []
end end
@ -47,7 +47,7 @@ class Scheduler
def run def run
@urgent = IO.pipe @urgent = IO.pipe
while @readable.any? or @writable.any? or @waiting.any? or @locking.positive? while @readable.any? or @writable.any? or @waiting.any? or @blocking.positive?
# Can only handle file descriptors up to 1024... # Can only handle file descriptors up to 1024...
readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout) readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)
@ -142,12 +142,22 @@ class Scheduler
end end
# Used when blocking on synchronization (Mutex#lock, Queue#pop, SizedQueue#push, ...) # Used when blocking on synchronization (Mutex#lock, Queue#pop, SizedQueue#push, ...)
def block(blocker) def block(blocker, timeout = nil)
# p [__method__, blocker] # p [__method__, blocker, timeout]
@locking += 1 @blocking += 1
if timeout
@waiting[Fiber.current] = current_time + timeout
end
Fiber.yield Fiber.yield
ensure ensure
@locking -= 1 @blocking -= 1
# Remove from @waiting in the case #unblock was called before the timeout expired:
if timeout
@waiting.delete(Fiber.current)
end
end end
# Used when synchronization wakes up a previously-blocked fiber (Mutex#unlock, Queue#push, ...). # Used when synchronization wakes up a previously-blocked fiber (Mutex#unlock, Queue#push, ...).

245
thread.c
View file

@ -544,6 +544,32 @@ terminate_all(rb_ractor_t *r, const rb_thread_t *main_thread)
} }
} }
static void
rb_threadptr_join_list_wakeup(rb_thread_t *thread)
{
struct rb_waiting_list *join_list = thread->join_list;
while (join_list) {
rb_thread_t *target_thread = join_list->thread;
if (target_thread->scheduler != Qnil) {
rb_scheduler_unblock(target_thread->scheduler, target_thread->self, rb_fiberptr_self(join_list->fiber));
} else {
rb_threadptr_interrupt(target_thread);
switch (target_thread->status) {
case THREAD_STOPPED:
case THREAD_STOPPED_FOREVER:
target_thread->status = THREAD_RUNNABLE;
default:
break;
}
}
join_list = join_list->next;
}
}
void void
rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th) rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
{ {
@ -758,7 +784,6 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
{ {
STACK_GROW_DIR_DETECTION; STACK_GROW_DIR_DETECTION;
enum ruby_tag_type state; enum ruby_tag_type state;
rb_thread_list_t *join_list;
VALUE errinfo = Qnil; VALUE errinfo = Qnil;
size_t size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE); size_t size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE);
rb_thread_t *ractor_main_th = th->ractor->threads.main; rb_thread_t *ractor_main_th = th->ractor->threads.main;
@ -860,20 +885,9 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
rb_threadptr_interrupt(ractor_main_th); rb_threadptr_interrupt(ractor_main_th);
} }
/* wake up joining threads */ rb_threadptr_join_list_wakeup(th);
join_list = th->join_list; rb_threadptr_unlock_all_locking_mutexes(th);
while (join_list) { rb_check_deadlock(th->ractor);
rb_threadptr_interrupt(join_list->th);
switch (join_list->th->status) {
case THREAD_STOPPED: case THREAD_STOPPED_FOREVER:
join_list->th->status = THREAD_RUNNABLE;
default: break;
}
join_list = join_list->next;
}
rb_threadptr_unlock_all_locking_mutexes(th);
rb_check_deadlock(th->ractor);
rb_fiber_close(th->ec->fiber_ptr); rb_fiber_close(th->ec->fiber_ptr);
} }
@ -1105,129 +1119,152 @@ rb_thread_create_ractor(rb_ractor_t *g, VALUE args, VALUE proc)
struct join_arg { struct join_arg {
rb_thread_t *target, *waiting; struct rb_waiting_list *waiting_list;
rb_hrtime_t *limit; rb_thread_t *target;
VALUE timeout;
}; };
static VALUE static VALUE
remove_from_join_list(VALUE arg) remove_from_join_list(VALUE arg)
{ {
struct join_arg *p = (struct join_arg *)arg; struct join_arg *p = (struct join_arg *)arg;
rb_thread_t *target_th = p->target, *th = p->waiting; rb_thread_t *target_thread = p->target;
if (target_th->status != THREAD_KILLED) { if (target_thread->status != THREAD_KILLED) {
rb_thread_list_t **p = &target_th->join_list; struct rb_waiting_list **join_list = &target_thread->join_list;
while (*p) { while (*join_list) {
if ((*p)->th == th) { if (*join_list == p->waiting_list) {
*p = (*p)->next; *join_list = (*join_list)->next;
break; break;
} }
p = &(*p)->next;
} join_list = &(*join_list)->next;
}
} }
return Qnil; return Qnil;
} }
static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double);
static VALUE static VALUE
thread_join_sleep(VALUE arg) thread_join_sleep(VALUE arg)
{ {
struct join_arg *p = (struct join_arg *)arg; struct join_arg *p = (struct join_arg *)arg;
rb_thread_t *target_th = p->target, *th = p->waiting; rb_thread_t *target_th = p->target, *th = p->waiting_list->thread;
rb_hrtime_t end = 0; rb_hrtime_t end = 0, rel = 0, *limit = 0;
if (p->limit) { /*
end = rb_hrtime_add(*p->limit, rb_hrtime_now()); * This supports INFINITY and negative values, so we can't use
* rb_time_interval right now...
*/
if (p->timeout == Qnil) {
/* unlimited */
}
else if (FIXNUM_P(p->timeout)) {
rel = rb_sec2hrtime(NUM2TIMET(p->timeout));
limit = &rel;
}
else {
limit = double2hrtime(&rel, rb_num2dbl(p->timeout));
}
if (limit) {
end = rb_hrtime_add(*limit, rb_hrtime_now());
} }
while (target_th->status != THREAD_KILLED) { while (target_th->status != THREAD_KILLED) {
if (!p->limit) { if (th->scheduler != Qnil) {
th->status = THREAD_STOPPED_FOREVER; rb_scheduler_block(th->scheduler, target_th->self, p->timeout);
} else if (!limit) {
th->status = THREAD_STOPPED_FOREVER;
rb_ractor_sleeper_threads_inc(th->ractor); rb_ractor_sleeper_threads_inc(th->ractor);
rb_check_deadlock(th->ractor); rb_check_deadlock(th->ractor);
native_sleep(th, 0); native_sleep(th, 0);
rb_ractor_sleeper_threads_dec(th->ractor); rb_ractor_sleeper_threads_dec(th->ractor);
} }
else { else {
if (hrtime_update_expire(p->limit, end)) { if (hrtime_update_expire(limit, end)) {
thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n", thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n",
thread_id_str(target_th)); thread_id_str(target_th));
return Qfalse; return Qfalse;
} }
th->status = THREAD_STOPPED; th->status = THREAD_STOPPED;
native_sleep(th, p->limit); native_sleep(th, limit);
} }
RUBY_VM_CHECK_INTS_BLOCKING(th->ec); RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
th->status = THREAD_RUNNABLE; th->status = THREAD_RUNNABLE;
thread_debug("thread_join: interrupted (thid: %"PRI_THREAD_ID", status: %s)\n", thread_debug("thread_join: interrupted (thid: %"PRI_THREAD_ID", status: %s)\n",
thread_id_str(target_th), thread_status_name(target_th, TRUE)); thread_id_str(target_th), thread_status_name(target_th, TRUE));
} }
return Qtrue; return Qtrue;
} }
static VALUE static VALUE
thread_join(rb_thread_t *target_th, rb_hrtime_t *rel) thread_join(rb_thread_t *target_th, VALUE timeout)
{ {
rb_thread_t *th = GET_THREAD(); rb_execution_context_t *ec = GET_EC();
struct join_arg arg; rb_thread_t *th = ec->thread_ptr;
rb_fiber_t *fiber = ec->fiber_ptr;
if (th == target_th) { if (th == target_th) {
rb_raise(rb_eThreadError, "Target thread must not be current thread"); rb_raise(rb_eThreadError, "Target thread must not be current thread");
}
if (th->ractor->threads.main == target_th) {
rb_raise(rb_eThreadError, "Target thread must not be main thread");
} }
arg.target = target_th; if (th->ractor->threads.main == target_th) {
arg.waiting = th; rb_raise(rb_eThreadError, "Target thread must not be main thread");
arg.limit = rel; }
thread_debug("thread_join (thid: %"PRI_THREAD_ID", status: %s)\n", thread_debug("thread_join (thid: %"PRI_THREAD_ID", status: %s)\n",
thread_id_str(target_th), thread_status_name(target_th, TRUE)); thread_id_str(target_th), thread_status_name(target_th, TRUE));
if (target_th->status != THREAD_KILLED) { if (target_th->status != THREAD_KILLED) {
rb_thread_list_t list; struct rb_waiting_list waiting_list;
list.next = target_th->join_list; waiting_list.next = target_th->join_list;
list.th = th; waiting_list.thread = th;
target_th->join_list = &list; waiting_list.fiber = fiber;
if (!rb_ensure(thread_join_sleep, (VALUE)&arg, target_th->join_list = &waiting_list;
remove_from_join_list, (VALUE)&arg)) {
return Qnil; struct join_arg arg;
} arg.waiting_list = &waiting_list;
arg.target = target_th;
arg.timeout = timeout;
if (!rb_ensure(thread_join_sleep, (VALUE)&arg, remove_from_join_list, (VALUE)&arg)) {
return Qnil;
}
} }
thread_debug("thread_join: success (thid: %"PRI_THREAD_ID", status: %s)\n", thread_debug("thread_join: success (thid: %"PRI_THREAD_ID", status: %s)\n",
thread_id_str(target_th), thread_status_name(target_th, TRUE)); thread_id_str(target_th), thread_status_name(target_th, TRUE));
if (target_th->ec->errinfo != Qnil) { if (target_th->ec->errinfo != Qnil) {
VALUE err = target_th->ec->errinfo; VALUE err = target_th->ec->errinfo;
if (FIXNUM_P(err)) { if (FIXNUM_P(err)) {
switch (err) { switch (err) {
case INT2FIX(TAG_FATAL): case INT2FIX(TAG_FATAL):
thread_debug("thread_join: terminated (thid: %"PRI_THREAD_ID", status: %s)\n", thread_debug("thread_join: terminated (thid: %"PRI_THREAD_ID", status: %s)\n",
thread_id_str(target_th), thread_status_name(target_th, TRUE)); thread_id_str(target_th), thread_status_name(target_th, TRUE));
/* OK. killed. */ /* OK. killed. */
break; break;
default: default:
rb_bug("thread_join: Fixnum (%d) should not reach here.", FIX2INT(err)); rb_bug("thread_join: Fixnum (%d) should not reach here.", FIX2INT(err));
} }
} }
else if (THROW_DATA_P(target_th->ec->errinfo)) { else if (THROW_DATA_P(target_th->ec->errinfo)) {
rb_bug("thread_join: THROW_DATA should not reach here."); rb_bug("thread_join: THROW_DATA should not reach here.");
} }
else { else {
/* normal exception */ /* normal exception */
rb_exc_raise(err); rb_exc_raise(err);
} }
} }
return target_th->self; return target_th->self;
} }
static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double);
/* /*
* call-seq: * call-seq:
* thr.join -> thr * thr.join -> thr
@ -1270,25 +1307,13 @@ static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double);
static VALUE static VALUE
thread_join_m(int argc, VALUE *argv, VALUE self) thread_join_m(int argc, VALUE *argv, VALUE self)
{ {
VALUE limit; VALUE timeout = Qnil;
rb_hrtime_t rel, *to = 0;
/* if (rb_check_arity(argc, 0, 1)) {
* This supports INFINITY and negative values, so we can't use timeout = argv[0];
* rb_time_interval right now...
*/
if (!rb_check_arity(argc, 0, 1) || NIL_P(argv[0])) {
/* unlimited */
}
else if (FIXNUM_P(limit = argv[0])) {
rel = rb_sec2hrtime(NUM2TIMET(limit));
to = &rel;
}
else {
to = double2hrtime(&rel, rb_num2dbl(limit));
} }
return thread_join(rb_thread_ptr(self), to); return thread_join(rb_thread_ptr(self), timeout);
} }
/* /*
@ -1309,7 +1334,7 @@ static VALUE
thread_value(VALUE self) thread_value(VALUE self)
{ {
rb_thread_t *th = rb_thread_ptr(self); rb_thread_t *th = rb_thread_ptr(self);
thread_join(th, 0); thread_join(th, Qnil);
return th->value; return th->value;
} }
@ -1486,7 +1511,7 @@ rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker)
{ {
VALUE scheduler = rb_thread_current_scheduler(); VALUE scheduler = rb_thread_current_scheduler();
if (scheduler != Qnil) { if (scheduler != Qnil) {
rb_scheduler_block(scheduler, blocker); rb_scheduler_block(scheduler, blocker, Qnil);
} else { } else {
thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n"); thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n");
sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE); sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE);
@ -5559,9 +5584,9 @@ debug_deadlock_check(rb_ractor_t *r, VALUE msg)
} }
{ {
rb_thread_list_t *list = th->join_list; struct rb_waiting_list *list = th->join_list;
while (list) { while (list) {
rb_str_catf(msg, "\n depended by: tb_thread_id:%p", (void *)list->th); rb_str_catf(msg, "\n depended by: tb_thread_id:%p", (void *)list->thread);
list = list->next; list = list->next;
} }
} }

View file

@ -246,7 +246,7 @@ mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
} }
static VALUE call_rb_scheduler_block(VALUE mutex) { static VALUE call_rb_scheduler_block(VALUE mutex) {
return rb_scheduler_block(rb_thread_current_scheduler(), mutex); return rb_scheduler_block(rb_thread_current_scheduler(), mutex, Qnil);
} }
static VALUE remove_from_mutex_lock_waiters(VALUE arg) { static VALUE remove_from_mutex_lock_waiters(VALUE arg) {

View file

@ -812,11 +812,6 @@ struct rb_unblock_callback {
struct rb_mutex_struct; struct rb_mutex_struct;
typedef struct rb_thread_list_struct{
struct rb_thread_list_struct *next;
struct rb_thread_struct *th;
} rb_thread_list_t;
typedef struct rb_ensure_entry { typedef struct rb_ensure_entry {
VALUE marker; VALUE marker;
VALUE (*e_proc)(VALUE); VALUE (*e_proc)(VALUE);
@ -832,6 +827,12 @@ typedef char rb_thread_id_string_t[sizeof(rb_nativethread_id_t) * 2 + 3];
typedef struct rb_fiber_struct rb_fiber_t; typedef struct rb_fiber_struct rb_fiber_t;
struct rb_waiting_list {
struct rb_waiting_list *next;
struct rb_thread_struct *thread;
struct rb_fiber_struct *fiber;
};
struct rb_execution_context_struct { struct rb_execution_context_struct {
/* execution information */ /* execution information */
VALUE *vm_stack; /* must free, must mark */ VALUE *vm_stack; /* must free, must mark */
@ -958,7 +959,7 @@ typedef struct rb_thread_struct {
VALUE locking_mutex; VALUE locking_mutex;
struct rb_mutex_struct *keeping_mutexes; struct rb_mutex_struct *keeping_mutexes;
rb_thread_list_t *join_list; struct rb_waiting_list *join_list;
union { union {
struct { struct {