
thread_sync.c: rewrite the rest using ccan/list

The performance improvement increases as the number of waiters
increases, due to avoiding the O(n) behavior of rb_ary_delete on
the waiting thread.  Performance of uncontended queues and
condition variables is not altered significantly.
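
For reference, a minimal sketch of the on-stack waiter pattern the new
code uses (assuming the ccan/list header vendored in the Ruby tree; the
example_waiter/example names are illustrative, not part of the patch):

  #include "ccan/list/list.h"

  struct example_waiter {
      int id;                 /* stand-in for the rb_thread_t pointer */
      struct list_node node;  /* links this on-stack waiter into a waitq */
  };

  static void
  example(void)
  {
      struct list_head waitq;
      struct example_waiter w = { .id = 1 };

      list_head_init(&waitq);
      list_add_tail(&waitq, &w.node); /* enqueue: O(1) */
      list_del(&w.node);              /* cancel/wakeup: O(1), no scan */
  }

Unlinking the node is a constant-time pointer update, whereas
rb_ary_delete() had to scan the whole waiters array for the thread.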

Function entry cost is slightly increased for ConditionVariable,
since the data pointer is separately allocated and not embedded
into the RVALUE slot.
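
A hedged sketch of the TypedData pattern behind that cost (the
my_condvar/my_cv_* names are hypothetical; the functions actually added
by this patch are condvar_alloc/condvar_ptr in the diff below):

  #include "ruby.h"

  struct my_condvar {
      int dummy_waitq;  /* stands in for the ccan list_head */
  };

  static const rb_data_type_t my_cv_type = {
      "my_condvar",
      {0, RUBY_TYPED_DEFAULT_FREE, 0,},
      0, 0, RUBY_TYPED_FREE_IMMEDIATELY
  };

  static VALUE
  my_cv_alloc(VALUE klass)
  {
      struct my_condvar *cv;
      /* allocates the wrapper object plus a separate block for *cv */
      return TypedData_Make_Struct(klass, struct my_condvar, &my_cv_type, cv);
  }

  static struct my_condvar *
  my_cv_ptr(VALUE self)
  {
      struct my_condvar *cv;
      /* every method entry chases this out-of-line pointer first */
      TypedData_Get_Struct(self, struct my_condvar, &my_cv_type, cv);
      return cv;
  }

That extra pointer chase on each method entry is the cost noted above;
the previous Struct-based layout kept the waiters array embedded in the
RVALUE and read it with RSTRUCT_GET.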

[ruby-core:81235] [Feature #13552]

name                  |trunk  |built
----------------------|------:|------:
vm_thread_condvar1    |  0.858|  0.858
vm_thread_condvar2    |  1.003|  0.804
vm_thread_queue       |  0.131|  0.129
vm_thread_sized_queue |  0.265|  0.251
vm_thread_sized_queue2|  0.892|  0.859
vm_thread_sized_queue3|  0.879|  0.845
vm_thread_sized_queue4|  0.599|  0.486

Speedup ratio: compare with the result of `trunk' (greater is better)

name                  |built
----------------------|------:
vm_thread_condvar1    |  0.999
vm_thread_condvar2    |  1.246
vm_thread_queue       |  1.020
vm_thread_sized_queue |  1.057
vm_thread_sized_queue2|  1.039
vm_thread_sized_queue3|  1.041
vm_thread_sized_queue4|  1.233

git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58805 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Author: normal
Date:   2017-05-19 18:53:11 +00:00
Parent: 44e48eca5f
Commit: ea1ce47fd7

--- a/thread_sync.c
+++ b/thread_sync.c
@@ -4,8 +4,6 @@
 static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
 static VALUE rb_eClosedQueueError;

-/* Mutex */
-
 /* sync_waiter is always on-stack */
 struct sync_waiter {
     rb_thread_t *th;
@@ -14,6 +12,38 @@ struct sync_waiter {

 #define MUTEX_ALLOW_TRAP FL_USER1

+static int
+wakeup_one(struct list_head *head)
+{
+    struct sync_waiter *cur = 0, *next = 0;
+
+    list_for_each_safe(head, cur, next, node) {
+        list_del_init(&cur->node);
+
+        if (cur->th->status != THREAD_KILLED) {
+            rb_threadptr_interrupt(cur->th);
+            cur->th->status = THREAD_RUNNABLE;
+            return TRUE;
+        }
+    }
+    return FALSE;
+}
+
+static void
+wakeup_all(struct list_head *head)
+{
+    struct sync_waiter *cur = 0, *next = 0;
+
+    list_for_each_safe(head, cur, next, node) {
+        list_del_init(&cur->node);
+        if (cur->th->status != THREAD_KILLED) {
+            rb_threadptr_interrupt(cur->th);
+            cur->th->status = THREAD_RUNNABLE;
+        }
+    }
+}
+
+/* Mutex */
 typedef struct rb_mutex_struct {
     struct rb_thread_struct volatile *th;
     struct rb_mutex_struct *next_mutex;
@@ -491,21 +521,101 @@ void rb_mutex_allow_trap(VALUE self, int val)

 /* Queue */

-enum {
-    QUEUE_QUE,
-    QUEUE_WAITERS,
-    SZQUEUE_WAITERS,
-    SZQUEUE_MAX,
-    END_QUEUE
-};
+PACKED_STRUCT_UNALIGNED(struct rb_queue {
+    struct list_head waitq;
+    const VALUE que;
+    int num_waiting;
+});
+
+PACKED_STRUCT_UNALIGNED(struct rb_szqueue {
+    struct rb_queue q;
+    int num_waiting_push;
+    struct list_head pushq;
+    long max;
+});
+
+static void
+queue_mark(void *ptr)
+{
+    struct rb_queue *q = ptr;
+
+    /* no need to mark threads in waitq, they are on stack */
+    rb_gc_mark(q->que);
+}
+
+static size_t
+queue_memsize(const void *ptr)
+{
+    return sizeof(struct rb_queue);
+}
+
+static const rb_data_type_t queue_data_type = {
+    "queue",
+    {queue_mark, RUBY_TYPED_DEFAULT_FREE, queue_memsize,},
+    0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
+};
+
+static VALUE
+queue_alloc(VALUE klass)
+{
+    VALUE obj;
+    struct rb_queue *q;
+
+    obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q);
+    list_head_init(&q->waitq);
+    return obj;
+}
+
+static struct rb_queue *
+queue_ptr(VALUE obj)
+{
+    struct rb_queue *q;
+
+    TypedData_Get_Struct(obj, struct rb_queue, &queue_data_type, q);
+    return q;
+}

 #define QUEUE_CLOSED FL_USER5

-#define GET_QUEUE_QUE(q) get_array((q), QUEUE_QUE)
-#define GET_QUEUE_WAITERS(q) get_array((q), QUEUE_WAITERS)
-#define GET_SZQUEUE_WAITERS(q) get_array((q), SZQUEUE_WAITERS)
-#define GET_SZQUEUE_MAX(q) RSTRUCT_GET((q), SZQUEUE_MAX)
-#define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q))
+static void
+szqueue_mark(void *ptr)
+{
+    struct rb_szqueue *sq = ptr;
+
+    queue_mark(&sq->q);
+}
+
+static size_t
+szqueue_memsize(const void *ptr)
+{
+    return sizeof(struct rb_szqueue);
+}
+
+static const rb_data_type_t szqueue_data_type = {
+    "sized_queue",
+    {szqueue_mark, RUBY_TYPED_DEFAULT_FREE, szqueue_memsize,},
+    0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
+};
+
+static VALUE
+szqueue_alloc(VALUE klass)
+{
+    struct rb_szqueue *sq;
+    VALUE obj = TypedData_Make_Struct(klass, struct rb_szqueue,
+                                      &szqueue_data_type, sq);
+    list_head_init(&sq->q.waitq);
+    list_head_init(&sq->pushq);
+    return obj;
+}
+
+static struct rb_szqueue *
+szqueue_ptr(VALUE obj)
+{
+    struct rb_szqueue *sq;
+
+    TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq);
+    return sq;
+}

 static VALUE
 ary_buf_new(void)
@@ -514,57 +624,18 @@ ary_buf_new(void)
 }

 static VALUE
-get_array(VALUE obj, int idx)
+check_array(VALUE obj, VALUE ary)
 {
-    VALUE ary = RSTRUCT_GET(obj, idx);
-
     if (!RB_TYPE_P(ary, T_ARRAY)) {
        rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
     }
     return ary;
 }

-static void
-wakeup_first_thread(VALUE list)
+static long
+queue_length(VALUE self, struct rb_queue *q)
 {
-    VALUE thread;
-
-    while (!NIL_P(thread = rb_ary_shift(list))) {
-        if (RTEST(rb_thread_wakeup_alive(thread))) break;
-    }
-}
-
-static void
-wakeup_all_threads(VALUE list)
-{
-    VALUE thread;
-    long i;
-
-    for (i=0; i<RARRAY_LEN(list); i++) {
-        thread = RARRAY_AREF(list, i);
-        rb_thread_wakeup_alive(thread);
-    }
-    rb_ary_clear(list);
-}
-
-static unsigned long
-queue_length(VALUE self)
-{
-    VALUE que = GET_QUEUE_QUE(self);
-    return RARRAY_LEN(que);
-}
-
-static unsigned long
-queue_num_waiting(VALUE self)
-{
-    VALUE waiters = GET_QUEUE_WAITERS(self);
-    return RARRAY_LEN(waiters);
-}
-
-static unsigned long
-szqueue_num_waiting_producer(VALUE self)
-{
-    VALUE waiters = GET_SZQUEUE_WAITERS(self);
-    return RARRAY_LEN(waiters);
+    return RARRAY_LEN(check_array(self, q->que));
 }

 static int
@@ -580,32 +651,12 @@ raise_closed_queue_error(VALUE self)
 }

 static VALUE
-queue_closed_result(VALUE self)
+queue_closed_result(VALUE self, struct rb_queue *q)
 {
-    assert(queue_length(self) == 0);
+    assert(queue_length(self, q) == 0);
     return Qnil;
 }

-static VALUE
-queue_do_close(VALUE self, int is_szq)
-{
-    if (!queue_closed_p(self)) {
-        FL_SET(self, QUEUE_CLOSED);
-        if (queue_num_waiting(self) > 0) {
-            VALUE waiters = GET_QUEUE_WAITERS(self);
-            wakeup_all_threads(waiters);
-        }
-        if (is_szq && szqueue_num_waiting_producer(self) > 0) {
-            VALUE waiters = GET_SZQUEUE_WAITERS(self);
-            wakeup_all_threads(waiters);
-        }
-    }
-    return self;
-}
-
 /*
  * Document-class: Queue
  *
@@ -649,19 +700,20 @@ queue_do_close(VALUE self, int is_szq)
 static VALUE
 rb_queue_initialize(VALUE self)
 {
-    RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
-    RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
+    struct rb_queue *q = queue_ptr(self);
+    RB_OBJ_WRITE(self, &q->que, ary_buf_new());
+    list_head_init(&q->waitq);
     return self;
 }

 static VALUE
-queue_do_push(VALUE self, VALUE obj)
+queue_do_push(VALUE self, struct rb_queue *q, VALUE obj)
 {
     if (queue_closed_p(self)) {
        raise_closed_queue_error(self);
     }
-    rb_ary_push(GET_QUEUE_QUE(self), obj);
-    wakeup_first_thread(GET_QUEUE_WAITERS(self));
+    rb_ary_push(check_array(self, q->que), obj);
+    wakeup_one(&q->waitq);
     return self;
 }
@@ -699,7 +751,15 @@ queue_do_push(VALUE self, VALUE obj)
 static VALUE
 rb_queue_close(VALUE self)
 {
-    return queue_do_close(self, FALSE);
+    struct rb_queue *q = queue_ptr(self);
+
+    if (!queue_closed_p(self)) {
+        FL_SET(self, QUEUE_CLOSED);
+
+        wakeup_all(&q->waitq);
+    }
+
+    return self;
 }

 /*
@@ -728,19 +788,7 @@ rb_queue_closed_p(VALUE self)
 static VALUE
 rb_queue_push(VALUE self, VALUE obj)
 {
-    return queue_do_push(self, obj);
-}
-
-struct waiting_delete {
-    VALUE waiting;
-    VALUE th;
-};
-
-static VALUE
-queue_delete_from_waiting(struct waiting_delete *p)
-{
-    rb_ary_delete(p->waiting, p->th);
-    return Qnil;
+    return queue_do_push(self, queue_ptr(self), obj);
 }

 static VALUE
@@ -750,30 +798,64 @@ queue_sleep(VALUE arg)
     return Qnil;
 }

-static VALUE
-queue_do_pop(VALUE self, int should_block)
-{
-    struct waiting_delete args;
-    args.waiting = GET_QUEUE_WAITERS(self);
-    args.th = rb_thread_current();
+struct queue_waiter {
+    struct sync_waiter w;
+    union {
+        struct rb_queue *q;
+        struct rb_szqueue *sq;
+    } as;
+};
+
+static VALUE
+queue_sleep_done(VALUE p)
+{
+    struct queue_waiter *qw = (struct queue_waiter *)p;
+
+    list_del(&qw->w.node);
+    qw->as.q->num_waiting--;
+
+    return Qfalse;
+}
+
+static VALUE
+szqueue_sleep_done(VALUE p)
+{
+    struct queue_waiter *qw = (struct queue_waiter *)p;
+
+    list_del(&qw->w.node);
+    qw->as.sq->num_waiting_push--;
+
+    return Qfalse;
+}

-    while (queue_length(self) == 0) {
+static VALUE
+queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
+{
+    check_array(self, q->que);
+
+    while (RARRAY_LEN(q->que) == 0) {
        if (!should_block) {
            rb_raise(rb_eThreadError, "queue empty");
        }
        else if (queue_closed_p(self)) {
-            return queue_closed_result(self);
+            return queue_closed_result(self, q);
        }
        else {
-            assert(queue_length(self) == 0);
+            struct queue_waiter qw;
+
+            assert(RARRAY_LEN(q->que) == 0);
            assert(queue_closed_p(self) == 0);

-            rb_ary_push(args.waiting, args.th);
-            rb_ensure(queue_sleep, Qfalse, queue_delete_from_waiting, (VALUE)&args);
+            qw.w.th = GET_THREAD();
+            qw.as.q = q;
+            list_add_tail(&qw.as.q->waitq, &qw.w.node);
+            qw.as.q->num_waiting++;
+
+            rb_ensure(queue_sleep, Qfalse, queue_sleep_done, (VALUE)&qw);
        }
     }

-    return rb_ary_shift(GET_QUEUE_QUE(self));
+    return rb_ary_shift(q->que);
 }

 static int
@@ -805,7 +887,7 @@ static VALUE
 rb_queue_pop(int argc, VALUE *argv, VALUE self)
 {
     int should_block = queue_pop_should_block(argc, argv);
-    return queue_do_pop(self, should_block);
+    return queue_do_pop(self, queue_ptr(self), should_block);
 }

 /*
@@ -818,7 +900,7 @@ rb_queue_pop(int argc, VALUE *argv, VALUE self)
 static VALUE
 rb_queue_empty_p(VALUE self)
 {
-    return queue_length(self) == 0 ? Qtrue : Qfalse;
+    return queue_length(self, queue_ptr(self)) == 0 ? Qtrue : Qfalse;
 }

 /*
@@ -830,7 +912,9 @@ rb_queue_empty_p(VALUE self)
 static VALUE
 rb_queue_clear(VALUE self)
 {
-    rb_ary_clear(GET_QUEUE_QUE(self));
+    struct rb_queue *q = queue_ptr(self);
+
+    rb_ary_clear(check_array(self, q->que));
     return self;
 }
@@ -846,8 +930,7 @@ rb_queue_clear(VALUE self)
 static VALUE
 rb_queue_length(VALUE self)
 {
-    unsigned long len = queue_length(self);
-    return ULONG2NUM(len);
+    return LONG2NUM(queue_length(self, queue_ptr(self)));
 }

 /*
@@ -859,8 +942,9 @@ rb_queue_length(VALUE self)
 static VALUE
 rb_queue_num_waiting(VALUE self)
 {
-    unsigned long len = queue_num_waiting(self);
-    return ULONG2NUM(len);
+    struct rb_queue *q = queue_ptr(self);
+
+    return INT2NUM(q->num_waiting);
 }

 /*
@@ -883,16 +967,17 @@ static VALUE
 rb_szqueue_initialize(VALUE self, VALUE vmax)
 {
     long max;
+    struct rb_szqueue *sq = szqueue_ptr(self);

     max = NUM2LONG(vmax);
     if (max <= 0) {
        rb_raise(rb_eArgError, "queue size must be positive");
     }

-    RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
-    RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
-    RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new());
-    RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
+    RB_OBJ_WRITE(self, &sq->q.que, ary_buf_new());
+    list_head_init(&sq->q.waitq);
+    list_head_init(&sq->pushq);
+    sq->max = max;

     return self;
 }
@@ -912,7 +997,14 @@ rb_szqueue_initialize(VALUE self, VALUE vmax)
 static VALUE
 rb_szqueue_close(VALUE self)
 {
-    return queue_do_close(self, TRUE);
+    if (!queue_closed_p(self)) {
+        struct rb_szqueue *sq = szqueue_ptr(self);
+
+        FL_SET(self, QUEUE_CLOSED);
+        wakeup_all(&sq->q.waitq);
+        wakeup_all(&sq->pushq);
+    }
+    return self;
 }

 /*
@@ -924,7 +1016,7 @@ rb_szqueue_close(VALUE self)
 static VALUE
 rb_szqueue_max_get(VALUE self)
 {
-    return GET_SZQUEUE_MAX(self);
+    return LONG2NUM(szqueue_ptr(self)->max);
 }

 /*
@@ -937,18 +1029,19 @@ rb_szqueue_max_get(VALUE self)
 static VALUE
 rb_szqueue_max_set(VALUE self, VALUE vmax)
 {
-    long max = NUM2LONG(vmax), diff = 0;
-    VALUE t;
+    long max = NUM2LONG(vmax);
+    long diff = 0;
+    struct rb_szqueue *sq = szqueue_ptr(self);

     if (max <= 0) {
        rb_raise(rb_eArgError, "queue size must be positive");
     }
-    if ((unsigned long)max > GET_SZQUEUE_ULONGMAX(self)) {
-        diff = max - GET_SZQUEUE_ULONGMAX(self);
+    if (max > sq->max) {
+        diff = max - sq->max;
     }
-    RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
-    while (diff-- > 0 && !NIL_P(t = rb_ary_shift(GET_SZQUEUE_WAITERS(self)))) {
-        rb_thread_wakeup_alive(t);
+    sq->max = max;
+    while (diff-- > 0 && wakeup_one(&sq->pushq)) {
+        /* keep waking more up */
     }
     return vmax;
 }
@@ -981,12 +1074,10 @@ szqueue_push_should_block(int argc, const VALUE *argv)
 static VALUE
 rb_szqueue_push(int argc, VALUE *argv, VALUE self)
 {
-    struct waiting_delete args;
+    struct rb_szqueue *sq = szqueue_ptr(self);
     int should_block = szqueue_push_should_block(argc, argv);
-    args.waiting = GET_SZQUEUE_WAITERS(self);
-    args.th = rb_thread_current();

-    while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) {
+    while (queue_length(self, &sq->q) >= sq->max) {
        if (!should_block) {
            rb_raise(rb_eThreadError, "queue full");
        }
@@ -994,8 +1085,14 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
            goto closed;
        }
        else {
-            rb_ary_push(args.waiting, args.th);
-            rb_ensure(queue_sleep, Qfalse, queue_delete_from_waiting, (VALUE)&args);
+            struct queue_waiter qw;
+
+            qw.w.th = GET_THREAD();
+            qw.as.sq = sq;
+            list_add_tail(&sq->pushq, &qw.w.node);
+            sq->num_waiting_push++;
+
+            rb_ensure(queue_sleep, Qfalse, szqueue_sleep_done, (VALUE)&qw);
        }
     }
@@ -1004,16 +1101,17 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
        raise_closed_queue_error(self);
     }

-    return queue_do_push(self, argv[0]);
+    return queue_do_push(self, &sq->q, argv[0]);
 }

 static VALUE
 szqueue_do_pop(VALUE self, int should_block)
 {
-    VALUE retval = queue_do_pop(self, should_block);
+    struct rb_szqueue *sq = szqueue_ptr(self);
+    VALUE retval = queue_do_pop(self, &sq->q, should_block);

-    if (queue_length(self) < GET_SZQUEUE_ULONGMAX(self)) {
-        wakeup_first_thread(GET_SZQUEUE_WAITERS(self));
+    if (queue_length(self, &sq->q) < sq->max) {
+        wakeup_one(&sq->pushq);
     }

     return retval;
@@ -1049,11 +1147,21 @@ rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
 static VALUE
 rb_szqueue_clear(VALUE self)
 {
-    rb_ary_clear(GET_QUEUE_QUE(self));
-    wakeup_all_threads(GET_SZQUEUE_WAITERS(self));
+    struct rb_szqueue *sq = szqueue_ptr(self);
+
+    rb_ary_clear(check_array(self, sq->q.que));
+    wakeup_all(&sq->pushq);
     return self;
 }

+static VALUE
+rb_szqueue_length(VALUE self)
+{
+    struct rb_szqueue *sq = szqueue_ptr(self);
+
+    return LONG2NUM(queue_length(self, &sq->q));
+}
+
 /*
  * Document-method: SizedQueue#num_waiting
  *
@@ -1063,19 +1171,33 @@ rb_szqueue_clear(VALUE self)
 static VALUE
 rb_szqueue_num_waiting(VALUE self)
 {
-    long len = queue_num_waiting(self) + szqueue_num_waiting_producer(self);
-    return ULONG2NUM(len);
+    struct rb_szqueue *sq = szqueue_ptr(self);
+
+    return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
 }

+/*
+ * Document-method: SizedQueue#empty?
+ * call-seq: empty?
+ *
+ * Returns +true+ if the queue is empty.
+ */
+static VALUE
+rb_szqueue_empty_p(VALUE self)
+{
+    struct rb_szqueue *sq = szqueue_ptr(self);
+
+    return queue_length(self, &sq->q) == 0 ? Qtrue : Qfalse;
+}
+
 /* ConditionalVariable */
-
-enum {
-    CONDVAR_WAITERS,
-    END_CONDVAR
+/* TODO: maybe this can be IMEMO */
+struct rb_condvar {
+    struct list_head waitq;
 };

-#define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS)
-
 /*
  * Document-class: ConditionVariable
  *
@@ -1106,6 +1228,40 @@ enum {
  *   }
  */

+static size_t
+condvar_memsize(const void *ptr)
+{
+    return sizeof(struct rb_condvar);
+}
+
+static const rb_data_type_t cv_data_type = {
+    "condvar",
+    {0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,},
+    0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
+};
+
+static struct rb_condvar *
+condvar_ptr(VALUE self)
+{
+    struct rb_condvar *cv;
+
+    TypedData_Get_Struct(self, struct rb_condvar, &cv_data_type, cv);
+
+    return cv;
+}
+
+static VALUE
+condvar_alloc(VALUE klass)
+{
+    struct rb_condvar *cv;
+    VALUE obj;
+
+    obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv);
+    list_head_init(&cv->waitq);
+
+    return obj;
+}
+
 /*
  * Document-method: ConditionVariable::new
  *
@@ -1115,7 +1271,8 @@ enum {
 static VALUE
 rb_condvar_initialize(VALUE self)
 {
-    RSTRUCT_SET(self, CONDVAR_WAITERS, ary_buf_new());
+    struct rb_condvar *cv = condvar_ptr(self);;
+    list_head_init(&cv->waitq);
     return self;
 }
@@ -1134,9 +1291,11 @@ do_sleep(VALUE args)
 }

 static VALUE
-delete_current_thread(VALUE ary)
+delete_from_waitq(struct sync_waiter *w)
 {
-    return rb_ary_delete(ary, rb_thread_current());
+    list_del(&w->node);
+
+    return Qnil;
 }

 /*
@@ -1152,16 +1311,18 @@ delete_current_thread(VALUE ary)
 static VALUE
 rb_condvar_wait(int argc, VALUE *argv, VALUE self)
 {
-    VALUE waiters = GET_CONDVAR_WAITERS(self);
+    struct rb_condvar *cv = condvar_ptr(self);
     VALUE mutex, timeout;
     struct sleep_call args;
+    struct sync_waiter w;

     rb_scan_args(argc, argv, "11", &mutex, &timeout);
     args.mutex = mutex;
     args.timeout = timeout;
-    rb_ary_push(waiters, rb_thread_current());
-    rb_ensure(do_sleep, (VALUE)&args, delete_current_thread, waiters);
+    w.th = GET_THREAD();
+    list_add_tail(&cv->waitq, &w.node);
+    rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&w);

     return self;
 }
@@ -1175,7 +1336,8 @@ rb_condvar_wait(int argc, VALUE *argv, VALUE self)
 static VALUE
 rb_condvar_signal(VALUE self)
 {
-    wakeup_first_thread(GET_CONDVAR_WAITERS(self));
+    struct rb_condvar *cv = condvar_ptr(self);
+    wakeup_one(&cv->waitq);
     return self;
 }
@@ -1188,7 +1350,8 @@ rb_condvar_signal(VALUE self)
 static VALUE
 rb_condvar_broadcast(VALUE self)
 {
-    wakeup_all_threads(GET_CONDVAR_WAITERS(self));
+    struct rb_condvar *cv = condvar_ptr(self);
+    wakeup_all(&cv->waitq);
     return self;
 }
@@ -1228,10 +1391,8 @@ Init_thread_sync(void)
     rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0);

     /* Queue */
-    rb_cQueue = rb_struct_define_without_accessor_under(
-        rb_cThread,
-        "Queue", rb_cObject, rb_struct_alloc_noinit,
-        "que", "waiters", NULL);
+    rb_cQueue = rb_define_class_under(rb_cThread, "Queue", rb_cObject);
+    rb_define_alloc_func(rb_cQueue, queue_alloc);

     rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration);
@@ -1253,10 +1414,8 @@ Init_thread_sync(void)
     rb_define_alias(rb_cQueue, "shift", "pop");
     rb_define_alias(rb_cQueue, "size", "length");

-    rb_cSizedQueue = rb_struct_define_without_accessor_under(
-        rb_cThread,
-        "SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
-        "que", "waiters", "queue_waiters", "size", NULL);
+    rb_cSizedQueue = rb_define_class_under(rb_cThread, "SizedQueue", rb_cQueue);
+    rb_define_alloc_func(rb_cSizedQueue, szqueue_alloc);

     rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
     rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
@@ -1264,19 +1423,21 @@ Init_thread_sync(void)
     rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
     rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1);
     rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
+    rb_define_method(rb_cSizedQueue, "empty?", rb_szqueue_empty_p, 0);
     rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
+    rb_define_method(rb_cSizedQueue, "length", rb_szqueue_length, 0);
     rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);

     rb_define_alias(rb_cSizedQueue, "enq", "push");
     rb_define_alias(rb_cSizedQueue, "<<", "push");
     rb_define_alias(rb_cSizedQueue, "deq", "pop");
     rb_define_alias(rb_cSizedQueue, "shift", "pop");
+    rb_define_alias(rb_cSizedQueue, "size", "length");

     /* CVar */
-    rb_cConditionVariable = rb_struct_define_without_accessor_under(
-        rb_cThread,
-        "ConditionVariable", rb_cObject, rb_struct_alloc_noinit,
-        "waiters", NULL);
+    rb_cConditionVariable = rb_define_class_under(rb_cThread,
+                                                  "ConditionVariable", rb_cObject);
+    rb_define_alloc_func(rb_cConditionVariable, condvar_alloc);

     id_sleep = rb_intern("sleep");