mirror of
https://github.com/ruby/ruby.git
synced 2022-11-09 12:17:21 -05:00
thread*.c: waiting on sigwait_fd performs periodic ubf wakeups
We need to be able to perform periodic ubf_list wakeups when a thread is sleeping and waiting on signals. [ruby-core:88088] [Misc #14937] [Bug #5343] git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@64115 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
parent
95cae74817
commit
ab47a57a46
3 changed files with 92 additions and 44 deletions
71
thread.c
71
thread.c
|
@ -110,8 +110,8 @@ static int timespec_cmp(const struct timespec *a, const struct timespec *b);
|
||||||
static int timespec_update_expire(struct timespec *, const struct timespec *);
|
static int timespec_update_expire(struct timespec *, const struct timespec *);
|
||||||
static void getclockofday(struct timespec *);
|
static void getclockofday(struct timespec *);
|
||||||
NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
|
NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
|
||||||
static void consume_communication_pipe(int fd);
|
static int consume_communication_pipe(int fd);
|
||||||
static void check_signals_nogvl(rb_thread_t *, int sigwait_fd);
|
static int check_signals_nogvl(rb_thread_t *, int sigwait_fd);
|
||||||
void rb_sigwait_fd_migrate(rb_vm_t *); /* process.c */
|
void rb_sigwait_fd_migrate(rb_vm_t *); /* process.c */
|
||||||
|
|
||||||
#define eKillSignal INT2FIX(0)
|
#define eKillSignal INT2FIX(0)
|
||||||
|
@ -385,6 +385,15 @@ ubf_sigwait(void *ignore)
|
||||||
#error "unsupported thread type"
|
#error "unsupported thread type"
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TODO: somebody with win32 knowledge should be able to get rid of
|
||||||
|
* timer-thread by busy-waiting on signals. And it should be possible
|
||||||
|
* to make the GVL in thread_pthread.c be platform-independent.
|
||||||
|
*/
|
||||||
|
#ifndef BUSY_WAIT_SIGNALS
|
||||||
|
# define BUSY_WAIT_SIGNALS (0)
|
||||||
|
#endif
|
||||||
|
|
||||||
#if THREAD_DEBUG
|
#if THREAD_DEBUG
|
||||||
static int debug_mutex_initialized = 1;
|
static int debug_mutex_initialized = 1;
|
||||||
static rb_nativethread_lock_t debug_mutex;
|
static rb_nativethread_lock_t debug_mutex;
|
||||||
|
@ -2173,7 +2182,7 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
|
||||||
int sigwait_fd = rb_sigwait_fd_get(th);
|
int sigwait_fd = rb_sigwait_fd_get(th);
|
||||||
|
|
||||||
if (sigwait_fd >= 0) {
|
if (sigwait_fd >= 0) {
|
||||||
consume_communication_pipe(sigwait_fd);
|
(void)consume_communication_pipe(sigwait_fd);
|
||||||
ruby_sigchld_handler(th->vm);
|
ruby_sigchld_handler(th->vm);
|
||||||
rb_sigwait_fd_put(th, sigwait_fd);
|
rb_sigwait_fd_put(th, sigwait_fd);
|
||||||
rb_sigwait_fd_migrate(th->vm);
|
rb_sigwait_fd_migrate(th->vm);
|
||||||
|
@ -3885,6 +3894,21 @@ select_set_free(VALUE p)
|
||||||
return Qfalse;
|
return Qfalse;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static const struct timespec *
|
||||||
|
sigwait_timeout(rb_thread_t *th, int sigwait_fd, const struct timespec *orig,
|
||||||
|
int *drained_p)
|
||||||
|
{
|
||||||
|
static const struct timespec quantum = { 0, TIME_QUANTUM_USEC * 1000 };
|
||||||
|
|
||||||
|
if (sigwait_fd >= 0 && (!ubf_threads_empty() || BUSY_WAIT_SIGNALS)) {
|
||||||
|
*drained_p = check_signals_nogvl(th, sigwait_fd);
|
||||||
|
if (!orig || timespec_cmp(orig, &quantum) > 0)
|
||||||
|
return &quantum;
|
||||||
|
}
|
||||||
|
|
||||||
|
return orig;
|
||||||
|
}
|
||||||
|
|
||||||
static VALUE
|
static VALUE
|
||||||
do_select(VALUE p)
|
do_select(VALUE p)
|
||||||
{
|
{
|
||||||
|
@ -3892,6 +3916,8 @@ do_select(VALUE p)
|
||||||
int MAYBE_UNUSED(result);
|
int MAYBE_UNUSED(result);
|
||||||
int lerrno;
|
int lerrno;
|
||||||
struct timespec ts, end, *tsp;
|
struct timespec ts, end, *tsp;
|
||||||
|
const struct timespec *to;
|
||||||
|
struct timeval tv;
|
||||||
|
|
||||||
timeout_prepare(&tsp, &ts, &end, set->timeout);
|
timeout_prepare(&tsp, &ts, &end, set->timeout);
|
||||||
#define restore_fdset(dst, src) \
|
#define restore_fdset(dst, src) \
|
||||||
|
@ -3903,17 +3929,20 @@ do_select(VALUE p)
|
||||||
TRUE)
|
TRUE)
|
||||||
|
|
||||||
do {
|
do {
|
||||||
|
int drained;
|
||||||
lerrno = 0;
|
lerrno = 0;
|
||||||
|
|
||||||
BLOCKING_REGION(set->th, {
|
BLOCKING_REGION(set->th, {
|
||||||
|
to = sigwait_timeout(set->th, set->sigwait_fd, tsp, &drained);
|
||||||
result = native_fd_select(set->max, set->rset, set->wset, set->eset,
|
result = native_fd_select(set->max, set->rset, set->wset, set->eset,
|
||||||
timeval_for(set->timeout, tsp), set->th);
|
timeval_for(&tv, to), set->th);
|
||||||
if (result < 0) lerrno = errno;
|
if (result < 0) lerrno = errno;
|
||||||
}, set->sigwait_fd >= 0 ? ubf_sigwait : ubf_select, set->th, FALSE);
|
}, set->sigwait_fd >= 0 ? ubf_sigwait : ubf_select, set->th, FALSE);
|
||||||
|
|
||||||
if (set->sigwait_fd >= 0 && rb_fd_isset(set->sigwait_fd, set->rset)) {
|
if (set->sigwait_fd >= 0) {
|
||||||
result--;
|
if (result > 0 && rb_fd_isset(set->sigwait_fd, set->rset))
|
||||||
check_signals_nogvl(set->th, set->sigwait_fd);
|
result--;
|
||||||
|
(void)check_signals_nogvl(set->th, set->sigwait_fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */
|
RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */
|
||||||
|
@ -4042,6 +4071,8 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
|
||||||
struct pollfd fds[2];
|
struct pollfd fds[2];
|
||||||
int result = 0, lerrno;
|
int result = 0, lerrno;
|
||||||
struct timespec ts, end, *tsp;
|
struct timespec ts, end, *tsp;
|
||||||
|
const struct timespec *to;
|
||||||
|
int drained;
|
||||||
rb_thread_t *th = GET_THREAD();
|
rb_thread_t *th = GET_THREAD();
|
||||||
nfds_t nfds;
|
nfds_t nfds;
|
||||||
rb_unblock_function_t *ubf;
|
rb_unblock_function_t *ubf;
|
||||||
|
@ -4066,16 +4097,17 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
|
||||||
|
|
||||||
lerrno = 0;
|
lerrno = 0;
|
||||||
BLOCKING_REGION(th, {
|
BLOCKING_REGION(th, {
|
||||||
result = ppoll(fds, nfds, tsp, NULL);
|
to = sigwait_timeout(th, fds[1].fd, tsp, &drained);
|
||||||
|
result = ppoll(fds, nfds, to, NULL);
|
||||||
if (result < 0) lerrno = errno;
|
if (result < 0) lerrno = errno;
|
||||||
}, ubf, th, FALSE);
|
}, ubf, th, FALSE);
|
||||||
|
|
||||||
if (fds[1].fd >= 0) {
|
if (fds[1].fd >= 0) {
|
||||||
if (fds[1].revents) {
|
if (result > 0 && fds[1].revents) {
|
||||||
result--;
|
result--;
|
||||||
check_signals_nogvl(th, fds[1].fd);
|
|
||||||
fds[1].revents = 0;
|
fds[1].revents = 0;
|
||||||
}
|
}
|
||||||
|
(void)check_signals_nogvl(th, fds[1].fd);
|
||||||
rb_sigwait_fd_put(th, fds[1].fd);
|
rb_sigwait_fd_put(th, fds[1].fd);
|
||||||
rb_sigwait_fd_migrate(th->vm);
|
rb_sigwait_fd_migrate(th->vm);
|
||||||
}
|
}
|
||||||
|
@ -4228,18 +4260,22 @@ async_bug_fd(const char *mesg, int errno_arg, int fd)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* VM-dependent API is not available for this function */
|
/* VM-dependent API is not available for this function */
|
||||||
static void
|
static int
|
||||||
consume_communication_pipe(int fd)
|
consume_communication_pipe(int fd)
|
||||||
{
|
{
|
||||||
#define CCP_READ_BUFF_SIZE 1024
|
#define CCP_READ_BUFF_SIZE 1024
|
||||||
/* buffer can be shared because no one refers to them. */
|
/* buffer can be shared because no one refers to them. */
|
||||||
static char buff[CCP_READ_BUFF_SIZE];
|
static char buff[CCP_READ_BUFF_SIZE];
|
||||||
ssize_t result;
|
ssize_t result;
|
||||||
|
int ret = FALSE; /* for rb_sigwait_sleep */
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
result = read(fd, buff, sizeof(buff));
|
result = read(fd, buff, sizeof(buff));
|
||||||
if (result == 0) {
|
if (result > 0) {
|
||||||
return;
|
ret = TRUE;
|
||||||
|
}
|
||||||
|
else if (result == 0) {
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
else if (result < 0) {
|
else if (result < 0) {
|
||||||
int e = errno;
|
int e = errno;
|
||||||
|
@ -4250,7 +4286,7 @@ consume_communication_pipe(int fd)
|
||||||
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
|
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
|
||||||
case EWOULDBLOCK:
|
case EWOULDBLOCK:
|
||||||
#endif
|
#endif
|
||||||
return;
|
return ret;
|
||||||
default:
|
default:
|
||||||
async_bug_fd("consume_communication_pipe: read", e, fd);
|
async_bug_fd("consume_communication_pipe: read", e, fd);
|
||||||
}
|
}
|
||||||
|
@ -4258,12 +4294,11 @@ consume_communication_pipe(int fd)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static int
|
||||||
check_signals_nogvl(rb_thread_t *th, int sigwait_fd)
|
check_signals_nogvl(rb_thread_t *th, int sigwait_fd)
|
||||||
{
|
{
|
||||||
rb_vm_t *vm = GET_VM(); /* th may be 0 */
|
rb_vm_t *vm = GET_VM(); /* th may be 0 */
|
||||||
|
int ret = consume_communication_pipe(sigwait_fd);
|
||||||
consume_communication_pipe(sigwait_fd);
|
|
||||||
ubf_wakeup_all_threads();
|
ubf_wakeup_all_threads();
|
||||||
ruby_sigchld_handler(vm);
|
ruby_sigchld_handler(vm);
|
||||||
if (rb_signal_buff_size()) {
|
if (rb_signal_buff_size()) {
|
||||||
|
@ -4272,7 +4307,9 @@ check_signals_nogvl(rb_thread_t *th, int sigwait_fd)
|
||||||
RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
|
RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
|
||||||
else
|
else
|
||||||
threadptr_trap_interrupt(vm->main_thread);
|
threadptr_trap_interrupt(vm->main_thread);
|
||||||
|
ret = TRUE; /* for SIGCHLD_LOSSY && rb_sigwait_sleep */
|
||||||
}
|
}
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
|
|
|
@ -50,9 +50,14 @@ static void ubf_wakeup_all_threads(void);
|
||||||
static int ubf_threads_empty(void);
|
static int ubf_threads_empty(void);
|
||||||
static int native_cond_timedwait(rb_nativethread_cond_t *, pthread_mutex_t *,
|
static int native_cond_timedwait(rb_nativethread_cond_t *, pthread_mutex_t *,
|
||||||
const struct timespec *);
|
const struct timespec *);
|
||||||
|
static const struct timespec *sigwait_timeout(rb_thread_t *, int sigwait_fd,
|
||||||
|
const struct timespec *,
|
||||||
|
int *drained_p);
|
||||||
|
|
||||||
#define TIMER_THREAD_CREATED_P() (timer_thread_pipe.owner_process == getpid())
|
#define TIMER_THREAD_CREATED_P() (timer_thread_pipe.owner_process == getpid())
|
||||||
|
|
||||||
|
/* for testing, and in case we come across a platform w/o pipes: */
|
||||||
|
#define BUSY_WAIT_SIGNALS (0)
|
||||||
#define THREAD_INVALID ((const rb_thread_t *)-1)
|
#define THREAD_INVALID ((const rb_thread_t *)-1)
|
||||||
static const rb_thread_t *sigwait_th;
|
static const rb_thread_t *sigwait_th;
|
||||||
|
|
||||||
|
@ -129,7 +134,12 @@ gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th)
|
||||||
native_thread_data_t *last;
|
native_thread_data_t *last;
|
||||||
|
|
||||||
last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list);
|
last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list);
|
||||||
if (last) rb_native_cond_signal(&last->sleep_cond);
|
if (last) {
|
||||||
|
rb_native_cond_signal(&last->sleep_cond);
|
||||||
|
}
|
||||||
|
else if (!ubf_threads_empty()) {
|
||||||
|
rb_thread_wakeup_timer_thread(0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1194,7 +1204,12 @@ ubf_select(void *ptr)
|
||||||
native_thread_data_t *last;
|
native_thread_data_t *last;
|
||||||
|
|
||||||
last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list);
|
last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list);
|
||||||
if (last) rb_native_cond_signal(&last->sleep_cond);
|
if (last) {
|
||||||
|
rb_native_cond_signal(&last->sleep_cond);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
rb_thread_wakeup_timer_thread(0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
rb_native_mutex_unlock(&vm->gvl.lock);
|
rb_native_mutex_unlock(&vm->gvl.lock);
|
||||||
|
|
||||||
|
@ -1623,41 +1638,36 @@ rb_sigwait_sleep(rb_thread_t *th, int sigwait_fd, const struct timespec *ts)
|
||||||
pfd.fd = sigwait_fd;
|
pfd.fd = sigwait_fd;
|
||||||
pfd.events = POLLIN;
|
pfd.events = POLLIN;
|
||||||
|
|
||||||
if (ubf_threads_empty()) {
|
if (!BUSY_WAIT_SIGNALS && ubf_threads_empty()) {
|
||||||
(void)ppoll(&pfd, 1, ts, 0);
|
(void)ppoll(&pfd, 1, ts, 0);
|
||||||
check_signals_nogvl(th, sigwait_fd);
|
check_signals_nogvl(th, sigwait_fd);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
static const struct timespec quantum = { 0, TIME_QUANTUM_USEC * 1000 };
|
struct timespec end, diff;
|
||||||
struct timespec *endp = 0, end, now;
|
const struct timespec *to;
|
||||||
|
int n = 0;
|
||||||
|
|
||||||
if (ts) {
|
if (ts) {
|
||||||
getclockofday(&end);
|
getclockofday(&end);
|
||||||
timespec_add(&end, ts);
|
timespec_add(&end, ts);
|
||||||
endp = &end;
|
diff = *ts;
|
||||||
|
ts = &diff;
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
getclockofday(&now);
|
* tricky: this needs to return on spurious wakeup (no auto-retry).
|
||||||
|
* But we also need to distinguish between periodic quantum
|
||||||
|
* wakeups, so we care about the result of consume_communication_pipe
|
||||||
|
*/
|
||||||
for (;;) {
|
for (;;) {
|
||||||
const struct timespec *tsp = &quantum;
|
to = sigwait_timeout(th, sigwait_fd, ts, &n);
|
||||||
struct timespec diff;
|
if (n) return;
|
||||||
int n;
|
n = ppoll(&pfd, 1, to, 0);
|
||||||
|
if (check_signals_nogvl(th, sigwait_fd))
|
||||||
if (endp) {
|
return;
|
||||||
diff = *endp;
|
if (n || RUBY_VM_INTERRUPTED(th->ec))
|
||||||
timespec_sub(&diff, &now);
|
return;
|
||||||
if (timespec_cmp(&diff, tsp) < 0)
|
if (ts && timespec_update_expire(&diff, &end))
|
||||||
tsp = &diff;
|
return;
|
||||||
}
|
|
||||||
|
|
||||||
n = ppoll(&pfd, 1, tsp, 0);
|
|
||||||
check_signals_nogvl(th, sigwait_fd);
|
|
||||||
if (RUBY_VM_INTERRUPTED(th->ec) || n != 0) break;
|
|
||||||
|
|
||||||
if (endp) {
|
|
||||||
getclockofday(&now);
|
|
||||||
if (timespec_cmp(&now, endp) >= 0) break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
#define native_thread_yield() Sleep(0)
|
#define native_thread_yield() Sleep(0)
|
||||||
#define unregister_ubf_list(th)
|
#define unregister_ubf_list(th)
|
||||||
#define ubf_wakeup_all_threads() do {} while (0)
|
#define ubf_wakeup_all_threads() do {} while (0)
|
||||||
|
#define ubf_threads_empty() (1)
|
||||||
|
|
||||||
static volatile DWORD ruby_native_thread_key = TLS_OUT_OF_INDEXES;
|
static volatile DWORD ruby_native_thread_key = TLS_OUT_OF_INDEXES;
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue