* thread_pthread.c: rewrite GVL completely.

* thread_win32.c: ditto.
* thread_pthread.h: ditto.
* vm_core.h: ditto.
* thread.c: ditto.



git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@32064 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
kosaki 2011-06-13 14:14:53 +00:00
parent e504a81ff1
commit bcfc22b10e
6 changed files with 107 additions and 97 deletions

View File

@ -1,3 +1,11 @@
Mon Jun 13 23:06:12 2011 KOSAKI Motohiro <kosaki.motohiro@gmail.com>
* thread_pthread.c: rewrite GVL completely.
* thread_win32.c: ditto.
* thread_pthread.h: ditto.
* vm_core.h: ditto.
* thread.c: ditto.
Mon Jun 13 23:11:52 2011 Tanaka Akira <akr@fsij.org>
* test/socket/test_unix.rb: don't use Thread.abort_on_exception.

View File

@ -1015,7 +1015,7 @@ rb_thread_sleep(int sec)
static void rb_threadptr_execute_interrupts_rec(rb_thread_t *, int);
static void
rb_thread_schedule_rec(int sched_depth)
rb_thread_schedule_rec(int sched_depth, unsigned long limits_us)
{
thread_debug("rb_thread_schedule\n");
if (!rb_thread_alone()) {
@ -1024,11 +1024,19 @@ rb_thread_schedule_rec(int sched_depth)
thread_debug("rb_thread_schedule/switch start\n");
RB_GC_SAVE_MACHINE_CONTEXT(th);
#if HAVE_GVL_YIELD
{
if (th->running_time_us >= limits_us)
gvl_yield(th->vm, th);
}
#else
gvl_release(th->vm);
{
native_thread_yield();
}
gvl_acquire(th->vm, th);
#endif
rb_thread_set_current(th);
thread_debug("rb_thread_schedule/switch done\n");
@ -1042,7 +1050,7 @@ rb_thread_schedule_rec(int sched_depth)
void
rb_thread_schedule(void)
{
rb_thread_schedule_rec(0);
rb_thread_schedule_rec(0, 0);
}
/* blocking region */
@ -1333,23 +1341,20 @@ rb_threadptr_execute_interrupts_rec(rb_thread_t *th, int sched_depth)
}
if (!sched_depth && timer_interrupt) {
sched_depth++;
unsigned long limits_us = 250 * 1000;
if (th->priority > 0)
limits_us <<= th->priority;
else
limits_us >>= -th->priority;
if (status == THREAD_RUNNABLE)
th->running_time_us += TIME_QUANTUM_USEC;
sched_depth++;
EXEC_EVENT_HOOK(th, RUBY_EVENT_SWITCH, th->cfp->self, 0, 0);
if (th->slice > 0) {
th->slice--;
}
else {
reschedule:
rb_thread_schedule_rec(sched_depth+1);
if (th->slice < 0) {
th->slice++;
goto reschedule;
}
else {
th->slice = th->priority;
}
}
rb_thread_schedule_rec(sched_depth+1, limits_us);
}
}
}
@ -2293,7 +2298,6 @@ rb_thread_priority_set(VALUE thread, VALUE prio)
priority = RUBY_THREAD_PRIORITY_MIN;
}
th->priority = priority;
th->slice = priority;
#endif
return INT2NUM(th->priority);
}

View File

