+ Use a Queue for scheduling parallel tests. (tenderlove)
[git-p4: depot-paths = "//src/minitest/dev/": change = 9032]
This commit is contained in:
parent
a7f7d3b626
commit
34760e3b26
|
@ -12,7 +12,7 @@ lib/minitest/benchmark.rb
|
|||
lib/minitest/expectations.rb
|
||||
lib/minitest/hell.rb
|
||||
lib/minitest/mock.rb
|
||||
lib/minitest/parallel_each.rb
|
||||
lib/minitest/parallel.rb
|
||||
lib/minitest/pride.rb
|
||||
lib/minitest/pride_plugin.rb
|
||||
lib/minitest/spec.rb
|
||||
|
|
|
@ -1,4 +1,7 @@
|
|||
require "optparse"
|
||||
require "thread"
|
||||
require "mutex_m"
|
||||
require "minitest/parallel"
|
||||
|
||||
##
|
||||
# :include: README.txt
|
||||
|
@ -12,6 +15,12 @@ module Minitest
|
|||
|
||||
mc = (class << self; self; end)
|
||||
|
||||
##
|
||||
# Parallel test executor
|
||||
|
||||
mc.send :attr_accessor, :parallel_executor
|
||||
self.parallel_executor = Parallel::Executor.new (ENV['N'] || 2).to_i
|
||||
|
||||
##
|
||||
# Filter object for backtraces.
|
||||
|
||||
|
@ -113,6 +122,7 @@ module Minitest
|
|||
|
||||
reporter.start
|
||||
__run reporter, options
|
||||
self.parallel_executor.shutdown
|
||||
reporter.report
|
||||
|
||||
reporter.passed?
|
||||
|
@ -126,9 +136,16 @@ module Minitest
|
|||
# loaded if a Runnable calls parallelize_me!.
|
||||
|
||||
def self.__run reporter, options
|
||||
Runnable.runnables.each do |runnable|
|
||||
runnable.run reporter, options
|
||||
end
|
||||
suites = Runnable.runnables
|
||||
parallel, serial = suites.partition { |s| s.test_order == :parallel }
|
||||
|
||||
# If we run the parallel tests before the serial tests, the parallel tests
|
||||
# could run in parallel with the serial tests. This would be bad because
|
||||
# the serial tests won't lock around Reporter#record. Run the serial tests
|
||||
# first, so that after they complete, the parallel tests will lock when
|
||||
# recording results.
|
||||
serial.map { |suite| suite.run reporter, options } +
|
||||
parallel.map { |suite| suite.run reporter, options }
|
||||
end
|
||||
|
||||
def self.process_args args = [] # :nodoc:
|
||||
|
@ -265,13 +282,15 @@ module Minitest
|
|||
|
||||
with_info_handler reporter do
|
||||
filtered_methods.each do |method_name|
|
||||
result = self.new(method_name).run
|
||||
raise "#{self}#run _must_ return self" unless self === result
|
||||
reporter.record result
|
||||
run_test self, method_name, reporter
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def self.run_test klass, method_name, reporter
|
||||
reporter.record Minitest.run_test(klass, method_name, reporter)
|
||||
end
|
||||
|
||||
def self.with_info_handler reporter, &block # :nodoc:
|
||||
handler = lambda do
|
||||
unless reporter.passed? then
|
||||
|
@ -369,6 +388,8 @@ module Minitest
|
|||
# you want. Go nuts.
|
||||
|
||||
class AbstractReporter
|
||||
include Mutex_m
|
||||
|
||||
##
|
||||
# Starts reporting on the run.
|
||||
|
||||
|
@ -408,6 +429,7 @@ module Minitest
|
|||
attr_accessor :options
|
||||
|
||||
def initialize io = $stdout, options = {} # :nodoc:
|
||||
super()
|
||||
self.io = io
|
||||
self.options = options
|
||||
end
|
||||
|
@ -565,6 +587,7 @@ module Minitest
|
|||
attr_accessor :reporters
|
||||
|
||||
def initialize *reporters # :nodoc:
|
||||
super()
|
||||
self.reporters = reporters
|
||||
end
|
||||
|
||||
|
@ -729,6 +752,12 @@ module Minitest
|
|||
end
|
||||
|
||||
self.backtrace_filter = BacktraceFilter.new
|
||||
|
||||
def self.run_test klass, method_name, reporter # :nodoc:
|
||||
result = klass.new(method_name).run
|
||||
raise "#{klass}#run _must_ return self" unless klass === result
|
||||
result
|
||||
end
|
||||
end
|
||||
|
||||
require "minitest/test"
|
||||
|
|
|
@ -131,6 +131,10 @@ module Minitest
|
|||
true
|
||||
end
|
||||
|
||||
def _synchronize # :nodoc:
|
||||
yield
|
||||
end
|
||||
|
||||
##
|
||||
# Fails unless +obj+ is empty.
|
||||
|
||||
|
@ -393,6 +397,8 @@ module Minitest
|
|||
# that.
|
||||
|
||||
def capture_io
|
||||
_synchronize do
|
||||
begin
|
||||
require 'stringio'
|
||||
|
||||
captured_stdout, captured_stderr = StringIO.new, StringIO.new
|
||||
|
@ -406,6 +412,8 @@ module Minitest
|
|||
ensure
|
||||
$stdout = orig_stdout
|
||||
$stderr = orig_stderr
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
##
|
||||
|
@ -424,6 +432,8 @@ module Minitest
|
|||
# only use it when you need to test the output of a subprocess.
|
||||
|
||||
def capture_subprocess_io
|
||||
_synchronize do
|
||||
begin
|
||||
require 'tempfile'
|
||||
|
||||
captured_stdout, captured_stderr = Tempfile.new("out"), Tempfile.new("err")
|
||||
|
@ -443,6 +453,8 @@ module Minitest
|
|||
captured_stderr.unlink
|
||||
$stdout.reopen orig_stdout
|
||||
$stderr.reopen orig_stderr
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
##
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
module Minitest
|
||||
module Parallel
|
||||
class Executor
|
||||
attr_reader :size
|
||||
|
||||
def initialize size
|
||||
@size = size
|
||||
@queue = Queue.new
|
||||
@pool = size.times.map {
|
||||
Thread.new(@queue) do |queue|
|
||||
Thread.current.abort_on_exception = true
|
||||
while job = queue.pop
|
||||
klass, method, reporter = job
|
||||
result = Minitest.run_test klass, method, reporter
|
||||
reporter.synchronize { reporter.record result }
|
||||
end
|
||||
end
|
||||
}
|
||||
end
|
||||
|
||||
def << work; @queue << work; end
|
||||
|
||||
def shutdown
|
||||
size.times { @queue << nil }
|
||||
@pool.each(&:join)
|
||||
end
|
||||
end
|
||||
|
||||
module Test
|
||||
def _synchronize; Test.io_lock.synchronize { yield }; end
|
||||
|
||||
module ClassMethods
|
||||
def run_test klass, method_name, reporter
|
||||
MiniTest.parallel_executor << [klass, method_name, reporter]
|
||||
end
|
||||
def test_order; :parallel; end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,120 +0,0 @@
|
|||
##
|
||||
# Provides a parallel #each that lets you enumerate using N threads.
|
||||
# Use environment variable N to customize. Defaults to 2. Enumerable,
|
||||
# so all the goodies come along (tho not all are wrapped yet to
|
||||
# return another ParallelEach instance).
|
||||
|
||||
class Minitest::ParallelEach
|
||||
require 'thread'
|
||||
include Enumerable
|
||||
|
||||
##
|
||||
# How many Threads to use for this parallel #each.
|
||||
|
||||
N = (ENV['N'] || 2).to_i
|
||||
|
||||
##
|
||||
# Create a new ParallelEach instance over +list+.
|
||||
|
||||
def initialize list
|
||||
@queue = Queue.new # *sigh*... the Queue api sucks sooo much...
|
||||
|
||||
list.each { |i| @queue << i }
|
||||
N.times { @queue << nil }
|
||||
end
|
||||
|
||||
def select(&block) # :nodoc:
|
||||
self.class.new super
|
||||
end
|
||||
|
||||
alias find_all select # :nodoc:
|
||||
|
||||
##
|
||||
# Starts N threads that yield each element to your block. Joins the
|
||||
# threads at the end.
|
||||
|
||||
def each
|
||||
threads = N.times.map {
|
||||
Thread.new do
|
||||
Thread.current.abort_on_exception = true
|
||||
while job = @queue.pop
|
||||
yield job
|
||||
end
|
||||
end
|
||||
}
|
||||
threads.map(&:join)
|
||||
end
|
||||
|
||||
def count # :nodoc:
|
||||
[@queue.size - N, 0].max
|
||||
end
|
||||
|
||||
alias_method :size, :count # :nodoc:
|
||||
end
|
||||
|
||||
module Minitest
|
||||
class << self
|
||||
remove_method :__run
|
||||
end
|
||||
|
||||
class Test
|
||||
@mutex = Mutex.new
|
||||
|
||||
def self.synchronize # :nodoc:
|
||||
if @mutex then # see parallel_each.rb
|
||||
@mutex.synchronize { yield }
|
||||
else
|
||||
yield
|
||||
end
|
||||
end
|
||||
|
||||
alias :simple_capture_io :capture_io
|
||||
|
||||
def capture_io(&b)
|
||||
Test.synchronize do
|
||||
simple_capture_io(&b)
|
||||
end
|
||||
end
|
||||
|
||||
alias :simple_capture_subprocess_io :capture_subprocess_io
|
||||
|
||||
def capture_subprocess_io(&b)
|
||||
Test.synchronize do
|
||||
simple_capture_subprocess_io(&b)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class Reporter
|
||||
@mutex = Mutex.new
|
||||
|
||||
def self.synchronize # :nodoc:
|
||||
if @mutex then # see parallel_each.rb
|
||||
@mutex.synchronize { yield }
|
||||
else
|
||||
yield
|
||||
end
|
||||
end
|
||||
|
||||
alias :simple_record :record
|
||||
|
||||
def record result
|
||||
Reporter.synchronize do
|
||||
simple_record result
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
##
|
||||
# Runs all the +suites+ for a given +type+. Runs suites declaring
|
||||
# a test_order of +:parallel+ in parallel, and everything else
|
||||
# serial.
|
||||
|
||||
def self.__run reporter, options
|
||||
suites = Runnable.runnables
|
||||
parallel, serial = suites.partition { |s| s.test_order == :parallel }
|
||||
|
||||
ParallelEach.new(parallel).map { |suite| suite.run reporter, options } +
|
||||
serial.map { |suite| suite.run reporter, options }
|
||||
end
|
||||
end
|
|
@ -14,6 +14,9 @@ module Minitest
|
|||
PASSTHROUGH_EXCEPTIONS = [NoMemoryError, SignalException, # :nodoc:
|
||||
Interrupt, SystemExit]
|
||||
|
||||
class << self; attr_accessor :io_lock; end
|
||||
self.io_lock = Mutex.new
|
||||
|
||||
##
|
||||
# Call this at the top of your tests when you absolutely
|
||||
# positively need to have ordered tests. In doing so, you're
|
||||
|
@ -46,12 +49,8 @@ module Minitest
|
|||
# and your tests are awesome.
|
||||
|
||||
def self.parallelize_me!
|
||||
require "minitest/parallel_each"
|
||||
|
||||
class << self
|
||||
undef_method :test_order if method_defined? :test_order
|
||||
define_method :test_order do :parallel end
|
||||
end
|
||||
include Minitest::Parallel::Test
|
||||
extend Minitest::Parallel::Test::ClassMethods
|
||||
end
|
||||
|
||||
##
|
||||
|
@ -63,10 +62,7 @@ module Minitest
|
|||
methods = methods_matching(/^test_/)
|
||||
|
||||
case self.test_order
|
||||
when :parallel
|
||||
max = methods.size
|
||||
ParallelEach.new methods.sort.sort_by { rand max }
|
||||
when :random then
|
||||
when :random, :parallel then
|
||||
max = methods.size
|
||||
methods.sort.sort_by { rand max }
|
||||
when :alpha, :sorted then
|
||||
|
|
|
@ -22,6 +22,8 @@ class MetaMetaMetaTestCase < Minitest::Test
|
|||
|
||||
reporter.start
|
||||
|
||||
yield(reporter) if block_given?
|
||||
|
||||
@tus ||= [@tu]
|
||||
@tus.each do |tu|
|
||||
Minitest::Runnable.runnables.delete tu
|
||||
|
@ -36,7 +38,7 @@ class MetaMetaMetaTestCase < Minitest::Test
|
|||
reporter.reporters.first
|
||||
end
|
||||
|
||||
def assert_report expected, flags = %w[--seed 42]
|
||||
def assert_report expected, flags = %w[--seed 42], &block
|
||||
header = clean <<-EOM
|
||||
Run options: #{flags.map { |s| s =~ /\|/ ? s.inspect : s }.join " "}
|
||||
|
||||
|
@ -44,7 +46,7 @@ class MetaMetaMetaTestCase < Minitest::Test
|
|||
|
||||
EOM
|
||||
|
||||
run_tu_with_fresh_reporter flags
|
||||
run_tu_with_fresh_reporter flags, &block
|
||||
|
||||
output = normalize_output @output.string.dup
|
||||
|
||||
|
|
|
@ -490,15 +490,12 @@ class TestMinitestRunner < MetaMetaMetaTestCase
|
|||
end
|
||||
end
|
||||
|
||||
def test_parallel_each_size
|
||||
assert_equal 0, Minitest::ParallelEach.new([]).size
|
||||
end
|
||||
|
||||
def test_run_parallel
|
||||
skip "I don't have ParallelEach debugged yet" if maglev?
|
||||
|
||||
test_count = 2
|
||||
test_latch = Latch.new test_count
|
||||
wait_latch = Latch.new test_count
|
||||
main_latch = Latch.new
|
||||
|
||||
thread = Thread.new {
|
||||
|
@ -537,7 +534,19 @@ class TestMinitestRunner < MetaMetaMetaTestCase
|
|||
2 runs, 2 assertions, 0 failures, 0 errors, 0 skips
|
||||
EOM
|
||||
|
||||
assert_report expected
|
||||
assert_report(expected) do |reporter|
|
||||
reporter.extend(Module.new {
|
||||
define_method("record") do |result|
|
||||
super(result)
|
||||
wait_latch.release
|
||||
end
|
||||
|
||||
define_method("report") do
|
||||
wait_latch.await
|
||||
super()
|
||||
end
|
||||
})
|
||||
end
|
||||
assert thread.join
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue