mirror of
https://github.com/capistrano/capistrano
synced 2023-03-27 23:21:18 -04:00
Connect to multiple servers in parallel, rather than serially.
git-svn-id: http://svn.rubyonrails.org/rails/tools/capistrano@5027 5ecf4fe2-1ee6-0310-87b1-e25e094e27de
This commit is contained in:
parent
d4a464f02d
commit
39d1c960e9
3 changed files with 47 additions and 45 deletions
|
@ -1,5 +1,7 @@
|
|||
*SVN*
|
||||
|
||||
* Connect to multiple servers in parallel, rather than serially. [Jamis Buck]
|
||||
|
||||
* Add SCM module for Mercurial (closes #4150) [Matthew Elder]
|
||||
|
||||
* Remove unused line in SCM::Base (closes #5619) [chris@seagul.co.uk]
|
||||
|
|
|
@ -463,9 +463,22 @@ module Capistrano
|
|||
|
||||
def establish_connections(servers)
|
||||
@factory = establish_gateway if needs_gateway?
|
||||
servers.each do |server|
|
||||
@sessions[server] ||= @factory.connect_to(server)
|
||||
servers = Array(servers)
|
||||
|
||||
# because Net::SSH uses lazy loading for things, we need to make sure
|
||||
# that at least one connection has been made successfully, to kind of
|
||||
# "prime the pump", before we go gung-ho and do mass connection in
|
||||
# parallel. Otherwise, the threads start doing things in wierd orders
|
||||
# and causing Net::SSH to die of confusion.
|
||||
|
||||
if !@establish_gateway && @sessions.empty?
|
||||
server, servers = servers.first, servers[1..-1]
|
||||
@sessions[server] = @factory.connect_to(server)
|
||||
end
|
||||
|
||||
servers.map { |server|
|
||||
Thread.new { @sessions[server] ||= @factory.connect_to(server) }
|
||||
}.each { |t| t.join }
|
||||
end
|
||||
|
||||
def establish_gateway
|
||||
|
|
|
@ -31,28 +31,23 @@ module Capistrano
|
|||
def initialize(server, config) #:nodoc:
|
||||
@config = config
|
||||
@pending_forward_requests = {}
|
||||
@mutex = Mutex.new
|
||||
@next_port = MAX_PORT
|
||||
@terminate_thread = false
|
||||
@port_guard = Mutex.new
|
||||
|
||||
mutex = Mutex.new
|
||||
waiter = ConditionVariable.new
|
||||
|
||||
@thread = Thread.new do
|
||||
@config.logger.trace "starting connection to gateway #{server}"
|
||||
SSH.connect(server, @config) do |@session|
|
||||
@config.logger.trace "gateway connection established"
|
||||
@mutex.synchronize { waiter.signal }
|
||||
connection = @session.registry[:connection][:driver]
|
||||
loop do
|
||||
break if @terminate_thread
|
||||
sleep 0.1 unless connection.reader_ready?
|
||||
connection.process true
|
||||
Thread.new { process_next_pending_connection_request }
|
||||
end
|
||||
mutex.synchronize { waiter.signal }
|
||||
@session.loop { !@terminate_thread }
|
||||
end
|
||||
end
|
||||
|
||||
@mutex.synchronize { waiter.wait(@mutex) }
|
||||
mutex.synchronize { waiter.wait(mutex) }
|
||||
end
|
||||
|
||||
# Shuts down all forwarded connections and terminates the gateway.
|
||||
|
@ -73,45 +68,37 @@ module Capistrano
|
|||
# host to the server, via the gateway, and then opens and returns a new
|
||||
# Net::SSH connection via that port.
|
||||
def connect_to(server)
|
||||
@mutex.synchronize do
|
||||
@pending_forward_requests[server] = ConditionVariable.new
|
||||
@pending_forward_requests[server].wait(@mutex)
|
||||
@pending_forward_requests.delete(server)
|
||||
connection = nil
|
||||
|
||||
thread = Thread.new do
|
||||
@config.logger.trace "establishing connection to #{server} via gateway"
|
||||
port = next_port
|
||||
|
||||
begin
|
||||
@session.forward.local(port, server, 22)
|
||||
connection = SSH.connect('127.0.0.1', @config, port)
|
||||
@config.logger.trace "connection to #{server} via gateway established"
|
||||
rescue Errno::EADDRINUSE
|
||||
port = next_port
|
||||
retry
|
||||
rescue Exception => e
|
||||
puts e.class.name
|
||||
puts e.backtrace.join("\n")
|
||||
end
|
||||
end
|
||||
|
||||
thread.join
|
||||
connection or raise "Could not establish connection to #{server}"
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def next_port
|
||||
port = @next_port
|
||||
@next_port -= 1
|
||||
@next_port = MAX_PORT if @next_port < MIN_PORT
|
||||
port
|
||||
end
|
||||
|
||||
def process_next_pending_connection_request
|
||||
@mutex.synchronize do
|
||||
key = @pending_forward_requests.keys.detect { |k| ConditionVariable === @pending_forward_requests[k] } or return
|
||||
var = @pending_forward_requests[key]
|
||||
|
||||
@config.logger.trace "establishing connection to #{key} via gateway"
|
||||
|
||||
port = next_port
|
||||
|
||||
begin
|
||||
@session.forward.local(port, key, 22)
|
||||
@pending_forward_requests[key] = SSH.connect('127.0.0.1', @config,
|
||||
port)
|
||||
@config.logger.trace "connection to #{key} via gateway established"
|
||||
rescue Errno::EADDRINUSE
|
||||
port = next_port
|
||||
retry
|
||||
rescue Object
|
||||
@pending_forward_requests[key] = nil
|
||||
raise
|
||||
ensure
|
||||
var.signal
|
||||
end
|
||||
@port_guard.synchronize do
|
||||
port = @next_port
|
||||
@next_port -= 1
|
||||
@next_port = MAX_PORT if @next_port < MIN_PORT
|
||||
port
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue