From a5a30244053e926419c2c66003e24409fc0d436e Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 7 Feb 2019 22:00:54 +0100 Subject: [PATCH] Add ErlangActor implementation --- docs-source/erlang_actor.in.md | 145 ++ docs-source/erlang_actor.init.rb | 7 + docs-source/erlang_actor.out.md | 132 ++ lib-edge/concurrent-edge.rb | 1 + lib-edge/concurrent/actor/reference.rb | 3 + lib-edge/concurrent/edge/channel.rb | 6 +- lib-edge/concurrent/edge/erlang_actor.rb | 1374 ++++++++++++++++++ lib-edge/concurrent/edge/processing_actor.rb | 24 +- lib-edge/concurrent/edge/promises.rb | 2 +- spec/concurrent/edge/erlang_actor_spec.rb | 976 +++++++++++++ spec/concurrent/promises_spec.rb | 11 + support/yard_full_types.rb | 3 +- 12 files changed, 2668 insertions(+), 16 deletions(-) create mode 100644 docs-source/erlang_actor.in.md create mode 100644 docs-source/erlang_actor.init.rb create mode 100644 docs-source/erlang_actor.out.md create mode 100644 lib-edge/concurrent/edge/erlang_actor.rb create mode 100644 spec/concurrent/edge/erlang_actor_spec.rb diff --git a/docs-source/erlang_actor.in.md b/docs-source/erlang_actor.in.md new file mode 100644 index 00000000..0b8f0097 --- /dev/null +++ b/docs-source/erlang_actor.in.md @@ -0,0 +1,145 @@ +## Examples + +The simplest example is to use the actor as an asynchronous execution. +Although, `Promises.future { 1 + 1 }` is better suited for that purpose. + +```ruby +actor = Concurrent::ErlangActor.spawn(:on_thread, name: 'addition') { 1 + 1 } +actor.terminated.value! +``` + +Let's send some messages and maintain some internal state +which is what actors are good for. + +```ruby +actor = Concurrent::ErlangActor.spawn(:on_thread, name: 'sum') do + sum = 0 # internal state + # receive and sum the messages until the actor gets :done + while true + message = receive + break if message == :done + # if the message is asked and not only told, + # reply with a current sum + reply sum += message + end + sum +end +``` + +The actor can be either told a message asynchronously, +or asked. The ask method will block until actor replies. + +```ruby +# tell returns immediately returning the actor +actor.tell(1).tell(1) +# blocks, waiting for the answer +actor.ask 10 +# stop the actor +actor.tell :done +actor.terminated.value! +``` + +### Receiving + +Simplest message receive. + +```ruby +actor = Concurrent::ErlangActor.spawn(:on_thread) { receive } +actor.tell :m +actor.terminated.value! +``` + +which also works for actor on pool, +because if no block is given it will use a default block `{ |v| v }` + +```ruby +actor = Concurrent::ErlangActor.spawn(:on_pool) { receive { |v| v } } +# can simply be following +actor = Concurrent::ErlangActor.spawn(:on_pool) { receive } +actor.tell :m +actor.terminated.value! +``` + +TBA + +### Actor types + +There are two types of actors. +The type is specified when calling spawn as a first argument, +`Concurrent::ErlangActor.spawn(:on_thread, ...` or +`Concurrent::ErlangActor.spawn(:on_pool, ...`. + +The main difference is in how receive method returns. + +- `:on_thread` it blocks the thread until message is available, + then it returns or calls the provided block first. + +- However, `:on_pool` it has to free up the thread on the receive + call back to the pool. Therefore the call to receive ends the + execution of current scope. The receive has to be given block + or blocks that act as a continuations and are called + when there is message available. + +Let's have a look at how the bodies of actors differ between the types: + +```ruby +ping = Concurrent::ErlangActor.spawn(:on_thread) { reply receive } +ping.ask 42 +``` + +It first calls receive, which blocks the thread of the actor. +When it returns the received message is passed an an argument to reply, +which replies the same value back to the ask method. +Then the actor terminates normally, because there is nothing else to do. + +However when running on pool a block with code which should be evaluated +after the message is received has to be provided. + +```ruby +ping = Concurrent::ErlangActor.spawn(:on_pool) { receive { |m| reply m } } +ping.ask 42 +``` + +It starts by calling receive which will remember the given block for later +execution when a message is available and stops executing the current scope. +Later when a message becomes available the previously provided block is given +the message and called. The result of the block is the final value of the +normally terminated actor. + +The direct blocking style of `:on_thread` is simpler to write and more straight +forward however it has limitations. Each `:on_thread` actor creates a Thread +taking time and resources. +There is also a limited number of threads the Ruby process can create +so you may hit the limit and fail to create more threads and therefore actors. + +Since the `:on_pool` actor runs on a poll of threads, its creations +is faster and cheaper and it does not create new threads. +Therefore there is no limit (only RAM) on how many actors can be created. + +To simplify, if you need only few actors `:on_thread` is fine. +However if you will be creating hundreds of actors or +they will be short-lived `:on_pool` should be used. + +### Erlang behaviour + +The actor matches Erlang processes in behaviour. +Therefore it supports the usual Erlang actor linking, monitoring, exit behaviour, etc. + +```ruby +actor = Concurrent::ErlangActor.spawn(:on_thread) do + spawn(link: true) do # equivalent of spawn_link in Erlang + terminate :err # equivalent of exit in Erlang + end + trap # equivalent of process_flag(trap_exit, true) + receive +end +actor.terminated.value! +``` + +### TODO + +* receives +* More erlang behaviour examples +* Back pressure with bounded mailbox +* _op methods +* types of actors diff --git a/docs-source/erlang_actor.init.rb b/docs-source/erlang_actor.init.rb new file mode 100644 index 00000000..359538e5 --- /dev/null +++ b/docs-source/erlang_actor.init.rb @@ -0,0 +1,7 @@ +require 'concurrent-edge' + +def do_stuff(*args) + sleep 0.01 + :stuff +end + diff --git a/docs-source/erlang_actor.out.md b/docs-source/erlang_actor.out.md new file mode 100644 index 00000000..9c564587 --- /dev/null +++ b/docs-source/erlang_actor.out.md @@ -0,0 +1,132 @@ +## Examples + +The simplest example is to use the actor as an asynchronous execution. +Although, `Promises.future { 1 + 1 }` is better suited for that purpose. + +```ruby +actor = Concurrent::ErlangActor.spawn(:on_thread, name: 'addition') { 1 + 1 } +# => # +actor.terminated.value! # => 2 +``` + +Let's send some messages and maintain some internal state +which is what actors are good for. + +```ruby +actor = Concurrent::ErlangActor.spawn(:on_thread, name: 'sum') do + sum = 0 # internal state + # receive and sum the messages until the actor gets :done + while true + message = receive + break if message == :done + # if the message is asked and not only told, + # reply with a current sum + reply sum += message + end + sum +end +# => # +``` + +The actor can be either told a message asynchronously, +or asked. The ask method will block until actor replies. + +```ruby +# tell returns immediately returning the actor +actor.tell(1).tell(1) +# => # +# blocks, waiting for the answer +actor.ask 10 # => 12 +# stop the actor +actor.tell :done +# => # +actor.terminated.value! # => 12 +``` + +### Actor types + +There are two types of actors. +The type is specified when calling spawn as a first argument, +`Concurrent::ErlangActor.spawn(:on_thread, ...` or +`Concurrent::ErlangActor.spawn(:on_pool, ...`. + +The main difference is in how receive method returns. + +- `:on_thread` it blocks the thread until message is available, + then it returns or calls the provided block first. + +- However, `:on_pool` it has to free up the thread on the receive + call back to the pool. Therefore the call to receive ends the + execution of current scope. The receive has to be given block + or blocks that act as a continuations and are called + when there is message available. + +Let's have a look at how the bodies of actors differ between the types: + +```ruby +ping = Concurrent::ErlangActor.spawn(:on_thread) { reply receive } +# => # +ping.ask 42 # => 42 +``` + +It first calls receive, which blocks the thread of the actor. +When it returns the received message is passed an an argument to reply, +which replies the same value back to the ask method. +Then the actor terminates normally, because there is nothing else to do. + +However when running on pool a block with code which should be evaluated +after the message is received has to be provided. + +```ruby +ping = Concurrent::ErlangActor.spawn(:on_pool) { receive { |m| reply m } } +# => # +ping.ask 42 # => 42 +``` + +It starts by calling receive which will remember the given block for later +execution when a message is available and stops executing the current scope. +Later when a message becomes available the previously provided block is given +the message and called. The result of the block is the final value of the +normally terminated actor. + +The direct blocking style of `:on_thread` is simpler to write and more straight +forward however it has limitations. Each `:on_thread` actor creates a Thread +taking time and resources. +There is also a limited number of threads the Ruby process can create +so you may hit the limit and fail to create more threads and therefore actors. + +Since the `:on_pool` actor runs on a poll of threads, its creations +is faster and cheaper and it does not create new threads. +Therefore there is no limit (only RAM) on how many actors can be created. + +To simplify, if you need only few actors `:on_thread` is fine. +However if you will be creating hundreds of actors or +they will be short-lived `:on_pool` should be used. + +### Erlang behaviour + +The actor matches Erlang processes in behaviour. +Therefore it supports the usual Erlang actor linking, monitoring, exit behaviour, etc. + +```ruby +actor = Concurrent::ErlangActor.spawn(:on_thread) do + spawn(link: true) do # equivalent of spawn_link in Erlang + terminate :err # equivalent of exit in Erlang + end + trap # equivalent of process_flag(trap_exit, true) + receive +end +# => # +actor.terminated.value! +# => #, +# @link_terminated=true, +# @reason=:err> +``` + +### TODO + +* More erlang behaviour examples +* Back pressure with bounded mailbox +* _op methods +* types of actors diff --git a/lib-edge/concurrent-edge.rb b/lib-edge/concurrent-edge.rb index 690f8a56..fae660f5 100644 --- a/lib-edge/concurrent-edge.rb +++ b/lib-edge/concurrent-edge.rb @@ -13,3 +13,4 @@ require 'concurrent/edge/throttle' require 'concurrent/edge/channel' require 'concurrent/edge/processing_actor' +require 'concurrent/edge/erlang_actor' diff --git a/lib-edge/concurrent/actor/reference.rb b/lib-edge/concurrent/actor/reference.rb index 508831ec..7ba57431 100644 --- a/lib-edge/concurrent/actor/reference.rb +++ b/lib-edge/concurrent/actor/reference.rb @@ -55,6 +55,9 @@ module Concurrent message message, future end + # @!visibility privated + alias_method :ask_op, :ask + # Sends the message synchronously and blocks until the message # is processed. Raises on error. # diff --git a/lib-edge/concurrent/edge/channel.rb b/lib-edge/concurrent/edge/channel.rb index 98359b76..080d0526 100644 --- a/lib-edge/concurrent/edge/channel.rb +++ b/lib-edge/concurrent/edge/channel.rb @@ -49,6 +49,10 @@ module Concurrent def any.===(other) true end + + def any.to_s + 'ANY' + end end # Create channel. @@ -164,7 +168,7 @@ module Concurrent # # @!macro channel.warn.blocks # @!macro channel.param.timeout - # @!macro promises.param.timeout_value + # @param [Object] timeout_value a value returned by the method when it times out # @return [Object, nil] message or nil when timed out def pop(timeout = nil, timeout_value = nil) pop_matching ANY, timeout, timeout_value diff --git a/lib-edge/concurrent/edge/erlang_actor.rb b/lib-edge/concurrent/edge/erlang_actor.rb new file mode 100644 index 00000000..3691ccea --- /dev/null +++ b/lib-edge/concurrent/edge/erlang_actor.rb @@ -0,0 +1,1374 @@ +if Concurrent.ruby_version :<, 2, 1, 0 + raise 'ErlangActor requires at least ruby version 2.1' +end + +module Concurrent + + # This module provides actor abstraction that has same behaviour as Erlang actor. + # + # {include:file:docs-source/erlang_actor.out.md} + module ErlangActor + + # TODO (pitr-ch 04-Feb-2019): mode documentation. + # TODO (pitr-ch 21-Jan-2019): actor on promises should not call blocking calls like mailbox.pop or tell + # it's fine for a actor on thread and event based though + # TODO (pitr-ch 16-Jan-2019): crate environments to run the body of the actors in + # TODO (pitr-ch 17-Jan-2019): blocking actor should react to signals? + # e.g. override sleep to wait for signal with a given timeout? + # what about other blocking stuff + # def sleep(time) + # raise NotImplementedError + # end + # + # def sleep(time) + # raise NotImplementedError + # finish = Concurrent.monotonic_time + time + # while true + # now = Concurrent.monotonic_time + # if now < finish + # message = @Mailbox.pop_matching(AbstractSignal, finish - now) + # else + # end + # end + # end + # TODO (pitr-ch 06-Feb-2019): should actors be weakly linked from pid so they can be freed from memory? + # TODO (pitr-ch 28-Jan-2019): improve matching support, take inspiration and/or port Algebrick ideas, push ANY and similar further up the namespace + + + # The public reference of the actor which can be stored and passed around. + # Nothing else of the actor should be exposed. + # {ErlangActor.spawn} and {Environment#spawn} return pid. + class Pid < Synchronization::Object + + # The actor is asynchronously told a message. + # The method returns immediately unless + # the actor has bounded mailbox and there is no more space for the message. + # Then the method blocks current thread until there is space available. + # This is useful for backpressure. + # + # @param [Object] message + # @param [Numeric] timeout the maximum time in second to wait + # @return [self, true, false] + # self if timeout was nil, false on timing out and true if told in time. + def tell(message, timeout = nil) + @Actor.tell message, timeout + end + + # Same as {#tell} but represented as a {Promises::Future}. + # @param [Object] message + # @return [Promises::Future(self)] + def tell_op(message) + @Actor.tell_op(message) + end + + # The actor is asked the message and blocks until a reply is available, + # which is returned by the method. + # If the reply is a rejection then the methods raises it. + # @param [Object] message + # @param [Numeric] timeout the maximum time in second to wait + # @param [Object] timeout_value the value returned on timeout + # @return [Object, timeout_value] reply to the message + def ask(message, timeout = nil, timeout_value = nil) + # TODO (pitr-ch 06-Feb-2019): document timout interaction with reply + @Actor.ask message, timeout, timeout_value + end + + # Same as {#tell} but represented as a {Promises::Future}. + # @param [Object] message + # @param [Promises::ResolvableFuture] probe + # a resolvable future which is resolved with the reply. + # @return [Promises::Future(Object)] reply to the message + def ask_op(message, probe = Promises.resolvable_future) + @Actor.ask_op message, probe + end + + # @!macro erlang_actor.terminated + # @return [Promises::Future] a future which is resolved with + # the final result of the actor that is either the reason for + # termination or a value if terminated normally. + def terminated + @Actor.terminated + end + + # @return [#to_s, nil] optional name of the actor + def name + @Name + end + + # @return [String] string representation + def to_s + # TODO (pitr-ch 06-Feb-2019): consider to add state + original = super + if @Name + format '%s %s>', original[0..-2], @Name + else + original + end + end + + alias_method :inspect, :to_s + + private + + safe_initialization! + + def initialize(actor, name) + @Actor = actor + @Name = name + end + end + + # An object representing instance of a monitor, created with {Environment#monitor}. + class Reference + end + + # A class providing environment and methods for actor bodies to run in. + class Environment < Synchronization::Object + safe_initialization! + + # @!macro erlang_actor.terminated + def terminated + @Actor.terminated + end + + # @return [Pid] the pid of this actor + def pid + @Actor.pid + end + + # @return [#to_s] the name od the actor if provided to spawn method + def name + pid.name + end + + # @return [true, false] does this actor trap exit messages? + # @see http://www1.erlang.org/doc/man/erlang.html#process_flag-2 + def traps? + @Actor.traps? + end + + # When trap is set to true, + # exit signals arriving to a actor are converted to {Exit} messages, + # which can be received as ordinary messages. + # If trap is set to false, + # the actor exits + # if it receives an exit signal other than normal + # and the exit signal is propagated to its linked actors. + # Application actors should normally not trap exits. + # + # @param [true, false] value + # @return [true, false] the old value of the flag + # @see http://www1.erlang.org/doc/man/erlang.html#process_flag-2 + def trap(value = true) + @Actor.trap(value) + end + + # Helper for constructing a {#receive} rules + # @see #receive + # @example + # receive on(Numeric) { |v| v.succ }, + # on(ANY) { terminate :bad_message } + def on(matcher, value = nil, &block) + @Actor.on matcher, value, &block + end + + # Receive a message. + # + # @param [::Array(), ::Array(#===), ::Array<::Array(#===, Proc)>] rules + # * No rule - `receive`, `receive {|m| m.to_s}` + # * or single rule which can be combined with the supplied block - + # `receive(Numeric)`, `receive(Numeric) {|v| v.succ}` + # * or array of matcher-proc pairs - + # `receive on(Numeric) { |v| v*2 }, on(Symbol) { |c| do_command c }` + # @param [Numeric] timeout + # how long it should wait for the message + # @param [Object] timeout_value + # if rule `on(TIMEOUT) { do_something }` is not specified + # then timeout_value is returned. + # @return [Object, nothing] + # depends on type of the actor. + # On thread it blocks until message is available + # then it returns the message (or a result of a called block). + # On pool it stops executing and continues with a given block + # when message becomes available. + # @param [Hash] options + # other options specific by type of the actor + # @option options [true, false] :keep + # Keep the rules and repeatedly call the associated blocks, + # until receive is called again. + # @yield [message] block + # to process the message + # if single matcher is supplied + # @yieldparam [Object] message the received message + def receive(*rules, timeout: nil, timeout_value: nil, **options, &block) + # TODO (pitr-ch 07-Feb-2019): add examples, keep: true, link them from this method + @Actor.receive(*rules, timeout: timeout, timeout_value: timeout_value, **options, &block) + end + + # Creates a link between the calling actor and another actor, + # if there is not such a link already. + # If a actor attempts to create a link to itself, nothing is done. Returns true. + # + # If pid does not exist, + # the behavior of the method depends on + # if the calling actor is trapping exits or not (see {#trap}): + # * If the calling actor is not trapping exits link raises with {NoActor}. + # * Otherwise, if the calling actor is trapping exits, link returns true, + # but an exit signal with reason noproc is sent to the calling actor. + # + # @return [true] + # @raise [NoActor] + # @see http://www1.erlang.org/doc/man/erlang.html#link-1 + def link(pid) + @Actor.link(pid) + end + + # Removes the link, if there is one, + # between the calling actor and the actor referred to by pid. + # + # Returns true and does not fail, even if there is no link to Id, or if Id does not exist. + # + # Once unlink(pid) has returned + # it is guaranteed + # that the link between the caller and the actor referred to by pid + # has no effect on the caller in the future (unless the link is setup again). + # If caller is trapping exits, + # an {Exit} message due to the link might have been placed + # in the caller's message queue prior to the call, though. + # + # Note, the {Exit} message can be the result of the link, + # but can also be the result of calling #terminate method externally. + # Therefore, it may be appropriate to cleanup the message queue + # when trapping exits after the call to unlink, as follow: + # ```ruby + # receive on(And[Exit, -> e { e.pid == pid }], true), timeout: 0 + # ``` + # + # @return [true] + def unlink(pid) + @Actor.unlink(pid) + end + + # @!visibility private + # @return [true, false] + def linked?(pid) + @Actor.linked? pid + end + + # The calling actor starts monitoring actor with given pid. + # + # A {Down} message will be sent to the monitoring actor + # if the actor with given pid dies, + # or if the actor with given pid does not exist. + # + # The monitoring is turned off either + # when the {Down} message is sent, or when {#demonitor} is called. + # + # Making several calls to monitor for the same pid is not an error; + # it results in as many, completely independent, monitorings. + # + # @return [Reference] + def monitor(pid) + @Actor.monitor(pid) + end + + # If MonitorRef is a reference which the calling actor obtained by calling {#monitor}, + # this monitoring is turned off. + # If the monitoring is already turned off, nothing happens. + # + # Once demonitor has returned it is guaranteed that no {Down} message + # due to the monitor will be placed in the caller's message queue in the future. + # A {Down} message might have been placed in the caller's message queue prior to the call, though. + # Therefore, in most cases, it is advisable to remove such a 'DOWN' message from the message queue + # after monitoring has been stopped. + # `demonitor(reference, :flush)` can be used if this cleanup is wanted. + # + # The behavior of this method can be viewed as two combined operations: + # asynchronously send a "demonitor signal" to the monitored actor and + # ignore any future results of the monitor. + # + # Failure: It is an error if reference refers to a monitoring started by another actor. + # In that case it may raise an ArgumentError or go unnoticed. + # + # Options: + # * `:flush` - Remove (one) {Down} message, + # if there is one, from the caller's message queue after monitoring has been stopped. + # Calling `demonitor(pid, :flush)` is equivalent to the following, but more efficient: + # ```ruby + # demonitor(pid) + # receive on(And[Down, -> d { d.reference == reference}], true), timeout: 0, timeout_value: true + # ``` + # + # * `info` + # The returned value is one of the following: + # + # - `true` - The monitor was found and removed. + # In this case no {Down} message due to this monitor have been + # nor will be placed in the message queue of the caller. + # - `false` - The monitor was not found and could not be removed. + # This probably because someone already has placed a {Down} message + # corresponding to this monitor in the caller's message queue. + # + # If the info option is combined with the flush option, + # `false` will be returned if a flush was needed; otherwise, `true`. + # + # @param [Reference] reference + # @param [:flush, :info] options + # @return [true, false] + def demonitor(reference, *options) + @Actor.demonitor(reference, *options) + end + + # @!visibility private + def monitoring?(reference) + @Actor.monitoring? reference + end + + # Creates an actor. + # + # @param [:on_thread, :on_pool] type + # of the actor to be created. + # @param [Channel] channel + # The mailbox of the actor, by default it has unlimited capacity. + # Crating the actor with a bounded queue is useful to create backpressure. + # @param [Environment, Module] environment + # A class which is used to run the body of the actor in. + # It can either be a child of {Environment} or a module. + # Module is extended to a new instance of environment, + # therefore if there is many actors with this module + # it is better to create a class and use it instead. + # @param [#to_s] name of the actor. + # Available by {Pid#name} or {Environment#name} and part of {Pid#to_s}. + # @param [true, false] link + # the created actor is atomically created and linked with the calling actor + # @param [true, false] monitor + # the created actor is atomically created and monitored by the calling actor + # @param [Hash] options + # other options specific by type of the actor + # @option options [ExecutorService] :executor + # The executor service to use to execute the actor on. + # Applies only to :on_pool actor type. + # @yield [] the body of the actor. + # When actor is spawned this block is evaluated + # until it terminates. + # @return [Pid, ::Array(Pid, Reference)] a pid or a pid-reference pair when monitor is true + # @see http://www1.erlang.org/doc/man/erlang.html#spawn-1 + # @see http://www1.erlang.org/doc/man/erlang.html#spawn_link-1 + # @see http://www1.erlang.org/doc/man/erlang.html#spawn_monitor-1 + def spawn(type = @Actor.class, + channel: Promises::Channel.new, + environment: Environment, + name: nil, + link: false, + monitor: false, + **options, + &body) + + @Actor.spawn(type, + channel: channel, + environment: environment, + name: nil, + link: link, + monitor: monitor, + **options, + &body) + end + + # Shortcut for fulfilling the reply, same as `reply_resolution true, value, nil`. + # @example + # actor = Concurrent::ErlangActor.spawn(:on_thread) { reply receive * 2 } + # actor.ask 2 #=> 4 + # @param [Object] value + # @return [true, false] did the sender ask, and was it resolved + def reply(value) + reply_resolution true, value, nil + end + + # Reply to the sender of the message currently being processed + # if the actor was asked instead of told. + # The reply is stored in a {Promises::ResolvableFuture} so the resolvable_args are arguments for + # {Promises::ResolvableFuture#resolve} method. + # @example + # actor = Concurrent::ErlangActor.spawn(:on_thread) { reply_resolution true, receive * 2, nil } + # actor.ask 2 #=> 4 + # + # @param resolve_args see Promises::ResolvableFuture#resolve + # @return [true, false] did the sender ask, and was it resolved + def reply_resolution(*resolve_args) + @Actor.reply_resolution(*resolve_args) + end + + # If pid **is not** provided stops the execution of the calling actor + # with the exit reason. + # + # If pid **is** provided, + # it sends an exit signal with exit reason to the actor identified by pid. + # + # The following behavior apply + # if `reason` is any object except `:normal` or `:kill`. + # If pid is not trapping exits, + # pid itself will exit with exit reason. + # If pid is trapping exits, + # the exit signal is transformed into a message {Exit} + # and delivered to the message queue of pid. + # + # If reason is the Symbol `:normal`, pid will not exit. + # If it is trapping exits, the exit signal is transformed into a message {Exit} + # and delivered to its message queue. + # + # If reason is the Symbol `:kill`, that is if `exit(pid, :kill)` is called, + # an untrappable exit signal is sent to pid which will unconditionally exit + # with exit reason `:killed`. + # + # Since evaluating this function causes the process to terminate, it has no return value. + # + # @param [Pid] pid + # @param [Object, :normal, :kill] reason + # @param [Object] value + # @return [nothing] + # @see http://www1.erlang.org/doc/man/erlang.html#error-1 + # @see http://www1.erlang.org/doc/man/erlang.html#error-2 + def terminate(pid = nil, reason, value: nil) + @Actor.terminate pid, reason, value: value + end + + private + + def initialize(actor) + super() + @Actor = actor + end + end + + # Creates an actor. Same as {Environment#spawn} but lacks link and monitor options. + # @param [:on_thread, :on_pool] type + # @param [Channel] channel + # @param [Environment, Module] environment + # @param [#to_s] name of the actor + # @param [Hash] options + # @option options [ExecutorService] :executor + # @return [Pid] + # @see Environment#spawn + def self.spawn(type, + channel: Promises::Channel.new, + environment: Environment, + name: nil, + **options, + &body) + + actor = create type, channel, environment, name, **options, &body + actor.run + return actor.pid + end + + # Same as {Environment#terminate}, but it requires pid. + # @param [Pid] pid + # @param [Object, :normal, :kill] reason + # @return [true] + def self.terminate(pid, reason) + if reason == :kill + pid.tell Kill.new(nil) + else + pid.tell Exit.new(nil, reason, false) + end + true + end + + extend Concern::Logging + + class Token + def initialize(name) + @name = name + end + + def to_s + @name + end + + alias_method :inspect, :to_s + end + + private_constant :Token + + JUMP = Token.new 'JUMP' + TERMINATE = Token.new 'TERMINATE' + RECEIVE = Token.new 'RECEIVE' + NOTHING = Token.new 'NOTHING' + + private_constant :JUMP + private_constant :TERMINATE + private_constant :RECEIVE + private_constant :NOTHING + + # These constants are useful + # where the body of an actor is defined. + # For convenience they are provided in this module for including. + # @example + # include Concurrent::ErlangActor::EnvironmentConstants + # actor = Concurrent::ErlangActor.spawn(:on_thread) do + # receive on(Numeric) { |v| v.succ }, + # on(ANY) { terminate :bad_message }, + # on(TIMEOUT) { terminate :no_message }, + # timeout: 1 + # end + module EnvironmentConstants + # Unique identifier of a timeout, singleton. + TIMEOUT = Token.new 'TIMEOUT' + # A singleton which matches anything using #=== method + ANY = Promises::Channel::ANY + + class AbstractLogicOperationMatcher + def self.[](*matchers) + new(*matchers) + end + + def initialize(*matchers) + @matchers = matchers + end + end + + # Combines matchers into one which matches if all match. + # @example + # And[Numeric, -> v { v >= 0 }] === 1 # => true + # And[Numeric, -> v { v >= 0 }] === -1 # => false + class And < AbstractLogicOperationMatcher + # @return [true, false] + def ===(v) + @matchers.all? { |m| m === v } + end + end + + # Combines matchers into one which matches if any matches. + # @example + # Or[Symbol, String] === :v # => true + # Or[Symbol, String] === 'v' # => true + # Or[Symbol, String] === 1 # => false + class Or < AbstractLogicOperationMatcher + # @return [true, false] + def ===(v) + @matchers.any? { |m| m === v } + end + end + end + + include EnvironmentConstants + + class Run + attr_reader :future + + def self.[](future) + new future + end + + def initialize(future) + @future = future + end + + TEST = -> v { v.future if v.is_a?(Run) } + end + private_constant :Run + + class AbstractActor < Synchronization::Object + + include EnvironmentConstants + include Concern::Logging + safe_initialization! + + # @param [Promises::Channel] mailbox + def initialize(mailbox, environment, name) + super() + @Mailbox = mailbox + @Pid = Pid.new self, name + @Linked = ::Set.new + @Monitors = {} + @Monitoring = {} + @MonitoringLateDelivery = {} + @Terminated = Promises.resolvable_future + @trap = false + @reply = nil + + @Environment = if environment.is_a?(Class) && environment <= Environment + environment.new self + elsif environment.is_a? Module + e = Environment.new self + e.extend environment + e + else + raise ArgumentError, + "environment has to be a class inheriting from Environment or a module" + end + end + + def tell_op(message) + log Logger::DEBUG, pid, told: message + @Mailbox.push_op(message).then { @Pid } + end + + def tell(message, timeout = nil) + log Logger::DEBUG, pid, told: message + timed_out = @Mailbox.push message, timeout + timeout ? timed_out : @Pid + end + + def ask(message, timeout, timeout_value) + log Logger::DEBUG, pid, asked: message + probe = Promises.resolvable_future + question = Ask.new(message, probe) + if timeout + start = Concurrent.monotonic_time + timed_out = @Mailbox.push question, timeout + return timeout_value if timed_out + to_wait = timeout - (Concurrent.monotonic_time - start) + # TODO (pitr-ch 06-Feb-2019): allow negative timeout everywhere, interpret as 0 + # TODO (pitr-ch 06-Feb-2019): test timeouts for tell and ask method + probe.value! to_wait >= 0 ? to_wait : 0, + timeout_value, + [true, :timed_out, nil] + # TODO (pitr-ch 06-Feb-2019): unify timed out values used to resolve resolvable futures on timing out + else + @Mailbox.push question + probe.value! + end + end + + def ask_op(message, probe) + log Logger::DEBUG, pid, asked: message + @Mailbox.push_op(Ask.new(message, probe)).then { probe }.flat + end + + def terminated + @Terminated.with_hidden_resolvable + end + + def pid + @Pid + end + + def traps? + @trap + end + + def trap(value = true) + old = @trap + @trap = !!value + old + end + + def on(matcher, value = nil, &block) + raise ArgumentError, 'only one of block or value can be supplied' if block && value + [matcher, value || block] + end + + def receive(*rules, timeout: nil, timeout_value: nil, **options, &block) + raise NotImplementedError + end + + def link(pid) + return true if pid == @Pid + if @Linked.add? pid + pid.tell Link.new(@Pid) + if pid.terminated.resolved? + if @trap + tell Exit.new pid, NoActor.new(pid) + else + raise NoActor.new(pid) + end + end + end + true + end + + def unlink(pid) + pid.tell UnLink.new(@Pid) if @Linked.delete pid + true + end + + def linked?(pid) + @Linked.include? pid + end + + def monitor(pid) + # *monitoring* *monitored* + # send Monitor + # terminated? + # terminate + # drain signals + reference = Reference.new + @Monitoring[reference] = pid + if pid.terminated.resolved? + # always return no-proc when terminated + tell Down.new(pid, reference, NoActor.new(pid)) + else + # otherwise let it race + pid.tell Monitor.new(@Pid, reference) + tell Down.new(pid, reference, NoActor.new(pid)) if pid.terminated.resolved? + end + reference + end + + def demonitor(reference, *options) + info = options.delete :info + flush = options.delete :flush + raise ArgumentError, "bad options #{options}" unless options.empty? + + pid = @Monitoring.delete reference + demonitoring = !!pid + pid.tell DeMonitor.new @Pid, reference if demonitoring + + if flush + # remove (one) down message having reference from mailbox + flushed = demonitoring ? !!@Mailbox.try_pop_matching(And[Down, -> m { m.reference == reference }]) : false + return info ? !flushed : true + end + + if info + return false unless demonitoring + + if @Mailbox.peek_matching(And[Down, -> m { m.reference == reference }]) + @MonitoringLateDelivery[reference] = pid # allow to deliver the message once + return false + end + end + + return true + end + + def monitoring?(reference) + @Monitoring.include? reference + end + + def spawn(type, + channel: Promises::Channel.new, + environment:, + name:, + link:, + monitor:, + **options, + &body) + actor = ErlangActor.create type, channel, environment, name, **options, &body + pid = actor.pid + link pid if link + ref = (monitor pid if monitor) + actor.run + monitor ? [pid, ref] : pid + end + + def reply_resolution(*resolve_args) + return false unless @reply + + return @reply.resolve(*resolve_args) + end + + def terminate(pid = nil, reason, value: nil) + if pid + # has to send it to itself even if pid equals self.pid + if reason == :kill + pid.tell Kill.new(@Pid) + else + pid.tell Exit.new(@Pid, reason, false) + end + else + terminate_self(reason, value) + end + end + + private + + def canonical_rules(rules, timeout, timeout_value, given_block) + block = given_block || -> v { v } + case rules.size + when 0 + rules.push(on(ANY, &block)) + when 1 + matcher = rules.first + if matcher.is_a?(::Array) && matcher.size == 2 + return ArgumentError.new 'a block cannot be given if full rules are used' if given_block + else + rules.replace([on(matcher, &block)]) + end + else + return ArgumentError.new 'a block cannot be given if full rules are used' if given_block + end + + if timeout + # TIMEOUT rule has to be first, to prevent any picking it up ANY + has_timeout = nil + i = rules.size + rules.reverse_each do |r, j| + i -= 1 + if r == TIMEOUT + has_timeout = i + break + end + end + + rules.unshift(has_timeout ? rules[has_timeout] : on(TIMEOUT, timeout_value)) + end + nil + end + + def eval_task(message, job) + if job.is_a? Proc + @Environment.instance_exec message, &job + else + job + end + end + + def send_exit_messages(reason) + @Linked.each do |pid| + pid.tell Exit.new(@Pid, reason) + end.clear + @Monitors.each do |reference, pid| + pid.tell Down.new(@Pid, reference, reason) + end.clear + end + + def consume_exit(exit_message) + from, reason = exit_message + if !exit_message.link_terminated || @Linked.delete(from) + case reason + when :normal + if @trap + false + else + if from == @Pid + terminate :normal + else + true # do nothing + end + end + else + if @trap + false # ends up in mailbox + else + terminate reason + end + end + else + # *link* *exiting* + # send Link + # terminate + # terminated? + # drain signals # generates second Exit which is dropped here + # already processed exit message, do nothing + true + end + end + + def consume_ask(message) + if message.is_a? Ask + @reply = message.probe + message.message + else + message + end + end + + def asked? + !!@reply + end + + def clean_reply + if @reply + unless @reply.resolved? + @reply.is_a?(Promises::ResolvableFuture) ? @reply.reject(NoReply) : @reply.resolve + end + @reply = nil + end + end + + def consume_signal(message) + if AbstractSignal === message + case message + when Ask + # never consume, consume_ask takes care of it later + false + when Link + @Linked.add message.from + true + when UnLink + @Linked.delete message.from + true + when Monitor + @Monitors[message.reference] = message.from + true + when DeMonitor + @Monitors.delete message.reference + true + when Kill + terminate :killed + when Down + if @Monitoring.delete(message.reference) || @MonitoringLateDelivery.delete(message.reference) + # put into a queue + return false + end + + # ignore down message if no longer monitoring, and following case + # + # *monitoring* *monitored* + # send Monitor + # terminate + # terminated? + # drain signals # generates second DOWN which is dropped here + # already reported as :noproc + true + when Exit + consume_exit message + else + raise "unknown message #{message}" + end + else + # regular message + false + end + end + + def initial_signal_consumption + while true + message = @Mailbox.try_pop + break unless message + consume_signal message or raise 'it was not consumable signal' + end + end + + def terminate_self(reason, value) + raise NotImplementedError + end + + def after_termination(final_reason) + log Logger::DEBUG, @Pid, terminated: final_reason + clean_reply + while true + message = @Mailbox.try_pop NOTHING + break if message == NOTHING + case message + when Monitor + message.from.tell Down.new(@Pid, message.reference, final_reason) + when Link + message.from.tell Exit.new(@Pid, final_reason) + else + # normal messages and other signals are thrown away + end + end + end + end + + private_constant :AbstractActor + + class OnPool < AbstractActor + + def initialize(channel, environment, name, executor: :io, **options, &body) + raise ArgumentError, "unrecognized options #{options}" unless options.empty? + + # TODO (pitr-ch 06-Feb-2019): the default executor is not configurable, add factory methods + super channel, environment, name + @Executor = executor + @behaviour = [] + @keep_behaviour = false + @Body = body || -> { start } # TODO (pitr-ch 06-Feb-2019): document + end + + def run() + initial_signal_consumption + inner_run(true). + run(Run::TEST). + then(&method(:after_termination)). + rescue { |e| log Logger::ERROR, e } + end + + def receive(*rules, timeout: nil, timeout_value: nil, keep: false, &given_block) + clean_reply + err = canonical_rules rules, timeout, timeout_value, given_block + raise err if err + + @keep_behaviour = keep + @timeout = timeout + @behaviour = rules + throw JUMP, [RECEIVE] + end + + private + + def start + @Environment.instance_exec(&@Body) + end + + def terminate_self(reason, value) + throw JUMP, [TERMINATE, reason, value] + end + + def inner_run(first = false) + body = -> message, _actor do + kind, reason, value = if message.is_a?(::Array) && message.first == TERMINATE + message + else + begin + catch(JUMP) do + [NOTHING, :normal, first ? start : apply_behaviour(message)] + end + rescue => e + [TERMINATE, e, nil] + end + end + + case kind + when TERMINATE + send_exit_messages reason + @Terminated.resolve(reason == :normal, value, reason) + reason + when RECEIVE + Run[inner_run] + when NOTHING + if @behaviour.empty? + send_exit_messages reason + @Terminated.resolve(reason == :normal, value, reason) + reason + else + Run[inner_run] + end + else + raise "bad kind: #{kind.inspect}" + end + end + + if first + Promises.future_on(@Executor, nil, self, &body) + else + internal_receive.run(Run::TEST).then(self, &body) + end + end + + def internal_receive + raise if @behaviour.empty? + rules_matcher = Or[*@behaviour.map(&:first)] + matcher = -> m { m.is_a?(Ask) ? rules_matcher === m.message : rules_matcher === m } + start = nil + message_future = case @timeout + when 0 + Promises.fulfilled_future @Mailbox.try_pop_matching(matcher, TIMEOUT) + when Numeric + pop = @Mailbox.pop_op_matching(matcher) + start = Concurrent.monotonic_time + # FIXME (pitr-ch 30-Jan-2019): the scheduled future should be cancelled + (Promises.schedule(@timeout) { TIMEOUT } | pop).then(pop) do |message, p| + if message == TIMEOUT && !p.resolve(true, TIMEOUT, nil, false) + # timeout raced with probe resolution, take the value instead + p.value + else + message + end + end + when nil + @Mailbox.pop_op_matching(matcher) + else + raise + end + + message_future.then(start) do |message, s| + log Logger::DEBUG, pid, got: message + catch(JUMP) do + if consume_signal(message) + @timeout = [@timeout + s - Concurrent.monotonic_time, 0].max if s + Run[internal_receive] + else + consume_ask(message) + end + end + end + end + + def apply_behaviour(message) + @behaviour.each do |rule, job| + if rule === message + @behaviour = [] unless @keep_behaviour + return eval_task(message, job) + end + end + raise 'should not reach' + end + end + + private_constant :OnPool + + class OnThread < AbstractActor + def initialize(channel, environment, name, **options, &body) + raise ArgumentError, "unrecognized options #{options}" unless options.empty? + super channel, environment, name + @Body = body + @Thread = nil + end + + TERMINATE = Module.new + private_constant :TERMINATE + + def run() + initial_signal_consumption + @Thread = Thread.new(@Terminated, self) do |terminated, _actor| # sync point + Thread.abort_on_exception = true + + final_reason = begin + reason, value = catch(TERMINATE) do + [:normal, @Environment.instance_exec(&@Body)] + end + send_exit_messages reason + terminated.resolve(reason == :normal, value, reason) + reason + rescue => e + send_exit_messages e + terminated.reject e + e + end + + after_termination final_reason + @Thread = nil + end + end + + def receive(*rules, timeout: nil, timeout_value: nil, &given_block) + clean_reply + + err = canonical_rules rules, timeout, timeout_value, given_block + raise err if err + + rules_matcher = Or[*rules.map(&:first)] + matcher = -> m { m.is_a?(Ask) ? rules_matcher === m.message : rules_matcher === m } + while true + message = @Mailbox.pop_matching(matcher, timeout, TIMEOUT) + log Logger::DEBUG, pid, got: message + unless consume_signal(message) + message = consume_ask(message) + rules.each do |rule, job| + return eval_task(message, job) if rule === message + end + end + end + end + + private + + def terminate_self(reason, value) + throw TERMINATE, [reason, value] + end + end + + private_constant :OnThread + + # TODO (pitr-ch 06-Feb-2019): document signals and constants + + class AbstractSignal < Synchronization::Object + safe_initialization! + end + + private_constant :AbstractSignal + + class Ask < AbstractSignal + attr_reader :message, :probe + + def initialize(message, probe) + super() + @message = message + @probe = probe + raise ArgumentError, 'probe is not Resolvable' unless probe.is_a? Promises::Resolvable + end + end + + private_constant :Ask + + module HasFrom + + # @return [Pid] + attr_reader :from + + # @!visibility private + def initialize(from) + super() + @from = from + end + + # @return [true, false] + def ==(o) + o.class == self.class && o.from == @from + end + + alias_method :eql?, :== + + # @return [Integer] + def hash + @from.hash + end + end + + # A message send when actor terminates. + class Exit < AbstractSignal + # TODO (pitr-ch 06-Feb-2019): rename to terminated + # TODO (pitr-ch 06-Feb-2019): link_terminated leaks to the user + + include HasFrom + + # @return [Object] + attr_reader :reason + + # @!visibility private + attr_reader :link_terminated + + # @!visibility private + def initialize(from, reason, link_terminated = true) + super from + @reason = reason + @link_terminated = link_terminated + end + + # @return [::Array(Pid, Object)] + def to_ary + [@from, @reason] + end + + # @return [true, false] + def ==(o) + super(o) && o.reason == self.reason + end + + # @return [Integer] + def hash + to_ary.hash + end + end + + class Kill < AbstractSignal + include HasFrom + end + + private_constant :Kill + + class Link < AbstractSignal + include HasFrom + end + + private_constant :Link + + class UnLink < AbstractSignal + include HasFrom + end + + private_constant :UnLink + + module HasReference + include HasFrom + + # @return [Reference] + attr_reader :reference + + # @!visibility private + def initialize(from, reference) + super from + @reference = reference + end + + # @return [::Array(Pid, Reference)] + def to_ary + [@from, @reference] + end + + # @return [true, false] + def ==(o) + super(o) && o.reference == self.reference + end + + # @return [Integer] + def hash + [@from, @reference].hash + end + end + + class Monitor < AbstractSignal + include HasReference + end + + private_constant :Monitor + + class DeMonitor < AbstractSignal + include HasReference + end + + private_constant :DeMonitor + + # A message send by a monitored actor when terminated. + class Down < AbstractSignal + include HasReference + + # @return [Object] + attr_reader :info + + # @!visibility private + def initialize(from, reference, info) + super from, reference + @info = info + end + + # @return [::Array(Pis, Reference, Object)] + def to_ary + [@from, @reference, @info] + end + + # @return [true, false] + def ==(o) + super(o) && o.info == self.info + end + + # @return [Integer] + def hash + to_ary.hash + end + end + + # Abstract error class for ErlangActor errors. + class Error < Concurrent::Error + end + + # An error used when actor tries to link or monitor terminated actor. + class NoActor < Error + # @return [Pid] + attr_reader :pid + + # @param [Pid] pid + # @return [self] + def initialize(pid = nil) + super("No proc with #{pid}") + @pid = pid + end + + # @return [true, false] + def ==(o) + o.class == self.class && o.pid == self.pid + end + + alias_method :eql?, :== + + # @return [Integer] + def hash + pid.hash + end + end + + # An error used when actor is asked but no reply was given or + # when the actor terminates before it gives a reply. + class NoReply < Error + end + + # @!visibility private + def self.create(type, channel, environment, name, **options, &body) + actor = KLASS_MAP.fetch(type).new(channel, environment, name, **options, &body) + ensure + log Logger::DEBUG, actor.pid, created: caller[1] if actor + end + + KLASS_MAP = { + on_thread: OnThread, + on_pool: OnPool, + OnThread => OnThread, + OnPool => OnPool, + } + private_constant :KLASS_MAP + end +end diff --git a/lib-edge/concurrent/edge/processing_actor.rb b/lib-edge/concurrent/edge/processing_actor.rb index 94e79976..a61e433f 100644 --- a/lib-edge/concurrent/edge/processing_actor.rb +++ b/lib-edge/concurrent/edge/processing_actor.rb @@ -15,9 +15,9 @@ module Concurrent # values[-5, 5] # => [49996, 49997, 49998, 49999, 50000] # @!macro warn.edge class ProcessingActor < Synchronization::Object - # TODO (pitr-ch 18-Dec-2016): (un)linking, bidirectional, sends special message, multiple link calls has no effect, - # TODO (pitr-ch 21-Dec-2016): Make terminated a cancellation token? - # link_spawn atomic, Can it be fixed by sending exit when linked dead actor? + + # TODO (pitr-ch 29-Jan-2019): simplify as much as possible, maybe even do not delegate to mailbox, no ask linking etc + # TODO (pitr-ch 03-Feb-2019): remove completely safe_initialization! @@ -60,12 +60,8 @@ module Concurrent # @yieldparam [Object] *args # @yieldreturn [Promises::Future(Object)] a future representing next step of execution # @return [ProcessingActor] - # @example - # # TODO (pitr-ch 19-Jan-2017): actor with limited mailbox def self.act_listening(channel, *args, &process) - actor, _, terminated = ProcessingActor.new channel - Promises.future(actor, *args, &process).run.tangle(terminated) - actor + ProcessingActor.new channel, *args, &process end # # Receives a message when available, used in the actor's process. @@ -162,19 +158,21 @@ module Concurrent end # @return [String] string representation. - def inspect - format '%s termination:%s>', super[0..-2], termination.state + def to_s + format '%s termination: %s>', super[0..-2], termination.state end + alias_method :inspect, :to_s + def to_ary - [self, @Mailbox, @Terminated] + [@Mailbox, @Terminated] end private - def initialize(channel = Promises::Channel.new) + def initialize(channel, *args, &process) @Mailbox = channel - @Terminated = Promises.resolvable_future + @Terminated = Promises.future(self, *args, &process).run super() end diff --git a/lib-edge/concurrent/edge/promises.rb b/lib-edge/concurrent/edge/promises.rb index 2f9217c5..3d65f7ac 100644 --- a/lib-edge/concurrent/edge/promises.rb +++ b/lib-edge/concurrent/edge/promises.rb @@ -12,7 +12,7 @@ module Concurrent # Asks the actor with its value. # @return [Future] new future with the response form the actor def then_ask(actor) - self.then(actor) { |v, a| a.ask(v) }.flat + self.then(actor) { |v, a| a.ask_op(v) }.flat end end diff --git a/spec/concurrent/edge/erlang_actor_spec.rb b/spec/concurrent/edge/erlang_actor_spec.rb new file mode 100644 index 00000000..de9b3bbe --- /dev/null +++ b/spec/concurrent/edge/erlang_actor_spec.rb @@ -0,0 +1,976 @@ +RSpec.describe 'Concurrent' do + describe 'ErlangActor', edge: true do + + shared_examples 'erlang actor' do + # TODO (pitr-ch 06-Feb-2019): include constants instead + ANY ||= Concurrent::ErlangActor::ANY + TIMEOUT ||= Concurrent::ErlangActor::TIMEOUT + identity = -> v { v } + + specify "run to termination" do + expect(Concurrent::ErlangActor.spawn(type) do + :v + end.terminated.value!).to eq :v + end + + specify '#receive' do + id = -> v { v } + succ = -> v { v.succ } + + [[[:v], -> { receive }, :v], + [[:v], -> { receive on(ANY, &id) }, :v], + [[:v, 1], -> { receive Numeric }, 1], + [[:v, 1], -> { receive(Numeric, &succ) }, 2], + + [[:v], -> { receive Numeric, timeout: 0 }, nil], + [[:v], -> { receive(Numeric, timeout: 0, &succ) }, nil], + [[:v], -> { receive Numeric, timeout: 0, timeout_value: :timeout }, :timeout], + [[:v], -> { receive(Numeric, timeout: 0, timeout_value: :timeout, &succ) }, :timeout], + + [[:v, 1], -> { receive Numeric, timeout: 1 }, 1], + [[:v, 1], -> { receive(Numeric, timeout: 1, &succ) }, 2], + [[:v, 1], -> { receive Numeric, timeout: 1, timeout_value: :timeout }, 1], + [[:v, 1], -> { receive(Numeric, timeout: 1, timeout_value: :timeout, &succ) }, 2], + + [[:v], -> { receive on(Numeric, &id), on(TIMEOUT, nil), timeout: 0 }, nil], + [[:v], -> { receive on(Numeric, &succ), on(TIMEOUT, nil), timeout: 0 }, nil], + [[:v], -> { receive on(Numeric, &id), on(TIMEOUT, :timeout), timeout: 0 }, :timeout], + [[:v], -> { receive on(Numeric, &succ), on(TIMEOUT, :timeout), timeout: 0 }, :timeout], + + [[:v, 1], -> { receive on(Numeric, &id), on(TIMEOUT, nil), timeout: 1 }, 1], + [[:v, 1], -> { receive on(Numeric, &succ), on(TIMEOUT, nil), timeout: 1 }, 2], + [[:v, 1], -> { receive on(Numeric, &id), on(TIMEOUT, :timeout), timeout: 1 }, 1], + [[:v, 1], -> { receive on(Numeric, &succ), on(TIMEOUT, :timeout), timeout: 1 }, 2], + ].each_with_index do |(messages, body, result), i| + a = Concurrent::ErlangActor.spawn(type, &body) + messages.each { |m| a.tell m } + expect(a.terminated.value!).to eq(result), "body: #{body}" + end + end + + specify 'pid has name' do + actor = Concurrent::ErlangActor.spawn(type, name: 'test') {} + expect(actor.to_s).to match(/test/) + expect(actor.inspect).to match(/test/) + end + + specify "receives message" do + actor = Concurrent::ErlangActor.spawn(type, + &{ on_thread: -> { receive }, + on_pool: -> { receive on(ANY, &identity) } }.fetch(type)) + actor.tell :v + expect(actor.terminated.value!).to eq :v + end + + specify "receives message with matchers" do + body = { on_thread: + -> do + [receive(on(Symbol, &identity)), + receive(on(Numeric, &:succ)), + receive(on(Numeric, :got_it), timeout: 0, timeout_value: :nothing)] + end, + on_pool: + -> do + @arr = [] + receive(on(Symbol) do |v1| + @arr.push v1 + receive(on(Numeric) do |v2| + @arr << v2.succ + receive(on(Numeric, :got_it), on(TIMEOUT) { @arr << :nothing; @arr }, timeout: 0) + end) + end) + end } + actor = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + actor.tell 'junk' + actor.tell 1 + actor.tell :v + expect(actor.terminated.value!).to eq [:v, 2, :nothing] + end + + describe "monitoring" do + specify "(de)monitor" do + body_receive = { on_thread: + -> { receive }, + on_pool: + -> { receive { |v| v } } } + + body = { on_thread: + -> do + actor = receive + reference = monitor actor + monitored = monitoring? reference + demonitor reference + result = [monitored, monitoring?(reference)] + actor.tell :finish + result + end, + on_pool: + -> do + receive do |actor| + reference = monitor actor + monitored = monitoring? reference + demonitor reference + result = [monitored, monitoring?(reference)] + actor.tell :finish + result + end + end } + a1 = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + a2 = Concurrent::ErlangActor.spawn(type, &body_receive.fetch(type)) + a1.tell a2 + expect(a1.terminated.value!).to eq [true, false] + expect(a2.terminated.value!).to eq :finish + end + + specify "demonitor" do + body = { on_thread: + -> do + actor = receive + reference = monitor actor + monitored = monitoring? reference + actor.tell :done + actor.terminated.wait + demonitor = demonitor reference, :flush, :info + [monitored, monitoring?(reference), demonitor, receive(timeout: 0)] + end, + on_pool: + -> do + receive do |actor| + reference = monitor actor + monitored = monitoring? reference + actor.tell :done + actor.terminated.wait + demonitor = demonitor reference, :flush, :info + results = [monitored, monitoring?(reference), demonitor] + receive(on(ANY) { |v| [*results, v] }, + on(TIMEOUT) { [*results, nil] }, + timeout: 0) + end + end } + + a1 = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + body = { on_thread: -> { receive }, + on_pool: -> { receive(&identity) } } + a2 = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + a1.tell a2 + + a1.terminated.wait + expect(a1.terminated.value!).to eq [true, false, false, nil] + expect(a2.terminated.value!).to eq :done + end + + specify "demonitor should leave the down message in the inbox if it's already there" do + body = { on_thread: + -> do + actor = receive + reference = monitor actor + monitored = monitoring? reference + actor.tell :done + actor.terminated.wait + demonitor = demonitor reference, :info + [reference, monitored, monitoring?(reference), demonitor, receive(timeout: 0)] + end, + on_pool: + -> do + receive do |actor| + reference = monitor actor + monitored = monitoring? reference + actor.tell :done + actor.terminated.wait + demonitor = demonitor reference, :info + results = [reference, monitored, monitoring?(reference), demonitor] + receive(on(ANY) { |v| [*results, v] }, + on(TIMEOUT) { [*results, nil] }, + timeout: 0) + end + end } + + a1 = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + body = { on_thread: -> { receive }, + on_pool: -> { receive(&identity) } } + a2 = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + a1.tell a2 + + reference, monitored, monitoring, demonitor, message = a1.terminated.value! + expect(monitored).to eq true + expect(monitoring).to eq false + expect(demonitor).to eq false + expect(message).to eq Concurrent::ErlangActor::Down.new(a2, reference, :normal) + expect(a2.terminated.value!).to eq :done + end + + specify "notifications 1" do + body = { on_thread: + -> do + b = spawn { [:done, receive] } + ref = monitor b + b.tell 42 + [b, ref, receive] + end, + on_pool: + -> do + b = spawn { receive on(ANY) { |v| [:done, v] } } + ref = monitor b + b.tell 42 + receive on(ANY) { |v| [b, ref, v] } + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + b, ref, down = a.terminated.value! + expect(down).to eq Concurrent::ErlangActor::Down.new(b, ref, :normal) + expect(b.terminated.value!).to eq [:done, 42] + end + + specify "notifications 2" do + body = { on_thread: + -> do + b = spawn { :done } + b.terminated.wait + ref = monitor b + [b, ref, receive(timeout: 0.01, timeout_value: :timeout)] + end, + on_pool: + -> do + b = spawn { :done } + b.terminated.wait + ref = monitor b + receive(timeout: 0.01) { |v| [b, ref, v] } + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + b, ref, down = a.terminated.value! + expect(down).to eq Concurrent::ErlangActor::Down.new(b, ref, Concurrent::ErlangActor::NoActor.new(b)) + expect(b.terminated.value!).to eq :done + end + + # FIXME (pitr-ch 20-Jan-2019): test concurrent exit and monitor(), same for link + end + + describe 'linking' do + body_receive_test_linked = { on_thread: + -> { linked?(receive) }, + on_pool: + -> { receive { |a| linked? a } } } + + specify 'links' do + body1 = { on_thread: + -> do + actor = receive + link actor + linked = linked? actor + actor.tell pid + linked + end, + on_pool: + -> do + receive do |actor| + link actor + linked = linked? actor + actor.tell pid + linked + end + end } + + a1 = Concurrent::ErlangActor.spawn(type, &body1.fetch(type)) + a2 = Concurrent::ErlangActor.spawn(type, &body_receive_test_linked.fetch(type)) + + a1.tell a2 + expect(a1.terminated.value!).to be_truthy + expect(a2.terminated.value!).to be_truthy + end + + specify 'unlinks' do + body1 = { on_thread: + -> do + actor = receive + link actor + unlink actor + linked = linked? actor + actor.tell pid + linked + end, + on_pool: + -> do + receive do |actor| + link actor + unlink actor + linked = linked? actor + actor.tell pid + linked + end + end } + + a1 = Concurrent::ErlangActor.spawn(type, &body1.fetch(type)) + a2 = Concurrent::ErlangActor.spawn(type, &body_receive_test_linked.fetch(type)) + a1.tell a2 + expect(a1.terminated.value!).to be_falsey + expect(a2.terminated.value!).to be_falsey + end + + specify 'link dead' do + a = Concurrent::ErlangActor.spawn(type) do + b = spawn { :done } + b.terminated.wait + link b + end + expect { a.terminated.value! }.to raise_error Concurrent::ErlangActor::NoActor + end + + specify 'link dead when trapping' do + body1 = { on_thread: + -> do + b = spawn { :done } + b.terminated.wait + sleep 0.01 + trap + link b + [b, receive] + end, + on_pool: + -> do + b = spawn { :done } + b.terminated.wait + sleep 0.01 + trap + link b + receive { |v| [b, v] } + end } + + a = Concurrent::ErlangActor.spawn(type, &body1.fetch(type)) + + b, captured = a.terminated.value! + expect(captured).to eq Concurrent::ErlangActor::Exit.new(b, Concurrent::ErlangActor::NoActor.new(b)) + end + + + describe 'exit/1 when linked' do + # https://learnyousomeerlang.com/errors-and-processes#links + specify 1 do + body = { on_thread: + -> do + b = spawn(link: true) { :ok } + [receive(timeout: 0.01), b] + end, + on_pool: + -> do + b = spawn(link: true) { :ok } + receive(on(ANY) { |v| [v, b] }, + on(TIMEOUT) { |v| [nil, b] }, + timeout: 0.01) + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + message, b = a.terminated.value! + expect(message).to eq nil + expect(b.terminated.value!).to eq :ok + end + + specify 2 do + body = { on_thread: + -> do + b = spawn(link: true) { :ok } + trap + [receive, b] + end, + on_pool: + -> do + b = spawn(link: true) { :ok } + trap + receive(on(ANY) { |v| [v, b] }, + on(TIMEOUT) { |v| [nil, b] }, + timeout: 0.01) + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + message, b = a.terminated.value! + expect(message).to eq Concurrent::ErlangActor::Exit.new(b, :normal) + expect(b.terminated.value!).to eq :ok + end + + specify 3 do + body = { on_thread: + -> do + spawn(link: true) { terminate :boom } + receive(timeout: 0.01) + end, + on_pool: + -> do + spawn(link: true) { terminate :boom } + receive(on(ANY) { |v| [v, b] }, + on(TIMEOUT) { |v| [nil, b] }, + timeout: 0.01) + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + expect(a.terminated.reason).to eq :boom + end + + specify 4 do + body = { on_thread: + -> do + b = spawn(link: true) { terminate :boom } + trap + [receive(timeout: 0.01), b] + end, + on_pool: + -> do + b = spawn(link: true) { terminate :boom } + trap + receive(on(ANY) { |v| [v, b] }, + on(TIMEOUT) { |v| [nil, b] }, + timeout: 0.01) + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + trapped_exit, b = a.terminated.value! + expect(trapped_exit).to eq Concurrent::ErlangActor::Exit.new(b, :boom) + expect(b.terminated.reason).to eq :boom + end + + specify 5 do + body = { on_thread: + -> do + b = spawn(link: true) { terminate :normal, value: :ok } + [receive(timeout: 0.01), b] + end, + on_pool: + -> do + b = spawn(link: true) { terminate :normal, value: :ok } + receive(on(ANY) { |v| [v, b] }, + on(TIMEOUT) { |v| [nil, b] }, + timeout: 0.01) + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + message, b = a.terminated.value! + expect(message).to eq nil + expect(b.terminated.value!).to eq :ok + end + + specify 6 do + body = { on_thread: + -> do + b = spawn(link: true) { terminate :normal, value: :ok } + trap + [receive, b] + end, + on_pool: + -> do + b = spawn(link: true) { terminate :normal, value: :ok } + trap + receive(on(ANY) { |v| [v, b] }, + on(TIMEOUT) { |v| [nil, b] }, + timeout: 0.01) + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + message, b = a.terminated.value! + expect(message).to eq Concurrent::ErlangActor::Exit.new(b, :normal) + expect(b.terminated.value!).to eq :ok + end + + specify 7 do + body = { on_thread: + -> do + spawn(link: true) { raise 'err' } + receive(timeout: 0.01) + end, + on_pool: + -> do + spawn(link: true) { raise 'err' } + receive(on(ANY) { |v| [v, b] }, + on(TIMEOUT) { |v| [nil, b] }, + timeout: 0.01) + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + expect { a.terminated.value! }.to raise_error(RuntimeError, 'err') + end + + specify 8 do + body = { on_thread: + -> do + b = spawn(link: true) { raise 'err' } + trap + [receive(timeout: 0.01), b] + end, + on_pool: + -> do + b = spawn(link: true) { raise 'err' } + trap + receive(on(ANY) { |v| [v, b] }, + on(TIMEOUT) { |v| [nil, b] }, + timeout: 0.01) + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + trapped_exit, b = a.terminated.value! + expect(trapped_exit).to be_a Concurrent::ErlangActor::Exit + expect(trapped_exit.from).to eq b + expect(trapped_exit.reason).to eq b.terminated.reason + expect(trapped_exit.reason).to be_a RuntimeError + expect(trapped_exit.reason.message).to eq 'err' + end + + specify 9 do + body = { on_thread: + -> do + b = spawn(link: true) { throw :uncaught } + trap + [receive(timeout: 0.01), b] + end, + on_pool: + -> do + b = spawn(link: true) { throw :uncaught } + trap + receive(on(ANY) { |v| [v, b] }, + on(TIMEOUT) { |v| [nil, b] }, + timeout: 0.01) + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + trapped_exit, b = a.terminated.value! + expect(trapped_exit).to be_a Concurrent::ErlangActor::Exit + expect(trapped_exit.from).to eq b + expect(trapped_exit.reason).to eq b.terminated.reason + expect(trapped_exit.reason).to be_a UncaughtThrowError + expect(trapped_exit.reason.message).to eq 'uncaught throw :uncaught' + end + end + + describe 'exit/2 when linked' do + # https://learnyousomeerlang.com/errors-and-processes#links + specify 1 do + body = { on_thread: + -> do + terminate pid, :normal # sends the signal to mailbox + # TODO (pitr-ch 17-Jan-2019): does erlang require receive to process signals? + receive(timeout: 0.01) + :continued + end, + on_pool: + -> do + terminate pid, :normal # sends the signal to mailbox + receive(on(ANY, :continued), + on(TIMEOUT, :timeout), + timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + expect(a.terminated.value!).to eq nil + end + + specify 2 do + body = { on_thread: + -> do + terminate pid, :normal + trap + receive(timeout: 0.01) + end, + on_pool: + -> do + terminate pid, :normal + trap + receive(on(ANY, &identity), on(TIMEOUT, nil), timeout: 0.01) + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + captured_exit = a.terminated.value! + expect(captured_exit).to eq Concurrent::ErlangActor::Exit.new(a, :normal) + end + + specify 3 do + body = { on_thread: + -> do + b = spawn(link: true) { receive timeout: 0.01, timeout_value: :timeout } + terminate b, :normal + b + end, + on_pool: + -> do + b = spawn(link: true) do + receive(on(ANY, :not_happening), + on(TIMEOUT, :timeout), + timeout: 0.01) + end + + terminate b, :normal + b + end } + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + b = a.terminated.value! + expect(b.terminated.value!).to eq :timeout + end + + specify 4 do + body = { on_thread: + -> do + b = spawn(link: true) { trap; receive timeout: 0.01, timeout_value: :timeout } + terminate b, :normal + b + end, + on_pool: + -> do + b = spawn(link: true) do + trap + receive(on(ANY, &identity), + on(TIMEOUT, :timeout), + timeout: 0.01) + end + + terminate b, :normal + b + end } + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + b = a.terminated.value! + expect(b.terminated.value!).to eq Concurrent::ErlangActor::Exit.new(a, :normal) + end + + specify 5 do + body = { on_thread: + -> do + b = spawn(link: true) { receive timeout: 0.01; terminate :continued } + terminate b, :normal + trap + [b, receive(timeout: 0.02)] + end, + on_pool: + -> do + b = spawn(link: true) do + receive(on(ANY, :not_happening), + on(TIMEOUT) { terminate :continued }, + timeout: 0.01) + end + + terminate b, :normal + trap + receive(on(ANY) { |v| [b, v] }, on(TIMEOUT, :timeout), timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + b, captured = a.terminated.value! + expect(b.terminated.reason).to eq :continued + # normal is never send from b to a back + expect(captured).to eq Concurrent::ErlangActor::Exit.new(b, :continued) + end + + specify 6 do + body = { on_thread: + -> do + b = spawn(link: true) { receive timeout: 1; :done } + terminate b, :remote_err + receive timeout: 1 + end, + on_pool: + -> do + b = spawn(link: true) { receive(on(ANY, :done), on(TIMEOUT, :timeout), timeout: 1) } + terminate b, :remote_err + receive(on(ANY) { |v| [b, v] }, + on(TIMEOUT, :timeout), + timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + a.terminated.wait + expect(a.terminated.reason).to eq :remote_err + end + + specify 7 do + body = { on_thread: + -> do + b = spawn(link: true) { receive timeout: 1; :done } + terminate b, :remote_err + trap + [b, receive(timeout: 1)] + end, + on_pool: + -> do + b = spawn(link: true) { receive(on(ANY, :done), on(TIMEOUT, :timeout), timeout: 1) } + terminate b, :remote_err + trap + receive(on(ANY) { |v| [b, v] }, + on(TIMEOUT, :timeout), + timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + b, captured = a.terminated.value! + expect(b.terminated.reason).to eq :remote_err + expect(captured.reason).to eq :remote_err + end + + specify 8 do + body = { on_thread: + -> do + b = spawn(link: true) { receive timeout: 1; :done } + terminate b, :kill + receive timeout: 1 + end, + on_pool: + -> do + b = spawn(link: true) { receive(on(ANY, :done), on(TIMEOUT, :done), timeout: 1) } + terminate b, :kill + receive(on(ANY, &identity), on(TIMEOUT, :timeout), timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + expect(a.terminated.reason).to eq :killed + end + + specify 9 do + body = { on_thread: + -> do + b = spawn(link: true) { receive timeout: 0.01; :done } + terminate b, :kill + trap + [b, receive(timeout: 0.01)] + end, + on_pool: + -> do + b = spawn(link: true) { receive(on(ANY, :done), on(TIMEOUT, :done), timeout: 1) } + terminate b, :kill + trap + receive(on(ANY) { |v| [b, v] }, on(TIMEOUT, :timeout), timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + b, captured = a.terminated.value! + expect(b.terminated.reason).to eq :killed + expect(captured.reason).to eq :killed + end + + specify 10 do + body = { on_thread: + -> do + terminate pid, :kill + receive timeout: 0.01 + end, + on_pool: + -> do + terminate pid, :kill + receive(on(ANY, :continued), on(TIMEOUT, :timeout), timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + expect(a.terminated.reason).to eq :killed + end + + specify 11 do + body = { on_thread: + -> do + terminate pid, :kill + trap + receive timeout: 0.01 + end, + on_pool: + -> do + terminate pid, :kill + trap + receive(on(ANY, &identity), on(TIMEOUT, :timeout), timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + expect(a.terminated.reason).to eq :killed + end + + # explained in + # http://erlang.org/pipermail/erlang-questions/2009-October/047241.html + + specify 12 do + body = { on_thread: + -> do + spawn(link: true) { terminate :kill } + receive timeout: 1 + end, + on_pool: + -> do + spawn(link: true) { terminate :kill } + receive(on(ANY, :continued), + on(TIMEOUT, :timeout), + timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + expect(a.terminated.reason).to eq :kill + end + + specify 13 do + body = { on_thread: + -> do + b = spawn(link: true) { terminate :kill } + trap + [b, receive(timeout: 1)] + end, + on_pool: + -> do + b = spawn(link: true) { terminate :kill } + trap + receive(on(ANY) { |v| [b, v] }, + on(TIMEOUT, :timeout), + timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + + b, captured = a.terminated.value! + + expect(b.terminated.reason).to eq :kill + expect(captured).to eq Concurrent::ErlangActor::Exit.new(b, :kill) + end + + end + end + + specify 'spawn(link: true)' do + a = Concurrent::ErlangActor.spawn(type) do + b = spawn(link: true) { :v } + linked? b + end + expect(a.terminated.value!).to be_truthy + + a = Concurrent::ErlangActor.spawn(type) do + b = spawn { :v } + linked? b + end + expect(a.terminated.value!).to be_falsey + end + + specify 'termination' do + a = Concurrent::ErlangActor.spawn(type) { :v } + expect(a.terminated.value!).to eq :v + + a = Concurrent::ErlangActor.spawn(type) { raise 'err' } + expect { a.terminated.value! }.to raise_error(RuntimeError, 'err') + + a = Concurrent::ErlangActor.spawn(type) { terminate :normal, value: :val } + expect(a.terminated.value!).to eq :val + + a = Concurrent::ErlangActor.spawn(type) { terminate :er } + expect(a.terminated.reason).to eq :er + end + + describe 'asking' do + specify "replies" do + body = { on_thread: -> { reply receive }, + on_pool: -> { receive { |v| reply v } } } + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + expect(a.ask(:v)).to eq :v + + body = { on_thread: -> { v = receive; reply v; reply v; }, + on_pool: -> { receive { |v| reply v; reply v } } } + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + expect(a.ask(:v)).to eq :v + + expect(a.terminated.reason).to be_a_kind_of Concurrent::MultipleAssignmentError + body = { on_thread: -> { v = receive; reply v; reply_resolution true, v, nil, false }, + on_pool: -> { receive { |v| reply v; reply_resolution true, v, nil, false } } } + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + expect(a.ask(:v)).to eq :v + expect(a.terminated.value!).to be_falsey + + body = { on_thread: -> { reply_resolution false, nil, receive }, + on_pool: -> { receive { |v| reply_resolution false, nil, v } } } + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + expect { a.ask(:err) }.to raise_error StandardError, 'err' + + body = { on_thread: -> { reply_resolution false, nil, receive }, + on_pool: -> { receive { |v| reply_resolution false, nil, v } } } + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + expect(a.ask_op(:err).reason).to eq :err + end + + specify "rejects on no reply" do + body = { on_thread: -> { receive; receive }, + on_pool: -> { receive { receive {} } } } + + a = Concurrent::ErlangActor.spawn(type, &body.fetch(type)) + expect(a.ask_op(:v).reason).to eq Concurrent::ErlangActor::NoReply + expect { raise a.ask_op(:v).wait }.to raise_error Concurrent::ErlangActor::NoReply + end + + end + end + + describe 'on thread' do + let(:type) { :on_thread } + it_behaves_like 'erlang actor' + end + + describe 'event based' do + let(:type) { :on_pool } + it_behaves_like 'erlang actor' + + specify "receives message repeatedly with keep" do + actor = Concurrent::ErlangActor.spawn(:on_pool) do + receive on(ANY) { |v| v == :done ? terminate(:normal, value: 42) : reply(v) }, + keep: true + end + expect(actor.ask(1)).to eq 1 + expect(actor.ask(2)).to eq 2 + actor.tell :done + expect(actor.terminated.value!).to eq 42 + end + + specify "class defined" do + definition_module = Module.new do + def start + @sum = 0 + receive on(Numeric, &method(:count)), + on(:done, &method(:stop)), + on(TIMEOUT, &method(:fail)), + keep: true, + timeout: 0.1 + end + + def count(message) + reply @sum += message + end + + def stop(_message) + terminate :normal, value: @sum + end + + def fail(_message) + terminate :timeout + end + end + definition_class = Class.new Concurrent::ErlangActor::Environment do + include definition_module + end + + actor = Concurrent::ErlangActor.spawn(:on_pool, environment: definition_class) { start } + actor.tell 1 + expect(actor.ask(2)).to eq 3 + actor.tell :done + expect(actor.terminated.value!).to eq 3 + + actor = Concurrent::ErlangActor.spawn(:on_pool, environment: definition_module) + actor.tell 1 + expect(actor.ask(2)).to eq 3 + expect(actor.terminated.reason).to eq :timeout + end + + end + end +end diff --git a/spec/concurrent/promises_spec.rb b/spec/concurrent/promises_spec.rb index c4775295..cb65a130 100644 --- a/spec/concurrent/promises_spec.rb +++ b/spec/concurrent/promises_spec.rb @@ -692,6 +692,17 @@ RSpec.describe 'Concurrent::Promises' do value!).to eq 6 end + it 'with erlang actor' do + actor = Concurrent::ErlangActor.spawn :on_thread do + reply receive * 2 + end + + expect(future { 2 }. + then_ask(actor). + then { |v| v + 2 }. + value!).to eq 6 + end + it 'with channel' do ch1 = Concurrent::Promises::Channel.new ch2 = Concurrent::Promises::Channel.new diff --git a/support/yard_full_types.rb b/support/yard_full_types.rb index a89fea14..23d3e5e4 100644 --- a/support/yard_full_types.rb +++ b/support/yard_full_types.rb @@ -4,7 +4,8 @@ module YARD module Templates::Helpers - # make sure the signatures are complete not simplified with ... + # make sure the signatures are complete not simplified with + # '...' and '?' instead of nil module HtmlHelper def signature_types(meth, link = true) meth = convert_method_to_overload(meth)