diff --git a/thread.c b/thread.c index 75d31cfb7a..161f16365c 100644 --- a/thread.c +++ b/thread.c @@ -110,8 +110,8 @@ static int timespec_cmp(const struct timespec *a, const struct timespec *b); static int timespec_update_expire(struct timespec *, const struct timespec *); static void getclockofday(struct timespec *); NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd)); -static void consume_communication_pipe(int fd); -static void check_signals_nogvl(rb_thread_t *, int sigwait_fd); +static int consume_communication_pipe(int fd); +static int check_signals_nogvl(rb_thread_t *, int sigwait_fd); void rb_sigwait_fd_migrate(rb_vm_t *); /* process.c */ #define eKillSignal INT2FIX(0) @@ -385,6 +385,15 @@ ubf_sigwait(void *ignore) #error "unsupported thread type" #endif +/* + * TODO: somebody with win32 knowledge should be able to get rid of + * timer-thread by busy-waiting on signals. And it should be possible + * to make the GVL in thread_pthread.c be platform-independent. + */ +#ifndef BUSY_WAIT_SIGNALS +# define BUSY_WAIT_SIGNALS (0) +#endif + #if THREAD_DEBUG static int debug_mutex_initialized = 1; static rb_nativethread_lock_t debug_mutex; @@ -2173,7 +2182,7 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) int sigwait_fd = rb_sigwait_fd_get(th); if (sigwait_fd >= 0) { - consume_communication_pipe(sigwait_fd); + (void)consume_communication_pipe(sigwait_fd); ruby_sigchld_handler(th->vm); rb_sigwait_fd_put(th, sigwait_fd); rb_sigwait_fd_migrate(th->vm); @@ -3885,6 +3894,21 @@ select_set_free(VALUE p) return Qfalse; } +static const struct timespec * +sigwait_timeout(rb_thread_t *th, int sigwait_fd, const struct timespec *orig, + int *drained_p) +{ + static const struct timespec quantum = { 0, TIME_QUANTUM_USEC * 1000 }; + + if (sigwait_fd >= 0 && (!ubf_threads_empty() || BUSY_WAIT_SIGNALS)) { + *drained_p = check_signals_nogvl(th, sigwait_fd); + if (!orig || timespec_cmp(orig, &quantum) > 0) + return &quantum; + } + + return orig; +} + static VALUE do_select(VALUE p) { @@ -3892,6 +3916,8 @@ do_select(VALUE p) int MAYBE_UNUSED(result); int lerrno; struct timespec ts, end, *tsp; + const struct timespec *to; + struct timeval tv; timeout_prepare(&tsp, &ts, &end, set->timeout); #define restore_fdset(dst, src) \ @@ -3903,17 +3929,20 @@ do_select(VALUE p) TRUE) do { + int drained; lerrno = 0; BLOCKING_REGION(set->th, { + to = sigwait_timeout(set->th, set->sigwait_fd, tsp, &drained); result = native_fd_select(set->max, set->rset, set->wset, set->eset, - timeval_for(set->timeout, tsp), set->th); + timeval_for(&tv, to), set->th); if (result < 0) lerrno = errno; }, set->sigwait_fd >= 0 ? ubf_sigwait : ubf_select, set->th, FALSE); - if (set->sigwait_fd >= 0 && rb_fd_isset(set->sigwait_fd, set->rset)) { - result--; - check_signals_nogvl(set->th, set->sigwait_fd); + if (set->sigwait_fd >= 0) { + if (result > 0 && rb_fd_isset(set->sigwait_fd, set->rset)) + result--; + (void)check_signals_nogvl(set->th, set->sigwait_fd); } RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */ @@ -4042,6 +4071,8 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) struct pollfd fds[2]; int result = 0, lerrno; struct timespec ts, end, *tsp; + const struct timespec *to; + int drained; rb_thread_t *th = GET_THREAD(); nfds_t nfds; rb_unblock_function_t *ubf; @@ -4066,16 +4097,17 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) lerrno = 0; BLOCKING_REGION(th, { - result = ppoll(fds, nfds, tsp, NULL); + to = sigwait_timeout(th, fds[1].fd, tsp, &drained); + result = ppoll(fds, nfds, to, NULL); if (result < 0) lerrno = errno; }, ubf, th, FALSE); if (fds[1].fd >= 0) { - if (fds[1].revents) { + if (result > 0 && fds[1].revents) { result--; - check_signals_nogvl(th, fds[1].fd); fds[1].revents = 0; } + (void)check_signals_nogvl(th, fds[1].fd); rb_sigwait_fd_put(th, fds[1].fd); rb_sigwait_fd_migrate(th->vm); } @@ -4228,18 +4260,22 @@ async_bug_fd(const char *mesg, int errno_arg, int fd) } /* VM-dependent API is not available for this function */ -static void +static int consume_communication_pipe(int fd) { #define CCP_READ_BUFF_SIZE 1024 /* buffer can be shared because no one refers to them. */ static char buff[CCP_READ_BUFF_SIZE]; ssize_t result; + int ret = FALSE; /* for rb_sigwait_sleep */ while (1) { result = read(fd, buff, sizeof(buff)); - if (result == 0) { - return; + if (result > 0) { + ret = TRUE; + } + else if (result == 0) { + return ret; } else if (result < 0) { int e = errno; @@ -4250,7 +4286,7 @@ consume_communication_pipe(int fd) #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN case EWOULDBLOCK: #endif - return; + return ret; default: async_bug_fd("consume_communication_pipe: read", e, fd); } @@ -4258,12 +4294,11 @@ consume_communication_pipe(int fd) } } -static void +static int check_signals_nogvl(rb_thread_t *th, int sigwait_fd) { rb_vm_t *vm = GET_VM(); /* th may be 0 */ - - consume_communication_pipe(sigwait_fd); + int ret = consume_communication_pipe(sigwait_fd); ubf_wakeup_all_threads(); ruby_sigchld_handler(vm); if (rb_signal_buff_size()) { @@ -4272,7 +4307,9 @@ check_signals_nogvl(rb_thread_t *th, int sigwait_fd) RUBY_VM_SET_TRAP_INTERRUPT(th->ec); else threadptr_trap_interrupt(vm->main_thread); + ret = TRUE; /* for SIGCHLD_LOSSY && rb_sigwait_sleep */ } + return ret; } void diff --git a/thread_pthread.c b/thread_pthread.c index fdd719abbf..e064bf71be 100644 --- a/thread_pthread.c +++ b/thread_pthread.c @@ -50,9 +50,14 @@ static void ubf_wakeup_all_threads(void); static int ubf_threads_empty(void); static int native_cond_timedwait(rb_nativethread_cond_t *, pthread_mutex_t *, const struct timespec *); +static const struct timespec *sigwait_timeout(rb_thread_t *, int sigwait_fd, + const struct timespec *, + int *drained_p); #define TIMER_THREAD_CREATED_P() (timer_thread_pipe.owner_process == getpid()) +/* for testing, and in case we come across a platform w/o pipes: */ +#define BUSY_WAIT_SIGNALS (0) #define THREAD_INVALID ((const rb_thread_t *)-1) static const rb_thread_t *sigwait_th; @@ -129,7 +134,12 @@ gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th) native_thread_data_t *last; last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list); - if (last) rb_native_cond_signal(&last->sleep_cond); + if (last) { + rb_native_cond_signal(&last->sleep_cond); + } + else if (!ubf_threads_empty()) { + rb_thread_wakeup_timer_thread(0); + } } } @@ -1194,7 +1204,12 @@ ubf_select(void *ptr) native_thread_data_t *last; last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list); - if (last) rb_native_cond_signal(&last->sleep_cond); + if (last) { + rb_native_cond_signal(&last->sleep_cond); + } + else { + rb_thread_wakeup_timer_thread(0); + } } rb_native_mutex_unlock(&vm->gvl.lock); @@ -1623,41 +1638,36 @@ rb_sigwait_sleep(rb_thread_t *th, int sigwait_fd, const struct timespec *ts) pfd.fd = sigwait_fd; pfd.events = POLLIN; - if (ubf_threads_empty()) { + if (!BUSY_WAIT_SIGNALS && ubf_threads_empty()) { (void)ppoll(&pfd, 1, ts, 0); check_signals_nogvl(th, sigwait_fd); } else { - static const struct timespec quantum = { 0, TIME_QUANTUM_USEC * 1000 }; - struct timespec *endp = 0, end, now; + struct timespec end, diff; + const struct timespec *to; + int n = 0; if (ts) { getclockofday(&end); timespec_add(&end, ts); - endp = &end; + diff = *ts; + ts = &diff; } - - getclockofday(&now); + /* + * tricky: this needs to return on spurious wakeup (no auto-retry). + * But we also need to distinguish between periodic quantum + * wakeups, so we care about the result of consume_communication_pipe + */ for (;;) { - const struct timespec *tsp = &quantum; - struct timespec diff; - int n; - - if (endp) { - diff = *endp; - timespec_sub(&diff, &now); - if (timespec_cmp(&diff, tsp) < 0) - tsp = &diff; - } - - n = ppoll(&pfd, 1, tsp, 0); - check_signals_nogvl(th, sigwait_fd); - if (RUBY_VM_INTERRUPTED(th->ec) || n != 0) break; - - if (endp) { - getclockofday(&now); - if (timespec_cmp(&now, endp) >= 0) break; - } + to = sigwait_timeout(th, sigwait_fd, ts, &n); + if (n) return; + n = ppoll(&pfd, 1, to, 0); + if (check_signals_nogvl(th, sigwait_fd)) + return; + if (n || RUBY_VM_INTERRUPTED(th->ec)) + return; + if (ts && timespec_update_expire(&diff, &end)) + return; } } } diff --git a/thread_win32.c b/thread_win32.c index 3770200d30..336cce1936 100644 --- a/thread_win32.c +++ b/thread_win32.c @@ -21,6 +21,7 @@ #define native_thread_yield() Sleep(0) #define unregister_ubf_list(th) #define ubf_wakeup_all_threads() do {} while (0) +#define ubf_threads_empty() (1) static volatile DWORD ruby_native_thread_key = TLS_OUT_OF_INDEXES;