Move parallel into internal library

This commit is contained in:
Markus Schirp 2014-12-09 00:10:31 +00:00
parent f2c9686115
commit 801500510e
28 changed files with 1097 additions and 758 deletions

View file

@ -1,3 +1,3 @@
---
threshold: 18
total_score: 1200
total_score: 1191

View file

@ -33,10 +33,7 @@ FeatureEnvy:
- Mutant::Mutator::Node#children_indices
- Mutant::Meta::Example::Verification#format_mutation # False positive, its a utility
- Mutant::Reporter::CLI#subject_results
- Mutant::Runner#finish
- Mutant::Runner::Master#stop_worker
- Mutant::Runner::Worker#run_mutation
- Mutant::Runner::Worker#handle
- Mutant::Parallel::Worker#handle
- Mutant::Subject#source_lines
IrresponsibleModule:
enabled: true
@ -62,6 +59,7 @@ NestedIterators:
- Mutant::RequireHighjack#infect
- Mutant::RequireHighjack#disinfect
- Mutant::Subject#tests
- Mutant::Parallel::Master#run
- Parser::Lexer#self.new
max_allowed_nesting: 1
ignore_iterators: []
@ -72,14 +70,12 @@ RepeatedConditional:
exclude:
- Mutant::Mutator
- Mutant::Meta::Example::DSL
- Mutant::Runner::Master
max_ifs: 1
TooManyInstanceVariables:
enabled: true
exclude:
- Mutant::Mutator # 4 vars
- Mutant::Runner::Master # 4 vars
- Mutant::Runner::Scheduler # 4 vars
- Mutant::Parallel::Master # 4 vars
max_instance_variables: 3
TooManyMethods:
enabled: true
@ -87,6 +83,7 @@ TooManyMethods:
- Mutant::CLI
- Mutant::Mutator::Node
- Mutant::Meta::Example::Verification
- Mutant::Parallel::Master
max_methods: 10
TooManyStatements:
enabled: true
@ -94,7 +91,6 @@ TooManyStatements:
- Mutant::Isolation::Fork#self.call
- Mutant::Reporter::CLI::Printer::EnvProgress#run
- Mutant::Reporter::CLI::Printer::Config#run
- Mutant::Runner#initialize
- Mutant::Zombifier::File#self.find
- Mutant::CLI#add_environment_options
max_statements: 7

View file

@ -105,6 +105,10 @@ require 'mutant/actor/receiver'
require 'mutant/actor/sender'
require 'mutant/actor/mailbox'
require 'mutant/actor/env'
require 'mutant/parallel'
require 'mutant/parallel/master'
require 'mutant/parallel/worker'
require 'mutant/parallel/source'
require 'mutant/cache'
require 'mutant/delegator'
require 'mutant/warning_filter'
@ -200,9 +204,7 @@ require 'mutant/cli'
require 'mutant/color'
require 'mutant/diff'
require 'mutant/runner'
require 'mutant/runner/scheduler'
require 'mutant/runner/master'
require 'mutant/runner/worker'
require 'mutant/runner/sink'
require 'mutant/result'
require 'mutant/reporter'
require 'mutant/reporter/null'
@ -230,7 +232,7 @@ module Mutant
isolation: Mutant::Isolation::Fork,
reporter: Reporter::CLI.build($stdout),
zombie: false,
jobs: Mutant.ci? ? CI_DEFAULT_PROCESSOR_COUNT : Parallel.processor_count,
jobs: Mutant.ci? ? CI_DEFAULT_PROCESSOR_COUNT : ::Parallel.processor_count,
actor_env: Mutant::Actor::Env.new(Thread),
expected_coverage: 100.0
)

View file

@ -71,6 +71,22 @@ module Mutant
#
attr_reader :matchable_scopes
# Kill mutation
#
# @param [Mutation] mutation
#
# @return [Result::Mutation]
#
# @api private
#
def kill_mutation(mutation)
test_result = mutation.kill(config.isolation, config.integration)
Result::Mutation.new(
mutation: mutation,
test_result: test_result
)
end
private
# Return scope name

93
lib/mutant/parallel.rb Normal file
View file

@ -0,0 +1,93 @@
module Mutant
# Parallel excecution engine of arbitrary payloads
module Parallel
# Driver for parallelized execution
class Driver
include Concord.new(:binding)
# Return scheduler status
#
# @return [Object]
#
# @api private
#
def status
binding.call(__method__)
end
# Stop master gracefully
#
# @return [self]
#
# @api private
#
def stop
binding.call(__method__)
self
end
end # Driver
# Run async computation returing driver
#
# @return [Driver]
#
# @api private
#
def self.async(config)
Driver.new(config.env.new_mailbox.bind(Master.call(config)))
end
# Job result sink
class Sink
include AbstractType
# Process job result
#
# @param [Object]
#
# @return [self]
#
# @api private
#
abstract_method :result
# Return status
#
# @return [Object]
#
# @api private
#
abstract_method :status
# Test if processing should stop
#
# @return [Boolean]
#
# @api private
#
abstract_method :stop?
end # Sink
# Job to push to workers
class Job
include Adamantium::Flat, Anima.new(:index, :payload)
end # Job
# Job result object received from workers
class JobResult
include Adamantium::Flat, Anima.new(:job, :payload)
end # JobResult
# Parallel run configuration
class Config
include Anima::Update, Adamantium::Flat, Anima.new(:env, :processor, :source, :sink, :jobs)
end # Config
# Parallel execution status
class Status
include Adamantium::Flat, Anima::Update, Anima.new(:payload, :done, :active_jobs)
end
end # Parallel
end # Mutant

View file

