1
0
Fork 0
mirror of https://github.com/ruby/ruby.git synced 2022-11-09 12:17:21 -05:00

* thread_pthread.c: Stop polling in the timer thread when there are

no waiting thread.  If there are 2 or more runnable threads,
  the timer thread does polling.  Avoid polling makes power save
  for several computers (0.2W per a Ruby process, when I measured).
  If outside-event such as signal or Thread#kill was occuerred
  when the timer thread does not do polling, then wake-up
  the timer thread using communication-pipe (the timer thread
  waits this communication-pipe with select(2)).
  The discussion about this modification can be found from the post
  [ruby-core:33456] and other related posts.
  Note that Eric Wong and KOSAKI Motohiro give us the huge
  contributions for this modification.  Thanks.
* thread_pthread.c (rb_thread_wakeup_timer_thread): add a function.
  This function wakes up the timer thread using communication-pipe.
* thread.c (rb_thread_stop_timer_thread): add a parameter which
  specify closing communication-pipe or not.
* thread.c (rb_thread_terminate_all): do not stop timer thread here
  (ruby_cleanup() terminate timer thread).
* signal.c: wake up timer thread using
  rb_thread_wakeup_timer_thread() from signal handler.
* eval.c (ruby_cleanup): use rb_thread_stop_timer_thread(1).
* process.c: use rb_thread_stop_timer_thread(0)
  (reuse communication-pipe).
* thread_win32.c (rb_thread_wakeup_timer_thread): add a dummy
  function.
* vm_core.h: add and fix decl. of functions.



git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@32244 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
ko1 2011-06-27 00:30:41 +00:00
parent 8ce62003a0
commit d1d5d5e798
8 changed files with 222 additions and 45 deletions

View file

@ -1,3 +1,40 @@
Mon Jun 27 09:07:42 2011 Koichi Sasada <ko1@atdot.net>
* thread_pthread.c: Stop polling in the timer thread when there are
no waiting thread. If there are 2 or more runnable threads,
the timer thread does polling. Avoid polling makes power save
for several computers (0.2W per a Ruby process, when I measured).
If outside-event such as signal or Thread#kill was occuerred
when the timer thread does not do polling, then wake-up
the timer thread using communication-pipe (the timer thread
waits this communication-pipe with select(2)).
The discussion about this modification can be found from the post
[ruby-core:33456] and other related posts.
Note that Eric Wong and KOSAKI Motohiro give us the huge
contributions for this modification. Thanks.
* thread_pthread.c (rb_thread_wakeup_timer_thread): add a function.
This function wakes up the timer thread using communication-pipe.
* thread.c (rb_thread_stop_timer_thread): add a parameter which
specify closing communication-pipe or not.
* thread.c (rb_thread_terminate_all): do not stop timer thread here
(ruby_cleanup() terminate timer thread).
* signal.c: wake up timer thread using
rb_thread_wakeup_timer_thread() from signal handler.
* eval.c (ruby_cleanup): use rb_thread_stop_timer_thread(1).
* process.c: use rb_thread_stop_timer_thread(0)
(reuse communication-pipe).
* thread_win32.c (rb_thread_wakeup_timer_thread): add a dummy
function.
* vm_core.h: add and fix decl. of functions.
Mon Jun 27 08:01:19 2011 Tadayoshi Funaba <tadf@dotrb.org> Mon Jun 27 08:01:19 2011 Tadayoshi Funaba <tadf@dotrb.org>
* ext/date/date_parse.c: should use ALLOCA_N. * ext/date/date_parse.c: should use ALLOCA_N.

2
eval.c
View file

@ -146,7 +146,7 @@ ruby_cleanup(volatile int ex)
ex = error_handle(ex); ex = error_handle(ex);
ruby_finalize_1(); ruby_finalize_1();
POP_TAG(); POP_TAG();
rb_thread_stop_timer_thread(); rb_thread_stop_timer_thread(1);
#if EXIT_SUCCESS != 0 || EXIT_FAILURE != 1 #if EXIT_SUCCESS != 0 || EXIT_FAILURE != 1
switch (ex) { switch (ex) {

View file

@ -1014,7 +1014,7 @@ static void before_exec(void)
* multiple threads. Therefore we have to kill internal threads at once. * multiple threads. Therefore we have to kill internal threads at once.
* [ruby-core: 10583] * [ruby-core: 10583]
*/ */
rb_thread_stop_timer_thread(); rb_thread_stop_timer_thread(0);
} }
} }