@ -37,92 +37,79 @@ static void native_cond_destroy(rb_thread_cond_t *cond);
#define USE_MONOTONIC_COND 0
#endif
#define GVL_SIMPLE_LOCK 0
#define GVL_DEBUG 0
static void
gvl_show_waiting_threads(rb_vm_t *vm)
__gvl_acquire(rb_vm_t *vm)
{
rb_thread_t *th = vm->gvl.waiting_threads;
int i = 0;
while (th) {
fprintf(stderr, "waiting (%d): %p\n", i++, (void *)th);
th = th->native_thread_data.gvl_next;
}
}
#if !GVL_SIMPLE_LOCK
static void
gvl_waiting_push(rb_vm_t *vm, rb_thread_t *th)
{
th->native_thread_data.gvl_next = 0;
if (vm->gvl.acquired) {
vm->gvl.waiting++;
while (vm->gvl.acquired) {
native_cond_wait(&vm->gvl.cond, &vm->gvl.lock);
}
vm->gvl.waiting--;
if (vm->gvl.waiting_threads) {
vm->gvl.waiting_last_thread->native_thread_data.gvl_next = th;
vm->gvl.waiting_last_thread = th;
if (vm->gvl.need_yield) {
vm->gvl.need_yield = 0;
native_cond_signal(&vm->gvl.switch_cond);
}
}
else {
vm->gvl.waiting_threads = th;
vm->gvl.waiting_last_thread = th;
}
th = vm->gvl.waiting_threads;
vm->gvl.waiting++;
}
static void
gvl_waiting_shift(rb_vm_t *vm, rb_thread_t *th)
{
vm->gvl.waiting_threads = vm->gvl.waiting_threads->native_thread_data.gvl_next;
vm->gvl.waiting--;
vm->gvl.acquired = 1;
}
#endif
static void
gvl_acquire(rb_vm_t *vm, rb_thread_t *th)
{
#if GVL_SIMPLE_LOCK
native_mutex_lock(&vm->gvl.lock);
#else
native_mutex_lock(&vm->gvl.lock);
if (vm->gvl.waiting > 0 || vm->gvl.acquired != 0) {
if (GVL_DEBUG) fprintf(stderr, "gvl acquire (%p): sleep\n", (void *)th);
gvl_waiting_push(vm, th);
if (GVL_DEBUG) gvl_show_waiting_threads(vm);
while (vm->gvl.acquired != 0 || vm->gvl.waiting_threads != th) {
native_cond_wait(&th->native_thread_data.gvl_cond, &vm->gvl.lock);
}
gvl_waiting_shift(vm, th);
}
else {
/* do nothing */
}
vm->gvl.acquired = 1;
__gvl_acquire(vm);
native_mutex_unlock(&vm->gvl.lock);
#endif
if (GVL_DEBUG) gvl_show_waiting_threads(vm);
if (GVL_DEBUG) fprintf(stderr, "gvl acquire (%p): acquire\n", (void *)th);
}
static void
__gvl_release(rb_vm_t *vm)
{
vm->gvl.acquired = 0;
if (vm->gvl.waiting > 0)
native_cond_signal(&vm->gvl.cond);
}
static void
gvl_release(rb_vm_t *vm)
{
#if GVL_SIMPLE_LOCK
native_mutex_unlock(&vm->gvl.lock);
#else
native_mutex_lock(&vm->gvl.lock);
if (vm->gvl.waiting > 0) {
rb_thread_t *th = vm->gvl.waiting_threads;
if (GVL_DEBUG) fprintf(stderr, "gvl release (%p): wakeup: %p\n", (void *)GET_THREAD(), (void *)th);
native_cond_signal(&th->native_thread_data.gvl_cond);
}
else {
if (GVL_DEBUG) fprintf(stderr, "gvl release (%p): wakeup: %p\n", (void *)GET_THREAD(), NULL);
/* do nothing */
}
vm->gvl.acquired = 0;
__gvl_release(vm);
native_mutex_unlock(&vm->gvl.lock);
}
#define HAVE_GVL_YIELD 1
static void
gvl_yield(rb_vm_t *vm, rb_thread_t *th)
{
native_mutex_lock(&vm->gvl.lock);
/* An another thread is processing GVL yield. */
if (vm->gvl.need_yield) {
native_mutex_unlock(&vm->gvl.lock);
return;
}
if (vm->gvl.waiting > 0)
vm->gvl.need_yield = 1;
__gvl_release(vm);
if (vm->gvl.need_yield) {
/* Wait until another thread task take GVL. */
native_cond_wait(&vm->gvl.switch_cond, &vm->gvl.lock);
} else {
native_mutex_unlock(&vm->gvl.lock);
sched_yield();
native_mutex_lock(&vm->gvl.lock);
}
__gvl_acquire(vm);
native_mutex_unlock(&vm->gvl.lock);
#endif
}
static void
@ -130,15 +117,12 @@ gvl_init(rb_vm_t *vm)
{
if (GVL_DEBUG) fprintf(stderr, "gvl init\n");
#if GVL_SIMPLE_LOCK
native_mutex_initialize(&vm->gvl.lock);
#else
native_mutex_initialize(&vm->gvl.lock);
vm->gvl.waiting_threads = 0;
vm->gvl.waiting_last_thread = 0;
vm->gvl.waiting = 0;
native_cond_initialize(&vm->gvl.cond, RB_CONDATTR_CLOCK_MONOTONIC);
native_cond_initialize(&vm->gvl.switch_cond, RB_CONDATTR_CLOCK_MONOTONIC);
vm->gvl.acquired = 0;
#endif
vm->gvl.waiting = 0;
vm->gvl.need_yield = 0;
}
static void
@ -990,6 +974,11 @@ static pthread_t timer_thread_id;
static rb_thread_cond_t timer_thread_cond;
static pthread_mutex_t timer_thread_lock = PTHREAD_MUTEX_INITIALIZER;
/* 100ms. 10ms is too small for user level thread scheduling
* on recent Linux (tested on 2.6.35)
*/
#define TIME_QUANTUM_USEC (100 * 1000)
static void *
thread_timer(void *dummy)
{
@ -997,7 +986,7 @@ thread_timer(void *dummy)
struct timespec timeout;
timeout_10ms.tv_sec = 0;
timeout_10ms.tv_nsec = 10 * 1000 * 1000;
timeout_10ms.tv_nsec = TIME_QUANTUM_USEC * 1000;
native_mutex_lock(&timer_thread_lock);
native_cond_broadcast(&timer_thread_cond);

View File

@ -35,11 +35,17 @@ typedef struct native_thread_data_struct {
#include <semaphore.h>
typedef struct rb_global_vm_lock_struct {
/* fast path */
unsigned long acquired;
pthread_mutex_t lock;
struct rb_thread_struct * volatile waiting_threads;
struct rb_thread_struct *waiting_last_thread;
int waiting;
int volatile acquired;
/* slow path */
unsigned long waiting;
rb_thread_cond_t cond;
/* yield */
rb_thread_cond_t switch_cond;
unsigned long need_yield;
} rb_global_vm_lock_t;
#endif /* RUBY_THREAD_PTHREAD_H */

View File

@ -13,7 +13,7 @@
#include <process.h>
#define WIN32_WAIT_TIMEOUT 10 /* 10 ms */
#define TIME_QUANTUM_USEC (100 * 1000)
#define RB_CONDATTR_CLOCK_MONOTONIC 1 /* no effect */
#undef Sleep
@ -680,7 +680,7 @@ static unsigned long _stdcall
timer_thread_func(void *dummy)
{
thread_debug("timer_thread\n");
while (WaitForSingleObject(timer_thread_lock, WIN32_WAIT_TIMEOUT) ==
while (WaitForSingleObject(timer_thread_lock, TIME_QUANTUM_USEC/1000) ==
WAIT_TIMEOUT) {
timer_thread_function(dummy);
}

View File

@ -419,7 +419,6 @@ typedef struct rb_thread_struct {
rb_thread_id_t thread_id;
enum rb_thread_status status;
int priority;
int slice;
native_thread_data_t native_thread_data;
void *blocking_region_buffer;
@ -484,6 +483,7 @@ typedef struct rb_thread_struct {
#ifdef USE_SIGALTSTACK
void *altstack;
#endif
unsigned long running_time_us;
} rb_thread_t;
/* iseq.c */
@ -673,6 +673,9 @@ extern rb_vm_t *ruby_current_vm;
#define GET_THREAD() ruby_current_thread
#define rb_thread_set_current_raw(th) (void)(ruby_current_thread = (th))
#define rb_thread_set_current(th) do { \
if ((th)->vm->running_thread != (th)) { \
(th)->vm->running_thread->running_time_us = 0; \
} \
rb_thread_set_current_raw(th); \
(th)->vm->running_thread = (th); \
} while (0)