mirror of
https://github.com/ruby/ruby.git
synced 2022-11-09 12:17:21 -05:00
Avoid missed wakeup with fiber scheduler and Fiber.blocking. (#6588)
* Ensure that blocked fibers don't prevent valid wakeups.
This commit is contained in:
parent
d9d9005a3a
commit
7f175e5648
Notes:
git
2022-10-20 00:39:13 +00:00
Merged-By: ioquatix <samuel@codeotaku.com>
4 changed files with 72 additions and 10 deletions
|
@ -350,3 +350,14 @@ class SleepingUnblockScheduler < Scheduler
|
|||
sleep(0.1)
|
||||
end
|
||||
end
|
||||
|
||||
class SleepingBlockingScheduler < Scheduler
|
||||
def kernel_sleep(duration = nil)
|
||||
# Deliberaly sleep in a blocking state which can trigger a deadlock if the implementation is not correct.
|
||||
Fiber.blocking{sleep 0.0001}
|
||||
|
||||
self.block(:sleep, duration)
|
||||
|
||||
return true
|
||||
end
|
||||
end
|
||||
|
|
|
@ -138,4 +138,46 @@ class TestFiberScheduler < Test::Unit::TestCase
|
|||
Object.send(:remove_const, :TestFiberSchedulerAutoload)
|
||||
end
|
||||
end
|
||||
|
||||
def test_deadlock
|
||||
mutex = Thread::Mutex.new
|
||||
condition = Thread::ConditionVariable.new
|
||||
q = 0.0001
|
||||
|
||||
signaller = Thread.new do
|
||||
loop do
|
||||
mutex.synchronize do
|
||||
condition.signal
|
||||
end
|
||||
sleep q
|
||||
end
|
||||
end
|
||||
|
||||
i = 0
|
||||
|
||||
thread = Thread.new do
|
||||
scheduler = SleepingBlockingScheduler.new
|
||||
Fiber.set_scheduler scheduler
|
||||
|
||||
Fiber.schedule do
|
||||
10.times do
|
||||
mutex.synchronize do
|
||||
condition.wait(mutex)
|
||||
sleep q
|
||||
i += 1
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Wait for 10 seconds at most... if it doesn't finish, it's deadlocked.
|
||||
thread.join(10)
|
||||
|
||||
# If it's deadlocked, it will never finish, so this will be 0.
|
||||
assert_equal 10, i
|
||||
ensure
|
||||
# Make sure the threads are dead...
|
||||
thread.kill
|
||||
signaller.kill
|
||||
end
|
||||
end
|
||||
|
|
4
thread.c
4
thread.c
|
@ -404,7 +404,7 @@ rb_threadptr_join_list_wakeup(rb_thread_t *thread)
|
|||
|
||||
rb_thread_t *target_thread = join_list->thread;
|
||||
|
||||
if (target_thread->scheduler != Qnil && rb_fiberptr_blocking(join_list->fiber) == 0) {
|
||||
if (target_thread->scheduler != Qnil && join_list->fiber) {
|
||||
rb_fiber_scheduler_unblock(target_thread->scheduler, target_thread->self, rb_fiberptr_self(join_list->fiber));
|
||||
}
|
||||
else {
|
||||
|
@ -1091,7 +1091,7 @@ thread_join(rb_thread_t *target_th, VALUE timeout, rb_hrtime_t *limit)
|
|||
struct rb_waiting_list waiter;
|
||||
waiter.next = target_th->join_list;
|
||||
waiter.thread = th;
|
||||
waiter.fiber = fiber;
|
||||
waiter.fiber = rb_fiberptr_blocking(fiber) ? NULL : fiber;
|
||||
target_th->join_list = &waiter;
|
||||
|
||||
struct join_arg arg;
|
||||
|
|
|
@ -20,6 +20,16 @@ struct sync_waiter {
|
|||
struct ccan_list_node node;
|
||||
};
|
||||
|
||||
static inline rb_fiber_t*
|
||||
nonblocking_fiber(rb_fiber_t *fiber)
|
||||
{
|
||||
if (rb_fiberptr_blocking(fiber)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return fiber;
|
||||
}
|
||||
|
||||
struct queue_sleep_arg {
|
||||
VALUE self;
|
||||
VALUE timeout;
|
||||
|
@ -37,8 +47,7 @@ sync_wakeup(struct ccan_list_head *head, long max)
|
|||
ccan_list_del_init(&cur->node);
|
||||
|
||||
if (cur->th->status != THREAD_KILLED) {
|
||||
|
||||
if (cur->th->scheduler != Qnil && rb_fiberptr_blocking(cur->fiber) == 0) {
|
||||
if (cur->th->scheduler != Qnil && cur->fiber) {
|
||||
rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
|
||||
}
|
||||
else {
|
||||
|
@ -306,7 +315,7 @@ do_mutex_lock(VALUE self, int interruptible_p)
|
|||
struct sync_waiter sync_waiter = {
|
||||
.self = self,
|
||||
.th = th,
|
||||
.fiber = fiber
|
||||
.fiber = nonblocking_fiber(fiber)
|
||||
};
|
||||
|
||||
ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
|
||||
|
@ -339,7 +348,7 @@ do_mutex_lock(VALUE self, int interruptible_p)
|
|||
struct sync_waiter sync_waiter = {
|
||||
.self = self,
|
||||
.th = th,
|
||||
.fiber = fiber
|
||||
.fiber = nonblocking_fiber(fiber)
|
||||
};
|
||||
|
||||
ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
|
||||
|
@ -437,7 +446,7 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
|
|||
ccan_list_for_each_safe(&mutex->waitq, cur, next, node) {
|
||||
ccan_list_del_init(&cur->node);
|
||||
|
||||
if (cur->th->scheduler != Qnil && rb_fiberptr_blocking(cur->fiber) == 0) {
|
||||
if (cur->th->scheduler != Qnil && cur->fiber) {
|
||||
rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
|
||||
goto found;
|
||||
}
|
||||
|
@ -1051,7 +1060,7 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block, VALUE timeout)
|
|||
assert(queue_closed_p(self) == 0);
|
||||
|
||||
struct queue_waiter queue_waiter = {
|
||||
.w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr},
|
||||
.w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
|
||||
.as = {.q = q}
|
||||
};
|
||||
|
||||
|
@ -1258,7 +1267,7 @@ rb_szqueue_push(rb_execution_context_t *ec, VALUE self, VALUE object, VALUE non_
|
|||
else {
|
||||
rb_execution_context_t *ec = GET_EC();
|
||||
struct queue_waiter queue_waiter = {
|
||||
.w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr},
|
||||
.w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
|
||||
.as = {.sq = sq}
|
||||
};
|
||||
|
||||
|
@ -1491,7 +1500,7 @@ rb_condvar_wait(int argc, VALUE *argv, VALUE self)
|
|||
struct sync_waiter sync_waiter = {
|
||||
.self = args.mutex,
|
||||
.th = ec->thread_ptr,
|
||||
.fiber = ec->fiber_ptr
|
||||
.fiber = nonblocking_fiber(ec->fiber_ptr)
|
||||
};
|
||||
|
||||
ccan_list_add_tail(&cv->waitq, &sync_waiter.node);
|
||||
|
|
Loading…
Reference in a new issue