@ -1,8 +1,8 @@
module Mutant
class Runner
# Master actor to control workers
module Parallel
# Master parallel worker
class Master
include Concord.new(:env, :actor)
include Concord.new(:config, :actor)
private_class_method :new
@ -14,14 +14,12 @@ module Mutant
#
# @api private
#
def self.call(env)
env.config.actor_env.spawn do |actor|
new(env, actor).__send__(:run)
def self.call(config)
config.env.spawn do |actor|
new(config, actor).__send__(:run)
end
end
private
# Initialize object
#
# @return [undefined]
@ -31,12 +29,14 @@ module Mutant
def initialize(*)
super
@scheduler = Scheduler.new(env)
@workers = env.config.jobs
@stop = false
@stopping = false
@stop = false
@workers = 0
@active_jobs = Set.new
@index = 0
end
private
# Run work loop
#
# @return [self]
@ -44,17 +44,27 @@ module Mutant
# @api private
#
def run
@workers.times do |id|
Worker.run(
id: id,
config: env.config,
parent: actor.sender
)
config.jobs.times do
@workers += 1
config.env.spawn do |worker_actor|
Worker.run(
actor: worker_actor,
processor: config.processor,
parent: actor.sender
)
end
end
receive_loop
end
MAP = IceNine.deep_freeze(
ready: :handle_ready,
status: :handle_status,
result: :handle_result,
stop: :handle_stop
)
# Handle messages
#
# @param [Actor::Message] message
@ -65,18 +75,10 @@ module Mutant
#
def handle(message)
type, payload = message.type, message.payload
case type
when :ready
ready_worker(payload)
when :status
handle_status(payload)
when :result
handle_result(payload)
when :stop
handle_stop(payload)
else
method = MAP.fetch(type) do
fail Actor::ProtocolError, "Unexpected message: #{type.inspect}"
end
__send__(method, payload)
end
# Run receive loop
@ -86,10 +88,7 @@ module Mutant
# @api private
#
def receive_loop
loop do
break if @workers.zero? && @stop
handle(actor.receiver.call)
end
handle(actor.receiver.call) until @workers.zero? && @stop
end
# Handle status
@ -101,7 +100,12 @@ module Mutant
# @api private
#
def handle_status(sender)
sender.call(Actor::Message.new(:status, @scheduler.status))
status = Status.new(
payload: sink.status,
done: sink.stop? || @workers.zero?,
active_jobs: @active_jobs.dup.freeze
)
sender.call(Actor::Message.new(:status, status))
end
# Handle result
@ -113,9 +117,8 @@ module Mutant
# @api private
#
def handle_result(job_result)
return if @stopping
@scheduler.job_result(job_result)
@stopping = env.config.fail_fast && @scheduler.status.done
@active_jobs.delete(job_result.job)
sink.result(job_result.payload)
end
# Handle stop
@ -127,7 +130,6 @@ module Mutant
# @api private
#
def handle_stop(sender)
@stopping = true
@stop = true
receive_loop
sender.call(Actor::Message.new(:stop))
@ -141,18 +143,29 @@ module Mutant
#
# @api private
#
def ready_worker(sender)
if @stopping
def handle_ready(sender)
if stop_work?
stop_worker(sender)
return
end
job = @scheduler.next_job
sender.call(Actor::Message.new(:job, next_job))
end
if job
sender.call(Actor::Message.new(:job, job))
else
stop_worker(sender)
# Return next job if any
#
# @return [Job]
# if next job is available
#
# @return [nil]
#
def next_job
Job.new(
index: @index,
payload: source.next
).tap do |job|
@index += 1
@active_jobs << job
end
end
@ -169,6 +182,36 @@ module Mutant
sender.call(Actor::Message.new(:stop))
end
# Test if scheduling stopped
#
# @return [Boolean]
#
# @api private
#
def stop_work?
@stop || !source.next? || sink.stop?
end
# Return source
#
# @return [Source]
#
# @api private
#
def source
config.source
end
# Return source
#
# @return [Sink]
#
# @api private
#
def sink
config.sink
end
end # Master
end # Runner
end # Parallel
end # Mutant

View file

@ -0,0 +1,73 @@
module Mutant
module Parallel
# Job source for parallel execution
class Source
include AbstractType
NoJobError = Class.new(RuntimeError)
# Return next job
#
# @return [Object]
#
# @raise [NoJobError]
# when no next job is available
#
# @api private
#
abstract_method :next
# Test if next job is available
#
# @return [Boolean]
#
# @api private
#
abstract_method :next?
# Job source backed by a finite array
class Array
include Concord.new(:jobs)
# Initialize objecto
#
# @return [undefined]
#
# @api private
#
def initialize(*)
super
@next_index = 0
end
# Test if next job is available
#
# @return [Boolean]
#
# @api private
#
def next?
@next_index < jobs.length
end
# Return next job
#
# @return [Object]
#
# @raise [NoJobError]
# when no next job is available
#
# @api private
#
def next
fail NoJobError unless next?
jobs.fetch(@next_index).tap do
@next_index += 1
end
end
end # Array
end # Source
end # Parallel
end # Mutant

View file

@ -1,27 +1,23 @@
module Mutant
class Runner
# Mutation killing worker receiving work from parent
module Parallel
# Parallel execution worker
class Worker
include Adamantium::Flat, Anima.new(:config, :id, :parent)
private_class_method :new
include Adamantium::Flat, Anima.new(:actor, :processor, :parent)
# Run worker
#
# @param [Hash<Symbol, Object] attributes
#
# @return [Actor::Sender]
# @return [self]
#
# @api private
#
def self.run(attributes)
attributes.fetch(:config).actor_env.spawn do |actor|
worker = new(attributes)
worker.__send__(:run, actor)
end
new(attributes).run
self
end
private
private_class_method :new
# Worker loop
#
@ -31,12 +27,14 @@ module Mutant
#
# rubocop:disable Lint/Loop
#
def run(actor)
def run
begin
parent.call(Actor::Message.new(:ready, actor.sender))
end until handle(actor.receiver.call)
end
private
# Handle job
#
# @param [Message] message
@ -67,25 +65,10 @@ module Mutant
# @api private
#
def handle_job(job)
parent.call(Actor::Message.new(:result, JobResult.new(job: job, result: run_mutation(job.mutation))))
end
# Run mutation
#
# @param [Mutation] mutation
#
# @return [Result::Mutation]
#
# @api private
#
def run_mutation(mutation)
test_result = mutation.kill(config.isolation, config.integration)
Result::Mutation.new(
mutation: mutation,
test_result: test_result
)
result = processor.call(job.payload)
parent.call(Actor::Message.new(:result, JobResult.new(job: job, payload: result)))
end
end # Worker
end # Runner
end # Parallel
end # Mutant

View file

@ -39,15 +39,14 @@ module Mutant
# Report progress object
#
# @param [Runner::Collector] collector
# @param [Parallel::Status] status
#
# @return [self]
#
# @api private
#
def progress(collector)
write(format.progress(collector))
def progress(status)
write(format.progress(status))
self
end

View file

