diff --git a/Gemfile b/Gemfile index 6d9ee251..f9ef172a 100644 --- a/Gemfile +++ b/Gemfile @@ -12,7 +12,7 @@ gem 'concurrent-ruby-edge', Concurrent::EDGE_VERSION, options gem 'concurrent-ruby-ext', Concurrent::VERSION, options.merge(platform: :mri) group :development do - gem 'rake', (Concurrent.ruby_version :<, 2, 2, 0) ? '~> 12.0' : '~> 13.0' + gem 'rake', '~> 13.0' gem 'rake-compiler', '~> 1.0', '>= 1.0.7' gem 'rake-compiler-dock', '~> 1.0' gem 'pry', '~> 0.11', platforms: :mri diff --git a/Rakefile b/Rakefile index 50b020e7..55e78b7b 100644 --- a/Rakefile +++ b/Rakefile @@ -2,15 +2,6 @@ require_relative 'lib/concurrent-ruby/concurrent/version' require_relative 'lib/concurrent-ruby-edge/concurrent/edge/version' require_relative 'lib/concurrent-ruby/concurrent/utility/engine' -if Concurrent.ruby_version :<, 2, 0, 0 - # @!visibility private - module Kernel - def __dir__ - File.dirname __FILE__ - end - end -end - core_gemspec = Gem::Specification.load File.join(__dir__, 'concurrent-ruby.gemspec') ext_gemspec = Gem::Specification.load File.join(__dir__, 'concurrent-ruby-ext.gemspec') edge_gemspec = Gem::Specification.load File.join(__dir__, 'concurrent-ruby-edge.gemspec') diff --git a/lib/concurrent-ruby-edge/concurrent-edge.rb b/lib/concurrent-ruby-edge/concurrent-edge.rb index 1f40efc4..d3bbb4e8 100644 --- a/lib/concurrent-ruby-edge/concurrent-edge.rb +++ b/lib/concurrent-ruby-edge/concurrent-edge.rb @@ -6,7 +6,7 @@ require 'concurrent/actor' require 'concurrent/agent' require 'concurrent/channel' require 'concurrent/lazy_register' -require 'concurrent/executor/wrapping_executor' if Concurrent.ruby_version :>=, 2, 1, 0 +require 'concurrent/executor/wrapping_executor' require 'concurrent/edge/lock_free_linked_set' require 'concurrent/edge/lock_free_queue' @@ -16,4 +16,4 @@ require 'concurrent/edge/throttle' require 'concurrent/edge/channel' require 'concurrent/edge/processing_actor' -require 'concurrent/edge/erlang_actor' if Concurrent.ruby_version :>=, 2, 1, 0 +require 'concurrent/edge/erlang_actor' diff --git a/lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb b/lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb index f6d462fd..4076a022 100644 --- a/lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb +++ b/lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb @@ -1,7 +1,3 @@ -if Concurrent.ruby_version :<, 2, 1, 0 - raise 'ErlangActor requires at least ruby version 2.1' -end - module Concurrent # This module provides actor abstraction that has same behaviour as Erlang actor. diff --git a/lib/concurrent-ruby/concurrent/synchronization/lockable_object.rb b/lib/concurrent-ruby/concurrent/synchronization/lockable_object.rb index cdbe4d43..ae28e5bc 100644 --- a/lib/concurrent-ruby/concurrent/synchronization/lockable_object.rb +++ b/lib/concurrent-ruby/concurrent/synchronization/lockable_object.rb @@ -4,9 +4,7 @@ module Concurrent # @!visibility private # @!macro internal_implementation_note LockableObjectImplementation = case - when Concurrent.on_cruby? && Concurrent.ruby_version(:<=, 1, 9, 3) - MonitorLockableObject - when Concurrent.on_cruby? && Concurrent.ruby_version(:>, 1, 9, 3) + when Concurrent.on_cruby? MutexLockableObject when Concurrent.on_jruby? JRubyLockableObject diff --git a/spec/concurrent/edge/erlang_actor_spec.rb b/spec/concurrent/edge/erlang_actor_spec.rb index 13dbfba4..b86be0c4 100644 --- a/spec/concurrent/edge/erlang_actor_spec.rb +++ b/spec/concurrent/edge/erlang_actor_spec.rb @@ -1,1031 +1,1029 @@ -if Concurrent.ruby_version :>=, 2, 1, 0 - RSpec.describe 'Concurrent' do - describe 'ErlangActor', edge: true do - # TODO (pitr-ch 06-Feb-2019): include constants instead - ANY ||= Concurrent::ErlangActor::ANY - TIMEOUT ||= Concurrent::ErlangActor::TIMEOUT - And ||= Concurrent::ErlangActor::And - identity = -> v { v } +RSpec.describe 'Concurrent' do + describe 'ErlangActor', edge: true do + # TODO (pitr-ch 06-Feb-2019): include constants instead + ANY ||= Concurrent::ErlangActor::ANY + TIMEOUT ||= Concurrent::ErlangActor::TIMEOUT + And ||= Concurrent::ErlangActor::And + identity = -> v { v } - shared_examples 'erlang actor' do + shared_examples 'erlang actor' do - specify "run to termination" do - expect(Concurrent::ErlangActor.spawn(type: type) do - :v - end.terminated.value!).to eq :v + specify "run to termination" do + expect(Concurrent::ErlangActor.spawn(type: type) do + :v + end.terminated.value!).to eq :v + end + + specify "run to termination with arguments" do + expect(Concurrent::ErlangActor. + spawn(1, 2, type: type) { |a, b| a + b }.terminated.value!). + to eq 3 + end + + specify '#receive' do + succ = -> v { v.succ } + + [[[:v], -> { receive }, :v], + [[:v], -> { receive on(ANY, &identity) }, :v], + [[:v, 1], -> { receive Numeric }, 1], + [[:v, 1], -> { receive(Numeric, &succ) }, 2], + + [[:v], -> { receive Numeric, timeout: 0 }, nil], + [[:v], -> { receive(Numeric, timeout: 0, &succ) }, nil], + [[:v], -> { receive Numeric, timeout: 0, timeout_value: :timeout }, :timeout], + [[:v], -> { receive(Numeric, timeout: 0, timeout_value: :timeout, &succ) }, :timeout], + + [[:v, 1], -> { receive Numeric, timeout: 1 }, 1], + [[:v, 1], -> { receive(Numeric, timeout: 1, &succ) }, 2], + [[:v, 1], -> { receive Numeric, timeout: 1, timeout_value: :timeout }, 1], + [[:v, 1], -> { receive(Numeric, timeout: 1, timeout_value: :timeout, &succ) }, 2], + + [[:v], -> { receive on(Numeric, &identity), on(TIMEOUT, nil), timeout: 0 }, nil], + [[:v], -> { receive on(Numeric, &succ), on(TIMEOUT, nil), timeout: 0 }, nil], + [[:v], -> { receive on(Numeric, &identity), on(TIMEOUT, :timeout), timeout: 0 }, :timeout], + [[:v], -> { receive on(Numeric, &succ), on(TIMEOUT, :timeout), timeout: 0 }, :timeout], + + [[:v, 1], -> { receive on(Numeric, &identity), on(TIMEOUT, nil), timeout: 1 }, 1], + [[:v, 1], -> { receive on(Numeric, &succ), on(TIMEOUT, nil), timeout: 1 }, 2], + [[:v, 1], -> { receive on(Numeric, &identity), on(TIMEOUT, :timeout), timeout: 1 }, 1], + [[:v, 1], -> { receive on(Numeric, &succ), on(TIMEOUT, :timeout), timeout: 1 }, 2], + ].each_with_index do |(messages, body, result), i| + a = Concurrent::ErlangActor.spawn(type: type, &body) + messages.each { |m| a.tell m } + expect(a.terminated.value!).to eq(result), "body: #{body}" end + end - specify "run to termination with arguments" do - expect(Concurrent::ErlangActor. - spawn(1, 2, type: type) { |a, b| a + b }.terminated.value!). - to eq 3 - end + specify 'pid has name' do + actor = Concurrent::ErlangActor.spawn(type: type, name: 'test') {} + expect(actor.to_s).to match(/test/) + expect(actor.inspect).to match(/test/) + end - specify '#receive' do - succ = -> v { v.succ } + specify "receives message" do + actor = Concurrent::ErlangActor.spawn(type: type, + &{ on_thread: -> { receive }, + on_pool: -> { receive on(ANY, &identity) } }.fetch(type)) + actor.tell :v + expect(actor.terminated.value!).to eq :v + end - [[[:v], -> { receive }, :v], - [[:v], -> { receive on(ANY, &identity) }, :v], - [[:v, 1], -> { receive Numeric }, 1], - [[:v, 1], -> { receive(Numeric, &succ) }, 2], - - [[:v], -> { receive Numeric, timeout: 0 }, nil], - [[:v], -> { receive(Numeric, timeout: 0, &succ) }, nil], - [[:v], -> { receive Numeric, timeout: 0, timeout_value: :timeout }, :timeout], - [[:v], -> { receive(Numeric, timeout: 0, timeout_value: :timeout, &succ) }, :timeout], - - [[:v, 1], -> { receive Numeric, timeout: 1 }, 1], - [[:v, 1], -> { receive(Numeric, timeout: 1, &succ) }, 2], - [[:v, 1], -> { receive Numeric, timeout: 1, timeout_value: :timeout }, 1], - [[:v, 1], -> { receive(Numeric, timeout: 1, timeout_value: :timeout, &succ) }, 2], - - [[:v], -> { receive on(Numeric, &identity), on(TIMEOUT, nil), timeout: 0 }, nil], - [[:v], -> { receive on(Numeric, &succ), on(TIMEOUT, nil), timeout: 0 }, nil], - [[:v], -> { receive on(Numeric, &identity), on(TIMEOUT, :timeout), timeout: 0 }, :timeout], - [[:v], -> { receive on(Numeric, &succ), on(TIMEOUT, :timeout), timeout: 0 }, :timeout], - - [[:v, 1], -> { receive on(Numeric, &identity), on(TIMEOUT, nil), timeout: 1 }, 1], - [[:v, 1], -> { receive on(Numeric, &succ), on(TIMEOUT, nil), timeout: 1 }, 2], - [[:v, 1], -> { receive on(Numeric, &identity), on(TIMEOUT, :timeout), timeout: 1 }, 1], - [[:v, 1], -> { receive on(Numeric, &succ), on(TIMEOUT, :timeout), timeout: 1 }, 2], - ].each_with_index do |(messages, body, result), i| - a = Concurrent::ErlangActor.spawn(type: type, &body) - messages.each { |m| a.tell m } - expect(a.terminated.value!).to eq(result), "body: #{body}" - end - end - - specify 'pid has name' do - actor = Concurrent::ErlangActor.spawn(type: type, name: 'test') {} - expect(actor.to_s).to match(/test/) - expect(actor.inspect).to match(/test/) - end - - specify "receives message" do - actor = Concurrent::ErlangActor.spawn(type: type, - &{ on_thread: -> { receive }, - on_pool: -> { receive on(ANY, &identity) } }.fetch(type)) - actor.tell :v - expect(actor.terminated.value!).to eq :v - end - - specify "receives message with matchers" do - body = { on_thread: - -> do - [receive(on(Symbol, &identity)), - receive(on(Numeric, &:succ)), - receive(on(Numeric, :got_it), timeout: 0, timeout_value: :nothing)] - end, - on_pool: - -> do - @arr = [] - receive(on(Symbol) do |v1| - @arr.push v1 - receive(on(Numeric) do |v2| - @arr << v2.succ - receive(on(Numeric, :got_it), on(TIMEOUT) { @arr << :nothing; @arr }, timeout: 0) - end) + specify "receives message with matchers" do + body = { on_thread: + -> do + [receive(on(Symbol, &identity)), + receive(on(Numeric, &:succ)), + receive(on(Numeric, :got_it), timeout: 0, timeout_value: :nothing)] + end, + on_pool: + -> do + @arr = [] + receive(on(Symbol) do |v1| + @arr.push v1 + receive(on(Numeric) do |v2| + @arr << v2.succ + receive(on(Numeric, :got_it), on(TIMEOUT) { @arr << :nothing; @arr }, timeout: 0) end) - end } - actor = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - actor.tell 'junk' - actor.tell 1 - actor.tell :v - expect(actor.terminated.value!).to eq [:v, 2, :nothing] - end + end) + end } + actor = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + actor.tell 'junk' + actor.tell 1 + actor.tell :v + expect(actor.terminated.value!).to eq [:v, 2, :nothing] + end - describe "monitoring" do - specify "(de)monitor" do - body_receive = { on_thread: - -> { receive }, - on_pool: - -> { receive { |v| v } } } + describe "monitoring" do + specify "(de)monitor" do + body_receive = { on_thread: + -> { receive }, + on_pool: + -> { receive { |v| v } } } - body = { on_thread: - -> do - actor = receive + body = { on_thread: + -> do + actor = receive + reference = monitor actor + monitored = monitoring? reference + demonitor reference + result = [monitored, monitoring?(reference)] + actor.tell :finish + result + end, + on_pool: + -> do + receive do |actor| reference = monitor actor monitored = monitoring? reference demonitor reference result = [monitored, monitoring?(reference)] actor.tell :finish result - end, - on_pool: - -> do - receive do |actor| - reference = monitor actor - monitored = monitoring? reference - demonitor reference - result = [monitored, monitoring?(reference)] - actor.tell :finish - result - end - end } - a1 = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - a2 = Concurrent::ErlangActor.spawn(type: type, &body_receive.fetch(type)) - a1.tell a2 - expect(a1.terminated.value!).to eq [true, false] - expect(a2.terminated.value!).to eq :finish - end + end + end } + a1 = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + a2 = Concurrent::ErlangActor.spawn(type: type, &body_receive.fetch(type)) + a1.tell a2 + expect(a1.terminated.value!).to eq [true, false] + expect(a2.terminated.value!).to eq :finish + end - specify "demonitor" do - body = { on_thread: - -> do - actor = receive + specify "demonitor" do + body = { on_thread: + -> do + actor = receive + reference = monitor actor + monitored = monitoring? reference + actor.tell :done + actor.terminated.wait + demonitor = demonitor reference, :flush, :info + [monitored, monitoring?(reference), demonitor, receive(timeout: 0)] + end, + on_pool: + -> do + receive do |actor| reference = monitor actor monitored = monitoring? reference actor.tell :done actor.terminated.wait demonitor = demonitor reference, :flush, :info - [monitored, monitoring?(reference), demonitor, receive(timeout: 0)] - end, - on_pool: - -> do - receive do |actor| - reference = monitor actor - monitored = monitoring? reference - actor.tell :done - actor.terminated.wait - demonitor = demonitor reference, :flush, :info - results = [monitored, monitoring?(reference), demonitor] - receive(on(ANY) { |v| [*results, v] }, - on(TIMEOUT) { [*results, nil] }, - timeout: 0) - end - end } + results = [monitored, monitoring?(reference), demonitor] + receive(on(ANY) { |v| [*results, v] }, + on(TIMEOUT) { [*results, nil] }, + timeout: 0) + end + end } - a1 = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - body = { on_thread: -> { receive }, - on_pool: -> { receive(&identity) } } - a2 = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - a1.tell a2 + a1 = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + body = { on_thread: -> { receive }, + on_pool: -> { receive(&identity) } } + a2 = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + a1.tell a2 - a1.terminated.wait - expect(a1.terminated.value!).to eq [true, false, false, nil] - expect(a2.terminated.value!).to eq :done - end + a1.terminated.wait + expect(a1.terminated.value!).to eq [true, false, false, nil] + expect(a2.terminated.value!).to eq :done + end - specify "demonitor should leave the down message in the inbox if it's already there" do - body = { on_thread: - -> do - actor = receive + specify "demonitor should leave the down message in the inbox if it's already there" do + body = { on_thread: + -> do + actor = receive + reference = monitor actor + monitored = monitoring? reference + actor.tell :done + actor.terminated.wait + demonitor = demonitor reference, :info + [reference, monitored, monitoring?(reference), demonitor, receive(timeout: 0)] + end, + on_pool: + -> do + receive do |actor| reference = monitor actor monitored = monitoring? reference actor.tell :done actor.terminated.wait demonitor = demonitor reference, :info - [reference, monitored, monitoring?(reference), demonitor, receive(timeout: 0)] - end, - on_pool: - -> do - receive do |actor| - reference = monitor actor - monitored = monitoring? reference - actor.tell :done - actor.terminated.wait - demonitor = demonitor reference, :info - results = [reference, monitored, monitoring?(reference), demonitor] - receive(on(ANY) { |v| [*results, v] }, - on(TIMEOUT) { [*results, nil] }, - timeout: 0) - end - end } + results = [reference, monitored, monitoring?(reference), demonitor] + receive(on(ANY) { |v| [*results, v] }, + on(TIMEOUT) { [*results, nil] }, + timeout: 0) + end + end } - a1 = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - body = { on_thread: -> { receive }, - on_pool: -> { receive(&identity) } } - a2 = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - a1.tell a2 + a1 = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + body = { on_thread: -> { receive }, + on_pool: -> { receive(&identity) } } + a2 = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + a1.tell a2 - reference, monitored, monitoring, demonitor, message = a1.terminated.value! - expect(monitored).to eq true - expect(monitoring).to eq false - expect(demonitor).to eq false - expect(message).to eq Concurrent::ErlangActor::Down.new(a2, reference, :normal) - expect(a2.terminated.value!).to eq :done - end - - specify "notifications 1" do - body = { on_thread: - -> do - b = spawn { [:done, receive] } - ref = monitor b - b.tell 42 - [b, ref, receive] - end, - on_pool: - -> do - b = spawn { receive on(ANY) { |v| [:done, v] } } - ref = monitor b - b.tell 42 - receive on(ANY) { |v| [b, ref, v] } - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - b, ref, down = a.terminated.value! - expect(down).to eq Concurrent::ErlangActor::Down.new(b, ref, :normal) - expect(b.terminated.value!).to eq [:done, 42] - end - - specify "notifications 2" do - body = { on_thread: - -> do - b = spawn { :done } - b.terminated.wait - ref = monitor b - [b, ref, receive(timeout: 1, timeout_value: :timeout)] - end, - on_pool: - -> do - b = spawn { :done } - b.terminated.wait - ref = monitor b - receive(timeout: 1) { |v| [b, ref, v] } - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - b, ref, down = a.terminated.value! - expect(down).to eq Concurrent::ErlangActor::Down.new(b, ref, Concurrent::ErlangActor::NoActor.new(b)) - expect(b.terminated.value!).to eq :done - end - - # FIXME (pitr-ch 20-Jan-2019): test concurrent exit and monitor(), same for link + reference, monitored, monitoring, demonitor, message = a1.terminated.value! + expect(monitored).to eq true + expect(monitoring).to eq false + expect(demonitor).to eq false + expect(message).to eq Concurrent::ErlangActor::Down.new(a2, reference, :normal) + expect(a2.terminated.value!).to eq :done end - describe 'linking' do - body_receive_test_linked = { on_thread: - -> { linked?(receive) }, - on_pool: - -> { receive { |a| linked? a } } } + specify "notifications 1" do + body = { on_thread: + -> do + b = spawn { [:done, receive] } + ref = monitor b + b.tell 42 + [b, ref, receive] + end, + on_pool: + -> do + b = spawn { receive on(ANY) { |v| [:done, v] } } + ref = monitor b + b.tell 42 + receive on(ANY) { |v| [b, ref, v] } + end } - specify 'links' do - body1 = { on_thread: - -> do - actor = receive + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + + b, ref, down = a.terminated.value! + expect(down).to eq Concurrent::ErlangActor::Down.new(b, ref, :normal) + expect(b.terminated.value!).to eq [:done, 42] + end + + specify "notifications 2" do + body = { on_thread: + -> do + b = spawn { :done } + b.terminated.wait + ref = monitor b + [b, ref, receive(timeout: 1, timeout_value: :timeout)] + end, + on_pool: + -> do + b = spawn { :done } + b.terminated.wait + ref = monitor b + receive(timeout: 1) { |v| [b, ref, v] } + end } + + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + + b, ref, down = a.terminated.value! + expect(down).to eq Concurrent::ErlangActor::Down.new(b, ref, Concurrent::ErlangActor::NoActor.new(b)) + expect(b.terminated.value!).to eq :done + end + + # FIXME (pitr-ch 20-Jan-2019): test concurrent exit and monitor(), same for link + end + + describe 'linking' do + body_receive_test_linked = { on_thread: + -> { linked?(receive) }, + on_pool: + -> { receive { |a| linked? a } } } + + specify 'links' do + body1 = { on_thread: + -> do + actor = receive + link actor + linked = linked? actor + actor.tell pid + linked + end, + on_pool: + -> do + receive do |actor| link actor linked = linked? actor actor.tell pid linked - end, - on_pool: - -> do - receive do |actor| - link actor - linked = linked? actor - actor.tell pid - linked - end - end } + end + end } - a1 = Concurrent::ErlangActor.spawn(type: type, &body1.fetch(type)) - a2 = Concurrent::ErlangActor.spawn(type: type, &body_receive_test_linked.fetch(type)) + a1 = Concurrent::ErlangActor.spawn(type: type, &body1.fetch(type)) + a2 = Concurrent::ErlangActor.spawn(type: type, &body_receive_test_linked.fetch(type)) - a1.tell a2 - expect(a1.terminated.value!).to be_truthy - expect(a2.terminated.value!).to be_truthy - end + a1.tell a2 + expect(a1.terminated.value!).to be_truthy + expect(a2.terminated.value!).to be_truthy + end - specify 'unlinks' do - body1 = { on_thread: - -> do - actor = receive + specify 'unlinks' do + body1 = { on_thread: + -> do + actor = receive + link actor + unlink actor + linked = linked? actor + actor.tell pid + linked + end, + on_pool: + -> do + receive do |actor| link actor unlink actor linked = linked? actor actor.tell pid linked - end, - on_pool: - -> do - receive do |actor| - link actor - unlink actor - linked = linked? actor - actor.tell pid - linked - end - end } - - a1 = Concurrent::ErlangActor.spawn(type: type, &body1.fetch(type)) - a2 = Concurrent::ErlangActor.spawn(type: type, &body_receive_test_linked.fetch(type)) - a1.tell a2 - expect(a1.terminated.value!).to be_falsey - expect(a2.terminated.value!).to be_falsey - end - - specify 'link dead' do - a = Concurrent::ErlangActor.spawn(type: type) do - b = spawn { :done } - b.terminated.wait - link b - end - expect { a.terminated.value! }.to raise_error Concurrent::ErlangActor::NoActor - end - - specify 'link dead when trapping' do - body1 = { on_thread: - -> do - b = spawn { :done } - b.terminated.wait - sleep 0.1 - trap - link b - [b, receive] - end, - on_pool: - -> do - b = spawn { :done } - b.terminated.wait - sleep 0.1 - trap - link b - receive { |v| [b, v] } - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body1.fetch(type)) - - b, captured = a.terminated.value! - expect(captured).to eq Concurrent::ErlangActor::Terminated.new(b, Concurrent::ErlangActor::NoActor.new(b)) - end - - - describe 'exit/1 when linked' do - # https://learnyousomeerlang.com/errors-and-processes#links - specify 1 do - body = { on_thread: - -> do - b = spawn(link: true) { :ok } - [receive(timeout: 0.01), b] - end, - on_pool: - -> do - b = spawn(link: true) { :ok } - receive(on(ANY) { |v| [v, b] }, - on(TIMEOUT) { |v| [nil, b] }, - timeout: 0.01) - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - message, b = a.terminated.value! - expect(message).to eq nil - expect(b.terminated.value!).to eq :ok - end - - specify 2 do - body = { on_thread: - -> do - b = spawn(link: true) { :ok } - trap - [receive(timeout: 1), b] - end, - on_pool: - -> do - b = spawn(link: true) { :ok } - trap - receive(on(ANY) { |v| [v, b] }, - on(TIMEOUT) { |v| [nil, b] }, - timeout: 1) - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - message, b = a.terminated.value! - expect(message).to eq Concurrent::ErlangActor::Terminated.new(b, :normal) - expect(b.terminated.value!).to eq :ok - end - - specify 3 do - body = { on_thread: - -> do - spawn(link: true) { terminate :boom } - receive(timeout: 1) - end, - on_pool: - -> do - spawn(link: true) { terminate :boom } - receive(on(ANY) { |v| [v, b] }, - on(TIMEOUT) { |v| [nil, b] }, - timeout: 1) - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - expect(a.terminated.reason).to eq :boom - end - - specify 4 do - body = { on_thread: - -> do - b = spawn(link: true) { terminate :boom } - trap - [receive(timeout: 1), b] - end, - on_pool: - -> do - b = spawn(link: true) { terminate :boom } - trap - receive(on(ANY) { |v| [v, b] }, - on(TIMEOUT) { |v| [nil, b] }, - timeout: 1) - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - trapped_exit, b = a.terminated.value! - expect(trapped_exit).to eq Concurrent::ErlangActor::Terminated.new(b, :boom) - expect(b.terminated.reason).to eq :boom - end - - specify 5 do - body = { on_thread: - -> do - b = spawn(link: true) { terminate :normal, value: :ok } - [receive(timeout: 0.01), b] - end, - on_pool: - -> do - b = spawn(link: true) { terminate :normal, value: :ok } - receive(on(ANY) { |v| [v, b] }, - on(TIMEOUT) { |v| [nil, b] }, - timeout: 0.01) - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - message, b = a.terminated.value! - expect(message).to eq nil - expect(b.terminated.value!).to eq :ok - end - - specify 6 do - body = { on_thread: - -> do - b = spawn(link: true) { terminate :normal, value: :ok } - trap - [receive(timeout: 1), b] - end, - on_pool: - -> do - b = spawn(link: true) { terminate :normal, value: :ok } - trap - receive(on(ANY) { |v| [v, b] }, - on(TIMEOUT) { |v| [nil, b] }, - timeout: 1) - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - message, b = a.terminated.value! - expect(message).to eq Concurrent::ErlangActor::Terminated.new(b, :normal) - expect(b.terminated.value!).to eq :ok - end - - specify 7 do - body = { on_thread: - -> do - spawn(link: true) { raise 'err' } - receive(timeout: 1) - end, - on_pool: - -> do - spawn(link: true) { raise 'err' } - receive(timeout: 1) { |v| v } - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - expect { a.terminated.value! }.to raise_error(RuntimeError, 'err') - end - - specify 8 do - body = { on_thread: - -> do - b = spawn(link: true) { raise 'err' } - trap - [receive(timeout: 1), b] - end, - on_pool: - -> do - b = spawn(link: true) { raise 'err' } - trap - receive(on(ANY) { |v| [v, b] }, - on(TIMEOUT) { |v| [nil, b] }, - timeout: 1) - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - trapped_exit, b = a.terminated.value! - expect(trapped_exit).to be_a Concurrent::ErlangActor::Terminated - expect(trapped_exit.from).to eq b - expect(trapped_exit.reason).to eq b.terminated.reason - expect(trapped_exit.reason).to be_a RuntimeError - expect(trapped_exit.reason.message).to eq 'err' - end - - specify 9 do - body = { on_thread: - -> do - b = spawn(link: true) { throw :uncaught } - trap - [receive(timeout: 1), b] - end, - on_pool: - -> do - b = spawn(link: true) { throw :uncaught } - trap - receive(on(ANY) { |v| [v, b] }, - on(TIMEOUT) { |v| [nil, b] }, - timeout: 1) - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - trapped_exit, b = a.terminated.value! - expect(trapped_exit).to be_a Concurrent::ErlangActor::Terminated - expect(trapped_exit.from).to eq b - expect(trapped_exit.reason).to eq b.terminated.reason - expect(trapped_exit.reason).to be_a ArgumentError - expect(trapped_exit.reason.message).to match(/uncaught throw :uncaught/) - end - end - - describe 'exit/2 when linked' do - # https://learnyousomeerlang.com/errors-and-processes#links - specify 1 do - body = { on_thread: - -> do - terminate pid, :normal # sends the signal to mailbox - # TODO (pitr-ch 17-Jan-2019): does erlang require receive to process signals? - receive(timeout: 0.01) - :continued - end, - on_pool: - -> do - terminate pid, :normal # sends the signal to mailbox - receive(on(ANY, :continued), - on(TIMEOUT, :timeout), - timeout: 0.01) - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - expect(a.terminated.value!).to eq nil - end - - specify 2 do - body = { on_thread: - -> do - terminate pid, :normal - trap - receive(timeout: 0) - end, - on_pool: - -> do - terminate pid, :normal - trap - receive(on(ANY, &identity), on(TIMEOUT, nil), timeout: 0) - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - captured_exit = a.terminated.value! - expect(captured_exit).to eq Concurrent::ErlangActor::Terminated.new(a, :normal) - end - - specify 3 do - body = { on_thread: - -> do - b = spawn(link: true) { receive timeout: 0.01, timeout_value: :timeout } - terminate b, :normal - b - end, - on_pool: - -> do - b = spawn(link: true) do - receive(on(ANY, :not_happening), - on(TIMEOUT, :timeout), - timeout: 0.01) - end - - terminate b, :normal - b - end } - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - b = a.terminated.value! - expect(b.terminated.value!).to eq :timeout - end - - specify 4 do - body = { on_thread: - -> do - b = spawn(link: true) { trap; receive timeout: 1, timeout_value: :timeout } - terminate b, :normal - b - end, - on_pool: - -> do - b = spawn(link: true) do - trap - receive(on(ANY, &identity), - on(TIMEOUT, :timeout), - timeout: 1) - end - - terminate b, :normal - b - end } - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - b = a.terminated.value! - expect(b.terminated.value!).to eq Concurrent::ErlangActor::Terminated.new(a, :normal) - end - - specify 5 do - body = { on_thread: - -> do - b = spawn(link: true) { receive timeout: 0.01; terminate :continued } - terminate b, :normal - trap - [b, receive(timeout: 1)] - end, - on_pool: - -> do - b = spawn(link: true) do - receive(on(ANY, :not_happening), - on(TIMEOUT) { terminate :continued }, - timeout: 0.01) - end - - terminate b, :normal - trap - receive(on(ANY) { |v| [b, v] }, on(TIMEOUT, :timeout), timeout: 1) - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - b, captured = a.terminated.value! - expect(b.terminated.reason).to eq :continued - # normal is never send from b to a back - expect(captured).to eq Concurrent::ErlangActor::Terminated.new(b, :continued) - end - - specify 6 do - body = { on_thread: - -> do - b = spawn(link: true) { receive timeout: 1; :done } - terminate b, :remote_err - receive timeout: 1 - end, - on_pool: - -> do - b = spawn(link: true) { receive(on(ANY, :done), on(TIMEOUT, :timeout), timeout: 1) } - terminate b, :remote_err - receive(on(ANY) { |v| [b, v] }, - on(TIMEOUT, :timeout), - timeout: 1) - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - a.terminated.wait - expect(a.terminated.reason).to eq :remote_err - end - - specify 7 do - body = { on_thread: - -> do - b = spawn(link: true) { receive timeout: 1; :done } - terminate b, :remote_err - trap - [b, receive(timeout: 1)] - end, - on_pool: - -> do - b = spawn(link: true) { receive(on(ANY, :done), on(TIMEOUT, :timeout), timeout: 1) } - terminate b, :remote_err - trap - receive(on(ANY) { |v| [b, v] }, - on(TIMEOUT, :timeout), - timeout: 1) - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - b, captured = a.terminated.value! - expect(b.terminated.reason).to eq :remote_err - expect(captured.reason).to eq :remote_err - end - - specify 8 do - body = { on_thread: - -> do - b = spawn(link: true) { receive timeout: 1; :done } - terminate b, :kill - receive timeout: 1 - end, - on_pool: - -> do - b = spawn(link: true) { receive(on(ANY, :done), on(TIMEOUT, :done), timeout: 1) } - terminate b, :kill - receive(on(ANY, &identity), on(TIMEOUT, :timeout), timeout: 1) - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - expect(a.terminated.reason).to eq :killed - end - - specify 9 do - body = { on_thread: - -> do - b = spawn(link: true) { receive timeout: 1; :done } - terminate b, :kill - trap - [b, receive(timeout: 1)] - end, - on_pool: - -> do - b = spawn(link: true) { receive(on(ANY, :done), on(TIMEOUT, :done), timeout: 1) } - terminate b, :kill - trap - receive(on(ANY) { |v| [b, v] }, on(TIMEOUT, :timeout), timeout: 1) - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - b, captured = a.terminated.value! - expect(b.terminated.reason).to eq :killed - expect(captured.reason).to eq :killed - end - - specify 10 do - body = { on_thread: - -> do - terminate pid, :kill - receive timeout: 0 - end, - on_pool: - -> do - terminate pid, :kill - receive(on(ANY, :continued), on(TIMEOUT, :timeout), timeout: 0) - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - expect(a.terminated.reason).to eq :killed - end - - specify 11 do - body = { on_thread: - -> do - terminate pid, :kill - trap - receive timeout: 0 - end, - on_pool: - -> do - terminate pid, :kill - trap - receive(on(ANY, &identity), on(TIMEOUT, :timeout), timeout: 0) - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - expect(a.terminated.reason).to eq :killed - end - - # explained in - # http://erlang.org/pipermail/erlang-questions/2009-October/047241.html - - specify 12 do - body = { on_thread: - -> do - spawn(link: true) { terminate :kill } - receive timeout: 1 - end, - on_pool: - -> do - spawn(link: true) { terminate :kill } - receive(on(ANY, :continued), - on(TIMEOUT, :timeout), - timeout: 1) - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - expect(a.terminated.reason).to eq :kill - end - - specify 13 do - body = { on_thread: - -> do - b = spawn(link: true) { terminate :kill } - trap - [b, receive(timeout: 1)] - end, - on_pool: - -> do - b = spawn(link: true) { terminate :kill } - trap - receive(on(ANY) { |v| [b, v] }, - on(TIMEOUT, :timeout), - timeout: 1) - end } - - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - - b, captured = a.terminated.value! - - expect(b.terminated.reason).to eq :kill - expect(captured).to eq Concurrent::ErlangActor::Terminated.new(b, :kill) - end - - end + end + end } + + a1 = Concurrent::ErlangActor.spawn(type: type, &body1.fetch(type)) + a2 = Concurrent::ErlangActor.spawn(type: type, &body_receive_test_linked.fetch(type)) + a1.tell a2 + expect(a1.terminated.value!).to be_falsey + expect(a2.terminated.value!).to be_falsey end - specify 'spawn(link: true)' do + specify 'link dead' do a = Concurrent::ErlangActor.spawn(type: type) do - b = spawn(link: true) { :v } - linked? b + b = spawn { :done } + b.terminated.wait + link b end - expect(a.terminated.value!).to be_truthy - - a = Concurrent::ErlangActor.spawn(type: type) do - b = spawn { :v } - linked? b - end - expect(a.terminated.value!).to be_falsey + expect { a.terminated.value! }.to raise_error Concurrent::ErlangActor::NoActor end - specify 'termination' do - a = Concurrent::ErlangActor.spawn(type: type) { :v } - expect(a.terminated.value!).to eq :v + specify 'link dead when trapping' do + body1 = { on_thread: + -> do + b = spawn { :done } + b.terminated.wait + sleep 0.1 + trap + link b + [b, receive] + end, + on_pool: + -> do + b = spawn { :done } + b.terminated.wait + sleep 0.1 + trap + link b + receive { |v| [b, v] } + end } - a = Concurrent::ErlangActor.spawn(type: type) { raise 'err' } - expect { a.terminated.value! }.to raise_error(RuntimeError, 'err') + a = Concurrent::ErlangActor.spawn(type: type, &body1.fetch(type)) - a = Concurrent::ErlangActor.spawn(type: type) { terminate :normal, value: :val } - expect(a.terminated.value!).to eq :val - - a = Concurrent::ErlangActor.spawn(type: type) { terminate :er } - expect(a.terminated.reason).to eq :er + b, captured = a.terminated.value! + expect(captured).to eq Concurrent::ErlangActor::Terminated.new(b, Concurrent::ErlangActor::NoActor.new(b)) end - describe 'asking' do - specify "replies" do - body = { on_thread: -> { reply receive }, - on_pool: -> { receive { |v| reply v } } } - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - expect(a.ask(:v)).to eq :v - - body = { on_thread: -> { v = receive; reply v; reply v; }, - on_pool: -> { receive { |v| reply v; reply v } } } - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - expect(a.ask(:v)).to eq :v - expect(a.terminated.value!).to be_falsey + describe 'exit/1 when linked' do + # https://learnyousomeerlang.com/errors-and-processes#links + specify 1 do body = { on_thread: -> do - v = receive - reply v - reply_resolution true, v.to_s, nil + b = spawn(link: true) { :ok } + [receive(timeout: 0.01), b] end, on_pool: -> do - receive do |v| - reply v - reply_resolution true, v.to_s, nil - end + b = spawn(link: true) { :ok } + receive(on(ANY) { |v| [v, b] }, + on(TIMEOUT) { |v| [nil, b] }, + timeout: 0.01) end } - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - expect(a.ask(:v)).to eq :v - expect(a.terminated.value!).to be_falsey - - body = { on_thread: -> { reply_resolution false, nil, receive }, - on_pool: -> { receive { |v| reply_resolution false, nil, v } } } - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - expect { a.ask(:err) }.to raise_error StandardError, 'err' - - body = { on_thread: -> { reply_resolution false, nil, receive }, - on_pool: -> { receive { |v| reply_resolution false, nil, v } } } - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - expect(a.ask_op(:err).reason).to eq :err - end - - specify "timing out" do - skip('flaky on truffleruby') if Concurrent.on_truffleruby? - - count_down = Concurrent::CountDownLatch.new - body = { on_thread: -> { m = receive; count_down.wait; reply m }, - on_pool: -> { receive { |m| count_down.wait; reply m } } } - a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - expect(a.ask(:err, 0, 42)).to eq 42 - count_down.count_down - expect(a.terminated.value!).to eq false - - body = { on_thread: -> { reply receive }, - on_pool: -> { receive { |m| reply m } } } - b = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - expect(b.ask(:v, 1, 42)).to eq :v - expect(b.terminated.value!).to eq true - end - - specify "rejects on no reply" do - body = { on_thread: -> { receive; receive }, - on_pool: -> { receive { receive {} } } } a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) - expect(a.ask_op(:v).reason).to eq Concurrent::ErlangActor::NoReply - expect { raise a.ask_op(:v).wait }.to raise_error Concurrent::ErlangActor::NoActor - expect { raise a.ask(:v) }.to raise_error Concurrent::ErlangActor::NoActor + + message, b = a.terminated.value! + expect(message).to eq nil + expect(b.terminated.value!).to eq :ok + end + + specify 2 do + body = { on_thread: + -> do + b = spawn(link: true) { :ok } + trap + [receive(timeout: 1), b] + end, + on_pool: + -> do + b = spawn(link: true) { :ok } + trap + receive(on(ANY) { |v| [v, b] }, + on(TIMEOUT) { |v| [nil, b] }, + timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + + message, b = a.terminated.value! + expect(message).to eq Concurrent::ErlangActor::Terminated.new(b, :normal) + expect(b.terminated.value!).to eq :ok + end + + specify 3 do + body = { on_thread: + -> do + spawn(link: true) { terminate :boom } + receive(timeout: 1) + end, + on_pool: + -> do + spawn(link: true) { terminate :boom } + receive(on(ANY) { |v| [v, b] }, + on(TIMEOUT) { |v| [nil, b] }, + timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + + expect(a.terminated.reason).to eq :boom + end + + specify 4 do + body = { on_thread: + -> do + b = spawn(link: true) { terminate :boom } + trap + [receive(timeout: 1), b] + end, + on_pool: + -> do + b = spawn(link: true) { terminate :boom } + trap + receive(on(ANY) { |v| [v, b] }, + on(TIMEOUT) { |v| [nil, b] }, + timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + + trapped_exit, b = a.terminated.value! + expect(trapped_exit).to eq Concurrent::ErlangActor::Terminated.new(b, :boom) + expect(b.terminated.reason).to eq :boom + end + + specify 5 do + body = { on_thread: + -> do + b = spawn(link: true) { terminate :normal, value: :ok } + [receive(timeout: 0.01), b] + end, + on_pool: + -> do + b = spawn(link: true) { terminate :normal, value: :ok } + receive(on(ANY) { |v| [v, b] }, + on(TIMEOUT) { |v| [nil, b] }, + timeout: 0.01) + end } + + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + + message, b = a.terminated.value! + expect(message).to eq nil + expect(b.terminated.value!).to eq :ok + end + + specify 6 do + body = { on_thread: + -> do + b = spawn(link: true) { terminate :normal, value: :ok } + trap + [receive(timeout: 1), b] + end, + on_pool: + -> do + b = spawn(link: true) { terminate :normal, value: :ok } + trap + receive(on(ANY) { |v| [v, b] }, + on(TIMEOUT) { |v| [nil, b] }, + timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + + message, b = a.terminated.value! + expect(message).to eq Concurrent::ErlangActor::Terminated.new(b, :normal) + expect(b.terminated.value!).to eq :ok + end + + specify 7 do + body = { on_thread: + -> do + spawn(link: true) { raise 'err' } + receive(timeout: 1) + end, + on_pool: + -> do + spawn(link: true) { raise 'err' } + receive(timeout: 1) { |v| v } + end } + + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + + expect { a.terminated.value! }.to raise_error(RuntimeError, 'err') + end + + specify 8 do + body = { on_thread: + -> do + b = spawn(link: true) { raise 'err' } + trap + [receive(timeout: 1), b] + end, + on_pool: + -> do + b = spawn(link: true) { raise 'err' } + trap + receive(on(ANY) { |v| [v, b] }, + on(TIMEOUT) { |v| [nil, b] }, + timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + + trapped_exit, b = a.terminated.value! + expect(trapped_exit).to be_a Concurrent::ErlangActor::Terminated + expect(trapped_exit.from).to eq b + expect(trapped_exit.reason).to eq b.terminated.reason + expect(trapped_exit.reason).to be_a RuntimeError + expect(trapped_exit.reason.message).to eq 'err' + end + + specify 9 do + body = { on_thread: + -> do + b = spawn(link: true) { throw :uncaught } + trap + [receive(timeout: 1), b] + end, + on_pool: + -> do + b = spawn(link: true) { throw :uncaught } + trap + receive(on(ANY) { |v| [v, b] }, + on(TIMEOUT) { |v| [nil, b] }, + timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + + trapped_exit, b = a.terminated.value! + expect(trapped_exit).to be_a Concurrent::ErlangActor::Terminated + expect(trapped_exit.from).to eq b + expect(trapped_exit.reason).to eq b.terminated.reason + expect(trapped_exit.reason).to be_a ArgumentError + expect(trapped_exit.reason.message).to match(/uncaught throw :uncaught/) + end + end + + describe 'exit/2 when linked' do + # https://learnyousomeerlang.com/errors-and-processes#links + specify 1 do + body = { on_thread: + -> do + terminate pid, :normal # sends the signal to mailbox + # TODO (pitr-ch 17-Jan-2019): does erlang require receive to process signals? + receive(timeout: 0.01) + :continued + end, + on_pool: + -> do + terminate pid, :normal # sends the signal to mailbox + receive(on(ANY, :continued), + on(TIMEOUT, :timeout), + timeout: 0.01) + end } + + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + + expect(a.terminated.value!).to eq nil + end + + specify 2 do + body = { on_thread: + -> do + terminate pid, :normal + trap + receive(timeout: 0) + end, + on_pool: + -> do + terminate pid, :normal + trap + receive(on(ANY, &identity), on(TIMEOUT, nil), timeout: 0) + end } + + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + + captured_exit = a.terminated.value! + expect(captured_exit).to eq Concurrent::ErlangActor::Terminated.new(a, :normal) + end + + specify 3 do + body = { on_thread: + -> do + b = spawn(link: true) { receive timeout: 0.01, timeout_value: :timeout } + terminate b, :normal + b + end, + on_pool: + -> do + b = spawn(link: true) do + receive(on(ANY, :not_happening), + on(TIMEOUT, :timeout), + timeout: 0.01) + end + + terminate b, :normal + b + end } + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + + b = a.terminated.value! + expect(b.terminated.value!).to eq :timeout + end + + specify 4 do + body = { on_thread: + -> do + b = spawn(link: true) { trap; receive timeout: 1, timeout_value: :timeout } + terminate b, :normal + b + end, + on_pool: + -> do + b = spawn(link: true) do + trap + receive(on(ANY, &identity), + on(TIMEOUT, :timeout), + timeout: 1) + end + + terminate b, :normal + b + end } + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + + b = a.terminated.value! + expect(b.terminated.value!).to eq Concurrent::ErlangActor::Terminated.new(a, :normal) + end + + specify 5 do + body = { on_thread: + -> do + b = spawn(link: true) { receive timeout: 0.01; terminate :continued } + terminate b, :normal + trap + [b, receive(timeout: 1)] + end, + on_pool: + -> do + b = spawn(link: true) do + receive(on(ANY, :not_happening), + on(TIMEOUT) { terminate :continued }, + timeout: 0.01) + end + + terminate b, :normal + trap + receive(on(ANY) { |v| [b, v] }, on(TIMEOUT, :timeout), timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + + b, captured = a.terminated.value! + expect(b.terminated.reason).to eq :continued + # normal is never send from b to a back + expect(captured).to eq Concurrent::ErlangActor::Terminated.new(b, :continued) + end + + specify 6 do + body = { on_thread: + -> do + b = spawn(link: true) { receive timeout: 1; :done } + terminate b, :remote_err + receive timeout: 1 + end, + on_pool: + -> do + b = spawn(link: true) { receive(on(ANY, :done), on(TIMEOUT, :timeout), timeout: 1) } + terminate b, :remote_err + receive(on(ANY) { |v| [b, v] }, + on(TIMEOUT, :timeout), + timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + a.terminated.wait + expect(a.terminated.reason).to eq :remote_err + end + + specify 7 do + body = { on_thread: + -> do + b = spawn(link: true) { receive timeout: 1; :done } + terminate b, :remote_err + trap + [b, receive(timeout: 1)] + end, + on_pool: + -> do + b = spawn(link: true) { receive(on(ANY, :done), on(TIMEOUT, :timeout), timeout: 1) } + terminate b, :remote_err + trap + receive(on(ANY) { |v| [b, v] }, + on(TIMEOUT, :timeout), + timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + + b, captured = a.terminated.value! + expect(b.terminated.reason).to eq :remote_err + expect(captured.reason).to eq :remote_err + end + + specify 8 do + body = { on_thread: + -> do + b = spawn(link: true) { receive timeout: 1; :done } + terminate b, :kill + receive timeout: 1 + end, + on_pool: + -> do + b = spawn(link: true) { receive(on(ANY, :done), on(TIMEOUT, :done), timeout: 1) } + terminate b, :kill + receive(on(ANY, &identity), on(TIMEOUT, :timeout), timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + + expect(a.terminated.reason).to eq :killed + end + + specify 9 do + body = { on_thread: + -> do + b = spawn(link: true) { receive timeout: 1; :done } + terminate b, :kill + trap + [b, receive(timeout: 1)] + end, + on_pool: + -> do + b = spawn(link: true) { receive(on(ANY, :done), on(TIMEOUT, :done), timeout: 1) } + terminate b, :kill + trap + receive(on(ANY) { |v| [b, v] }, on(TIMEOUT, :timeout), timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + b, captured = a.terminated.value! + expect(b.terminated.reason).to eq :killed + expect(captured.reason).to eq :killed + end + + specify 10 do + body = { on_thread: + -> do + terminate pid, :kill + receive timeout: 0 + end, + on_pool: + -> do + terminate pid, :kill + receive(on(ANY, :continued), on(TIMEOUT, :timeout), timeout: 0) + end } + + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + + expect(a.terminated.reason).to eq :killed + end + + specify 11 do + body = { on_thread: + -> do + terminate pid, :kill + trap + receive timeout: 0 + end, + on_pool: + -> do + terminate pid, :kill + trap + receive(on(ANY, &identity), on(TIMEOUT, :timeout), timeout: 0) + end } + + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + + expect(a.terminated.reason).to eq :killed + end + + # explained in + # http://erlang.org/pipermail/erlang-questions/2009-October/047241.html + + specify 12 do + body = { on_thread: + -> do + spawn(link: true) { terminate :kill } + receive timeout: 1 + end, + on_pool: + -> do + spawn(link: true) { terminate :kill } + receive(on(ANY, :continued), + on(TIMEOUT, :timeout), + timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + + expect(a.terminated.reason).to eq :kill + end + + specify 13 do + body = { on_thread: + -> do + b = spawn(link: true) { terminate :kill } + trap + [b, receive(timeout: 1)] + end, + on_pool: + -> do + b = spawn(link: true) { terminate :kill } + trap + receive(on(ANY) { |v| [b, v] }, + on(TIMEOUT, :timeout), + timeout: 1) + end } + + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + + b, captured = a.terminated.value! + + expect(b.terminated.reason).to eq :kill + expect(captured).to eq Concurrent::ErlangActor::Terminated.new(b, :kill) end end end - describe 'on thread' do - let(:type) { :on_thread } - it_behaves_like 'erlang actor' - - specify do - actor = Concurrent::ErlangActor.spawn(type: :on_thread) do - Thread.abort_on_exception = true - while true - receive on(Symbol) { |s| reply s.to_s }, - on(And[Numeric, -> v { v >= 0 }]) { |v| reply v.succ }, - # put last works as else - on(ANY) { |v| reply :bad_message; terminate [:bad_message, v] } - end - end - expect(actor.ask(1)).to eq 2 - expect(actor.ask(:value)).to eq 'value' - expect(actor.ask(-1)).to eq :bad_message - expect { actor.ask 'junk' }.to raise_error Concurrent::ErlangActor::NoActor - expect(actor.terminated.reason).to eq [:bad_message, -1] + specify 'spawn(link: true)' do + a = Concurrent::ErlangActor.spawn(type: type) do + b = spawn(link: true) { :v } + linked? b end + expect(a.terminated.value!).to be_truthy + + a = Concurrent::ErlangActor.spawn(type: type) do + b = spawn { :v } + linked? b + end + expect(a.terminated.value!).to be_falsey end - describe 'on pool' do - let(:type) { :on_pool } - it_behaves_like 'erlang actor' + specify 'termination' do + a = Concurrent::ErlangActor.spawn(type: type) { :v } + expect(a.terminated.value!).to eq :v - include Concurrent::ErlangActor::EnvironmentConstants + a = Concurrent::ErlangActor.spawn(type: type) { raise 'err' } + expect { a.terminated.value! }.to raise_error(RuntimeError, 'err') - specify "receives message repeatedly with keep" do - actor = Concurrent::ErlangActor.spawn(type: :on_pool) do - receive on(ANY) { |v| v == :done ? terminate(:normal, value: 42) : reply(v) }, - keep: true - end - expect(actor.ask(1)).to eq 1 - expect(actor.ask(2)).to eq 2 - actor.tell :done - expect(actor.terminated.value!).to eq 42 + a = Concurrent::ErlangActor.spawn(type: type) { terminate :normal, value: :val } + expect(a.terminated.value!).to eq :val + + a = Concurrent::ErlangActor.spawn(type: type) { terminate :er } + expect(a.terminated.reason).to eq :er + end + + describe 'asking' do + specify "replies" do + body = { on_thread: -> { reply receive }, + on_pool: -> { receive { |v| reply v } } } + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + expect(a.ask(:v)).to eq :v + + body = { on_thread: -> { v = receive; reply v; reply v; }, + on_pool: -> { receive { |v| reply v; reply v } } } + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + expect(a.ask(:v)).to eq :v + expect(a.terminated.value!).to be_falsey + + body = { on_thread: + -> do + v = receive + reply v + reply_resolution true, v.to_s, nil + end, + on_pool: + -> do + receive do |v| + reply v + reply_resolution true, v.to_s, nil + end + end } + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + expect(a.ask(:v)).to eq :v + expect(a.terminated.value!).to be_falsey + + body = { on_thread: -> { reply_resolution false, nil, receive }, + on_pool: -> { receive { |v| reply_resolution false, nil, v } } } + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + expect { a.ask(:err) }.to raise_error StandardError, 'err' + + body = { on_thread: -> { reply_resolution false, nil, receive }, + on_pool: -> { receive { |v| reply_resolution false, nil, v } } } + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + expect(a.ask_op(:err).reason).to eq :err end - specify "class defined" do - definition_module = Module.new do - def start - @sum = 0 - receive on(Numeric, &method(:count)), - on(:done, &method(:stop)), - on(TIMEOUT, &method(:fail)), - keep: true, - timeout: 0.1 - end + specify "timing out" do + skip('flaky on truffleruby') if Concurrent.on_truffleruby? - def count(message) - reply @sum += message - end + count_down = Concurrent::CountDownLatch.new + body = { on_thread: -> { m = receive; count_down.wait; reply m }, + on_pool: -> { receive { |m| count_down.wait; reply m } } } + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + expect(a.ask(:err, 0, 42)).to eq 42 + count_down.count_down + expect(a.terminated.value!).to eq false - def stop(_message) - terminate :normal, value: @sum - end + body = { on_thread: -> { reply receive }, + on_pool: -> { receive { |m| reply m } } } + b = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + expect(b.ask(:v, 1, 42)).to eq :v + expect(b.terminated.value!).to eq true + end - def fail(_message) - terminate :timeout - end - end - definition_class = Class.new Concurrent::ErlangActor::Environment do - include definition_module - end + specify "rejects on no reply" do + body = { on_thread: -> { receive; receive }, + on_pool: -> { receive { receive {} } } } - actor = Concurrent::ErlangActor.spawn(type: :on_pool, environment: definition_class) { start } - actor.tell 1 - expect(actor.ask(2)).to eq 3 - actor.tell :done - expect(actor.terminated.value!).to eq 3 - - actor = Concurrent::ErlangActor.spawn(type: :on_pool, environment: definition_module) - actor.tell 1 - expect(actor.ask(2)).to eq 3 - expect(actor.terminated.reason).to eq :timeout + a = Concurrent::ErlangActor.spawn(type: type, &body.fetch(type)) + expect(a.ask_op(:v).reason).to eq Concurrent::ErlangActor::NoReply + expect { raise a.ask_op(:v).wait }.to raise_error Concurrent::ErlangActor::NoActor + expect { raise a.ask(:v) }.to raise_error Concurrent::ErlangActor::NoActor end end end + + describe 'on thread' do + let(:type) { :on_thread } + it_behaves_like 'erlang actor' + + specify do + actor = Concurrent::ErlangActor.spawn(type: :on_thread) do + Thread.abort_on_exception = true + while true + receive on(Symbol) { |s| reply s.to_s }, + on(And[Numeric, -> v { v >= 0 }]) { |v| reply v.succ }, + # put last works as else + on(ANY) { |v| reply :bad_message; terminate [:bad_message, v] } + end + end + expect(actor.ask(1)).to eq 2 + expect(actor.ask(:value)).to eq 'value' + expect(actor.ask(-1)).to eq :bad_message + expect { actor.ask 'junk' }.to raise_error Concurrent::ErlangActor::NoActor + expect(actor.terminated.reason).to eq [:bad_message, -1] + end + end + + describe 'on pool' do + let(:type) { :on_pool } + it_behaves_like 'erlang actor' + + include Concurrent::ErlangActor::EnvironmentConstants + + specify "receives message repeatedly with keep" do + actor = Concurrent::ErlangActor.spawn(type: :on_pool) do + receive on(ANY) { |v| v == :done ? terminate(:normal, value: 42) : reply(v) }, + keep: true + end + expect(actor.ask(1)).to eq 1 + expect(actor.ask(2)).to eq 2 + actor.tell :done + expect(actor.terminated.value!).to eq 42 + end + + specify "class defined" do + definition_module = Module.new do + def start + @sum = 0 + receive on(Numeric, &method(:count)), + on(:done, &method(:stop)), + on(TIMEOUT, &method(:fail)), + keep: true, + timeout: 0.1 + end + + def count(message) + reply @sum += message + end + + def stop(_message) + terminate :normal, value: @sum + end + + def fail(_message) + terminate :timeout + end + end + definition_class = Class.new Concurrent::ErlangActor::Environment do + include definition_module + end + + actor = Concurrent::ErlangActor.spawn(type: :on_pool, environment: definition_class) { start } + actor.tell 1 + expect(actor.ask(2)).to eq 3 + actor.tell :done + expect(actor.terminated.value!).to eq 3 + + actor = Concurrent::ErlangActor.spawn(type: :on_pool, environment: definition_module) + actor.tell 1 + expect(actor.ask(2)).to eq 3 + expect(actor.terminated.reason).to eq :timeout + end + + end end end diff --git a/spec/concurrent/executor/wrapping_executor_loaded_manualy.rb b/spec/concurrent/executor/wrapping_executor_loaded_manualy.rb deleted file mode 100644 index 9d0e920a..00000000 --- a/spec/concurrent/executor/wrapping_executor_loaded_manualy.rb +++ /dev/null @@ -1,48 +0,0 @@ -module Concurrent - RSpec.describe WrappingExecutor do - - let(:wrapping_executor) { WrappingExecutor.new(executor, &wrapper) } - let(:executor) { Concurrent.global_fast_executor } - let(:wrapper) { nil } - let(:args) { { foo: 'bar', baz: 42 } } - let(:task) { -> (*args) { return nil } } - - subject { wrapping_executor } - - it { is_expected.to be_kind_of(WrappingExecutor) } - it { is_expected.to respond_to(:post) } - it { is_expected.to respond_to(:can_overflow?) } - it { is_expected.to respond_to(:serialized?) } - - describe '#post' do - context 'with passthrough wrapper' do - let(:wrapper) { -> (*args, &task) { return *args, task } } - - it { - expect(executor).to receive(:post).with(args) { |&block| expect(block).to be(task) } - wrapping_executor.post(args, &task) - } - end - - context 'with wrapper modifying args' do - let(:wrapper) { -> (*args, &task) { return *args, { xyz: 'abc' }, task } } - - it { - expect(executor).to receive(:post).with(args, { xyz: 'abc' }) { |&block| expect(block).to be(task) } - wrapping_executor.post(args, &task) - } - end - - context 'with wrapper modifying task' do - let(:wrapper) { -> (*args, &task) { return *args, another_task } } - let(:another_task) { -> (*args) { return true } } - - it { - expect(executor).to receive(:post).with(args) { |&block| expect(block).to be(another_task) } - wrapping_executor.post(args, &task) - } - end - - end - end -end diff --git a/spec/concurrent/executor/wrapping_executor_spec.rb b/spec/concurrent/executor/wrapping_executor_spec.rb index ac3f0c82..9d0e920a 100644 --- a/spec/concurrent/executor/wrapping_executor_spec.rb +++ b/spec/concurrent/executor/wrapping_executor_spec.rb @@ -1 +1,48 @@ -require_relative 'wrapping_executor_loaded_manualy' if Concurrent.ruby_version :>=, 2, 1, 0 +module Concurrent + RSpec.describe WrappingExecutor do + + let(:wrapping_executor) { WrappingExecutor.new(executor, &wrapper) } + let(:executor) { Concurrent.global_fast_executor } + let(:wrapper) { nil } + let(:args) { { foo: 'bar', baz: 42 } } + let(:task) { -> (*args) { return nil } } + + subject { wrapping_executor } + + it { is_expected.to be_kind_of(WrappingExecutor) } + it { is_expected.to respond_to(:post) } + it { is_expected.to respond_to(:can_overflow?) } + it { is_expected.to respond_to(:serialized?) } + + describe '#post' do + context 'with passthrough wrapper' do + let(:wrapper) { -> (*args, &task) { return *args, task } } + + it { + expect(executor).to receive(:post).with(args) { |&block| expect(block).to be(task) } + wrapping_executor.post(args, &task) + } + end + + context 'with wrapper modifying args' do + let(:wrapper) { -> (*args, &task) { return *args, { xyz: 'abc' }, task } } + + it { + expect(executor).to receive(:post).with(args, { xyz: 'abc' }) { |&block| expect(block).to be(task) } + wrapping_executor.post(args, &task) + } + end + + context 'with wrapper modifying task' do + let(:wrapper) { -> (*args, &task) { return *args, another_task } } + let(:another_task) { -> (*args) { return true } } + + it { + expect(executor).to receive(:post).with(args) { |&block| expect(block).to be(another_task) } + wrapping_executor.post(args, &task) + } + end + + end + end +end