2015-09-03 11:17:25 -04:00
|
|
|
/* included by thread.c */
|
reduce rb_mutex_t size from 160 to 80 bytes on 64-bit
Instead of relying on a native condition variable and mutex for
every Ruby Mutex object, use a doubly linked-list to implement a
waiter queue in the Mutex. The immediate benefit of this is
reducing the size of every Mutex object, as some projects have
many objects requiring synchronization.
In the future, this technique using a linked-list and on-stack
list node (struct mutex_waiter) should allow us to easily
transition to M:N threading model, as we can avoid the native
thread dependency to implement Mutex.
We already do something similar for autoload in variable.c,
and this was inspired by the Linux kernel wait queue (as
ccan/list is inspired by the Linux kernel linked-list).
Finaly, there are big performance improvements for Mutex
benchmarks, especially in contended cases:
measure target: real
name |trunk |built
----------------|------:|------:
loop_whileloop2 | 0.149| 0.148
vm2_mutex* | 0.893| 0.651
vm_thread_mutex1| 0.809| 0.624
vm_thread_mutex2| 2.608| 0.628
vm_thread_mutex3| 28.227| 0.881
Speedup ratio: compare with the result of `trunk' (greater is better)
name |built
----------------|------:
loop_whileloop2 | 1.002
vm2_mutex* | 1.372
vm_thread_mutex1| 1.297
vm_thread_mutex2| 4.149
vm_thread_mutex3| 32.044
Tested on AMD FX-8320 8-core at 3.5GHz
* thread_sync.c (struct mutex_waiter): new on-stack struct
(struct rb_mutex_struct): remove native lock/cond, use ccan/list
(rb_mutex_num_waiting): new function for debug_deadlock_check
(mutex_free): remove native_*_destroy
(mutex_alloc): initialize waitq, remove native_*_initialize
(rb_mutex_trylock): remove native_mutex_{lock,unlock}
(lock_func): remove
(lock_interrupt): remove
(rb_mutex_lock): rewrite waiting path to use native_sleep + ccan/list
(rb_mutex_unlock_th): rewrite to wake up from native_sleep
using rb_threadptr_interrupt
(rb_mutex_abandon_all): empty waitq
* thread.c (debug_deadlock_check): update for new struct
(rb_check_deadlock): ditto
[ruby-core:80913] [Feature #13517]
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58604 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
2017-05-07 20:18:53 -04:00
|
|
|
#include "ccan/list/list.h"
|
2015-08-21 19:36:23 -04:00
|
|
|
|
2015-12-28 16:52:15 -05:00
|
|
|
static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
|
|
|
|
static VALUE rb_eClosedQueueError;
|
2015-08-21 19:36:23 -04:00
|
|
|
|
|
|
|
/* Mutex */
|
|
|
|
|
2017-05-19 14:34:38 -04:00
|
|
|
/* sync_waiter is always on-stack */
|
|
|
|
struct sync_waiter {
|
reduce rb_mutex_t size from 160 to 80 bytes on 64-bit
Instead of relying on a native condition variable and mutex for
every Ruby Mutex object, use a doubly linked-list to implement a
waiter queue in the Mutex. The immediate benefit of this is
reducing the size of every Mutex object, as some projects have
many objects requiring synchronization.
In the future, this technique using a linked-list and on-stack
list node (struct mutex_waiter) should allow us to easily
transition to M:N threading model, as we can avoid the native
thread dependency to implement Mutex.
We already do something similar for autoload in variable.c,
and this was inspired by the Linux kernel wait queue (as
ccan/list is inspired by the Linux kernel linked-list).
Finaly, there are big performance improvements for Mutex
benchmarks, especially in contended cases:
measure target: real
name |trunk |built
----------------|------:|------:
loop_whileloop2 | 0.149| 0.148
vm2_mutex* | 0.893| 0.651
vm_thread_mutex1| 0.809| 0.624
vm_thread_mutex2| 2.608| 0.628
vm_thread_mutex3| 28.227| 0.881
Speedup ratio: compare with the result of `trunk' (greater is better)
name |built
----------------|------:
loop_whileloop2 | 1.002
vm2_mutex* | 1.372
vm_thread_mutex1| 1.297
vm_thread_mutex2| 4.149
vm_thread_mutex3| 32.044
Tested on AMD FX-8320 8-core at 3.5GHz
* thread_sync.c (struct mutex_waiter): new on-stack struct
(struct rb_mutex_struct): remove native lock/cond, use ccan/list
(rb_mutex_num_waiting): new function for debug_deadlock_check
(mutex_free): remove native_*_destroy
(mutex_alloc): initialize waitq, remove native_*_initialize
(rb_mutex_trylock): remove native_mutex_{lock,unlock}
(lock_func): remove
(lock_interrupt): remove
(rb_mutex_lock): rewrite waiting path to use native_sleep + ccan/list
(rb_mutex_unlock_th): rewrite to wake up from native_sleep
using rb_threadptr_interrupt
(rb_mutex_abandon_all): empty waitq
* thread.c (debug_deadlock_check): update for new struct
(rb_check_deadlock): ditto
[ruby-core:80913] [Feature #13517]
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58604 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
2017-05-07 20:18:53 -04:00
|
|
|
rb_thread_t *th;
|
|
|
|
struct list_node node;
|
|
|
|
};
|
|
|
|
|
2017-05-07 21:59:17 -04:00
|
|
|
#define MUTEX_ALLOW_TRAP FL_USER1
|
|
|
|
|
2015-08-21 19:36:23 -04:00
|
|
|
typedef struct rb_mutex_struct {
|
|
|
|
struct rb_thread_struct volatile *th;
|
|
|
|
struct rb_mutex_struct *next_mutex;
|
reduce rb_mutex_t size from 160 to 80 bytes on 64-bit
Instead of relying on a native condition variable and mutex for
every Ruby Mutex object, use a doubly linked-list to implement a
waiter queue in the Mutex. The immediate benefit of this is
reducing the size of every Mutex object, as some projects have
many objects requiring synchronization.
In the future, this technique using a linked-list and on-stack
list node (struct mutex_waiter) should allow us to easily
transition to M:N threading model, as we can avoid the native
thread dependency to implement Mutex.
We already do something similar for autoload in variable.c,
and this was inspired by the Linux kernel wait queue (as
ccan/list is inspired by the Linux kernel linked-list).
Finaly, there are big performance improvements for Mutex
benchmarks, especially in contended cases:
measure target: real
name |trunk |built
----------------|------:|------:
loop_whileloop2 | 0.149| 0.148
vm2_mutex* | 0.893| 0.651
vm_thread_mutex1| 0.809| 0.624
vm_thread_mutex2| 2.608| 0.628
vm_thread_mutex3| 28.227| 0.881
Speedup ratio: compare with the result of `trunk' (greater is better)
name |built
----------------|------:
loop_whileloop2 | 1.002
vm2_mutex* | 1.372
vm_thread_mutex1| 1.297
vm_thread_mutex2| 4.149
vm_thread_mutex3| 32.044
Tested on AMD FX-8320 8-core at 3.5GHz
* thread_sync.c (struct mutex_waiter): new on-stack struct
(struct rb_mutex_struct): remove native lock/cond, use ccan/list
(rb_mutex_num_waiting): new function for debug_deadlock_check
(mutex_free): remove native_*_destroy
(mutex_alloc): initialize waitq, remove native_*_initialize
(rb_mutex_trylock): remove native_mutex_{lock,unlock}
(lock_func): remove
(lock_interrupt): remove
(rb_mutex_lock): rewrite waiting path to use native_sleep + ccan/list
(rb_mutex_unlock_th): rewrite to wake up from native_sleep
using rb_threadptr_interrupt
(rb_mutex_abandon_all): empty waitq
* thread.c (debug_deadlock_check): update for new struct
(rb_check_deadlock): ditto
[ruby-core:80913] [Feature #13517]
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58604 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
2017-05-07 20:18:53 -04:00
|
|
|
struct list_head waitq; /* protected by GVL */
|
2015-08-21 19:36:23 -04:00
|
|
|
} rb_mutex_t;
|
|
|
|
|
2016-05-08 21:46:37 -04:00
|
|
|
#if defined(HAVE_WORKING_FORK)
|
2015-08-21 19:36:23 -04:00
|
|
|
static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
|
|
|
|
static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
|
|
|
|
static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
|
2016-05-08 21:46:37 -04:00
|
|
|
#endif
|
2015-08-21 19:36:23 -04:00
|
|
|
static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th);
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Document-class: Mutex
|
|
|
|
*
|
|
|
|
* Mutex implements a simple semaphore that can be used to coordinate access to
|
|
|
|
* shared data from multiple concurrent threads.
|
|
|
|
*
|
|
|
|
* Example:
|
|
|
|
*
|
|
|
|
* require 'thread'
|
|
|
|
* semaphore = Mutex.new
|
|
|
|
*
|
|
|
|
* a = Thread.new {
|
|
|
|
* semaphore.synchronize {
|
|
|
|
* # access shared resource
|
|
|
|
* }
|
|
|
|
* }
|
|
|
|
*
|
|
|
|
* b = Thread.new {
|
|
|
|
* semaphore.synchronize {
|
|
|
|
* # access shared resource
|
|
|
|
* }
|
|
|
|
* }
|
|
|
|
*
|
|
|
|
*/
|
|
|
|
|
|
|
|
#define GetMutexPtr(obj, tobj) \
|
|
|
|
TypedData_Get_Struct((obj), rb_mutex_t, &mutex_data_type, (tobj))
|
|
|
|
|
|
|
|
#define mutex_mark NULL
|
|
|
|
|
reduce rb_mutex_t size from 160 to 80 bytes on 64-bit
Instead of relying on a native condition variable and mutex for
every Ruby Mutex object, use a doubly linked-list to implement a
waiter queue in the Mutex. The immediate benefit of this is
reducing the size of every Mutex object, as some projects have
many objects requiring synchronization.
In the future, this technique using a linked-list and on-stack
list node (struct mutex_waiter) should allow us to easily
transition to M:N threading model, as we can avoid the native
thread dependency to implement Mutex.
We already do something similar for autoload in variable.c,
and this was inspired by the Linux kernel wait queue (as
ccan/list is inspired by the Linux kernel linked-list).
Finaly, there are big performance improvements for Mutex
benchmarks, especially in contended cases:
measure target: real
name |trunk |built
----------------|------:|------:
loop_whileloop2 | 0.149| 0.148
vm2_mutex* | 0.893| 0.651
vm_thread_mutex1| 0.809| 0.624
vm_thread_mutex2| 2.608| 0.628
vm_thread_mutex3| 28.227| 0.881
Speedup ratio: compare with the result of `trunk' (greater is better)
name |built
----------------|------:
loop_whileloop2 | 1.002
vm2_mutex* | 1.372
vm_thread_mutex1| 1.297
vm_thread_mutex2| 4.149
vm_thread_mutex3| 32.044
Tested on AMD FX-8320 8-core at 3.5GHz
* thread_sync.c (struct mutex_waiter): new on-stack struct
(struct rb_mutex_struct): remove native lock/cond, use ccan/list
(rb_mutex_num_waiting): new function for debug_deadlock_check
(mutex_free): remove native_*_destroy
(mutex_alloc): initialize waitq, remove native_*_initialize
(rb_mutex_trylock): remove native_mutex_{lock,unlock}
(lock_func): remove
(lock_interrupt): remove
(rb_mutex_lock): rewrite waiting path to use native_sleep + ccan/list
(rb_mutex_unlock_th): rewrite to wake up from native_sleep
using rb_threadptr_interrupt
(rb_mutex_abandon_all): empty waitq
* thread.c (debug_deadlock_check): update for new struct
(rb_check_deadlock): ditto
[ruby-core:80913] [Feature #13517]
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58604 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
2017-05-07 20:18:53 -04:00
|
|
|
static size_t
|
|
|
|
rb_mutex_num_waiting(rb_mutex_t *mutex)
|
|
|
|
{
|
2017-05-19 14:34:38 -04:00
|
|
|
struct sync_waiter *w = 0;
|
reduce rb_mutex_t size from 160 to 80 bytes on 64-bit
Instead of relying on a native condition variable and mutex for
every Ruby Mutex object, use a doubly linked-list to implement a
waiter queue in the Mutex. The immediate benefit of this is
reducing the size of every Mutex object, as some projects have
many objects requiring synchronization.
In the future, this technique using a linked-list and on-stack
list node (struct mutex_waiter) should allow us to easily
transition to M:N threading model, as we can avoid the native
thread dependency to implement Mutex.
We already do something similar for autoload in variable.c,
and this was inspired by the Linux kernel wait queue (as
ccan/list is inspired by the Linux kernel linked-list).
Finaly, there are big performance improvements for Mutex
benchmarks, especially in contended cases:
measure target: real
name |trunk |built
----------------|------:|------:
loop_whileloop2 | 0.149| 0.148
vm2_mutex* | 0.893| 0.651
vm_thread_mutex1| 0.809| 0.624
vm_thread_mutex2| 2.608| 0.628
vm_thread_mutex3| 28.227| 0.881
Speedup ratio: compare with the result of `trunk' (greater is better)
name |built
----------------|------:
loop_whileloop2 | 1.002
vm2_mutex* | 1.372
vm_thread_mutex1| 1.297
vm_thread_mutex2| 4.149
vm_thread_mutex3| 32.044
Tested on AMD FX-8320 8-core at 3.5GHz
* thread_sync.c (struct mutex_waiter): new on-stack struct
(struct rb_mutex_struct): remove native lock/cond, use ccan/list
(rb_mutex_num_waiting): new function for debug_deadlock_check
(mutex_free): remove native_*_destroy
(mutex_alloc): initialize waitq, remove native_*_initialize
(rb_mutex_trylock): remove native_mutex_{lock,unlock}
(lock_func): remove
(lock_interrupt): remove
(rb_mutex_lock): rewrite waiting path to use native_sleep + ccan/list
(rb_mutex_unlock_th): rewrite to wake up from native_sleep
using rb_threadptr_interrupt
(rb_mutex_abandon_all): empty waitq
* thread.c (debug_deadlock_check): update for new struct
(rb_check_deadlock): ditto
[ruby-core:80913] [Feature #13517]
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58604 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
2017-05-07 20:18:53 -04:00
|
|
|
size_t n = 0;
|
|
|
|
|
|
|
|
list_for_each(&mutex->waitq, w, node) {
|
|
|
|
n++;
|
|
|
|
}
|
|
|
|
|
|
|
|
return n;
|
|
|
|
}
|
|
|
|
|
2015-08-21 19:36:23 -04:00
|
|
|
static void
|
|
|
|
mutex_free(void *ptr)
|
|
|
|
{
|
2017-03-17 15:59:56 -04:00
|
|
|
rb_mutex_t *mutex = ptr;
|
|
|
|
if (mutex->th) {
|
|
|
|
/* rb_warn("free locked mutex"); */
|
|
|
|
const char *err = rb_mutex_unlock_th(mutex, mutex->th);
|
|
|
|
if (err) rb_bug("%s", err);
|
2015-08-21 19:36:23 -04:00
|
|
|
}
|
|
|
|
ruby_xfree(ptr);
|
|
|
|
}
|
|
|
|
|
|
|
|
static size_t
|
|
|
|
mutex_memsize(const void *ptr)
|
|
|
|
{
|
2015-12-08 19:38:32 -05:00
|
|
|
return sizeof(rb_mutex_t);
|
2015-08-21 19:36:23 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
static const rb_data_type_t mutex_data_type = {
|
|
|
|
"mutex",
|
|
|
|
{mutex_mark, mutex_free, mutex_memsize,},
|
|
|
|
0, 0, RUBY_TYPED_FREE_IMMEDIATELY
|
|
|
|
};
|
|
|
|
|
|
|
|
VALUE
|
|
|
|
rb_obj_is_mutex(VALUE obj)
|
|
|
|
{
|
|
|
|
if (rb_typeddata_is_kind_of(obj, &mutex_data_type)) {
|
|
|
|
return Qtrue;
|
|
|
|
}
|
|
|
|
else {
|
|
|
|
return Qfalse;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
mutex_alloc(VALUE klass)
|
|
|
|
{
|
|
|
|
VALUE obj;
|
|
|
|
rb_mutex_t *mutex;
|
|
|
|
|
|
|
|
obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
|
reduce rb_mutex_t size from 160 to 80 bytes on 64-bit
Instead of relying on a native condition variable and mutex for
every Ruby Mutex object, use a doubly linked-list to implement a
waiter queue in the Mutex. The immediate benefit of this is
reducing the size of every Mutex object, as some projects have
many objects requiring synchronization.
In the future, this technique using a linked-list and on-stack
list node (struct mutex_waiter) should allow us to easily
transition to M:N threading model, as we can avoid the native
thread dependency to implement Mutex.
We already do something similar for autoload in variable.c,
and this was inspired by the Linux kernel wait queue (as
ccan/list is inspired by the Linux kernel linked-list).
Finaly, there are big performance improvements for Mutex
benchmarks, especially in contended cases:
measure target: real
name |trunk |built
----------------|------:|------:
loop_whileloop2 | 0.149| 0.148
vm2_mutex* | 0.893| 0.651
vm_thread_mutex1| 0.809| 0.624
vm_thread_mutex2| 2.608| 0.628
vm_thread_mutex3| 28.227| 0.881
Speedup ratio: compare with the result of `trunk' (greater is better)
name |built
----------------|------:
loop_whileloop2 | 1.002
vm2_mutex* | 1.372
vm_thread_mutex1| 1.297
vm_thread_mutex2| 4.149
vm_thread_mutex3| 32.044
Tested on AMD FX-8320 8-core at 3.5GHz
* thread_sync.c (struct mutex_waiter): new on-stack struct
(struct rb_mutex_struct): remove native lock/cond, use ccan/list
(rb_mutex_num_waiting): new function for debug_deadlock_check
(mutex_free): remove native_*_destroy
(mutex_alloc): initialize waitq, remove native_*_initialize
(rb_mutex_trylock): remove native_mutex_{lock,unlock}
(lock_func): remove
(lock_interrupt): remove
(rb_mutex_lock): rewrite waiting path to use native_sleep + ccan/list
(rb_mutex_unlock_th): rewrite to wake up from native_sleep
using rb_threadptr_interrupt
(rb_mutex_abandon_all): empty waitq
* thread.c (debug_deadlock_check): update for new struct
(rb_check_deadlock): ditto
[ruby-core:80913] [Feature #13517]
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58604 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
2017-05-07 20:18:53 -04:00
|
|
|
list_head_init(&mutex->waitq);
|
2015-08-21 19:36:23 -04:00
|
|
|
return obj;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* call-seq:
|
|
|
|
* Mutex.new -> mutex
|
|
|
|
*
|
|
|
|
* Creates a new Mutex
|
|
|
|
*/
|
|
|
|
static VALUE
|
|
|
|
mutex_initialize(VALUE self)
|
|
|
|
{
|
|
|
|
return self;
|
|
|
|
}
|
|
|
|
|
|
|
|
VALUE
|
|
|
|
rb_mutex_new(void)
|
|
|
|
{
|
|
|
|
return mutex_alloc(rb_cMutex);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* call-seq:
|
|
|
|
* mutex.locked? -> true or false
|
|
|
|
*
|
|
|
|
* Returns +true+ if this lock is currently held by some thread.
|
|
|
|
*/
|
|
|
|
VALUE
|
|
|
|
rb_mutex_locked_p(VALUE self)
|
|
|
|
{
|
|
|
|
rb_mutex_t *mutex;
|
|
|
|
GetMutexPtr(self, mutex);
|
|
|
|
return mutex->th ? Qtrue : Qfalse;
|
|
|
|
}
|
|
|
|
|
|
|
|
static void
|
|
|
|
mutex_locked(rb_thread_t *th, VALUE self)
|
|
|
|
{
|
|
|
|
rb_mutex_t *mutex;
|
|
|
|
GetMutexPtr(self, mutex);
|
|
|
|
|
|
|
|
if (th->keeping_mutexes) {
|
|
|
|
mutex->next_mutex = th->keeping_mutexes;
|
|
|
|
}
|
|
|
|
th->keeping_mutexes = mutex;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* call-seq:
|
|
|
|
* mutex.try_lock -> true or false
|
|
|
|
*
|
|
|
|
* Attempts to obtain the lock and returns immediately. Returns +true+ if the
|
|
|
|
* lock was granted.
|
|
|
|
*/
|
|
|
|
VALUE
|
|
|
|
rb_mutex_trylock(VALUE self)
|
|
|
|
{
|
|
|
|
rb_mutex_t *mutex;
|
|
|
|
VALUE locked = Qfalse;
|
|
|
|
GetMutexPtr(self, mutex);
|
|
|
|
|
|
|
|
if (mutex->th == 0) {
|
|
|
|
rb_thread_t *th = GET_THREAD();
|
|
|
|
mutex->th = th;
|
|
|
|
locked = Qtrue;
|
|
|
|
|
|
|
|
mutex_locked(th, self);
|
|
|
|
}
|
|
|
|
|
|
|
|
return locked;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* At maximum, only one thread can use cond_timedwait and watch deadlock
|
|
|
|
* periodically. Multiple polling thread (i.e. concurrent deadlock check)
|
|
|
|
* introduces new race conditions. [Bug #6278] [ruby-core:44275]
|
|
|
|
*/
|
|
|
|
static const rb_thread_t *patrol_thread = NULL;
|
|
|
|
|
|
|
|
/*
|
|
|
|
* call-seq:
|
|
|
|
* mutex.lock -> self
|
|
|
|
*
|
|
|
|
* Attempts to grab the lock and waits if it isn't available.
|
|
|
|
* Raises +ThreadError+ if +mutex+ was locked by the current thread.
|
|
|
|
*/
|
|
|
|
VALUE
|
|
|
|
rb_mutex_lock(VALUE self)
|
|
|
|
{
|
|
|
|
rb_thread_t *th = GET_THREAD();
|
|
|
|
rb_mutex_t *mutex;
|
|
|
|
GetMutexPtr(self, mutex);
|
|
|
|
|
|
|
|
/* When running trap handler */
|
2017-05-07 21:59:17 -04:00
|
|
|
if (!FL_TEST_RAW(self, MUTEX_ALLOW_TRAP) &&
|
|
|
|
th->interrupt_mask & TRAP_INTERRUPT_MASK) {
|
2015-08-21 19:36:23 -04:00
|
|
|
rb_raise(rb_eThreadError, "can't be called from trap context");
|
|
|
|
}
|
|
|
|
|
|
|
|
if (rb_mutex_trylock(self) == Qfalse) {
|
2017-05-19 14:34:38 -04:00
|
|
|
struct sync_waiter w;
|
reduce rb_mutex_t size from 160 to 80 bytes on 64-bit
Instead of relying on a native condition variable and mutex for
every Ruby Mutex object, use a doubly linked-list to implement a
waiter queue in the Mutex. The immediate benefit of this is
reducing the size of every Mutex object, as some projects have
many objects requiring synchronization.
In the future, this technique using a linked-list and on-stack
list node (struct mutex_waiter) should allow us to easily
transition to M:N threading model, as we can avoid the native
thread dependency to implement Mutex.
We already do something similar for autoload in variable.c,
and this was inspired by the Linux kernel wait queue (as
ccan/list is inspired by the Linux kernel linked-list).
Finaly, there are big performance improvements for Mutex
benchmarks, especially in contended cases:
measure target: real
name |trunk |built
----------------|------:|------:
loop_whileloop2 | 0.149| 0.148
vm2_mutex* | 0.893| 0.651
vm_thread_mutex1| 0.809| 0.624
vm_thread_mutex2| 2.608| 0.628
vm_thread_mutex3| 28.227| 0.881
Speedup ratio: compare with the result of `trunk' (greater is better)
name |built
----------------|------:
loop_whileloop2 | 1.002
vm2_mutex* | 1.372
vm_thread_mutex1| 1.297
vm_thread_mutex2| 4.149
vm_thread_mutex3| 32.044
Tested on AMD FX-8320 8-core at 3.5GHz
* thread_sync.c (struct mutex_waiter): new on-stack struct
(struct rb_mutex_struct): remove native lock/cond, use ccan/list
(rb_mutex_num_waiting): new function for debug_deadlock_check
(mutex_free): remove native_*_destroy
(mutex_alloc): initialize waitq, remove native_*_initialize
(rb_mutex_trylock): remove native_mutex_{lock,unlock}
(lock_func): remove
(lock_interrupt): remove
(rb_mutex_lock): rewrite waiting path to use native_sleep + ccan/list
(rb_mutex_unlock_th): rewrite to wake up from native_sleep
using rb_threadptr_interrupt
(rb_mutex_abandon_all): empty waitq
* thread.c (debug_deadlock_check): update for new struct
(rb_check_deadlock): ditto
[ruby-core:80913] [Feature #13517]
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58604 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
2017-05-07 20:18:53 -04:00
|
|
|
|
2015-08-21 19:36:23 -04:00
|
|
|
if (mutex->th == th) {
|
|
|
|
rb_raise(rb_eThreadError, "deadlock; recursive locking");
|
|
|
|
}
|
|
|
|
|
reduce rb_mutex_t size from 160 to 80 bytes on 64-bit
Instead of relying on a native condition variable and mutex for
every Ruby Mutex object, use a doubly linked-list to implement a
waiter queue in the Mutex. The immediate benefit of this is
reducing the size of every Mutex object, as some projects have
many objects requiring synchronization.
In the future, this technique using a linked-list and on-stack
list node (struct mutex_waiter) should allow us to easily
transition to M:N threading model, as we can avoid the native
thread dependency to implement Mutex.
We already do something similar for autoload in variable.c,
and this was inspired by the Linux kernel wait queue (as
ccan/list is inspired by the Linux kernel linked-list).
Finaly, there are big performance improvements for Mutex
benchmarks, especially in contended cases:
measure target: real
name |trunk |built
----------------|------:|------:
loop_whileloop2 | 0.149| 0.148
vm2_mutex* | 0.893| 0.651
vm_thread_mutex1| 0.809| 0.624
vm_thread_mutex2| 2.608| 0.628
vm_thread_mutex3| 28.227| 0.881
Speedup ratio: compare with the result of `trunk' (greater is better)
name |built
----------------|------:
loop_whileloop2 | 1.002
vm2_mutex* | 1.372
vm_thread_mutex1| 1.297
vm_thread_mutex2| 4.149
vm_thread_mutex3| 32.044
Tested on AMD FX-8320 8-core at 3.5GHz
* thread_sync.c (struct mutex_waiter): new on-stack struct
(struct rb_mutex_struct): remove native lock/cond, use ccan/list
(rb_mutex_num_waiting): new function for debug_deadlock_check
(mutex_free): remove native_*_destroy
(mutex_alloc): initialize waitq, remove native_*_initialize
(rb_mutex_trylock): remove native_mutex_{lock,unlock}
(lock_func): remove
(lock_interrupt): remove
(rb_mutex_lock): rewrite waiting path to use native_sleep + ccan/list
(rb_mutex_unlock_th): rewrite to wake up from native_sleep
using rb_threadptr_interrupt
(rb_mutex_abandon_all): empty waitq
* thread.c (debug_deadlock_check): update for new struct
(rb_check_deadlock): ditto
[ruby-core:80913] [Feature #13517]
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58604 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
2017-05-07 20:18:53 -04:00
|
|
|
w.th = th;
|
|
|
|
|
2015-08-21 19:36:23 -04:00
|
|
|
while (mutex->th != th) {
|
|
|
|
enum rb_thread_status prev_status = th->status;
|
reduce rb_mutex_t size from 160 to 80 bytes on 64-bit
Instead of relying on a native condition variable and mutex for
every Ruby Mutex object, use a doubly linked-list to implement a
waiter queue in the Mutex. The immediate benefit of this is
reducing the size of every Mutex object, as some projects have
many objects requiring synchronization.
In the future, this technique using a linked-list and on-stack
list node (struct mutex_waiter) should allow us to easily
transition to M:N threading model, as we can avoid the native
thread dependency to implement Mutex.
We already do something similar for autoload in variable.c,
and this was inspired by the Linux kernel wait queue (as
ccan/list is inspired by the Linux kernel linked-list).
Finaly, there are big performance improvements for Mutex
benchmarks, especially in contended cases:
measure target: real
name |trunk |built
----------------|------:|------:
loop_whileloop2 | 0.149| 0.148
vm2_mutex* | 0.893| 0.651
vm_thread_mutex1| 0.809| 0.624
vm_thread_mutex2| 2.608| 0.628
vm_thread_mutex3| 28.227| 0.881
Speedup ratio: compare with the result of `trunk' (greater is better)
name |built
----------------|------:
loop_whileloop2 | 1.002
vm2_mutex* | 1.372
vm_thread_mutex1| 1.297
vm_thread_mutex2| 4.149
vm_thread_mutex3| 32.044
Tested on AMD FX-8320 8-core at 3.5GHz
* thread_sync.c (struct mutex_waiter): new on-stack struct
(struct rb_mutex_struct): remove native lock/cond, use ccan/list
(rb_mutex_num_waiting): new function for debug_deadlock_check
(mutex_free): remove native_*_destroy
(mutex_alloc): initialize waitq, remove native_*_initialize
(rb_mutex_trylock): remove native_mutex_{lock,unlock}
(lock_func): remove
(lock_interrupt): remove
(rb_mutex_lock): rewrite waiting path to use native_sleep + ccan/list
(rb_mutex_unlock_th): rewrite to wake up from native_sleep
using rb_threadptr_interrupt
(rb_mutex_abandon_all): empty waitq
* thread.c (debug_deadlock_check): update for new struct
(rb_check_deadlock): ditto
[ruby-core:80913] [Feature #13517]
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58604 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
2017-05-07 20:18:53 -04:00
|
|
|
struct timeval *timeout = 0;
|
|
|
|
struct timeval tv = { 0, 100000 }; /* 100ms */
|
2015-08-21 19:36:23 -04:00
|
|
|
|
|
|
|
th->status = THREAD_STOPPED_FOREVER;
|
|
|
|
th->locking_mutex = self;
|
|
|
|
th->vm->sleeper++;
|
|
|
|
/*
|
reduce rb_mutex_t size from 160 to 80 bytes on 64-bit
Instead of relying on a native condition variable and mutex for
every Ruby Mutex object, use a doubly linked-list to implement a
waiter queue in the Mutex. The immediate benefit of this is
reducing the size of every Mutex object, as some projects have
many objects requiring synchronization.
In the future, this technique using a linked-list and on-stack
list node (struct mutex_waiter) should allow us to easily
transition to M:N threading model, as we can avoid the native
thread dependency to implement Mutex.
We already do something similar for autoload in variable.c,
and this was inspired by the Linux kernel wait queue (as
ccan/list is inspired by the Linux kernel linked-list).
Finaly, there are big performance improvements for Mutex
benchmarks, especially in contended cases:
measure target: real
name |trunk |built
----------------|------:|------:
loop_whileloop2 | 0.149| 0.148
vm2_mutex* | 0.893| 0.651
vm_thread_mutex1| 0.809| 0.624
vm_thread_mutex2| 2.608| 0.628
vm_thread_mutex3| 28.227| 0.881
Speedup ratio: compare with the result of `trunk' (greater is better)
name |built
----------------|------:
loop_whileloop2 | 1.002
vm2_mutex* | 1.372
vm_thread_mutex1| 1.297
vm_thread_mutex2| 4.149
vm_thread_mutex3| 32.044
Tested on AMD FX-8320 8-core at 3.5GHz
* thread_sync.c (struct mutex_waiter): new on-stack struct
(struct rb_mutex_struct): remove native lock/cond, use ccan/list
(rb_mutex_num_waiting): new function for debug_deadlock_check
(mutex_free): remove native_*_destroy
(mutex_alloc): initialize waitq, remove native_*_initialize
(rb_mutex_trylock): remove native_mutex_{lock,unlock}
(lock_func): remove
(lock_interrupt): remove
(rb_mutex_lock): rewrite waiting path to use native_sleep + ccan/list
(rb_mutex_unlock_th): rewrite to wake up from native_sleep
using rb_threadptr_interrupt
(rb_mutex_abandon_all): empty waitq
* thread.c (debug_deadlock_check): update for new struct
(rb_check_deadlock): ditto
[ruby-core:80913] [Feature #13517]
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58604 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
2017-05-07 20:18:53 -04:00
|
|
|
* Carefully! while some contended threads are in native_sleep(),
|
2017-04-25 16:20:08 -04:00
|
|
|
* vm->sleeper is unstable value. we have to avoid both deadlock
|
2015-08-21 19:36:23 -04:00
|
|
|
* and busy loop.
|
|
|
|
*/
|
|
|
|
if ((vm_living_thread_num(th->vm) == th->vm->sleeper) &&
|
|
|
|
!patrol_thread) {
|
reduce rb_mutex_t size from 160 to 80 bytes on 64-bit
Instead of relying on a native condition variable and mutex for
every Ruby Mutex object, use a doubly linked-list to implement a
waiter queue in the Mutex. The immediate benefit of this is
reducing the size of every Mutex object, as some projects have
many objects requiring synchronization.
In the future, this technique using a linked-list and on-stack
list node (struct mutex_waiter) should allow us to easily
transition to M:N threading model, as we can avoid the native
thread dependency to implement Mutex.
We already do something similar for autoload in variable.c,
and this was inspired by the Linux kernel wait queue (as
ccan/list is inspired by the Linux kernel linked-list).
Finaly, there are big performance improvements for Mutex
benchmarks, especially in contended cases:
measure target: real
name |trunk |built
----------------|------:|------:
loop_whileloop2 | 0.149| 0.148
vm2_mutex* | 0.893| 0.651
vm_thread_mutex1| 0.809| 0.624
vm_thread_mutex2| 2.608| 0.628
vm_thread_mutex3| 28.227| 0.881
Speedup ratio: compare with the result of `trunk' (greater is better)
name |built
----------------|------:
loop_whileloop2 | 1.002
vm2_mutex* | 1.372
vm_thread_mutex1| 1.297
vm_thread_mutex2| 4.149
vm_thread_mutex3| 32.044
Tested on AMD FX-8320 8-core at 3.5GHz
* thread_sync.c (struct mutex_waiter): new on-stack struct
(struct rb_mutex_struct): remove native lock/cond, use ccan/list
(rb_mutex_num_waiting): new function for debug_deadlock_check
(mutex_free): remove native_*_destroy
(mutex_alloc): initialize waitq, remove native_*_initialize
(rb_mutex_trylock): remove native_mutex_{lock,unlock}
(lock_func): remove
(lock_interrupt): remove
(rb_mutex_lock): rewrite waiting path to use native_sleep + ccan/list
(rb_mutex_unlock_th): rewrite to wake up from native_sleep
using rb_threadptr_interrupt
(rb_mutex_abandon_all): empty waitq
* thread.c (debug_deadlock_check): update for new struct
(rb_check_deadlock): ditto
[ruby-core:80913] [Feature #13517]
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58604 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
2017-05-07 20:18:53 -04:00
|
|
|
timeout = &tv;
|
2015-08-21 19:36:23 -04:00
|
|
|
patrol_thread = th;
|
|
|
|
}
|
|
|
|
|
reduce rb_mutex_t size from 160 to 80 bytes on 64-bit
Instead of relying on a native condition variable and mutex for
every Ruby Mutex object, use a doubly linked-list to implement a
waiter queue in the Mutex. The immediate benefit of this is
reducing the size of every Mutex object, as some projects have
many objects requiring synchronization.
In the future, this technique using a linked-list and on-stack
list node (struct mutex_waiter) should allow us to easily
transition to M:N threading model, as we can avoid the native
thread dependency to implement Mutex.
We already do something similar for autoload in variable.c,
and this was inspired by the Linux kernel wait queue (as
ccan/list is inspired by the Linux kernel linked-list).
Finaly, there are big performance improvements for Mutex
benchmarks, especially in contended cases:
measure target: real
name |trunk |built
----------------|------:|------:
loop_whileloop2 | 0.149| 0.148
vm2_mutex* | 0.893| 0.651
vm_thread_mutex1| 0.809| 0.624
vm_thread_mutex2| 2.608| 0.628
vm_thread_mutex3| 28.227| 0.881
Speedup ratio: compare with the result of `trunk' (greater is better)
name |built
----------------|------:
loop_whileloop2 | 1.002
vm2_mutex* | 1.372
vm_thread_mutex1| 1.297
vm_thread_mutex2| 4.149
vm_thread_mutex3| 32.044
Tested on AMD FX-8320 8-core at 3.5GHz
* thread_sync.c (struct mutex_waiter): new on-stack struct
(struct rb_mutex_struct): remove native lock/cond, use ccan/list
(rb_mutex_num_waiting): new function for debug_deadlock_check
(mutex_free): remove native_*_destroy
(mutex_alloc): initialize waitq, remove native_*_initialize
(rb_mutex_trylock): remove native_mutex_{lock,unlock}
(lock_func): remove
(lock_interrupt): remove
(rb_mutex_lock): rewrite waiting path to use native_sleep + ccan/list
(rb_mutex_unlock_th): rewrite to wake up from native_sleep
using rb_threadptr_interrupt
(rb_mutex_abandon_all): empty waitq
* thread.c (debug_deadlock_check): update for new struct
(rb_check_deadlock): ditto
[ruby-core:80913] [Feature #13517]
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58604 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
2017-05-07 20:18:53 -04:00
|
|
|
list_add_tail(&mutex->waitq, &w.node);
|
|
|
|
native_sleep(th, timeout); /* release GVL */
|
|
|
|
list_del(&w.node);
|
|
|
|
if (!mutex->th) {
|
|
|
|
mutex->th = th;
|
|
|
|
}
|
2015-08-21 19:36:23 -04:00
|
|
|
|
|
|
|
if (patrol_thread == th)
|
|
|
|
patrol_thread = NULL;
|
|
|
|
|
|
|
|
th->locking_mutex = Qfalse;
|
reduce rb_mutex_t size from 160 to 80 bytes on 64-bit
Instead of relying on a native condition variable and mutex for
every Ruby Mutex object, use a doubly linked-list to implement a
waiter queue in the Mutex. The immediate benefit of this is
reducing the size of every Mutex object, as some projects have
many objects requiring synchronization.
In the future, this technique using a linked-list and on-stack
list node (struct mutex_waiter) should allow us to easily
transition to M:N threading model, as we can avoid the native
thread dependency to implement Mutex.
We already do something similar for autoload in variable.c,
and this was inspired by the Linux kernel wait queue (as
ccan/list is inspired by the Linux kernel linked-list).
Finaly, there are big performance improvements for Mutex
benchmarks, especially in contended cases:
measure target: real
name |trunk |built
----------------|------:|------:
loop_whileloop2 | 0.149| 0.148
vm2_mutex* | 0.893| 0.651
vm_thread_mutex1| 0.809| 0.624
vm_thread_mutex2| 2.608| 0.628
vm_thread_mutex3| 28.227| 0.881
Speedup ratio: compare with the result of `trunk' (greater is better)
name |built
----------------|------:
loop_whileloop2 | 1.002
vm2_mutex* | 1.372
vm_thread_mutex1| 1.297
vm_thread_mutex2| 4.149
vm_thread_mutex3| 32.044
Tested on AMD FX-8320 8-core at 3.5GHz
* thread_sync.c (struct mutex_waiter): new on-stack struct
(struct rb_mutex_struct): remove native lock/cond, use ccan/list
(rb_mutex_num_waiting): new function for debug_deadlock_check
(mutex_free): remove native_*_destroy
(mutex_alloc): initialize waitq, remove native_*_initialize
(rb_mutex_trylock): remove native_mutex_{lock,unlock}
(lock_func): remove
(lock_interrupt): remove
(rb_mutex_lock): rewrite waiting path to use native_sleep + ccan/list
(rb_mutex_unlock_th): rewrite to wake up from native_sleep
using rb_threadptr_interrupt
(rb_mutex_abandon_all): empty waitq
* thread.c (debug_deadlock_check): update for new struct
(rb_check_deadlock): ditto
[ruby-core:80913] [Feature #13517]
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58604 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
2017-05-07 20:18:53 -04:00
|
|
|
if (mutex->th && timeout && !RUBY_VM_INTERRUPTED(th)) {
|
2015-08-21 19:36:23 -04:00
|
|
|
rb_check_deadlock(th->vm);
|
|
|
|
}
|
|
|
|
if (th->status == THREAD_STOPPED_FOREVER) {
|
|
|
|
th->status = prev_status;
|
|
|
|
}
|
|
|
|
th->vm->sleeper--;
|
|
|
|
|
|
|
|
if (mutex->th == th) mutex_locked(th, self);
|
|
|
|
|
reduce rb_mutex_t size from 160 to 80 bytes on 64-bit
Instead of relying on a native condition variable and mutex for
every Ruby Mutex object, use a doubly linked-list to implement a
waiter queue in the Mutex. The immediate benefit of this is
reducing the size of every Mutex object, as some projects have
many objects requiring synchronization.
In the future, this technique using a linked-list and on-stack
list node (struct mutex_waiter) should allow us to easily
transition to M:N threading model, as we can avoid the native
thread dependency to implement Mutex.
We already do something similar for autoload in variable.c,
and this was inspired by the Linux kernel wait queue (as
ccan/list is inspired by the Linux kernel linked-list).
Finaly, there are big performance improvements for Mutex
benchmarks, especially in contended cases:
measure target: real
name |trunk |built
----------------|------:|------:
loop_whileloop2 | 0.149| 0.148
vm2_mutex* | 0.893| 0.651
vm_thread_mutex1| 0.809| 0.624
vm_thread_mutex2| 2.608| 0.628
vm_thread_mutex3| 28.227| 0.881
Speedup ratio: compare with the result of `trunk' (greater is better)
name |built
----------------|------:
loop_whileloop2 | 1.002
vm2_mutex* | 1.372
vm_thread_mutex1| 1.297
vm_thread_mutex2| 4.149
vm_thread_mutex3| 32.044
Tested on AMD FX-8320 8-core at 3.5GHz
* thread_sync.c (struct mutex_waiter): new on-stack struct
(struct rb_mutex_struct): remove native lock/cond, use ccan/list
(rb_mutex_num_waiting): new function for debug_deadlock_check
(mutex_free): remove native_*_destroy
(mutex_alloc): initialize waitq, remove native_*_initialize
(rb_mutex_trylock): remove native_mutex_{lock,unlock}
(lock_func): remove
(lock_interrupt): remove
(rb_mutex_lock): rewrite waiting path to use native_sleep + ccan/list
(rb_mutex_unlock_th): rewrite to wake up from native_sleep
using rb_threadptr_interrupt
(rb_mutex_abandon_all): empty waitq
* thread.c (debug_deadlock_check): update for new struct
(rb_check_deadlock): ditto
[ruby-core:80913] [Feature #13517]
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58604 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
2017-05-07 20:18:53 -04:00
|
|
|
RUBY_VM_CHECK_INTS_BLOCKING(th);
|
2015-08-21 19:36:23 -04:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return self;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* call-seq:
|
|
|
|
* mutex.owned? -> true or false
|
|
|
|
*
|
|
|
|
* Returns +true+ if this lock is currently held by current thread.
|
|
|
|
*/
|
|
|
|
VALUE
|
|
|
|
rb_mutex_owned_p(VALUE self)
|
|
|
|
{
|
|
|
|
VALUE owned = Qfalse;
|
|
|
|
rb_thread_t *th = GET_THREAD();
|
|
|
|
rb_mutex_t *mutex;
|
|
|
|
|
|
|
|
GetMutexPtr(self, mutex);
|
|
|
|
|
|
|
|
if (mutex->th == th)
|
|
|
|
owned = Qtrue;
|
|
|
|
|
|
|
|
return owned;
|
|
|
|
}
|
|
|
|
|
|
|
|
static const char *
|
|
|
|
rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th)
|
|
|
|
{
|
|
|
|
const char *err = NULL;
|
|
|
|
|
|
|
|
if (mutex->th == 0) {
|
|
|
|
err = "Attempt to unlock a mutex which is not locked";
|
|
|
|
}
|
|
|
|
else if (mutex->th != th) {
|
|
|
|
err = "Attempt to unlock a mutex which is locked by another thread";
|
2017-05-09 20:39:26 -04:00
|
|
|
}
|
|
|
|
else {
|
2017-05-19 14:34:38 -04:00
|
|
|
struct sync_waiter *cur = 0, *next = 0;
|
2015-08-21 19:36:23 -04:00
|
|
|
rb_mutex_t *volatile *th_mutex = &th->keeping_mutexes;
|
reduce rb_mutex_t size from 160 to 80 bytes on 64-bit
Instead of relying on a native condition variable and mutex for
every Ruby Mutex object, use a doubly linked-list to implement a
waiter queue in the Mutex. The immediate benefit of this is
reducing the size of every Mutex object, as some projects have
many objects requiring synchronization.
In the future, this technique using a linked-list and on-stack
list node (struct mutex_waiter) should allow us to easily
transition to M:N threading model, as we can avoid the native
thread dependency to implement Mutex.
We already do something similar for autoload in variable.c,
and this was inspired by the Linux kernel wait queue (as
ccan/list is inspired by the Linux kernel linked-list).
Finaly, there are big performance improvements for Mutex
benchmarks, especially in contended cases:
measure target: real
name |trunk |built
----------------|------:|------:
loop_whileloop2 | 0.149| 0.148
vm2_mutex* | 0.893| 0.651
vm_thread_mutex1| 0.809| 0.624
vm_thread_mutex2| 2.608| 0.628
vm_thread_mutex3| 28.227| 0.881
Speedup ratio: compare with the result of `trunk' (greater is better)
name |built
----------------|------:
loop_whileloop2 | 1.002
vm2_mutex* | 1.372
vm_thread_mutex1| 1.297
vm_thread_mutex2| 4.149
vm_thread_mutex3| 32.044
Tested on AMD FX-8320 8-core at 3.5GHz
* thread_sync.c (struct mutex_waiter): new on-stack struct
(struct rb_mutex_struct): remove native lock/cond, use ccan/list
(rb_mutex_num_waiting): new function for debug_deadlock_check
(mutex_free): remove native_*_destroy
(mutex_alloc): initialize waitq, remove native_*_initialize
(rb_mutex_trylock): remove native_mutex_{lock,unlock}
(lock_func): remove
(lock_interrupt): remove
(rb_mutex_lock): rewrite waiting path to use native_sleep + ccan/list
(rb_mutex_unlock_th): rewrite to wake up from native_sleep
using rb_threadptr_interrupt
(rb_mutex_abandon_all): empty waitq
* thread.c (debug_deadlock_check): update for new struct
(rb_check_deadlock): ditto
[ruby-core:80913] [Feature #13517]
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58604 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
2017-05-07 20:18:53 -04:00
|
|
|
|
|
|
|
mutex->th = 0;
|
|
|
|
list_for_each_safe(&mutex->waitq, cur, next, node) {
|
|
|
|
list_del_init(&cur->node);
|
|
|
|
switch (cur->th->status) {
|
2017-05-09 20:39:26 -04:00
|
|
|
case THREAD_RUNNABLE: /* from someone else calling Thread#run */
|
|
|
|
case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
|
reduce rb_mutex_t size from 160 to 80 bytes on 64-bit
Instead of relying on a native condition variable and mutex for
every Ruby Mutex object, use a doubly linked-list to implement a
waiter queue in the Mutex. The immediate benefit of this is
reducing the size of every Mutex object, as some projects have
many objects requiring synchronization.
In the future, this technique using a linked-list and on-stack
list node (struct mutex_waiter) should allow us to easily
transition to M:N threading model, as we can avoid the native
thread dependency to implement Mutex.
We already do something similar for autoload in variable.c,
and this was inspired by the Linux kernel wait queue (as
ccan/list is inspired by the Linux kernel linked-list).
Finaly, there are big performance improvements for Mutex
benchmarks, especially in contended cases:
measure target: real
name |trunk |built
----------------|------:|------:
loop_whileloop2 | 0.149| 0.148
vm2_mutex* | 0.893| 0.651
vm_thread_mutex1| 0.809| 0.624
vm_thread_mutex2| 2.608| 0.628
vm_thread_mutex3| 28.227| 0.881
Speedup ratio: compare with the result of `trunk' (greater is better)
name |built
----------------|------:
loop_whileloop2 | 1.002
vm2_mutex* | 1.372
vm_thread_mutex1| 1.297
vm_thread_mutex2| 4.149
vm_thread_mutex3| 32.044
Tested on AMD FX-8320 8-core at 3.5GHz
* thread_sync.c (struct mutex_waiter): new on-stack struct
(struct rb_mutex_struct): remove native lock/cond, use ccan/list
(rb_mutex_num_waiting): new function for debug_deadlock_check
(mutex_free): remove native_*_destroy
(mutex_alloc): initialize waitq, remove native_*_initialize
(rb_mutex_trylock): remove native_mutex_{lock,unlock}
(lock_func): remove
(lock_interrupt): remove
(rb_mutex_lock): rewrite waiting path to use native_sleep + ccan/list
(rb_mutex_unlock_th): rewrite to wake up from native_sleep
using rb_threadptr_interrupt
(rb_mutex_abandon_all): empty waitq
* thread.c (debug_deadlock_check): update for new struct
(rb_check_deadlock): ditto
[ruby-core:80913] [Feature #13517]
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58604 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
2017-05-07 20:18:53 -04:00
|
|
|
rb_threadptr_interrupt(cur->th);
|
|
|
|
goto found;
|
2017-05-09 20:39:26 -04:00
|
|
|
case THREAD_STOPPED: /* probably impossible */
|
reduce rb_mutex_t size from 160 to 80 bytes on 64-bit
Instead of relying on a native condition variable and mutex for
every Ruby Mutex object, use a doubly linked-list to implement a
waiter queue in the Mutex. The immediate benefit of this is
reducing the size of every Mutex object, as some projects have
many objects requiring synchronization.
In the future, this technique using a linked-list and on-stack
list node (struct mutex_waiter) should allow us to easily
transition to M:N threading model, as we can avoid the native
thread dependency to implement Mutex.
We already do something similar for autoload in variable.c,
and this was inspired by the Linux kernel wait queue (as
ccan/list is inspired by the Linux kernel linked-list).
Finaly, there are big performance improvements for Mutex
benchmarks, especially in contended cases:
measure target: real
name |trunk |built
----------------|------:|------:
loop_whileloop2 | 0.149| 0.148
vm2_mutex* | 0.893| 0.651
vm_thread_mutex1| 0.809| 0.624
vm_thread_mutex2| 2.608| 0.628
vm_thread_mutex3| 28.227| 0.881
Speedup ratio: compare with the result of `trunk' (greater is better)
name |built
----------------|------:
loop_whileloop2 | 1.002
vm2_mutex* | 1.372
vm_thread_mutex1| 1.297
vm_thread_mutex2| 4.149
vm_thread_mutex3| 32.044
Tested on AMD FX-8320 8-core at 3.5GHz
* thread_sync.c (struct mutex_waiter): new on-stack struct
(struct rb_mutex_struct): remove native lock/cond, use ccan/list
(rb_mutex_num_waiting): new function for debug_deadlock_check
(mutex_free): remove native_*_destroy
(mutex_alloc): initialize waitq, remove native_*_initialize
(rb_mutex_trylock): remove native_mutex_{lock,unlock}
(lock_func): remove
(lock_interrupt): remove
(rb_mutex_lock): rewrite waiting path to use native_sleep + ccan/list
(rb_mutex_unlock_th): rewrite to wake up from native_sleep
using rb_threadptr_interrupt
(rb_mutex_abandon_all): empty waitq
* thread.c (debug_deadlock_check): update for new struct
(rb_check_deadlock): ditto
[ruby-core:80913] [Feature #13517]
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58604 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
2017-05-07 20:18:53 -04:00
|
|
|
rb_bug("unexpected THREAD_STOPPED");
|
2017-05-09 20:39:26 -04:00
|
|
|
case THREAD_KILLED:
|
reduce rb_mutex_t size from 160 to 80 bytes on 64-bit
Instead of relying on a native condition variable and mutex for
every Ruby Mutex object, use a doubly linked-list to implement a
waiter queue in the Mutex. The immediate benefit of this is
reducing the size of every Mutex object, as some projects have
many objects requiring synchronization.
In the future, this technique using a linked-list and on-stack
list node (struct mutex_waiter) should allow us to easily
transition to M:N threading model, as we can avoid the native
thread dependency to implement Mutex.
We already do something similar for autoload in variable.c,
and this was inspired by the Linux kernel wait queue (as
ccan/list is inspired by the Linux kernel linked-list).
Finaly, there are big performance improvements for Mutex
benchmarks, especially in contended cases:
measure target: real
name |trunk |built
----------------|------:|------:
loop_whileloop2 | 0.149| 0.148
vm2_mutex* | 0.893| 0.651
vm_thread_mutex1| 0.809| 0.624
vm_thread_mutex2| 2.608| 0.628
vm_thread_mutex3| 28.227| 0.881
Speedup ratio: compare with the result of `trunk' (greater is better)
name |built
----------------|------:
loop_whileloop2 | 1.002
vm2_mutex* | 1.372
vm_thread_mutex1| 1.297
vm_thread_mutex2| 4.149
vm_thread_mutex3| 32.044
Tested on AMD FX-8320 8-core at 3.5GHz
* thread_sync.c (struct mutex_waiter): new on-stack struct
(struct rb_mutex_struct): remove native lock/cond, use ccan/list
(rb_mutex_num_waiting): new function for debug_deadlock_check
(mutex_free): remove native_*_destroy
(mutex_alloc): initialize waitq, remove native_*_initialize
(rb_mutex_trylock): remove native_mutex_{lock,unlock}
(lock_func): remove
(lock_interrupt): remove
(rb_mutex_lock): rewrite waiting path to use native_sleep + ccan/list
(rb_mutex_unlock_th): rewrite to wake up from native_sleep
using rb_threadptr_interrupt
(rb_mutex_abandon_all): empty waitq
* thread.c (debug_deadlock_check): update for new struct
(rb_check_deadlock): ditto
[ruby-core:80913] [Feature #13517]
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58604 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
2017-05-07 20:18:53 -04:00
|
|
|
/* not sure about this, possible in exit GC? */
|
|
|
|
rb_bug("unexpected THREAD_KILLED");
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
}
|
2017-05-09 20:39:26 -04:00
|
|
|
found:
|
2015-08-21 19:36:23 -04:00
|
|
|
while (*th_mutex != mutex) {
|
|
|
|
th_mutex = &(*th_mutex)->next_mutex;
|
|
|
|
}
|
|
|
|
*th_mutex = mutex->next_mutex;
|
|
|
|
mutex->next_mutex = NULL;
|
|
|
|
}
|
|
|
|
|
|
|
|
return err;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* call-seq:
|
|
|
|
* mutex.unlock -> self
|
|
|
|
*
|
|
|
|
* Releases the lock.
|
|
|
|
* Raises +ThreadError+ if +mutex+ wasn't locked by the current thread.
|
|
|
|
*/
|
|
|
|
VALUE
|
|
|
|
rb_mutex_unlock(VALUE self)
|
|
|
|
{
|
|
|
|
const char *err;
|
|
|
|
rb_mutex_t *mutex;
|
|
|
|
GetMutexPtr(self, mutex);
|
|
|
|
|
|
|
|
err = rb_mutex_unlock_th(mutex, GET_THREAD());
|
|
|
|
if (err) rb_raise(rb_eThreadError, "%s", err);
|
|
|
|
|
|
|
|
return self;
|
|
|
|
}
|
|
|
|
|
2016-05-08 21:46:37 -04:00
|
|
|
#if defined(HAVE_WORKING_FORK)
|
2015-08-21 19:36:23 -04:00
|
|
|
static void
|
|
|
|
rb_mutex_abandon_keeping_mutexes(rb_thread_t *th)
|
|
|
|
{
|
|
|
|
if (th->keeping_mutexes) {
|
|
|
|
rb_mutex_abandon_all(th->keeping_mutexes);
|
|
|
|
}
|
|
|
|
th->keeping_mutexes = NULL;
|
|
|
|
}
|
|
|
|
|
|
|
|
static void
|
|
|
|
rb_mutex_abandon_locking_mutex(rb_thread_t *th)
|
|
|
|
{
|
|
|
|
rb_mutex_t *mutex;
|
|
|
|
|
|
|
|
if (!th->locking_mutex) return;
|
|
|
|
|
|
|
|
GetMutexPtr(th->locking_mutex, mutex);
|
|
|
|
if (mutex->th == th)
|
|
|
|
rb_mutex_abandon_all(mutex);
|
|
|
|
th->locking_mutex = Qfalse;
|
|
|
|
}
|
|
|
|
|
|
|
|
static void
|
|
|
|
rb_mutex_abandon_all(rb_mutex_t *mutexes)
|
|
|
|
{
|
|
|
|
rb_mutex_t *mutex;
|
|
|
|
|
|
|
|
while (mutexes) {
|
|
|
|
mutex = mutexes;
|
|
|
|
mutexes = mutex->next_mutex;
|
|
|
|
mutex->th = 0;
|
|
|
|
mutex->next_mutex = 0;
|
reduce rb_mutex_t size from 160 to 80 bytes on 64-bit
Instead of relying on a native condition variable and mutex for
every Ruby Mutex object, use a doubly linked-list to implement a
waiter queue in the Mutex. The immediate benefit of this is
reducing the size of every Mutex object, as some projects have
many objects requiring synchronization.
In the future, this technique using a linked-list and on-stack
list node (struct mutex_waiter) should allow us to easily
transition to M:N threading model, as we can avoid the native
thread dependency to implement Mutex.
We already do something similar for autoload in variable.c,
and this was inspired by the Linux kernel wait queue (as
ccan/list is inspired by the Linux kernel linked-list).
Finaly, there are big performance improvements for Mutex
benchmarks, especially in contended cases:
measure target: real
name |trunk |built
----------------|------:|------:
loop_whileloop2 | 0.149| 0.148
vm2_mutex* | 0.893| 0.651
vm_thread_mutex1| 0.809| 0.624
vm_thread_mutex2| 2.608| 0.628
vm_thread_mutex3| 28.227| 0.881
Speedup ratio: compare with the result of `trunk' (greater is better)
name |built
----------------|------:
loop_whileloop2 | 1.002
vm2_mutex* | 1.372
vm_thread_mutex1| 1.297
vm_thread_mutex2| 4.149
vm_thread_mutex3| 32.044
Tested on AMD FX-8320 8-core at 3.5GHz
* thread_sync.c (struct mutex_waiter): new on-stack struct
(struct rb_mutex_struct): remove native lock/cond, use ccan/list
(rb_mutex_num_waiting): new function for debug_deadlock_check
(mutex_free): remove native_*_destroy
(mutex_alloc): initialize waitq, remove native_*_initialize
(rb_mutex_trylock): remove native_mutex_{lock,unlock}
(lock_func): remove
(lock_interrupt): remove
(rb_mutex_lock): rewrite waiting path to use native_sleep + ccan/list
(rb_mutex_unlock_th): rewrite to wake up from native_sleep
using rb_threadptr_interrupt
(rb_mutex_abandon_all): empty waitq
* thread.c (debug_deadlock_check): update for new struct
(rb_check_deadlock): ditto
[ruby-core:80913] [Feature #13517]
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58604 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
2017-05-07 20:18:53 -04:00
|
|
|
list_head_init(&mutex->waitq);
|
2015-08-21 19:36:23 -04:00
|
|
|
}
|
|
|
|
}
|
2016-05-08 21:46:37 -04:00
|
|
|
#endif
|
2015-08-21 19:36:23 -04:00
|
|
|
|
|
|
|
static VALUE
|
|
|
|
rb_mutex_sleep_forever(VALUE time)
|
|
|
|
{
|
2017-01-31 01:39:01 -05:00
|
|
|
rb_thread_sleep_deadly_allow_spurious_wakeup();
|
2015-08-21 19:36:23 -04:00
|
|
|
return Qnil;
|
|
|
|
}
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
rb_mutex_wait_for(VALUE time)
|
|
|
|
{
|
|
|
|
struct timeval *t = (struct timeval *)time;
|
|
|
|
sleep_timeval(GET_THREAD(), *t, 0); /* permit spurious check */
|
|
|
|
return Qnil;
|
|
|
|
}
|
|
|
|
|
|
|
|
VALUE
|
|
|
|
rb_mutex_sleep(VALUE self, VALUE timeout)
|
|
|
|
{
|
|
|
|
time_t beg, end;
|
|
|
|
struct timeval t;
|
|
|
|
|
|
|
|
if (!NIL_P(timeout)) {
|
|
|
|
t = rb_time_interval(timeout);
|
|
|
|
}
|
|
|
|
rb_mutex_unlock(self);
|
|
|
|
beg = time(0);
|
|
|
|
if (NIL_P(timeout)) {
|
|
|
|
rb_ensure(rb_mutex_sleep_forever, Qnil, rb_mutex_lock, self);
|
|
|
|
}
|
|
|
|
else {
|
|
|
|
rb_ensure(rb_mutex_wait_for, (VALUE)&t, rb_mutex_lock, self);
|
|
|
|
}
|
|
|
|
end = time(0) - beg;
|
|
|
|
return INT2FIX(end);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* call-seq:
|
|
|
|
* mutex.sleep(timeout = nil) -> number
|
|
|
|
*
|
|
|
|
* Releases the lock and sleeps +timeout+ seconds if it is given and
|
|
|
|
* non-nil or forever. Raises +ThreadError+ if +mutex+ wasn't locked by
|
|
|
|
* the current thread.
|
|
|
|
*
|
|
|
|
* When the thread is next woken up, it will attempt to reacquire
|
|
|
|
* the lock.
|
|
|
|
*
|
|
|
|
* Note that this method can wakeup without explicit Thread#wakeup call.
|
|
|
|
* For example, receiving signal and so on.
|
|
|
|
*/
|
|
|
|
static VALUE
|
|
|
|
mutex_sleep(int argc, VALUE *argv, VALUE self)
|
|
|
|
{
|
|
|
|
VALUE timeout;
|
|
|
|
|
|
|
|
rb_scan_args(argc, argv, "01", &timeout);
|
|
|
|
return rb_mutex_sleep(self, timeout);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* call-seq:
|
|
|
|
* mutex.synchronize { ... } -> result of the block
|
|
|
|
*
|
|
|
|
* Obtains a lock, runs the block, and releases the lock when the block
|
|
|
|
* completes. See the example under +Mutex+.
|
|
|
|
*/
|
|
|
|
|
|
|
|
VALUE
|
|
|
|
rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg)
|
|
|
|
{
|
|
|
|
rb_mutex_lock(mutex);
|
|
|
|
return rb_ensure(func, arg, rb_mutex_unlock, mutex);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* call-seq:
|
|
|
|
* mutex.synchronize { ... } -> result of the block
|
|
|
|
*
|
|
|
|
* Obtains a lock, runs the block, and releases the lock when the block
|
|
|
|
* completes. See the example under +Mutex+.
|
|
|
|
*/
|
|
|
|
static VALUE
|
|
|
|
rb_mutex_synchronize_m(VALUE self, VALUE args)
|
|
|
|
{
|
|
|
|
if (!rb_block_given_p()) {
|
|
|
|
rb_raise(rb_eThreadError, "must be called with a block");
|
|
|
|
}
|
|
|
|
|
|
|
|
return rb_mutex_synchronize(self, rb_yield, Qundef);
|
|
|
|
}
|
|
|
|
|
|
|
|
void rb_mutex_allow_trap(VALUE self, int val)
|
|
|
|
{
|
2017-05-07 21:59:17 -04:00
|
|
|
Check_TypedStruct(self, &mutex_data_type);
|
2015-08-21 19:36:23 -04:00
|
|
|
|
2017-05-07 21:59:17 -04:00
|
|
|
if (val)
|
|
|
|
FL_SET_RAW(self, MUTEX_ALLOW_TRAP);
|
|
|
|
else
|
|
|
|
FL_UNSET_RAW(self, MUTEX_ALLOW_TRAP);
|
2015-08-21 19:36:23 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
/* Queue */
|
|
|
|
|
|
|
|
enum {
|
|
|
|
QUEUE_QUE,
|
|
|
|
QUEUE_WAITERS,
|
|
|
|
SZQUEUE_WAITERS,
|
|
|
|
SZQUEUE_MAX,
|
|
|
|
END_QUEUE
|
|
|
|
};
|
|
|
|
|
2015-08-26 18:59:32 -04:00
|
|
|
#define QUEUE_CLOSED FL_USER5
|
|
|
|
|
2015-08-21 19:36:23 -04:00
|
|
|
#define GET_QUEUE_QUE(q) get_array((q), QUEUE_QUE)
|
|
|
|
#define GET_QUEUE_WAITERS(q) get_array((q), QUEUE_WAITERS)
|
|
|
|
#define GET_SZQUEUE_WAITERS(q) get_array((q), SZQUEUE_WAITERS)
|
|
|
|
#define GET_SZQUEUE_MAX(q) RSTRUCT_GET((q), SZQUEUE_MAX)
|
|
|
|
#define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q))
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
ary_buf_new(void)
|
|
|
|
{
|
|
|
|
return rb_ary_tmp_new(1);
|
|
|
|
}
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
get_array(VALUE obj, int idx)
|
|
|
|
{
|
|
|
|
VALUE ary = RSTRUCT_GET(obj, idx);
|
|
|
|
if (!RB_TYPE_P(ary, T_ARRAY)) {
|
|
|
|
rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
|
|
|
|
}
|
|
|
|
return ary;
|
|
|
|
}
|
|
|
|
|
|
|
|
static void
|
|
|
|
wakeup_first_thread(VALUE list)
|
|
|
|
{
|
|
|
|
VALUE thread;
|
|
|
|
|
|
|
|
while (!NIL_P(thread = rb_ary_shift(list))) {
|
|
|
|
if (RTEST(rb_thread_wakeup_alive(thread))) break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
static void
|
|
|
|
wakeup_all_threads(VALUE list)
|
|
|
|
{
|
|
|
|
VALUE thread;
|
|
|
|
long i;
|
|
|
|
|
|
|
|
for (i=0; i<RARRAY_LEN(list); i++) {
|
|
|
|
thread = RARRAY_AREF(list, i);
|
|
|
|
rb_thread_wakeup_alive(thread);
|
|
|
|
}
|
|
|
|
rb_ary_clear(list);
|
|
|
|
}
|
|
|
|
|
2015-08-26 18:59:32 -04:00
|
|
|
static unsigned long
|
|
|
|
queue_length(VALUE self)
|
|
|
|
{
|
|
|
|
VALUE que = GET_QUEUE_QUE(self);
|
|
|
|
return RARRAY_LEN(que);
|
|
|
|
}
|
|
|
|
|
|
|
|
static unsigned long
|
|
|
|
queue_num_waiting(VALUE self)
|
|
|
|
{
|
|
|
|
VALUE waiters = GET_QUEUE_WAITERS(self);
|
|
|
|
return RARRAY_LEN(waiters);
|
|
|
|
}
|
|
|
|
|
|
|
|
static unsigned long
|
|
|
|
szqueue_num_waiting_producer(VALUE self)
|
|
|
|
{
|
|
|
|
VALUE waiters = GET_SZQUEUE_WAITERS(self);
|
|
|
|
return RARRAY_LEN(waiters);
|
|
|
|
}
|
|
|
|
|
|
|
|
static int
|
|
|
|
queue_closed_p(VALUE self)
|
|
|
|
{
|
|
|
|
return FL_TEST_RAW(self, QUEUE_CLOSED) != 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
static void
|
|
|
|
raise_closed_queue_error(VALUE self)
|
|
|
|
{
|
|
|
|
rb_raise(rb_eClosedQueueError, "queue closed");
|
|
|
|
}
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
queue_closed_result(VALUE self)
|
|
|
|
{
|
|
|
|
assert(queue_length(self) == 0);
|
|
|
|
return Qnil;
|
|
|
|
}
|
|
|
|
|
|
|
|
static VALUE
|
2015-11-20 19:32:09 -05:00
|
|
|
queue_do_close(VALUE self, int is_szq)
|
2015-08-26 18:59:32 -04:00
|
|
|
{
|
2015-09-01 05:17:28 -04:00
|
|
|
if (!queue_closed_p(self)) {
|
|
|
|
FL_SET(self, QUEUE_CLOSED);
|
2015-08-26 18:59:32 -04:00
|
|
|
|
2015-09-01 05:17:28 -04:00
|
|
|
if (queue_num_waiting(self) > 0) {
|
|
|
|
VALUE waiters = GET_QUEUE_WAITERS(self);
|
|
|
|
wakeup_all_threads(waiters);
|
|
|
|
}
|
2015-08-26 18:59:32 -04:00
|
|
|
|
2015-09-01 05:17:28 -04:00
|
|
|
if (is_szq && szqueue_num_waiting_producer(self) > 0) {
|
|
|
|
VALUE waiters = GET_SZQUEUE_WAITERS(self);
|
|
|
|
wakeup_all_threads(waiters);
|
|
|
|
}
|
2015-08-26 18:59:32 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
return self;
|
|
|
|
}
|
|
|
|
|
2015-08-21 19:36:23 -04:00
|
|
|
/*
|
|
|
|
* Document-class: Queue
|
|
|
|
*
|
2016-03-17 04:20:29 -04:00
|
|
|
* The Queue class implements multi-producer, multi-consumer queues.
|
|
|
|
* It is especially useful in threaded programming when information
|
|
|
|
* must be exchanged safely between multiple threads. The Queue class
|
|
|
|
* implements all the required locking semantics.
|
|
|
|
*
|
|
|
|
* The class implements FIFO type of queue. In a FIFO queue, the first
|
|
|
|
* tasks added are the first retrieved.
|
2015-08-21 19:36:23 -04:00
|
|
|
*
|
|
|
|
* Example:
|
|
|
|
*
|
|
|
|
* require 'thread'
|
|
|
|
* queue = Queue.new
|
|
|
|
*
|
|
|
|
* producer = Thread.new do
|
|
|
|
* 5.times do |i|
|
|
|
|
* sleep rand(i) # simulate expense
|
|
|
|
* queue << i
|
|
|
|
* puts "#{i} produced"
|
|
|
|
* end
|
|
|
|
* end
|
|
|
|
*
|
|
|
|
* consumer = Thread.new do
|
|
|
|
* 5.times do |i|
|
|
|
|
* value = queue.pop
|
|
|
|
* sleep rand(i/2) # simulate expense
|
|
|
|
* puts "consumed #{value}"
|
|
|
|
* end
|
|
|
|
* end
|
|
|
|
*
|
|
|
|
*/
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Document-method: Queue::new
|
|
|
|
*
|
|
|
|
* Creates a new queue instance.
|
|
|
|
*/
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
rb_queue_initialize(VALUE self)
|
|
|
|
{
|
|
|
|
RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
|
|
|
|
RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
|
|
|
|
return self;
|
|
|
|
}
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
queue_do_push(VALUE self, VALUE obj)
|
|
|
|
{
|
2015-08-26 18:59:32 -04:00
|
|
|
if (queue_closed_p(self)) {
|
|
|
|
raise_closed_queue_error(self);
|
|
|
|
}
|
2015-08-21 19:36:23 -04:00
|
|
|
rb_ary_push(GET_QUEUE_QUE(self), obj);
|
|
|
|
wakeup_first_thread(GET_QUEUE_WAITERS(self));
|
|
|
|
return self;
|
|
|
|
}
|
|
|
|
|
2015-08-26 18:59:32 -04:00
|
|
|
/*
|
|
|
|
* Document-method: Queue#close
|
|
|
|
* call-seq:
|
2015-11-20 19:32:09 -05:00
|
|
|
* close
|
2015-08-26 18:59:32 -04:00
|
|
|
*
|
|
|
|
* Closes the queue. A closed queue cannot be re-opened.
|
|
|
|
*
|
|
|
|
* After the call to close completes, the following are true:
|
|
|
|
*
|
|
|
|
* - +closed?+ will return true
|
|
|
|
*
|
2015-09-01 05:17:28 -04:00
|
|
|
* - +close+ will be ignored.
|
|
|
|
*
|
2015-11-20 19:32:09 -05:00
|
|
|
* - calling enq/push/<< will return nil.
|
2015-08-26 18:59:32 -04:00
|
|
|
*
|
|
|
|
* - when +empty?+ is false, calling deq/pop/shift will return an object
|
|
|
|
* from the queue as usual.
|
|
|
|
*
|
|
|
|
* ClosedQueueError is inherited from StopIteration, so that you can break loop block.
|
|
|
|
*
|
|
|
|
* Example:
|
2015-08-26 19:00:15 -04:00
|
|
|
*
|
2015-08-26 18:59:32 -04:00
|
|
|
* q = Queue.new
|
|
|
|
* Thread.new{
|
|
|
|
* while e = q.deq # wait for nil to break loop
|
|
|
|
* # ...
|
|
|
|
* end
|
|
|
|
* }
|
2015-11-20 19:32:09 -05:00
|
|
|
* q.close
|
2015-08-26 18:59:32 -04:00
|
|
|
*/
|
|
|
|
|
|
|
|
static VALUE
|
2015-11-20 19:32:09 -05:00
|
|
|
rb_queue_close(VALUE self)
|
2015-08-26 18:59:32 -04:00
|
|
|
{
|
2015-11-20 19:32:09 -05:00
|
|
|
return queue_do_close(self, FALSE);
|
2015-08-26 18:59:32 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Document-method: Queue#closed?
|
|
|
|
* call-seq: closed?
|
|
|
|
*
|
|
|
|
* Returns +true+ if the queue is closed.
|
|
|
|
*/
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
rb_queue_closed_p(VALUE self)
|
|
|
|
{
|
|
|
|
return queue_closed_p(self) ? Qtrue : Qfalse;
|
|
|
|
}
|
|
|
|
|
2015-08-21 19:36:23 -04:00
|
|
|
/*
|
|
|
|
* Document-method: Queue#push
|
|
|
|
* call-seq:
|
|
|
|
* push(object)
|
|
|
|
* enq(object)
|
|
|
|
* <<(object)
|
|
|
|
*
|
|
|
|
* Pushes the given +object+ to the queue.
|
|
|
|
*/
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
rb_queue_push(VALUE self, VALUE obj)
|
|
|
|
{
|
|
|
|
return queue_do_push(self, obj);
|
|
|
|
}
|
|
|
|
|
|
|
|
struct waiting_delete {
|
|
|
|
VALUE waiting;
|
|
|
|
VALUE th;
|
|
|
|
};
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
queue_delete_from_waiting(struct waiting_delete *p)
|
|
|
|
{
|
|
|
|
rb_ary_delete(p->waiting, p->th);
|
|
|
|
return Qnil;
|
|
|
|
}
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
queue_sleep(VALUE arg)
|
|
|
|
{
|
2017-01-31 02:00:38 -05:00
|
|
|
rb_thread_sleep_deadly_allow_spurious_wakeup();
|
2015-08-21 19:36:23 -04:00
|
|
|
return Qnil;
|
|
|
|
}
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
queue_do_pop(VALUE self, int should_block)
|
|
|
|
{
|
|
|
|
struct waiting_delete args;
|
|
|
|
args.waiting = GET_QUEUE_WAITERS(self);
|
|
|
|
args.th = rb_thread_current();
|
|
|
|
|
|
|
|
while (queue_length(self) == 0) {
|
|
|
|
if (!should_block) {
|
|
|
|
rb_raise(rb_eThreadError, "queue empty");
|
|
|
|
}
|
2015-08-26 18:59:32 -04:00
|
|
|
else if (queue_closed_p(self)) {
|
|
|
|
return queue_closed_result(self);
|
|
|
|
}
|
|
|
|
else {
|
|
|
|
assert(queue_length(self) == 0);
|
|
|
|
assert(queue_closed_p(self) == 0);
|
|
|
|
|
|
|
|
rb_ary_push(args.waiting, args.th);
|
2016-01-02 07:08:34 -05:00
|
|
|
rb_ensure(queue_sleep, Qfalse, queue_delete_from_waiting, (VALUE)&args);
|
2015-08-26 18:59:32 -04:00
|
|
|
}
|
2015-08-21 19:36:23 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
return rb_ary_shift(GET_QUEUE_QUE(self));
|
|
|
|
}
|
|
|
|
|
|
|
|
static int
|
|
|
|
queue_pop_should_block(int argc, const VALUE *argv)
|
|
|
|
{
|
|
|
|
int should_block = 1;
|
|
|
|
rb_check_arity(argc, 0, 1);
|
|
|
|
if (argc > 0) {
|
|
|
|
should_block = !RTEST(argv[0]);
|
|
|
|
}
|
|
|
|
return should_block;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Document-method: Queue#pop
|
|
|
|
* call-seq:
|
|
|
|
* pop(non_block=false)
|
|
|
|
* deq(non_block=false)
|
|
|
|
* shift(non_block=false)
|
|
|
|
*
|
|
|
|
* Retrieves data from the queue.
|
|
|
|
*
|
|
|
|
* If the queue is empty, the calling thread is suspended until data is pushed
|
2016-09-29 06:34:25 -04:00
|
|
|
* onto the queue. If +non_block+ is true, the thread isn't suspended, and
|
|
|
|
* +ThreadError+ is raised.
|
2015-08-21 19:36:23 -04:00
|
|
|
*/
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
rb_queue_pop(int argc, VALUE *argv, VALUE self)
|
|
|
|
{
|
|
|
|
int should_block = queue_pop_should_block(argc, argv);
|
|
|
|
return queue_do_pop(self, should_block);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Document-method: Queue#empty?
|
|
|
|
* call-seq: empty?
|
|
|
|
*
|
|
|
|
* Returns +true+ if the queue is empty.
|
|
|
|
*/
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
rb_queue_empty_p(VALUE self)
|
|
|
|
{
|
|
|
|
return queue_length(self) == 0 ? Qtrue : Qfalse;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Document-method: Queue#clear
|
|
|
|
*
|
|
|
|
* Removes all objects from the queue.
|
|
|
|
*/
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
rb_queue_clear(VALUE self)
|
|
|
|
{
|
|
|
|
rb_ary_clear(GET_QUEUE_QUE(self));
|
|
|
|
return self;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Document-method: Queue#length
|
|
|
|
* call-seq:
|
|
|
|
* length
|
|
|
|
* size
|
|
|
|
*
|
|
|
|
* Returns the length of the queue.
|
|
|
|
*/
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
rb_queue_length(VALUE self)
|
|
|
|
{
|
|
|
|
unsigned long len = queue_length(self);
|
|
|
|
return ULONG2NUM(len);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Document-method: Queue#num_waiting
|
|
|
|
*
|
|
|
|
* Returns the number of threads waiting on the queue.
|
|
|
|
*/
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
rb_queue_num_waiting(VALUE self)
|
|
|
|
{
|
|
|
|
unsigned long len = queue_num_waiting(self);
|
|
|
|
return ULONG2NUM(len);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Document-class: SizedQueue
|
|
|
|
*
|
|
|
|
* This class represents queues of specified size capacity. The push operation
|
|
|
|
* may be blocked if the capacity is full.
|
|
|
|
*
|
|
|
|
* See Queue for an example of how a SizedQueue works.
|
|
|
|
*/
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Document-method: SizedQueue::new
|
|
|
|
* call-seq: new(max)
|
|
|
|
*
|
|
|
|
* Creates a fixed-length queue with a maximum size of +max+.
|
|
|
|
*/
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
rb_szqueue_initialize(VALUE self, VALUE vmax)
|
|
|
|
{
|
|
|
|
long max;
|
|
|
|
|
|
|
|
max = NUM2LONG(vmax);
|
|
|
|
if (max <= 0) {
|
|
|
|
rb_raise(rb_eArgError, "queue size must be positive");
|
|
|
|
}
|
|
|
|
|
|
|
|
RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
|
|
|
|
RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
|
|
|
|
RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new());
|
|
|
|
RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
|
|
|
|
|
|
|
|
return self;
|
|
|
|
}
|
|
|
|
|
2015-08-26 18:59:32 -04:00
|
|
|
/*
|
|
|
|
* Document-method: SizedQueue#close
|
|
|
|
* call-seq:
|
2016-01-04 01:38:26 -05:00
|
|
|
* close
|
2015-08-26 18:59:32 -04:00
|
|
|
*
|
|
|
|
* Similar to Queue#close.
|
|
|
|
*
|
|
|
|
* The difference is behavior with waiting enqueuing threads.
|
|
|
|
*
|
|
|
|
* If there are waiting enqueuing threads, they are interrupted by
|
|
|
|
* raising ClosedQueueError('queue closed').
|
|
|
|
*/
|
|
|
|
static VALUE
|
2015-11-20 19:32:09 -05:00
|
|
|
rb_szqueue_close(VALUE self)
|
2015-08-26 18:59:32 -04:00
|
|
|
{
|
2015-11-20 19:32:09 -05:00
|
|
|
return queue_do_close(self, TRUE);
|
2015-08-26 18:59:32 -04:00
|
|
|
}
|
|
|
|
|
2015-08-21 19:36:23 -04:00
|
|
|
/*
|
|
|
|
* Document-method: SizedQueue#max
|
|
|
|
*
|
|
|
|
* Returns the maximum size of the queue.
|
|
|
|
*/
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
rb_szqueue_max_get(VALUE self)
|
|
|
|
{
|
|
|
|
return GET_SZQUEUE_MAX(self);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Document-method: SizedQueue#max=
|
|
|
|
* call-seq: max=(number)
|
|
|
|
*
|
|
|
|
* Sets the maximum size of the queue to the given +number+.
|
|
|
|
*/
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
rb_szqueue_max_set(VALUE self, VALUE vmax)
|
|
|
|
{
|
|
|
|
long max = NUM2LONG(vmax), diff = 0;
|
|
|
|
VALUE t;
|
|
|
|
|
|
|
|
if (max <= 0) {
|
|
|
|
rb_raise(rb_eArgError, "queue size must be positive");
|
|
|
|
}
|
|
|
|
if ((unsigned long)max > GET_SZQUEUE_ULONGMAX(self)) {
|
|
|
|
diff = max - GET_SZQUEUE_ULONGMAX(self);
|
|
|
|
}
|
|
|
|
RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
|
|
|
|
while (diff-- > 0 && !NIL_P(t = rb_ary_shift(GET_SZQUEUE_WAITERS(self)))) {
|
|
|
|
rb_thread_wakeup_alive(t);
|
|
|
|
}
|
|
|
|
return vmax;
|
|
|
|
}
|
|
|
|
|
|
|
|
static int
|
|
|
|
szqueue_push_should_block(int argc, const VALUE *argv)
|
|
|
|
{
|
|
|
|
int should_block = 1;
|
|
|
|
rb_check_arity(argc, 1, 2);
|
|
|
|
if (argc > 1) {
|
|
|
|
should_block = !RTEST(argv[1]);
|
|
|
|
}
|
|
|
|
return should_block;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Document-method: SizedQueue#push
|
|
|
|
* call-seq:
|
|
|
|
* push(object, non_block=false)
|
|
|
|
* enq(object, non_block=false)
|
|
|
|
* <<(object)
|
|
|
|
*
|
|
|
|
* Pushes +object+ to the queue.
|
|
|
|
*
|
|
|
|
* If there is no space left in the queue, waits until space becomes
|
|
|
|
* available, unless +non_block+ is true. If +non_block+ is true, the
|
2016-09-29 06:34:25 -04:00
|
|
|
* thread isn't suspended, and +ThreadError+ is raised.
|
2015-08-21 19:36:23 -04:00
|
|
|
*/
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
|
|
|
|
{
|
|
|
|
struct waiting_delete args;
|
|
|
|
int should_block = szqueue_push_should_block(argc, argv);
|
|
|
|
args.waiting = GET_SZQUEUE_WAITERS(self);
|
|
|
|
args.th = rb_thread_current();
|
|
|
|
|
|
|
|
while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) {
|
|
|
|
if (!should_block) {
|
|
|
|
rb_raise(rb_eThreadError, "queue full");
|
|
|
|
}
|
2015-08-26 18:59:32 -04:00
|
|
|
else if (queue_closed_p(self)) {
|
|
|
|
goto closed;
|
|
|
|
}
|
|
|
|
else {
|
|
|
|
rb_ary_push(args.waiting, args.th);
|
2016-01-02 07:08:34 -05:00
|
|
|
rb_ensure(queue_sleep, Qfalse, queue_delete_from_waiting, (VALUE)&args);
|
2015-08-26 18:59:32 -04:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (queue_closed_p(self)) {
|
|
|
|
closed:
|
|
|
|
raise_closed_queue_error(self);
|
2015-08-21 19:36:23 -04:00
|
|
|
}
|
2015-08-26 18:59:32 -04:00
|
|
|
|
2015-08-21 19:36:23 -04:00
|
|
|
return queue_do_push(self, argv[0]);
|
|
|
|
}
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
szqueue_do_pop(VALUE self, int should_block)
|
|
|
|
{
|
|
|
|
VALUE retval = queue_do_pop(self, should_block);
|
|
|
|
|
|
|
|
if (queue_length(self) < GET_SZQUEUE_ULONGMAX(self)) {
|
|
|
|
wakeup_first_thread(GET_SZQUEUE_WAITERS(self));
|
|
|
|
}
|
|
|
|
|
|
|
|
return retval;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Document-method: SizedQueue#pop
|
|
|
|
* call-seq:
|
|
|
|
* pop(non_block=false)
|
|
|
|
* deq(non_block=false)
|
|
|
|
* shift(non_block=false)
|
|
|
|
*
|
|
|
|
* Retrieves data from the queue.
|
|
|
|
*
|
|
|
|
* If the queue is empty, the calling thread is suspended until data is pushed
|
2016-09-29 06:34:25 -04:00
|
|
|
* onto the queue. If +non_block+ is true, the thread isn't suspended, and
|
|
|
|
* +ThreadError+ is raised.
|
2015-08-21 19:36:23 -04:00
|
|
|
*/
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
|
|
|
|
{
|
|
|
|
int should_block = queue_pop_should_block(argc, argv);
|
|
|
|
return szqueue_do_pop(self, should_block);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
2017-04-30 05:06:39 -04:00
|
|
|
* Document-method: SizedQueue#clear
|
2015-08-21 19:36:23 -04:00
|
|
|
*
|
|
|
|
* Removes all objects from the queue.
|
|
|
|
*/
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
rb_szqueue_clear(VALUE self)
|
|
|
|
{
|
|
|
|
rb_ary_clear(GET_QUEUE_QUE(self));
|
|
|
|
wakeup_all_threads(GET_SZQUEUE_WAITERS(self));
|
|
|
|
return self;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Document-method: SizedQueue#num_waiting
|
|
|
|
*
|
|
|
|
* Returns the number of threads waiting on the queue.
|
|
|
|
*/
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
rb_szqueue_num_waiting(VALUE self)
|
|
|
|
{
|
2015-08-26 18:59:32 -04:00
|
|
|
long len = queue_num_waiting(self) + szqueue_num_waiting_producer(self);
|
2015-08-21 19:36:23 -04:00
|
|
|
return ULONG2NUM(len);
|
|
|
|
}
|
|
|
|
|
|
|
|
/* ConditionalVariable */
|
|
|
|
|
|
|
|
enum {
|
|
|
|
CONDVAR_WAITERS,
|
|
|
|
END_CONDVAR
|
|
|
|
};
|
|
|
|
|
|
|
|
#define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS)
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Document-class: ConditionVariable
|
|
|
|
*
|
|
|
|
* ConditionVariable objects augment class Mutex. Using condition variables,
|
|
|
|
* it is possible to suspend while in the middle of a critical section until a
|
|
|
|
* resource becomes available.
|
|
|
|
*
|
|
|
|
* Example:
|
|
|
|
*
|
|
|
|
* require 'thread'
|
|
|
|
*
|
|
|
|
* mutex = Mutex.new
|
|
|
|
* resource = ConditionVariable.new
|
|
|
|
*
|
|
|
|
* a = Thread.new {
|
|
|
|
* mutex.synchronize {
|
|
|
|
* # Thread 'a' now needs the resource
|
|
|
|
* resource.wait(mutex)
|
|
|
|
* # 'a' can now have the resource
|
|
|
|
* }
|
|
|
|
* }
|
|
|
|
*
|
|
|
|
* b = Thread.new {
|
|
|
|
* mutex.synchronize {
|
|
|
|
* # Thread 'b' has finished using the resource
|
|
|
|
* resource.signal
|
|
|
|
* }
|
|
|
|
* }
|
|
|
|
*/
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Document-method: ConditionVariable::new
|
|
|
|
*
|
|
|
|
* Creates a new condition variable instance.
|
|
|
|
*/
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
rb_condvar_initialize(VALUE self)
|
|
|
|
{
|
|
|
|
RSTRUCT_SET(self, CONDVAR_WAITERS, ary_buf_new());
|
|
|
|
return self;
|
|
|
|
}
|
|
|
|
|
|
|
|
struct sleep_call {
|
|
|
|
VALUE mutex;
|
|
|
|
VALUE timeout;
|
|
|
|
};
|
|
|
|
|
|
|
|
static ID id_sleep;
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
do_sleep(VALUE args)
|
|
|
|
{
|
|
|
|
struct sleep_call *p = (struct sleep_call *)args;
|
2016-07-29 07:57:14 -04:00
|
|
|
return rb_funcallv(p->mutex, id_sleep, 1, &p->timeout);
|
2015-08-21 19:36:23 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
delete_current_thread(VALUE ary)
|
|
|
|
{
|
|
|
|
return rb_ary_delete(ary, rb_thread_current());
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Document-method: ConditionVariable#wait
|
|
|
|
* call-seq: wait(mutex, timeout=nil)
|
|
|
|
*
|
|
|
|
* Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
|
|
|
|
*
|
|
|
|
* If +timeout+ is given, this method returns after +timeout+ seconds passed,
|
|
|
|
* even if no other thread doesn't signal.
|
|
|
|
*/
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
rb_condvar_wait(int argc, VALUE *argv, VALUE self)
|
|
|
|
{
|
|
|
|
VALUE waiters = GET_CONDVAR_WAITERS(self);
|
|
|
|
VALUE mutex, timeout;
|
|
|
|
struct sleep_call args;
|
|
|
|
|
|
|
|
rb_scan_args(argc, argv, "11", &mutex, &timeout);
|
|
|
|
|
|
|
|
args.mutex = mutex;
|
|
|
|
args.timeout = timeout;
|
|
|
|
rb_ary_push(waiters, rb_thread_current());
|
|
|
|
rb_ensure(do_sleep, (VALUE)&args, delete_current_thread, waiters);
|
|
|
|
|
|
|
|
return self;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Document-method: ConditionVariable#signal
|
|
|
|
*
|
|
|
|
* Wakes up the first thread in line waiting for this lock.
|
|
|
|
*/
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
rb_condvar_signal(VALUE self)
|
|
|
|
{
|
|
|
|
wakeup_first_thread(GET_CONDVAR_WAITERS(self));
|
|
|
|
return self;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Document-method: ConditionVariable#broadcast
|
|
|
|
*
|
|
|
|
* Wakes up all threads waiting for this lock.
|
|
|
|
*/
|
|
|
|
|
|
|
|
static VALUE
|
|
|
|
rb_condvar_broadcast(VALUE self)
|
|
|
|
{
|
|
|
|
wakeup_all_threads(GET_CONDVAR_WAITERS(self));
|
|
|
|
return self;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* :nodoc: */
|
|
|
|
static VALUE
|
|
|
|
undumpable(VALUE obj)
|
|
|
|
{
|
|
|
|
rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj));
|
|
|
|
UNREACHABLE;
|
|
|
|
}
|
|
|
|
|
2016-08-28 04:53:22 -04:00
|
|
|
static void
|
|
|
|
alias_global_const(const char *name, VALUE klass)
|
|
|
|
{
|
|
|
|
rb_define_const(rb_cObject, name, klass);
|
|
|
|
}
|
|
|
|
|
2015-08-21 19:36:23 -04:00
|
|
|
static void
|
2015-09-01 05:08:42 -04:00
|
|
|
Init_thread_sync(void)
|
2015-08-21 19:36:23 -04:00
|
|
|
{
|
|
|
|
#if 0
|
|
|
|
rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); /* teach rdoc ConditionVariable */
|
|
|
|
rb_cQueue = rb_define_class("Queue", rb_cObject); /* teach rdoc Queue */
|
|
|
|
rb_cSizedQueue = rb_define_class("SizedQueue", rb_cObject); /* teach rdoc SizedQueue */
|
|
|
|
#endif
|
|
|
|
|
|
|
|
/* Mutex */
|
|
|
|
rb_cMutex = rb_define_class_under(rb_cThread, "Mutex", rb_cObject);
|
|
|
|
rb_define_alloc_func(rb_cMutex, mutex_alloc);
|
|
|
|
rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0);
|
|
|
|
rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0);
|
|
|
|
rb_define_method(rb_cMutex, "try_lock", rb_mutex_trylock, 0);
|
|
|
|
rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0);
|
|
|
|
rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
|
|
|
|
rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1);
|
|
|
|
rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize_m, 0);
|
|
|
|
rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0);
|
|
|
|
|
|
|
|
/* Queue */
|
|
|
|
rb_cQueue = rb_struct_define_without_accessor_under(
|
|
|
|
rb_cThread,
|
|
|
|
"Queue", rb_cObject, rb_struct_alloc_noinit,
|
|
|
|
"que", "waiters", NULL);
|
2015-08-26 18:59:32 -04:00
|
|
|
|
|
|
|
rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration);
|
2015-08-21 19:36:23 -04:00
|
|
|
|
|
|
|
rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0);
|
|
|
|
rb_undef_method(rb_cQueue, "initialize_copy");
|
|
|
|
rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
|
2015-11-20 19:32:09 -05:00
|
|
|
rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
|
2015-08-26 18:59:32 -04:00
|
|
|
rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
|
2015-08-21 19:36:23 -04:00
|
|
|
rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
|
|
|
|
rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
|
|
|
|
rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
|
|
|
|
rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
|
|
|
|
rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
|
|
|
|
rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
|
|
|
|
|
2016-09-29 06:21:04 -04:00
|
|
|
rb_define_alias(rb_cQueue, "enq", "push");
|
|
|
|
rb_define_alias(rb_cQueue, "<<", "push");
|
|
|
|
rb_define_alias(rb_cQueue, "deq", "pop");
|
|
|
|
rb_define_alias(rb_cQueue, "shift", "pop");
|
|
|
|
rb_define_alias(rb_cQueue, "size", "length");
|
2015-08-21 19:36:23 -04:00
|
|
|
|
2015-08-26 18:59:32 -04:00
|
|
|
rb_cSizedQueue = rb_struct_define_without_accessor_under(
|
|
|
|
rb_cThread,
|
|
|
|
"SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
|
|
|
|
"que", "waiters", "queue_waiters", "size", NULL);
|
|
|
|
|
2015-08-21 19:36:23 -04:00
|
|
|
rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
|
2015-11-20 19:32:09 -05:00
|
|
|
rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
|
2015-08-21 19:36:23 -04:00
|
|
|
rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
|
|
|
|
rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
|
|
|
|
rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1);
|
|
|
|
rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
|
|
|
|
rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
|
|
|
|
rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
|
|
|
|
|
2016-09-29 06:21:04 -04:00
|
|
|
rb_define_alias(rb_cSizedQueue, "enq", "push");
|
|
|
|
rb_define_alias(rb_cSizedQueue, "<<", "push");
|
|
|
|
rb_define_alias(rb_cSizedQueue, "deq", "pop");
|
|
|
|
rb_define_alias(rb_cSizedQueue, "shift", "pop");
|
2015-08-21 19:36:23 -04:00
|
|
|
|
|
|
|
/* CVar */
|
|
|
|
rb_cConditionVariable = rb_struct_define_without_accessor_under(
|
|
|
|
rb_cThread,
|
|
|
|
"ConditionVariable", rb_cObject, rb_struct_alloc_noinit,
|
|
|
|
"waiters", NULL);
|
|
|
|
|
|
|
|
id_sleep = rb_intern("sleep");
|
|
|
|
|
|
|
|
rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
|
|
|
|
rb_undef_method(rb_cConditionVariable, "initialize_copy");
|
|
|
|
rb_define_method(rb_cConditionVariable, "marshal_dump", undumpable, 0);
|
|
|
|
rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, -1);
|
|
|
|
rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
|
|
|
|
rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);
|
|
|
|
|
2016-04-29 22:55:18 -04:00
|
|
|
#define ALIAS_GLOBAL_CONST(name) \
|
2016-08-28 04:53:22 -04:00
|
|
|
alias_global_const(#name, rb_c##name)
|
2015-08-21 19:36:23 -04:00
|
|
|
|
|
|
|
ALIAS_GLOBAL_CONST(Mutex);
|
|
|
|
ALIAS_GLOBAL_CONST(Queue);
|
|
|
|
ALIAS_GLOBAL_CONST(SizedQueue);
|
|
|
|
ALIAS_GLOBAL_CONST(ConditionVariable);
|
|
|
|
rb_provide("thread.rb");
|
|
|
|
}
|