1
0
Fork 0
mirror of https://github.com/capistrano/capistrano synced 2023-03-27 23:21:18 -04:00

make the gateway proxy set the xserver. make commands do a global select, like net-ssh-multi does

This commit is contained in:
Jamis Buck 2008-04-12 21:21:36 -06:00
parent 248cc3bcac
commit c0606ca85a
3 changed files with 25 additions and 181 deletions

View file

@ -32,20 +32,8 @@ module Capistrano
# fails (non-zero return code) on any of the hosts, this will raise a
# Capistrano::CommandError.
def process!
since = Time.now
loop do
active = 0
@channels.each do |ch|
active += 1 unless ch[:closed]
ch.connection.process(0)
end
break if active == 0
if Time.now - since >= 1
since = Time.now
@channels.each { |ch| ch.connection.send_global_request("keep-alive@openssh.com") }
end
sleep 0.01 # a brief respite, to keep the CPU from going crazy
break unless process_iteration
end
logger.trace "command finished" if logger
@ -70,6 +58,25 @@ module Capistrano
private
def process_iteration
sessions.each { |session| session.preprocess }
return false if @channels.all? { |ch| ch[:closed] }
readers = sessions.map { |session| session.listeners.keys }.flatten
writers = readers.select { |io| io.respond_to?(:pending_write?) && io.pending_write? }
readers, writers, = IO.select(readers, writers)
if readers
sessions.each do |session|
ios = session.listeners.keys
session.postprocess(ios & readers, ios & writers)
end
end
true
end
def logger
options[:logger]
end

View file

@ -29,8 +29,12 @@ module Capistrano
end
def connect_to(server)
@options[:logger].debug "establishing connection to `#{server}' via gateway" if @options[:logger]
local_host = ServerDefinition.new("127.0.0.1", :user => server.user, :port => @gateway.open(server.host, server.port || 22))
SSH.connect(local_host, @options)
session = SSH.connect(local_host, @options)
session.xserver = server
@options[:logger].trace "connected: `#{server}' (via gateway)" if @options[:logger]
session
end
end

View file

@ -1,167 +0,0 @@
require "#{File.dirname(__FILE__)}/utils"
require 'capistrano/gateway'
class GatewayTest < Test::Unit::TestCase
def teardown
Thread.list { |t| t.kill unless Thread.current == t }
end
def test_initialize_should_open_and_set_session_value
run_test_initialize_should_open_and_set_session_value
end
def test_initialize_when_connect_lags_should_open_and_set_session_value
run_test_initialize_should_open_and_set_session_value do |expects|
expects.with { |*args| sleep 0.2; true }
end
end
def test_shutdown_without_any_open_connections_should_terminate_session
gateway = new_gateway
gateway.shutdown!
assert !gateway.thread.alive?
assert !gateway.session.looping?
end
def test_connect_to_should_start_local_ports_at_65535
gateway = new_gateway
expect_connect_to(:host => "127.0.0.1", :port => 65535).returns(result = sess_with_xserver("app1"))
newsess = gateway.connect_to(server("app1"))
assert_equal result, newsess
assert_equal [65535, "app1", 22], gateway.session.forward.active_locals[65535]
end
def test_connect_to_should_decrement_port_and_retry_if_ports_are_in_use
gateway = new_gateway(:reserved => lambda { |n| n > 65000 })
expect_connect_to(:host => "127.0.0.1", :port => 65000).returns(result = sess_with_xserver("app1"))
newsess = gateway.connect_to(server("app1"))
assert_equal result, newsess
assert_equal [65000, "app1", 22], gateway.session.forward.active_locals[65000]
end
def test_connect_to_should_honor_user_specification_in_server_definition
gateway = new_gateway
expect_connect_to(:host => "127.0.0.1", :user => "jamis", :port => 65535).returns(result = sess_with_xserver("app1"))
newsess = gateway.connect_to(server("jamis@app1"))
assert_equal result, newsess
assert_equal [65535, "app1", 22], gateway.session.forward.active_locals[65535]
end
def test_connect_to_should_honor_port_specification_in_server_definition
gateway = new_gateway
expect_connect_to(:host => "127.0.0.1", :port => 65535).returns(result = sess_with_xserver("app1"))
newsess = gateway.connect_to(server("app1:1234"))
assert_equal result, newsess
assert_equal [65535, "app1", 1234], gateway.session.forward.active_locals[65535]
end
def test_connect_to_should_set_xserver_to_tunnel_target
gateway = new_gateway
expect_connect_to(:host => "127.0.0.1", :port => 65535).returns(result = sess_with_xserver("app1"))
newsess = gateway.connect_to(server("app1:1234"))
assert_equal result, newsess
end
def test_shutdown_should_cancel_active_forwarded_ports
gateway = new_gateway
expect_connect_to(:host => "127.0.0.1", :port => 65535).returns(sess_with_xserver("app1"))
gateway.connect_to(server("app1"))
assert !gateway.session.forward.active_locals.empty?
gateway.shutdown!
assert gateway.session.forward.active_locals.empty?
end
def test_error_while_connecting_should_cause_connection_to_fail
gateway = new_gateway
expect_connect_to(:host => "127.0.0.1").raises(RuntimeError)
gateway.expects(:warn).times(2)
assert_raises(Capistrano::ConnectionError) { gateway.connect_to(server("app1")) }
end
def test_connection_error_should_include_accessor_with_host_array
gateway = new_gateway
expect_connect_to(:host => "127.0.0.1").raises(RuntimeError)
gateway.expects(:warn).times(2)
begin
gateway.connect_to(server("app1"))
flunk "expected an exception to be raised"
rescue Capistrano::ConnectionError => e
assert e.respond_to?(:hosts)
assert_equal %w(app1), e.hosts.map { |h| h.to_s }
end
end
private
def sess_with_xserver(host)
s = server(host)
sess = mock("session")
sess.expects(:xserver=).with { |v| v.host == host }
sess
end
def expect_connect_to(options={})
Capistrano::SSH.expects(:connect).with do |server,config|
options.all? do |key, value|
case key
when :host then server.host == value
when :user then server.user == value
when :port then server.port == value
else false
end
end
end
end
def new_gateway(options={})
expect_connect_to(:host => "capistrano").yields(MockSession.new(options))
Capistrano::Gateway.new(server("capistrano"))
end
def run_test_initialize_should_open_and_set_session_value
session = mock("Net::SSH session")
session.expects(:loop)
expectation = Capistrano::SSH.expects(:connect).yields(session)
yield expectation if block_given?
gateway = Capistrano::Gateway.new(server("capistrano"))
gateway.thread.join
assert_equal session, gateway.session
end
class MockForward
attr_reader :active_locals
def initialize(options)
@options = options
@active_locals = {}
end
def cancel_local(port)
@active_locals.delete(port)
end
def local(lport, host, rport)
raise Errno::EADDRINUSE if @options[:reserved] && @options[:reserved][lport]
@active_locals[lport] = [lport, host, rport]
end
end
class MockSession
attr_reader :forward
def initialize(options={})
@forward = MockForward.new(options)
end
def looping?
@looping
end
def loop
@looping = true
sleep 0.1 while yield
@looping = false
end
end
end