@ -137,7 +137,7 @@ module Mutant
# Printer for runner status
class Status < self
delegate(:active_jobs, :env_result)
delegate(:active_jobs, :payload)
# Print progress for collector
#
@ -146,7 +146,7 @@ module Mutant
# @api private
#
def run
visit(EnvProgress, object.env_result)
visit(EnvProgress, payload)
info('Active subjects: %d', active_subject_results.length)
visit_collection(SubjectProgress, active_subject_results)
job_status
@ -164,8 +164,8 @@ module Mutant
def job_status
return if active_jobs.empty?
info('Active Jobs:')
object.active_jobs.sort_by(&:index).each do |job|
info('%d: %s', job.index, job.mutation.identification)
active_jobs.sort_by(&:index).each do |job|
info('%d: %s', job.index, job.payload.identification)
end
end
@ -176,9 +176,10 @@ module Mutant
# @api private
#
def active_subject_results
active_subjects = active_jobs.map(&:mutation).flat_map(&:subject).to_set
active_mutation_jobs = active_jobs.select { |job| job.payload.is_a?(Mutant::Mutation) }
active_subjects = active_mutation_jobs.map(&:payload).flat_map(&:subject).to_set
env_result.subject_results.select do |subject_result|
payload.subject_results.select do |subject_result|
active_subjects.include?(subject_result.subject)
end
end
@ -385,7 +386,7 @@ module Mutant
# @api private
#
def object
super().env_result
super().payload
end
end

View file

@ -98,16 +98,6 @@ module Mutant
end
memoize :success?
# Test if runner should continue on env
#
# @return [Boolean]
#
# @api private
#
def continue?
!env.config.fail_fast || subject_results.all?(&:success?)
end
# Return failed subject results
#
# @return [Array<Result::Subject>]

View file

@ -3,25 +3,6 @@ module Mutant
class Runner
include Adamantium::Flat, Concord.new(:env), Procto.call(:result)
# Status of the runner execution
class Status
include Adamantium, Anima::Update, Anima.new(
:env_result,
:active_jobs,
:done
)
end # Status
# Job to push to workers
class Job
include Adamantium::Flat, Anima.new(:index, :mutation)
end # Job
# Job result object received from workers
class JobResult
include Adamantium::Flat, Anima.new(:job, :result)
end
# Initialize object
#
# @return [undefined]
@ -32,26 +13,8 @@ module Mutant
super
reporter.start(env)
config.integration.setup
@master = config.actor_env.new_mailbox.bind(Master.call(env))
status = nil
loop do
status = current_status
break if status.done
reporter.progress(status)
Kernel.sleep(reporter.delay)
end
reporter.progress(status)
@master.call(:stop)
@result = status.env_result
reporter.report(@result)
run_mutation_analysis
end
# Return result
@ -64,6 +27,59 @@ module Mutant
private
# Run mutation analysis
#
# @return [undefined]
#
# @api private
#
def run_mutation_analysis
config.integration.setup
@result = run_driver(Parallel.async(mutation_test_config))
reporter.report(@result)
end
# Run driver
#
# @param [Driver] driver
#
# @return [Object]
# the last returned status payload
#
# @api private
#
def run_driver(driver)
status = nil
loop do
status = driver.status
reporter.progress(status)
break if status.done
Kernel.sleep(reporter.delay)
end
driver.stop
status.payload
end
# Return mutation test config
#
# @return [Parallell::Config]
#
# @api private
#
def mutation_test_config
Parallel::Config.new(
env: config.actor_env,
jobs: config.jobs,
source: Parallel::Source::Array.new(env.mutations),
sink: Sink.new(env),
processor: env.method(:kill_mutation)
)
end
# Return reporter
#
# @return [Reporter]
@ -84,15 +100,5 @@ module Mutant
env.config
end
# Return current status
#
# @return [Status]
#
# @api private
#
def current_status
@master.call(:status)
end
end # Runner
end # Mutant

View file

