
Add support for ConditionVariable.

Samuel Williams 2020-09-11 20:47:25 +12:00
parent 1a0cfe2839
commit 0f613cc5f1
Notes: git 2020-09-14 13:44:36 +09:00
3 changed files with 84 additions and 32 deletions
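In effect, ConditionVariable#wait and #signal become scheduler-aware: invoked from a fiber running under a thread's scheduler, waiting suspends only that fiber rather than blocking the native thread. A minimal usage sketch, distilled from the test added below; it assumes the Scheduler helper from test/fiber/scheduler.rb:

    mutex = Mutex.new
    condition = ConditionVariable.new

    thread = Thread.new do
      scheduler = Scheduler.new
      Thread.current.scheduler = scheduler

      Fiber.schedule do
        mutex.synchronize do
          condition.wait(mutex)  # suspends this fiber, not the thread
        end
      end

      Fiber.schedule do
        mutex.synchronize do
          condition.signal       # wakes the waiting fiber via the scheduler
        end
      end

      scheduler.run
    end

    thread.join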

test/fiber/scheduler.rb

@@ -97,7 +97,9 @@ class Scheduler
   end
 
   def kernel_sleep(duration = nil)
-    @waiting[Fiber.current] = current_time + duration
+    if duration
+      @waiting[Fiber.current] = current_time + duration
+    end
 
     Fiber.yield
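The guard is needed because, with the rb_mutex_sleep change below, ConditionVariable#wait with no timeout reaches the scheduler as kernel_sleep(nil), and current_time + nil would raise a TypeError. With a nil duration no deadline is recorded; the fiber simply yields until something resumes it. A sketch (scheduler setup as in the test below omitted):

    mutex = Mutex.new
    condition = ConditionVariable.new

    Fiber.schedule do
      mutex.synchronize do
        # No timeout: reaches kernel_sleep(nil), so no entry is added to
        # @waiting and the fiber yields until condition.signal resumes it.
        condition.wait(mutex)
      end
    end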

test/fiber/test_mutex.rb

@@ -47,6 +47,43 @@ class TestFiberMutex < Test::Unit::TestCase
     thread.join
   end
 
+  def test_condition_variable
+    mutex = Mutex.new
+    condition = ConditionVariable.new
+
+    signalled = 0
+
+    thread = Thread.new do
+      scheduler = Scheduler.new
+      Thread.current.scheduler = scheduler
+
+      Fiber.schedule do
+        mutex.synchronize do
+          3.times do
+            condition.wait(mutex)
+            signalled += 1
+          end
+        end
+      end
+
+      Fiber.schedule do
+        3.times do
+          mutex.synchronize do
+            condition.signal
+          end
+
+          sleep 0.1
+        end
+      end
+
+      scheduler.run
+    end
+
+    thread.join
+
+    assert signalled > 1
+  end
+
   def test_mutex_deadlock
     err = /No live threads left. Deadlock\?/
     assert_in_out_err %W[-I#{__dir__} -], <<-RUBY, ['in synchronize'], err, success: false
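Note the timing assumptions in the test: a signal delivered while no fiber is waiting is dropped, and the sleep 0.1 between signals gives the waiting fiber time to wake and reacquire the mutex. That is why the assertion is deliberately assert signalled > 1 rather than == 3.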

thread_sync.c

@@ -4,8 +4,16 @@
 static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
 static VALUE rb_eClosedQueueError;
 
+/* Mutex */
+typedef struct rb_mutex_struct {
+    rb_fiber_t *fiber;
+    struct rb_mutex_struct *next_mutex;
+    struct list_head waitq; /* protected by GVL */
+} rb_mutex_t;
+
 /* sync_waiter is always on-stack */
 struct sync_waiter {
+    VALUE self;
     rb_thread_t *th;
     rb_fiber_t *fiber;
     struct list_node node;
@@ -19,12 +27,17 @@ sync_wakeup(struct list_head *head, long max)
     struct sync_waiter *cur = 0, *next;
 
     list_for_each_safe(head, cur, next, node) {
         list_del_init(&cur->node);
 
+        if (cur->th->scheduler != Qnil) {
+            rb_scheduler_mutex_unlock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
+        }
+
         if (cur->th->status != THREAD_KILLED) {
             rb_threadptr_interrupt(cur->th);
             cur->th->status = THREAD_RUNNABLE;
 
             if (--max == 0) return;
         }
     }
 }
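rb_scheduler_mutex_unlock hands the wakeup to the Ruby-level scheduler rather than only interrupting the thread. As a rough sketch of the scheduler side, where the hook name and the ready queue are assumptions inferred from the C call, not something this commit shows:

    class Scheduler
      # Hypothetical hook, presumably invoked by rb_scheduler_mutex_unlock:
      # mark the fiber runnable again so a later pass of #run resumes it.
      def mutex_unlock(mutex, fiber)
        @ready << fiber  # assumed run queue drained by #run
      end
    end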
@@ -40,16 +53,6 @@ wakeup_all(struct list_head *head)
     sync_wakeup(head, LONG_MAX);
 }
 
-/* Mutex */
-
-typedef struct rb_mutex_struct {
-    VALUE self;
-    rb_fiber_t *fiber;
-    struct rb_mutex_struct *next_mutex;
-    struct list_head waitq; /* protected by GVL */
-} rb_mutex_t;
-
 #if defined(HAVE_WORKING_FORK)
 static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
 static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
@@ -151,7 +154,6 @@ mutex_alloc(VALUE klass)
     obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
-    mutex->self = obj;
     list_head_init(&mutex->waitq);
 
     return obj;
 }
@@ -247,8 +249,8 @@ mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
 static VALUE
 do_mutex_lock(VALUE self, int interruptible_p)
 {
-    rb_thread_t *th = GET_THREAD();
     rb_execution_context_t *ec = GET_EC();
+    rb_thread_t *th = ec->thread_ptr;
     rb_fiber_t *fiber = ec->fiber_ptr;
     rb_mutex_t *mutex = mutex_ptr(self);
@@ -260,6 +262,7 @@ do_mutex_lock(VALUE self, int interruptible_p)
     if (rb_mutex_trylock(self) == Qfalse) {
         struct sync_waiter w = {
+            .self = self,
             .th = th,
             .fiber = fiber
         };
@@ -398,7 +401,7 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
         list_del_init(&cur->node);
 
         if (cur->th->scheduler != Qnil) {
-            rb_scheduler_mutex_unlock(cur->th->scheduler, mutex->self, rb_fiberptr_self(cur->fiber));
+            rb_scheduler_mutex_unlock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
         }
 
         switch (cur->th->status) {
@@ -498,7 +501,6 @@ rb_mutex_wait_for(VALUE time)
 VALUE
 rb_mutex_sleep(VALUE self, VALUE timeout)
 {
-    time_t beg, end;
     struct timeval t;
 
     if (!NIL_P(timeout)) {
@@ -506,18 +508,23 @@ rb_mutex_sleep(VALUE self, VALUE timeout)
     }
 
     rb_mutex_unlock(self);
-    beg = time(0);
-    if (NIL_P(timeout)) {
-        rb_ensure(rb_mutex_sleep_forever, Qnil, mutex_lock_uninterruptible, self);
-    }
-    else {
-        rb_hrtime_t rel = rb_timeval2hrtime(&t);
-        rb_ensure(rb_mutex_wait_for, (VALUE)&rel,
-                  mutex_lock_uninterruptible, self);
+    time_t beg = time(0);
+
+    VALUE scheduler = rb_thread_current_scheduler();
+    if (scheduler != Qnil) {
+        rb_scheduler_kernel_sleep(scheduler, timeout);
+        mutex_lock_uninterruptible(self);
+    } else {
+        if (NIL_P(timeout)) {
+            rb_ensure(rb_mutex_sleep_forever, Qnil, mutex_lock_uninterruptible, self);
+        } else {
+            rb_hrtime_t rel = rb_timeval2hrtime(&t);
+            rb_ensure(rb_mutex_wait_for, (VALUE)&rel, mutex_lock_uninterruptible, self);
+        }
     }
 
     RUBY_VM_CHECK_INTS_BLOCKING(GET_EC());
-    end = time(0) - beg;
+    time_t end = time(0) - beg;
     return INT2FIX(end);
 }
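So under a scheduler, Mutex#sleep unlocks, delegates the wait to the scheduler's kernel_sleep, then relocks without interruption; only the calling fiber is suspended. Note that the return value is still computed with time(0), so sub-second sleeps report 0 elapsed seconds. A small usage sketch, assuming a scheduler is installed on the current thread:

    mutex = Mutex.new

    Fiber.schedule do
      mutex.synchronize do
        elapsed = mutex.sleep(0.1)  # unlock -> kernel_sleep(0.1) -> relock
        # elapsed is a whole-second count from time(0), so 0 here, not 0.1
      end
    end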
@@ -1429,13 +1436,19 @@ delete_from_waitq(VALUE v)
 static VALUE
 rb_condvar_wait(int argc, VALUE *argv, VALUE self)
 {
+    rb_execution_context_t *ec = GET_EC();
+
     struct rb_condvar *cv = condvar_ptr(self);
     struct sleep_call args;
-    struct sync_waiter w;
 
     rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout);
 
-    w.th = GET_THREAD();
+    struct sync_waiter w = {
+        .self = args.mutex,
+        .th = ec->thread_ptr,
+        .fiber = ec->fiber_ptr,
+    };
 
     list_add_tail(&cv->waitq, &w.node);
     rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&w);
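This initializer is the payoff of the sync_waiter changes above: the condition variable's waiter now carries .self = args.mutex, so when sync_wakeup later walks the wait queue it can tell the scheduler exactly which mutex the resumed fiber should reacquire.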