41 lines
1.3 KiB
Ruby
41 lines
1.3 KiB
Ruby
|
# frozen_string_literal: true
|
||
|
|
||
|
module ConcurrentHelpers
|
||
|
Cancelled = Class.new(StandardError)
|
||
|
|
||
|
# To test for contention, we may need to run some actions in parallel. This
|
||
|
# helper takes an array of blocks and schedules them all on different threads
|
||
|
# in a fixed-size thread pool.
|
||
|
#
|
||
|
# @param [Array[Proc]] blocks
|
||
|
# @param [Integer] task_wait_time: time to wait for each task (upper bound on
|
||
|
# reasonable task execution time)
|
||
|
# @param [Integer] max_concurrency: maximum number of tasks to run at once
|
||
|
#
|
||
|
def run_parallel(blocks, task_wait_time: 20.seconds, max_concurrency: Concurrent.processor_count - 1)
|
||
|
thread_pool = Concurrent::FixedThreadPool.new(
|
||
|
[2, max_concurrency].max, { max_queue: blocks.size }
|
||
|
)
|
||
|
opts = { executor: thread_pool }
|
||
|
|
||
|
error = Concurrent::MVar.new
|
||
|
|
||
|
blocks.map { |block| Concurrent::Future.execute(opts, &block) }.each do |future|
|
||
|
future.wait(task_wait_time)
|
||
|
|
||
|
if future.complete?
|
||
|
error.put(future.reason) if future.reason && error.empty?
|
||
|
else
|
||
|
future.cancel
|
||
|
error.put(Cancelled.new) if error.empty?
|
||
|
end
|
||
|
end
|
||
|
|
||
|
raise error.take if error.full?
|
||
|
ensure
|
||
|
thread_pool.shutdown
|
||
|
thread_pool.wait_for_termination(10)
|
||
|
thread_pool.kill if thread_pool.running?
|
||
|
end
|
||
|
end
|