@ -1,7 +1,7 @@
module Mutant
class Runner
# Job scheduler
class Scheduler
# Mutation result sink
class Sink
include Concord.new(:env)
# Initialize object
@ -12,9 +12,7 @@ module Mutant
#
def initialize(*)
super
@index = 0
@start = Time.now
@active_jobs = Set.new
@subject_results = Hash.new do |_hash, subject|
Result::Subject.new(
subject: subject,
@ -30,58 +28,17 @@ module Mutant
# @api private
#
def status
Status.new(
env_result: env_result,
done: done?,
active_jobs: @active_jobs.dup
)
env_result
end
# Return next job
#
# @return [Job]
# in case there is a next job
#
# @return [nil]
# otherwise
#
# @api private
def next_job
return unless next_mutation?
Job.new(
mutation: mutations.fetch(@index),
index: @index
).tap do |job|
@index += 1
@active_jobs << job
end
end
# Consume job result
#
# @param [JobResult] job_result
#
# @return [self]
#
# @api private
#
def job_result(job_result)
@active_jobs.delete(job_result.job)
mutation_result(job_result.result)
self
end
private
# Test if mutation run is done
# Test if scheduling stopped
#
# @return [Boolean]
#
# @api private
#
def done?
!env_result.continue? || (!next_mutation? && @active_jobs.empty?)
def stop?
env.config.fail_fast && !env_result.subject_results.all?(&:success?)
end
# Handle mutation finish
@ -92,7 +49,7 @@ module Mutant
#
# @api private
#
def mutation_result(mutation_result)
def result(mutation_result)
mutation = mutation_result.mutation
original = @subject_results[mutation.subject]
@ -100,27 +57,11 @@ module Mutant
@subject_results[mutation.subject] = original.update(
mutation_results: (original.mutation_results.dup << mutation_result)
)
self
end
# Test if a next mutation exist
#
# @return [Boolean]
#
# @api private
#
def next_mutation?
mutations.length > @index
end
# Return mutations
#
# @return [Array<Mutation>]
#
# @api private
#
def mutations
env.mutations
end
private
# Return current result
#

View file

@ -3,27 +3,37 @@ require 'mutant/actor'
# A fake actor used from specs
module FakeActor
class Expectation
include Concord::Public.new(:name, :message)
include Concord::Public.new(:name, :message, :block)
include Equalizer.new(:name, :message)
def self.new(_name, _message, _block = nil)
super
end
def verify(other)
unless eql?(other)
raise "Got:\n#{other.inspect}\nExpected:\n#{inspect}"
end
block.call(other.message) if block
end
end
class MessageSequence
include Adamantium::Flat, Concord.new(:messages)
include Adamantium::Flat, Concord::Public.new(:messages)
def self.new
super([])
end
def add(name, *message_arguments)
messages << Expectation.new(name, Mutant::Actor::Message.new(*message_arguments))
def add(name, *message_arguments, &block)
messages << Expectation.new(name, Mutant::Actor::Message.new(*message_arguments), block)
self
end
def sending(expectation)
raise "Unexpected send: #{expectation.inspect}" if messages.empty?
expected = messages.shift
unless expectation.eql?(expected)
raise "Got:\n#{expectation.inspect}\nExpected:\n#{expected.inspect}"
end
expected.verify(expectation)
self
end

View file

@ -16,8 +16,8 @@ module SharedContext
# rubocop:disable MethodLength
def setup_shared_context
let(:env) { double('env', config: config, subjects: [subject_a], mutations: mutations) }
let(:job_a) { Mutant::Runner::Job.new(index: 0, mutation: mutation_a) }
let(:job_b) { Mutant::Runner::Job.new(index: 1, mutation: mutation_b) }
let(:job_a) { Mutant::Parallel::Job.new(index: 0, payload: mutation_a) }
let(:job_b) { Mutant::Parallel::Job.new(index: 1, payload: mutation_b) }
let(:job_a_result) { Mutant::Runner::JobResult.new(job: job_a, result: mutation_a_result) }
let(:job_b_result) { Mutant::Runner::JobResult.new(job: job_b, result: mutation_b_result) }
let(:mutations) { [mutation_a, mutation_b] }
@ -27,6 +27,14 @@ module SharedContext
let(:actor_names) { [] }
let(:message_sequence) { FakeActor::MessageSequence.new }
let(:status) do
Mutant::Parallel::Status.new(
active_jobs: [].to_set,
payload: env_result,
done: true
)
end
let(:config) do
Mutant::Config::DEFAULT.update(
actor_env: actor_env,
@ -53,22 +61,6 @@ module SharedContext
allow(subject_a).to receive(:mutations).and_return([mutation_a, mutation_b])
end
let(:empty_status) do
Mutant::Runner::Status.new(
active_jobs: Set.new,
env_result: env_result.update(subject_results: [], runtime: 0.0),
done: false
)
end
let(:status) do
Mutant::Runner::Status.new(
active_jobs: Set.new,
env_result: env_result,
done: true
)
end
let(:env_result) do
Mutant::Result::Env.new(
env: env,

View file

@ -46,4 +46,20 @@ RSpec.describe Mutant::Env do
end
end
end
context '#kill_mutation' do
let(:object) { described_class.new(config) }
let(:result) { double('Result') }
let(:mutation) { double('Mutation') }
subject { object.kill_mutation(mutation) }
before do
expect(mutation).to receive(:kill).with(config.isolation, config.integration).and_return(result)
end
it 'uses the configured integration and isolation to kill mutation' do
should eql(Mutant::Result::Mutation.new(mutation: mutation, test_result: result))
end
end
end

View file

@ -0,0 +1,339 @@
RSpec.describe Mutant::Parallel::Master do
let(:message_sequence) { FakeActor::MessageSequence.new }
let(:actor_names) { [:master, :worker_a, :worker_b] }
let(:status) { double('Status') }
let(:sink) { FakeSink.new }
let(:processor) { double('Processor') }
let(:worker_a) { actor_env.mailbox(:worker_a).sender }
let(:worker_b) { actor_env.mailbox(:worker_b).sender }
let(:parent) { actor_env.mailbox(:parent).sender }
let(:job_payload_a) { double('Job Payload A') }
let(:job_payload_b) { double('Job Payload B') }
let(:job_result_payload_a) { double('Job Result Payload A') }
let(:job_result_payload_b) { double('Job Result Payload B') }
let(:job_a) { Mutant::Parallel::Job.new(index: 0, payload: job_payload_a) }
let(:job_b) { Mutant::Parallel::Job.new(index: 1, payload: job_payload_b) }
let(:job_result_a) { Mutant::Parallel::JobResult.new(job: job_a, payload: job_result_payload_a) }
let(:job_result_b) { Mutant::Parallel::JobResult.new(job: job_b, payload: job_result_payload_b) }
let(:actor_env) do
FakeActor::Env.new(message_sequence, actor_names)
end
shared_examples_for 'master behavior' do
it { should eql(actor_env.mailbox(:master).sender) }
it 'has expected results in sink' do
subject
expect(sink.results).to eql(expected_results)
end
it 'consumes all messages' do
subject
expect(message_sequence.messages).to eql([])
end
end
let(:config) do
Mutant::Parallel::Config.new(
jobs: 1,
env: actor_env,
source: Mutant::Parallel::Source::Array.new([job_payload_a, job_payload_b]),
sink: sink,
processor: processor
)
end
class FakeSink
def initialize
@results = []
@stop = false
end
attr_reader :results
def status
@results.length
end
def result(result)
@results << result
end
def stop
@stop = true
self
end
def stop?
@stop
end
end
# Needed because of rubies undefined-ivar-read-is-nil stuff
describe 'object initialization' do
let(:object) { described_class.send(:new, config, actor_env.mailbox(:master)) }
it 'initializes falsy ivars'do
expect(object.instance_variable_get(:@stop)).to be(false)
end
end
describe '.call' do
before do
expect(Mutant::Parallel::Worker).to receive(:run).with(
actor: actor_env.mailbox(:worker_a),
processor: processor,
parent: actor_env.mailbox(:master).sender
).and_return(worker_a)
end
subject { described_class.call(config) }
context 'with multiple workers configured' do
let(:config) { super().update(jobs: 2) }
let(:expected_results) { [] }
before do
expect(Mutant::Parallel::Worker).to receive(:run).with(
actor: actor_env.mailbox(:worker_b),
processor: processor,
parent: actor_env.mailbox(:master).sender
).and_return(worker_b)
sink.stop
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :stop)
message_sequence.add(:master, :ready, worker_b)
message_sequence.add(:worker_b, :stop)
message_sequence.add(:master, :stop, parent)
message_sequence.add(:parent, :stop)
end
include_examples 'master behavior'
end
context 'explicit stop by scheduler state' do
context 'before jobs are processed' do
let(:expected_results) { [] }
before do
sink.stop
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :stop)
message_sequence.add(:master, :stop, parent)
message_sequence.add(:parent, :stop)
end
include_examples 'master behavior'
end
context 'while jobs are processed' do
let(:expected_results) { [job_result_payload_a] }
let(:sink) do
super().instance_eval do
def stop?
@results.length.equal?(1)
end
self
end
end
before do
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :job, job_a)
message_sequence.add(:master, :result, job_result_a)
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :stop)
message_sequence.add(:master, :stop, parent)
message_sequence.add(:parent, :stop)
end
it { should eql(actor_env.mailbox(:master).sender) }
it 'consumes all messages' do
expect { subject }.to change(&message_sequence.method(:consumed?)).from(false).to(true)
end
end
end
context 'external stop' do
context 'after jobs are done' do
let(:expected_results) { [job_result_payload_a, job_result_payload_b] }
before do
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :job, job_a)
message_sequence.add(:master, :result, job_result_a)
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :job, job_b)
message_sequence.add(:master, :result, job_result_b)
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :stop)
message_sequence.add(:master, :stop, parent)
message_sequence.add(:parent, :stop)
end
include_examples 'master behavior'
end
context 'when no jobs are active' do
let(:expected_results) { [job_result_payload_a] }
before do
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :job, job_a)
message_sequence.add(:master, :stop, parent)
message_sequence.add(:master, :result, job_result_a)
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :stop)
message_sequence.add(:parent, :stop)
end
include_examples 'master behavior'
end
context 'before any job got processed' do
let(:expected_results) { [] }
before do
message_sequence.add(:master, :stop, parent)
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :stop)
message_sequence.add(:parent, :stop)
end
include_examples 'master behavior'
end
end
context 'requesting status' do
context 'when jobs are done' do
let(:expected_status) { Mutant::Parallel::Status.new(payload: 2, active_jobs: Set.new, done: true) }
let(:expected_results) { [job_result_payload_a, job_result_payload_b] }
before do
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :job, job_a)
message_sequence.add(:master, :result, job_result_a)
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :job, job_b)
message_sequence.add(:master, :result, job_result_b)
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :stop)
message_sequence.add(:master, :status, parent)
# Special bit to kill a mutation that results in mutable active_jobs being passed.
message_sequence.add(:parent, :status, expected_status) do |message|
expect(message.payload.active_jobs.frozen?).to be(true)
end
message_sequence.add(:master, :stop, parent)
message_sequence.add(:parent, :stop)
end
include_examples 'master behavior'
end
context 'just after scheduler stops' do
let(:expected_status) { Mutant::Parallel::Status.new(payload: 1, active_jobs: [].to_set, done: true) }
let(:expected_results) { [job_result_payload_a] }
let(:sink) do
super().instance_eval do
def stop?
@results.length.equal?(1)
end
self
end
end
before do
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :job, job_a)
message_sequence.add(:master, :result, job_result_a)
message_sequence.add(:master, :status, parent)
message_sequence.add(:parent, :status, expected_status)
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :stop)
message_sequence.add(:master, :stop, parent)
message_sequence.add(:parent, :stop)
end
include_examples 'master behavior'
end
context 'when jobs are active' do
let(:expected_status) { Mutant::Parallel::Status.new(payload: 1, active_jobs: [job_b].to_set, done: false) }
let(:expected_results) { [job_result_payload_a, job_result_payload_b] }
before do
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :job, job_a)
message_sequence.add(:master, :result, job_result_a)
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :job, job_b)
message_sequence.add(:master, :status, parent)
message_sequence.add(:parent, :status, expected_status)
message_sequence.add(:master, :result, job_result_b)
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :stop)
message_sequence.add(:master, :stop, parent)
message_sequence.add(:parent, :stop)
end
include_examples 'master behavior'
end
context 'before jobs are done' do
let(:expected_status) { Mutant::Parallel::Status.new(payload: 0, active_jobs: Set.new, done: false) }
let(:expected_results) { [] }
before do
message_sequence.add(:master, :status, parent)
message_sequence.add(:parent, :status, expected_status)
message_sequence.add(:master, :stop, parent)
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :stop)
message_sequence.add(:parent, :stop)
end
include_examples 'master behavior'
end
end
context 'unhandled message received' do
before do
message_sequence.add(:master, :foo, parent)
end
it 'raises message' do
expect { subject }.to raise_error(Mutant::Actor::ProtocolError, 'Unexpected message: :foo')
end
end
end
end

