reduce rb_mutex_t size from 160 to 80 bytes on 64-bit
Instead of relying on a native condition variable and mutex for
every Ruby Mutex object, use a doubly linked list to implement a
waiter queue in the Mutex.  The immediate benefit of this is
reducing the size of every Mutex object, as some projects have
many objects requiring synchronization.

In the future, this technique of using a linked list and an
on-stack list node (struct mutex_waiter) should allow us to easily
transition to an M:N threading model, as we can avoid the native
thread dependency to implement Mutex.

We already do something similar for autoload in variable.c, and
this was inspired by the Linux kernel wait queue (as ccan/list is
inspired by the Linux kernel linked list).

Finally, there are big performance improvements for Mutex
benchmarks, especially in contended cases:

measure target: real

name             |trunk  |built
-----------------|------:|------:
loop_whileloop2  |  0.149|  0.148
vm2_mutex*       |  0.893|  0.651
vm_thread_mutex1 |  0.809|  0.624
vm_thread_mutex2 |  2.608|  0.628
vm_thread_mutex3 | 28.227|  0.881

Speedup ratio: compare with the result of `trunk' (greater is better)

name             |built
-----------------|------:
loop_whileloop2  |  1.002
vm2_mutex*       |  1.372
vm_thread_mutex1 |  1.297
vm_thread_mutex2 |  4.149
vm_thread_mutex3 | 32.044

Tested on AMD FX-8320 8-core at 3.5GHz

* thread_sync.c (struct mutex_waiter): new on-stack struct
  (struct rb_mutex_struct): remove native lock/cond, use ccan/list
  (rb_mutex_num_waiting): new function for debug_deadlock_check
  (mutex_free): remove native_*_destroy
  (mutex_alloc): initialize waitq, remove native_*_initialize
  (rb_mutex_trylock): remove native_mutex_{lock,unlock}
  (lock_func): remove
  (lock_interrupt): remove
  (rb_mutex_lock): rewrite waiting path to use native_sleep + ccan/list
  (rb_mutex_unlock_th): rewrite to wake up from native_sleep
  using rb_threadptr_interrupt
  (rb_mutex_abandon_all): empty waitq
* thread.c (debug_deadlock_check): update for new struct
  (rb_check_deadlock): ditto
  [ruby-core:80913] [Feature #13517]

git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58604 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
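For context, the core trick is that each blocked thread's queue node
lives on that thread's own stack, so the Mutex itself only needs a
list head.  Below is a minimal sketch of that pattern, not the actual
Ruby implementation: the list operations are hand-rolled stand-ins
for ccan/list, toy_mutex/toy_lock/toy_unlock are invented names, and
park()/unpark() are hypothetical placeholders for native_sleep() and
rb_threadptr_interrupt().  It assumes something like the GVL
serializes access to the queue.

    #include <stddef.h>

    struct list_node { struct list_node *prev, *next; };

    struct waiter {
        void *th;               /* the blocked thread, opaque here */
        struct list_node node;  /* lives on the waiter's stack */
    };

    struct toy_mutex {
        void *owner;            /* NULL when unlocked */
        struct list_node waitq; /* circular list; waitq.next is the oldest waiter */
    };

    static void list_init(struct list_node *head)
    {
        head->prev = head->next = head; /* empty: head points at itself */
    }

    static void list_add_tail(struct list_node *head, struct list_node *n)
    {
        n->prev = head->prev;
        n->next = head;
        head->prev->next = n;
        head->prev = n;
    }

    static void list_del(struct list_node *n)
    {
        n->prev->next = n->next;
        n->next->prev = n->prev;
    }

    extern void park(void *th);   /* hypothetical: block until woken */
    extern void unpark(void *th); /* hypothetical: wake a parked thread */

    static void toy_lock(struct toy_mutex *m, void *self)
    {
        struct waiter w;        /* on-stack node: no malloc, no native cond */

        if (!m->owner) {        /* uncontended fast path */
            m->owner = self;
            return;
        }
        w.th = self;
        while (m->owner != self) {
            list_add_tail(&m->waitq, &w.node);
            park(self);         /* the GVL would be released while asleep */
            list_del(&w.node);  /* node is still valid: it is our stack */
            if (!m->owner)      /* unlock cleared it: claim ownership */
                m->owner = self;
        }
    }

    static void toy_unlock(struct toy_mutex *m)
    {
        m->owner = NULL;
        if (m->waitq.next != &m->waitq) {   /* queue is non-empty */
            struct waiter *w = (struct waiter *)
                ((char *)m->waitq.next - offsetof(struct waiter, node));
            unpark(w->th);                  /* wake the oldest waiter */
        }
    }

A toy_mutex must be initialized with list_init(&m->waitq) before
first use (mutex_alloc does the equivalent with list_head_init in the
real patch), and because the node is on the stack, the waiter must
unlink it before returning, exactly as rb_mutex_lock does with
list_del(&w.node) after native_sleep().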
Parent: 7dae53f371
Commit: 3586c9e087
2 changed files with 64 additions and 102 deletions
thread.c: 14 changed lines
@@ -4940,15 +4940,9 @@ debug_deadlock_check(rb_vm_t *vm, VALUE msg)
 		    th->self, th, thread_id_str(th), th->interrupt_flag);
 	if (th->locking_mutex) {
 	    rb_mutex_t *mutex;
-	    struct rb_thread_struct volatile *mth;
-	    int waiting;
 	    GetMutexPtr(th->locking_mutex, mutex);
-
-	    native_mutex_lock(&mutex->lock);
-	    mth = mutex->th;
-	    waiting = mutex->cond_waiting;
-	    native_mutex_unlock(&mutex->lock);
-	    rb_str_catf(msg, " mutex:%p cond:%d", mth, waiting);
+	    rb_str_catf(msg, " mutex:%p cond:%"PRIuSIZE,
+			mutex->th, rb_mutex_num_waiting(mutex));
 	}
 	{
 	    rb_thread_list_t *list = th->join_list;
@@ -4981,11 +4975,9 @@ rb_check_deadlock(rb_vm_t *vm)
 	    rb_mutex_t *mutex;
 	    GetMutexPtr(th->locking_mutex, mutex);
 
-	    native_mutex_lock(&mutex->lock);
-	    if (mutex->th == th || (!mutex->th && mutex->cond_waiting)) {
+	    if (mutex->th == th || (!mutex->th && !list_empty(&mutex->waitq))) {
 		found = 1;
 	    }
-	    native_mutex_unlock(&mutex->lock);
 	}
 	if (found)
 	    break;
thread_sync.c: 152 changed lines
@@ -1,16 +1,21 @@
 /* included by thread.c */
+#include "ccan/list/list.h"
 
 static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
 static VALUE rb_eClosedQueueError;
 
 /* Mutex */
 
+/* mutex_waiter is always on-stack */
+struct mutex_waiter {
+    rb_thread_t *th;
+    struct list_node node;
+};
+
 typedef struct rb_mutex_struct {
-    rb_nativethread_lock_t lock;
-    rb_nativethread_cond_t cond;
     struct rb_thread_struct volatile *th;
     struct rb_mutex_struct *next_mutex;
-    int cond_waiting;
+    struct list_head waitq; /* protected by GVL */
     int allow_trap;
 } rb_mutex_t;
 
@@ -51,6 +56,19 @@ static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th);
 
 #define mutex_mark NULL
 
+static size_t
+rb_mutex_num_waiting(rb_mutex_t *mutex)
+{
+    struct mutex_waiter *w;
+    size_t n = 0;
+
+    list_for_each(&mutex->waitq, w, node) {
+	n++;
+    }
+
+    return n;
+}
+
 static void
 mutex_free(void *ptr)
 {
@@ -60,8 +78,6 @@ mutex_free(void *ptr)
 	const char *err = rb_mutex_unlock_th(mutex, mutex->th);
 	if (err) rb_bug("%s", err);
     }
-    native_mutex_destroy(&mutex->lock);
-    native_cond_destroy(&mutex->cond);
     ruby_xfree(ptr);
 }
 
@@ -95,8 +111,7 @@ mutex_alloc(VALUE klass)
     rb_mutex_t *mutex;
 
     obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
-    native_mutex_initialize(&mutex->lock);
-    native_cond_initialize(&mutex->cond, RB_CONDATTR_CLOCK_MONOTONIC);
+    list_head_init(&mutex->waitq);
     return obj;
 }
 
@@ -158,7 +173,6 @@ rb_mutex_trylock(VALUE self)
     VALUE locked = Qfalse;
     GetMutexPtr(self, mutex);
 
-    native_mutex_lock(&mutex->lock);
     if (mutex->th == 0) {
 	rb_thread_t *th = GET_THREAD();
 	mutex->th = th;
@@ -166,61 +180,10 @@ rb_mutex_trylock(VALUE self)
 
 	mutex_locked(th, self);
     }
-    native_mutex_unlock(&mutex->lock);
 
     return locked;
 }
 
-static int
-lock_func(rb_thread_t *th, rb_mutex_t *mutex, int timeout_ms)
-{
-    int interrupted = 0;
-    int err = 0;
-
-    mutex->cond_waiting++;
-    for (;;) {
-	if (!mutex->th) {
-	    mutex->th = th;
-	    break;
-	}
-	if (RUBY_VM_INTERRUPTED(th)) {
-	    interrupted = 1;
-	    break;
-	}
-	if (err == ETIMEDOUT) {
-	    interrupted = 2;
-	    break;
-	}
-
-	if (timeout_ms) {
-	    struct timespec timeout_rel;
-	    struct timespec timeout;
-
-	    timeout_rel.tv_sec = 0;
-	    timeout_rel.tv_nsec = timeout_ms * 1000 * 1000;
-	    timeout = native_cond_timeout(&mutex->cond, timeout_rel);
-	    err = native_cond_timedwait(&mutex->cond, &mutex->lock, &timeout);
-	}
-	else {
-	    native_cond_wait(&mutex->cond, &mutex->lock);
-	    err = 0;
-	}
-    }
-    mutex->cond_waiting--;
-
-    return interrupted;
-}
-
-static void
-lock_interrupt(void *ptr)
-{
-    rb_mutex_t *mutex = (rb_mutex_t *)ptr;
-    native_mutex_lock(&mutex->lock);
-    if (mutex->cond_waiting > 0)
-	native_cond_broadcast(&mutex->cond);
-    native_mutex_unlock(&mutex->lock);
-}
-
 /*
  * At maximum, only one thread can use cond_timedwait and watch deadlock
  * periodically. Multiple polling thread (i.e. concurrent deadlock check)
@@ -248,45 +211,45 @@ rb_mutex_lock(VALUE self)
     }
 
     if (rb_mutex_trylock(self) == Qfalse) {
+	struct mutex_waiter w;
+
 	if (mutex->th == th) {
 	    rb_raise(rb_eThreadError, "deadlock; recursive locking");
 	}
 
+	w.th = th;
+
 	while (mutex->th != th) {
-	    int interrupted;
 	    enum rb_thread_status prev_status = th->status;
-	    volatile int timeout_ms = 0;
-	    struct rb_unblock_callback oldubf;
+	    struct timeval *timeout = 0;
+	    struct timeval tv = { 0, 100000 }; /* 100ms */
 
-	    set_unblock_function(th, lock_interrupt, mutex, &oldubf, FALSE);
 	    th->status = THREAD_STOPPED_FOREVER;
 	    th->locking_mutex = self;
 
-	    native_mutex_lock(&mutex->lock);
 	    th->vm->sleeper++;
 	    /*
-	     * Carefully! while some contended threads are in lock_func(),
+	     * Carefully! while some contended threads are in native_sleep(),
	     * vm->sleeper is unstable value. we have to avoid both deadlock
	     * and busy loop.
	     */
 	    if ((vm_living_thread_num(th->vm) == th->vm->sleeper) &&
 		!patrol_thread) {
-		timeout_ms = 100;
+		timeout = &tv;
 		patrol_thread = th;
 	    }
 
-	    GVL_UNLOCK_BEGIN();
-	    interrupted = lock_func(th, mutex, (int)timeout_ms);
-	    native_mutex_unlock(&mutex->lock);
-	    GVL_UNLOCK_END();
+	    list_add_tail(&mutex->waitq, &w.node);
+	    native_sleep(th, timeout); /* release GVL */
+	    list_del(&w.node);
+	    if (!mutex->th) {
+		mutex->th = th;
+	    }
 
 	    if (patrol_thread == th)
 		patrol_thread = NULL;
 
-	    reset_unblock_function(th, &oldubf);
-
 	    th->locking_mutex = Qfalse;
-	    if (mutex->th && interrupted == 2) {
+	    if (mutex->th && timeout && !RUBY_VM_INTERRUPTED(th)) {
 		rb_check_deadlock(th->vm);
 	    }
 	    if (th->status == THREAD_STOPPED_FOREVER) {
@@ -296,9 +259,7 @@ rb_mutex_lock(VALUE self)
 
 	    if (mutex->th == th) mutex_locked(th, self);
 
-	    if (interrupted) {
-		RUBY_VM_CHECK_INTS_BLOCKING(th);
-	    }
+	    RUBY_VM_CHECK_INTS_BLOCKING(th);
 	}
     }
     return self;
@@ -330,24 +291,32 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th)
 {
     const char *err = NULL;
 
-    native_mutex_lock(&mutex->lock);
-
     if (mutex->th == 0) {
 	err = "Attempt to unlock a mutex which is not locked";
     }
     else if (mutex->th != th) {
 	err = "Attempt to unlock a mutex which is locked by another thread";
-    }
-    else {
-	mutex->th = 0;
-	if (mutex->cond_waiting > 0)
-	    native_cond_signal(&mutex->cond);
-    }
-
-    native_mutex_unlock(&mutex->lock);
-
-    if (!err) {
+    } else {
+	struct mutex_waiter *cur = 0, *next = 0;
 	rb_mutex_t *volatile *th_mutex = &th->keeping_mutexes;
 
+	mutex->th = 0;
+	list_for_each_safe(&mutex->waitq, cur, next, node) {
+	    list_del_init(&cur->node);
+	    switch (cur->th->status) {
+	      case THREAD_RUNNABLE: /* from someone else calling Thread#run */
+	      case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
+		rb_threadptr_interrupt(cur->th);
+		goto found;
+	      case THREAD_STOPPED: /* probably impossible */
+		rb_bug("unexpected THREAD_STOPPED");
+	      case THREAD_KILLED:
+		/* not sure about this, possible in exit GC? */
+		rb_bug("unexpected THREAD_KILLED");
+		continue;
+	    }
+	}
+      found:
 	while (*th_mutex != mutex) {
 	    th_mutex = &(*th_mutex)->next_mutex;
 	}
@@ -411,6 +380,7 @@ rb_mutex_abandon_all(rb_mutex_t *mutexes)
 	mutexes = mutex->next_mutex;
 	mutex->th = 0;
 	mutex->next_mutex = 0;
+	list_head_init(&mutex->waitq);
     }
 }
 #endif