diff --git a/CHANGELOG b/CHANGELOG index 7c8eb5a7..b90a2361 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,7 @@ *SVN* +* Connect to multiple servers in parallel, rather than serially. [Jamis Buck] + * Add SCM module for Mercurial (closes #4150) [Matthew Elder] * Remove unused line in SCM::Base (closes #5619) [chris@seagul.co.uk] diff --git a/lib/capistrano/actor.rb b/lib/capistrano/actor.rb index 05d2956d..f220314f 100644 --- a/lib/capistrano/actor.rb +++ b/lib/capistrano/actor.rb @@ -463,9 +463,22 @@ module Capistrano def establish_connections(servers) @factory = establish_gateway if needs_gateway? - servers.each do |server| - @sessions[server] ||= @factory.connect_to(server) + servers = Array(servers) + + # because Net::SSH uses lazy loading for things, we need to make sure + # that at least one connection has been made successfully, to kind of + # "prime the pump", before we go gung-ho and do mass connection in + # parallel. Otherwise, the threads start doing things in wierd orders + # and causing Net::SSH to die of confusion. + + if !@establish_gateway && @sessions.empty? + server, servers = servers.first, servers[1..-1] + @sessions[server] = @factory.connect_to(server) end + + servers.map { |server| + Thread.new { @sessions[server] ||= @factory.connect_to(server) } + }.each { |t| t.join } end def establish_gateway diff --git a/lib/capistrano/gateway.rb b/lib/capistrano/gateway.rb index f3704be7..8760250d 100644 --- a/lib/capistrano/gateway.rb +++ b/lib/capistrano/gateway.rb @@ -31,28 +31,23 @@ module Capistrano def initialize(server, config) #:nodoc: @config = config @pending_forward_requests = {} - @mutex = Mutex.new @next_port = MAX_PORT @terminate_thread = false + @port_guard = Mutex.new + mutex = Mutex.new waiter = ConditionVariable.new @thread = Thread.new do @config.logger.trace "starting connection to gateway #{server}" SSH.connect(server, @config) do |@session| @config.logger.trace "gateway connection established" - @mutex.synchronize { waiter.signal } - connection = @session.registry[:connection][:driver] - loop do - break if @terminate_thread - sleep 0.1 unless connection.reader_ready? - connection.process true - Thread.new { process_next_pending_connection_request } - end + mutex.synchronize { waiter.signal } + @session.loop { !@terminate_thread } end end - @mutex.synchronize { waiter.wait(@mutex) } + mutex.synchronize { waiter.wait(mutex) } end # Shuts down all forwarded connections and terminates the gateway. @@ -73,45 +68,37 @@ module Capistrano # host to the server, via the gateway, and then opens and returns a new # Net::SSH connection via that port. def connect_to(server) - @mutex.synchronize do - @pending_forward_requests[server] = ConditionVariable.new - @pending_forward_requests[server].wait(@mutex) - @pending_forward_requests.delete(server) + connection = nil + + thread = Thread.new do + @config.logger.trace "establishing connection to #{server} via gateway" + port = next_port + + begin + @session.forward.local(port, server, 22) + connection = SSH.connect('127.0.0.1', @config, port) + @config.logger.trace "connection to #{server} via gateway established" + rescue Errno::EADDRINUSE + port = next_port + retry + rescue Exception => e + puts e.class.name + puts e.backtrace.join("\n") + end end + + thread.join + connection or raise "Could not establish connection to #{server}" end private def next_port - port = @next_port - @next_port -= 1 - @next_port = MAX_PORT if @next_port < MIN_PORT - port - end - - def process_next_pending_connection_request - @mutex.synchronize do - key = @pending_forward_requests.keys.detect { |k| ConditionVariable === @pending_forward_requests[k] } or return - var = @pending_forward_requests[key] - - @config.logger.trace "establishing connection to #{key} via gateway" - - port = next_port - - begin - @session.forward.local(port, key, 22) - @pending_forward_requests[key] = SSH.connect('127.0.0.1', @config, - port) - @config.logger.trace "connection to #{key} via gateway established" - rescue Errno::EADDRINUSE - port = next_port - retry - rescue Object - @pending_forward_requests[key] = nil - raise - ensure - var.signal - end + @port_guard.synchronize do + port = @next_port + @next_port -= 1 + @next_port = MAX_PORT if @next_port < MIN_PORT + port end end end