From a9a7f4d8b8ec30abc7a47ce700edc7209ae12279 Mon Sep 17 00:00:00 2001 From: Koichi Sasada Date: Tue, 8 Dec 2020 14:04:18 +0900 Subject: [PATCH] Ractor#receive_if to receive only matched messages Instead of Ractor.receive, Ractor.receive_if can provide a pattern by a block and you can choose the receiving message. [Feature #17378] --- bootstraptest/test_ractor.rb | 58 +++++++ ractor.c | 312 +++++++++++++++++++++++++++++++---- ractor.rb | 41 +++++ ractor_core.h | 8 +- 4 files changed, 381 insertions(+), 38 deletions(-) diff --git a/bootstraptest/test_ractor.rb b/bootstraptest/test_ractor.rb index b13ecbe3e9..cde0f92962 100644 --- a/bootstraptest/test_ractor.rb +++ b/bootstraptest/test_ractor.rb @@ -100,6 +100,64 @@ assert_equal 'ok', %q{ r.take } +# Ractor#receive_if can filter the message +assert_equal '[2, 3, 1]', %q{ + r = Ractor.new Ractor.current do |main| + main << 1 + main << 2 + main << 3 + end + a = [] + a << Ractor.receive_if{|msg| msg == 2} + a << Ractor.receive_if{|msg| msg == 3} + a << Ractor.receive +} + +# Ractor#receive_if with break +assert_equal '[2, [1, :break], 3]', %q{ + r = Ractor.new Ractor.current do |main| + main << 1 + main << 2 + main << 3 + end + + a = [] + a << Ractor.receive_if{|msg| msg == 2} + a << Ractor.receive_if{|msg| break [msg, :break]} + a << Ractor.receive +} + +# Ractor#receive_if can't be called recursively +assert_equal '[[:e1, 1], [:e2, 2]]', %q{ + r = Ractor.new Ractor.current do |main| + main << 1 + main << 2 + main << 3 + end + + a = [] + + Ractor.receive_if do |msg| + begin + Ractor.receive + rescue Ractor::Error + a << [:e1, msg] + end + true # delete 1 from queue + end + + Ractor.receive_if do |msg| + begin + Ractor.receive_if{} + rescue Ractor::Error + a << [:e2, msg] + end + true # delete 2 from queue + end + + a # +} + ### ### # Ractor still has several memory corruption so skip huge number of tests diff --git a/ractor.c b/ractor.c index f7be7daea9..d0f5185225 100644 --- a/ractor.c +++ b/ractor.c @@ -168,13 +168,15 @@ ractor_status_p(rb_ractor_t *r, enum ractor_status status) return rb_ractor_status_p(r, status); } +static struct rb_ractor_basket *ractor_queue_at(struct rb_ractor_queue *rq, int i); + static void ractor_queue_mark(struct rb_ractor_queue *rq) { for (int i=0; icnt; i++) { - int idx = (rq->start + i) % rq->size; - rb_gc_mark(rq->baskets[idx].v); - rb_gc_mark(rq->baskets[idx].sender); + struct rb_ractor_basket *b = ractor_queue_at(rq, i); + rb_gc_mark(b->v); + rb_gc_mark(b->sender); } } @@ -191,6 +193,8 @@ ractor_mark(void *ptr) rb_gc_mark(r->sync.wait.taken_basket.sender); rb_gc_mark(r->sync.wait.yielded_basket.v); rb_gc_mark(r->sync.wait.yielded_basket.sender); + rb_gc_mark(r->receiving_mutex); + rb_gc_mark(r->loc); rb_gc_mark(r->name); rb_gc_mark(r->r_stdin); @@ -317,33 +321,90 @@ ractor_queue_setup(struct rb_ractor_queue *rq) rq->baskets = malloc(sizeof(struct rb_ractor_basket) * rq->size); } +static struct rb_ractor_basket * +ractor_queue_at(struct rb_ractor_queue *rq, int i) +{ + return &rq->baskets[(rq->start + i) % rq->size]; +} + +static void +ractor_queue_advance(struct rb_ractor_queue *rq) +{ + ASSERT_ractor_locking(GET_RACTOR()); + + if (rq->reserved_cnt == 0) { + rq->cnt--; + rq->start = (rq->start + 1) % rq->size; + rq->serial++; + } + else { + ractor_queue_at(rq, 0)->type = basket_type_deleted; + } +} + +static bool +ractor_queue_skip_p(struct rb_ractor_queue *rq, int i) +{ + struct rb_ractor_basket *b = ractor_queue_at(rq, i); + return b->type == basket_type_deleted || + b->type == basket_type_reserved; +} + +static void +ractor_queue_compact(rb_ractor_t *r, struct rb_ractor_queue *rq) +{ + ASSERT_ractor_locking(r); + + while (rq->cnt > 0 && ractor_queue_at(rq, 0)->type == basket_type_deleted) { + ractor_queue_advance(rq); + } +} + static bool ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq) { ASSERT_ractor_locking(r); - return rq->cnt == 0; + + if (rq->cnt == 0) { + return true; + } + + ractor_queue_compact(r, rq); + + for (int i=0; icnt; i++) { + if (!ractor_queue_skip_p(rq, i)) { + return false; + } + } + + return true; } static bool ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket) { - bool b; + bool found = false; RACTOR_LOCK(r); { if (!ractor_queue_empty_p(r, rq)) { - *basket = rq->baskets[rq->start]; - rq->cnt--; - rq->start = (rq->start + 1) % rq->size; - b = true; - } - else { - b = false; + for (int i=0; icnt; i++) { + if (!ractor_queue_skip_p(rq, i)) { + struct rb_ractor_basket *b = ractor_queue_at(rq, i); + *basket = *b; + + // remove from queue + b->type = basket_type_deleted; + ractor_queue_compact(r, rq); + found = true; + break; + } + } } } RACTOR_UNLOCK(r); - return b; + return found; } static void @@ -373,24 +434,29 @@ ractor_basket_clear(struct rb_ractor_basket *b) static VALUE ractor_reset_belonging(VALUE obj); // in this file static VALUE -ractor_basket_accept(struct rb_ractor_basket *b) +ractor_basket_value(struct rb_ractor_basket *b) { - VALUE v; - switch (b->type) { case basket_type_ref: - VM_ASSERT(rb_ractor_shareable_p(b->v)); - v = b->v; break; case basket_type_copy: case basket_type_move: case basket_type_will: - v = ractor_reset_belonging(b->v); + b->type = basket_type_ref; + b->v = ractor_reset_belonging(b->v); break; default: rb_bug("unreachable"); } + return b->v; +} + +static VALUE +ractor_basket_accept(struct rb_ractor_basket *b) +{ + VALUE v = ractor_basket_value(b); + if (b->exception) { VALUE cause = v; VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor."); @@ -404,12 +470,22 @@ ractor_basket_accept(struct rb_ractor_basket *b) return v; } +static void +ractor_recursive_receive_if(rb_ractor_t *r) +{ + if (r->receiving_mutex && rb_mutex_locked_p(r->receiving_mutex)) { + rb_raise(rb_eRactorError, "can not call receive/receive_if recursively"); + } +} + static VALUE ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *r) { struct rb_ractor_queue *rq = &r->sync.incoming_queue; struct rb_ractor_basket basket; + ractor_recursive_receive_if(r); + if (ractor_queue_deq(r, rq, &basket) == false) { if (r->sync.incoming_port_closed) { rb_raise(rb_eRactorClosedError, "The incoming port is already closed"); @@ -616,31 +692,195 @@ ractor_waiting_list_shift(rb_ractor_t *r, struct rb_ractor_waiting_list *wl) } } -static VALUE -ractor_receive(rb_execution_context_t *ec, rb_ractor_t *r) +static void +ractor_receive_wait(rb_execution_context_t *ec, rb_ractor_t *cr) { - VM_ASSERT(r == rb_ec_ractor_ptr(ec)); + VM_ASSERT(cr == rb_ec_ractor_ptr(ec)); + ractor_recursive_receive_if(cr); + + RACTOR_LOCK(cr); + { + if (ractor_queue_empty_p(cr, &cr->sync.incoming_queue)) { + VM_ASSERT(cr->sync.wait.status == wait_none); + cr->sync.wait.status = wait_receiving; + cr->sync.wait.wakeup_status = wakeup_none; + ractor_sleep(ec, cr); + cr->sync.wait.wakeup_status = wakeup_none; + } + } + RACTOR_UNLOCK(cr); +} + +static VALUE +ractor_receive(rb_execution_context_t *ec, rb_ractor_t *cr) +{ + VM_ASSERT(cr == rb_ec_ractor_ptr(ec)); VALUE v; - while ((v = ractor_try_receive(ec, r)) == Qundef) { - RACTOR_LOCK(r); - { - if (ractor_queue_empty_p(r, &r->sync.incoming_queue)) { - VM_ASSERT(r->sync.wait.status == wait_none); - r->sync.wait.status = wait_receiving; - r->sync.wait.wakeup_status = wakeup_none; - - ractor_sleep(ec, r); - - r->sync.wait.wakeup_status = wakeup_none; - } - } - RACTOR_UNLOCK(r); + while ((v = ractor_try_receive(ec, cr)) == Qundef) { + ractor_receive_wait(ec, cr); } return v; } +#if 0 +// for debug +static const char * +basket_type_name(enum rb_ractor_basket_type type) +{ + switch (type) { +#define T(t) case basket_type_##t: return #t + T(none); + T(ref); + T(copy); + T(move); + T(will); + T(deleted); + T(reserved); + default: rb_bug("unreachable"); + } +} + +static void +rq_dump(struct rb_ractor_queue *rq) +{ + bool bug = false; + for (int i=0; icnt; i++) { + struct rb_ractor_basket *b = ractor_queue_at(rq, i); + fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type), b, RSTRING_PTR(RARRAY_AREF(b->v, 1))); + if (b->type == basket_type_reserved) bug = true; + } + if (bug) rb_bug("!!"); +} +#endif + +struct receive_block_data { + rb_ractor_t *cr; + struct rb_ractor_queue *rq; + VALUE v; + int index; + bool success; +}; + +static void +ractor_receive_if_lock(rb_ractor_t *cr) +{ + VALUE m = cr->receiving_mutex; + if (m == Qfalse) { + m = cr->receiving_mutex = rb_mutex_new(); + } + rb_mutex_lock(m); +} + +static VALUE +receive_if_body(VALUE ptr) +{ + struct receive_block_data *data = (struct receive_block_data *)ptr; + + ractor_receive_if_lock(data->cr); + VALUE block_result = rb_yield(data->v); + + RACTOR_LOCK_SELF(data->cr); + { + struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index); + VM_ASSERT(b->type == basket_type_reserved); + data->rq->reserved_cnt--; + + if (RTEST(block_result)) { + b->type = basket_type_deleted; + ractor_queue_compact(data->cr, data->rq); + } + else { + b->type = basket_type_ref; + } + } + RACTOR_UNLOCK_SELF(data->cr); + + data->success = true; + + if (RTEST(block_result)) { + return data->v; + } + else { + return Qundef; + } +} + +static VALUE +receive_if_ensure(VALUE v) +{ + struct receive_block_data *data = (struct receive_block_data *)v; + + if (!data->success) { + RACTOR_LOCK_SELF(data->cr); + { + struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index); + VM_ASSERT(b->type == basket_type_reserved); + b->type = basket_type_deleted; + data->rq->reserved_cnt--; + } + RACTOR_UNLOCK_SELF(data->cr); + } + + rb_mutex_unlock(data->cr->receiving_mutex); + return Qnil; +} + +static VALUE +ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b) +{ + if (!RTEST(b)) rb_raise(rb_eArgError, "no block given"); + + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + unsigned int serial = (unsigned int)-1; + int index = 0; + struct rb_ractor_queue *rq = &cr->sync.incoming_queue; + + while (1) { + VALUE v = Qundef; + + ractor_receive_wait(ec, cr); + + RACTOR_LOCK_SELF(cr); + { + if (serial != rq->serial) { + serial = rq->serial; + index = 0; + } + + // check newer version + for (int i=index; icnt; i++) { + if (!ractor_queue_skip_p(rq, i)) { + struct rb_ractor_basket *b = ractor_queue_at(rq, i); + v = ractor_basket_value(b); + b->type = basket_type_reserved; + rq->reserved_cnt++; + index = i; + break; + } + } + } + RACTOR_UNLOCK_SELF(cr); + + if (v != Qundef) { + struct receive_block_data data = { + .cr = cr, + .rq = rq, + .v = v, + .index = index, + .success = false, + }; + + VALUE result = rb_ensure(receive_if_body, (VALUE)&data, + receive_if_ensure, (VALUE)&data); + + if (result != Qundef) return result; + index++; + } + } +} + static void ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_basket *b) { diff --git a/ractor.rb b/ractor.rb index c3a8f27532..b5b2677a2b 100644 --- a/ractor.rb +++ b/ractor.rb @@ -102,6 +102,47 @@ class Ractor end alias recv receive + # Receive only a specific message. + # + # Instead of Ractor.receive, Ractor.receive_if can provide a pattern + # by a block and you can choose the receiving message. + # + # # Example: + # r = Ractor.new do + # p Ractor.receive_if{|msg| /foo/ =~ msg} #=> "foo3" + # p Ractor.receive_if{|msg| /bar/ =~ msg} #=> "bar1" + # p Ractor.receive_if{|msg| /baz/ =~ msg} #=> "baz2" + # end + # r << "bar1" + # r << "baz2" + # r << "foo3" + # r.take + # + # If the block returns truthy, the message will be removed from incoming queue + # and return this method with the message. + # When the block is escaped by break/return/exception and so on, the message also + # removed from the incoming queue. + # Otherwise, the messsage is remained in the incoming queue and check next received + # message by the given block. + # + # If there is no messages in the incoming queue, wait until arrival of other messages. + # + # Note that you can not call receive/receive_if in the given block recursively. + # It means that you should not do any tasks in the block. + # + # # Example: + # Ractor.current << true + # Ractor.receive_if{|msg| Ractor.receive} + # #=> `receive': can not call receive/receive_if recursively (Ractor::Error) + # + def self.receive_if &b + Primitive.ractor_receive_if b + end + + def receive_if &b + Primitive.ractor_receive_if b + end + # Send a message to a Ractor's incoming queue. # # # Example: diff --git a/ractor_core.h b/ractor_core.h index c97dfcc85f..daa652ebff 100644 --- a/ractor_core.h +++ b/ractor_core.h @@ -14,11 +14,13 @@ enum rb_ractor_basket_type { basket_type_copy, basket_type_move, basket_type_will, + basket_type_deleted, + basket_type_reserved, }; struct rb_ractor_basket { - enum rb_ractor_basket_type type; bool exception; + enum rb_ractor_basket_type type; VALUE v; VALUE sender; }; @@ -28,6 +30,8 @@ struct rb_ractor_queue { int start; int cnt; int size; + unsigned int serial; + unsigned int reserved_cnt; }; struct rb_ractor_waiting_list { @@ -76,7 +80,7 @@ struct rb_ractor_sync { struct rb_ractor_struct { struct rb_ractor_sync sync; - + VALUE receiving_mutex; bool yield_atexit; // vm wide barrier synchronization