View file

@ -0,0 +1,47 @@
RSpec.describe Mutant::Parallel::Source::Array do
let(:object) { described_class.new(jobs) }
let(:job_a) { double('Job A') }
let(:job_b) { double('Job B') }
let(:job_c) { double('Job B') }
let(:jobs) { [job_a, job_b, job_c] }
describe '#next' do
subject { object.next }
context 'when there is a next job' do
it 'returns that job' do
should be(job_a)
end
it 'does not return the same job twice' do
expect(object.next).to be(job_a)
expect(object.next).to be(job_b)
expect(object.next).to be(job_c)
end
end
context 'when there is no next job' do
let(:jobs) { [] }
it 'raises error' do
expect { subject }.to raise_error(Mutant::Parallel::Source::NoJobError)
end
end
end
describe '#next?' do
subject { object.next? }
context 'when there is a next job' do
it { should be(true) }
end
context 'when there is no next job' do
let(:jobs) { [] }
it { should be(false) }
end
end
end

View file

@ -1,21 +1,27 @@
RSpec.describe Mutant::Runner::Worker do
setup_shared_context
let(:actor) { actor_env.mailbox(:worker) }
let(:parent) { actor_env.mailbox(:parent).sender }
before do
message_sequence.add(:parent, :ready, actor.sender)
RSpec.describe Mutant::Parallel::Worker do
let(:actor_env) do
FakeActor::Env.new(message_sequence, actor_names)
end
let(:message_sequence) { FakeActor::MessageSequence.new }
let(:processor) { double('Processor') }
let(:actor) { actor_env.mailbox(:worker) }
let(:parent) { actor_env.mailbox(:parent).sender }
let(:payload) { double('Payload') }
let(:result_payload) { double('Result Payload') }
let(:attributes) do
{
config: config,
parent: parent,
id: 1
processor: processor,
parent: parent,
actor: actor
}
end
before do
message_sequence.add(:parent, :ready, actor.sender)
end
describe '.run' do
subject { described_class.run(attributes) }
@ -26,7 +32,7 @@ RSpec.describe Mutant::Runner::Worker do
let(:test_result) { double('Test Result') }
before do
expect(mutation).to receive(:kill).with(config.isolation, config.integration).and_return(test_result).ordered
expect(processor).to receive(:call).with(payload).and_return(result_payload)
message_sequence.add(:worker, :job, job)
message_sequence.add(:parent, :result, job_result)
@ -34,25 +40,16 @@ RSpec.describe Mutant::Runner::Worker do
message_sequence.add(:worker, :stop)
end
let(:test) { double('Test') }
let(:index) { double('Index') }
let(:test_result) { double('Test Result') }
let(:mutation) { double('Mutation') }
let(:job_result) { Mutant::Runner::JobResult.new(job: job, result: mutation_result) }
let(:job) { Mutant::Runner::Job.new(index: index, mutation: mutation) }
let(:mutation_result) do
Mutant::Result::Mutation.new(
mutation: mutation,
test_result: test_result
)
end
let(:index) { double('Index') }
let(:test_result) { double('Test Result') }
let(:job_result) { Mutant::Parallel::JobResult.new(job: job, payload: result_payload) }
let(:job) { Mutant::Parallel::Job.new(index: index, payload: payload) }
it 'signals ready and status to parent' do
subject
end
it { should eql(actor.sender) }
it { should be(described_class) }
it 'consumes all messages' do
expect { subject }.to change(&message_sequence.method(:consumed?)).from(false).to(true)

