/*
 * Optimized Ruby Mutex implementation, loosely based on thread.rb by
 * Yukihiro Matsumoto <matz@ruby-lang.org>
 *
 * Copyright 2006-2007 MenTaLguY <mental@rydia.net>
 *
 * RDoc taken from original.
 *
 * This file is made available under the same terms as Ruby.
 */

#include <ruby.h>
#include <intern.h>
#include <rubysig.h>

static VALUE rb_cMutex;
static VALUE rb_cConditionVariable;
static VALUE rb_cQueue;
static VALUE rb_cSizedQueue;

static VALUE set_critical(VALUE value);

static VALUE
thread_exclusive(VALUE (*func)(ANYARGS), VALUE arg)
{
    VALUE critical = rb_thread_critical;

    rb_thread_critical = 1;
    return rb_ensure(func, arg, set_critical, (VALUE)critical);
}

/*
 * call-seq:
 *   Thread.exclusive { block } => obj
 *
 * Wraps a block in Thread.critical, restoring the original value
 * upon exit from the critical section, and returns the value of the
 * block.
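 *
 * An illustrative example (not part of the original source):
 *
 *   counter = 0
 *   threads = (1..5).map do
 *     Thread.new { Thread.exclusive { counter += 1 } }
 *   end
 *   threads.each { |t| t.join }
 *   counter   #=> 5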
 */

static VALUE
rb_thread_exclusive(void)
{
    return thread_exclusive(rb_yield, Qundef);
}

typedef struct _Entry {
    VALUE value;
    struct _Entry *next;
} Entry;

typedef struct _List {
    Entry *entries;
    Entry *last_entry;
    Entry *entry_pool;
    unsigned long size;
} List;

static void
init_list(List *list)
{
    list->entries = NULL;
    list->last_entry = NULL;
    list->entry_pool = NULL;
    list->size = 0;
}

static void
mark_list(List *list)
{
    Entry *entry;
    for (entry = list->entries; entry; entry = entry->next) {
        rb_gc_mark(entry->value);
    }
}

static void
free_entries(Entry *first)
{
    Entry *next;
    while (first) {
        next = first->next;
        xfree(first);
        first = next;
    }
}

static void
finalize_list(List *list)
{
    free_entries(list->entries);
    free_entries(list->entry_pool);
}

static void
push_list(List *list, VALUE value)
{
    Entry *entry;

    if (list->entry_pool) {
        entry = list->entry_pool;
        list->entry_pool = entry->next;
    } else {
        entry = ALLOC(Entry);
    }

    entry->value = value;
    entry->next = NULL;

    if (list->last_entry) {
        list->last_entry->next = entry;
    } else {
        list->entries = entry;
    }
    list->last_entry = entry;

    ++list->size;
}

static void
push_multiple_list(List *list, VALUE *values, unsigned count)
{
    unsigned i;
    for (i = 0; i < count; i++) {
        push_list(list, values[i]);
    }
}

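/*
 * Entries removed from a list are recycled: when USE_MEM_POOLS is defined
 * they are kept on the list's entry_pool for reuse by push_list, otherwise
 * they are freed immediately.
 */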
static void
recycle_entries(List *list, Entry *first_entry, Entry *last_entry)
{
#ifdef USE_MEM_POOLS
    last_entry->next = list->entry_pool;
    list->entry_pool = first_entry;
#else
    last_entry->next = NULL;
    free_entries(first_entry);
#endif
}

static VALUE
shift_list(List *list)
{
    Entry *entry;
    VALUE value;

    entry = list->entries;
    if (!entry) return Qnil;

    list->entries = entry->next;
    if (entry == list->last_entry) {
        list->last_entry = NULL;
    }

    --list->size;

    value = entry->value;
    recycle_entries(list, entry, entry);

    return value;
}

static void
remove_one(List *list, VALUE value)
{
    Entry **ref;
    Entry *prev;
    Entry *entry;

    for (ref = &list->entries, prev = NULL, entry = list->entries;
         entry != NULL;
         ref = &entry->next, prev = entry, entry = entry->next) {
        if (entry->value == value) {
            *ref = entry->next;
            list->size--;
            if (!entry->next) {
                list->last_entry = prev;
            }
            recycle_entries(list, entry, entry);
            break;
        }
    }
}

static void
clear_list(List *list)
{
    if (list->last_entry) {
        recycle_entries(list, list->entries, list->last_entry);
        list->entries = NULL;
        list->last_entry = NULL;
        list->size = 0;
    }
}

static VALUE
array_from_list(List const *list)
{
    VALUE ary;
    Entry *entry;
    ary = rb_ary_new();
    for (entry = list->entries; entry; entry = entry->next) {
        rb_ary_push(ary, entry->value);
    }
    return ary;
}

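/*
 * Threads waiting for a mutex block in rb_thread_join on the current owner
 * (see wait_mutex below); when ownership is handed to another thread, their
 * joins must be retargeted to the new owner.
 */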
static void
adjust_join(const List *list, VALUE new)
{
    extern void rb_thread_set_join _((VALUE, VALUE));
    Entry *entry;
    for (entry = list->entries; entry; entry = entry->next) {
        rb_thread_set_join(entry->value, new);
    }
}

static VALUE
wake_thread(VALUE thread)
{
    return rb_thread_wakeup_alive(thread);
}

static VALUE
run_thread(VALUE thread)
{
    thread = wake_thread(thread);
    if (RTEST(thread) && !rb_thread_critical)
        rb_thread_schedule();
    return thread;
}

static VALUE
wake_one(List *list)
{
    VALUE waking;

    waking = Qnil;
    while (list->entries && !RTEST(waking)) {
        waking = wake_thread(shift_list(list));
    }

    return waking;
}

static VALUE
wake_all(List *list)
{
    while (list->entries) {
        wake_one(list);
    }
    return Qnil;
}

#define DELAY_INFTY 1E30

static VALUE
wait_list_inner(VALUE arg)
{
    push_list((List *)arg, rb_thread_current());
    rb_thread_stop();
    return Qnil;
}

static VALUE
wait_list_cleanup(VALUE arg)
{
    /* cleanup in case of spurious wakeups */
    remove_one((List *)arg, rb_thread_current());
    return Qnil;
}

static VALUE
wait_list(List *list)
{
    return rb_ensure(wait_list_inner, (VALUE)list, wait_list_cleanup, (VALUE)list);
}

static void
kill_waiting_threads(List *waiting)
{
    Entry *entry;

    for (entry = waiting->entries; entry; entry = entry->next) {
        rb_thread_kill(entry->value);
    }
}

/*
 * Document-class: Mutex
 *
 * Mutex implements a simple semaphore that can be used to coordinate access to
 * shared data from multiple concurrent threads.
 *
 * Example:
 *
 *   require 'thread'
 *   semaphore = Mutex.new
 *
 *   a = Thread.new {
 *     semaphore.synchronize {
 *       # access shared resource
 *     }
 *   }
 *
 *   b = Thread.new {
 *     semaphore.synchronize {
 *       # access shared resource
 *     }
 *   }
 *
 */

typedef struct _Mutex {
    VALUE owner;
    List waiting;
} Mutex;

#define MUTEX_LOCKED_P(mutex) (RTEST((mutex)->owner) && rb_thread_alive_p((mutex)->owner))

static void
mark_mutex(Mutex *mutex)
{
    rb_gc_mark(mutex->owner);
    mark_list(&mutex->waiting);
}

static void
finalize_mutex(Mutex *mutex)
{
    finalize_list(&mutex->waiting);
}

static void
free_mutex(Mutex *mutex)
{
    kill_waiting_threads(&mutex->waiting);
    finalize_mutex(mutex);
    xfree(mutex);
}

static void
init_mutex(Mutex *mutex)
{
    mutex->owner = Qnil;
    init_list(&mutex->waiting);
}

/*
 * Document-method: new
 * call-seq: Mutex.new
 *
 * Creates a new Mutex
 *
 */

static VALUE
rb_mutex_alloc(VALUE klass)
{
    Mutex *mutex;
    mutex = ALLOC(Mutex);
    init_mutex(mutex);
    return Data_Wrap_Struct(klass, mark_mutex, free_mutex, mutex);
}

/*
 * Document-method: locked?
 * call-seq: locked?
 *
 * Returns +true+ if this lock is currently held by some thread.
 *
 */

static VALUE
rb_mutex_locked_p(VALUE self)
{
    Mutex *mutex;
    Data_Get_Struct(self, Mutex, mutex);
    return MUTEX_LOCKED_P(mutex) ? Qtrue : Qfalse;
}

/*
 * Document-method: try_lock
 * call-seq: try_lock
 *
 * Attempts to obtain the lock and returns immediately. Returns +true+ if the
 * lock was granted.
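 *
 * An illustrative example (not part of the original source):
 *
 *   m = Mutex.new
 *   m.try_lock   #=> true
 *   m.try_lock   #=> false, the lock is already held
 *   m.unlock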
 *
 */

static VALUE
rb_mutex_try_lock(VALUE self)
{
    Mutex *mutex;

    Data_Get_Struct(self, Mutex, mutex);

    if (MUTEX_LOCKED_P(mutex))
        return Qfalse;

    mutex->owner = rb_thread_current();
    return Qtrue;
}

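/*
 * Block until the mutex becomes available.  The waiter joins the current
 * owner, leaving the critical section while it sleeps, and on wakeup either
 * claims a mutex that is no longer locked or finds that unlock handed
 * ownership to it directly.
 */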
static VALUE
wait_mutex(VALUE arg)
{
    Mutex *mutex = (Mutex *)arg;
    VALUE current = rb_thread_current();

    push_list(&mutex->waiting, current);
    do {
        rb_thread_critical = 0;
        rb_thread_join(mutex->owner, DELAY_INFTY);
        rb_thread_critical = 1;
        if (!MUTEX_LOCKED_P(mutex)) {
            mutex->owner = current;
            break;
        }
    } while (mutex->owner != current);
    return Qnil;
}

/*
 * Document-method: lock
 * call-seq: lock
 *
 * Attempts to grab the lock and waits if it isn't available.
 *
 */

static VALUE
lock_mutex(Mutex *mutex)
{
    VALUE current;
    current = rb_thread_current();

    rb_thread_critical = 1;

    if (!MUTEX_LOCKED_P(mutex)) {
        mutex->owner = current;
    }
    else {
        rb_ensure(wait_mutex, (VALUE)mutex, wait_list_cleanup, (VALUE)&mutex->waiting);
    }

    rb_thread_critical = 0;
    return Qnil;
}

static VALUE
lock_mutex_call(VALUE mutex)
{
    return lock_mutex((Mutex *)mutex);
}

static VALUE
rb_mutex_lock(VALUE self)
{
    Mutex *mutex;
    Data_Get_Struct(self, Mutex, mutex);
    lock_mutex(mutex);
    return self;
}

/*
 * Document-method: unlock
 * call-seq: unlock
 *
 * Releases the lock. Returns +nil+ if the mutex wasn't locked.
 *
 */

static VALUE
unlock_mutex_inner(Mutex *mutex)
{
    VALUE waking;

    if (mutex->owner != rb_thread_current()) {
        rb_raise(rb_eThreadError, "not owner");
    }

    waking = wake_one(&mutex->waiting);
    if (!NIL_P(waking)) {
        adjust_join(&mutex->waiting, waking);
    }
    mutex->owner = waking;

    return waking;
}

static VALUE
set_critical(VALUE value)
{
    rb_thread_critical = (int)value;
    return Qundef;
}

static VALUE
unlock_mutex(Mutex *mutex)
{
    VALUE waking = thread_exclusive(unlock_mutex_inner, (VALUE)mutex);

    if (!RTEST(waking)) {
        return Qfalse;
    }

    run_thread(waking);

    return Qtrue;
}

static VALUE
unlock_mutex_call(VALUE mutex)
{
    return unlock_mutex((Mutex *)mutex);
}

static VALUE
rb_mutex_unlock(VALUE self)
{
    Mutex *mutex;
    Data_Get_Struct(self, Mutex, mutex);

    if (RTEST(unlock_mutex(mutex))) {
        return self;
    } else {
        return Qnil;
    }
}

/*
 * Document-method: exclusive_unlock
 * call-seq: exclusive_unlock { ... }
 *
 * If the mutex is locked, unlocks the mutex, wakes one waiting thread, and
 * yields in a critical section.
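 *
 * A sketch of the typical usage pattern (illustrative, not from the
 * original source): atomically release the lock and put the current
 * thread to sleep, then retake the lock once another thread wakes it.
 *
 *   mutex.exclusive_unlock { Thread.stop }
 *   mutex.lock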
 *
 */

static VALUE
rb_mutex_exclusive_unlock_inner(Mutex *mutex)
{
    VALUE waking;
    waking = unlock_mutex_inner(mutex);
    rb_yield(Qundef);
    return waking;
}

static VALUE
rb_mutex_exclusive_unlock(VALUE self)
{
    Mutex *mutex;
    VALUE waking;
    Data_Get_Struct(self, Mutex, mutex);

    waking = thread_exclusive(rb_mutex_exclusive_unlock_inner, (VALUE)mutex);

    if (!RTEST(waking)) {
        return Qnil;
    }

    run_thread(waking);

    return self;
}

/*
 * Document-method: synchronize
 * call-seq: synchronize { ... }
 *
 * Obtains a lock, runs the block, and releases the lock when the block
 * completes. See the example under Mutex.
 *
 */

static VALUE
rb_mutex_synchronize(VALUE self)
{
    rb_mutex_lock(self);
    return rb_ensure(rb_yield, Qundef, rb_mutex_unlock, self);
}

/*
 * Document-class: ConditionVariable
 *
 * ConditionVariable objects augment class Mutex. Using condition variables,
 * it is possible to suspend while in the middle of a critical section until a
 * resource becomes available.
 *
 * Example:
 *
 *   require 'thread'
 *
 *   mutex = Mutex.new
 *   resource = ConditionVariable.new
 *
 *   a = Thread.new {
 *     mutex.synchronize {
 *       # Thread 'a' now needs the resource
 *       resource.wait(mutex)
 *       # 'a' can now have the resource
 *     }
 *   }
 *
 *   b = Thread.new {
 *     mutex.synchronize {
 *       # Thread 'b' has finished using the resource
 *       resource.signal
 *     }
 *   }
 *
 */

typedef struct _ConditionVariable {
    List waiting;
} ConditionVariable;

static void
mark_condvar(ConditionVariable *condvar)
{
    mark_list(&condvar->waiting);
}

static void
finalize_condvar(ConditionVariable *condvar)
{
    finalize_list(&condvar->waiting);
}

static void
free_condvar(ConditionVariable *condvar)
{
    kill_waiting_threads(&condvar->waiting);
    finalize_condvar(condvar);
    xfree(condvar);
}

static void
init_condvar(ConditionVariable *condvar)
{
    init_list(&condvar->waiting);
}

/*
 * Document-method: new
 * call-seq: ConditionVariable.new
 *
 * Creates a new ConditionVariable
 *
 */

static VALUE
rb_condvar_alloc(VALUE klass)
{
    ConditionVariable *condvar;

    condvar = ALLOC(ConditionVariable);
    init_condvar(condvar);

    return Data_Wrap_Struct(klass, mark_condvar, free_condvar, condvar);
}

/*
 * Document-method: wait
 * call-seq: wait(mutex)
 *
 * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
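 *
 * Because wakeups can be spurious, the usual idiom is to wait inside a loop
 * that rechecks the condition (illustrative, not from the original source;
 * +condvar+ and +ready+ are placeholders):
 *
 *   mutex.synchronize {
 *     until ready
 *       condvar.wait(mutex)
 *     end
 *   }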
 *
 */

static void condvar_wakeup(Mutex *mutex);

static void
wait_condvar(ConditionVariable *condvar, Mutex *mutex)
{
    condvar_wakeup(mutex);
    rb_ensure(wait_list, (VALUE)&condvar->waiting, lock_mutex_call, (VALUE)mutex);
}

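/*
 * Releases the caller's mutex (which must be held) and wakes its successor,
 * if any, while keeping rb_thread_critical set so the caller can enqueue
 * itself on a wait list before any other thread runs.
 */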
static void
condvar_wakeup(Mutex *mutex)
{
    VALUE waking;

    rb_thread_critical = 1;
    if (rb_thread_current() != mutex->owner) {
        rb_thread_critical = 0;
        rb_raise(rb_eThreadError, "not owner of the synchronization mutex");
    }
    waking = unlock_mutex_inner(mutex);
    if (RTEST(waking)) {
        wake_thread(waking);
    }
}

static VALUE
legacy_exclusive_unlock(VALUE mutex)
{
    return rb_funcall(mutex, rb_intern("exclusive_unlock"), 0);
}

typedef struct {
    ConditionVariable *condvar;
    VALUE mutex;
} legacy_wait_args;

static VALUE
legacy_wait(VALUE unused, legacy_wait_args *args)
{
    wait_list(&args->condvar->waiting);
    rb_funcall(args->mutex, rb_intern("lock"), 0);
    return Qnil;
}

static VALUE
rb_condvar_wait(VALUE self, VALUE mutex_v)
{
    ConditionVariable *condvar;
    Data_Get_Struct(self, ConditionVariable, condvar);

    if (CLASS_OF(mutex_v) != rb_cMutex) {
        /* interoperate with legacy mutex */
        legacy_wait_args args;
        args.condvar = condvar;
        args.mutex = mutex_v;
        rb_iterate(legacy_exclusive_unlock, mutex_v, legacy_wait, (VALUE)&args);
    } else {
        Mutex *mutex;
        Data_Get_Struct(mutex_v, Mutex, mutex);
        wait_condvar(condvar, mutex);
    }

    return self;
}

/*
 * Document-method: broadcast
 * call-seq: broadcast
 *
 * Wakes up all threads waiting for this condition.
 *
 */

static VALUE
rb_condvar_broadcast(VALUE self)
{
    ConditionVariable *condvar;

    Data_Get_Struct(self, ConditionVariable, condvar);

    thread_exclusive(wake_all, (VALUE)&condvar->waiting);
    rb_thread_schedule();

    return self;
}

/*
 * Document-method: signal
 * call-seq: signal
 *
 * Wakes up the first thread in line waiting for this condition.
 *
 */

static void
signal_condvar(ConditionVariable *condvar)
{
    VALUE waking = thread_exclusive(wake_one, (VALUE)&condvar->waiting);

    if (RTEST(waking)) {
        run_thread(waking);
    }
}

static VALUE
signal_condvar_call(VALUE condvar)
{
    signal_condvar((ConditionVariable *)condvar);
    return Qundef;
}

static VALUE
rb_condvar_signal(VALUE self)
{
    ConditionVariable *condvar;
    Data_Get_Struct(self, ConditionVariable, condvar);
    signal_condvar(condvar);
    return self;
}

/*
 * Document-class: Queue
 *
 * This class provides a way to synchronize communication between threads.
 *
 * Example:
 *
 *   require 'thread'
 *
 *   queue = Queue.new
 *
 *   producer = Thread.new do
 *     5.times do |i|
 *       sleep rand(i) # simulate expense
 *       queue << i
 *       puts "#{i} produced"
 *     end
 *   end
 *
 *   consumer = Thread.new do
 *     5.times do |i|
 *       value = queue.pop
 *       sleep rand(i/2) # simulate expense
 *       puts "consumed #{value}"
 *     end
 *   end
 *
 *   consumer.join
 *
 */

typedef struct _Queue {
    Mutex mutex;
    ConditionVariable value_available;
    ConditionVariable space_available;
    List values;
    unsigned long capacity;
} Queue;

static void
mark_queue(Queue *queue)
{
    mark_mutex(&queue->mutex);
    mark_condvar(&queue->value_available);
    mark_condvar(&queue->space_available);
    mark_list(&queue->values);
}

static void
finalize_queue(Queue *queue)
{
    finalize_mutex(&queue->mutex);
    finalize_condvar(&queue->value_available);
    finalize_condvar(&queue->space_available);
    finalize_list(&queue->values);
}

static void
free_queue(Queue *queue)
{
    kill_waiting_threads(&queue->mutex.waiting);
    kill_waiting_threads(&queue->space_available.waiting);
    kill_waiting_threads(&queue->value_available.waiting);
    finalize_queue(queue);
    xfree(queue);
}

static void
init_queue(Queue *queue)
{
    init_mutex(&queue->mutex);
    init_condvar(&queue->value_available);
    init_condvar(&queue->space_available);
    init_list(&queue->values);
    queue->capacity = 0;
}

/*
 * Document-method: new
 * call-seq: new
 *
 * Creates a new queue.
 *
 */

static VALUE
rb_queue_alloc(VALUE klass)
{
    Queue *queue;
    queue = ALLOC(Queue);
    init_queue(queue);
    return Data_Wrap_Struct(klass, mark_queue, free_queue, queue);
}

static VALUE
rb_queue_marshal_load(VALUE self, VALUE data)
{
    Queue *queue;
    VALUE array;
    Data_Get_Struct(self, Queue, queue);

    array = rb_marshal_load(data);
    if (TYPE(array) != T_ARRAY) {
        rb_raise(rb_eTypeError, "expected Array of queue data");
    }
    if (RARRAY(array)->len < 1) {
        rb_raise(rb_eArgError, "missing capacity value");
    }
    queue->capacity = NUM2ULONG(rb_ary_shift(array));
    push_multiple_list(&queue->values, RARRAY(array)->ptr, (unsigned)RARRAY(array)->len);

    return self;
}

static VALUE
rb_queue_marshal_dump(VALUE self)
{
    Queue *queue;
    VALUE array;
    Data_Get_Struct(self, Queue, queue);

    array = array_from_list(&queue->values);
    rb_ary_unshift(array, ULONG2NUM(queue->capacity));
    return rb_marshal_dump(array, Qnil);
}

/*
 * Document-method: clear
 * call-seq: clear
 *
 * Removes all objects from the queue.
 *
 */

static VALUE
rb_queue_clear(VALUE self)
{
    Queue *queue;
    Data_Get_Struct(self, Queue, queue);

    lock_mutex(&queue->mutex);
    clear_list(&queue->values);
    signal_condvar(&queue->space_available);
    unlock_mutex(&queue->mutex);

    return self;
}

/*
 * Document-method: empty?
 * call-seq: empty?
 *
 * Returns +true+ if the queue is empty.
 *
 */

static VALUE
rb_queue_empty_p(VALUE self)
{
    Queue *queue;
    VALUE result;
    Data_Get_Struct(self, Queue, queue);

    lock_mutex(&queue->mutex);
    result = queue->values.size == 0 ? Qtrue : Qfalse;
    unlock_mutex(&queue->mutex);

    return result;
}

/*
 * Document-method: length
 * call-seq: length
 *
 * Returns the length of the queue.
 *
 */

static VALUE
rb_queue_length(VALUE self)
{
    Queue *queue;
    VALUE result;
    Data_Get_Struct(self, Queue, queue);

    lock_mutex(&queue->mutex);
    result = ULONG2NUM(queue->values.size);
    unlock_mutex(&queue->mutex);

    return result;
}

/*
 * Document-method: num_waiting
 * call-seq: num_waiting
 *
 * Returns the number of threads waiting on the queue.
 *
 */

static VALUE
rb_queue_num_waiting(VALUE self)
{
    Queue *queue;
    VALUE result;
    Data_Get_Struct(self, Queue, queue);

    lock_mutex(&queue->mutex);
    result = ULONG2NUM(queue->value_available.waiting.size +
                       queue->space_available.waiting.size);
    unlock_mutex(&queue->mutex);

    return result;
}

static void
wait_queue(ConditionVariable *condvar, Mutex *mutex)
{
    condvar_wakeup(mutex);
    wait_list(&condvar->waiting);
    lock_mutex(mutex);
}

static VALUE queue_pop_inner(VALUE arg);

/*
 * Document-method: pop
 * call-seq: pop(non_block=false)
 *
 * Retrieves data from the queue. If the queue is empty, the calling thread is
 * suspended until data is pushed onto the queue. If +non_block+ is true, the
 * thread isn't suspended, and an exception is raised.
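 *
 * An illustrative example (not part of the original source):
 *
 *   queue = Queue.new
 *   queue.push(1)
 *   queue.pop        #=> 1
 *   queue.pop(true)  # raises ThreadError because the queue is now empty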
 *
 */

static VALUE
rb_queue_pop(int argc, VALUE *argv, VALUE self)
{
    Queue *queue;
    int should_block;
    Data_Get_Struct(self, Queue, queue);

    if (argc == 0) {
        should_block = 1;
    } else if (argc == 1) {
        should_block = !RTEST(argv[0]);
    } else {
        rb_raise(rb_eArgError, "wrong number of arguments (%d for 1)", argc);
    }

    lock_mutex(&queue->mutex);
    if (!queue->values.entries && !should_block) {
        unlock_mutex(&queue->mutex);
        rb_raise(rb_eThreadError, "queue empty");
    }

    while (!queue->values.entries) {
        wait_queue(&queue->value_available, &queue->mutex);
    }

    return rb_ensure(queue_pop_inner, (VALUE)queue,
                     unlock_mutex_call, (VALUE)&queue->mutex);
}

static VALUE
queue_pop_inner(VALUE arg)
{
    Queue *queue = (Queue *)arg;
    VALUE result = shift_list(&queue->values);
    if (queue->capacity && queue->values.size < queue->capacity) {
        signal_condvar(&queue->space_available);
    }
    return result;
}

/*
 * Document-method: push
 * call-seq: push(obj)
 *
 * Pushes +obj+ to the queue.
 *
 */

static VALUE
rb_queue_push(VALUE self, VALUE value)
{
    Queue *queue;
    Data_Get_Struct(self, Queue, queue);

    lock_mutex(&queue->mutex);
    while (queue->capacity && queue->values.size >= queue->capacity) {
        wait_queue(&queue->space_available, &queue->mutex);
    }
    push_list(&queue->values, value);
    rb_ensure(signal_condvar_call, (VALUE)&queue->value_available,
              unlock_mutex_call, (VALUE)&queue->mutex);

    return self;
}

/*
 * Document-class: SizedQueue
 *
 * This class represents queues with a specified maximum size (capacity).
 * The push operation may block if the queue is full.
 *
 * See Queue for an example of how a SizedQueue works.
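 *
 * A minimal illustration of the blocking push (not part of the original
 * source):
 *
 *   queue = SizedQueue.new(2)
 *   queue.push(1)
 *   queue.push(2)
 *   # a third push would block until a consumer pops a value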
 *
 */

/*
 * Document-method: new
 * call-seq: new(max)
 *
 * Creates a fixed-length queue with a maximum size of +max+.
 *
 */

/*
 * Document-method: max
 * call-seq: max
 *
 * Returns the maximum size of the queue.
 *
 */

static VALUE
rb_sized_queue_max(VALUE self)
{
    Queue *queue;
    VALUE result;
    Data_Get_Struct(self, Queue, queue);

    lock_mutex(&queue->mutex);
    result = ULONG2NUM(queue->capacity);
    unlock_mutex(&queue->mutex);

    return result;
}

/*
 * Document-method: max=
 * call-seq: max=(size)
 *
 * Sets the maximum size of the queue.
 *
 */

static VALUE
rb_sized_queue_max_set(VALUE self, VALUE value)
{
    Queue *queue;
    unsigned long new_capacity;
    unsigned long difference;
    Data_Get_Struct(self, Queue, queue);

    new_capacity = NUM2ULONG(value);

    if (new_capacity < 1) {
        rb_raise(rb_eArgError, "value must be positive");
    }

    lock_mutex(&queue->mutex);
    if (queue->capacity && new_capacity > queue->capacity) {
        difference = new_capacity - queue->capacity;
    } else {
        difference = 0;
    }
    queue->capacity = new_capacity;
    for (; difference > 0; --difference) {
        signal_condvar(&queue->space_available);
    }
    unlock_mutex(&queue->mutex);

    return self;
}

/*
 * Document-method: push
 * call-seq: push(obj)
 *
 * Pushes +obj+ to the queue. If there is no space left in the queue, waits
 * until space becomes available.
 *
 */

/*
 * Document-method: pop
 * call-seq: pop(non_block=false)
 *
 * Retrieves data from the queue and runs a waiting thread, if any.
 *
 */

/* for marshalling mutexes and condvars */

static VALUE
dummy_load(VALUE self, VALUE string)
{
    return Qnil;
}

static VALUE
dummy_dump(VALUE self)
{
    return rb_str_new2("");
}

void
Init_thread(void)
{
    rb_define_singleton_method(rb_cThread, "exclusive", rb_thread_exclusive, 0);

    rb_cMutex = rb_define_class("Mutex", rb_cObject);
    rb_define_alloc_func(rb_cMutex, rb_mutex_alloc);
    rb_define_method(rb_cMutex, "marshal_load", dummy_load, 1);
    rb_define_method(rb_cMutex, "marshal_dump", dummy_dump, 0);
    rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0);
    rb_define_method(rb_cMutex, "try_lock", rb_mutex_try_lock, 0);
    rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0);
    rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
    rb_define_method(rb_cMutex, "exclusive_unlock", rb_mutex_exclusive_unlock, 0);
    rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize, 0);

    rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject);
    rb_define_alloc_func(rb_cConditionVariable, rb_condvar_alloc);
    rb_define_method(rb_cConditionVariable, "marshal_load", dummy_load, 1);
    rb_define_method(rb_cConditionVariable, "marshal_dump", dummy_dump, 0);
    rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, 1);
    rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);
    rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);

    rb_cQueue = rb_define_class("Queue", rb_cObject);
    rb_define_alloc_func(rb_cQueue, rb_queue_alloc);
    rb_define_method(rb_cQueue, "marshal_load", rb_queue_marshal_load, 1);
    rb_define_method(rb_cQueue, "marshal_dump", rb_queue_marshal_dump, 0);
    rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
    rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
    rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
    rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
    rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
    rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
    rb_alias(rb_cQueue, rb_intern("enq"), rb_intern("push"));
    rb_alias(rb_cQueue, rb_intern("<<"), rb_intern("push"));
    rb_alias(rb_cQueue, rb_intern("deq"), rb_intern("pop"));
    rb_alias(rb_cQueue, rb_intern("shift"), rb_intern("pop"));
    rb_alias(rb_cQueue, rb_intern("size"), rb_intern("length"));

    rb_cSizedQueue = rb_define_class("SizedQueue", rb_cQueue);
    rb_define_method(rb_cSizedQueue, "initialize", rb_sized_queue_max_set, 1);
    rb_define_method(rb_cSizedQueue, "num_waiting", rb_queue_num_waiting, 0);
    rb_define_method(rb_cSizedQueue, "pop", rb_queue_pop, -1);
    rb_define_method(rb_cSizedQueue, "push", rb_queue_push, 1);
    rb_define_method(rb_cSizedQueue, "max", rb_sized_queue_max, 0);
    rb_define_method(rb_cSizedQueue, "max=", rb_sized_queue_max_set, 1);
    rb_alias(rb_cSizedQueue, rb_intern("enq"), rb_intern("push"));
    rb_alias(rb_cSizedQueue, rb_intern("<<"), rb_intern("push"));
    rb_alias(rb_cSizedQueue, rb_intern("deq"), rb_intern("pop"));
    rb_alias(rb_cSizedQueue, rb_intern("shift"), rb_intern("pop"));
}