mirror of
https://github.com/ruby/ruby.git
synced 2022-11-09 12:17:21 -05:00
vm_trace.c: workqueue as thread-safe version of postponed_job
postponed_job is safe to use in signal handlers, but is not thread-safe for MJIT. Implement a workqueue for MJIT thread-safety. [Bug #15316] git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@66100 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
parent
d7e4e50bdb
commit
eb38fb670b
5 changed files with 67 additions and 20 deletions
14
mjit.c
14
mjit.c
|
@ -106,20 +106,6 @@ mjit_gc_finish_hook(void)
|
||||||
CRITICAL_SECTION_FINISH(4, "mjit_gc_finish_hook");
|
CRITICAL_SECTION_FINISH(4, "mjit_gc_finish_hook");
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Wrap critical section to prevent [Bug #15316] */
|
|
||||||
void
|
|
||||||
mjit_postponed_job_register_start_hook(void)
|
|
||||||
{
|
|
||||||
CRITICAL_SECTION_START(4, "mjit_postponed_job_register_start_hook");
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Unwrap critical section of mjit_postponed_job_register_start_hook() */
|
|
||||||
void
|
|
||||||
mjit_postponed_job_register_finish_hook(void)
|
|
||||||
{
|
|
||||||
CRITICAL_SECTION_FINISH(4, "mjit_postponed_job_register_finish_hook");
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Iseqs can be garbage collected. This function should call when it
|
/* Iseqs can be garbage collected. This function should call when it
|
||||||
happens. It removes iseq from the unit. */
|
happens. It removes iseq from the unit. */
|
||||||
void
|
void
|
||||||
|
|
|
@ -1133,6 +1133,9 @@ static mjit_copy_job_t mjit_copy_job;
|
||||||
|
|
||||||
static void mjit_copy_job_handler(void *data);
|
static void mjit_copy_job_handler(void *data);
|
||||||
|
|
||||||
|
/* vm_trace.c */
|
||||||
|
int rb_workqueue_register(unsigned flags, rb_postponed_job_func_t , void *);
|
||||||
|
|
||||||
/* We're lazily copying cache values from main thread because these cache values
|
/* We're lazily copying cache values from main thread because these cache values
|
||||||
could be different between ones on enqueue timing and ones on dequeue timing.
|
could be different between ones on enqueue timing and ones on dequeue timing.
|
||||||
Return TRUE if copy succeeds. */
|
Return TRUE if copy succeeds. */
|
||||||
|
@ -1148,7 +1151,7 @@ copy_cache_from_main_thread(mjit_copy_job_t *job)
|
||||||
return job->finish_p;
|
return job->finish_p;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!rb_postponed_job_register(0, mjit_copy_job_handler, (void *)job))
|
if (!rb_workqueue_register(0, mjit_copy_job_handler, (void *)job))
|
||||||
return FALSE;
|
return FALSE;
|
||||||
CRITICAL_SECTION_START(3, "in MJIT copy job wait");
|
CRITICAL_SECTION_START(3, "in MJIT copy job wait");
|
||||||
/* checking `stop_worker_p` too because `RUBY_VM_CHECK_INTS(ec)` may not
|
/* checking `stop_worker_p` too because `RUBY_VM_CHECK_INTS(ec)` may not
|
||||||
|
|
3
thread.c
3
thread.c
|
@ -419,6 +419,7 @@ rb_vm_gvl_destroy(rb_vm_t *vm)
|
||||||
if (0) {
|
if (0) {
|
||||||
/* may be held by running threads */
|
/* may be held by running threads */
|
||||||
rb_native_mutex_destroy(&vm->waitpid_lock);
|
rb_native_mutex_destroy(&vm->waitpid_lock);
|
||||||
|
rb_native_mutex_destroy(&vm->workqueue_lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4422,6 +4423,7 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const r
|
||||||
|
|
||||||
/* may be held by MJIT threads in parent */
|
/* may be held by MJIT threads in parent */
|
||||||
rb_native_mutex_initialize(&vm->waitpid_lock);
|
rb_native_mutex_initialize(&vm->waitpid_lock);
|
||||||
|
rb_native_mutex_initialize(&vm->workqueue_lock);
|
||||||
|
|
||||||
/* may be held by any thread in parent */
|
/* may be held by any thread in parent */
|
||||||
rb_native_mutex_initialize(&th->interrupt_lock);
|
rb_native_mutex_initialize(&th->interrupt_lock);
|
||||||
|
@ -5183,6 +5185,7 @@ Init_Thread(void)
|
||||||
gvl_init(th->vm);
|
gvl_init(th->vm);
|
||||||
gvl_acquire(th->vm, th);
|
gvl_acquire(th->vm, th);
|
||||||
rb_native_mutex_initialize(&th->vm->waitpid_lock);
|
rb_native_mutex_initialize(&th->vm->waitpid_lock);
|
||||||
|
rb_native_mutex_initialize(&th->vm->workqueue_lock);
|
||||||
rb_native_mutex_initialize(&th->interrupt_lock);
|
rb_native_mutex_initialize(&th->interrupt_lock);
|
||||||
|
|
||||||
th->pending_interrupt_queue = rb_ary_tmp_new(0);
|
th->pending_interrupt_queue = rb_ary_tmp_new(0);
|
||||||
|
|
|
@ -638,12 +638,16 @@ typedef struct rb_vm_struct {
|
||||||
/* relation table of ensure - rollback for callcc */
|
/* relation table of ensure - rollback for callcc */
|
||||||
struct st_table *ensure_rollback_table;
|
struct st_table *ensure_rollback_table;
|
||||||
|
|
||||||
/* postponed_job */
|
/* postponed_job (async-signal-safe, NOT thread-safe) */
|
||||||
struct rb_postponed_job_struct *postponed_job_buffer;
|
struct rb_postponed_job_struct *postponed_job_buffer;
|
||||||
int postponed_job_index;
|
int postponed_job_index;
|
||||||
|
|
||||||
int src_encoding_index;
|
int src_encoding_index;
|
||||||
|
|
||||||
|
/* workqueue (thread-safe, NOT async-signal-safe) */
|
||||||
|
struct list_head workqueue; /* <=> rb_workqueue_job.jnode */
|
||||||
|
rb_nativethread_lock_t workqueue_lock;
|
||||||
|
|
||||||
VALUE verbose, debug, orig_progname, progname;
|
VALUE verbose, debug, orig_progname, progname;
|
||||||
VALUE coverages;
|
VALUE coverages;
|
||||||
int coverage_mode;
|
int coverage_mode;
|
||||||
|
@ -1628,6 +1632,7 @@ rb_vm_living_threads_init(rb_vm_t *vm)
|
||||||
{
|
{
|
||||||
list_head_init(&vm->waiting_fds);
|
list_head_init(&vm->waiting_fds);
|
||||||
list_head_init(&vm->waiting_pids);
|
list_head_init(&vm->waiting_pids);
|
||||||
|
list_head_init(&vm->workqueue);
|
||||||
list_head_init(&vm->waiting_grps);
|
list_head_init(&vm->waiting_grps);
|
||||||
list_head_init(&vm->living_threads);
|
list_head_init(&vm->living_threads);
|
||||||
vm->living_thread_num = 0;
|
vm->living_thread_num = 0;
|
||||||
|
|
58
vm_trace.c
58
vm_trace.c
|
@ -1752,12 +1752,18 @@ typedef struct rb_postponed_job_struct {
|
||||||
#define MAX_POSTPONED_JOB 1000
|
#define MAX_POSTPONED_JOB 1000
|
||||||
#define MAX_POSTPONED_JOB_SPECIAL_ADDITION 24
|
#define MAX_POSTPONED_JOB_SPECIAL_ADDITION 24
|
||||||
|
|
||||||
|
struct rb_workqueue_job {
|
||||||
|
struct list_node jnode; /* <=> vm->workqueue */
|
||||||
|
rb_postponed_job_t job;
|
||||||
|
};
|
||||||
|
|
||||||
void
|
void
|
||||||
Init_vm_postponed_job(void)
|
Init_vm_postponed_job(void)
|
||||||
{
|
{
|
||||||
rb_vm_t *vm = GET_VM();
|
rb_vm_t *vm = GET_VM();
|
||||||
vm->postponed_job_buffer = ALLOC_N(rb_postponed_job_t, MAX_POSTPONED_JOB);
|
vm->postponed_job_buffer = ALLOC_N(rb_postponed_job_t, MAX_POSTPONED_JOB);
|
||||||
vm->postponed_job_index = 0;
|
vm->postponed_job_index = 0;
|
||||||
|
/* workqueue is initialized when VM locks are initialized */
|
||||||
}
|
}
|
||||||
|
|
||||||
enum postponed_job_register_result {
|
enum postponed_job_register_result {
|
||||||
|
@ -1766,7 +1772,7 @@ enum postponed_job_register_result {
|
||||||
PJRR_INTERRUPTED = 2
|
PJRR_INTERRUPTED = 2
|
||||||
};
|
};
|
||||||
|
|
||||||
/* Async-signal-safe, thread-safe against MJIT worker thread */
|
/* Async-signal-safe */
|
||||||
static enum postponed_job_register_result
|
static enum postponed_job_register_result
|
||||||
postponed_job_register(rb_execution_context_t *ec, rb_vm_t *vm,
|
postponed_job_register(rb_execution_context_t *ec, rb_vm_t *vm,
|
||||||
unsigned int flags, rb_postponed_job_func_t func, void *data, int max, int expected_index)
|
unsigned int flags, rb_postponed_job_func_t func, void *data, int max, int expected_index)
|
||||||
|
@ -1774,13 +1780,11 @@ postponed_job_register(rb_execution_context_t *ec, rb_vm_t *vm,
|
||||||
rb_postponed_job_t *pjob;
|
rb_postponed_job_t *pjob;
|
||||||
|
|
||||||
if (expected_index >= max) return PJRR_FULL; /* failed */
|
if (expected_index >= max) return PJRR_FULL; /* failed */
|
||||||
if (mjit_enabled) mjit_postponed_job_register_start_hook();
|
|
||||||
|
|
||||||
if (ATOMIC_CAS(vm->postponed_job_index, expected_index, expected_index+1) == expected_index) {
|
if (ATOMIC_CAS(vm->postponed_job_index, expected_index, expected_index+1) == expected_index) {
|
||||||
pjob = &vm->postponed_job_buffer[expected_index];
|
pjob = &vm->postponed_job_buffer[expected_index];
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
if (mjit_enabled) mjit_postponed_job_register_finish_hook();
|
|
||||||
return PJRR_INTERRUPTED;
|
return PJRR_INTERRUPTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1789,7 +1793,6 @@ postponed_job_register(rb_execution_context_t *ec, rb_vm_t *vm,
|
||||||
pjob->data = data;
|
pjob->data = data;
|
||||||
|
|
||||||
RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(ec);
|
RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(ec);
|
||||||
if (mjit_enabled) mjit_postponed_job_register_finish_hook();
|
|
||||||
|
|
||||||
return PJRR_SUCCESS;
|
return PJRR_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1842,6 +1845,29 @@ rb_postponed_job_register_one(unsigned int flags, rb_postponed_job_func_t func,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* thread-safe and called from non-Ruby thread
|
||||||
|
* returns FALSE on failure (ENOMEM), TRUE otherwise
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
rb_workqueue_register(unsigned flags, rb_postponed_job_func_t func, void *data)
|
||||||
|
{
|
||||||
|
struct rb_workqueue_job *wq_job = malloc(sizeof(*wq_job));
|
||||||
|
rb_vm_t *vm = GET_VM();
|
||||||
|
|
||||||
|
if (!wq_job) return FALSE;
|
||||||
|
wq_job->job.func = func;
|
||||||
|
wq_job->job.data = data;
|
||||||
|
|
||||||
|
rb_nativethread_lock_lock(&vm->workqueue_lock);
|
||||||
|
list_add_tail(&vm->workqueue, &wq_job->jnode);
|
||||||
|
rb_nativethread_lock_unlock(&vm->workqueue_lock);
|
||||||
|
|
||||||
|
RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(GET_EC());
|
||||||
|
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
rb_postponed_job_flush(rb_vm_t *vm)
|
rb_postponed_job_flush(rb_vm_t *vm)
|
||||||
{
|
{
|
||||||
|
@ -1849,6 +1875,13 @@ rb_postponed_job_flush(rb_vm_t *vm)
|
||||||
const rb_atomic_t block_mask = POSTPONED_JOB_INTERRUPT_MASK|TRAP_INTERRUPT_MASK;
|
const rb_atomic_t block_mask = POSTPONED_JOB_INTERRUPT_MASK|TRAP_INTERRUPT_MASK;
|
||||||
volatile rb_atomic_t saved_mask = ec->interrupt_mask & block_mask;
|
volatile rb_atomic_t saved_mask = ec->interrupt_mask & block_mask;
|
||||||
VALUE volatile saved_errno = ec->errinfo;
|
VALUE volatile saved_errno = ec->errinfo;
|
||||||
|
struct list_head tmp;
|
||||||
|
|
||||||
|
list_head_init(&tmp);
|
||||||
|
|
||||||
|
rb_nativethread_lock_lock(&vm->workqueue_lock);
|
||||||
|
list_append_list(&tmp, &vm->workqueue);
|
||||||
|
rb_nativethread_lock_unlock(&vm->workqueue_lock);
|
||||||
|
|
||||||
ec->errinfo = Qnil;
|
ec->errinfo = Qnil;
|
||||||
/* mask POSTPONED_JOB dispatch */
|
/* mask POSTPONED_JOB dispatch */
|
||||||
|
@ -1857,16 +1890,33 @@ rb_postponed_job_flush(rb_vm_t *vm)
|
||||||
EC_PUSH_TAG(ec);
|
EC_PUSH_TAG(ec);
|
||||||
if (EC_EXEC_TAG() == TAG_NONE) {
|
if (EC_EXEC_TAG() == TAG_NONE) {
|
||||||
int index;
|
int index;
|
||||||
|
struct rb_workqueue_job *wq_job;
|
||||||
|
|
||||||
while ((index = vm->postponed_job_index) > 0) {
|
while ((index = vm->postponed_job_index) > 0) {
|
||||||
if (ATOMIC_CAS(vm->postponed_job_index, index, index-1) == index) {
|
if (ATOMIC_CAS(vm->postponed_job_index, index, index-1) == index) {
|
||||||
rb_postponed_job_t *pjob = &vm->postponed_job_buffer[index-1];
|
rb_postponed_job_t *pjob = &vm->postponed_job_buffer[index-1];
|
||||||
(*pjob->func)(pjob->data);
|
(*pjob->func)(pjob->data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
while ((wq_job = list_pop(&tmp, struct rb_workqueue_job, jnode))) {
|
||||||
|
rb_postponed_job_t pjob = wq_job->job;
|
||||||
|
|
||||||
|
free(wq_job);
|
||||||
|
(pjob.func)(pjob.data);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
EC_POP_TAG();
|
EC_POP_TAG();
|
||||||
}
|
}
|
||||||
/* restore POSTPONED_JOB mask */
|
/* restore POSTPONED_JOB mask */
|
||||||
ec->interrupt_mask &= ~(saved_mask ^ block_mask);
|
ec->interrupt_mask &= ~(saved_mask ^ block_mask);
|
||||||
ec->errinfo = saved_errno;
|
ec->errinfo = saved_errno;
|
||||||
|
|
||||||
|
/* don't leak memory if a job threw an exception */
|
||||||
|
if (!list_empty(&tmp)) {
|
||||||
|
rb_nativethread_lock_lock(&vm->workqueue_lock);
|
||||||
|
list_prepend_list(&vm->workqueue, &tmp);
|
||||||
|
rb_nativethread_lock_unlock(&vm->workqueue_lock);
|
||||||
|
|
||||||
|
RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(GET_EC());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue