package docker import ( "encoding/binary" "fmt" "github.com/dotcloud/docker/utils" "io" "log" "net" "sync" "syscall" "time" ) const ( UDPConnTrackTimeout = 90 * time.Second UDPBufSize = 2048 ) type Proxy interface { // Start forwarding traffic back and forth the front and back-end // addresses. Run() // Stop forwarding traffic and close both ends of the Proxy. Close() // Return the address on which the proxy is listening. FrontendAddr() net.Addr // Return the proxied address. BackendAddr() net.Addr } type TCPProxy struct { listener *net.TCPListener frontendAddr *net.TCPAddr backendAddr *net.TCPAddr } func NewTCPProxy(frontendAddr, backendAddr *net.TCPAddr) (*TCPProxy, error) { listener, err := net.ListenTCP("tcp", frontendAddr) if err != nil { return nil, err } // If the port in frontendAddr was 0 then ListenTCP will have a picked // a port to listen on, hence the call to Addr to get that actual port: return &TCPProxy{ listener: listener, frontendAddr: listener.Addr().(*net.TCPAddr), backendAddr: backendAddr, }, nil } func (proxy *TCPProxy) clientLoop(client *net.TCPConn, quit chan bool) { backend, err := net.DialTCP("tcp", nil, proxy.backendAddr) if err != nil { log.Printf("Can't forward traffic to backend tcp/%v: %v\n", proxy.backendAddr, err.Error()) client.Close() return } event := make(chan int64) var broker = func(to, from *net.TCPConn) { written, err := io.Copy(to, from) if err != nil { err, ok := err.(*net.OpError) // If the socket we are writing to is shutdown with // SHUT_WR, forward it to the other end of the pipe: if ok && err.Err == syscall.EPIPE { from.CloseWrite() } } to.CloseRead() event <- written } utils.Debugf("Forwarding traffic between tcp/%v and tcp/%v", client.RemoteAddr(), backend.RemoteAddr()) go broker(client, backend) go broker(backend, client) var transferred int64 = 0 for i := 0; i < 2; i++ { select { case written := <-event: transferred += written case <-quit: // Interrupt the two brokers and "join" them. client.Close() backend.Close() for ; i < 2; i++ { transferred += <-event } goto done } } client.Close() backend.Close() done: utils.Debugf("%v bytes transferred between tcp/%v and tcp/%v", transferred, client.RemoteAddr(), backend.RemoteAddr()) } func (proxy *TCPProxy) Run() { quit := make(chan bool) defer close(quit) utils.Debugf("Starting proxy on tcp/%v for tcp/%v", proxy.frontendAddr, proxy.backendAddr) for { client, err := proxy.listener.Accept() if err != nil { utils.Debugf("Stopping proxy on tcp/%v for tcp/%v (%v)", proxy.frontendAddr, proxy.backendAddr, err.Error()) return } go proxy.clientLoop(client.(*net.TCPConn), quit) } } func (proxy *TCPProxy) Close() { proxy.listener.Close() } func (proxy *TCPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr } func (proxy *TCPProxy) BackendAddr() net.Addr { return proxy.backendAddr } // A net.Addr where the IP is split into two fields so you can use it as a key // in a map: type connTrackKey struct { IPHigh uint64 IPLow uint64 Port int } func newConnTrackKey(addr *net.UDPAddr) *connTrackKey { if len(addr.IP) == net.IPv4len { return &connTrackKey{ IPHigh: 0, IPLow: uint64(binary.BigEndian.Uint32(addr.IP)), Port: addr.Port, } } return &connTrackKey{ IPHigh: binary.BigEndian.Uint64(addr.IP[:8]), IPLow: binary.BigEndian.Uint64(addr.IP[8:]), Port: addr.Port, } } type connTrackMap map[connTrackKey]*net.UDPConn type UDPProxy struct { listener *net.UDPConn frontendAddr *net.UDPAddr backendAddr *net.UDPAddr connTrackTable connTrackMap connTrackLock sync.Mutex } func NewUDPProxy(frontendAddr, backendAddr *net.UDPAddr) (*UDPProxy, error) { listener, err := net.ListenUDP("udp", frontendAddr) if err != nil { return nil, err } return &UDPProxy{ listener: listener, frontendAddr: listener.LocalAddr().(*net.UDPAddr), backendAddr: backendAddr, connTrackTable: make(connTrackMap), }, nil } func (proxy *UDPProxy) replyLoop(proxyConn *net.UDPConn, clientAddr *net.UDPAddr, clientKey *connTrackKey) { defer func() { proxy.connTrackLock.Lock() delete(proxy.connTrackTable, *clientKey) proxy.connTrackLock.Unlock() utils.Debugf("Done proxying between udp/%v and udp/%v", clientAddr.String(), proxy.backendAddr.String()) proxyConn.Close() }() readBuf := make([]byte, UDPBufSize) for { proxyConn.SetReadDeadline(time.Now().Add(UDPConnTrackTimeout)) again: read, err := proxyConn.Read(readBuf) if err != nil { if err, ok := err.(*net.OpError); ok && err.Err == syscall.ECONNREFUSED { // This will happen if the last write failed // (e.g: nothing is actually listening on the // proxied port on the container), ignore it // and continue until UDPConnTrackTimeout // expires: goto again } return } for i := 0; i != read; { written, err := proxy.listener.WriteToUDP(readBuf[i:read], clientAddr) if err != nil { return } i += written utils.Debugf("Forwarded %v/%v bytes to udp/%v", i, read, clientAddr.String()) } } } func (proxy *UDPProxy) Run() { readBuf := make([]byte, UDPBufSize) utils.Debugf("Starting proxy on udp/%v for udp/%v", proxy.frontendAddr, proxy.backendAddr) for { read, from, err := proxy.listener.ReadFromUDP(readBuf) if err != nil { // NOTE: Apparently ReadFrom doesn't return // ECONNREFUSED like Read do (see comment in // UDPProxy.replyLoop) utils.Debugf("Stopping proxy on udp/%v for udp/%v (%v)", proxy.frontendAddr, proxy.backendAddr, err.Error()) break } fromKey := newConnTrackKey(from) proxy.connTrackLock.Lock() proxyConn, hit := proxy.connTrackTable[*fromKey] if !hit { proxyConn, err = net.DialUDP("udp", nil, proxy.backendAddr) if err != nil { log.Printf("Can't proxy a datagram to udp/%s: %v\n", proxy.backendAddr.String(), err) continue } proxy.connTrackTable[*fromKey] = proxyConn go proxy.replyLoop(proxyConn, from, fromKey) } proxy.connTrackLock.Unlock() for i := 0; i != read; { written, err := proxyConn.Write(readBuf[i:read]) if err != nil { log.Printf("Can't proxy a datagram to udp/%s: %v\n", proxy.backendAddr.String(), err) break } i += written utils.Debugf("Forwarded %v/%v bytes to udp/%v", i, read, proxy.backendAddr.String()) } } } func (proxy *UDPProxy) Close() { proxy.listener.Close() proxy.connTrackLock.Lock() defer proxy.connTrackLock.Unlock() for _, conn := range proxy.connTrackTable { conn.Close() } } func (proxy *UDPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr } func (proxy *UDPProxy) BackendAddr() net.Addr { return proxy.backendAddr } func NewProxy(frontendAddr, backendAddr net.Addr) (Proxy, error) { switch frontendAddr.(type) { case *net.UDPAddr: return NewUDPProxy(frontendAddr.(*net.UDPAddr), backendAddr.(*net.UDPAddr)) case *net.TCPAddr: return NewTCPProxy(frontendAddr.(*net.TCPAddr), backendAddr.(*net.TCPAddr)) default: panic(fmt.Errorf("Unsupported protocol")) } }