From 178c1b0922dc727897d81d7cfe9c97d5ffa97fd9 Mon Sep 17 00:00:00 2001 From: Benoit Daloze Date: Sat, 5 Sep 2020 16:26:24 +1200 Subject: [PATCH] Make Mutex per-Fiber instead of per-Thread * Enables Mutex to be used as synchronization between multiple Fibers of the same Thread. * With a Fiber scheduler we can yield to another Fiber on contended Mutex#lock instead of blocking the entire thread. * This also makes the behavior of Mutex consistent across CRuby, JRuby and TruffleRuby. * [Feature #16792] --- cont.c | 11 +++ internal/cont.h | 2 + internal/scheduler.h | 3 + scheduler.c | 14 ++++ spec/ruby/core/mutex/owned_spec.rb | 12 ++++ test/fiber/scheduler.rb | 46 ++++++++++++- test/fiber/test_mutex.rb | 48 ++++++++++--- thread.c | 11 +-- thread_sync.c | 106 +++++++++++++++++++---------- 9 files changed, 199 insertions(+), 54 deletions(-) diff --git a/cont.c b/cont.c index d228107b9a..0304f4c60e 100644 --- a/cont.c +++ b/cont.c @@ -851,6 +851,12 @@ NOINLINE(static VALUE cont_capture(volatile int *volatile stat)); if (!(th)->ec->tag) rb_raise(rb_eThreadError, "not running thread"); \ } while (0) +rb_thread_t* +rb_fiber_threadptr(const rb_fiber_t *fiber) +{ + return fiber->cont.saved_ec.thread_ptr; +} + static VALUE cont_thread_value(const rb_context_t *cont) { @@ -1146,6 +1152,11 @@ cont_new(VALUE klass) return cont; } +VALUE rb_fiberptr_self(struct rb_fiber_struct *fiber) +{ + return fiber->cont.self; +} + void rb_fiber_init_mjit_cont(struct rb_fiber_struct *fiber) { diff --git a/internal/cont.h b/internal/cont.h index 81874aa5c7..a365cbe978 100644 --- a/internal/cont.h +++ b/internal/cont.h @@ -20,4 +20,6 @@ void rb_fiber_reset_root_local_storage(struct rb_thread_struct *); void ruby_register_rollback_func_for_ensure(VALUE (*ensure_func)(VALUE), VALUE (*rollback_func)(VALUE)); void rb_fiber_init_mjit_cont(struct rb_fiber_struct *fiber); +VALUE rb_fiberptr_self(struct rb_fiber_struct *fiber); + #endif /* INTERNAL_CONT_H */ diff --git a/internal/scheduler.h b/internal/scheduler.h index f5a41af064..44872e3b10 100644 --- a/internal/scheduler.h +++ b/internal/scheduler.h @@ -17,6 +17,9 @@ VALUE rb_scheduler_timeout(struct timeval *timeout); VALUE rb_scheduler_kernel_sleep(VALUE scheduler, VALUE duration); VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv); +VALUE rb_scheduler_mutex_lock(VALUE scheduler, VALUE mutex); +VALUE rb_scheduler_mutex_unlock(VALUE scheduler, VALUE mutex, VALUE fiber); + VALUE rb_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout); VALUE rb_scheduler_io_wait_readable(VALUE scheduler, VALUE io); VALUE rb_scheduler_io_wait_writable(VALUE scheduler, VALUE io); diff --git a/scheduler.c b/scheduler.c index 9821d07636..9ecc40cf6c 100644 --- a/scheduler.c +++ b/scheduler.c @@ -12,6 +12,8 @@ #include "ruby/io.h" static ID id_kernel_sleep; +static ID id_mutex_lock; +static ID id_mutex_unlock; static ID id_io_read; static ID id_io_write; static ID id_io_wait; @@ -20,6 +22,8 @@ void Init_Scheduler(void) { id_kernel_sleep = rb_intern_const("kernel_sleep"); + id_mutex_lock = rb_intern_const("mutex_lock"); + id_mutex_unlock = rb_intern_const("mutex_unlock"); id_io_read = rb_intern_const("io_read"); id_io_write = rb_intern_const("io_write"); id_io_wait = rb_intern_const("io_wait"); @@ -44,6 +48,16 @@ VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv) return rb_funcallv(scheduler, id_kernel_sleep, argc, argv); } +VALUE rb_scheduler_mutex_lock(VALUE scheduler, VALUE mutex) +{ + return rb_funcall(scheduler, id_mutex_lock, 1, mutex); +} + +VALUE rb_scheduler_mutex_unlock(VALUE scheduler, VALUE mutex, VALUE fiber) +{ + return rb_funcall(scheduler, id_mutex_unlock, 2, mutex, fiber); +} + VALUE rb_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout) { return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout); diff --git a/spec/ruby/core/mutex/owned_spec.rb b/spec/ruby/core/mutex/owned_spec.rb index e66062534e..f881622965 100644 --- a/spec/ruby/core/mutex/owned_spec.rb +++ b/spec/ruby/core/mutex/owned_spec.rb @@ -40,4 +40,16 @@ describe "Mutex#owned?" do m.owned?.should be_false end end + + ruby_version_is "2.8" do + it "is held per Fiber" do + m = Mutex.new + m.lock + + Fiber.new do + m.locked?.should == true + m.owned?.should == false + end.resume + end + end end diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb index 1f690b4c08..fa05daf886 100644 --- a/test/fiber/scheduler.rb +++ b/test/fiber/scheduler.rb @@ -14,6 +14,12 @@ class Scheduler @readable = {} @writable = {} @waiting = {} + + @urgent = nil + + @lock = Mutex.new + @locking = 0 + @ready = [] end attr :readable @@ -35,9 +41,11 @@ class Scheduler end def run - while @readable.any? or @writable.any? or @waiting.any? + @urgent = IO.pipe + + while @readable.any? or @writable.any? or @waiting.any? or @locking.positive? # Can only handle file descriptors up to 1024... - readable, writable = IO.select(@readable.keys, @writable.keys, [], next_timeout) + readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout) # puts "readable: #{readable}" if readable&.any? # puts "writable: #{writable}" if writable&.any? @@ -63,7 +71,24 @@ class Scheduler end end end + + if @ready.any? + # Clear out the urgent notification pipe. + @urgent.first.read_nonblock(1024) + + ready = nil + + @lock.synchronize do + ready, @ready = @ready, Array.new + end + + ready.each do |fiber| + fiber.resume + end + end end + ensure + @urgent.each(&:close) end def current_time @@ -95,6 +120,23 @@ class Scheduler return true end + def mutex_lock(mutex) + @locking += 1 + Fiber.yield + ensure + @locking -= 1 + end + + def mutex_unlock(mutex, fiber) + @lock.synchronize do + @ready << fiber + + if @urgent + @urgent.last.write('.') + end + end + end + def fiber(&block) fiber = Fiber.new(blocking: false, &block) diff --git a/test/fiber/test_mutex.rb b/test/fiber/test_mutex.rb index 5179959a6a..393a44fc2f 100644 --- a/test/fiber/test_mutex.rb +++ b/test/fiber/test_mutex.rb @@ -14,7 +14,7 @@ class TestFiberMutex < Test::Unit::TestCase assert_equal Thread.scheduler, scheduler mutex.synchronize do - assert_nil Thread.scheduler + assert Thread.scheduler end end end @@ -22,7 +22,7 @@ class TestFiberMutex < Test::Unit::TestCase thread.join end - def test_mutex_deadlock + def test_mutex_interleaved_locking mutex = Mutex.new thread = Thread.new do @@ -30,18 +30,46 @@ class TestFiberMutex < Test::Unit::TestCase Thread.current.scheduler = scheduler Fiber.schedule do - assert_equal Thread.scheduler, scheduler - - mutex.synchronize do - Fiber.yield - end - end - - assert_raise ThreadError do mutex.lock + sleep 0.1 + mutex.unlock end + + Fiber.schedule do + mutex.lock + sleep 0.1 + mutex.unlock + end + + scheduler.run end thread.join end + + def test_mutex_deadlock + err = /No live threads left. Deadlock\?/ + assert_in_out_err %W[-I#{__dir__} -], <<-RUBY, ['in synchronize'], err, success: false + require 'scheduler' + mutex = Mutex.new + + thread = Thread.new do + scheduler = Scheduler.new + Thread.current.scheduler = scheduler + + Fiber.schedule do + raise unless Thread.scheduler == scheduler + + mutex.synchronize do + puts 'in synchronize' + Fiber.yield + end + end + + mutex.lock + end + + thread.join + RUBY + end end diff --git a/thread.c b/thread.c index d0ebfff882..c4ff5aafde 100644 --- a/thread.c +++ b/thread.c @@ -75,11 +75,13 @@ #include "hrtime.h" #include "internal.h" #include "internal/class.h" +#include "internal/cont.h" #include "internal/error.h" #include "internal/hash.h" #include "internal/io.h" #include "internal/object.h" #include "internal/proc.h" +#include "internal/scheduler.h" #include "internal/signal.h" #include "internal/thread.h" #include "internal/time.h" @@ -548,7 +550,7 @@ rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th) /* rb_warn("mutex #<%p> remains to be locked by terminated thread", (void *)mutexes); */ mutexes = mutex->next_mutex; - err = rb_mutex_unlock_th(mutex, th); + err = rb_mutex_unlock_th(mutex, th, mutex->fiber); if (err) rb_bug("invalid keeping_mutexes: %s", err); } } @@ -5040,7 +5042,7 @@ rb_thread_shield_wait(VALUE self) if (!mutex) return Qfalse; m = mutex_ptr(mutex); - if (m->th == GET_THREAD()) return Qnil; + if (m->fiber == GET_EC()->fiber_ptr) return Qnil; rb_thread_shield_waiting_inc(self); rb_mutex_lock(mutex); rb_thread_shield_waiting_dec(self); @@ -5540,7 +5542,7 @@ debug_deadlock_check(rb_ractor_t *r, VALUE msg) if (th->locking_mutex) { rb_mutex_t *mutex = mutex_ptr(th->locking_mutex); rb_str_catf(msg, " mutex:%p cond:%"PRIuSIZE, - (void *)mutex->th, rb_mutex_num_waiting(mutex)); + (void *)mutex->fiber, rb_mutex_num_waiting(mutex)); } { @@ -5574,8 +5576,7 @@ rb_check_deadlock(rb_ractor_t *r) } else if (th->locking_mutex) { rb_mutex_t *mutex = mutex_ptr(th->locking_mutex); - - if (mutex->th == th || (!mutex->th && !list_empty(&mutex->waitq))) { + if (mutex->fiber == th->ec->fiber_ptr || (!mutex->fiber && !list_empty(&mutex->waitq))) { found = 1; } } diff --git a/thread_sync.c b/thread_sync.c index deb3858c31..cfdd62635a 100644 --- a/thread_sync.c +++ b/thread_sync.c @@ -7,6 +7,7 @@ static VALUE rb_eClosedQueueError; /* sync_waiter is always on-stack */ struct sync_waiter { rb_thread_t *th; + rb_fiber_t *fiber; struct list_node node; }; @@ -42,7 +43,9 @@ wakeup_all(struct list_head *head) /* Mutex */ typedef struct rb_mutex_struct { - rb_thread_t *th; + VALUE self; + + rb_fiber_t *fiber; struct rb_mutex_struct *next_mutex; struct list_head waitq; /* protected by GVL */ } rb_mutex_t; @@ -52,7 +55,7 @@ static void rb_mutex_abandon_all(rb_mutex_t *mutexes); static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th); static void rb_mutex_abandon_locking_mutex(rb_thread_t *th); #endif -static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th); +static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber); /* * Document-class: Mutex @@ -93,13 +96,15 @@ rb_mutex_num_waiting(rb_mutex_t *mutex) return n; } +rb_thread_t* rb_fiber_threadptr(const rb_fiber_t *fiber); + static void mutex_free(void *ptr) { rb_mutex_t *mutex = ptr; - if (mutex->th) { + if (mutex->fiber) { /* rb_warn("free locked mutex"); */ - const char *err = rb_mutex_unlock_th(mutex, mutex->th); + const char *err = rb_mutex_unlock_th(mutex, rb_fiber_threadptr(mutex->fiber), mutex->fiber); if (err) rb_bug("%s", err); } ruby_xfree(ptr); @@ -145,6 +150,8 @@ mutex_alloc(VALUE klass) rb_mutex_t *mutex; obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex); + + mutex->self = obj; list_head_init(&mutex->waitq); return obj; } @@ -178,7 +185,7 @@ rb_mutex_locked_p(VALUE self) { rb_mutex_t *mutex = mutex_ptr(self); - return mutex->th ? Qtrue : Qfalse; + return mutex->fiber ? Qtrue : Qfalse; } static void @@ -191,7 +198,7 @@ mutex_locked(rb_thread_t *th, VALUE self) } th->keeping_mutexes = mutex; - th->blocking += 1; + // th->blocking += 1; } /* @@ -207,9 +214,10 @@ rb_mutex_trylock(VALUE self) rb_mutex_t *mutex = mutex_ptr(self); VALUE locked = Qfalse; - if (mutex->th == 0) { + if (mutex->fiber == 0) { + rb_fiber_t *fiber = GET_EC()->fiber_ptr; rb_thread_t *th = GET_THREAD(); - mutex->th = th; + mutex->fiber = fiber; locked = Qtrue; mutex_locked(th, self); @@ -226,9 +234,9 @@ rb_mutex_trylock(VALUE self) static const rb_thread_t *patrol_thread = NULL; static VALUE -mutex_owned_p(rb_thread_t *th, rb_mutex_t *mutex) +mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex) { - if (mutex->th == th) { + if (mutex->fiber == fiber) { return Qtrue; } else { @@ -240,6 +248,8 @@ static VALUE do_mutex_lock(VALUE self, int interruptible_p) { rb_thread_t *th = GET_THREAD(); + rb_execution_context_t *ec = GET_EC(); + rb_fiber_t *fiber = ec->fiber_ptr; rb_mutex_t *mutex = mutex_ptr(self); /* When running trap handler */ @@ -249,15 +259,33 @@ do_mutex_lock(VALUE self, int interruptible_p) } if (rb_mutex_trylock(self) == Qfalse) { - struct sync_waiter w; + struct sync_waiter w = { + .th = th, + .fiber = fiber + }; - if (mutex->th == th) { + if (mutex->fiber == fiber) { rb_raise(rb_eThreadError, "deadlock; recursive locking"); } - w.th = th; + VALUE scheduler = rb_thread_current_scheduler(); + while (mutex->fiber != fiber) { + if (scheduler != Qnil) { + list_add_tail(&mutex->waitq, &w.node); - while (mutex->th != th) { + rb_scheduler_mutex_lock(scheduler, self); + + list_del(&w.node); + + if (!mutex->fiber) { + mutex->fiber = fiber; + break; + } else { + // Try again... + continue; + } + } + enum rb_thread_status prev_status = th->status; rb_hrtime_t *timeout = 0; rb_hrtime_t rel = rb_msec2hrtime(100); @@ -277,18 +305,20 @@ do_mutex_lock(VALUE self, int interruptible_p) } list_add_tail(&mutex->waitq, &w.node); - native_sleep(th, timeout); /* release GVL */ + + native_sleep(th, timeout); /* release GVL */ + list_del(&w.node); - if (!mutex->th) { - mutex->th = th; + if (!mutex->fiber) { + mutex->fiber = fiber; } if (patrol_thread == th) patrol_thread = NULL; th->locking_mutex = Qfalse; - if (mutex->th && timeout && !RUBY_VM_INTERRUPTED(th->ec)) { + if (mutex->fiber && timeout && !RUBY_VM_INTERRUPTED(th->ec)) { rb_check_deadlock(th->ractor); } if (th->status == THREAD_STOPPED_FOREVER) { @@ -299,22 +329,19 @@ do_mutex_lock(VALUE self, int interruptible_p) if (interruptible_p) { /* release mutex before checking for interrupts...as interrupt checking * code might call rb_raise() */ - if (mutex->th == th) mutex->th = 0; - + if (mutex->fiber == fiber) mutex->fiber = 0; RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may release mutex */ - if (!mutex->th) { - mutex->th = th; - mutex_locked(th, self); + if (!mutex->fiber) { + mutex->fiber = fiber; } } - else { - if (mutex->th == th) mutex_locked(th, self); - } } + + if (mutex->fiber == fiber) mutex_locked(th, self); } // assertion - if (mutex_owned_p(th, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned."); + if (mutex_owned_p(fiber, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned."); return self; } @@ -347,32 +374,37 @@ rb_mutex_lock(VALUE self) VALUE rb_mutex_owned_p(VALUE self) { - rb_thread_t *th = GET_THREAD(); + rb_fiber_t *fiber = GET_EC()->fiber_ptr; rb_mutex_t *mutex = mutex_ptr(self); - return mutex_owned_p(th, mutex); + return mutex_owned_p(fiber, mutex); } static const char * -rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th) +rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber) { const char *err = NULL; - if (mutex->th == 0) { + if (mutex->fiber == 0) { err = "Attempt to unlock a mutex which is not locked"; } - else if (mutex->th != th) { - err = "Attempt to unlock a mutex which is locked by another thread"; + else if (mutex->fiber != fiber) { + err = "Attempt to unlock a mutex which is locked by another thread/fiber"; } else { struct sync_waiter *cur = 0, *next; rb_mutex_t **th_mutex = &th->keeping_mutexes; - th->blocking -= 1; + // th->blocking -= 1; - mutex->th = 0; + mutex->fiber = 0; list_for_each_safe(&mutex->waitq, cur, next, node) { list_del_init(&cur->node); + + if (cur->th->scheduler != Qnil) { + rb_scheduler_mutex_unlock(cur->th->scheduler, mutex->self, rb_fiberptr_self(cur->fiber)); + } + switch (cur->th->status) { case THREAD_RUNNABLE: /* from someone else calling Thread#run */ case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */ @@ -411,7 +443,7 @@ rb_mutex_unlock(VALUE self) rb_mutex_t *mutex = mutex_ptr(self); rb_thread_t *th = GET_THREAD(); - err = rb_mutex_unlock_th(mutex, th); + err = rb_mutex_unlock_th(mutex, th, GET_EC()->fiber_ptr); if (err) rb_raise(rb_eThreadError, "%s", err); return self; @@ -444,7 +476,7 @@ rb_mutex_abandon_all(rb_mutex_t *mutexes) while (mutexes) { mutex = mutexes; mutexes = mutex->next_mutex; - mutex->th = 0; + mutex->fiber = 0; mutex->next_mutex = 0; list_head_init(&mutex->waitq); }