mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
26821d9b57
Provides both forked-process and threaded parallelization options. To use it, add `parallelize` to your test suite. It takes a `workers` argument that controls how many times the process is forked. For each process a new database will be created, suffixed with the worker number: for example, test-database-0 and test-database-1. If `ENV["PARALLEL_WORKERS"]` is set, the `workers` argument will be ignored and the environment variable will be used instead. This is useful for CI environments, or other environments where you may need more workers than you do for local testing. If the number of workers is set to `1` or fewer, the tests will not be parallelized. The default parallelization method is to fork processes. If you'd like to use threads instead, you can pass `with: :threads` to the `parallelize` method. Note that threaded parallelization does not create multiple databases and will not work with system tests at this time. parallelize(workers: 2, with: :threads) The threaded parallelization uses Minitest's parallel executor directly. The process-based parallelization uses a Ruby DRb server. For parallelization via threads a setup hook and cleanup hook are provided. ``` class ActiveSupport::TestCase parallelize_setup do |worker| # setup databases end parallelize_teardown do |worker| # cleanup databases end parallelize(workers: 2) end ``` [Eileen M. Uchitelle, Aaron Patterson]
102 lines
2 KiB
Ruby
102 lines
2 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require "drb"
|
|
require "drb/unix"
|
|
|
|
module ActiveSupport
  module Testing
    # Distributes test jobs across forked worker processes.
    #
    # The parent process owns a job queue (a Server) and publishes it through
    # a DRb service on a "drbunix:" URI. Every forked worker connects to that
    # URI, pops jobs until it receives +nil+, runs each job with
    # +Minitest.run_one_method+, and hands the result back to the parent
    # through Server#record.
    class Parallelization # :nodoc:
      # The object published over DRb. It wraps a +Queue+ of jobs and relays
      # results from the workers to the reporter.
      class Server
        # Keeps the server in the parent process; remote callers get a
        # reference to it instead of a marshaled copy.
        include DRb::DRbUndumped

        def initialize
          @queue = Queue.new
        end

        # Records +result+ on +reporter+ from within +reporter.synchronize+.
        def record(reporter, result)
          reporter.synchronize do
            reporter.record(result)
          end
        end

        # Enqueues +o+. Parallelization#shutdown enqueues +nil+ to tell a
        # worker to stop.
        def <<(o)
          @queue << o
        end

        # Removes and returns the next queued object.
        def pop
          @queue.pop
        end
      end

      @after_fork_hooks = []

      # Registers +blk+ to be called with the worker number inside each
      # forked worker, before it starts taking jobs.
      def self.after_fork_hook(&blk)
        @after_fork_hooks << blk
      end

      # Returns the registered after-fork hooks, in registration order.
      def self.after_fork_hooks
        @after_fork_hooks
      end

      @run_cleanup_hooks = []

      # Registers +blk+ to be called with the worker number inside each
      # forked worker when it is done.
      def self.run_cleanup_hook(&blk)
        @run_cleanup_hooks << blk
      end

      # Returns the registered cleanup hooks, in registration order.
      def self.run_cleanup_hooks
        @run_cleanup_hooks
      end

      # Creates the job queue and starts the DRb service that publishes it.
      #
      # +queue_size+ is the number of worker processes that #start forks and
      # the number of +nil+ stop markers that #shutdown enqueues.
      def initialize(queue_size)
        @queue_size = queue_size
        @queue      = Server.new
        @pool       = []

        @url = DRb.start_service("drbunix:", @queue).uri
      end

      # Calls every registered after-fork hook with +worker+.
      def after_fork(worker)
        self.class.after_fork_hooks.each do |cb|
          cb.call(worker)
        end
      end

      # Calls every registered cleanup hook with +worker+.
      def run_cleanup(worker)
        self.class.run_cleanup_hooks.each do |cb|
          cb.call(worker)
        end
      end

      # Forks +queue_size+ workers and remembers their pids in the pool.
      #
      # Each worker stops the DRb service it inherited from the parent, runs
      # the after-fork hooks, and then processes jobs until it pops +nil+.
      # A job is indexed as <tt>[klass, method, reporter]</tt>.
      def start
        @pool = @queue_size.times.map do |worker|
          fork do
            begin
              DRb.stop_service

              after_fork(worker)

              queue = DRbObject.new_with_uri(@url)

              while job = queue.pop
                klass    = job[0]
                method   = job[1]
                reporter = job[2]
                result   = Minitest.run_one_method(klass, method)

                queue.record(reporter, result)
              end
            ensure
              # Run the cleanup hooks even when a setup hook or the job loop
              # raised; otherwise whatever the after-fork hooks set up for
              # this worker would be left behind. The exception itself still
              # propagates and fails the worker process.
              run_cleanup(worker)
            end
          end
        end
      end

      # Enqueues +work+ for the workers.
      def <<(work)
        @queue << work
      end

      # Enqueues one +nil+ stop marker per worker and waits for every worker
      # process to exit.
      def shutdown
        @queue_size.times { @queue << nil }
        @pool.each { |pid| Process.waitpid pid }
      end
    end
  end
end
|