View file

@ -0,0 +1,16 @@
RSpec.describe Mutant::Parallel do
describe '.async' do
subject { described_class.async(config) }
let(:config) { double('Config', env: env) }
let(:env) { double('ENV', new_mailbox: mailbox) }
let(:mailbox) { Mutant::Actor::Mailbox.new }
let(:master) { double('Master') }
before do
expect(described_class::Master).to receive(:call).with(config).and_return(master)
end
it { should eql(described_class::Driver.new(mailbox.bind(master))) }
end
end

View file

@ -254,7 +254,7 @@ RSpec.describe Mutant::Reporter::CLI do
end
describe '#report' do
subject { object.report(status.env_result) }
subject { object.report(status.payload) }
context 'with full coverage' do
it_reports(<<-REPORT)

View file

@ -0,0 +1,9 @@
RSpec.describe Mutant::Reporter::Trace do
let(:object) { described_class.new }
describe '#delay' do
subject { object.delay }
it { should equal(0.0) }
end
end

View file

@ -1,55 +0,0 @@
RSpec.describe Mutant::Result::Env do
let(:object) do
described_class.new(
env: double('Env', config: config),
runtime: double('Runtime'),
subject_results: subject_results
)
end
let(:config) { double('Config', fail_fast: fail_fast) }
describe '#continue?' do
subject { object.continue? }
context 'config sets fail_fast flag' do
let(:fail_fast) { true }
context 'when mutation results are empty' do
let(:subject_results) { [] }
it { should be(true) }
end
context 'with failing mutation result' do
let(:subject_results) { [double('Subject Result', success?: false)] }
it { should be(false) }
end
context 'with successful mutation result' do
let(:subject_results) { [double('Subject Result', success?: true)] }
it { should be(true) }
end
context 'with failed and successful mutation result' do
let(:subject_results) do
[
double('Subject Result', success?: true),
double('Subject Result', success?: false)
]
end
it { should be(false) }
end
end
context 'config does not set fail fast flag' do
let(:fail_fast) { false }
let(:subject_results) { double('subject results') }
it { should be(true) }
end
end
end

View file

@ -0,0 +1,26 @@
RSpec.describe Mutant::Parallel::Driver do
let(:object) { described_class.new(binding) }
let(:binding) { double('Binding') }
let(:result) { double('Result') }
describe '#stop' do
subject { object.stop }
before do
expect(binding).to receive(:call).with(:stop)
end
it_should_behave_like 'a command method'
end
describe '#status' do
subject { object.status }
before do
expect(binding).to receive(:call).with(:status).and_return(result)
end
it { should be(result) }
end
end

View file

@ -1,199 +0,0 @@
RSpec.describe Mutant::Runner::Master do
setup_shared_context
describe 'object initialization' do
subject { described_class.__send__(:new, env, double('actor')) }
it 'initialized instance variables' do
expect(subject.instance_variable_get(:@stop)).to be(false)
expect(subject.instance_variable_get(:@stopping)).to be(false)
end
end
describe '.call' do
let(:actor_names) { [:master, :worker_a] }
let(:worker_a) { actor_env.mailbox(:worker_a).sender }
let(:worker_b) { actor_env.mailbox(:worker_b).sender }
let(:parent) { actor_env.mailbox(:parent).sender }
let(:job) { double('Job') }
before do
expect(Time).to receive(:now).and_return(Time.at(0)).at_most(5).times
expect(Mutant::Runner::Worker).to receive(:run).with(
id: 0,
config: env.config,
parent: actor_env.mailbox(:master).sender
).and_return(worker_a)
end
subject { described_class.call(env) }
context 'jobs done before external stop' do
before do
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :job, job_a)
message_sequence.add(:master, :result, job_a_result)
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :job, job_b)
message_sequence.add(:master, :result, job_b_result)
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :stop)
message_sequence.add(:master, :stop, parent)
message_sequence.add(:parent, :stop)
end
it { should eql(actor_env.mailbox(:master).sender) }
it 'consumes all messages' do
expect { subject }.to change(&message_sequence.method(:consumed?)).from(false).to(true)
end
end
context 'stop by fail fast trigger first' do
update(:config) { { fail_fast: true } }
update(:mutation_b_test_result) { { passed: true } }
before do
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :job, job_a)
message_sequence.add(:master, :result, job_a_result)
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :job, job_b)
message_sequence.add(:master, :result, job_b_result)
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :stop)
message_sequence.add(:master, :stop, parent)
message_sequence.add(:parent, :stop)
end
it { should eql(actor_env.mailbox(:master).sender) }
it 'consumes all messages' do
expect { subject }.to change(&message_sequence.method(:consumed?)).from(false).to(true)
end
end
context 'stop by fail fast trigger last' do
update(:config) { { fail_fast: true } }
update(:mutation_a_test_result) { { passed: true } }
before do
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :job, job_a)
message_sequence.add(:master, :result, job_a_result)
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :stop)
message_sequence.add(:master, :stop, parent)
message_sequence.add(:parent, :stop)
end
it { should eql(actor_env.mailbox(:master).sender) }
it 'consumes all messages' do
expect { subject }.to change(&message_sequence.method(:consumed?)).from(false).to(true)
end
end
context 'jobs active while external stop' do
before do
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :job, job_a)
message_sequence.add(:master, :stop, parent)
message_sequence.add(:master, :result, job_a_result)
message_sequence.add(:master, :status, parent)
message_sequence.add(:parent, :status, empty_status.update(active_jobs: [job_a].to_set))
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :stop)
message_sequence.add(:parent, :stop)
end
it { should eql(actor_env.mailbox(:master).sender) }
it 'consumes all messages' do
expect { subject }.to change(&message_sequence.method(:consumed?)).from(false).to(true)
end
end
context 'stop with pending jobs' do
before do
message_sequence.add(:master, :stop, parent)
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :stop)
message_sequence.add(:parent, :stop)
end
it { should eql(actor_env.mailbox(:master).sender) }
it 'consumes all messages' do
expect { subject }.to change(&message_sequence.method(:consumed?)).from(false).to(true)
end
end
context 'unhandled message received' do
before do
message_sequence.add(:master, :foo, parent)
end
it 'raises message' do
expect { subject }.to raise_error(Mutant::Actor::ProtocolError, 'Unexpected message: :foo')
end
end
context 'request status late' do
let(:expected_status) { status.update(env_result: env_result.update(runtime: 0.0)) }
before do
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :job, job_a)
message_sequence.add(:master, :result, job_a_result)
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :job, job_b)
message_sequence.add(:master, :result, job_b_result)
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :stop)
message_sequence.add(:master, :status, parent)
message_sequence.add(:parent, :status, expected_status)
message_sequence.add(:master, :stop, parent)
message_sequence.add(:parent, :stop)
end
it { should eql(actor_env.mailbox(:master).sender) }
it 'consumes all messages' do
expect { subject }.to change(&message_sequence.method(:consumed?)).from(false).to(true)
end
end
context 'request status early' do
before do
message_sequence.add(:master, :status, parent)
message_sequence.add(:parent, :status, empty_status)
message_sequence.add(:master, :stop, parent)
message_sequence.add(:master, :ready, worker_a)
message_sequence.add(:worker_a, :stop)
message_sequence.add(:parent, :stop)
end
it { should eql(actor_env.mailbox(:master).sender) }
it 'consumes all messages' do
expect { subject }.to change(&message_sequence.method(:consumed?)).from(false).to(true)
end
end
end
end

