1
0
Fork 0
mirror of https://github.com/ruby/ruby.git synced 2022-11-09 12:17:21 -05:00

Add Ractor#receive and Ractor.receive and use it in all places

* Keep Ractor#recv/Ractor.recv as an alias for now.
This commit is contained in:
Benoit Daloze 2020-10-03 14:05:15 +02:00
parent 9eccf0711f
commit bfc1c7205d
Notes: git 2020-10-10 19:48:42 +09:00
6 changed files with 117 additions and 113 deletions

View file

@ -90,10 +90,10 @@ assert_equal 'ok', %q{
} }
# Ractor#send passes an object with copy to a Ractor # Ractor#send passes an object with copy to a Ractor
# and Ractor.recv in the Ractor block can receive the passed value. # and Ractor.receive in the Ractor block can receive the passed value.
assert_equal 'ok', %q{ assert_equal 'ok', %q{
r = Ractor.new do r = Ractor.new do
msg = Ractor.recv msg = Ractor.receive
end end
r.send 'ok' r.send 'ok'
r.take r.take
@ -196,7 +196,7 @@ assert_equal 'ok', %q{
# Raise Ractor::ClosedError when try to send into a closed actor # Raise Ractor::ClosedError when try to send into a closed actor
assert_equal 'ok', %q{ assert_equal 'ok', %q{
r = Ractor.new { Ractor.recv } r = Ractor.new { Ractor.receive }
r.close r.close
begin begin
@ -212,7 +212,7 @@ assert_equal 'ok', %q{
assert_equal 'ok', %q{ assert_equal 'ok', %q{
r = Ractor.new do r = Ractor.new do
Ractor.yield 1 Ractor.yield 1
Ractor.recv Ractor.receive
end end
r.close r.close
@ -228,14 +228,14 @@ assert_equal 'ok', %q{
# Ractor.yield raises Ractor::ClosedError when outgoing port is closed. # Ractor.yield raises Ractor::ClosedError when outgoing port is closed.
assert_equal 'ok', %q{ assert_equal 'ok', %q{
r = Ractor.new Ractor.current do |main| r = Ractor.new Ractor.current do |main|
Ractor.recv Ractor.receive
main << true main << true
Ractor.yield 1 Ractor.yield 1
end end
r.close_outgoing r.close_outgoing
r << true r << true
Ractor.recv Ractor.receive
begin begin
r.take r.take
@ -248,7 +248,7 @@ assert_equal 'ok', %q{
# Raise Ractor::ClosedError when try to send into a ractor with closed incoming port # Raise Ractor::ClosedError when try to send into a ractor with closed incoming port
assert_equal 'ok', %q{ assert_equal 'ok', %q{
r = Ractor.new { Ractor.recv } r = Ractor.new { Ractor.receive }
r.close_incoming r.close_incoming
begin begin
@ -275,7 +275,7 @@ assert_equal '[1, 2]', %q{
assert_equal 'ok', %q{ assert_equal 'ok', %q{
r = Ractor.new do r = Ractor.new do
Ractor.yield 1 Ractor.yield 1
Ractor.recv Ractor.receive
end end
sleep 0.01 # wait for Ractor.yield in r sleep 0.01 # wait for Ractor.yield in r
@ -292,7 +292,7 @@ assert_equal 'ok', %q{
# A ractor with closed outgoing port still can receive messages from incoming port # A ractor with closed outgoing port still can receive messages from incoming port
assert_equal 'ok', %q{ assert_equal 'ok', %q{
r = Ractor.new do r = Ractor.new do
Ractor.recv Ractor.receive
end end
r.close_outgoing r.close_outgoing
@ -305,11 +305,11 @@ assert_equal 'ok', %q{
end end
} }
# multiple Ractors can recv (wait) from one Ractor # multiple Ractors can receive (wait) from one Ractor
assert_equal '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]', %q{ assert_equal '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]', %q{
pipe = Ractor.new do pipe = Ractor.new do
loop do loop do
Ractor.yield Ractor.recv Ractor.yield Ractor.receive
end end
end end
@ -330,7 +330,7 @@ assert_equal '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]', %q{
}.sort }.sort
} }
# Ractor.select also support multiple take, recv and yiled # Ractor.select also support multiple take, receive and yield
assert_equal '[true, true, true]', %q{ assert_equal '[true, true, true]', %q{
RN = 10 RN = 10
CR = Ractor.current CR = Ractor.current
@ -341,29 +341,29 @@ assert_equal '[true, true, true]', %q{
'take' 'take'
end end
} }
recv = [] received = []
take = [] take = []
yiel = [] yielded = []
until rs.empty? until rs.empty?
r, v = Ractor.select(CR, *rs, yield_value: 'yield') r, v = Ractor.select(CR, *rs, yield_value: 'yield')
case r case r
when :recv when :receive
recv << v received << v
when :yield when :yield
yiel << v yielded << v
else else
take << v take << v
rs.delete r rs.delete r
end end
end end
[recv.all?('sendyield'), yiel.all?(nil), take.all?('take')] [received.all?('sendyield'), yielded.all?(nil), take.all?('take')]
} }
# multiple Ractors can send to one Ractor # multiple Ractors can send to one Ractor
assert_equal '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]', %q{ assert_equal '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]', %q{
pipe = Ractor.new do pipe = Ractor.new do
loop do loop do
Ractor.yield Ractor.recv Ractor.yield Ractor.receive
end end
end end
@ -378,7 +378,7 @@ assert_equal '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]', %q{
}.sort }.sort
} }
# an exception in a Ractor will be re-raised at Ractor#recv # an exception in a Ractor will be re-raised at Ractor#receive
assert_equal '[RuntimeError, "ok", true]', %q{ assert_equal '[RuntimeError, "ok", true]', %q{
r = Ractor.new do r = Ractor.new do
raise 'ok' # exception will be transferred receiver raise 'ok' # exception will be transferred receiver
@ -420,7 +420,7 @@ assert_equal 'no _dump_data is defined for class Thread', %q{
assert_equal "ok", %q{ assert_equal "ok", %q{
echo_ractor = Ractor.new do echo_ractor = Ractor.new do
loop do loop do
v = Ractor.recv v = Ractor.receive
Ractor.yield v Ractor.yield v
end end
end end
@ -518,7 +518,7 @@ assert_equal [false, true, false].inspect, %q{
assert_equal 'hello world', %q{ assert_equal 'hello world', %q{
# move # move
r = Ractor.new do r = Ractor.new do
obj = Ractor.recv obj = Ractor.receive
obj << ' world' obj << ' world'
end end
@ -538,7 +538,7 @@ assert_equal 'hello world', %q{
# move example2: Array # move example2: Array
assert_equal '[0, 1]', %q{ assert_equal '[0, 1]', %q{
r = Ractor.new do r = Ractor.new do
ary = Ractor.recv ary = Ractor.receive
ary << 1 ary << 1
end end
@ -746,7 +746,7 @@ assert_equal '[1000, 3]', %q{
assert_equal '[1, 4, 3, 2, 1]', %q{ assert_equal '[1, 4, 3, 2, 1]', %q{
counts = [] counts = []
counts << Ractor.count counts << Ractor.count
ractors = (1..3).map { Ractor.new { Ractor.recv } } ractors = (1..3).map { Ractor.new { Ractor.receive } }
counts << Ractor.count counts << Ractor.count
ractors[0].send('End 0').take ractors[0].send('End 0').take

View file

@ -68,7 +68,7 @@ Ractor helps to write a thread-safe program, but we can make thread-unsafe progr
* Some kind of shareable objects can introduce transactions (STM, for example). However, misusing transactions will generate inconsistent state. * Some kind of shareable objects can introduce transactions (STM, for example). However, misusing transactions will generate inconsistent state.
Without Ractor, we need to trace all of state-mutations to debug thread-safety issues. Without Ractor, we need to trace all of state-mutations to debug thread-safety issues.
With Ractor, you can concentrate to suspicious With Ractor, you can concentrate to suspicious
## Creation and termination ## Creation and termination
@ -96,7 +96,7 @@ The Ractor execute given `expr` in a given block.
Given block will be isolated from outer scope by `Proc#isolate`. Given block will be isolated from outer scope by `Proc#isolate`.
```ruby ```ruby
# To prevent sharing unshareable objects between ractors, # To prevent sharing unshareable objects between ractors,
# block outer-variables, `self` and other information are isolated. # block outer-variables, `self` and other information are isolated.
# Given block will be isolated by `Proc#isolate` method. # Given block will be isolated by `Proc#isolate` method.
# `Proc#isolate` is called at Ractor creation timing (`Ractor.new` is called) # `Proc#isolate` is called at Ractor creation timing (`Ractor.new` is called)
@ -133,7 +133,7 @@ r.take #=> 'ok'
```ruby ```ruby
# almost similar to the last example # almost similar to the last example
r = Ractor.new do r = Ractor.new do
msg = Ractor.recv msg = Ractor.receive
msg msg
end end
r.send 'ok' r.send 'ok'
@ -180,22 +180,22 @@ end
Communication between Ractors is achieved by sending and receiving messages. Communication between Ractors is achieved by sending and receiving messages.
* (1) Message sending/receiving * (1) Message sending/receiving
* (1-1) push type send/recv (sender knows receiver). similar to the Actor model. * (1-1) push type send/receive (sender knows receiver). similar to the Actor model.
* (1-2) pull type yield/take (receiver knows sender). * (1-2) pull type yield/take (receiver knows sender).
* (2) Using shareable container objects (not implemented yet) * (2) Using shareable container objects (not implemented yet)
Users can control blocking on (1), but should not control on (2) (only manage as critical section). Users can control blocking on (1), but should not control on (2) (only manage as critical section).
* (1-1) send/recv (push type) * (1-1) send/receive (push type)
* `Ractor#send(obj)` (`Ractor#<<(obj)` is an aliases) send a message to the Ractor's incoming port. Incoming port is connected to the infinite size incoming queue so `Ractor#send` will never block. * `Ractor#send(obj)` (`Ractor#<<(obj)` is an aliases) send a message to the Ractor's incoming port. Incoming port is connected to the infinite size incoming queue so `Ractor#send` will never block.
* `Ractor.recv` dequeue a message from its own incoming queue. If the incoming queue is empty, `Ractor.recv` calling will block. * `Ractor.receive` dequeue a message from its own incoming queue. If the incoming queue is empty, `Ractor.receive` calling will block.
* (1-2) yield/take (pull type) * (1-2) yield/take (pull type)
* `Ractor.yield(obj)` send an message to a Ractor which are calling `Ractor#take` via outgoing port . If no Ractors are waiting for it, the `Ractor.yield(obj)` will block. If multiple Ractors are waiting for `Ractor.yield(obj)`, only one Ractor can receive the message. * `Ractor.yield(obj)` send an message to a Ractor which are calling `Ractor#take` via outgoing port . If no Ractors are waiting for it, the `Ractor.yield(obj)` will block. If multiple Ractors are waiting for `Ractor.yield(obj)`, only one Ractor can receive the message.
* `Ractor#take` receives a message which is waiting by `Ractor.yield(obj)` method from the specified Ractor. If the Ractor does not call `Ractor.yield` yet, the `Ractor#take` call will block. * `Ractor#take` receives a message which is waiting by `Ractor.yield(obj)` method from the specified Ractor. If the Ractor does not call `Ractor.yield` yet, the `Ractor#take` call will block.
* `Ractor.select()` can wait for the success of `take`, `yield` and `recv`. * `Ractor.select()` can wait for the success of `take`, `yield` and `receive`.
* You can close the incoming port or outgoing port. * You can close the incoming port or outgoing port.
* You can close then with `Ractor#close_incoming` and `Ractor#close_outgoing`. * You can close then with `Ractor#close_incoming` and `Ractor#close_outgoing`.
* If the incoming port is closed for a Ractor, you can't `send` to the Ractor. If `Ractor.recv` is blocked for the closed incoming port, then it will raise an exception. * If the incoming port is closed for a Ractor, you can't `send` to the Ractor. If `Ractor.receive` is blocked for the closed incoming port, then it will raise an exception.
* If the outgoing port is closed for a Ractor, you can't call `Ractor#take` and `Ractor.yield` on the Ractor. If `Ractor#take` is blocked for the Ractor, then it will raise an exception. * If the outgoing port is closed for a Ractor, you can't call `Ractor#take` and `Ractor.yield` on the Ractor. If `Ractor#take` is blocked for the Ractor, then it will raise an exception.
* When a Ractor is terminated, the Ractor's ports are closed. * When a Ractor is terminated, the Ractor's ports are closed.
* There are 3 methods to send an object as a message * There are 3 methods to send an object as a message
@ -216,11 +216,11 @@ Each Ractor has _incoming-port_ and _outgoing-port_. Incoming-port is connected
r.send(obj) ->*->[incoming queue] Ractor.yield(obj) ->*-> r.take r.send(obj) ->*->[incoming queue] Ractor.yield(obj) ->*-> r.take
| | | | | |
| v | | v |
| Ractor.recv | | Ractor.receive |
+-------------------------------------------+ +-------------------------------------------+
Connection example: r2.send obj on r1、Ractor.recv on r2 Connection example: r2.send obj on r1、Ractor.receive on r2
+----+ +----+ +----+ +----+
* r1 |-----* r2 * * r1 |-----* r2 *
+----+ +----+ +----+ +----+
@ -245,7 +245,7 @@ Connection example: Ractor.yield(obj) on r1 and r2,
```ruby ```ruby
r = Ractor.new do r = Ractor.new do
msg = Ractor.recv # Receive from r's incoming queue msg = Ractor.receive # Receive from r's incoming queue
msg # send back msg as block return value msg # send back msg as block return value
end end
r.send 'ok' # Send 'ok' to r's incoming port -> incoming queue r.send 'ok' # Send 'ok' to r's incoming port -> incoming queue
@ -253,10 +253,10 @@ Connection example: Ractor.yield(obj) on r1 and r2,
``` ```
```ruby ```ruby
# Actual argument 'ok' for `Ractor.new()` will be send to created Ractor. # Actual argument 'ok' for `Ractor.new()` will be send to created Ractor.
r = Ractor.new 'ok' do |msg| r = Ractor.new 'ok' do |msg|
# Values for formal parameters will be received from incoming queue. # Values for formal parameters will be received from incoming queue.
# Similar to: msg = Ractor.recv # Similar to: msg = Ractor.receive
msg # Return value of the given block will be sent via outgoing port msg # Return value of the given block will be sent via outgoing port
end end
@ -304,7 +304,7 @@ Complex example:
```ruby ```ruby
pipe = Ractor.new do pipe = Ractor.new do
loop do loop do
Ractor.yield Ractor.recv Ractor.yield Ractor.receive
end end
end end
@ -333,7 +333,7 @@ Multiple Ractors can send to one Ractor.
pipe = Ractor.new do pipe = Ractor.new do
loop do loop do
Ractor.yield Ractor.recv Ractor.yield Ractor.receive
end end
end end
@ -358,7 +358,7 @@ TODO: `select` syntax of go-language uses round-robin technique to make fair sch
* `Ractor#close_incoming/outgoing` close incoming/outgoing ports (similar to `Queue#close`). * `Ractor#close_incoming/outgoing` close incoming/outgoing ports (similar to `Queue#close`).
* `Ractor#close_incoming` * `Ractor#close_incoming`
* `r.send(obj) ` where `r`'s incoming port is closed, will raise an exception. * `r.send(obj) ` where `r`'s incoming port is closed, will raise an exception.
* When the incoming queue is empty and incoming port is closed, `Ractor.recv` raise an exception. If the incoming queue is not empty, it dequeues an object. * When the incoming queue is empty and incoming port is closed, `Ractor.receive` raise an exception. If the incoming queue is not empty, it dequeues an object.
* `Ractor#close_outgoing` * `Ractor#close_outgoing`
* `Ractor.yield` on a Ractor which closed the outgoing port, it will raise an exception. * `Ractor.yield` on a Ractor which closed the outgoing port, it will raise an exception.
* `Ractor#take` for a Ractor which closed the outgoing port, it will raise an exception. If `Ractor#take` is blocking, it will raise an exception. * `Ractor#take` for a Ractor which closed the outgoing port, it will raise an exception. If `Ractor#take` is blocking, it will raise an exception.
@ -411,7 +411,7 @@ r = Ractor.new obj do |msg|
# return received msg's object_id # return received msg's object_id
msg.object_id msg.object_id
end end
obj.object_id == r.take #=> false obj.object_id == r.take #=> false
``` ```
@ -438,7 +438,7 @@ If the source Ractor touches the moved object (for example, call the method like
```ruby ```ruby
# move with Ractor#send # move with Ractor#send
r = Ractor.new do r = Ractor.new do
obj = Ractor.recv obj = Ractor.receive
obj << ' world' obj << ' world'
end end
@ -468,7 +468,7 @@ end
str = r.take str = r.take
begin begin
r.take r.take
rescue Ractor::RemoteError rescue Ractor::RemoteError
p str #=> "hello" p str #=> "hello"
end end
@ -480,7 +480,7 @@ Now only `T_FILE`, `T_STRING` and `T_ARRAY` objects are supported.
* `T_STRING` (`String`): support to send a huge string without copying (fast). * `T_STRING` (`String`): support to send a huge string without copying (fast).
* `T_ARRAY` (`Array'): support to send a huge Array without re-allocating the array's buffer. However, all of the referred objects from the array should be moved, so it is not so fast. * `T_ARRAY` (`Array'): support to send a huge Array without re-allocating the array's buffer. However, all of the referred objects from the array should be moved, so it is not so fast.
To achieve the access prohibition for moved objects, _class replacement_ technique is used to implement it. To achieve the access prohibition for moved objects, _class replacement_ technique is used to implement it.
### Shareable objects ### Shareable objects
@ -500,7 +500,7 @@ Implementation: Now shareable objects (`RVALUE`) have `FL_SHAREABLE` flag. This
```ruby ```ruby
r = Ractor.new do r = Ractor.new do
while v = Ractor.recv while v = Ractor.receive
Ractor.yield v Ractor.yield v
end end
end end
@ -659,19 +659,19 @@ RN = 1000
CR = Ractor.current CR = Ractor.current
r = Ractor.new do r = Ractor.new do
p Ractor.recv p Ractor.receive
CR << :fin CR << :fin
end end
RN.times{ RN.times{
r = Ractor.new r do |next_r| r = Ractor.new r do |next_r|
next_r << Ractor.recv next_r << Ractor.receive
end end
} }
p :setup_ok p :setup_ok
r << 1 r << 1
p Ractor.recv p Ractor.receive
``` ```
### Fork-join ### Fork-join
@ -706,7 +706,7 @@ require 'prime'
pipe = Ractor.new do pipe = Ractor.new do
loop do loop do
Ractor.yield Ractor.recv Ractor.yield Ractor.receive
end end
end end
@ -750,22 +750,22 @@ p r3.take #=> 'r1r2r3'
``` ```
```ruby ```ruby
# pipeline with send/recv # pipeline with send/receive
r3 = Ractor.new Ractor.current do |cr| r3 = Ractor.new Ractor.current do |cr|
cr.send Ractor.recv + 'r3' cr.send Ractor.receive + 'r3'
end end
r2 = Ractor.new r3 do |r3| r2 = Ractor.new r3 do |r3|
r3.send Ractor.recv + 'r2' r3.send Ractor.receive + 'r2'
end end
r1 = Ractor.new r2 do |r2| r1 = Ractor.new r2 do |r2|
r2.send Ractor.recv + 'r1' r2.send Ractor.receive + 'r1'
end end
r1 << 'r0' r1 << 'r0'
p Ractor.recv #=> "r0r1r2r3" p Ractor.receive #=> "r0r1r2r3"
``` ```
### Supervise ### Supervise
@ -776,12 +776,12 @@ p Ractor.recv #=> "r0r1r2r3"
r = Ractor.current r = Ractor.current
(1..10).map{|i| (1..10).map{|i|
r = Ractor.new r, i do |r, i| r = Ractor.new r, i do |r, i|
r.send Ractor.recv + "r#{i}" r.send Ractor.receive + "r#{i}"
end end
} }
r.send "r0" r.send "r0"
p Ractor.recv #=> "r0r10r9r8r7r6r5r4r3r2r1" p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1"
``` ```
```ruby ```ruby
@ -791,7 +791,7 @@ r = Ractor.current
rs = (1..10).map{|i| rs = (1..10).map{|i|
r = Ractor.new r, i do |r, i| r = Ractor.new r, i do |r, i|
loop do loop do
msg = Ractor.recv msg = Ractor.receive
raise if /e/ =~ msg raise if /e/ =~ msg
r.send msg + "r#{i}" r.send msg + "r#{i}"
end end
@ -799,10 +799,10 @@ rs = (1..10).map{|i|
} }
r.send "r0" r.send "r0"
p Ractor.recv #=> "r0r10r9r8r7r6r5r4r3r2r1" p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1"
r.send "r0" r.send "r0"
p Ractor.select(*rs, Ractor.current) #=> [:recv, "r0r10r9r8r7r6r5r4r3r2r1"] p Ractor.select(*rs, Ractor.current) #=> [:receive, "r0r10r9r8r7r6r5r4r3r2r1"]
[:recv, "r0r10r9r8r7r6r5r4r3r2r1"] [:receive, "r0r10r9r8r7r6r5r4r3r2r1"]
r.send "e0" r.send "e0"
p Ractor.select(*rs, Ractor.current) p Ractor.select(*rs, Ractor.current)
#=> #=>
@ -826,7 +826,7 @@ r = Ractor.current
rs = (1..10).map{|i| rs = (1..10).map{|i|
r = Ractor.new r, i do |r, i| r = Ractor.new r, i do |r, i|
loop do loop do
msg = Ractor.recv msg = Ractor.receive
raise if /e/ =~ msg raise if /e/ =~ msg
r.send msg + "r#{i}" r.send msg + "r#{i}"
end end
@ -834,10 +834,10 @@ rs = (1..10).map{|i|
} }
r.send "r0" r.send "r0"
p Ractor.recv #=> "r0r10r9r8r7r6r5r4r3r2r1" p Ractor.receive #=> "r0r10r9r8r7r6r5r4r3r2r1"
r.send "r0" r.send "r0"
p Ractor.select(*rs, Ractor.current) p Ractor.select(*rs, Ractor.current)
[:recv, "r0r10r9r8r7r6r5r4r3r2r1"] [:receive, "r0r10r9r8r7r6r5r4r3r2r1"]
msg = 'e0' msg = 'e0'
begin begin
r.send msg r.send msg
@ -857,7 +857,7 @@ end
def make_ractor r, i def make_ractor r, i
Ractor.new r, i do |r, i| Ractor.new r, i do |r, i|
loop do loop do
msg = Ractor.recv msg = Ractor.receive
raise if /e/ =~ msg raise if /e/ =~ msg
r.send msg + "r#{i}" r.send msg + "r#{i}"
end end
@ -879,5 +879,5 @@ rescue Ractor::RemoteError
retry retry
end end
#=> [:recv, "x0r9r9r8r7r6r5r4r3r2r1"] #=> [:receive, "x0r9r9r8r7r6r5r4r3r2r1"]
``` ```

View file

@ -503,7 +503,7 @@ ractor_copy_setup(struct rb_ractor_basket *b, VALUE obj)
} }
static VALUE static VALUE
ractor_try_recv(rb_execution_context_t *ec, rb_ractor_t *r) ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *r)
{ {
struct rb_ractor_queue *rq = &r->incoming_queue; struct rb_ractor_queue *rq = &r->incoming_queue;
struct rb_ractor_basket basket; struct rb_ractor_basket basket;
@ -553,13 +553,13 @@ wait_status_str(enum ractor_wait_status wait_status)
{ {
switch ((int)wait_status) { switch ((int)wait_status) {
case wait_none: return "none"; case wait_none: return "none";
case wait_recving: return "recving"; case wait_receiving: return "receiving";
case wait_taking: return "taking"; case wait_taking: return "taking";
case wait_yielding: return "yielding"; case wait_yielding: return "yielding";
case wait_recving|wait_taking: return "recving|taking"; case wait_receiving|wait_taking: return "receiving|taking";
case wait_recving|wait_yielding: return "recving|yielding"; case wait_receiving|wait_yielding: return "receiving|yielding";
case wait_taking|wait_yielding: return "taking|yielding"; case wait_taking|wait_yielding: return "taking|yielding";
case wait_recving|wait_taking|wait_yielding: return "recving|taking|yielding"; case wait_receiving|wait_taking|wait_yielding: return "receiving|taking|yielding";
} }
rb_bug("unrechable"); rb_bug("unrechable");
} }
@ -715,18 +715,18 @@ ractor_waiting_list_shift(rb_ractor_t *r, struct rb_ractor_waiting_list *wl)
} }
static VALUE static VALUE
ractor_recv(rb_execution_context_t *ec, rb_ractor_t *r) ractor_receive(rb_execution_context_t *ec, rb_ractor_t *r)
{ {
VM_ASSERT(r == rb_ec_ractor_ptr(ec)); VM_ASSERT(r == rb_ec_ractor_ptr(ec));
VALUE v; VALUE v;
while ((v = ractor_try_recv(ec, r)) == Qundef) { while ((v = ractor_try_receive(ec, r)) == Qundef) {
RACTOR_LOCK(r); RACTOR_LOCK(r);
{ {
if (ractor_queue_empty_p(r, &r->incoming_queue)) { if (ractor_queue_empty_p(r, &r->incoming_queue)) {
VM_ASSERT(r->wait.status == wait_none); VM_ASSERT(r->wait.status == wait_none);
VM_ASSERT(r->wait.wakeup_status == wakeup_none); VM_ASSERT(r->wait.wakeup_status == wakeup_none);
r->wait.status = wait_recving; r->wait.status = wait_receiving;
ractor_sleep(ec, r); ractor_sleep(ec, r);
@ -752,7 +752,7 @@ ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_
} }
else { else {
ractor_queue_enq(r, rq, b); ractor_queue_enq(r, rq, b);
if (ractor_wakeup(r, wait_recving, wakeup_by_send)) { if (ractor_wakeup(r, wait_receiving, wakeup_by_send)) {
RUBY_DEBUG_LOG("wakeup", 0); RUBY_DEBUG_LOG("wakeup", 0);
} }
} }
@ -890,7 +890,7 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
struct ractor_select_action { struct ractor_select_action {
enum ractor_select_action_type { enum ractor_select_action_type {
ractor_select_action_take, ractor_select_action_take,
ractor_select_action_recv, ractor_select_action_receive,
ractor_select_action_yield, ractor_select_action_yield,
} type; } type;
VALUE v; VALUE v;
@ -906,9 +906,9 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
VALUE v = rs[i]; VALUE v = rs[i];
if (v == crv) { if (v == crv) {
actions[i].type = ractor_select_action_recv; actions[i].type = ractor_select_action_receive;
actions[i].v = Qnil; actions[i].v = Qnil;
wait_status |= wait_recving; wait_status |= wait_receiving;
} }
else if (rb_ractor_p(v)) { else if (rb_ractor_p(v)) {
actions[i].type = ractor_select_action_take; actions[i].type = ractor_select_action_take;
@ -949,10 +949,10 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
goto cleanup; goto cleanup;
} }
break; break;
case ractor_select_action_recv: case ractor_select_action_receive:
v = ractor_try_recv(ec, cr); v = ractor_try_receive(ec, cr);
if (v != Qundef) { if (v != Qundef) {
*ret_r = ID2SYM(rb_intern("recv")); *ret_r = ID2SYM(rb_intern("receive"));
ret = v; ret = v;
goto cleanup; goto cleanup;
} }
@ -988,7 +988,7 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
ractor_register_taking(r, cr); ractor_register_taking(r, cr);
break; break;
case ractor_select_action_yield: case ractor_select_action_yield:
case ractor_select_action_recv: case ractor_select_action_receive:
break; break;
} }
} }
@ -1009,7 +1009,7 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
goto skip_sleep; goto skip_sleep;
} }
break; break;
case ractor_select_action_recv: case ractor_select_action_receive:
if (cr->incoming_queue.cnt > 0) { if (cr->incoming_queue.cnt > 0) {
RUBY_DEBUG_LOG("wakeup_none, but incoming_queue has %u messages", cr->incoming_queue.cnt); RUBY_DEBUG_LOG("wakeup_none, but incoming_queue has %u messages", cr->incoming_queue.cnt);
cr->wait.wakeup_status = wakeup_by_retry; cr->wait.wakeup_status = wakeup_by_retry;
@ -1052,7 +1052,7 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
r = RACTOR_PTR(actions[i].v); r = RACTOR_PTR(actions[i].v);
ractor_waiting_list_del(r, &r->taking_ractors, cr); ractor_waiting_list_del(r, &r->taking_ractors, cr);
break; break;
case ractor_select_action_recv: case ractor_select_action_receive:
case ractor_select_action_yield: case ractor_select_action_yield:
break; break;
} }
@ -1072,7 +1072,7 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
break; break;
case wakeup_by_send: case wakeup_by_send:
// OK. // OK.
// retry loop and try_recv will succss. // retry loop and try_receive will succss.
break; break;
case wakeup_by_yield: case wakeup_by_yield:
// take was succeeded! // take was succeeded!
@ -1145,7 +1145,7 @@ ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r)
if (!r->incoming_port_closed) { if (!r->incoming_port_closed) {
prev = Qfalse; prev = Qfalse;
r->incoming_port_closed = true; r->incoming_port_closed = true;
if (ractor_wakeup(r, wait_recving, wakeup_by_close)) { if (ractor_wakeup(r, wait_receiving, wakeup_by_close)) {
VM_ASSERT(r->incoming_queue.cnt == 0); VM_ASSERT(r->incoming_queue.cnt == 0);
RUBY_DEBUG_LOG("cancel receiving", 0); RUBY_DEBUG_LOG("cancel receiving", 0);
} }
@ -1442,10 +1442,10 @@ rb_ractor_atexit_exception(rb_execution_context_t *ec)
} }
void void
rb_ractor_recv_parameters(rb_execution_context_t *ec, rb_ractor_t *r, int len, VALUE *ptr) rb_ractor_receive_parameters(rb_execution_context_t *ec, rb_ractor_t *r, int len, VALUE *ptr)
{ {
for (int i=0; i<len; i++) { for (int i=0; i<len; i++) {
ptr[i] = ractor_recv(ec, r); ptr[i] = ractor_receive(ec, r);
} }
} }

View file

@ -53,10 +53,10 @@ struct rb_ractor_struct {
struct ractor_wait { struct ractor_wait {
enum ractor_wait_status { enum ractor_wait_status {
wait_none = 0x00, wait_none = 0x00,
wait_recving = 0x01, wait_receiving = 0x01,
wait_taking = 0x02, wait_taking = 0x02,
wait_yielding = 0x04, wait_yielding = 0x04,
} status; } status;
enum ractor_wakeup_status { enum ractor_wakeup_status {
@ -132,7 +132,7 @@ VALUE rb_ractor_self(const rb_ractor_t *g);
void rb_ractor_atexit(rb_execution_context_t *ec, VALUE result); void rb_ractor_atexit(rb_execution_context_t *ec, VALUE result);
void rb_ractor_atexit_exception(rb_execution_context_t *ec); void rb_ractor_atexit_exception(rb_execution_context_t *ec);
void rb_ractor_teardown(rb_execution_context_t *ec); void rb_ractor_teardown(rb_execution_context_t *ec);
void rb_ractor_recv_parameters(rb_execution_context_t *ec, rb_ractor_t *g, int len, VALUE *ptr); void rb_ractor_receive_parameters(rb_execution_context_t *ec, rb_ractor_t *g, int len, VALUE *ptr);
void rb_ractor_send_parameters(rb_execution_context_t *ec, rb_ractor_t *g, VALUE args); void rb_ractor_send_parameters(rb_execution_context_t *ec, rb_ractor_t *g, VALUE args);
VALUE rb_thread_create_ractor(rb_ractor_t *g, VALUE args, VALUE proc); // defined in thread.c VALUE rb_thread_create_ractor(rb_ractor_t *g, VALUE args, VALUE proc); // defined in thread.c

View file

@ -1,7 +1,7 @@
class Ractor class Ractor
# Create a new Ractor with args and a block. # Create a new Ractor with args and a block.
# args are passed via incoming channel. # args are passed via incoming channel.
# A block (Proc) will be isolated (can't acccess to outer variables) # A block (Proc) will be isolated (can't access to outer variables)
# #
# A ractor has default two channels: # A ractor has default two channels:
# an incoming channel and an outgoing channel. # an incoming channel and an outgoing channel.
@ -15,22 +15,22 @@ class Ractor
# and other # and other
# #
# r = Ractor.new do # r = Ractor.new do
# Ractor.recv # recv via r's mailbox => 1 # Ractor.receive # receive via r's mailbox => 1
# Ractor.recv # recv via r's mailbox => 2 # Ractor.receive # receive via r's mailbox => 2
# Ractor.yield 3 # yield a message (3) and wait for taking by another ractor. # Ractor.yield 3 # yield a message (3) and wait for taking by another ractor.
# 'ok' # the return value will be yielded. # 'ok' # the return value will be yielded.
# # and r's incoming/outgoing ports are closed automatically. # # and r's incoming/outgoing ports are closed automatically.
# end # end
# r.send 1 # send a message (1) into r's mailbox. # r.send 1 # send a message (1) into r's mailbox.
# r << 2 # << is an alias of `send`. # r << 2 # << is an alias of `send`.
# p r.take # take a message from r's outgoing port #=> 3 # p r.take # take a message from r's outgoing port => 3
# p r.take # => 'ok' # p r.take # => 'ok'
# p r.take # raise Ractor::ClosedError # p r.take # raise Ractor::ClosedError
# #
# other options: # other options:
# name: Ractor's name # name: Ractor's name
# #
def self.new *args, name: nil, &block def self.new(*args, name: nil, &block)
b = block # TODO: builtin bug b = block # TODO: builtin bug
raise ArgumentError, "must be called with a block" unless block raise ArgumentError, "must be called with a block" unless block
loc = caller_locations(1, 1).first loc = caller_locations(1, 1).first
@ -59,18 +59,18 @@ class Ractor
# #
# r, obj = Ractor.select(r1, r2, Ractor.current) # r, obj = Ractor.select(r1, r2, Ractor.current)
# #=> wait for taking from r1 or r2 # #=> wait for taking from r1 or r2
# # or recv from incoming queue # # or receive from incoming queue
# # If recv is succeed, then obj is received value # # If receive is succeed, then obj is received value
# # and r is :recv (Ractor.current) # # and r is :receive (Ractor.current)
# #
# r, obj = Ractor.select(r1, r2, Ractor.current, yield_value: obj) # r, obj = Ractor.select(r1, r2, Ractor.current, yield_value: obj)
# #=> wait for taking from r1 or r2 # #=> wait for taking from r1 or r2
# # or recv from incoming queue # # or receive from incoming queue
# # or yield (Ractor.yield) obj # # or yield (Ractor.yield) obj
# # If yield is succeed, then obj is nil # # If yield is succeed, then obj is nil
# # and r is :yield # # and r is :yield
# #
def self.select *ractors, yield_value: yield_unspecified = true, move: false def self.select(*ractors, yield_value: yield_unspecified = true, move: false)
__builtin_cstmt! %q{ __builtin_cstmt! %q{
const VALUE *rs = RARRAY_CONST_PTR_TRANSIENT(ractors); const VALUE *rs = RARRAY_CONST_PTR_TRANSIENT(ractors);
VALUE rv; VALUE rv;
@ -82,34 +82,40 @@ class Ractor
end end
# Receive an incoming message from Ractor's incoming queue. # Receive an incoming message from Ractor's incoming queue.
def self.recv def self.receive
__builtin_cexpr! %q{ __builtin_cexpr! %q{
ractor_recv(ec, rb_ec_ractor_ptr(ec)) ractor_receive(ec, rb_ec_ractor_ptr(ec))
} }
end end
private def recv class << self
alias recv receive
end
private def receive
__builtin_cexpr! %q{ __builtin_cexpr! %q{
// TODO: check current actor // TODO: check current actor
ractor_recv(ec, RACTOR_PTR(self)) ractor_receive(ec, RACTOR_PTR(self))
} }
end end
alias recv receive
# Send a message to a Ractor's incoming queue. # Send a message to a Ractor's incoming queue.
# #
# # Example: # # Example:
# r = Ractor.new do # r = Ractor.new do
# p Ractor.recv #=> 'ok' # p Ractor.receive #=> 'ok'
# end # end
# r.send 'ok' # send to r's incoming queue. # r.send 'ok' # send to r's incoming queue.
def send obj, move: false def send(obj, move: false)
__builtin_cexpr! %q{ __builtin_cexpr! %q{
ractor_send(ec, RACTOR_PTR(self), obj, move) ractor_send(ec, RACTOR_PTR(self), obj, move)
} }
end end
alias << send
# yield a message to the ractor's outgoing port. # yield a message to the ractor's outgoing port.
def self.yield obj, move: false def self.yield(obj, move: false)
__builtin_cexpr! %q{ __builtin_cexpr! %q{
ractor_yield(ec, rb_ec_ractor_ptr(ec), obj, move) ractor_yield(ec, rb_ec_ractor_ptr(ec), obj, move)
} }
@ -126,8 +132,6 @@ class Ractor
} }
end end
alias << send
def inspect def inspect
loc = __builtin_cexpr! %q{ RACTOR_PTR(self)->loc } loc = __builtin_cexpr! %q{ RACTOR_PTR(self)->loc }
name = __builtin_cexpr! %q{ RACTOR_PTR(self)->name } name = __builtin_cexpr! %q{ RACTOR_PTR(self)->name }

View file

@ -720,7 +720,7 @@ thread_do_start_proc(rb_thread_t *th)
VM_ASSERT(FIXNUM_P(args)); VM_ASSERT(FIXNUM_P(args));
args_len = FIX2INT(args); args_len = FIX2INT(args);
args_ptr = ALLOCA_N(VALUE, args_len); args_ptr = ALLOCA_N(VALUE, args_len);
rb_ractor_recv_parameters(th->ec, th->ractor, args_len, (VALUE *)args_ptr); rb_ractor_receive_parameters(th->ec, th->ractor, args_len, (VALUE *)args_ptr);
vm_check_ints_blocking(th->ec); vm_check_ints_blocking(th->ec);
// kick thread // kick thread