diff --git a/libnetwork/service_linux.go b/libnetwork/service_linux.go index 9f7c4930c0..ac3f775fdf 100644 --- a/libnetwork/service_linux.go +++ b/libnetwork/service_linux.go @@ -10,6 +10,7 @@ import ( "runtime" "strconv" "strings" + "sync" "syscall" "github.com/Sirupsen/logrus" @@ -367,20 +368,106 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po } } +const ingressChain = "DOCKER-INGRESS" + +var ( + ingressOnce sync.Once + ingressProxyMu sync.Mutex + ingressProxyTbl = make(map[string]io.Closer) +) + func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) error { - addDelOpt := "-A" + addDelOpt := "-I" if isDelete { addDelOpt = "-D" } - for _, iPort := range ingressPorts { - rule := strings.Fields(fmt.Sprintf("-t nat %s PREROUTING -p %s --dport %d -j DNAT --to-destination %s:%d", - addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.NodePort, gwIP, iPort.NodePort)) - if err := iptables.RawCombinedOutput(rule...); err != nil { - return fmt.Errorf("setting up rule failed, %v: %v", rule, err) + chainExists := iptables.ExistChain(ingressChain, iptables.Nat) + + ingressOnce.Do(func() { + if chainExists { + // Flush ingress chain rules during init if it + // exists. It might contain stale rules from + // previous life. + if err := iptables.RawCombinedOutput("-t", "nat", "-F", ingressChain); err != nil { + logrus.Errorf("Could not flush ingress chain rules during init: %v", err) + } + } + }) + + if !isDelete { + if !chainExists { + if err := iptables.RawCombinedOutput("-t", "nat", "-N", ingressChain); err != nil { + return fmt.Errorf("failed to create ingress chain: %v", err) + } + } + + if !iptables.Exists(iptables.Nat, ingressChain, "-j", "RETURN") { + if err := iptables.RawCombinedOutput("-t", "nat", "-A", ingressChain, "-j", "RETURN"); err != nil { + return fmt.Errorf("failed to add return rule in ingress chain: %v", err) + } + } + + for _, chain := range []string{"OUTPUT", "PREROUTING"} { + if !iptables.Exists(iptables.Nat, chain, "-j", ingressChain) { + if err := iptables.RawCombinedOutput("-t", "nat", "-I", chain, "-j", ingressChain); err != nil { + return fmt.Errorf("failed to add jump rule in %s to ingress chain: %v", chain, err) + } + } } } + for _, iPort := range ingressPorts { + if iptables.ExistChain(ingressChain, iptables.Nat) { + rule := strings.Fields(fmt.Sprintf("-t nat %s %s -p %s --dport %d -j DNAT --to-destination %s:%d", + addDelOpt, ingressChain, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.NodePort, gwIP, iPort.NodePort)) + if err := iptables.RawCombinedOutput(rule...); err != nil { + return fmt.Errorf("setting up rule failed, %v: %v", rule, err) + } + } + + if err := plumbProxy(iPort, isDelete); err != nil { + return fmt.Errorf("failed to create proxy for port %d: %v", iPort.NodePort, err) + } + } + + return nil +} + +func plumbProxy(iPort *PortConfig, isDelete bool) error { + var ( + err error + l io.Closer + ) + + portSpec := fmt.Sprintf("%d/%s", iPort.NodePort, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)])) + if isDelete { + ingressProxyMu.Lock() + if listener, ok := ingressProxyTbl[portSpec]; ok { + if listener != nil { + listener.Close() + } + } + ingressProxyMu.Unlock() + + return nil + } + + switch iPort.Protocol { + case ProtocolTCP: + l, err = net.ListenTCP("tcp", &net.TCPAddr{Port: int(iPort.NodePort)}) + case ProtocolUDP: + l, err = net.ListenUDP("udp", &net.UDPAddr{Port: int(iPort.NodePort)}) + } + + if err != nil { + return err + } + + ingressProxyMu.Lock() + ingressProxyTbl[portSpec] = l + ingressProxyMu.Unlock() + return nil }