View file

@ -1,161 +0,0 @@
require 'spec_helper'
describe Mutant::Runner::Scheduler do
let(:object) { described_class.new(env) }
before do
allow(Time).to receive(:now).and_return(Time.now)
end
setup_shared_context
let(:active_subject_a_result) do
subject_a_result.update(mutation_results: [])
end
describe '#job_result' do
subject { object.job_result(job_a_result) }
before do
expect(object.next_job).to eql(job_a)
end
it 'removes the tracking of job as active' do
expect { subject }.to change { object.status.active_jobs }.from([job_a].to_set).to(Set.new)
end
it 'aggregates results in #status' do
subject
object.job_result(job_b_result)
expect(object.status.env_result).to eql(
Mutant::Result::Env.new(
env: env,
runtime: 0.0,
subject_results: [subject_a_result]
)
)
end
it_should_behave_like 'a command method'
end
describe '#next_job' do
subject { object.next_job }
context 'when there is a next job' do
let(:mutations) { [mutation_a, mutation_b] }
it { should eql(job_a) }
it 'does not return the same job again' do
subject
expect(object.next_job).to eql(job_b)
expect(object.next_job).to be(nil)
end
it 'does record job as active' do
expect { subject }.to change { object.status.active_jobs }.from(Set.new).to([job_a].to_set)
end
end
context 'when there is no next job' do
let(:mutations) { [] }
it { should be(nil) }
end
end
describe '#status' do
subject { object.status }
context 'when empty' do
let(:expected_status) do
Mutant::Runner::Status.new(
env_result: Mutant::Result::Env.new(env: env, runtime: 0.0, subject_results: []),
active_jobs: Set.new,
done: false
)
end
it { should eql(expected_status) }
end
context 'when jobs are active' do
before do
object.next_job
object.next_job
end
let(:expected_status) do
Mutant::Runner::Status.new(
env_result: Mutant::Result::Env.new(env: env, runtime: 0.0, subject_results: []),
active_jobs: [job_a, job_b].to_set,
done: false
)
end
it { should eql(expected_status) }
end
context 'remaining jobs are active' do
before do
object.next_job
object.next_job
object.job_result(job_a_result)
end
update(:subject_a_result) { { mutation_results: [mutation_a_result] } }
let(:expected_status) do
Mutant::Runner::Status.new(
env_result: Mutant::Result::Env.new(env: env, runtime: 0.0, subject_results: [subject_a_result]),
active_jobs: [job_b].to_set,
done: false
)
end
it { should eql(expected_status) }
end
context 'under fail fast config with failed result' do
before do
object.next_job
object.next_job
object.job_result(job_a_result)
end
update(:subject_a_result) { { mutation_results: [mutation_a_result] } }
update(:mutation_a_test_result) { { passed: true } }
update(:config) { { fail_fast: true } }
let(:expected_status) do
Mutant::Runner::Status.new(
env_result: Mutant::Result::Env.new(env: env, runtime: 0.0, subject_results: [subject_a_result]),
active_jobs: [job_b].to_set,
done: true
)
end
it { should eql(expected_status) }
end
context 'when done' do
before do
object.next_job
object.next_job
object.status
object.job_result(job_a_result)
object.job_result(job_b_result)
end
let(:expected_status) do
Mutant::Runner::Status.new(
env_result: Mutant::Result::Env.new(env: env, runtime: 0.0, subject_results: [subject_a_result]),
active_jobs: Set.new,
done: true
)
end
it { should eql(expected_status) }
end
end
end

View file

