mirror of
https://github.com/ruby/ruby.git
synced 2022-11-09 12:17:21 -05:00
4a627dbdfd
When a Ractor is removed, the freelist in the Ractor cache is not returned to the GC, leaving the freelist permanently lost. This commit recycles the freelist when the Ractor is destroyed, preventing a memory leak from occurring.
3250 lines
83 KiB
C
3250 lines
83 KiB
C
// Ractor implementation
|
|
|
|
#include "ruby/ruby.h"
|
|
#include "ruby/thread.h"
|
|
#include "ruby/ractor.h"
|
|
#include "ruby/thread_native.h"
|
|
#include "vm_core.h"
|
|
#include "vm_sync.h"
|
|
#include "ractor_core.h"
|
|
#include "internal/complex.h"
|
|
#include "internal/error.h"
|
|
#include "internal/hash.h"
|
|
#include "internal/rational.h"
|
|
#include "internal/struct.h"
|
|
#include "internal/thread.h"
|
|
#include "variable.h"
|
|
#include "gc.h"
|
|
#include "transient_heap.h"
|
|
|
|
VALUE rb_cRactor;
|
|
|
|
VALUE rb_eRactorUnsafeError;
|
|
VALUE rb_eRactorIsolationError;
|
|
static VALUE rb_eRactorError;
|
|
static VALUE rb_eRactorRemoteError;
|
|
static VALUE rb_eRactorMovedError;
|
|
static VALUE rb_eRactorClosedError;
|
|
static VALUE rb_cRactorMovedObject;
|
|
|
|
static void vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *r, const char *file, int line);
|
|
|
|
static void
|
|
ASSERT_ractor_unlocking(rb_ractor_t *r)
|
|
{
|
|
#if RACTOR_CHECK_MODE > 0
|
|
// GET_EC is NULL in an MJIT worker
|
|
if (rb_current_execution_context(false) != NULL && r->sync.locked_by == rb_ractor_self(GET_RACTOR())) {
|
|
rb_bug("recursive ractor locking");
|
|
}
|
|
#endif
|
|
}
|
|
|
|
static void
|
|
ASSERT_ractor_locking(rb_ractor_t *r)
|
|
{
|
|
#if RACTOR_CHECK_MODE > 0
|
|
// GET_EC is NULL in an MJIT worker
|
|
if (rb_current_execution_context(false) != NULL && r->sync.locked_by != rb_ractor_self(GET_RACTOR())) {
|
|
rp(r->sync.locked_by);
|
|
rb_bug("ractor lock is not acquired.");
|
|
}
|
|
#endif
|
|
}
|
|
|
|
static void
|
|
ractor_lock(rb_ractor_t *r, const char *file, int line)
|
|
{
|
|
RUBY_DEBUG_LOG2(file, line, "locking r:%u%s", r->pub.id, GET_RACTOR() == r ? " (self)" : "");
|
|
|
|
ASSERT_ractor_unlocking(r);
|
|
rb_native_mutex_lock(&r->sync.lock);
|
|
|
|
#if RACTOR_CHECK_MODE > 0
|
|
if (rb_current_execution_context(false) != NULL) { // GET_EC is NULL in an MJIT worker
|
|
r->sync.locked_by = rb_ractor_self(GET_RACTOR());
|
|
}
|
|
#endif
|
|
|
|
RUBY_DEBUG_LOG2(file, line, "locked r:%u%s", r->pub.id, GET_RACTOR() == r ? " (self)" : "");
|
|
}
|
|
|
|
static void
|
|
ractor_lock_self(rb_ractor_t *cr, const char *file, int line)
|
|
{
|
|
VM_ASSERT(cr == GET_RACTOR());
|
|
VM_ASSERT(cr->sync.locked_by != cr->pub.self);
|
|
ractor_lock(cr, file, line);
|
|
}
|
|
|
|
static void
|
|
ractor_unlock(rb_ractor_t *r, const char *file, int line)
|
|
{
|
|
ASSERT_ractor_locking(r);
|
|
#if RACTOR_CHECK_MODE > 0
|
|
r->sync.locked_by = Qnil;
|
|
#endif
|
|
rb_native_mutex_unlock(&r->sync.lock);
|
|
|
|
RUBY_DEBUG_LOG2(file, line, "r:%u%s", r->pub.id, GET_RACTOR() == r ? " (self)" : "");
|
|
}
|
|
|
|
static void
|
|
ractor_unlock_self(rb_ractor_t *cr, const char *file, int line)
|
|
{
|
|
VM_ASSERT(cr == GET_RACTOR());
|
|
VM_ASSERT(cr->sync.locked_by == cr->pub.self);
|
|
ractor_unlock(cr, file, line);
|
|
}
|
|
|
|
#define RACTOR_LOCK(r) ractor_lock(r, __FILE__, __LINE__)
|
|
#define RACTOR_UNLOCK(r) ractor_unlock(r, __FILE__, __LINE__)
|
|
#define RACTOR_LOCK_SELF(r) ractor_lock_self(r, __FILE__, __LINE__)
|
|
#define RACTOR_UNLOCK_SELF(r) ractor_unlock_self(r, __FILE__, __LINE__)
|
|
|
|
static void
|
|
ractor_cond_wait(rb_ractor_t *r)
|
|
{
|
|
#if RACTOR_CHECK_MODE > 0
|
|
VALUE locked_by = r->sync.locked_by;
|
|
r->sync.locked_by = Qnil;
|
|
#endif
|
|
rb_native_cond_wait(&r->sync.cond, &r->sync.lock);
|
|
|
|
#if RACTOR_CHECK_MODE > 0
|
|
r->sync.locked_by = locked_by;
|
|
#endif
|
|
}
|
|
|
|
static const char *
|
|
ractor_status_str(enum ractor_status status)
|
|
{
|
|
switch (status) {
|
|
case ractor_created: return "created";
|
|
case ractor_running: return "running";
|
|
case ractor_blocking: return "blocking";
|
|
case ractor_terminated: return "terminated";
|
|
}
|
|
rb_bug("unreachable");
|
|
}
|
|
|
|
static void
|
|
ractor_status_set(rb_ractor_t *r, enum ractor_status status)
|
|
{
|
|
RUBY_DEBUG_LOG("r:%u [%s]->[%s]", r->pub.id, ractor_status_str(r->status_), ractor_status_str(status));
|
|
|
|
// check 1
|
|
if (r->status_ != ractor_created) {
|
|
VM_ASSERT(r == GET_RACTOR()); // only self-modification is allowed.
|
|
ASSERT_vm_locking();
|
|
}
|
|
|
|
// check2: transition check. assume it will be vanished on non-debug build.
|
|
switch (r->status_) {
|
|
case ractor_created:
|
|
VM_ASSERT(status == ractor_blocking);
|
|
break;
|
|
case ractor_running:
|
|
VM_ASSERT(status == ractor_blocking||
|
|
status == ractor_terminated);
|
|
break;
|
|
case ractor_blocking:
|
|
VM_ASSERT(status == ractor_running);
|
|
break;
|
|
case ractor_terminated:
|
|
VM_ASSERT(0); // unreachable
|
|
break;
|
|
}
|
|
|
|
r->status_ = status;
|
|
}
|
|
|
|
static bool
|
|
ractor_status_p(rb_ractor_t *r, enum ractor_status status)
|
|
{
|
|
return rb_ractor_status_p(r, status);
|
|
}
|
|
|
|
static struct rb_ractor_basket *ractor_queue_at(struct rb_ractor_queue *rq, int i);
|
|
|
|
static void
|
|
ractor_queue_mark(struct rb_ractor_queue *rq)
|
|
{
|
|
for (int i=0; i<rq->cnt; i++) {
|
|
struct rb_ractor_basket *b = ractor_queue_at(rq, i);
|
|
rb_gc_mark(b->v);
|
|
rb_gc_mark(b->sender);
|
|
}
|
|
}
|
|
|
|
static void ractor_local_storage_mark(rb_ractor_t *r);
|
|
static void ractor_local_storage_free(rb_ractor_t *r);
|
|
|
|
static void
|
|
ractor_mark(void *ptr)
|
|
{
|
|
rb_ractor_t *r = (rb_ractor_t *)ptr;
|
|
|
|
ractor_queue_mark(&r->sync.incoming_queue);
|
|
rb_gc_mark(r->sync.wait.taken_basket.v);
|
|
rb_gc_mark(r->sync.wait.taken_basket.sender);
|
|
rb_gc_mark(r->sync.wait.yielded_basket.v);
|
|
rb_gc_mark(r->sync.wait.yielded_basket.sender);
|
|
rb_gc_mark(r->receiving_mutex);
|
|
|
|
rb_gc_mark(r->loc);
|
|
rb_gc_mark(r->name);
|
|
rb_gc_mark(r->r_stdin);
|
|
rb_gc_mark(r->r_stdout);
|
|
rb_gc_mark(r->r_stderr);
|
|
rb_hook_list_mark(&r->pub.hooks);
|
|
|
|
if (r->threads.cnt > 0) {
|
|
rb_thread_t *th = 0;
|
|
list_for_each(&r->threads.set, th, lt_node) {
|
|
VM_ASSERT(th != NULL);
|
|
rb_gc_mark(th->self);
|
|
}
|
|
}
|
|
|
|
ractor_local_storage_mark(r);
|
|
}
|
|
|
|
static void
|
|
ractor_queue_free(struct rb_ractor_queue *rq)
|
|
{
|
|
free(rq->baskets);
|
|
}
|
|
|
|
static void
|
|
ractor_waiting_list_free(struct rb_ractor_waiting_list *wl)
|
|
{
|
|
free(wl->ractors);
|
|
}
|
|
|
|
static void
|
|
ractor_free(void *ptr)
|
|
{
|
|
rb_ractor_t *r = (rb_ractor_t *)ptr;
|
|
rb_native_mutex_destroy(&r->sync.lock);
|
|
rb_native_cond_destroy(&r->sync.cond);
|
|
ractor_queue_free(&r->sync.incoming_queue);
|
|
ractor_waiting_list_free(&r->sync.taking_ractors);
|
|
ractor_local_storage_free(r);
|
|
rb_hook_list_free(&r->pub.hooks);
|
|
ruby_xfree(r);
|
|
}
|
|
|
|
static size_t
|
|
ractor_queue_memsize(const struct rb_ractor_queue *rq)
|
|
{
|
|
return sizeof(struct rb_ractor_basket) * rq->size;
|
|
}
|
|
|
|
static size_t
|
|
ractor_waiting_list_memsize(const struct rb_ractor_waiting_list *wl)
|
|
{
|
|
return sizeof(rb_ractor_t *) * wl->size;
|
|
}
|
|
|
|
static size_t
|
|
ractor_memsize(const void *ptr)
|
|
{
|
|
rb_ractor_t *r = (rb_ractor_t *)ptr;
|
|
|
|
// TODO
|
|
return sizeof(rb_ractor_t) +
|
|
ractor_queue_memsize(&r->sync.incoming_queue) +
|
|
ractor_waiting_list_memsize(&r->sync.taking_ractors);
|
|
}
|
|
|
|
static const rb_data_type_t ractor_data_type = {
|
|
"ractor",
|
|
{
|
|
ractor_mark,
|
|
ractor_free,
|
|
ractor_memsize,
|
|
NULL, // update
|
|
},
|
|
0, 0, RUBY_TYPED_FREE_IMMEDIATELY /* | RUBY_TYPED_WB_PROTECTED */
|
|
};
|
|
|
|
bool
|
|
rb_ractor_p(VALUE gv)
|
|
{
|
|
if (rb_typeddata_is_kind_of(gv, &ractor_data_type)) {
|
|
return true;
|
|
}
|
|
else {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
static inline rb_ractor_t *
|
|
RACTOR_PTR(VALUE self)
|
|
{
|
|
VM_ASSERT(rb_ractor_p(self));
|
|
|
|
rb_ractor_t *r = DATA_PTR(self);
|
|
// TODO: check
|
|
return r;
|
|
}
|
|
|
|
static rb_atomic_t ractor_last_id;
|
|
|
|
#if RACTOR_CHECK_MODE > 0
|
|
MJIT_FUNC_EXPORTED uint32_t
|
|
rb_ractor_current_id(void)
|
|
{
|
|
if (GET_THREAD()->ractor == NULL) {
|
|
return 1; // main ractor
|
|
}
|
|
else {
|
|
return rb_ractor_id(GET_RACTOR());
|
|
}
|
|
}
|
|
#endif
|
|
|
|
static void
|
|
ractor_queue_setup(struct rb_ractor_queue *rq)
|
|
{
|
|
rq->size = 2;
|
|
rq->cnt = 0;
|
|
rq->start = 0;
|
|
rq->baskets = malloc(sizeof(struct rb_ractor_basket) * rq->size);
|
|
}
|
|
|
|
static struct rb_ractor_basket *
|
|
ractor_queue_at(struct rb_ractor_queue *rq, int i)
|
|
{
|
|
return &rq->baskets[(rq->start + i) % rq->size];
|
|
}
|
|
|
|
static void
|
|
ractor_queue_advance(struct rb_ractor_queue *rq)
|
|
{
|
|
ASSERT_ractor_locking(GET_RACTOR());
|
|
|
|
if (rq->reserved_cnt == 0) {
|
|
rq->cnt--;
|
|
rq->start = (rq->start + 1) % rq->size;
|
|
rq->serial++;
|
|
}
|
|
else {
|
|
ractor_queue_at(rq, 0)->type = basket_type_deleted;
|
|
}
|
|
}
|
|
|
|
static bool
|
|
ractor_queue_skip_p(struct rb_ractor_queue *rq, int i)
|
|
{
|
|
struct rb_ractor_basket *b = ractor_queue_at(rq, i);
|
|
return b->type == basket_type_deleted ||
|
|
b->type == basket_type_reserved;
|
|
}
|
|
|
|
static void
|
|
ractor_queue_compact(rb_ractor_t *r, struct rb_ractor_queue *rq)
|
|
{
|
|
ASSERT_ractor_locking(r);
|
|
|
|
while (rq->cnt > 0 && ractor_queue_at(rq, 0)->type == basket_type_deleted) {
|
|
ractor_queue_advance(rq);
|
|
}
|
|
}
|
|
|
|
static bool
|
|
ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq)
|
|
{
|
|
ASSERT_ractor_locking(r);
|
|
|
|
if (rq->cnt == 0) {
|
|
return true;
|
|
}
|
|
|
|
ractor_queue_compact(r, rq);
|
|
|
|
for (int i=0; i<rq->cnt; i++) {
|
|
if (!ractor_queue_skip_p(rq, i)) {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
static bool
|
|
ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
|
|
{
|
|
bool found = false;
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
if (!ractor_queue_empty_p(r, rq)) {
|
|
for (int i=0; i<rq->cnt; i++) {
|
|
if (!ractor_queue_skip_p(rq, i)) {
|
|
struct rb_ractor_basket *b = ractor_queue_at(rq, i);
|
|
*basket = *b;
|
|
|
|
// remove from queue
|
|
b->type = basket_type_deleted;
|
|
ractor_queue_compact(r, rq);
|
|
found = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
|
|
return found;
|
|
}
|
|
|
|
static void
|
|
ractor_queue_enq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
|
|
{
|
|
ASSERT_ractor_locking(r);
|
|
|
|
if (rq->size <= rq->cnt) {
|
|
rq->baskets = realloc(rq->baskets, sizeof(struct rb_ractor_basket) * rq->size * 2);
|
|
for (int i=rq->size - rq->start; i<rq->cnt; i++) {
|
|
rq->baskets[i + rq->start] = rq->baskets[i + rq->start - rq->size];
|
|
}
|
|
rq->size *= 2;
|
|
}
|
|
rq->baskets[(rq->start + rq->cnt++) % rq->size] = *basket;
|
|
// fprintf(stderr, "%s %p->cnt:%d\n", __func__, rq, rq->cnt);
|
|
}
|
|
|
|
static void
|
|
ractor_basket_clear(struct rb_ractor_basket *b)
|
|
{
|
|
b->type = basket_type_none;
|
|
b->v = Qfalse;
|
|
b->sender = Qfalse;
|
|
}
|
|
|
|
static VALUE ractor_reset_belonging(VALUE obj); // in this file
|
|
|
|
static VALUE
|
|
ractor_basket_value(struct rb_ractor_basket *b)
|
|
{
|
|
switch (b->type) {
|
|
case basket_type_ref:
|
|
break;
|
|
case basket_type_copy:
|
|
case basket_type_move:
|
|
case basket_type_will:
|
|
b->type = basket_type_ref;
|
|
b->v = ractor_reset_belonging(b->v);
|
|
break;
|
|
default:
|
|
rb_bug("unreachable");
|
|
}
|
|
|
|
return b->v;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_basket_accept(struct rb_ractor_basket *b)
|
|
{
|
|
VALUE v = ractor_basket_value(b);
|
|
|
|
if (b->exception) {
|
|
VALUE cause = v;
|
|
VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor.");
|
|
rb_ivar_set(err, rb_intern("@ractor"), b->sender);
|
|
ractor_basket_clear(b);
|
|
rb_ec_setup_exception(NULL, err, cause);
|
|
rb_exc_raise(err);
|
|
}
|
|
|
|
ractor_basket_clear(b);
|
|
return v;
|
|
}
|
|
|
|
static void
|
|
ractor_recursive_receive_if(rb_ractor_t *r)
|
|
{
|
|
if (r->receiving_mutex && rb_mutex_owned_p(r->receiving_mutex)) {
|
|
rb_raise(rb_eRactorError, "can not call receive/receive_if recursively");
|
|
}
|
|
}
|
|
|
|
static VALUE
|
|
ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *r)
|
|
{
|
|
struct rb_ractor_queue *rq = &r->sync.incoming_queue;
|
|
struct rb_ractor_basket basket;
|
|
|
|
ractor_recursive_receive_if(r);
|
|
|
|
if (ractor_queue_deq(r, rq, &basket) == false) {
|
|
if (r->sync.incoming_port_closed) {
|
|
rb_raise(rb_eRactorClosedError, "The incoming port is already closed");
|
|
}
|
|
else {
|
|
return Qundef;
|
|
}
|
|
}
|
|
|
|
return ractor_basket_accept(&basket);
|
|
}
|
|
|
|
static bool
|
|
ractor_sleeping_by(const rb_ractor_t *r, enum ractor_wait_status wait_status)
|
|
{
|
|
return (r->sync.wait.status & wait_status) && r->sync.wait.wakeup_status == wakeup_none;
|
|
}
|
|
|
|
static bool
|
|
ractor_wakeup(rb_ractor_t *r, enum ractor_wait_status wait_status, enum ractor_wakeup_status wakeup_status)
|
|
{
|
|
ASSERT_ractor_locking(r);
|
|
|
|
// fprintf(stderr, "%s r:%p status:%s/%s wakeup_status:%s/%s\n", __func__, r,
|
|
// wait_status_str(r->sync.wait.status), wait_status_str(wait_status),
|
|
// wakeup_status_str(r->sync.wait.wakeup_status), wakeup_status_str(wakeup_status));
|
|
|
|
if (ractor_sleeping_by(r, wait_status)) {
|
|
r->sync.wait.wakeup_status = wakeup_status;
|
|
rb_native_cond_signal(&r->sync.cond);
|
|
return true;
|
|
}
|
|
else {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
static void *
|
|
ractor_sleep_wo_gvl(void *ptr)
|
|
{
|
|
rb_ractor_t *cr = ptr;
|
|
RACTOR_LOCK_SELF(cr);
|
|
{
|
|
VM_ASSERT(cr->sync.wait.status != wait_none);
|
|
if (cr->sync.wait.wakeup_status == wakeup_none) {
|
|
ractor_cond_wait(cr);
|
|
}
|
|
cr->sync.wait.status = wait_none;
|
|
}
|
|
RACTOR_UNLOCK_SELF(cr);
|
|
return NULL;
|
|
}
|
|
|
|
static void
|
|
ractor_sleep_interrupt(void *ptr)
|
|
{
|
|
rb_ractor_t *r = ptr;
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
ractor_wakeup(r, wait_receiving | wait_taking | wait_yielding, wakeup_by_interrupt);
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
}
|
|
|
|
#if USE_RUBY_DEBUG_LOG
|
|
static const char *
|
|
wait_status_str(enum ractor_wait_status wait_status)
|
|
{
|
|
switch ((int)wait_status) {
|
|
case wait_none: return "none";
|
|
case wait_receiving: return "receiving";
|
|
case wait_taking: return "taking";
|
|
case wait_yielding: return "yielding";
|
|
case wait_receiving|wait_taking: return "receiving|taking";
|
|
case wait_receiving|wait_yielding: return "receiving|yielding";
|
|
case wait_taking|wait_yielding: return "taking|yielding";
|
|
case wait_receiving|wait_taking|wait_yielding: return "receiving|taking|yielding";
|
|
}
|
|
rb_bug("unreachable");
|
|
}
|
|
|
|
static const char *
|
|
wakeup_status_str(enum ractor_wakeup_status wakeup_status)
|
|
{
|
|
switch (wakeup_status) {
|
|
case wakeup_none: return "none";
|
|
case wakeup_by_send: return "by_send";
|
|
case wakeup_by_yield: return "by_yield";
|
|
case wakeup_by_take: return "by_take";
|
|
case wakeup_by_close: return "by_close";
|
|
case wakeup_by_interrupt: return "by_interrupt";
|
|
case wakeup_by_retry: return "by_retry";
|
|
}
|
|
rb_bug("unreachable");
|
|
}
|
|
#endif // USE_RUBY_DEBUG_LOG
|
|
|
|
static void
|
|
ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr)
|
|
{
|
|
VM_ASSERT(GET_RACTOR() == cr);
|
|
VM_ASSERT(cr->sync.wait.status != wait_none);
|
|
// fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", __func__, cr,
|
|
// wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status));
|
|
|
|
RACTOR_UNLOCK(cr);
|
|
{
|
|
rb_nogvl(ractor_sleep_wo_gvl, cr,
|
|
ractor_sleep_interrupt, cr,
|
|
RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_INTR_FAIL);
|
|
}
|
|
RACTOR_LOCK(cr);
|
|
|
|
// rb_nogvl() can be canceled by interrupts
|
|
if (cr->sync.wait.status != wait_none) {
|
|
cr->sync.wait.status = wait_none;
|
|
cr->sync.wait.wakeup_status = wakeup_by_interrupt;
|
|
|
|
RACTOR_UNLOCK(cr);
|
|
rb_thread_check_ints();
|
|
RACTOR_LOCK(cr); // reachable?
|
|
}
|
|
}
|
|
|
|
static void
|
|
ractor_register_taking(rb_ractor_t *r, rb_ractor_t *cr)
|
|
{
|
|
VM_ASSERT(cr == GET_RACTOR());
|
|
bool retry_try = false;
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
if (ractor_sleeping_by(r, wait_yielding)) {
|
|
// already waiting for yielding. retry try_take.
|
|
retry_try = true;
|
|
}
|
|
else {
|
|
// insert cr into taking list
|
|
struct rb_ractor_waiting_list *wl = &r->sync.taking_ractors;
|
|
|
|
for (int i=0; i<wl->cnt; i++) {
|
|
if (wl->ractors[i] == cr) {
|
|
// TODO: make it clean code.
|
|
rb_native_mutex_unlock(&r->sync.lock);
|
|
rb_raise(rb_eRuntimeError, "Already another thread of same ractor is waiting.");
|
|
}
|
|
}
|
|
|
|
if (wl->size == 0) {
|
|
wl->size = 1;
|
|
wl->ractors = malloc(sizeof(rb_ractor_t *) * wl->size);
|
|
if (wl->ractors == NULL) rb_bug("can't allocate buffer");
|
|
}
|
|
else if (wl->size <= wl->cnt + 1) {
|
|
wl->size *= 2;
|
|
wl->ractors = realloc(wl->ractors, sizeof(rb_ractor_t *) * wl->size);
|
|
if (wl->ractors == NULL) rb_bug("can't re-allocate buffer");
|
|
}
|
|
wl->ractors[wl->cnt++] = cr;
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
|
|
if (retry_try) {
|
|
RACTOR_LOCK(cr);
|
|
{
|
|
if (cr->sync.wait.wakeup_status == wakeup_none) {
|
|
VM_ASSERT(cr->sync.wait.status != wait_none);
|
|
|
|
cr->sync.wait.wakeup_status = wakeup_by_retry;
|
|
cr->sync.wait.status = wait_none;
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(cr);
|
|
}
|
|
}
|
|
|
|
static void
|
|
ractor_waiting_list_del(rb_ractor_t *r, struct rb_ractor_waiting_list *wl, rb_ractor_t *wr)
|
|
{
|
|
RACTOR_LOCK(r);
|
|
{
|
|
int pos = -1;
|
|
for (int i=0; i<wl->cnt; i++) {
|
|
if (wl->ractors[i] == wr) {
|
|
pos = i;
|
|
break;
|
|
}
|
|
}
|
|
if (pos >= 0) { // found
|
|
wl->cnt--;
|
|
for (int i=pos; i<wl->cnt; i++) {
|
|
wl->ractors[i] = wl->ractors[i+1];
|
|
}
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
}
|
|
|
|
static rb_ractor_t *
|
|
ractor_waiting_list_shift(rb_ractor_t *r, struct rb_ractor_waiting_list *wl)
|
|
{
|
|
ASSERT_ractor_locking(r);
|
|
VM_ASSERT(&r->sync.taking_ractors == wl);
|
|
|
|
if (wl->cnt > 0) {
|
|
rb_ractor_t *tr = wl->ractors[0];
|
|
for (int i=1; i<wl->cnt; i++) {
|
|
wl->ractors[i-1] = wl->ractors[i];
|
|
}
|
|
wl->cnt--;
|
|
return tr;
|
|
}
|
|
else {
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
static void
|
|
ractor_receive_wait(rb_execution_context_t *ec, rb_ractor_t *cr)
|
|
{
|
|
VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
|
|
ractor_recursive_receive_if(cr);
|
|
|
|
RACTOR_LOCK(cr);
|
|
{
|
|
if (ractor_queue_empty_p(cr, &cr->sync.incoming_queue)) {
|
|
VM_ASSERT(cr->sync.wait.status == wait_none);
|
|
cr->sync.wait.status = wait_receiving;
|
|
cr->sync.wait.wakeup_status = wakeup_none;
|
|
ractor_sleep(ec, cr);
|
|
cr->sync.wait.wakeup_status = wakeup_none;
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(cr);
|
|
}
|
|
|
|
static VALUE
|
|
ractor_receive(rb_execution_context_t *ec, rb_ractor_t *cr)
|
|
{
|
|
VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
|
|
VALUE v;
|
|
|
|
while ((v = ractor_try_receive(ec, cr)) == Qundef) {
|
|
ractor_receive_wait(ec, cr);
|
|
}
|
|
|
|
return v;
|
|
}
|
|
|
|
#if 0
|
|
// for debug
|
|
static const char *
|
|
basket_type_name(enum rb_ractor_basket_type type)
|
|
{
|
|
switch (type) {
|
|
#define T(t) case basket_type_##t: return #t
|
|
T(none);
|
|
T(ref);
|
|
T(copy);
|
|
T(move);
|
|
T(will);
|
|
T(deleted);
|
|
T(reserved);
|
|
default: rb_bug("unreachable");
|
|
}
|
|
}
|
|
|
|
static void
|
|
rq_dump(struct rb_ractor_queue *rq)
|
|
{
|
|
bool bug = false;
|
|
for (int i=0; i<rq->cnt; i++) {
|
|
struct rb_ractor_basket *b = ractor_queue_at(rq, i);
|
|
fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type), b, RSTRING_PTR(RARRAY_AREF(b->v, 1)));
|
|
if (b->type == basket_type_reserved) bug = true;
|
|
}
|
|
if (bug) rb_bug("!!");
|
|
}
|
|
#endif
|
|
|
|
struct receive_block_data {
|
|
rb_ractor_t *cr;
|
|
struct rb_ractor_queue *rq;
|
|
VALUE v;
|
|
int index;
|
|
bool success;
|
|
};
|
|
|
|
static void
|
|
ractor_receive_if_lock(rb_ractor_t *cr)
|
|
{
|
|
VALUE m = cr->receiving_mutex;
|
|
if (m == Qfalse) {
|
|
m = cr->receiving_mutex = rb_mutex_new();
|
|
}
|
|
rb_mutex_lock(m);
|
|
}
|
|
|
|
static VALUE
|
|
receive_if_body(VALUE ptr)
|
|
{
|
|
struct receive_block_data *data = (struct receive_block_data *)ptr;
|
|
|
|
ractor_receive_if_lock(data->cr);
|
|
VALUE block_result = rb_yield(data->v);
|
|
|
|
RACTOR_LOCK_SELF(data->cr);
|
|
{
|
|
struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index);
|
|
VM_ASSERT(b->type == basket_type_reserved);
|
|
data->rq->reserved_cnt--;
|
|
|
|
if (RTEST(block_result)) {
|
|
b->type = basket_type_deleted;
|
|
ractor_queue_compact(data->cr, data->rq);
|
|
}
|
|
else {
|
|
b->type = basket_type_ref;
|
|
}
|
|
}
|
|
RACTOR_UNLOCK_SELF(data->cr);
|
|
|
|
data->success = true;
|
|
|
|
if (RTEST(block_result)) {
|
|
return data->v;
|
|
}
|
|
else {
|
|
return Qundef;
|
|
}
|
|
}
|
|
|
|
static VALUE
|
|
receive_if_ensure(VALUE v)
|
|
{
|
|
struct receive_block_data *data = (struct receive_block_data *)v;
|
|
|
|
if (!data->success) {
|
|
RACTOR_LOCK_SELF(data->cr);
|
|
{
|
|
struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index);
|
|
VM_ASSERT(b->type == basket_type_reserved);
|
|
b->type = basket_type_deleted;
|
|
data->rq->reserved_cnt--;
|
|
}
|
|
RACTOR_UNLOCK_SELF(data->cr);
|
|
}
|
|
|
|
rb_mutex_unlock(data->cr->receiving_mutex);
|
|
return Qnil;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b)
|
|
{
|
|
if (!RTEST(b)) rb_raise(rb_eArgError, "no block given");
|
|
|
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
|
unsigned int serial = (unsigned int)-1;
|
|
int index = 0;
|
|
struct rb_ractor_queue *rq = &cr->sync.incoming_queue;
|
|
|
|
while (1) {
|
|
VALUE v = Qundef;
|
|
|
|
ractor_receive_wait(ec, cr);
|
|
|
|
RACTOR_LOCK_SELF(cr);
|
|
{
|
|
if (serial != rq->serial) {
|
|
serial = rq->serial;
|
|
index = 0;
|
|
}
|
|
|
|
// check newer version
|
|
for (int i=index; i<rq->cnt; i++) {
|
|
if (!ractor_queue_skip_p(rq, i)) {
|
|
struct rb_ractor_basket *b = ractor_queue_at(rq, i);
|
|
v = ractor_basket_value(b);
|
|
b->type = basket_type_reserved;
|
|
rq->reserved_cnt++;
|
|
index = i;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
RACTOR_UNLOCK_SELF(cr);
|
|
|
|
if (v != Qundef) {
|
|
struct receive_block_data data = {
|
|
.cr = cr,
|
|
.rq = rq,
|
|
.v = v,
|
|
.index = index,
|
|
.success = false,
|
|
};
|
|
|
|
VALUE result = rb_ensure(receive_if_body, (VALUE)&data,
|
|
receive_if_ensure, (VALUE)&data);
|
|
|
|
if (result != Qundef) return result;
|
|
index++;
|
|
}
|
|
}
|
|
}
|
|
|
|
static void
|
|
ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_basket *b)
|
|
{
|
|
bool closed = false;
|
|
struct rb_ractor_queue *rq = &r->sync.incoming_queue;
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
if (r->sync.incoming_port_closed) {
|
|
closed = true;
|
|
}
|
|
else {
|
|
ractor_queue_enq(r, rq, b);
|
|
if (ractor_wakeup(r, wait_receiving, wakeup_by_send)) {
|
|
RUBY_DEBUG_LOG("wakeup", 0);
|
|
}
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
|
|
if (closed) {
|
|
rb_raise(rb_eRactorClosedError, "The incoming-port is already closed");
|
|
}
|
|
}
|
|
|
|
static VALUE ractor_move(VALUE obj); // in this file
|
|
static VALUE ractor_copy(VALUE obj); // in this file
|
|
|
|
static void
|
|
ractor_basket_setup(rb_execution_context_t *ec, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc, bool is_will, bool is_yield)
|
|
{
|
|
basket->sender = rb_ec_ractor_ptr(ec)->pub.self;
|
|
basket->exception = exc;
|
|
|
|
if (is_will) {
|
|
basket->type = basket_type_will;
|
|
basket->v = obj;
|
|
}
|
|
else if (rb_ractor_shareable_p(obj)) {
|
|
basket->type = basket_type_ref;
|
|
basket->v = obj;
|
|
}
|
|
else if (!RTEST(move)) {
|
|
basket->v = ractor_copy(obj);
|
|
basket->type = basket_type_copy;
|
|
}
|
|
else {
|
|
basket->type = basket_type_move;
|
|
|
|
if (is_yield) {
|
|
basket->v = obj; // call ractor_move() when yielding timing.
|
|
}
|
|
else {
|
|
basket->v = ractor_move(obj);
|
|
}
|
|
}
|
|
}
|
|
|
|
static VALUE
|
|
ractor_send(rb_execution_context_t *ec, rb_ractor_t *r, VALUE obj, VALUE move)
|
|
{
|
|
struct rb_ractor_basket basket;
|
|
ractor_basket_setup(ec, &basket, obj, move, false, false, false);
|
|
ractor_send_basket(ec, r, &basket);
|
|
return r->pub.self;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_try_take(rb_execution_context_t *ec, rb_ractor_t *r)
|
|
{
|
|
struct rb_ractor_basket basket = {
|
|
.type = basket_type_none,
|
|
};
|
|
bool closed = false;
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
if (ractor_sleeping_by(r, wait_yielding)) {
|
|
MAYBE_UNUSED(bool) wakeup_result;
|
|
VM_ASSERT(r->sync.wait.yielded_basket.type != basket_type_none);
|
|
|
|
if (r->sync.wait.yielded_basket.type == basket_type_move) {
|
|
wakeup_result = ractor_wakeup(r, wait_yielding, wakeup_by_retry);
|
|
}
|
|
else {
|
|
wakeup_result = ractor_wakeup(r, wait_yielding, wakeup_by_take);
|
|
basket = r->sync.wait.yielded_basket;
|
|
ractor_basket_clear(&r->sync.wait.yielded_basket);
|
|
}
|
|
VM_ASSERT(wakeup_result);
|
|
}
|
|
else if (r->sync.outgoing_port_closed) {
|
|
closed = true;
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
|
|
if (basket.type == basket_type_none) {
|
|
if (closed) {
|
|
rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
|
|
}
|
|
else {
|
|
return Qundef;
|
|
}
|
|
}
|
|
else {
|
|
return ractor_basket_accept(&basket);
|
|
}
|
|
}
|
|
|
|
static VALUE
|
|
ractor_yield_move_body(VALUE v)
|
|
{
|
|
return ractor_move(v);
|
|
}
|
|
|
|
static bool
|
|
ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_basket *basket)
|
|
{
|
|
ASSERT_ractor_unlocking(cr);
|
|
VM_ASSERT(basket->type != basket_type_none);
|
|
|
|
if (cr->sync.outgoing_port_closed) {
|
|
rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
|
|
}
|
|
|
|
rb_ractor_t *r;
|
|
|
|
retry_shift:
|
|
RACTOR_LOCK(cr);
|
|
{
|
|
r = ractor_waiting_list_shift(cr, &cr->sync.taking_ractors);
|
|
}
|
|
RACTOR_UNLOCK(cr);
|
|
|
|
if (r) {
|
|
bool retry_shift = false;
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
if (ractor_sleeping_by(r, wait_taking)) {
|
|
VM_ASSERT(r->sync.wait.taken_basket.type == basket_type_none);
|
|
|
|
if (basket->type == basket_type_move) {
|
|
enum ractor_wait_status prev_wait_status = r->sync.wait.status;
|
|
r->sync.wait.status = wait_moving;
|
|
|
|
RACTOR_UNLOCK(r);
|
|
{
|
|
int state;
|
|
VALUE moved_value = rb_protect(ractor_yield_move_body, basket->v, &state);
|
|
if (state) {
|
|
r->sync.wait.status = prev_wait_status;
|
|
rb_jump_tag(state);
|
|
}
|
|
else {
|
|
basket->v = moved_value;
|
|
}
|
|
}
|
|
RACTOR_LOCK(r);
|
|
|
|
if (!ractor_wakeup(r, wait_moving, wakeup_by_yield)) {
|
|
// terminating?
|
|
}
|
|
}
|
|
else {
|
|
ractor_wakeup(r, wait_taking, wakeup_by_yield);
|
|
}
|
|
r->sync.wait.taken_basket = *basket;
|
|
}
|
|
else {
|
|
retry_shift = true;
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
|
|
if (retry_shift) {
|
|
// get candidate take-waiting ractor, but already woke up by another reason.
|
|
// retry to check another ractor.
|
|
goto retry_shift;
|
|
}
|
|
else {
|
|
return true;
|
|
}
|
|
}
|
|
else {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
// select(r1, r2, r3, receive: true, yield: obj)
|
|
static VALUE
|
|
ractor_select(rb_execution_context_t *ec, const VALUE *rs, const int rs_len, VALUE yielded_value, bool move, VALUE *ret_r)
|
|
{
|
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
|
VALUE crv = cr->pub.self;
|
|
VALUE ret = Qundef;
|
|
int i;
|
|
bool interrupted = false;
|
|
enum ractor_wait_status wait_status = 0;
|
|
bool yield_p = (yielded_value != Qundef) ? true : false;
|
|
const int alen = rs_len + (yield_p ? 1 : 0);
|
|
|
|
struct ractor_select_action {
|
|
enum ractor_select_action_type {
|
|
ractor_select_action_take,
|
|
ractor_select_action_receive,
|
|
ractor_select_action_yield,
|
|
} type;
|
|
VALUE v;
|
|
} *actions = ALLOCA_N(struct ractor_select_action, alen);
|
|
|
|
VM_ASSERT(cr->sync.wait.status == wait_none);
|
|
VM_ASSERT(cr->sync.wait.wakeup_status == wakeup_none);
|
|
VM_ASSERT(cr->sync.wait.taken_basket.type == basket_type_none);
|
|
VM_ASSERT(cr->sync.wait.yielded_basket.type == basket_type_none);
|
|
|
|
// setup actions
|
|
for (i=0; i<rs_len; i++) {
|
|
VALUE v = rs[i];
|
|
|
|
if (v == crv) {
|
|
actions[i].type = ractor_select_action_receive;
|
|
actions[i].v = Qnil;
|
|
wait_status |= wait_receiving;
|
|
}
|
|
else if (rb_ractor_p(v)) {
|
|
actions[i].type = ractor_select_action_take;
|
|
actions[i].v = v;
|
|
wait_status |= wait_taking;
|
|
}
|
|
else {
|
|
rb_raise(rb_eArgError, "should be a ractor object, but %"PRIsVALUE, v);
|
|
}
|
|
}
|
|
rs = NULL;
|
|
|
|
restart:
|
|
|
|
if (yield_p) {
|
|
actions[rs_len].type = ractor_select_action_yield;
|
|
actions[rs_len].v = Qundef;
|
|
wait_status |= wait_yielding;
|
|
ractor_basket_setup(ec, &cr->sync.wait.yielded_basket, yielded_value, move, false, false, true);
|
|
}
|
|
|
|
// TODO: shuffle actions
|
|
|
|
while (1) {
|
|
RUBY_DEBUG_LOG("try actions (%s)", wait_status_str(wait_status));
|
|
|
|
for (i=0; i<alen; i++) {
|
|
VALUE v, rv;
|
|
switch (actions[i].type) {
|
|
case ractor_select_action_take:
|
|
rv = actions[i].v;
|
|
v = ractor_try_take(ec, RACTOR_PTR(rv));
|
|
if (v != Qundef) {
|
|
*ret_r = rv;
|
|
ret = v;
|
|
goto cleanup;
|
|
}
|
|
break;
|
|
case ractor_select_action_receive:
|
|
v = ractor_try_receive(ec, cr);
|
|
if (v != Qundef) {
|
|
*ret_r = ID2SYM(rb_intern("receive"));
|
|
ret = v;
|
|
goto cleanup;
|
|
}
|
|
break;
|
|
case ractor_select_action_yield:
|
|
{
|
|
if (ractor_try_yield(ec, cr, &cr->sync.wait.yielded_basket)) {
|
|
*ret_r = ID2SYM(rb_intern("yield"));
|
|
ret = Qnil;
|
|
goto cleanup;
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
|
|
RUBY_DEBUG_LOG("wait actions (%s)", wait_status_str(wait_status));
|
|
|
|
RACTOR_LOCK(cr);
|
|
{
|
|
VM_ASSERT(cr->sync.wait.status == wait_none);
|
|
cr->sync.wait.status = wait_status;
|
|
cr->sync.wait.wakeup_status = wakeup_none;
|
|
}
|
|
RACTOR_UNLOCK(cr);
|
|
|
|
// prepare waiting
|
|
for (i=0; i<alen; i++) {
|
|
rb_ractor_t *r;
|
|
switch (actions[i].type) {
|
|
case ractor_select_action_take:
|
|
r = RACTOR_PTR(actions[i].v);
|
|
ractor_register_taking(r, cr);
|
|
break;
|
|
case ractor_select_action_yield:
|
|
case ractor_select_action_receive:
|
|
break;
|
|
}
|
|
}
|
|
|
|
// wait
|
|
RACTOR_LOCK(cr);
|
|
{
|
|
if (cr->sync.wait.wakeup_status == wakeup_none) {
|
|
for (i=0; i<alen; i++) {
|
|
rb_ractor_t *r;
|
|
|
|
switch (actions[i].type) {
|
|
case ractor_select_action_take:
|
|
r = RACTOR_PTR(actions[i].v);
|
|
if (ractor_sleeping_by(r, wait_yielding)) {
|
|
RUBY_DEBUG_LOG("wakeup_none, but r:%u is waiting for yielding", r->pub.id);
|
|
cr->sync.wait.wakeup_status = wakeup_by_retry;
|
|
goto skip_sleep;
|
|
}
|
|
break;
|
|
case ractor_select_action_receive:
|
|
if (cr->sync.incoming_queue.cnt > 0) {
|
|
RUBY_DEBUG_LOG("wakeup_none, but incoming_queue has %u messages", cr->sync.incoming_queue.cnt);
|
|
cr->sync.wait.wakeup_status = wakeup_by_retry;
|
|
goto skip_sleep;
|
|
}
|
|
break;
|
|
case ractor_select_action_yield:
|
|
if (cr->sync.taking_ractors.cnt > 0) {
|
|
RUBY_DEBUG_LOG("wakeup_none, but %u taking_ractors are waiting", cr->sync.taking_ractors.cnt);
|
|
cr->sync.wait.wakeup_status = wakeup_by_retry;
|
|
goto skip_sleep;
|
|
}
|
|
else if (cr->sync.outgoing_port_closed) {
|
|
cr->sync.wait.wakeup_status = wakeup_by_close;
|
|
goto skip_sleep;
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
|
|
RUBY_DEBUG_LOG("sleep %s", wait_status_str(cr->sync.wait.status));
|
|
ractor_sleep(ec, cr);
|
|
RUBY_DEBUG_LOG("awaken %s", wakeup_status_str(cr->sync.wait.wakeup_status));
|
|
}
|
|
else {
|
|
skip_sleep:
|
|
RUBY_DEBUG_LOG("no need to sleep %s->%s",
|
|
wait_status_str(cr->sync.wait.status),
|
|
wakeup_status_str(cr->sync.wait.wakeup_status));
|
|
cr->sync.wait.status = wait_none;
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(cr);
|
|
|
|
// cleanup waiting
|
|
for (i=0; i<alen; i++) {
|
|
rb_ractor_t *r;
|
|
switch (actions[i].type) {
|
|
case ractor_select_action_take:
|
|
r = RACTOR_PTR(actions[i].v);
|
|
ractor_waiting_list_del(r, &r->sync.taking_ractors, cr);
|
|
break;
|
|
case ractor_select_action_receive:
|
|
case ractor_select_action_yield:
|
|
break;
|
|
}
|
|
}
|
|
|
|
// check results
|
|
enum ractor_wakeup_status wakeup_status = cr->sync.wait.wakeup_status;
|
|
cr->sync.wait.wakeup_status = wakeup_none;
|
|
|
|
switch (wakeup_status) {
|
|
case wakeup_none:
|
|
// OK. something happens.
|
|
// retry loop.
|
|
break;
|
|
case wakeup_by_retry:
|
|
// Retry request.
|
|
break;
|
|
case wakeup_by_send:
|
|
// OK.
|
|
// retry loop and try_receive will succss.
|
|
break;
|
|
case wakeup_by_yield:
|
|
// take was succeeded!
|
|
// cr.wait.taken_basket contains passed block
|
|
VM_ASSERT(cr->sync.wait.taken_basket.type != basket_type_none);
|
|
*ret_r = cr->sync.wait.taken_basket.sender;
|
|
VM_ASSERT(rb_ractor_p(*ret_r));
|
|
ret = ractor_basket_accept(&cr->sync.wait.taken_basket);
|
|
goto cleanup;
|
|
case wakeup_by_take:
|
|
*ret_r = ID2SYM(rb_intern("yield"));
|
|
ret = Qnil;
|
|
goto cleanup;
|
|
case wakeup_by_close:
|
|
// OK.
|
|
// retry loop and will get CloseError.
|
|
break;
|
|
case wakeup_by_interrupt:
|
|
ret = Qundef;
|
|
interrupted = true;
|
|
goto cleanup;
|
|
}
|
|
}
|
|
|
|
cleanup:
|
|
RUBY_DEBUG_LOG("cleanup actions (%s)", wait_status_str(wait_status));
|
|
|
|
if (cr->sync.wait.yielded_basket.type != basket_type_none) {
|
|
ractor_basket_clear(&cr->sync.wait.yielded_basket);
|
|
}
|
|
|
|
VM_ASSERT(cr->sync.wait.status == wait_none);
|
|
VM_ASSERT(cr->sync.wait.wakeup_status == wakeup_none);
|
|
VM_ASSERT(cr->sync.wait.taken_basket.type == basket_type_none);
|
|
VM_ASSERT(cr->sync.wait.yielded_basket.type == basket_type_none);
|
|
|
|
if (interrupted) {
|
|
rb_vm_check_ints_blocking(ec);
|
|
interrupted = false;
|
|
goto restart;
|
|
}
|
|
|
|
VM_ASSERT(ret != Qundef);
|
|
return ret;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_yield(rb_execution_context_t *ec, rb_ractor_t *r, VALUE obj, VALUE move)
|
|
{
|
|
VALUE ret_r;
|
|
ractor_select(ec, NULL, 0, obj, RTEST(move) ? true : false, &ret_r);
|
|
return Qnil;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_take(rb_execution_context_t *ec, rb_ractor_t *r)
|
|
{
|
|
VALUE ret_r;
|
|
VALUE v = ractor_select(ec, &r->pub.self, 1, Qundef, false, &ret_r);
|
|
return v;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r)
|
|
{
|
|
VALUE prev;
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
if (!r->sync.incoming_port_closed) {
|
|
prev = Qfalse;
|
|
r->sync.incoming_port_closed = true;
|
|
if (ractor_wakeup(r, wait_receiving, wakeup_by_close)) {
|
|
VM_ASSERT(r->sync.incoming_queue.cnt == 0);
|
|
RUBY_DEBUG_LOG("cancel receiving", 0);
|
|
}
|
|
}
|
|
else {
|
|
prev = Qtrue;
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
return prev;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r)
|
|
{
|
|
VALUE prev;
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
if (!r->sync.outgoing_port_closed) {
|
|
prev = Qfalse;
|
|
r->sync.outgoing_port_closed = true;
|
|
}
|
|
else {
|
|
prev = Qtrue;
|
|
}
|
|
|
|
// wakeup all taking ractors
|
|
rb_ractor_t *taking_ractor;
|
|
while ((taking_ractor = ractor_waiting_list_shift(r, &r->sync.taking_ractors)) != NULL) {
|
|
RACTOR_LOCK(taking_ractor);
|
|
ractor_wakeup(taking_ractor, wait_taking, wakeup_by_close);
|
|
RACTOR_UNLOCK(taking_ractor);
|
|
}
|
|
|
|
// raising yielding Ractor
|
|
if (!r->yield_atexit &&
|
|
ractor_wakeup(r, wait_yielding, wakeup_by_close)) {
|
|
RUBY_DEBUG_LOG("cancel yielding", 0);
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
return prev;
|
|
}
|
|
|
|
// creation/termination
|
|
|
|
static uint32_t
|
|
ractor_next_id(void)
|
|
{
|
|
uint32_t id;
|
|
|
|
id = (uint32_t)(RUBY_ATOMIC_FETCH_ADD(ractor_last_id, 1) + 1);
|
|
|
|
return id;
|
|
}
|
|
|
|
static void
|
|
vm_insert_ractor0(rb_vm_t *vm, rb_ractor_t *r, bool single_ractor_mode)
|
|
{
|
|
RUBY_DEBUG_LOG("r:%u ractor.cnt:%u++", r->pub.id, vm->ractor.cnt);
|
|
VM_ASSERT(single_ractor_mode || RB_VM_LOCKED_P());
|
|
|
|
list_add_tail(&vm->ractor.set, &r->vmlr_node);
|
|
vm->ractor.cnt++;
|
|
}
|
|
|
|
static void
|
|
cancel_single_ractor_mode(void)
|
|
{
|
|
// enable multi-ractor mode
|
|
RUBY_DEBUG_LOG("enable multi-ractor mode", 0);
|
|
|
|
VALUE was_disabled = rb_gc_enable();
|
|
|
|
rb_gc_start();
|
|
rb_transient_heap_evacuate();
|
|
|
|
if (was_disabled) {
|
|
rb_gc_disable();
|
|
}
|
|
|
|
ruby_single_main_ractor = NULL;
|
|
|
|
if (rb_warning_category_enabled_p(RB_WARN_CATEGORY_EXPERIMENTAL)) {
|
|
rb_category_warn(RB_WARN_CATEGORY_EXPERIMENTAL,
|
|
"Ractor is experimental, and the behavior may change in future versions of Ruby! "
|
|
"Also there are many implementation issues.");
|
|
}
|
|
}
|
|
|
|
static void
|
|
vm_insert_ractor(rb_vm_t *vm, rb_ractor_t *r)
|
|
{
|
|
VM_ASSERT(ractor_status_p(r, ractor_created));
|
|
|
|
if (rb_multi_ractor_p()) {
|
|
RB_VM_LOCK();
|
|
{
|
|
vm_insert_ractor0(vm, r, false);
|
|
vm_ractor_blocking_cnt_inc(vm, r, __FILE__, __LINE__);
|
|
}
|
|
RB_VM_UNLOCK();
|
|
}
|
|
else {
|
|
if (vm->ractor.cnt == 0) {
|
|
// main ractor
|
|
vm_insert_ractor0(vm, r, true);
|
|
ractor_status_set(r, ractor_blocking);
|
|
ractor_status_set(r, ractor_running);
|
|
}
|
|
else {
|
|
cancel_single_ractor_mode();
|
|
vm_insert_ractor0(vm, r, true);
|
|
vm_ractor_blocking_cnt_inc(vm, r, __FILE__, __LINE__);
|
|
}
|
|
}
|
|
}
|
|
|
|
static void
|
|
vm_remove_ractor(rb_vm_t *vm, rb_ractor_t *cr)
|
|
{
|
|
VM_ASSERT(ractor_status_p(cr, ractor_running));
|
|
VM_ASSERT(vm->ractor.cnt > 1);
|
|
VM_ASSERT(cr->threads.cnt == 1);
|
|
|
|
RB_VM_LOCK();
|
|
{
|
|
RUBY_DEBUG_LOG("ractor.cnt:%u-- terminate_waiting:%d",
|
|
vm->ractor.cnt, vm->ractor.sync.terminate_waiting);
|
|
|
|
VM_ASSERT(vm->ractor.cnt > 0);
|
|
list_del(&cr->vmlr_node);
|
|
|
|
if (vm->ractor.cnt <= 2 && vm->ractor.sync.terminate_waiting) {
|
|
rb_native_cond_signal(&vm->ractor.sync.terminate_cond);
|
|
}
|
|
vm->ractor.cnt--;
|
|
|
|
/* Clear the cached freelist to prevent a memory leak. */
|
|
rb_gc_ractor_newobj_cache_clear(&cr->newobj_cache);
|
|
|
|
ractor_status_set(cr, ractor_terminated);
|
|
}
|
|
RB_VM_UNLOCK();
|
|
}
|
|
|
|
static VALUE
|
|
ractor_alloc(VALUE klass)
|
|
{
|
|
rb_ractor_t *r;
|
|
VALUE rv = TypedData_Make_Struct(klass, rb_ractor_t, &ractor_data_type, r);
|
|
FL_SET_RAW(rv, RUBY_FL_SHAREABLE);
|
|
r->pub.self = rv;
|
|
VM_ASSERT(ractor_status_p(r, ractor_created));
|
|
return rv;
|
|
}
|
|
|
|
rb_ractor_t *
|
|
rb_ractor_main_alloc(void)
|
|
{
|
|
rb_ractor_t *r = ruby_mimmalloc(sizeof(rb_ractor_t));
|
|
if (r == NULL) {
|
|
fprintf(stderr, "[FATAL] failed to allocate memory for main ractor\n");
|
|
exit(EXIT_FAILURE);
|
|
}
|
|
MEMZERO(r, rb_ractor_t, 1);
|
|
r->pub.id = ++ractor_last_id;
|
|
r->loc = Qnil;
|
|
r->name = Qnil;
|
|
r->pub.self = Qnil;
|
|
ruby_single_main_ractor = r;
|
|
|
|
return r;
|
|
}
|
|
|
|
#if defined(HAVE_WORKING_FORK)
|
|
void
|
|
rb_ractor_atfork(rb_vm_t *vm, rb_thread_t *th)
|
|
{
|
|
// initialize as a main ractor
|
|
vm->ractor.cnt = 0;
|
|
vm->ractor.blocking_cnt = 0;
|
|
ruby_single_main_ractor = th->ractor;
|
|
th->ractor->status_ = ractor_created;
|
|
|
|
rb_ractor_living_threads_init(th->ractor);
|
|
rb_ractor_living_threads_insert(th->ractor, th);
|
|
|
|
VM_ASSERT(vm->ractor.blocking_cnt == 0);
|
|
VM_ASSERT(vm->ractor.cnt == 1);
|
|
}
|
|
#endif
|
|
|
|
void rb_gvl_init(rb_global_vm_lock_t *gvl);
|
|
|
|
void
|
|
rb_ractor_living_threads_init(rb_ractor_t *r)
|
|
{
|
|
list_head_init(&r->threads.set);
|
|
r->threads.cnt = 0;
|
|
r->threads.blocking_cnt = 0;
|
|
}
|
|
|
|
static void
|
|
ractor_init(rb_ractor_t *r, VALUE name, VALUE loc)
|
|
{
|
|
ractor_queue_setup(&r->sync.incoming_queue);
|
|
rb_native_mutex_initialize(&r->sync.lock);
|
|
rb_native_cond_initialize(&r->sync.cond);
|
|
rb_native_cond_initialize(&r->barrier_wait_cond);
|
|
|
|
// thread management
|
|
rb_gvl_init(&r->threads.gvl);
|
|
rb_ractor_living_threads_init(r);
|
|
|
|
// naming
|
|
if (!NIL_P(name)) {
|
|
rb_encoding *enc;
|
|
StringValueCStr(name);
|
|
enc = rb_enc_get(name);
|
|
if (!rb_enc_asciicompat(enc)) {
|
|
rb_raise(rb_eArgError, "ASCII incompatible encoding (%s)",
|
|
rb_enc_name(enc));
|
|
}
|
|
name = rb_str_new_frozen(name);
|
|
}
|
|
r->name = name;
|
|
r->loc = loc;
|
|
}
|
|
|
|
void
|
|
rb_ractor_main_setup(rb_vm_t *vm, rb_ractor_t *r, rb_thread_t *th)
|
|
{
|
|
r->pub.self = TypedData_Wrap_Struct(rb_cRactor, &ractor_data_type, r);
|
|
FL_SET_RAW(r->pub.self, RUBY_FL_SHAREABLE);
|
|
ractor_init(r, Qnil, Qnil);
|
|
r->threads.main = th;
|
|
rb_ractor_living_threads_insert(r, th);
|
|
}
|
|
|
|
static VALUE
|
|
ractor_create(rb_execution_context_t *ec, VALUE self, VALUE loc, VALUE name, VALUE args, VALUE block)
|
|
{
|
|
VALUE rv = ractor_alloc(self);
|
|
rb_ractor_t *r = RACTOR_PTR(rv);
|
|
ractor_init(r, name, loc);
|
|
|
|
// can block here
|
|
r->pub.id = ractor_next_id();
|
|
RUBY_DEBUG_LOG("r:%u", r->pub.id);
|
|
|
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
|
r->verbose = cr->verbose;
|
|
r->debug = cr->debug;
|
|
|
|
rb_thread_create_ractor(r, args, block);
|
|
|
|
RB_GC_GUARD(rv);
|
|
return rv;
|
|
}
|
|
|
|
static void
|
|
ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool exc)
|
|
{
|
|
if (cr->sync.outgoing_port_closed) {
|
|
return;
|
|
}
|
|
|
|
ASSERT_ractor_unlocking(cr);
|
|
|
|
struct rb_ractor_basket basket;
|
|
ractor_basket_setup(ec, &basket, v, Qfalse, exc, true, true /* this flag is ignored because move is Qfalse */);
|
|
|
|
retry:
|
|
if (ractor_try_yield(ec, cr, &basket)) {
|
|
// OK.
|
|
}
|
|
else {
|
|
bool retry = false;
|
|
RACTOR_LOCK(cr);
|
|
{
|
|
if (cr->sync.taking_ractors.cnt == 0) {
|
|
cr->sync.wait.yielded_basket = basket;
|
|
|
|
VM_ASSERT(cr->sync.wait.status == wait_none);
|
|
cr->sync.wait.status = wait_yielding;
|
|
cr->sync.wait.wakeup_status = wakeup_none;
|
|
|
|
VM_ASSERT(cr->yield_atexit == false);
|
|
cr->yield_atexit = true;
|
|
}
|
|
else {
|
|
retry = true; // another ractor is waiting for the yield.
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(cr);
|
|
|
|
if (retry) goto retry;
|
|
}
|
|
}
|
|
|
|
void
|
|
rb_ractor_teardown(rb_execution_context_t *ec)
|
|
{
|
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
|
ractor_close_incoming(ec, cr);
|
|
ractor_close_outgoing(ec, cr);
|
|
|
|
// sync with rb_ractor_terminate_interrupt_main_thread()
|
|
RB_VM_LOCK_ENTER();
|
|
{
|
|
VM_ASSERT(cr->threads.main != NULL);
|
|
cr->threads.main = NULL;
|
|
}
|
|
RB_VM_LOCK_LEAVE();
|
|
}
|
|
|
|
void
|
|
rb_ractor_atexit(rb_execution_context_t *ec, VALUE result)
|
|
{
|
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
|
ractor_yield_atexit(ec, cr, result, false);
|
|
}
|
|
|
|
void
|
|
rb_ractor_atexit_exception(rb_execution_context_t *ec)
|
|
{
|
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
|
ractor_yield_atexit(ec, cr, ec->errinfo, true);
|
|
}
|
|
|
|
void
|
|
rb_ractor_receive_parameters(rb_execution_context_t *ec, rb_ractor_t *r, int len, VALUE *ptr)
|
|
{
|
|
for (int i=0; i<len; i++) {
|
|
ptr[i] = ractor_receive(ec, r);
|
|
}
|
|
}
|
|
|
|
void
|
|
rb_ractor_send_parameters(rb_execution_context_t *ec, rb_ractor_t *r, VALUE args)
|
|
{
|
|
int len = RARRAY_LENINT(args);
|
|
for (int i=0; i<len; i++) {
|
|
ractor_send(ec, r, RARRAY_AREF(args, i), false);
|
|
}
|
|
}
|
|
|
|
MJIT_FUNC_EXPORTED bool
|
|
rb_ractor_main_p_(void)
|
|
{
|
|
VM_ASSERT(rb_multi_ractor_p());
|
|
rb_execution_context_t *ec = GET_EC();
|
|
return rb_ec_ractor_ptr(ec) == rb_ec_vm_ptr(ec)->ractor.main_ractor;
|
|
}
|
|
|
|
bool
|
|
rb_obj_is_main_ractor(VALUE gv)
|
|
{
|
|
if (!rb_ractor_p(gv)) return false;
|
|
rb_ractor_t *r = DATA_PTR(gv);
|
|
return r == GET_VM()->ractor.main_ractor;
|
|
}
|
|
|
|
rb_global_vm_lock_t *
|
|
rb_ractor_gvl(rb_ractor_t *r)
|
|
{
|
|
return &r->threads.gvl;
|
|
}
|
|
|
|
int
|
|
rb_ractor_living_thread_num(const rb_ractor_t *r)
|
|
{
|
|
return r->threads.cnt;
|
|
}
|
|
|
|
VALUE
|
|
rb_ractor_thread_list(rb_ractor_t *r)
|
|
{
|
|
rb_thread_t *th = 0;
|
|
VALUE *ts;
|
|
int ts_cnt;
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
ts = ALLOCA_N(VALUE, r->threads.cnt);
|
|
ts_cnt = 0;
|
|
|
|
list_for_each(&r->threads.set, th, lt_node) {
|
|
switch (th->status) {
|
|
case THREAD_RUNNABLE:
|
|
case THREAD_STOPPED:
|
|
case THREAD_STOPPED_FOREVER:
|
|
ts[ts_cnt++] = th->self;
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
|
|
VALUE ary = rb_ary_new();
|
|
for (int i=0; i<ts_cnt; i++) {
|
|
rb_ary_push(ary, ts[i]);
|
|
}
|
|
|
|
return ary;
|
|
}
|
|
|
|
void
|
|
rb_ractor_living_threads_insert(rb_ractor_t *r, rb_thread_t *th)
|
|
{
|
|
VM_ASSERT(th != NULL);
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
RUBY_DEBUG_LOG("r(%d)->threads.cnt:%d++", r->pub.id, r->threads.cnt);
|
|
list_add_tail(&r->threads.set, &th->lt_node);
|
|
r->threads.cnt++;
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
|
|
// first thread for a ractor
|
|
if (r->threads.cnt == 1) {
|
|
VM_ASSERT(ractor_status_p(r, ractor_created));
|
|
vm_insert_ractor(th->vm, r);
|
|
}
|
|
}
|
|
|
|
static void
|
|
vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *r, const char *file, int line)
|
|
{
|
|
ractor_status_set(r, ractor_blocking);
|
|
|
|
RUBY_DEBUG_LOG2(file, line, "vm->ractor.blocking_cnt:%d++", vm->ractor.blocking_cnt);
|
|
vm->ractor.blocking_cnt++;
|
|
VM_ASSERT(vm->ractor.blocking_cnt <= vm->ractor.cnt);
|
|
}
|
|
|
|
void
|
|
rb_vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *cr, const char *file, int line)
|
|
{
|
|
ASSERT_vm_locking();
|
|
VM_ASSERT(GET_RACTOR() == cr);
|
|
vm_ractor_blocking_cnt_inc(vm, cr, file, line);
|
|
}
|
|
|
|
void
|
|
rb_vm_ractor_blocking_cnt_dec(rb_vm_t *vm, rb_ractor_t *cr, const char *file, int line)
|
|
{
|
|
ASSERT_vm_locking();
|
|
VM_ASSERT(GET_RACTOR() == cr);
|
|
|
|
RUBY_DEBUG_LOG2(file, line, "vm->ractor.blocking_cnt:%d--", vm->ractor.blocking_cnt);
|
|
VM_ASSERT(vm->ractor.blocking_cnt > 0);
|
|
vm->ractor.blocking_cnt--;
|
|
|
|
ractor_status_set(cr, ractor_running);
|
|
}
|
|
|
|
static void
|
|
ractor_check_blocking(rb_ractor_t *cr, unsigned int remained_thread_cnt, const char *file, int line)
|
|
{
|
|
VM_ASSERT(cr == GET_RACTOR());
|
|
|
|
RUBY_DEBUG_LOG2(file, line,
|
|
"cr->threads.cnt:%u cr->threads.blocking_cnt:%u vm->ractor.cnt:%u vm->ractor.blocking_cnt:%u",
|
|
cr->threads.cnt, cr->threads.blocking_cnt,
|
|
GET_VM()->ractor.cnt, GET_VM()->ractor.blocking_cnt);
|
|
|
|
VM_ASSERT(cr->threads.cnt >= cr->threads.blocking_cnt + 1);
|
|
|
|
if (remained_thread_cnt > 0 &&
|
|
// will be block
|
|
cr->threads.cnt == cr->threads.blocking_cnt + 1) {
|
|
// change ractor status: running -> blocking
|
|
rb_vm_t *vm = GET_VM();
|
|
ASSERT_vm_unlocking();
|
|
|
|
RB_VM_LOCK();
|
|
{
|
|
rb_vm_ractor_blocking_cnt_inc(vm, cr, file, line);
|
|
}
|
|
RB_VM_UNLOCK();
|
|
}
|
|
}
|
|
|
|
void
|
|
rb_ractor_living_threads_remove(rb_ractor_t *cr, rb_thread_t *th)
|
|
{
|
|
VM_ASSERT(cr == GET_RACTOR());
|
|
RUBY_DEBUG_LOG("r->threads.cnt:%d--", cr->threads.cnt);
|
|
ractor_check_blocking(cr, cr->threads.cnt - 1, __FILE__, __LINE__);
|
|
|
|
if (cr->threads.cnt == 1) {
|
|
vm_remove_ractor(th->vm, cr);
|
|
}
|
|
else {
|
|
RACTOR_LOCK(cr);
|
|
{
|
|
list_del(&th->lt_node);
|
|
cr->threads.cnt--;
|
|
}
|
|
RACTOR_UNLOCK(cr);
|
|
}
|
|
}
|
|
|
|
void
|
|
rb_ractor_blocking_threads_inc(rb_ractor_t *cr, const char *file, int line)
|
|
{
|
|
RUBY_DEBUG_LOG2(file, line, "cr->threads.blocking_cnt:%d++", cr->threads.blocking_cnt);
|
|
|
|
VM_ASSERT(cr->threads.cnt > 0);
|
|
VM_ASSERT(cr == GET_RACTOR());
|
|
|
|
ractor_check_blocking(cr, cr->threads.cnt, __FILE__, __LINE__);
|
|
cr->threads.blocking_cnt++;
|
|
}
|
|
|
|
void
|
|
rb_ractor_blocking_threads_dec(rb_ractor_t *cr, const char *file, int line)
|
|
{
|
|
RUBY_DEBUG_LOG2(file, line,
|
|
"r->threads.blocking_cnt:%d--, r->threads.cnt:%u",
|
|
cr->threads.blocking_cnt, cr->threads.cnt);
|
|
|
|
VM_ASSERT(cr == GET_RACTOR());
|
|
|
|
if (cr->threads.cnt == cr->threads.blocking_cnt) {
|
|
rb_vm_t *vm = GET_VM();
|
|
|
|
RB_VM_LOCK_ENTER();
|
|
{
|
|
rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__);
|
|
}
|
|
RB_VM_LOCK_LEAVE();
|
|
}
|
|
|
|
cr->threads.blocking_cnt--;
|
|
}
|
|
|
|
void
|
|
rb_ractor_vm_barrier_interrupt_running_thread(rb_ractor_t *r)
|
|
{
|
|
VM_ASSERT(r != GET_RACTOR());
|
|
ASSERT_ractor_unlocking(r);
|
|
ASSERT_vm_locking();
|
|
|
|
RACTOR_LOCK(r);
|
|
{
|
|
if (ractor_status_p(r, ractor_running)) {
|
|
rb_execution_context_t *ec = r->threads.running_ec;
|
|
if (ec) {
|
|
RUBY_VM_SET_VM_BARRIER_INTERRUPT(ec);
|
|
}
|
|
}
|
|
}
|
|
RACTOR_UNLOCK(r);
|
|
}
|
|
|
|
void
|
|
rb_ractor_terminate_interrupt_main_thread(rb_ractor_t *r)
|
|
{
|
|
VM_ASSERT(r != GET_RACTOR());
|
|
ASSERT_ractor_unlocking(r);
|
|
ASSERT_vm_locking();
|
|
|
|
rb_thread_t *main_th = r->threads.main;
|
|
if (main_th) {
|
|
if (main_th->status != THREAD_KILLED) {
|
|
RUBY_VM_SET_TERMINATE_INTERRUPT(main_th->ec);
|
|
rb_threadptr_interrupt(main_th);
|
|
}
|
|
else {
|
|
RUBY_DEBUG_LOG("killed (%p)", main_th);
|
|
}
|
|
}
|
|
}
|
|
|
|
void rb_thread_terminate_all(rb_thread_t *th); // thread.c
|
|
|
|
static void
|
|
ractor_terminal_interrupt_all(rb_vm_t *vm)
|
|
{
|
|
if (vm->ractor.cnt > 1) {
|
|
// send terminate notification to all ractors
|
|
rb_ractor_t *r = 0;
|
|
list_for_each(&vm->ractor.set, r, vmlr_node) {
|
|
if (r != vm->ractor.main_ractor) {
|
|
rb_ractor_terminate_interrupt_main_thread(r);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void
|
|
rb_ractor_terminate_all(void)
|
|
{
|
|
rb_vm_t *vm = GET_VM();
|
|
rb_ractor_t *cr = vm->ractor.main_ractor;
|
|
|
|
VM_ASSERT(cr == GET_RACTOR()); // only main-ractor's main-thread should kick it.
|
|
|
|
if (vm->ractor.cnt > 1) {
|
|
RB_VM_LOCK();
|
|
ractor_terminal_interrupt_all(vm); // kill all ractors
|
|
RB_VM_UNLOCK();
|
|
}
|
|
rb_thread_terminate_all(GET_THREAD()); // kill other threads in main-ractor and wait
|
|
|
|
RB_VM_LOCK();
|
|
{
|
|
while (vm->ractor.cnt > 1) {
|
|
RUBY_DEBUG_LOG("terminate_waiting:%d", vm->ractor.sync.terminate_waiting);
|
|
vm->ractor.sync.terminate_waiting = true;
|
|
|
|
// wait for 1sec
|
|
rb_vm_ractor_blocking_cnt_inc(vm, cr, __FILE__, __LINE__);
|
|
rb_vm_cond_timedwait(vm, &vm->ractor.sync.terminate_cond, 1000 /* ms */);
|
|
rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__);
|
|
|
|
ractor_terminal_interrupt_all(vm);
|
|
}
|
|
}
|
|
RB_VM_UNLOCK();
|
|
}
|
|
|
|
rb_execution_context_t *
|
|
rb_vm_main_ractor_ec(rb_vm_t *vm)
|
|
{
|
|
return vm->ractor.main_ractor->threads.running_ec;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_moved_missing(int argc, VALUE *argv, VALUE self)
|
|
{
|
|
rb_raise(rb_eRactorMovedError, "can not send any methods to a moved object");
|
|
}
|
|
|
|
/*
|
|
* Document-class: Ractor::ClosedError
|
|
*
|
|
* Raised when an attempt is made to send a message to a closed port,
|
|
* or to retrieve a message from a closed and empty port.
|
|
* Ports may be closed explicitly with Ractor#close_outgoing/close_incoming
|
|
* and are closed implicitly when a Ractor terminates.
|
|
*
|
|
* r = Ractor.new { sleep(500) }
|
|
* r.close_outgoing
|
|
* r.take # Ractor::ClosedError
|
|
*
|
|
* ClosedError is a descendant of StopIteration, so the closing of the ractor will break
|
|
* the loops without propagating the error:
|
|
*
|
|
* r = Ractor.new do
|
|
* loop do
|
|
* msg = receive # raises ClosedError and loop traps it
|
|
* puts "Received: #{msg}"
|
|
* end
|
|
* puts "loop exited"
|
|
* end
|
|
*
|
|
* 3.times{|i| r << i}
|
|
* r.close_incoming
|
|
* r.take
|
|
* puts "Continue successfully"
|
|
*
|
|
* This will print:
|
|
*
|
|
* Received: 0
|
|
* Received: 1
|
|
* Received: 2
|
|
* loop exited
|
|
* Continue successfully
|
|
*/
|
|
|
|
/*
|
|
* Document-class: Ractor::RemoteError
|
|
*
|
|
* Raised on attempt to Ractor#take if there was an uncaught exception in the Ractor.
|
|
* Its +cause+ will contain the original exception, and +ractor+ is the original ractor
|
|
* it was raised in.
|
|
*
|
|
* r = Ractor.new { raise "Something weird happened" }
|
|
*
|
|
* begin
|
|
* r.take
|
|
* rescue => e
|
|
* p e # => #<Ractor::RemoteError: thrown by remote Ractor.>
|
|
* p e.ractor == r # => true
|
|
* p e.cause # => #<RuntimeError: Something weird happened>
|
|
* end
|
|
*
|
|
*/
|
|
|
|
/*
|
|
* Document-class: Ractor::MovedError
|
|
*
|
|
* Raised on an attempt to access an object which was moved in Ractor#send or Ractor.yield.
|
|
*
|
|
* r = Ractor.new { sleep }
|
|
*
|
|
* ary = [1, 2, 3]
|
|
* r.send(ary, move: true)
|
|
* ary.inspect
|
|
* # Ractor::MovedError (can not send any methods to a moved object)
|
|
*
|
|
*/
|
|
|
|
/*
|
|
* Document-class: Ractor::MovedObject
|
|
*
|
|
* A special object which replaces any value that was moved to another ractor in Ractor#send
|
|
* or Ractor.yield. Any attempt to access the object results in Ractor::MovedError.
|
|
*
|
|
* r = Ractor.new { receive }
|
|
*
|
|
* ary = [1, 2, 3]
|
|
* r.send(ary, move: true)
|
|
* p Ractor::MovedObject === ary
|
|
* # => true
|
|
* ary.inspect
|
|
* # Ractor::MovedError (can not send any methods to a moved object)
|
|
*/
|
|
|
|
// Main docs are in ractor.rb, but without this clause there are weird artifacts
|
|
// in their rendering.
|
|
/*
|
|
* Document-class: Ractor
|
|
*
|
|
*/
|
|
|
|
void
|
|
Init_Ractor(void)
|
|
{
|
|
rb_cRactor = rb_define_class("Ractor", rb_cObject);
|
|
rb_undef_alloc_func(rb_cRactor);
|
|
|
|
rb_eRactorError = rb_define_class_under(rb_cRactor, "Error", rb_eRuntimeError);
|
|
rb_eRactorIsolationError = rb_define_class_under(rb_cRactor, "IsolationError", rb_eRactorError);
|
|
rb_eRactorRemoteError = rb_define_class_under(rb_cRactor, "RemoteError", rb_eRactorError);
|
|
rb_eRactorMovedError = rb_define_class_under(rb_cRactor, "MovedError", rb_eRactorError);
|
|
rb_eRactorClosedError = rb_define_class_under(rb_cRactor, "ClosedError", rb_eStopIteration);
|
|
rb_eRactorUnsafeError = rb_define_class_under(rb_cRactor, "UnsafeError", rb_eRactorError);
|
|
|
|
rb_cRactorMovedObject = rb_define_class_under(rb_cRactor, "MovedObject", rb_cBasicObject);
|
|
rb_undef_alloc_func(rb_cRactorMovedObject);
|
|
rb_define_method(rb_cRactorMovedObject, "method_missing", ractor_moved_missing, -1);
|
|
|
|
// override methods defined in BasicObject
|
|
rb_define_method(rb_cRactorMovedObject, "__send__", ractor_moved_missing, -1);
|
|
rb_define_method(rb_cRactorMovedObject, "!", ractor_moved_missing, -1);
|
|
rb_define_method(rb_cRactorMovedObject, "==", ractor_moved_missing, -1);
|
|
rb_define_method(rb_cRactorMovedObject, "!=", ractor_moved_missing, -1);
|
|
rb_define_method(rb_cRactorMovedObject, "__id__", ractor_moved_missing, -1);
|
|
rb_define_method(rb_cRactorMovedObject, "equal?", ractor_moved_missing, -1);
|
|
rb_define_method(rb_cRactorMovedObject, "instance_eval", ractor_moved_missing, -1);
|
|
rb_define_method(rb_cRactorMovedObject, "instance_exec", ractor_moved_missing, -1);
|
|
}
|
|
|
|
void
|
|
rb_ractor_dump(void)
|
|
{
|
|
rb_vm_t *vm = GET_VM();
|
|
rb_ractor_t *r = 0;
|
|
|
|
list_for_each(&vm->ractor.set, r, vmlr_node) {
|
|
if (r != vm->ractor.main_ractor) {
|
|
fprintf(stderr, "r:%u (%s)\n", r->pub.id, ractor_status_str(r->status_));
|
|
}
|
|
}
|
|
}
|
|
|
|
VALUE
|
|
rb_ractor_stdin(void)
|
|
{
|
|
if (rb_ractor_main_p()) {
|
|
return rb_stdin;
|
|
}
|
|
else {
|
|
rb_ractor_t *cr = GET_RACTOR();
|
|
return cr->r_stdin;
|
|
}
|
|
}
|
|
|
|
VALUE
|
|
rb_ractor_stdout(void)
|
|
{
|
|
if (rb_ractor_main_p()) {
|
|
return rb_stdout;
|
|
}
|
|
else {
|
|
rb_ractor_t *cr = GET_RACTOR();
|
|
return cr->r_stdout;
|
|
}
|
|
}
|
|
|
|
VALUE
|
|
rb_ractor_stderr(void)
|
|
{
|
|
if (rb_ractor_main_p()) {
|
|
return rb_stderr;
|
|
}
|
|
else {
|
|
rb_ractor_t *cr = GET_RACTOR();
|
|
return cr->r_stderr;
|
|
}
|
|
}
|
|
|
|
void
|
|
rb_ractor_stdin_set(VALUE in)
|
|
{
|
|
if (rb_ractor_main_p()) {
|
|
rb_stdin = in;
|
|
}
|
|
else {
|
|
rb_ractor_t *cr = GET_RACTOR();
|
|
RB_OBJ_WRITE(cr->pub.self, &cr->r_stdin, in);
|
|
}
|
|
}
|
|
|
|
void
|
|
rb_ractor_stdout_set(VALUE out)
|
|
{
|
|
if (rb_ractor_main_p()) {
|
|
rb_stdout = out;
|
|
}
|
|
else {
|
|
rb_ractor_t *cr = GET_RACTOR();
|
|
RB_OBJ_WRITE(cr->pub.self, &cr->r_stdout, out);
|
|
}
|
|
}
|
|
|
|
void
|
|
rb_ractor_stderr_set(VALUE err)
|
|
{
|
|
if (rb_ractor_main_p()) {
|
|
rb_stderr = err;
|
|
}
|
|
else {
|
|
rb_ractor_t *cr = GET_RACTOR();
|
|
RB_OBJ_WRITE(cr->pub.self, &cr->r_stderr, err);
|
|
}
|
|
}
|
|
|
|
rb_hook_list_t *
|
|
rb_ractor_hooks(rb_ractor_t *cr)
|
|
{
|
|
return &cr->pub.hooks;
|
|
}
|
|
|
|
/// traverse function
|
|
|
|
// 2: stop search
|
|
// 1: skip child
|
|
// 0: continue
|
|
|
|
enum obj_traverse_iterator_result {
|
|
traverse_cont,
|
|
traverse_skip,
|
|
traverse_stop,
|
|
};
|
|
|
|
typedef enum obj_traverse_iterator_result (*rb_obj_traverse_enter_func)(VALUE obj);
|
|
typedef enum obj_traverse_iterator_result (*rb_obj_traverse_leave_func)(VALUE obj);
|
|
typedef enum obj_traverse_iterator_result (*rb_obj_traverse_final_func)(VALUE obj);
|
|
|
|
static enum obj_traverse_iterator_result null_leave(VALUE obj);
|
|
|
|
struct obj_traverse_data {
|
|
rb_obj_traverse_enter_func enter_func;
|
|
rb_obj_traverse_leave_func leave_func;
|
|
|
|
st_table *rec;
|
|
VALUE rec_hash;
|
|
};
|
|
|
|
|
|
struct obj_traverse_callback_data {
|
|
bool stop;
|
|
struct obj_traverse_data *data;
|
|
};
|
|
|
|
static int obj_traverse_i(VALUE obj, struct obj_traverse_data *data);
|
|
|
|
static int
|
|
obj_hash_traverse_i(VALUE key, VALUE val, VALUE ptr)
|
|
{
|
|
struct obj_traverse_callback_data *d = (struct obj_traverse_callback_data *)ptr;
|
|
|
|
if (obj_traverse_i(key, d->data)) {
|
|
d->stop = true;
|
|
return ST_STOP;
|
|
}
|
|
|
|
if (obj_traverse_i(val, d->data)) {
|
|
d->stop = true;
|
|
return ST_STOP;
|
|
}
|
|
|
|
return ST_CONTINUE;
|
|
}
|
|
|
|
static void
|
|
obj_traverse_reachable_i(VALUE obj, void *ptr)
|
|
{
|
|
struct obj_traverse_callback_data *d = (struct obj_traverse_callback_data *)ptr;
|
|
|
|
if (obj_traverse_i(obj, d->data)) {
|
|
d->stop = true;
|
|
}
|
|
}
|
|
|
|
static struct st_table *
|
|
obj_traverse_rec(struct obj_traverse_data *data)
|
|
{
|
|
if (UNLIKELY(!data->rec)) {
|
|
data->rec_hash = rb_ident_hash_new();
|
|
data->rec = rb_hash_st_table(data->rec_hash);
|
|
}
|
|
return data->rec;
|
|
}
|
|
|
|
static int
|
|
obj_traverse_i(VALUE obj, struct obj_traverse_data *data)
|
|
{
|
|
if (RB_SPECIAL_CONST_P(obj)) return 0;
|
|
|
|
switch (data->enter_func(obj)) {
|
|
case traverse_cont: break;
|
|
case traverse_skip: return 0; // skip children
|
|
case traverse_stop: return 1; // stop search
|
|
}
|
|
|
|
if (UNLIKELY(st_insert(obj_traverse_rec(data), obj, 1))) {
|
|
// already traversed
|
|
return 0;
|
|
}
|
|
|
|
if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) {
|
|
struct gen_ivtbl *ivtbl;
|
|
rb_ivar_generic_ivtbl_lookup(obj, &ivtbl);
|
|
for (uint32_t i = 0; i < ivtbl->numiv; i++) {
|
|
VALUE val = ivtbl->ivptr[i];
|
|
if (val != Qundef && obj_traverse_i(val, data)) return 1;
|
|
}
|
|
}
|
|
|
|
switch (BUILTIN_TYPE(obj)) {
|
|
// no child node
|
|
case T_STRING:
|
|
case T_FLOAT:
|
|
case T_BIGNUM:
|
|
case T_REGEXP:
|
|
case T_FILE:
|
|
case T_SYMBOL:
|
|
case T_MATCH:
|
|
break;
|
|
|
|
case T_OBJECT:
|
|
{
|
|
uint32_t len = ROBJECT_NUMIV(obj);
|
|
VALUE *ptr = ROBJECT_IVPTR(obj);
|
|
|
|
for (uint32_t i=0; i<len; i++) {
|
|
VALUE val = ptr[i];
|
|
if (val != Qundef && obj_traverse_i(val, data)) return 1;
|
|
}
|
|
}
|
|
break;
|
|
|
|
case T_ARRAY:
|
|
{
|
|
for (int i = 0; i < RARRAY_LENINT(obj); i++) {
|
|
VALUE e = rb_ary_entry(obj, i);
|
|
if (obj_traverse_i(e, data)) return 1;
|
|
}
|
|
}
|
|
break;
|
|
|
|
case T_HASH:
|
|
{
|
|
if (obj_traverse_i(RHASH_IFNONE(obj), data)) return 1;
|
|
|
|
struct obj_traverse_callback_data d = {
|
|
.stop = false,
|
|
.data = data,
|
|
};
|
|
rb_hash_foreach(obj, obj_hash_traverse_i, (VALUE)&d);
|
|
if (d.stop) return 1;
|
|
}
|
|
break;
|
|
|
|
case T_STRUCT:
|
|
{
|
|
long len = RSTRUCT_LEN(obj);
|
|
const VALUE *ptr = RSTRUCT_CONST_PTR(obj);
|
|
|
|
for (long i=0; i<len; i++) {
|
|
if (obj_traverse_i(ptr[i], data)) return 1;
|
|
}
|
|
}
|
|
break;
|
|
|
|
case T_RATIONAL:
|
|
if (obj_traverse_i(RRATIONAL(obj)->num, data)) return 1;
|
|
if (obj_traverse_i(RRATIONAL(obj)->den, data)) return 1;
|
|
break;
|
|
case T_COMPLEX:
|
|
if (obj_traverse_i(RCOMPLEX(obj)->real, data)) return 1;
|
|
if (obj_traverse_i(RCOMPLEX(obj)->imag, data)) return 1;
|
|
break;
|
|
|
|
case T_DATA:
|
|
case T_IMEMO:
|
|
{
|
|
struct obj_traverse_callback_data d = {
|
|
.stop = false,
|
|
.data = data,
|
|
};
|
|
rb_objspace_reachable_objects_from(obj, obj_traverse_reachable_i, &d);
|
|
if (d.stop) return 1;
|
|
}
|
|
break;
|
|
|
|
// unreachable
|
|
case T_CLASS:
|
|
case T_MODULE:
|
|
case T_ICLASS:
|
|
default:
|
|
rp(obj);
|
|
rb_bug("unreachable");
|
|
}
|
|
|
|
if (data->leave_func(obj) == traverse_stop) {
|
|
return 1;
|
|
}
|
|
else {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
struct rb_obj_traverse_final_data {
|
|
rb_obj_traverse_final_func final_func;
|
|
int stopped;
|
|
};
|
|
|
|
static int
|
|
obj_traverse_final_i(st_data_t key, st_data_t val, st_data_t arg)
|
|
{
|
|
struct rb_obj_traverse_final_data *data = (void *)arg;
|
|
if (data->final_func(key)) {
|
|
data->stopped = 1;
|
|
return ST_STOP;
|
|
}
|
|
return ST_CONTINUE;
|
|
}
|
|
|
|
// 0: traverse all
|
|
// 1: stopped
|
|
static int
|
|
rb_obj_traverse(VALUE obj,
|
|
rb_obj_traverse_enter_func enter_func,
|
|
rb_obj_traverse_leave_func leave_func,
|
|
rb_obj_traverse_final_func final_func)
|
|
{
|
|
struct obj_traverse_data data = {
|
|
.enter_func = enter_func,
|
|
.leave_func = leave_func,
|
|
.rec = NULL,
|
|
};
|
|
|
|
if (obj_traverse_i(obj, &data)) return 1;
|
|
if (final_func && data.rec) {
|
|
struct rb_obj_traverse_final_data f = {final_func, 0};
|
|
st_foreach(data.rec, obj_traverse_final_i, (st_data_t)&f);
|
|
return f.stopped;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
frozen_shareable_p(VALUE obj, bool *made_shareable)
|
|
{
|
|
if (!RB_TYPE_P(obj, T_DATA)) {
|
|
return true;
|
|
}
|
|
else if (RTYPEDDATA_P(obj)) {
|
|
const rb_data_type_t *type = RTYPEDDATA_TYPE(obj);
|
|
if (type->flags & RUBY_TYPED_FROZEN_SHAREABLE) {
|
|
return true;
|
|
}
|
|
else if (made_shareable && rb_obj_is_proc(obj)) {
|
|
// special path to make shareable Proc.
|
|
rb_proc_ractor_make_shareable(obj);
|
|
*made_shareable = true;
|
|
VM_ASSERT(RB_OBJ_SHAREABLE_P(obj));
|
|
return false;
|
|
}
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
static enum obj_traverse_iterator_result
|
|
make_shareable_check_shareable(VALUE obj)
|
|
{
|
|
VM_ASSERT(!SPECIAL_CONST_P(obj));
|
|
bool made_shareable = false;
|
|
|
|
if (rb_ractor_shareable_p(obj)) {
|
|
return traverse_skip;
|
|
}
|
|
else if (!frozen_shareable_p(obj, &made_shareable)) {
|
|
if (made_shareable) {
|
|
return traverse_skip;
|
|
}
|
|
else {
|
|
rb_raise(rb_eRactorError, "can not make shareable object for %"PRIsVALUE, obj);
|
|
}
|
|
}
|
|
|
|
if (!RB_OBJ_FROZEN_RAW(obj)) {
|
|
rb_funcall(obj, idFreeze, 0);
|
|
|
|
if (UNLIKELY(!RB_OBJ_FROZEN_RAW(obj))) {
|
|
rb_raise(rb_eRactorError, "#freeze does not freeze object correctly");
|
|
}
|
|
|
|
if (RB_OBJ_SHAREABLE_P(obj)) {
|
|
return traverse_skip;
|
|
}
|
|
}
|
|
|
|
return traverse_cont;
|
|
}
|
|
|
|
static enum obj_traverse_iterator_result
|
|
mark_shareable(VALUE obj)
|
|
{
|
|
FL_SET_RAW(obj, RUBY_FL_SHAREABLE);
|
|
return traverse_cont;
|
|
}
|
|
|
|
VALUE
|
|
rb_ractor_make_shareable(VALUE obj)
|
|
{
|
|
rb_obj_traverse(obj,
|
|
make_shareable_check_shareable,
|
|
null_leave, mark_shareable);
|
|
return obj;
|
|
}
|
|
|
|
VALUE
|
|
rb_ractor_make_shareable_copy(VALUE obj)
|
|
{
|
|
VALUE copy = ractor_copy(obj);
|
|
rb_obj_traverse(copy,
|
|
make_shareable_check_shareable,
|
|
null_leave, mark_shareable);
|
|
return copy;
|
|
}
|
|
|
|
VALUE
|
|
rb_ractor_ensure_shareable(VALUE obj, VALUE name)
|
|
{
|
|
if (!rb_ractor_shareable_p(obj)) {
|
|
VALUE message = rb_sprintf("cannot assign unshareable object to %"PRIsVALUE,
|
|
name);
|
|
rb_exc_raise(rb_exc_new_str(rb_eRactorIsolationError, message));
|
|
}
|
|
return obj;
|
|
}
|
|
|
|
static enum obj_traverse_iterator_result
|
|
shareable_p_enter(VALUE obj)
|
|
{
|
|
if (RB_OBJ_SHAREABLE_P(obj)) {
|
|
return traverse_skip;
|
|
}
|
|
else if (RB_TYPE_P(obj, T_CLASS) ||
|
|
RB_TYPE_P(obj, T_MODULE) ||
|
|
RB_TYPE_P(obj, T_ICLASS)) {
|
|
// TODO: remove it
|
|
mark_shareable(obj);
|
|
return traverse_skip;
|
|
}
|
|
else if (RB_OBJ_FROZEN_RAW(obj) &&
|
|
frozen_shareable_p(obj, NULL)) {
|
|
return traverse_cont;
|
|
}
|
|
|
|
return traverse_stop; // fail
|
|
}
|
|
|
|
MJIT_FUNC_EXPORTED bool
|
|
rb_ractor_shareable_p_continue(VALUE obj)
|
|
{
|
|
if (rb_obj_traverse(obj,
|
|
shareable_p_enter, null_leave,
|
|
mark_shareable)) {
|
|
return false;
|
|
}
|
|
else {
|
|
return true;
|
|
}
|
|
}
|
|
|
|
#if RACTOR_CHECK_MODE > 0
|
|
static enum obj_traverse_iterator_result
|
|
reset_belonging_enter(VALUE obj)
|
|
{
|
|
if (rb_ractor_shareable_p(obj)) {
|
|
return traverse_skip;
|
|
}
|
|
else {
|
|
rb_ractor_setup_belonging(obj);
|
|
return traverse_cont;
|
|
}
|
|
}
|
|
#endif
|
|
|
|
static enum obj_traverse_iterator_result
|
|
null_leave(VALUE obj)
|
|
{
|
|
return traverse_cont;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_reset_belonging(VALUE obj)
|
|
{
|
|
#if RACTOR_CHECK_MODE > 0
|
|
rb_obj_traverse(obj, reset_belonging_enter, null_leave, NULL);
|
|
#endif
|
|
return obj;
|
|
}
|
|
|
|
|
|
/// traverse and replace function
|
|
|
|
// 2: stop search
|
|
// 1: skip child
|
|
// 0: continue
|
|
|
|
struct obj_traverse_replace_data;
|
|
static int obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data);
|
|
typedef enum obj_traverse_iterator_result (*rb_obj_traverse_replace_enter_func)(VALUE obj, struct obj_traverse_replace_data *data);
|
|
typedef enum obj_traverse_iterator_result (*rb_obj_traverse_replace_leave_func)(VALUE obj, struct obj_traverse_replace_data *data);
|
|
|
|
struct obj_traverse_replace_data {
|
|
rb_obj_traverse_replace_enter_func enter_func;
|
|
rb_obj_traverse_replace_leave_func leave_func;
|
|
|
|
st_table *rec;
|
|
VALUE rec_hash;
|
|
|
|
VALUE replacement;
|
|
bool move;
|
|
};
|
|
|
|
struct obj_traverse_replace_callback_data {
|
|
bool stop;
|
|
VALUE src;
|
|
struct obj_traverse_replace_data *data;
|
|
};
|
|
|
|
static int
|
|
obj_hash_traverse_replace_foreach_i(st_data_t key, st_data_t value, st_data_t argp, int error)
|
|
{
|
|
return ST_REPLACE;
|
|
}
|
|
|
|
static int
|
|
obj_hash_traverse_replace_i(st_data_t *key, st_data_t *val, st_data_t ptr, int exists)
|
|
{
|
|
struct obj_traverse_replace_callback_data *d = (struct obj_traverse_replace_callback_data *)ptr;
|
|
struct obj_traverse_replace_data *data = d->data;
|
|
|
|
if (obj_traverse_replace_i(*key, data)) {
|
|
d->stop = true;
|
|
return ST_STOP;
|
|
}
|
|
else if (*key != data->replacement) {
|
|
VALUE v = *key = data->replacement;
|
|
RB_OBJ_WRITTEN(d->src, Qundef, v);
|
|
}
|
|
|
|
if (obj_traverse_replace_i(*val, data)) {
|
|
d->stop = true;
|
|
return ST_STOP;
|
|
}
|
|
else if (*val != data->replacement) {
|
|
VALUE v = *val = data->replacement;
|
|
RB_OBJ_WRITTEN(d->src, Qundef, v);
|
|
}
|
|
|
|
return ST_CONTINUE;
|
|
}
|
|
|
|
static struct st_table *
|
|
obj_traverse_replace_rec(struct obj_traverse_replace_data *data)
|
|
{
|
|
if (UNLIKELY(!data->rec)) {
|
|
data->rec_hash = rb_ident_hash_new();
|
|
data->rec = rb_hash_st_table(data->rec_hash);
|
|
}
|
|
return data->rec;
|
|
}
|
|
|
|
#if USE_TRANSIENT_HEAP
|
|
void rb_ary_transient_heap_evacuate(VALUE ary, int promote);
|
|
void rb_obj_transient_heap_evacuate(VALUE obj, int promote);
|
|
void rb_hash_transient_heap_evacuate(VALUE hash, int promote);
|
|
void rb_struct_transient_heap_evacuate(VALUE st, int promote);
|
|
#endif
|
|
|
|
static void
|
|
obj_refer_only_shareables_p_i(VALUE obj, void *ptr)
|
|
{
|
|
int *pcnt = (int *)ptr;
|
|
|
|
if (!rb_ractor_shareable_p(obj)) {
|
|
pcnt++;
|
|
}
|
|
}
|
|
|
|
static int
|
|
obj_refer_only_shareables_p(VALUE obj)
|
|
{
|
|
int cnt = 0;
|
|
rb_objspace_reachable_objects_from(obj, obj_refer_only_shareables_p_i, &cnt);
|
|
return cnt == 0;
|
|
}
|
|
|
|
static int
|
|
obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data)
|
|
{
|
|
VALUE replacement;
|
|
|
|
if (RB_SPECIAL_CONST_P(obj)) {
|
|
data->replacement = obj;
|
|
return 0;
|
|
}
|
|
|
|
switch (data->enter_func(obj, data)) {
|
|
case traverse_cont: break;
|
|
case traverse_skip: return 0; // skip children
|
|
case traverse_stop: return 1; // stop search
|
|
}
|
|
|
|
replacement = data->replacement;
|
|
|
|
if (UNLIKELY(st_lookup(obj_traverse_replace_rec(data), (st_data_t)obj, (st_data_t *)&replacement))) {
|
|
data->replacement = replacement;
|
|
return 0;
|
|
}
|
|
else {
|
|
st_insert(obj_traverse_replace_rec(data), (st_data_t)obj, (st_data_t)replacement);
|
|
}
|
|
|
|
if (!data->move) {
|
|
obj = replacement;
|
|
}
|
|
|
|
#define CHECK_AND_REPLACE(v) do { \
|
|
VALUE _val = (v); \
|
|
if (obj_traverse_replace_i(_val, data)) { return 1; } \
|
|
else if (data->replacement != _val) { RB_OBJ_WRITE(obj, &v, data->replacement); } \
|
|
} while (0)
|
|
|
|
if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) {
|
|
struct gen_ivtbl *ivtbl;
|
|
rb_ivar_generic_ivtbl_lookup(obj, &ivtbl);
|
|
for (uint32_t i = 0; i < ivtbl->numiv; i++) {
|
|
if (ivtbl->ivptr[i] != Qundef) {
|
|
CHECK_AND_REPLACE(ivtbl->ivptr[i]);
|
|
}
|
|
}
|
|
}
|
|
|
|
switch (BUILTIN_TYPE(obj)) {
|
|
// no child node
|
|
case T_FLOAT:
|
|
case T_BIGNUM:
|
|
case T_REGEXP:
|
|
case T_FILE:
|
|
case T_SYMBOL:
|
|
case T_MATCH:
|
|
break;
|
|
case T_STRING:
|
|
rb_str_make_independent(obj);
|
|
break;
|
|
|
|
case T_OBJECT:
|
|
{
|
|
#if USE_TRANSIENT_HEAP
|
|
if (data->move) rb_obj_transient_heap_evacuate(obj, TRUE);
|
|
#endif
|
|
|
|
uint32_t len = ROBJECT_NUMIV(obj);
|
|
VALUE *ptr = ROBJECT_IVPTR(obj);
|
|
|
|
for (uint32_t i=0; i<len; i++) {
|
|
if (ptr[i] != Qundef) {
|
|
CHECK_AND_REPLACE(ptr[i]);
|
|
}
|
|
}
|
|
}
|
|
break;
|
|
|
|
case T_ARRAY:
|
|
{
|
|
rb_ary_cancel_sharing(obj);
|
|
#if USE_TRANSIENT_HEAP
|
|
if (data->move) rb_ary_transient_heap_evacuate(obj, TRUE);
|
|
#endif
|
|
|
|
for (int i = 0; i < RARRAY_LENINT(obj); i++) {
|
|
VALUE e = rb_ary_entry(obj, i);
|
|
|
|
if (obj_traverse_replace_i(e, data)) {
|
|
return 1;
|
|
}
|
|
else if (e != data->replacement) {
|
|
RARRAY_ASET(obj, i, data->replacement);
|
|
}
|
|
}
|
|
RB_GC_GUARD(obj);
|
|
}
|
|
break;
|
|
|
|
case T_HASH:
|
|
{
|
|
#if USE_TRANSIENT_HEAP
|
|
if (data->move) rb_hash_transient_heap_evacuate(obj, TRUE);
|
|
#endif
|
|
struct obj_traverse_replace_callback_data d = {
|
|
.stop = false,
|
|
.data = data,
|
|
.src = obj,
|
|
};
|
|
rb_hash_stlike_foreach_with_replace(obj,
|
|
obj_hash_traverse_replace_foreach_i,
|
|
obj_hash_traverse_replace_i,
|
|
(VALUE)&d);
|
|
if (d.stop) return 1;
|
|
// TODO: rehash here?
|
|
|
|
VALUE ifnone = RHASH_IFNONE(obj);
|
|
if (obj_traverse_replace_i(ifnone, data)) {
|
|
return 1;
|
|
}
|
|
else if (ifnone != data->replacement) {
|
|
RHASH_SET_IFNONE(obj, data->replacement);
|
|
}
|
|
}
|
|
break;
|
|
|
|
case T_STRUCT:
|
|
{
|
|
#if USE_TRANSIENT_HEAP
|
|
if (data->move) rb_struct_transient_heap_evacuate(obj, TRUE);
|
|
#endif
|
|
long len = RSTRUCT_LEN(obj);
|
|
const VALUE *ptr = RSTRUCT_CONST_PTR(obj);
|
|
|
|
for (long i=0; i<len; i++) {
|
|
CHECK_AND_REPLACE(ptr[i]);
|
|
}
|
|
}
|
|
break;
|
|
|
|
case T_RATIONAL:
|
|
CHECK_AND_REPLACE(RRATIONAL(obj)->num);
|
|
CHECK_AND_REPLACE(RRATIONAL(obj)->den);
|
|
break;
|
|
case T_COMPLEX:
|
|
CHECK_AND_REPLACE(RCOMPLEX(obj)->real);
|
|
CHECK_AND_REPLACE(RCOMPLEX(obj)->imag);
|
|
break;
|
|
|
|
case T_DATA:
|
|
if (!data->move && obj_refer_only_shareables_p(obj)) {
|
|
break;
|
|
}
|
|
else {
|
|
rb_raise(rb_eRactorError, "can not %s %"PRIsVALUE" object.",
|
|
data->move ? "move" : "copy", rb_class_of(obj));
|
|
}
|
|
|
|
case T_IMEMO:
|
|
// not supported yet
|
|
return 1;
|
|
|
|
// unreachable
|
|
case T_CLASS:
|
|
case T_MODULE:
|
|
case T_ICLASS:
|
|
default:
|
|
rp(obj);
|
|
rb_bug("unreachable");
|
|
}
|
|
|
|
data->replacement = replacement;
|
|
|
|
if (data->leave_func(obj, data) == traverse_stop) {
|
|
return 1;
|
|
}
|
|
else {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
// 0: traverse all
|
|
// 1: stopped
|
|
static VALUE
|
|
rb_obj_traverse_replace(VALUE obj,
|
|
rb_obj_traverse_replace_enter_func enter_func,
|
|
rb_obj_traverse_replace_leave_func leave_func,
|
|
bool move)
|
|
{
|
|
struct obj_traverse_replace_data data = {
|
|
.enter_func = enter_func,
|
|
.leave_func = leave_func,
|
|
.rec = NULL,
|
|
.replacement = Qundef,
|
|
.move = move,
|
|
};
|
|
|
|
if (obj_traverse_replace_i(obj, &data)) {
|
|
return Qundef;
|
|
}
|
|
else {
|
|
return data.replacement;
|
|
}
|
|
}
|
|
|
|
struct RVALUE {
|
|
VALUE flags;
|
|
VALUE klass;
|
|
VALUE v1;
|
|
VALUE v2;
|
|
VALUE v3;
|
|
};
|
|
|
|
static const VALUE fl_users = FL_USER1 | FL_USER2 | FL_USER3 |
|
|
FL_USER4 | FL_USER5 | FL_USER6 | FL_USER7 |
|
|
FL_USER8 | FL_USER9 | FL_USER10 | FL_USER11 |
|
|
FL_USER12 | FL_USER13 | FL_USER14 | FL_USER15 |
|
|
FL_USER16 | FL_USER17 | FL_USER18 | FL_USER19;
|
|
|
|
static void
|
|
ractor_moved_bang(VALUE obj)
|
|
{
|
|
// invalidate src object
|
|
struct RVALUE *rv = (void *)obj;
|
|
|
|
rv->klass = rb_cRactorMovedObject;
|
|
rv->v1 = 0;
|
|
rv->v2 = 0;
|
|
rv->v3 = 0;
|
|
rv->flags = rv->flags & ~fl_users;
|
|
|
|
// TODO: record moved location
|
|
}
|
|
|
|
static enum obj_traverse_iterator_result
|
|
move_enter(VALUE obj, struct obj_traverse_replace_data *data)
|
|
{
|
|
if (rb_ractor_shareable_p(obj)) {
|
|
data->replacement = obj;
|
|
return traverse_skip;
|
|
}
|
|
else {
|
|
data->replacement = rb_obj_alloc(RBASIC_CLASS(obj));
|
|
return traverse_cont;
|
|
}
|
|
}
|
|
|
|
void rb_replace_generic_ivar(VALUE clone, VALUE obj); // variable.c
|
|
|
|
static enum obj_traverse_iterator_result
|
|
move_leave(VALUE obj, struct obj_traverse_replace_data *data)
|
|
{
|
|
VALUE v = data->replacement;
|
|
struct RVALUE *dst = (struct RVALUE *)v;
|
|
struct RVALUE *src = (struct RVALUE *)obj;
|
|
|
|
dst->flags = (dst->flags & ~fl_users) | (src->flags & fl_users);
|
|
|
|
dst->v1 = src->v1;
|
|
dst->v2 = src->v2;
|
|
dst->v3 = src->v3;
|
|
|
|
if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) {
|
|
rb_replace_generic_ivar(v, obj);
|
|
}
|
|
|
|
// TODO: generic_ivar
|
|
|
|
ractor_moved_bang(obj);
|
|
return traverse_cont;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_move(VALUE obj)
|
|
{
|
|
VALUE val = rb_obj_traverse_replace(obj, move_enter, move_leave, true);
|
|
if (val != Qundef) {
|
|
return val;
|
|
}
|
|
else {
|
|
rb_raise(rb_eRactorError, "can not move the object");
|
|
}
|
|
}
|
|
|
|
static enum obj_traverse_iterator_result
|
|
copy_enter(VALUE obj, struct obj_traverse_replace_data *data)
|
|
{
|
|
if (rb_ractor_shareable_p(obj)) {
|
|
data->replacement = obj;
|
|
return traverse_skip;
|
|
}
|
|
else {
|
|
data->replacement = rb_obj_clone(obj);
|
|
return traverse_cont;
|
|
}
|
|
}
|
|
|
|
static enum obj_traverse_iterator_result
|
|
copy_leave(VALUE obj, struct obj_traverse_replace_data *data)
|
|
{
|
|
return traverse_cont;
|
|
}
|
|
|
|
static VALUE
|
|
ractor_copy(VALUE obj)
|
|
{
|
|
VALUE val = rb_obj_traverse_replace(obj, copy_enter, copy_leave, false);
|
|
if (val != Qundef) {
|
|
return val;
|
|
}
|
|
else {
|
|
rb_raise(rb_eRactorError, "can not copy the object");
|
|
}
|
|
}
|
|
|
|
// Ractor local storage
|
|
|
|
struct rb_ractor_local_key_struct {
|
|
const struct rb_ractor_local_storage_type *type;
|
|
void *main_cache;
|
|
};
|
|
|
|
static struct freed_ractor_local_keys_struct {
|
|
int cnt;
|
|
int capa;
|
|
rb_ractor_local_key_t *keys;
|
|
} freed_ractor_local_keys;
|
|
|
|
static int
|
|
ractor_local_storage_mark_i(st_data_t key, st_data_t val, st_data_t dmy)
|
|
{
|
|
struct rb_ractor_local_key_struct *k = (struct rb_ractor_local_key_struct *)key;
|
|
if (k->type->mark) (*k->type->mark)((void *)val);
|
|
return ST_CONTINUE;
|
|
}
|
|
|
|
static enum rb_id_table_iterator_result
|
|
idkey_local_storage_mark_i(ID id, VALUE val, void *dmy)
|
|
{
|
|
rb_gc_mark(val);
|
|
return ID_TABLE_CONTINUE;
|
|
}
|
|
|
|
static void
|
|
ractor_local_storage_mark(rb_ractor_t *r)
|
|
{
|
|
if (r->local_storage) {
|
|
st_foreach(r->local_storage, ractor_local_storage_mark_i, 0);
|
|
|
|
for (int i=0; i<freed_ractor_local_keys.cnt; i++) {
|
|
rb_ractor_local_key_t key = freed_ractor_local_keys.keys[i];
|
|
st_data_t val;
|
|
if (st_delete(r->local_storage, (st_data_t *)&key, &val) &&
|
|
key->type->free) {
|
|
(*key->type->free)((void *)val);
|
|
}
|
|
}
|
|
}
|
|
|
|
if (r->idkey_local_storage) {
|
|
rb_id_table_foreach(r->idkey_local_storage, idkey_local_storage_mark_i, NULL);
|
|
}
|
|
}
|
|
|
|
static int
|
|
ractor_local_storage_free_i(st_data_t key, st_data_t val, st_data_t dmy)
|
|
{
|
|
struct rb_ractor_local_key_struct *k = (struct rb_ractor_local_key_struct *)key;
|
|
if (k->type->free) (*k->type->free)((void *)val);
|
|
return ST_CONTINUE;
|
|
}
|
|
|
|
static void
|
|
ractor_local_storage_free(rb_ractor_t *r)
|
|
{
|
|
if (r->local_storage) {
|
|
st_foreach(r->local_storage, ractor_local_storage_free_i, 0);
|
|
st_free_table(r->local_storage);
|
|
}
|
|
|
|
if (r->idkey_local_storage) {
|
|
rb_id_table_free(r->idkey_local_storage);
|
|
}
|
|
}
|
|
|
|
static void
|
|
rb_ractor_local_storage_value_mark(void *ptr)
|
|
{
|
|
rb_gc_mark((VALUE)ptr);
|
|
}
|
|
|
|
static const struct rb_ractor_local_storage_type ractor_local_storage_type_null = {
|
|
NULL,
|
|
NULL,
|
|
};
|
|
|
|
const struct rb_ractor_local_storage_type rb_ractor_local_storage_type_free = {
|
|
NULL,
|
|
ruby_xfree,
|
|
};
|
|
|
|
static const struct rb_ractor_local_storage_type ractor_local_storage_type_value = {
|
|
rb_ractor_local_storage_value_mark,
|
|
NULL,
|
|
};
|
|
|
|
rb_ractor_local_key_t
|
|
rb_ractor_local_storage_ptr_newkey(const struct rb_ractor_local_storage_type *type)
|
|
{
|
|
rb_ractor_local_key_t key = ALLOC(struct rb_ractor_local_key_struct);
|
|
key->type = type ? type : &ractor_local_storage_type_null;
|
|
key->main_cache = (void *)Qundef;
|
|
return key;
|
|
}
|
|
|
|
rb_ractor_local_key_t
|
|
rb_ractor_local_storage_value_newkey(void)
|
|
{
|
|
return rb_ractor_local_storage_ptr_newkey(&ractor_local_storage_type_value);
|
|
}
|
|
|
|
void
|
|
rb_ractor_local_storage_delkey(rb_ractor_local_key_t key)
|
|
{
|
|
RB_VM_LOCK_ENTER();
|
|
{
|
|
if (freed_ractor_local_keys.cnt == freed_ractor_local_keys.capa) {
|
|
freed_ractor_local_keys.capa = freed_ractor_local_keys.capa ? freed_ractor_local_keys.capa * 2 : 4;
|
|
REALLOC_N(freed_ractor_local_keys.keys, rb_ractor_local_key_t, freed_ractor_local_keys.capa);
|
|
}
|
|
freed_ractor_local_keys.keys[freed_ractor_local_keys.cnt++] = key;
|
|
}
|
|
RB_VM_LOCK_LEAVE();
|
|
}
|
|
|
|
static bool
|
|
ractor_local_ref(rb_ractor_local_key_t key, void **pret)
|
|
{
|
|
if (rb_ractor_main_p()) {
|
|
if ((VALUE)key->main_cache != Qundef) {
|
|
*pret = key->main_cache;
|
|
return true;
|
|
}
|
|
else {
|
|
return false;
|
|
}
|
|
}
|
|
else {
|
|
rb_ractor_t *cr = GET_RACTOR();
|
|
|
|
if (cr->local_storage && st_lookup(cr->local_storage, (st_data_t)key, (st_data_t *)pret)) {
|
|
return true;
|
|
}
|
|
else {
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
|
|
static void
|
|
ractor_local_set(rb_ractor_local_key_t key, void *ptr)
|
|
{
|
|
rb_ractor_t *cr = GET_RACTOR();
|
|
|
|
if (cr->local_storage == NULL) {
|
|
cr->local_storage = st_init_numtable();
|
|
}
|
|
|
|
st_insert(cr->local_storage, (st_data_t)key, (st_data_t)ptr);
|
|
|
|
if (rb_ractor_main_p()) {
|
|
key->main_cache = ptr;
|
|
}
|
|
}
|
|
|
|
VALUE
|
|
rb_ractor_local_storage_value(rb_ractor_local_key_t key)
|
|
{
|
|
VALUE val;
|
|
if (ractor_local_ref(key, (void **)&val)) {
|
|
return val;
|
|
}
|
|
else {
|
|
return Qnil;
|
|
}
|
|
}
|
|
|
|
bool
|
|
rb_ractor_local_storage_value_lookup(rb_ractor_local_key_t key, VALUE *val)
|
|
{
|
|
if (ractor_local_ref(key, (void **)val)) {
|
|
return true;
|
|
}
|
|
else {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
void
|
|
rb_ractor_local_storage_value_set(rb_ractor_local_key_t key, VALUE val)
|
|
{
|
|
ractor_local_set(key, (void *)val);
|
|
}
|
|
|
|
void *
|
|
rb_ractor_local_storage_ptr(rb_ractor_local_key_t key)
|
|
{
|
|
void *ret;
|
|
if (ractor_local_ref(key, &ret)) {
|
|
return ret;
|
|
}
|
|
else {
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
void
|
|
rb_ractor_local_storage_ptr_set(rb_ractor_local_key_t key, void *ptr)
|
|
{
|
|
ractor_local_set(key, ptr);
|
|
}
|
|
|
|
#define DEFAULT_KEYS_CAPA 0x10
|
|
|
|
void
|
|
rb_ractor_finish_marking(void)
|
|
{
|
|
for (int i=0; i<freed_ractor_local_keys.cnt; i++) {
|
|
ruby_xfree(freed_ractor_local_keys.keys[i]);
|
|
}
|
|
freed_ractor_local_keys.cnt = 0;
|
|
if (freed_ractor_local_keys.capa > DEFAULT_KEYS_CAPA) {
|
|
freed_ractor_local_keys.capa = DEFAULT_KEYS_CAPA;
|
|
REALLOC_N(freed_ractor_local_keys.keys, rb_ractor_local_key_t, DEFAULT_KEYS_CAPA);
|
|
}
|
|
}
|
|
|
|
static VALUE
|
|
ractor_local_value(rb_execution_context_t *ec, VALUE self, VALUE sym)
|
|
{
|
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
|
ID id = rb_check_id(&sym);
|
|
struct rb_id_table *tbl = cr->idkey_local_storage;
|
|
VALUE val;
|
|
|
|
if (id && tbl && rb_id_table_lookup(tbl, id, &val)) {
|
|
return val;
|
|
}
|
|
else {
|
|
return Qnil;
|
|
}
|
|
}
|
|
|
|
static VALUE
|
|
ractor_local_value_set(rb_execution_context_t *ec, VALUE self, VALUE sym, VALUE val)
|
|
{
|
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
|
ID id = SYM2ID(rb_to_symbol(sym));
|
|
struct rb_id_table *tbl = cr->idkey_local_storage;
|
|
|
|
if (tbl == NULL) {
|
|
tbl = cr->idkey_local_storage = rb_id_table_create(2);
|
|
}
|
|
rb_id_table_insert(tbl, id, val);
|
|
return val;
|
|
}
|
|
|
|
#include "ractor.rbinc"
|