1
0
Fork 0
mirror of https://github.com/ruby/ruby.git synced 2022-11-09 12:17:21 -05:00

Make Mutex per-Fiber instead of per-Thread

* Enables Mutex to be used as synchronization between multiple Fibers
  of the same Thread.
* With a Fiber scheduler we can yield to another Fiber on contended
  Mutex#lock instead of blocking the entire thread.
* This also makes the behavior of Mutex consistent across CRuby, JRuby and TruffleRuby.
* [Feature #16792]
This commit is contained in:
Benoit Daloze 2020-09-05 16:26:24 +12:00 committed by Samuel Williams
parent 9e0a48c7a3
commit 178c1b0922
Notes: git 2020-09-14 13:44:37 +09:00
9 changed files with 199 additions and 54 deletions

11
cont.c
View file

@ -851,6 +851,12 @@ NOINLINE(static VALUE cont_capture(volatile int *volatile stat));
if (!(th)->ec->tag) rb_raise(rb_eThreadError, "not running thread"); \
} while (0)
rb_thread_t*
rb_fiber_threadptr(const rb_fiber_t *fiber)
{
return fiber->cont.saved_ec.thread_ptr;
}
static VALUE
cont_thread_value(const rb_context_t *cont)
{
@ -1146,6 +1152,11 @@ cont_new(VALUE klass)
return cont;
}
VALUE rb_fiberptr_self(struct rb_fiber_struct *fiber)
{
return fiber->cont.self;
}
void
rb_fiber_init_mjit_cont(struct rb_fiber_struct *fiber)
{

View file

@ -20,4 +20,6 @@ void rb_fiber_reset_root_local_storage(struct rb_thread_struct *);
void ruby_register_rollback_func_for_ensure(VALUE (*ensure_func)(VALUE), VALUE (*rollback_func)(VALUE));
void rb_fiber_init_mjit_cont(struct rb_fiber_struct *fiber);
VALUE rb_fiberptr_self(struct rb_fiber_struct *fiber);
#endif /* INTERNAL_CONT_H */

View file

@ -17,6 +17,9 @@ VALUE rb_scheduler_timeout(struct timeval *timeout);
VALUE rb_scheduler_kernel_sleep(VALUE scheduler, VALUE duration);
VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv);
VALUE rb_scheduler_mutex_lock(VALUE scheduler, VALUE mutex);
VALUE rb_scheduler_mutex_unlock(VALUE scheduler, VALUE mutex, VALUE fiber);
VALUE rb_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout);
VALUE rb_scheduler_io_wait_readable(VALUE scheduler, VALUE io);
VALUE rb_scheduler_io_wait_writable(VALUE scheduler, VALUE io);

View file

@ -12,6 +12,8 @@
#include "ruby/io.h"
static ID id_kernel_sleep;
static ID id_mutex_lock;
static ID id_mutex_unlock;
static ID id_io_read;
static ID id_io_write;
static ID id_io_wait;
@ -20,6 +22,8 @@ void
Init_Scheduler(void)
{
id_kernel_sleep = rb_intern_const("kernel_sleep");
id_mutex_lock = rb_intern_const("mutex_lock");
id_mutex_unlock = rb_intern_const("mutex_unlock");
id_io_read = rb_intern_const("io_read");
id_io_write = rb_intern_const("io_write");
id_io_wait = rb_intern_const("io_wait");
@ -44,6 +48,16 @@ VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv)
return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
}
VALUE rb_scheduler_mutex_lock(VALUE scheduler, VALUE mutex)
{
return rb_funcall(scheduler, id_mutex_lock, 1, mutex);
}
VALUE rb_scheduler_mutex_unlock(VALUE scheduler, VALUE mutex, VALUE fiber)
{
return rb_funcall(scheduler, id_mutex_unlock, 2, mutex, fiber);
}
VALUE rb_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
{
return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout);

View file

@ -40,4 +40,16 @@ describe "Mutex#owned?" do
m.owned?.should be_false
end
end
ruby_version_is "2.8" do
it "is held per Fiber" do
m = Mutex.new
m.lock
Fiber.new do
m.locked?.should == true
m.owned?.should == false
end.resume
end
end
end

View file

