free_mutant/lib/mutant/parallel/master.rb

208 lines
4.1 KiB
Ruby
Raw Normal View History

2014-10-23 07:37:53 -04:00
module Mutant
2014-12-08 19:10:31 -05:00
module Parallel
# Master parallel worker
2014-10-23 07:37:53 -04:00
class Master
include Concord.new(:config, :mailbox)
2014-10-23 07:37:53 -04:00
private_class_method :new
2015-01-17 17:17:39 -05:00
# Run master
2014-10-23 07:37:53 -04:00
#
2015-01-17 17:17:39 -05:00
# @param [Config] config
2014-10-23 07:37:53 -04:00
#
# @return [Actor::Sender]
#
# @api private
2014-12-08 19:10:31 -05:00
def self.call(config)
config.env.spawn do |mailbox|
new(config, mailbox).__send__(:run)
2014-10-23 07:37:53 -04:00
end
end
# Initialize object
#
# @return [undefined]
#
# @api private
def initialize(*)
super
2014-12-08 19:10:31 -05:00
@stop = false
@workers = 0
@active_jobs = Set.new
@index = 0
2014-10-23 07:37:53 -04:00
end
2014-12-08 19:10:31 -05:00
private
2014-10-23 07:37:53 -04:00
# Run work loop
#
2014-12-21 20:28:30 -05:00
# rubocop:disable MethodLength
#
2015-07-03 11:24:31 -04:00
# @return [undefined]
#
2015-07-01 23:35:54 -04:00
# @api private
2014-10-23 07:37:53 -04:00
def run
2014-12-08 19:10:31 -05:00
config.jobs.times do
@workers += 1
config.env.spawn do |worker_mailbox|
2014-12-08 19:10:31 -05:00
Worker.run(
mailbox: worker_mailbox,
2014-12-08 19:10:31 -05:00
processor: config.processor,
parent: mailbox.sender
2014-12-08 19:10:31 -05:00
)
end
2014-10-23 07:37:53 -04:00
end
receive_loop
end
2014-12-08 19:10:31 -05:00
MAP = IceNine.deep_freeze(
ready: :handle_ready,
status: :handle_status,
result: :handle_result,
stop: :handle_stop
)
2014-10-23 07:37:53 -04:00
# Handle messages
#
# @param [Actor::Message] message
#
# @return [undefined]
#
# @api private
def handle(message)
type, payload = message.type, message.payload
2014-12-08 19:10:31 -05:00
method = MAP.fetch(type) do
2014-10-23 07:37:53 -04:00
fail Actor::ProtocolError, "Unexpected message: #{type.inspect}"
end
2014-12-08 19:10:31 -05:00
__send__(method, payload)
2014-10-23 07:37:53 -04:00
end
# Run receive loop
#
# @return [undefined]
#
# @api private
def receive_loop
handle(mailbox.receiver.call) until @workers.zero? && @stop
2014-10-23 07:37:53 -04:00
end
# Handle status
#
# @param [Actor::Sender] sender
#
2014-12-01 13:33:47 -05:00
# @return [undefined]
#
# @api private
2014-10-23 07:37:53 -04:00
def handle_status(sender)
2014-12-08 19:10:31 -05:00
status = Status.new(
payload: sink.status,
done: sink.stop? || @workers.zero?,
active_jobs: @active_jobs.dup.freeze
)
sender.call(Actor::Message.new(:status, status))
2014-10-23 07:37:53 -04:00
end
# Handle result
#
# @param [JobResult] job_result
#
# @return [undefined]
#
2014-12-01 13:33:47 -05:00
# @api private
2014-10-23 07:37:53 -04:00
def handle_result(job_result)
2014-12-08 19:10:31 -05:00
@active_jobs.delete(job_result.job)
sink.result(job_result.payload)
2014-10-23 07:37:53 -04:00
end
# Handle stop
#
# @param [Actor::Sender] sender
#
# @return [undefined]
#
# @api private
def handle_stop(sender)
@stop = true
receive_loop
sender.call(Actor::Message.new(:stop))
end
# Handle ready worker
#
# @param [Actor::Sender] sender
#
# @return [undefined]
#
# @api private
2014-12-08 19:10:31 -05:00
def handle_ready(sender)
if stop_work?
2014-10-23 07:37:53 -04:00
stop_worker(sender)
return
end
2014-12-08 19:10:31 -05:00
sender.call(Actor::Message.new(:job, next_job))
end
2014-10-23 07:37:53 -04:00
# Next job if any
2014-12-08 19:10:31 -05:00
#
# @return [Job]
# if next job is available
#
# @return [nil]
#
2015-07-01 23:35:54 -04:00
# @api private
2014-12-08 19:10:31 -05:00
def next_job
Job.new(
index: @index,
payload: source.next
).tap do |job|
@index += 1
@active_jobs << job
2014-10-23 07:37:53 -04:00
end
end
# Stop worker
#
# @param [Actor::Sender] sender
#
# @return [undefined]
#
# @api private
def stop_worker(sender)
@workers -= 1
sender.call(Actor::Message.new(:stop))
end
2014-12-08 19:10:31 -05:00
# Test if scheduling stopped
#
# @return [Boolean]
#
# @api private
def stop_work?
@stop || !source.next? || sink.stop?
end
# Job source
2014-12-08 19:10:31 -05:00
#
# @return [Source]
#
# @api private
def source
config.source
end
# Job result sink
2014-12-08 19:10:31 -05:00
#
# @return [Sink]
#
# @api private
def sink
config.sink
end
2014-10-23 07:37:53 -04:00
end # Master
2014-12-08 19:10:31 -05:00
end # Parallel
2014-10-23 07:37:53 -04:00
end # Mutant