@ -0,0 +1,162 @@
require 'spec_helper'
describe Mutant::Runner::Sink do
setup_shared_context
shared_context 'one result' do
before do
object.result(mutation_a_result)
end
end
shared_context 'two results' do
before do
object.result(mutation_a_result)
object.result(mutation_b_result)
end
end
let(:object) { described_class.new(env) }
before do
allow(Time).to receive(:now).and_return(Time.now)
end
describe '#result' do
subject { object.result(mutation_a_result) }
it 'aggregates results in #status' do
subject
object.result(mutation_b_result)
expect(object.status).to eql(
Mutant::Result::Env.new(
env: env,
runtime: 0.0,
subject_results: [subject_a_result]
)
)
end
it_should_behave_like 'a command method'
end
describe '#status' do
subject { object.status }
context 'no results' do
let(:expected_status) do
Mutant::Result::Env.new(env: env, runtime: 0.0, subject_results: [])
end
it { should eql(expected_status) }
end
context 'one result' do
include_context 'one result'
update(:subject_a_result) { { mutation_results: [mutation_a_result] } }
let(:expected_status) do
Mutant::Result::Env.new(env: env, runtime: 0.0, subject_results: [subject_a_result])
end
it { should eql(expected_status) }
end
context 'two results' do
include_context 'two results'
let(:expected_status) do
Mutant::Result::Env.new(env: env, runtime: 0.0, subject_results: [subject_a_result])
end
it { should eql(expected_status) }
end
end
describe '#stop?' do
subject { object.stop? }
context 'without fail fast' do
context 'no results' do
it { should be(false) }
end
context 'one result' do
include_context 'one result'
context 'when result is successful' do
it { should be(false) }
end
context 'when result failed' do
update(:mutation_a_test_result) { { passed: true } }
it { should be(false) }
end
end
context 'two results' do
include_context 'two results'
context 'when results are successful' do
it { should be(false) }
end
context 'when first result is unsuccessful' do
update(:mutation_a_test_result) { { passed: true } }
it { should be(false) }
end
context 'when second result is unsuccessful' do
update(:mutation_b_test_result) { { passed: true } }
it { should be(false) }
end
end
end
context 'with fail fast' do
update(:config) { { fail_fast: true } }
context 'no results' do
it { should be(false) }
end
context 'one result' do
include_context 'one result'
context 'when result is successful' do
it { should be(false) }
end
context 'when result failed' do
update(:mutation_a_test_result) { { passed: true } }
it { should be(true) }
end
end
context 'two results' do
include_context 'two results'
context 'when results are successful' do
it { should be(false) }
end
context 'when first result is unsuccessful' do
update(:mutation_a_test_result) { { passed: true } }
it { should be(true) }
end
context 'when second result is unsuccessful' do
update(:mutation_b_test_result) { { passed: true } }
it { should be(true) }
end
end
end
end
end

View file

@ -1,87 +1,84 @@
RSpec.describe Mutant::Runner do
setup_shared_context
# setup_shared_context
class FakeEnv
def self.kill_mutation
end
let(:integration) { double('Integration') }
let(:master_sender) { actor_env.spawn }
let(:runner_actor) { actor_env.mailbox(:runner) }
before do
expect(integration).to receive(:setup).ordered
expect(Mutant::Runner::Master).to receive(:call).with(env).and_return(master_sender).ordered
def self.mutations
[]
end
end
describe '.call' do
update(:config) { { integration: integration } }
let(:actor_names) { [:runner, :master] }
let(:integration) { double('Integration') }
let(:reporter) { double('Reporter', delay: delay) }
let(:driver) { double('Driver') }
let(:delay) { double('Delay') }
let(:env) { FakeEnv }
let(:env_result) { double('Env Result') }
let(:actor_env) { double('Actor ENV') }
let(:config) do
double(
'Config',
integration: integration,
reporter: reporter,
actor_env: actor_env,
jobs: 1
)
end
before do
allow(FakeEnv).to receive(:config).and_return(config)
end
let(:parallel_config) do
Mutant::Parallel::Config.new(
jobs: 1,
env: actor_env,
source: Mutant::Parallel::Source::Array.new(env.mutations),
sink: Mutant::Runner::Sink.new(env),
processor: env.method(:kill_mutation)
)
end
before do
expect(reporter).to receive(:start).with(env).ordered
expect(integration).to receive(:setup).ordered
expect(Mutant::Parallel).to receive(:async).with(parallel_config).and_return(driver).ordered
end
subject { described_class.call(env) }
context 'when status done gets returned immediately' do
context 'when runner finishes immediately' do
let(:status) { double('Status', done: true, payload: env_result) }
before do
message_sequence.add(:runner, :status, actor_env.mailbox(:current).sender)
message_sequence.add(:current, :status, status)
message_sequence.add(:runner, :stop, actor_env.mailbox(:current).sender)
message_sequence.add(:current, :stop)
end
it 'returns env result' do
should be(status.env_result)
end
it 'logs start' do
expect { subject }.to change { config.reporter.start_calls }.from([]).to([env])
end
it 'logs process' do
expect { subject }.to change { config.reporter.progress_calls }.from([]).to([status])
end
it 'logs result' do
expect { subject }.to change { config.reporter.report_calls }.from([]).to([status.env_result])
end
it 'consumes all messages' do
expect { subject }.to change(&message_sequence.method(:consumed?)).from(false).to(true)
expect(driver).to receive(:status).and_return(status)
expect(reporter).to receive(:progress).with(status).ordered
expect(driver).to receive(:stop).ordered
expect(reporter).to receive(:report).with(env_result).ordered
end
end
context 'when status done gets returned immediately' do
let(:incomplete_status) { status.update(done: false) }
context 'when report iterations are done' do
let(:status_a) { double('Status A', done: false) }
let(:status_b) { double('Status B', done: true, payload: env_result) }
before do
expect(Kernel).to receive(:sleep).with(0.0).exactly(2).times.ordered
expect(driver).to receive(:status).and_return(status_a).ordered
expect(reporter).to receive(:progress).with(status_a).ordered
expect(Kernel).to receive(:sleep).with(reporter.delay).ordered
current_sender = actor_env.mailbox(:current).sender
expect(driver).to receive(:status).and_return(status_b).ordered
expect(reporter).to receive(:progress).with(status_b).ordered
expect(driver).to receive(:stop).ordered
message_sequence.add(:runner, :status, current_sender)
message_sequence.add(:current, :status, incomplete_status)
message_sequence.add(:runner, :status, current_sender)
message_sequence.add(:current, :status, incomplete_status)
message_sequence.add(:runner, :status, current_sender)
message_sequence.add(:current, :status, status)
message_sequence.add(:runner, :stop, current_sender)
message_sequence.add(:current, :stop)
expect(reporter).to receive(:report).with(env_result).ordered
end
it 'returns env result' do
should be(status.env_result)
end
it 'logs start' do
expect { subject }.to change { config.reporter.start_calls }.from([]).to([env])
end
it 'logs result' do
expect { subject }.to change { config.reporter.report_calls }.from([]).to([status.env_result])
end
it 'logs process' do
expected = [incomplete_status, incomplete_status, status]
expect { subject }.to change { config.reporter.progress_calls }.from([]).to(expected)
end
it 'consumes all messages' do
expect { subject }.to change(&message_sequence.method(:consumed?)).from(false).to(true)
should be(env_result)
end
end
end