@ -14,6 +14,12 @@ class Scheduler
@readable = {}
@writable = {}
@waiting = {}
@urgent = nil
@lock = Mutex.new
@locking = 0
@ready = []
end
attr :readable
@ -35,9 +41,11 @@ class Scheduler
end
def run
while @readable.any? or @writable.any? or @waiting.any?
@urgent = IO.pipe
while @readable.any? or @writable.any? or @waiting.any? or @locking.positive?
# Can only handle file descriptors up to 1024...
readable, writable = IO.select(@readable.keys, @writable.keys, [], next_timeout)
readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)
# puts "readable: #{readable}" if readable&.any?
# puts "writable: #{writable}" if writable&.any?
@ -63,7 +71,24 @@ class Scheduler
end
end
end
if @ready.any?
# Clear out the urgent notification pipe.
@urgent.first.read_nonblock(1024)
ready = nil
@lock.synchronize do
ready, @ready = @ready, Array.new
end
ready.each do |fiber|
fiber.resume
end
end
end
ensure
@urgent.each(&:close)
end
def current_time
@ -95,6 +120,23 @@ class Scheduler
return true
end
def mutex_lock(mutex)
@locking += 1
Fiber.yield
ensure
@locking -= 1
end
def mutex_unlock(mutex, fiber)
@lock.synchronize do
@ready << fiber
if @urgent
@urgent.last.write('.')
end
end
end
def fiber(&block)
fiber = Fiber.new(blocking: false, &block)

View file