View file

@ -507,6 +507,7 @@ sighandler(int sig)
{ {
ATOMIC_INC(signal_buff.cnt[sig]); ATOMIC_INC(signal_buff.cnt[sig]);
ATOMIC_INC(signal_buff.size); ATOMIC_INC(signal_buff.size);
rb_thread_wakeup_timer_thread();
#if !defined(BSD_SIGNAL) && !defined(POSIX_SIGNAL) #if !defined(BSD_SIGNAL) && !defined(POSIX_SIGNAL)
ruby_signal(sig, sighandler); ruby_signal(sig, sighandler);
#endif #endif

View file

@ -363,7 +363,6 @@ rb_thread_terminate_all(void)
} }
POP_TAG(); POP_TAG();
} }
rb_thread_stop_timer_thread();
} }
static void static void
@ -2985,9 +2984,9 @@ timer_thread_function(void *arg)
} }
void void
rb_thread_stop_timer_thread(void) rb_thread_stop_timer_thread(int close_anyway)
{ {
if (timer_thread_id && native_stop_timer_thread()) { if (timer_thread_id && native_stop_timer_thread(close_anyway)) {
native_reset_timer_thread(); native_reset_timer_thread();
} }
} }

View file

@ -19,6 +19,11 @@
#ifdef HAVE_THR_STKSEGMENT #ifdef HAVE_THR_STKSEGMENT
#include <thread.h> #include <thread.h>
#endif #endif
#if HAVE_FCNTL_H
#include <fcntl.h>
#elif HAVE_SYS_FCNTL_H
#include <sys/fcntl.h>
#endif
static void native_mutex_lock(pthread_mutex_t *lock); static void native_mutex_lock(pthread_mutex_t *lock);
static void native_mutex_unlock(pthread_mutex_t *lock); static void native_mutex_unlock(pthread_mutex_t *lock);
@ -44,10 +49,17 @@ static void
__gvl_acquire(rb_vm_t *vm) __gvl_acquire(rb_vm_t *vm)
{ {
if (vm->gvl.acquired) { if (vm->gvl.acquired) {
vm->gvl.waiting++; vm->gvl.waiting++;
if (vm->gvl.waiting == 1) {
/* transit to polling mode */
rb_thread_wakeup_timer_thread();
}
while (vm->gvl.acquired) { while (vm->gvl.acquired) {
native_cond_wait(&vm->gvl.cond, &vm->gvl.lock); native_cond_wait(&vm->gvl.cond, &vm->gvl.lock);
} }
vm->gvl.waiting--; vm->gvl.waiting--;
if (vm->gvl.need_yield) { if (vm->gvl.need_yield) {
@ -973,8 +985,68 @@ static void ping_signal_thread_list(void) { }
#endif /* USE_SIGNAL_THREAD_LIST */ #endif /* USE_SIGNAL_THREAD_LIST */
static pthread_t timer_thread_id; static pthread_t timer_thread_id;
static rb_thread_cond_t timer_thread_cond; static int timer_thread_pipe[2] = {-1, -1};
static pthread_mutex_t timer_thread_lock = PTHREAD_MUTEX_INITIALIZER; static int timer_thread_pipe_owner_process;
#define TT_DEBUG 0
void
rb_thread_wakeup_timer_thread(void)
{
int result;
/* already opened */
if (timer_thread_pipe_owner_process == getpid()) {
const char *buff = "!";
retry:
if ((result = write(timer_thread_pipe[1], buff, 1)) <= 0) {
switch (errno) {
case EINTR: goto retry;
case EAGAIN:
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
break;
default:
rb_bug_errno("rb_thread_wakeup_timer_thread - write", errno);
}
}
if (TT_DEBUG) fprintf(stderr, "rb_thread_wakeup_timer_thread: write\n");
}
else {
/* ignore wakeup */
}
}
static int
consume_communication_pipe(void)
{
const size_t buff_size = 1024;
char buff[buff_size];
int result;
retry:
result = read(timer_thread_pipe[0], buff, buff_size);
if (result < 0) {
switch (errno) {
case EINTR: goto retry;
default:
rb_bug_errno("consume_communication_pipe: read", errno);
}
}
return result;
}
static void
close_communication_pipe(void)
{
if (close(timer_thread_pipe[0]) < 0) {
rb_bug_errno("native_stop_timer_thread - close(ttp[0])", errno);
}
if (close(timer_thread_pipe[1]) < 0) {
rb_bug_errno("native_stop_timer_thread - close(ttp[1])", errno);
}
timer_thread_pipe[0] = timer_thread_pipe[1] = -1;
}
/* 100ms. 10ms is too small for user level thread scheduling /* 100ms. 10ms is too small for user level thread scheduling
* on recent Linux (tested on 2.6.35) * on recent Linux (tested on 2.6.35)
@ -982,38 +1054,56 @@ static pthread_mutex_t timer_thread_lock = PTHREAD_MUTEX_INITIALIZER;
#define TIME_QUANTUM_USEC (100 * 1000) #define TIME_QUANTUM_USEC (100 * 1000)
static void * static void *
thread_timer(void *dummy) thread_timer(void *p)
{ {
struct timespec time_quantum; rb_global_vm_lock_t *gvl = (rb_global_vm_lock_t *)p;
struct timespec timeout; int result;
int len;
struct timeval timeout;
time_quantum.tv_sec = 0; if (TT_DEBUG) fprintf(stderr, "start timer thread\n");
time_quantum.tv_nsec = TIME_QUANTUM_USEC * 1000;
native_mutex_lock(&timer_thread_lock);
native_cond_broadcast(&timer_thread_cond);
timeout = native_cond_timeout(&timer_thread_cond, time_quantum);
while (system_working > 0) { while (system_working > 0) {
int err; fd_set rfds;
err = native_cond_timedwait(&timer_thread_cond, &timer_thread_lock, /* timer function */
&timeout);
if (err == 0) {
/*
* Spurious wakeup or native_stop_timer_thread() was called.
* We need to recheck a system_working state.
*/
}
else if (err == ETIMEDOUT) {
ping_signal_thread_list(); ping_signal_thread_list();
timer_thread_function(dummy); timer_thread_function(0);
timeout = native_cond_timeout(&timer_thread_cond, time_quantum); if (TT_DEBUG) fprintf(stderr, "tick\n");
/* wait */
FD_ZERO(&rfds);
FD_SET(timer_thread_pipe[0], &rfds);
if (gvl->waiting > 0) {
timeout.tv_sec = 0;
timeout.tv_usec = TIME_QUANTUM_USEC;
/* polling (TIME_QUANTUM_USEC usec) */
result = select(timer_thread_pipe[0] + 1, &rfds, 0, 0, &timeout);
} }
else else {
rb_bug_errno("thread_timer/timedwait", err); /* wait (infinite) */
result = select(timer_thread_pipe[0] + 1, &rfds, 0, 0, 0);
} }
native_mutex_unlock(&timer_thread_lock);
if (result == 0) {
/* maybe timeout */
}
else if (result > 0) {
len = consume_communication_pipe();
}
else { /* result < 0 */
if (errno == EINTR) {
/* interrupted. ignore */
}
else {
rb_bug_errno("thread_timer: select", errno);
}
}
}
if (TT_DEBUG) fprintf(stderr, "finish timer thread\n");
return NULL; return NULL;
} }
@ -1027,36 +1117,78 @@ rb_thread_create_timer_thread(void)
int err; int err;
pthread_attr_init(&attr); pthread_attr_init(&attr);
native_cond_initialize(&timer_thread_cond, RB_CONDATTR_CLOCK_MONOTONIC);
#ifdef PTHREAD_STACK_MIN #ifdef PTHREAD_STACK_MIN
pthread_attr_setstacksize(&attr, pthread_attr_setstacksize(&attr,
PTHREAD_STACK_MIN + (THREAD_DEBUG ? BUFSIZ : 0)); PTHREAD_STACK_MIN + (THREAD_DEBUG ? BUFSIZ : 0));
#endif #endif
native_mutex_lock(&timer_thread_lock);
err = pthread_create(&timer_thread_id, &attr, thread_timer, 0); /* communication pipe with timer thread and signal handler */
if (timer_thread_pipe_owner_process != getpid()) {
if (timer_thread_pipe[0] != -1) {
/* close pipe of parent process */
close_communication_pipe();
}
err = pipe(timer_thread_pipe);
if (err != 0) {
rb_bug_errno("thread_timer: Failed to create communication pipe for timer thread", errno);
}
#if defined(HAVE_FCNTL) && defined(F_GETFL) && defined(F_SETFL)
{
int oflags;
#if defined(O_NONBLOCK)
oflags |= O_NONBLOCK;
fcntl(timer_thread_pipe[1], F_SETFL, oflags);
#endif /* defined(O_NONBLOCK) */
#if defined(FD_CLOEXEC)
oflags = fcntl(timer_thread_pipe[0], F_GETFD);
fcntl(timer_thread_pipe[0], F_SETFD, oflags | FD_CLOEXEC);
oflags = fcntl(timer_thread_pipe[1], F_GETFD);
fcntl(timer_thread_pipe[1], F_SETFD, oflags | FD_CLOEXEC);
#endif /* defined(FD_CLOEXEC) */
}
#endif /* defined(HAVE_FCNTL) && defined(F_GETFL) && defined(F_SETFL) */
/* validate pipe on this process */
timer_thread_pipe_owner_process = getpid();
}
/* create timer thread */
if (timer_thread_id) {
rb_bug("rb_thread_create_timer_thread: Timer thread was already created\n");
}
err = pthread_create(&timer_thread_id, &attr, thread_timer, &GET_VM()->gvl);
if (err != 0) { if (err != 0) {
native_mutex_unlock(&timer_thread_lock);
fprintf(stderr, "[FATAL] Failed to create timer thread (errno: %d)\n", err); fprintf(stderr, "[FATAL] Failed to create timer thread (errno: %d)\n", err);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
native_cond_wait(&timer_thread_cond, &timer_thread_lock);
native_mutex_unlock(&timer_thread_lock);
} }
rb_disable_interrupt(); /* only timer thread recieve signal */ rb_disable_interrupt(); /* only timer thread recieve signal */
} }
static int static int
native_stop_timer_thread(void) native_stop_timer_thread(int close_anyway)
{ {
int stopped; int stopped;
native_mutex_lock(&timer_thread_lock);
stopped = --system_working <= 0; stopped = --system_working <= 0;
if (TT_DEBUG) fprintf(stderr, "stop timer thread\n");
if (stopped) { if (stopped) {
native_cond_signal(&timer_thread_cond); /* join */
} rb_thread_wakeup_timer_thread();
native_mutex_unlock(&timer_thread_lock);
if (stopped) {
native_thread_join(timer_thread_id); native_thread_join(timer_thread_id);
if (TT_DEBUG) fprintf(stderr, "joined timer thread\n");
timer_thread_id = 0;
/* close communication pipe */
if (close_anyway) {
/* TODO: Uninstall all signal handlers or mask all signals.
* This pass is cleaning phase. It is too rare case
* to generate problem, so we remains it in TODO.
*/
close_communication_pipe();
}
} }
return stopped; return stopped;
} }
@ -1064,7 +1196,7 @@ native_stop_timer_thread(void)
static void static void
native_reset_timer_thread(void) native_reset_timer_thread(void)
{ {
timer_thread_id = 0; if (TT_DEBUG) fprintf(stderr, "reset timer thread\n");
} }
#ifdef HAVE_SIGALTSTACK #ifdef HAVE_SIGALTSTACK

View file

@ -725,6 +725,12 @@ timer_thread_func(void *dummy)
return 0; return 0;
} }
void
rb_thread_wakeup_timer_thread(void)
{
/* do nothing */
}
static void static void
rb_thread_create_timer_thread(void) rb_thread_create_timer_thread(void)
{ {

View file

@ -651,8 +651,10 @@ VALUE rb_vm_call(rb_thread_t *th, VALUE recv, VALUE id, int argc,
const VALUE *argv, const rb_method_entry_t *me); const VALUE *argv, const rb_method_entry_t *me);
void rb_thread_start_timer_thread(void); void rb_thread_start_timer_thread(void);
void rb_thread_stop_timer_thread(void); void rb_thread_stop_timer_thread(int);
void rb_thread_reset_timer_thread(void); void rb_thread_reset_timer_thread(void);
void rb_thread_wakeup_timer_thread(void);
int ruby_thread_has_gvl_p(void); int ruby_thread_has_gvl_p(void);
VALUE rb_make_backtrace(void); VALUE rb_make_backtrace(void);
typedef int rb_backtrace_iter_func(void *, VALUE, int, VALUE); typedef int rb_backtrace_iter_func(void *, VALUE, int, VALUE);