mirror of
https://github.com/ruby/ruby.git
synced 2022-11-09 12:17:21 -05:00
156 lines
4.7 KiB
Ruby
156 lines
4.7 KiB
Ruby
|
require 'thread'
|
||
|
require 'set'
|
||
|
|
||
|
require 'rake/promise'
|
||
|
|
||
|
module Rake
|
||
|
|
||
|
class ThreadPool # :nodoc: all
|
||
|
|
||
|
# Creates a ThreadPool object.
|
||
|
# The parameter is the size of the pool.
|
||
|
def initialize(thread_count)
|
||
|
@max_active_threads = [thread_count, 0].max
|
||
|
@threads = Set.new
|
||
|
@threads_mon = Monitor.new
|
||
|
@queue = Queue.new
|
||
|
@join_cond = @threads_mon.new_cond
|
||
|
|
||
|
@history_start_time = nil
|
||
|
@history = []
|
||
|
@history_mon = Monitor.new
|
||
|
@total_threads_in_play = 0
|
||
|
end
|
||
|
|
||
|
# Creates a future executed by the +ThreadPool+.
|
||
|
#
|
||
|
# The args are passed to the block when executing (similarly to
|
||
|
# <tt>Thread#new</tt>) The return value is an object representing
|
||
|
# a future which has been created and added to the queue in the
|
||
|
# pool. Sending <tt>#value</tt> to the object will sleep the
|
||
|
# current thread until the future is finished and will return the
|
||
|
# result (or raise an exception thrown from the future)
|
||
|
def future(*args, &block)
|
||
|
promise = Promise.new(args, &block)
|
||
|
promise.recorder = lambda { |*stats| stat(*stats) }
|
||
|
|
||
|
@queue.enq promise
|
||
|
stat :queued, :item_id => promise.object_id
|
||
|
start_thread
|
||
|
promise
|
||
|
end
|
||
|
|
||
|
# Waits until the queue of futures is empty and all threads have exited.
|
||
|
def join
|
||
|
@threads_mon.synchronize do
|
||
|
begin
|
||
|
stat :joining
|
||
|
@join_cond.wait unless @threads.empty?
|
||
|
stat :joined
|
||
|
rescue Exception => e
|
||
|
stat :joined
|
||
|
$stderr.puts e
|
||
|
$stderr.print "Queue contains #{@queue.size} items. Thread pool contains #{@threads.count} threads\n"
|
||
|
$stderr.print "Current Thread #{Thread.current} status = #{Thread.current.status}\n"
|
||
|
$stderr.puts e.backtrace.join("\n")
|
||
|
@threads.each do |t|
|
||
|
$stderr.print "Thread #{t} status = #{t.status}\n"
|
||
|
# 1.8 doesn't support Thread#backtrace
|
||
|
$stderr.puts t.backtrace.join("\n") if t.respond_to? :backtrace
|
||
|
end
|
||
|
raise e
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
|
||
|
# Enable the gathering of history events.
|
||
|
def gather_history #:nodoc:
|
||
|
@history_start_time = Time.now if @history_start_time.nil?
|
||
|
end
|
||
|
|
||
|
# Return a array of history events for the thread pool.
|
||
|
#
|
||
|
# History gathering must be enabled to be able to see the events
|
||
|
# (see #gather_history). Best to call this when the job is
|
||
|
# complete (i.e. after ThreadPool#join is called).
|
||
|
def history # :nodoc:
|
||
|
@history_mon.synchronize { @history.dup }.
|
||
|
sort_by { |i| i[:time] }.
|
||
|
each { |i| i[:time] -= @history_start_time }
|
||
|
end
|
||
|
|
||
|
# Return a hash of always collected statistics for the thread pool.
|
||
|
def statistics # :nodoc:
|
||
|
{
|
||
|
:total_threads_in_play => @total_threads_in_play,
|
||
|
:max_active_threads => @max_active_threads,
|
||
|
}
|
||
|
end
|
||
|
|
||
|
private
|
||
|
|
||
|
# processes one item on the queue. Returns true if there was an
|
||
|
# item to process, false if there was no item
|
||
|
def process_queue_item #:nodoc:
|
||
|
return false if @queue.empty?
|
||
|
|
||
|
# Even though we just asked if the queue was empty, it
|
||
|
# still could have had an item which by this statement
|
||
|
# is now gone. For this reason we pass true to Queue#deq
|
||
|
# because we will sleep indefinitely if it is empty.
|
||
|
promise = @queue.deq(true)
|
||
|
stat :dequeued, :item_id => promise.object_id
|
||
|
promise.work
|
||
|
return true
|
||
|
|
||
|
rescue ThreadError # this means the queue is empty
|
||
|
false
|
||
|
end
|
||
|
|
||
|
def start_thread # :nodoc:
|
||
|
@threads_mon.synchronize do
|
||
|
next unless @threads.count < @max_active_threads
|
||
|
|
||
|
t = Thread.new do
|
||
|
begin
|
||
|
while @threads.count <= @max_active_threads
|
||
|
break unless process_queue_item
|
||
|
end
|
||
|
ensure
|
||
|
@threads_mon.synchronize do
|
||
|
@threads.delete Thread.current
|
||
|
stat :ended, :thread_count => @threads.count
|
||
|
@join_cond.broadcast if @threads.empty?
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
@threads << t
|
||
|
stat :spawned, :new_thread => t.object_id, :thread_count => @threads.count
|
||
|
@total_threads_in_play = @threads.count if @threads.count > @total_threads_in_play
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def stat(event, data=nil) # :nodoc:
|
||
|
return if @history_start_time.nil?
|
||
|
info = {
|
||
|
:event => event,
|
||
|
:data => data,
|
||
|
:time => Time.now,
|
||
|
:thread => Thread.current.object_id,
|
||
|
}
|
||
|
@history_mon.synchronize { @history << info }
|
||
|
end
|
||
|
|
||
|
# for testing only
|
||
|
|
||
|
def __queue__ # :nodoc:
|
||
|
@queue
|
||
|
end
|
||
|
|
||
|
def __threads__ # :nodoc:
|
||
|
@threads.dup
|
||
|
end
|
||
|
end
|
||
|
|
||
|
end
|