@ -14,7 +14,7 @@ class TestFiberMutex < Test::Unit::TestCase
assert_equal Thread.scheduler, scheduler
mutex.synchronize do
assert_nil Thread.scheduler
assert Thread.scheduler
end
end
end
@ -22,7 +22,7 @@ class TestFiberMutex < Test::Unit::TestCase
thread.join
end
def test_mutex_deadlock
def test_mutex_interleaved_locking
mutex = Mutex.new
thread = Thread.new do
@ -30,18 +30,46 @@ class TestFiberMutex < Test::Unit::TestCase
Thread.current.scheduler = scheduler
Fiber.schedule do
assert_equal Thread.scheduler, scheduler
mutex.synchronize do
Fiber.yield
end
end
assert_raise ThreadError do
mutex.lock
sleep 0.1
mutex.unlock
end
Fiber.schedule do
mutex.lock
sleep 0.1
mutex.unlock
end
scheduler.run
end
thread.join
end
def test_mutex_deadlock
err = /No live threads left. Deadlock\?/
assert_in_out_err %W[-I#{__dir__} -], <<-RUBY, ['in synchronize'], err, success: false
require 'scheduler'
mutex = Mutex.new
thread = Thread.new do
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
Fiber.schedule do
raise unless Thread.scheduler == scheduler
mutex.synchronize do
puts 'in synchronize'
Fiber.yield
end
end
mutex.lock
end
thread.join
RUBY
end
end

View file

@ -75,11 +75,13 @@
#include "hrtime.h"
#include "internal.h"
#include "internal/class.h"
#include "internal/cont.h"
#include "internal/error.h"
#include "internal/hash.h"
#include "internal/io.h"
#include "internal/object.h"
#include "internal/proc.h"
#include "internal/scheduler.h"
#include "internal/signal.h"
#include "internal/thread.h"
#include "internal/time.h"
@ -548,7 +550,7 @@ rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
/* rb_warn("mutex #<%p> remains to be locked by terminated thread",
(void *)mutexes); */
mutexes = mutex->next_mutex;
err = rb_mutex_unlock_th(mutex, th);
err = rb_mutex_unlock_th(mutex, th, mutex->fiber);
if (err) rb_bug("invalid keeping_mutexes: %s", err);
}
}
@ -5040,7 +5042,7 @@ rb_thread_shield_wait(VALUE self)
if (!mutex) return Qfalse;
m = mutex_ptr(mutex);
if (m->th == GET_THREAD()) return Qnil;
if (m->fiber == GET_EC()->fiber_ptr) return Qnil;
rb_thread_shield_waiting_inc(self);
rb_mutex_lock(mutex);
rb_thread_shield_waiting_dec(self);
@ -5540,7 +5542,7 @@ debug_deadlock_check(rb_ractor_t *r, VALUE msg)
if (th->locking_mutex) {
rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
rb_str_catf(msg, " mutex:%p cond:%"PRIuSIZE,
(void *)mutex->th, rb_mutex_num_waiting(mutex));
(void *)mutex->fiber, rb_mutex_num_waiting(mutex));
}
{
@ -5574,8 +5576,7 @@ rb_check_deadlock(rb_ractor_t *r)
}
else if (th->locking_mutex) {
rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
if (mutex->th == th || (!mutex->th && !list_empty(&mutex->waitq))) {
if (mutex->fiber == th->ec->fiber_ptr || (!mutex->fiber && !list_empty(&mutex->waitq))) {
found = 1;
}
}

View file

@ -7,6 +7,7 @@ static VALUE rb_eClosedQueueError;
/* sync_waiter is always on-stack */
struct sync_waiter {
rb_thread_t *th;
rb_fiber_t *fiber;
struct list_node node;
};
@ -42,7 +43,9 @@ wakeup_all(struct list_head *head)
/* Mutex */
typedef struct rb_mutex_struct {
rb_thread_t *th;
VALUE self;
rb_fiber_t *fiber;
struct rb_mutex_struct *next_mutex;
struct list_head waitq; /* protected by GVL */
} rb_mutex_t;
@ -52,7 +55,7 @@ static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
#endif
static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th);
static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber);
/*
* Document-class: Mutex
@ -93,13 +96,15 @@ rb_mutex_num_waiting(rb_mutex_t *mutex)
return n;
}
rb_thread_t* rb_fiber_threadptr(const rb_fiber_t *fiber);
static void
mutex_free(void *ptr)
{
rb_mutex_t *mutex = ptr;
if (mutex->th) {
if (mutex->fiber) {
/* rb_warn("free locked mutex"); */
const char *err = rb_mutex_unlock_th(mutex, mutex->th);
const char *err = rb_mutex_unlock_th(mutex, rb_fiber_threadptr(mutex->fiber), mutex->fiber);
if (err) rb_bug("%s", err);
}
ruby_xfree(ptr);
@ -145,6 +150,8 @@ mutex_alloc(VALUE klass)
rb_mutex_t *mutex;
obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
mutex->self = obj;
list_head_init(&mutex->waitq);
return obj;
}
@ -178,7 +185,7 @@ rb_mutex_locked_p(VALUE self)
{
rb_mutex_t *mutex = mutex_ptr(self);
return mutex->th ? Qtrue : Qfalse;
return mutex->fiber ? Qtrue : Qfalse;
}
static void
@ -191,7 +198,7 @@ mutex_locked(rb_thread_t *th, VALUE self)
}
th->keeping_mutexes = mutex;
th->blocking += 1;
// th->blocking += 1;
}
/*
@ -207,9 +214,10 @@ rb_mutex_trylock(VALUE self)
rb_mutex_t *mutex = mutex_ptr(self);
VALUE locked = Qfalse;
if (mutex->th == 0) {
if (mutex->fiber == 0) {
rb_fiber_t *fiber = GET_EC()->fiber_ptr;
rb_thread_t *th = GET_THREAD();
mutex->th = th;
mutex->fiber = fiber;
locked = Qtrue;
mutex_locked(th, self);
@ -226,9 +234,9 @@ rb_mutex_trylock(VALUE self)
static const rb_thread_t *patrol_thread = NULL;
static VALUE
mutex_owned_p(rb_thread_t *th, rb_mutex_t *mutex)
mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
{
if (mutex->th == th) {
if (mutex->fiber == fiber) {
return Qtrue;
}
else {
@ -240,6 +248,8 @@ static VALUE
do_mutex_lock(VALUE self, int interruptible_p)
{
rb_thread_t *th = GET_THREAD();
rb_execution_context_t *ec = GET_EC();
rb_fiber_t *fiber = ec->fiber_ptr;
rb_mutex_t *mutex = mutex_ptr(self);
/* When running trap handler */
@ -249,15 +259,33 @@ do_mutex_lock(VALUE self, int interruptible_p)
}
if (rb_mutex_trylock(self) == Qfalse) {
struct sync_waiter w;
struct sync_waiter w = {
.th = th,
.fiber = fiber
};
if (mutex->th == th) {
if (mutex->fiber == fiber) {
rb_raise(rb_eThreadError, "deadlock; recursive locking");
}
w.th = th;
VALUE scheduler = rb_thread_current_scheduler();
while (mutex->fiber != fiber) {
if (scheduler != Qnil) {
list_add_tail(&mutex->waitq, &w.node);
while (mutex->th != th) {
rb_scheduler_mutex_lock(scheduler, self);
list_del(&w.node);
if (!mutex->fiber) {
mutex->fiber = fiber;
break;
} else {
// Try again...
continue;
}
}
enum rb_thread_status prev_status = th->status;
rb_hrtime_t *timeout = 0;
rb_hrtime_t rel = rb_msec2hrtime(100);
@ -277,18 +305,20 @@ do_mutex_lock(VALUE self, int interruptible_p)
}
list_add_tail(&mutex->waitq, &w.node);
native_sleep(th, timeout); /* release GVL */
native_sleep(th, timeout); /* release GVL */
list_del(&w.node);
if (!mutex->th) {
mutex->th = th;
if (!mutex->fiber) {
mutex->fiber = fiber;
}
if (patrol_thread == th)
patrol_thread = NULL;
th->locking_mutex = Qfalse;
if (mutex->th && timeout && !RUBY_VM_INTERRUPTED(th->ec)) {
if (mutex->fiber && timeout && !RUBY_VM_INTERRUPTED(th->ec)) {
rb_check_deadlock(th->ractor);
}
if (th->status == THREAD_STOPPED_FOREVER) {
@ -299,22 +329,19 @@ do_mutex_lock(VALUE self, int interruptible_p)
if (interruptible_p) {
/* release mutex before checking for interrupts...as interrupt checking
* code might call rb_raise() */
if (mutex->th == th) mutex->th = 0;
if (mutex->fiber == fiber) mutex->fiber = 0;
RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may release mutex */
if (!mutex->th) {
mutex->th = th;
mutex_locked(th, self);
if (!mutex->fiber) {
mutex->fiber = fiber;
}
}
else {
if (mutex->th == th) mutex_locked(th, self);
}
}
if (mutex->fiber == fiber) mutex_locked(th, self);
}
// assertion
if (mutex_owned_p(th, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");
if (mutex_owned_p(fiber, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");
return self;
}
@ -347,32 +374,37 @@ rb_mutex_lock(VALUE self)
VALUE
rb_mutex_owned_p(VALUE self)
{
rb_thread_t *th = GET_THREAD();
rb_fiber_t *fiber = GET_EC()->fiber_ptr;
rb_mutex_t *mutex = mutex_ptr(self);
return mutex_owned_p(th, mutex);
return mutex_owned_p(fiber, mutex);
}
static const char *
rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th)
rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
{
const char *err = NULL;
if (mutex->th == 0) {
if (mutex->fiber == 0) {
err = "Attempt to unlock a mutex which is not locked";
}
else if (mutex->th != th) {
err = "Attempt to unlock a mutex which is locked by another thread";
else if (mutex->fiber != fiber) {
err = "Attempt to unlock a mutex which is locked by another thread/fiber";
}
else {
struct sync_waiter *cur = 0, *next;
rb_mutex_t **th_mutex = &th->keeping_mutexes;
th->blocking -= 1;
// th->blocking -= 1;
mutex->th = 0;
mutex->fiber = 0;
list_for_each_safe(&mutex->waitq, cur, next, node) {
list_del_init(&cur->node);
if (cur->th->scheduler != Qnil) {
rb_scheduler_mutex_unlock(cur->th->scheduler, mutex->self, rb_fiberptr_self(cur->fiber));
}
switch (cur->th->status) {
case THREAD_RUNNABLE: /* from someone else calling Thread#run */
case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
@ -411,7 +443,7 @@ rb_mutex_unlock(VALUE self)
rb_mutex_t *mutex = mutex_ptr(self);
rb_thread_t *th = GET_THREAD();
err = rb_mutex_unlock_th(mutex, th);
err = rb_mutex_unlock_th(mutex, th, GET_EC()->fiber_ptr);
if (err) rb_raise(rb_eThreadError, "%s", err);
return self;
@ -444,7 +476,7 @@ rb_mutex_abandon_all(rb_mutex_t *mutexes)
while (mutexes) {
mutex = mutexes;
mutexes = mutex->next_mutex;
mutex->th = 0;
mutex->fiber = 0;
mutex->next_mutex = 0;
list_head_init(&mutex->waitq);
}