From 746113d5840d064843a3d0b4c2cd68459b1d9415 Mon Sep 17 00:00:00 2001 From: Jana Radhakrishnan Date: Thu, 22 Sep 2016 18:42:55 -0700 Subject: [PATCH 1/2] Vendoring libnetwork @6caf9022fa09 Signed-off-by: Jana Radhakrishnan --- hack/vendor.sh | 2 +- .../src/github.com/docker/libnetwork/agent.go | 9 +- .../docker/libnetwork/cluster/provider.go | 1 + .../libnetwork/drivers/overlay/filter.go | 15 ++ .../docker/libnetwork/networkdb/cluster.go | 1 + .../docker/libnetwork/networkdb/networkdb.go | 4 + .../github.com/docker/libnetwork/sandbox.go | 12 +- .../docker/libnetwork/sandbox_dns_unix.go | 8 +- .../docker/libnetwork/service_linux.go | 184 ++++++++++++++---- 9 files changed, 182 insertions(+), 54 deletions(-) diff --git a/hack/vendor.sh b/hack/vendor.sh index 1b0435c774..c045757268 100755 --- a/hack/vendor.sh +++ b/hack/vendor.sh @@ -70,7 +70,7 @@ clone git github.com/RackSec/srslog 365bf33cd9acc21ae1c355209865f17228ca534e clone git github.com/imdario/mergo 0.2.1 #get libnetwork packages -clone git github.com/docker/libnetwork e69621c5fb6882627f83187ebefe7709a7211277 +clone git github.com/docker/libnetwork 6caf9022fa093e0247f9f4b572edca868c27ece3 clone git github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894 clone git github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80 clone git github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec diff --git a/vendor/src/github.com/docker/libnetwork/agent.go b/vendor/src/github.com/docker/libnetwork/agent.go index 837653e499..fb0c342257 100644 --- a/vendor/src/github.com/docker/libnetwork/agent.go +++ b/vendor/src/github.com/docker/libnetwork/agent.go @@ -172,10 +172,12 @@ func (c *controller) agentSetup() error { advAddr := clusterProvider.GetAdvertiseAddress() remote := clusterProvider.GetRemoteAddress() remoteAddr, _, _ := net.SplitHostPort(remote) + listen := clusterProvider.GetListenAddress() + listenAddr, _, _ := net.SplitHostPort(listen) - logrus.Infof("Initializing Libnetwork Agent Local-addr=%s Adv-addr=%s Remote-addr =%s", bindAddr, advAddr, remoteAddr) + logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Remote-addr =%s", listenAddr, bindAddr, advAddr, remoteAddr) if advAddr != "" && c.agent == nil { - if err := c.agentInit(bindAddr, advAddr); err != nil { + if err := c.agentInit(listenAddr, bindAddr, advAddr); err != nil { logrus.Errorf("Error in agentInit : %v", err) } else { c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool { @@ -236,7 +238,7 @@ func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) { return keys[1].Key, keys[1].LamportTime, nil } -func (c *controller) agentInit(bindAddrOrInterface, advertiseAddr string) error { +func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr string) error { if !c.isAgent() { return nil } @@ -252,6 +254,7 @@ func (c *controller) agentInit(bindAddrOrInterface, advertiseAddr string) error logrus.Info("Gossip cluster hostname ", nodeName) nDB, err := networkdb.New(&networkdb.Config{ + BindAddr: listenAddr, AdvertiseAddr: advertiseAddr, NodeName: nodeName, Keys: keys, diff --git a/vendor/src/github.com/docker/libnetwork/cluster/provider.go b/vendor/src/github.com/docker/libnetwork/cluster/provider.go index 7bbf5d3557..572bac85a6 100644 --- a/vendor/src/github.com/docker/libnetwork/cluster/provider.go +++ b/vendor/src/github.com/docker/libnetwork/cluster/provider.go @@ -10,6 +10,7 @@ type Provider interface { IsManager() bool IsAgent() bool GetLocalAddress() string + GetListenAddress() string GetAdvertiseAddress() string GetRemoteAddress() string ListenClusterEvents() <-chan struct{} diff --git a/vendor/src/github.com/docker/libnetwork/drivers/overlay/filter.go b/vendor/src/github.com/docker/libnetwork/drivers/overlay/filter.go index 2bf76b33a5..40cd7d9f28 100644 --- a/vendor/src/github.com/docker/libnetwork/drivers/overlay/filter.go +++ b/vendor/src/github.com/docker/libnetwork/drivers/overlay/filter.go @@ -12,6 +12,13 @@ const globalChain = "DOCKER-OVERLAY" var filterOnce sync.Once +var filterChan = make(chan struct{}, 1) + +func filterWait() func() { + filterChan <- struct{}{} + return func() { <-filterChan } +} + func chainExists(cname string) bool { if _, err := iptables.Raw("-L", cname); err != nil { return false @@ -69,10 +76,14 @@ func setNetworkChain(cname string, remove bool) error { } func addNetworkChain(cname string) error { + defer filterWait()() + return setNetworkChain(cname, false) } func removeNetworkChain(cname string) error { + defer filterWait()() + return setNetworkChain(cname, true) } @@ -119,9 +130,13 @@ func setFilters(cname, brName string, remove bool) error { } func addFilters(cname, brName string) error { + defer filterWait()() + return setFilters(cname, brName, false) } func removeFilters(cname, brName string) error { + defer filterWait()() + return setFilters(cname, brName, true) } diff --git a/vendor/src/github.com/docker/libnetwork/networkdb/cluster.go b/vendor/src/github.com/docker/libnetwork/networkdb/cluster.go index 0a21cae5ae..ed24e4c7a6 100644 --- a/vendor/src/github.com/docker/libnetwork/networkdb/cluster.go +++ b/vendor/src/github.com/docker/libnetwork/networkdb/cluster.go @@ -86,6 +86,7 @@ func (nDB *NetworkDB) RemoveKey(key []byte) { func (nDB *NetworkDB) clusterInit() error { config := memberlist.DefaultLANConfig() config.Name = nDB.config.NodeName + config.BindAddr = nDB.config.BindAddr config.AdvertiseAddr = nDB.config.AdvertiseAddr if nDB.config.BindPort != 0 { diff --git a/vendor/src/github.com/docker/libnetwork/networkdb/networkdb.go b/vendor/src/github.com/docker/libnetwork/networkdb/networkdb.go index c452a90835..1502d7300e 100644 --- a/vendor/src/github.com/docker/libnetwork/networkdb/networkdb.go +++ b/vendor/src/github.com/docker/libnetwork/networkdb/networkdb.go @@ -121,6 +121,10 @@ type Config struct { // NodeName is the cluster wide unique name for this node. NodeName string + // BindAddr is the IP on which networkdb listens. It can be + // 0.0.0.0 to listen on all addresses on the host. + BindAddr string + // AdvertiseAddr is the node's IP address that we advertise for // cluster communication. AdvertiseAddr string diff --git a/vendor/src/github.com/docker/libnetwork/sandbox.go b/vendor/src/github.com/docker/libnetwork/sandbox.go index 3bb1bb3644..f5d86ef20e 100644 --- a/vendor/src/github.com/docker/libnetwork/sandbox.go +++ b/vendor/src/github.com/docker/libnetwork/sandbox.go @@ -421,8 +421,7 @@ func (sb *sandbox) ResolveIP(ip string) string { } func (sb *sandbox) ExecFunc(f func()) error { - sb.osSbox.InvokeFunc(f) - return nil + return sb.osSbox.InvokeFunc(f) } func (sb *sandbox) ResolveService(name string) ([]*net.SRV, []net.IP) { @@ -639,9 +638,12 @@ func (sb *sandbox) SetKey(basePath string) error { if oldosSbox != nil && sb.resolver != nil { sb.resolver.Stop() - sb.osSbox.InvokeFunc(sb.resolver.SetupFunc(0)) - if err := sb.resolver.Start(); err != nil { - log.Errorf("Resolver Setup/Start failed for container %s, %q", sb.ContainerID(), err) + if err := sb.osSbox.InvokeFunc(sb.resolver.SetupFunc(0)); err == nil { + if err := sb.resolver.Start(); err != nil { + log.Errorf("Resolver Start failed for container %s, %q", sb.ContainerID(), err) + } + } else { + log.Errorf("Resolver Setup Function failed for container %s, %q", sb.ContainerID(), err) } } diff --git a/vendor/src/github.com/docker/libnetwork/sandbox_dns_unix.go b/vendor/src/github.com/docker/libnetwork/sandbox_dns_unix.go index 45b035e3e5..3f734511d8 100644 --- a/vendor/src/github.com/docker/libnetwork/sandbox_dns_unix.go +++ b/vendor/src/github.com/docker/libnetwork/sandbox_dns_unix.go @@ -46,9 +46,13 @@ func (sb *sandbox) startResolver(restore bool) { } sb.resolver.SetExtServers(sb.extDNS) - sb.osSbox.InvokeFunc(sb.resolver.SetupFunc(0)) + if err = sb.osSbox.InvokeFunc(sb.resolver.SetupFunc(0)); err != nil { + log.Errorf("Resolver Setup function failed for container %s, %q", sb.ContainerID(), err) + return + } + if err = sb.resolver.Start(); err != nil { - log.Errorf("Resolver Setup/Start failed for container %s, %q", sb.ContainerID(), err) + log.Errorf("Resolver Start failed for container %s, %q", sb.ContainerID(), err) } }) } diff --git a/vendor/src/github.com/docker/libnetwork/service_linux.go b/vendor/src/github.com/docker/libnetwork/service_linux.go index 3fafa9af44..5e3802a823 100644 --- a/vendor/src/github.com/docker/libnetwork/service_linux.go +++ b/vendor/src/github.com/docker/libnetwork/service_linux.go @@ -26,6 +26,7 @@ import ( func init() { reexec.Register("fwmarker", fwMarker) + reexec.Register("redirecter", redirecter) } func newService(name string, id string, ingressPorts []*PortConfig, aliases []string) *service { @@ -275,6 +276,12 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) { n := ep.getNetwork() eIP := ep.Iface().Address() + if n.ingress { + if err := addRedirectRules(sb.Key(), eIP, ep.ingressPorts); err != nil { + logrus.Errorf("Failed to add redirect rules for ep %s: %v", ep.Name(), err) + } + } + if sb.ingress { // For the ingress sandbox if this is not gateway // endpoint do nothing. @@ -380,17 +387,17 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P } if addService { - var iPorts []*PortConfig + var filteredPorts []*PortConfig if sb.ingress { - iPorts = filterPortConfigs(ingressPorts, false) - if err := programIngress(gwIP, iPorts, false); err != nil { + filteredPorts = filterPortConfigs(ingressPorts, false) + if err := programIngress(gwIP, filteredPorts, false); err != nil { logrus.Errorf("Failed to add ingress: %v", err) return } } - logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, iPorts) - if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, false); err != nil { + logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, ingressPorts) + if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, false); err != nil { logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err) return } @@ -453,15 +460,15 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po logrus.Errorf("Failed to delete a new service for vip %s fwmark %d: %v", vip, fwMark, err) } - var iPorts []*PortConfig + var filteredPorts []*PortConfig if sb.ingress { - iPorts = filterPortConfigs(ingressPorts, true) - if err := programIngress(gwIP, iPorts, true); err != nil { + filteredPorts = filterPortConfigs(ingressPorts, true) + if err := programIngress(gwIP, filteredPorts, true); err != nil { logrus.Errorf("Failed to delete ingress: %v", err) } } - if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, true); err != nil { + if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, true); err != nil { logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err) } } @@ -715,33 +722,57 @@ func plumbProxy(iPort *PortConfig, isDelete bool) error { return nil } +func writePortsToFile(ports []*PortConfig) (string, error) { + f, err := ioutil.TempFile("", "port_configs") + if err != nil { + return "", err + } + defer f.Close() + + buf, err := proto.Marshal(&EndpointRecord{ + IngressPorts: ports, + }) + + n, err := f.Write(buf) + if err != nil { + return "", err + } + + if n < len(buf) { + return "", io.ErrShortWrite + } + + return f.Name(), nil +} + +func readPortsFromFile(fileName string) ([]*PortConfig, error) { + buf, err := ioutil.ReadFile(fileName) + if err != nil { + return nil, err + } + + var epRec EndpointRecord + err = proto.Unmarshal(buf, &epRec) + if err != nil { + return nil, err + } + + return epRec.IngressPorts, nil +} + // Invoke fwmarker reexec routine to mark vip destined packets with // the passed firewall mark. func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error { var ingressPortsFile string + if len(ingressPorts) != 0 { - f, err := ioutil.TempFile("", "port_configs") + var err error + ingressPortsFile, err = writePortsToFile(ingressPorts) if err != nil { return err } - buf, err := proto.Marshal(&EndpointRecord{ - IngressPorts: ingressPorts, - }) - - n, err := f.Write(buf) - if err != nil { - f.Close() - return err - } - - if n < len(buf) { - f.Close() - return io.ErrShortWrite - } - - ingressPortsFile = f.Name() - f.Close() + defer os.Remove(ingressPortsFile) } addDelOpt := "-A" @@ -775,20 +806,12 @@ func fwMarker() { var ingressPorts []*PortConfig if os.Args[5] != "" { - buf, err := ioutil.ReadFile(os.Args[5]) + var err error + ingressPorts, err = readPortsFromFile(os.Args[5]) if err != nil { - logrus.Errorf("Failed to read ports config file: %v", err) + logrus.Errorf("Failed reading ingress ports file: %v", err) os.Exit(6) } - - var epRec EndpointRecord - err = proto.Unmarshal(buf, &epRec) - if err != nil { - logrus.Errorf("Failed to unmarshal ports config data: %v", err) - os.Exit(7) - } - - ingressPorts = epRec.IngressPorts } vip := os.Args[2] @@ -801,11 +824,7 @@ func fwMarker() { rules := [][]string{} for _, iPort := range ingressPorts { - rule := strings.Fields(fmt.Sprintf("-t nat %s PREROUTING -p %s --dport %d -j REDIRECT --to-port %d", - addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort)) - rules = append(rules, rule) - - rule = strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d", + rule := strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d", addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, fwMark)) rules = append(rules, rule) } @@ -852,3 +871,82 @@ func fwMarker() { } } } + +func addRedirectRules(path string, eIP *net.IPNet, ingressPorts []*PortConfig) error { + var ingressPortsFile string + + if len(ingressPorts) != 0 { + var err error + ingressPortsFile, err = writePortsToFile(ingressPorts) + if err != nil { + return err + } + defer os.Remove(ingressPortsFile) + } + + cmd := &exec.Cmd{ + Path: reexec.Self(), + Args: append([]string{"redirecter"}, path, eIP.String(), ingressPortsFile), + Stdout: os.Stdout, + Stderr: os.Stderr, + } + + if err := cmd.Run(); err != nil { + return fmt.Errorf("reexec failed: %v", err) + } + + return nil +} + +// Redirecter reexec function. +func redirecter() { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + if len(os.Args) < 4 { + logrus.Error("invalid number of arguments..") + os.Exit(1) + } + + var ingressPorts []*PortConfig + if os.Args[3] != "" { + var err error + ingressPorts, err = readPortsFromFile(os.Args[3]) + if err != nil { + logrus.Errorf("Failed reading ingress ports file: %v", err) + os.Exit(2) + } + } + + eIP, _, err := net.ParseCIDR(os.Args[2]) + if err != nil { + logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[2], err) + os.Exit(3) + } + + rules := [][]string{} + for _, iPort := range ingressPorts { + rule := strings.Fields(fmt.Sprintf("-t nat -A PREROUTING -d %s -p %s --dport %d -j REDIRECT --to-port %d", + eIP.String(), strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort)) + rules = append(rules, rule) + } + + ns, err := netns.GetFromPath(os.Args[1]) + if err != nil { + logrus.Errorf("failed get network namespace %q: %v", os.Args[1], err) + os.Exit(4) + } + defer ns.Close() + + if err := netns.Set(ns); err != nil { + logrus.Errorf("setting into container net ns %v failed, %v", os.Args[1], err) + os.Exit(5) + } + + for _, rule := range rules { + if err := iptables.RawCombinedOutputNative(rule...); err != nil { + logrus.Errorf("setting up rule failed, %v: %v", rule, err) + os.Exit(5) + } + } +} From fbcdca7ebcd45d74c40fbccbb07f2d3decfb8628 Mon Sep 17 00:00:00 2001 From: Jana Radhakrishnan Date: Thu, 22 Sep 2016 18:43:54 -0700 Subject: [PATCH 2/2] Add GetListenAddress in ClusterProvider Signed-off-by: Jana Radhakrishnan --- daemon/cluster/cluster.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/daemon/cluster/cluster.go b/daemon/cluster/cluster.go index 0850e80512..70d1bfce2d 100644 --- a/daemon/cluster/cluster.go +++ b/daemon/cluster/cluster.go @@ -719,6 +719,13 @@ func (c *Cluster) GetLocalAddress() string { return c.actualLocalAddr } +// GetListenAddress returns the listen address. +func (c *Cluster) GetListenAddress() string { + c.RLock() + defer c.RUnlock() + return c.listenAddr +} + // GetAdvertiseAddress returns the remotely reachable address of this node. func (c *Cluster) GetAdvertiseAddress() string { c.RLock()