From 1f289dc411eb32814989bdf34e54b231e75e9131 Mon Sep 17 00:00:00 2001 From: Madhu Venugopal Date: Tue, 29 Nov 2016 22:53:17 -0800 Subject: [PATCH] Vendoring libnetwork to address some concurrency issues Addresses #28697, #28845, #28712, #26111 Signed-off-by: Madhu Venugopal (cherry picked from commit 8f082e418ba662eb700a3a9343371fa6d9c28d5a) --- vendor.conf | 2 +- vendor/github.com/docker/libnetwork/agent.go | 137 +++++++++++------- .../docker/libnetwork/controller.go | 11 +- .../drivers/overlay/ovmanager/ovmanager.go | 2 +- .../github.com/docker/libnetwork/idm/idm.go | 8 +- .../docker/libnetwork/iptables/iptables.go | 3 +- .../github.com/docker/libnetwork/network.go | 13 +- .../docker/libnetwork/networkdb/cluster.go | 6 + .../github.com/docker/libnetwork/resolver.go | 8 +- .../docker/libnetwork/service_common.go | 5 +- .../docker/libnetwork/service_linux.go | 2 +- 11 files changed, 125 insertions(+), 72 deletions(-) diff --git a/vendor.conf b/vendor.conf index e7510b3d01..4a032bbafe 100644 --- a/vendor.conf +++ b/vendor.conf @@ -23,7 +23,7 @@ github.com/RackSec/srslog 456df3a81436d29ba874f3590eeeee25d666f8a5 github.com/imdario/mergo 0.2.1 #get libnetwork packages -github.com/docker/libnetwork dd0ddde6749fdffe310087e1c3616142d8c3ef9e +github.com/docker/libnetwork fd27f22aaa35e3d57f88688f919d05b744f431fd github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80 github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec diff --git a/vendor/github.com/docker/libnetwork/agent.go b/vendor/github.com/docker/libnetwork/agent.go index c4f18ba9a8..540aa79cf8 100644 --- a/vendor/github.com/docker/libnetwork/agent.go +++ b/vendor/github.com/docker/libnetwork/agent.go @@ -7,6 +7,7 @@ import ( "net" "os" "sort" + "sync" "github.com/Sirupsen/logrus" "github.com/docker/docker/pkg/stringid" @@ -39,6 +40,7 @@ type agent struct { advertiseAddr string epTblCancel func() driverCancelFuncs map[string][]func() + sync.Mutex } func getBindAddr(ifaceName string) (string, error) { @@ -86,9 +88,16 @@ func resolveAddr(addrOrInterface string) (string, error) { func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error { drvEnc := discoverapi.DriverEncryptionUpdate{} - a := c.agent + a := c.getAgent() + if a == nil { + logrus.Debug("Skipping key change as agent is nil") + return nil + } + // Find the deleted key. If the deleted key was the primary key, // a new primary key should be set before removing if from keyring. + c.Lock() + added := []byte{} deleted := []byte{} j := len(c.keys) for i := 0; i < j; { @@ -127,7 +136,7 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error { if !same { c.keys = append(c.keys, key) if key.Subsystem == subsysGossip { - a.networkDB.SetKey(key.Key) + added = key.Key } if key.Subsystem == subsysIPSec { @@ -136,6 +145,11 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error { } } } + c.Unlock() + + if len(added) > 0 { + a.networkDB.SetKey(added) + } key, tag, err := c.getPrimaryKeyTag(subsysGossip) if err != nil { @@ -166,8 +180,10 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error { } func (c *controller) agentSetup() error { + c.Lock() clusterProvider := c.cfg.Daemon.ClusterProvider - + agent := c.agent + c.Unlock() bindAddr := clusterProvider.GetLocalAddress() advAddr := clusterProvider.GetAdvertiseAddress() remote := clusterProvider.GetRemoteAddress() @@ -176,7 +192,7 @@ func (c *controller) agentSetup() error { listenAddr, _, _ := net.SplitHostPort(listen) logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Remote-addr =%s", listenAddr, bindAddr, advAddr, remoteAddr) - if advAddr != "" && c.agent == nil { + if advAddr != "" && agent == nil { if err := c.agentInit(listenAddr, bindAddr, advAddr); err != nil { logrus.Errorf("Error in agentInit : %v", err) } else { @@ -208,6 +224,9 @@ func (c *controller) agentSetup() error { // For a given subsystem getKeys sorts the keys by lamport time and returns // slice of keys and lamport time which can used as a unique tag for the keys func (c *controller) getKeys(subsys string) ([][]byte, []uint64) { + c.Lock() + defer c.Unlock() + sort.Sort(ByTime(c.keys)) keys := [][]byte{} @@ -227,6 +246,8 @@ func (c *controller) getKeys(subsys string) ([][]byte, []uint64) { // getPrimaryKeyTag returns the primary key for a given subsystem from the // list of sorted key and the associated tag func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) { + c.Lock() + defer c.Unlock() sort.Sort(ByTime(c.keys)) keys := []*types.EncryptionKey{} for _, key := range c.keys { @@ -265,6 +286,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr st ch, cancel := nDB.Watch("endpoint_table", "", "") + c.Lock() c.agent = &agent{ networkDB: nDB, bindAddr: bindAddr, @@ -272,6 +294,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr st epTblCancel: cancel, driverCancelFuncs: make(map[string][]func()), } + c.Unlock() go c.handleTableEvents(ch, c.handleEpTableEvent) @@ -294,21 +317,22 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr st } func (c *controller) agentJoin(remote string) error { - if c.agent == nil { + agent := c.getAgent() + if agent == nil { return nil } - - return c.agent.networkDB.Join([]string{remote}) + return agent.networkDB.Join([]string{remote}) } func (c *controller) agentDriverNotify(d driverapi.Driver) { - if c.agent == nil { + agent := c.getAgent() + if agent == nil { return } d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{ - Address: c.agent.advertiseAddr, - BindAddress: c.agent.bindAddr, + Address: agent.advertiseAddr, + BindAddress: agent.bindAddr, Self: true, }) @@ -339,11 +363,19 @@ func (c *controller) agentClose() { return } + var cancelList []func() + + agent.Lock() for _, cancelFuncs := range agent.driverCancelFuncs { for _, cancel := range cancelFuncs { - cancel() + cancelList = append(cancelList, cancel) } } + agent.Unlock() + + for _, cancel := range cancelList { + cancel() + } agent.epTblCancel() @@ -354,13 +386,7 @@ func (n *network) isClusterEligible() bool { if n.driverScope() != datastore.GlobalScope { return false } - - c := n.getController() - if c.agent == nil { - return false - } - - return true + return n.getController().getAgent() != nil } func (n *network) joinCluster() error { @@ -368,8 +394,12 @@ func (n *network) joinCluster() error { return nil } - c := n.getController() - return c.agent.networkDB.JoinNetwork(n.ID()) + agent := n.getController().getAgent() + if agent == nil { + return nil + } + + return agent.networkDB.JoinNetwork(n.ID()) } func (n *network) leaveCluster() error { @@ -377,8 +407,12 @@ func (n *network) leaveCluster() error { return nil } - c := n.getController() - return c.agent.networkDB.LeaveNetwork(n.ID()) + agent := n.getController().getAgent() + if agent == nil { + return nil + } + + return agent.networkDB.LeaveNetwork(n.ID()) } func (ep *endpoint) addDriverInfoToCluster() error { @@ -390,10 +424,7 @@ func (ep *endpoint) addDriverInfoToCluster() error { return nil } - ctrlr := n.ctrlr - ctrlr.Lock() - agent := ctrlr.agent - ctrlr.Unlock() + agent := n.getController().getAgent() if agent == nil { return nil } @@ -415,10 +446,7 @@ func (ep *endpoint) deleteDriverInfoFromCluster() error { return nil } - ctrlr := n.ctrlr - ctrlr.Lock() - agent := ctrlr.agent - ctrlr.Unlock() + agent := n.getController().getAgent() if agent == nil { return nil } @@ -438,6 +466,7 @@ func (ep *endpoint) addServiceInfoToCluster() error { } c := n.getController() + agent := c.getAgent() if !ep.isAnonymous() && ep.Iface().Address() != nil { var ingressPorts []*PortConfig if ep.svcID != "" { @@ -466,8 +495,10 @@ func (ep *endpoint) addServiceInfoToCluster() error { return err } - if err := c.agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), buf); err != nil { - return err + if agent != nil { + if err := agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), buf); err != nil { + return err + } } } @@ -481,6 +512,8 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error { } c := n.getController() + agent := c.getAgent() + if !ep.isAnonymous() { if ep.svcID != "" && ep.Iface().Address() != nil { var ingressPorts []*PortConfig @@ -492,9 +525,10 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error { return err } } - - if err := c.agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil { - return err + if agent != nil { + if err := agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil { + return err + } } } return nil @@ -506,16 +540,15 @@ func (n *network) addDriverWatches() { } c := n.getController() + agent := c.getAgent() + if agent == nil { + return + } for _, tableName := range n.driverTables { - c.Lock() - if c.agent == nil { - c.Unlock() - return - } - ch, cancel := c.agent.networkDB.Watch(tableName, n.ID(), "") - c.agent.driverCancelFuncs[n.ID()] = append(c.agent.driverCancelFuncs[n.ID()], cancel) - c.Unlock() - + ch, cancel := agent.networkDB.Watch(tableName, n.ID(), "") + agent.Lock() + agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel) + agent.Unlock() go c.handleTableEvents(ch, n.handleDriverTableEvent) d, err := n.driver(false) if err != nil { @@ -523,7 +556,7 @@ func (n *network) addDriverWatches() { return } - c.agent.networkDB.WalkTable(tableName, func(nid, key string, value []byte) bool { + agent.networkDB.WalkTable(tableName, func(nid, key string, value []byte) bool { if nid == n.ID() { d.EventNotify(driverapi.Create, nid, tableName, key, value) } @@ -538,11 +571,15 @@ func (n *network) cancelDriverWatches() { return } - c := n.getController() - c.Lock() - cancelFuncs := c.agent.driverCancelFuncs[n.ID()] - delete(c.agent.driverCancelFuncs, n.ID()) - c.Unlock() + agent := n.getController().getAgent() + if agent == nil { + return + } + + agent.Lock() + cancelFuncs := agent.driverCancelFuncs[n.ID()] + delete(agent.driverCancelFuncs, n.ID()) + agent.Unlock() for _, cancel := range cancelFuncs { cancel() diff --git a/vendor/github.com/docker/libnetwork/controller.go b/vendor/github.com/docker/libnetwork/controller.go index 6eaa1c6638..68bccf074b 100644 --- a/vendor/github.com/docker/libnetwork/controller.go +++ b/vendor/github.com/docker/libnetwork/controller.go @@ -237,12 +237,13 @@ func New(cfgOptions ...config.Option) (NetworkController, error) { func (c *controller) SetClusterProvider(provider cluster.Provider) { c.Lock() - defer c.Unlock() c.cfg.Daemon.ClusterProvider = provider + disableProviderCh := c.cfg.Daemon.DisableProvider + c.Unlock() if provider != nil { go c.clusterAgentInit() } else { - c.cfg.Daemon.DisableProvider <- struct{}{} + disableProviderCh <- struct{}{} } } @@ -295,6 +296,12 @@ func (c *controller) SetKeys(keys []*types.EncryptionKey) error { return c.handleKeyChange(keys) } +func (c *controller) getAgent() *agent { + c.Lock() + defer c.Unlock() + return c.agent +} + func (c *controller) clusterAgentInit() { clusterProvider := c.cfg.Daemon.ClusterProvider for { diff --git a/vendor/github.com/docker/libnetwork/drivers/overlay/ovmanager/ovmanager.go b/vendor/github.com/docker/libnetwork/drivers/overlay/ovmanager/ovmanager.go index 2f69083f25..aaf94e586f 100644 --- a/vendor/github.com/docker/libnetwork/drivers/overlay/ovmanager/ovmanager.go +++ b/vendor/github.com/docker/libnetwork/drivers/overlay/ovmanager/ovmanager.go @@ -57,7 +57,7 @@ func Init(dc driverapi.DriverCallback, config map[string]interface{}) error { config: config, } - d.vxlanIdm, err = idm.New(nil, "vxlan-id", 1, vxlanIDEnd) + d.vxlanIdm, err = idm.New(nil, "vxlan-id", 0, vxlanIDEnd) if err != nil { return fmt.Errorf("failed to initialize vxlan id manager: %v", err) } diff --git a/vendor/github.com/docker/libnetwork/idm/idm.go b/vendor/github.com/docker/libnetwork/idm/idm.go index 84839c1c9b..20be113c83 100644 --- a/vendor/github.com/docker/libnetwork/idm/idm.go +++ b/vendor/github.com/docker/libnetwork/idm/idm.go @@ -15,7 +15,7 @@ type Idm struct { handle *bitseq.Handle } -// New returns an instance of id manager for a set of [start-end] numerical ids +// New returns an instance of id manager for a [start,end] set of numerical ids func New(ds datastore.DataStore, id string, start, end uint64) (*Idm, error) { if id == "" { return nil, fmt.Errorf("Invalid id") @@ -54,7 +54,7 @@ func (i *Idm) GetSpecificID(id uint64) error { return i.handle.Set(id - i.start) } -// GetIDInRange returns the first available id in the set within a range +// GetIDInRange returns the first available id in the set within a [start,end] range func (i *Idm) GetIDInRange(start, end uint64) (uint64, error) { if i.handle == nil { return 0, fmt.Errorf("ID set is not initialized") @@ -64,7 +64,9 @@ func (i *Idm) GetIDInRange(start, end uint64) (uint64, error) { return 0, fmt.Errorf("Requested range does not belong to the set") } - return i.handle.SetAnyInRange(start, end-start) + ordinal, err := i.handle.SetAnyInRange(start-i.start, end-i.start) + + return i.start + ordinal, err } // Release releases the specified id diff --git a/vendor/github.com/docker/libnetwork/iptables/iptables.go b/vendor/github.com/docker/libnetwork/iptables/iptables.go index 3884257361..e3a41b5564 100644 --- a/vendor/github.com/docker/libnetwork/iptables/iptables.go +++ b/vendor/github.com/docker/libnetwork/iptables/iptables.go @@ -45,6 +45,7 @@ var ( iptablesPath string supportsXlock = false supportsCOpt = false + xLockWaitMsg = "Another app is currently holding the xtables lock; waiting" // used to lock iptables commands if xtables lock is not supported bestEffortLock sync.Mutex // ErrIptablesNotFound is returned when the rule is not found. @@ -402,7 +403,7 @@ func raw(args ...string) ([]byte, error) { } // ignore iptables' message about xtables lock - if strings.Contains(string(output), "waiting for it to exit") { + if strings.Contains(string(output), xLockWaitMsg) { output = []byte("") } diff --git a/vendor/github.com/docker/libnetwork/network.go b/vendor/github.com/docker/libnetwork/network.go index ef40ea2bcc..79e7e49a95 100644 --- a/vendor/github.com/docker/libnetwork/network.go +++ b/vendor/github.com/docker/libnetwork/network.go @@ -1485,17 +1485,12 @@ func (n *network) Peers() []networkdb.PeerInfo { return []networkdb.PeerInfo{} } - var nDB *networkdb.NetworkDB - n.ctrlr.Lock() - if n.ctrlr.agentInitDone == nil && n.ctrlr.agent != nil { - nDB = n.ctrlr.agent.networkDB + agent := n.getController().getAgent() + if agent == nil { + return []networkdb.PeerInfo{} } - n.ctrlr.Unlock() - if nDB != nil { - return n.ctrlr.agent.networkDB.Peers(n.id) - } - return []networkdb.PeerInfo{} + return agent.networkDB.Peers(n.ID()) } func (n *network) DriverOptions() map[string]string { diff --git a/vendor/github.com/docker/libnetwork/networkdb/cluster.go b/vendor/github.com/docker/libnetwork/networkdb/cluster.go index c871d92f44..2c1a438c54 100644 --- a/vendor/github.com/docker/libnetwork/networkdb/cluster.go +++ b/vendor/github.com/docker/libnetwork/networkdb/cluster.go @@ -45,6 +45,8 @@ func (l *logWriter) Write(p []byte) (int, error) { // SetKey adds a new key to the key ring func (nDB *NetworkDB) SetKey(key []byte) { logrus.Debugf("Adding key %s", hex.EncodeToString(key)[0:5]) + nDB.Lock() + defer nDB.Unlock() for _, dbKey := range nDB.config.Keys { if bytes.Equal(key, dbKey) { return @@ -60,6 +62,8 @@ func (nDB *NetworkDB) SetKey(key []byte) { // been added apriori through SetKey func (nDB *NetworkDB) SetPrimaryKey(key []byte) { logrus.Debugf("Primary Key %s", hex.EncodeToString(key)[0:5]) + nDB.RLock() + defer nDB.RUnlock() for _, dbKey := range nDB.config.Keys { if bytes.Equal(key, dbKey) { if nDB.keyring != nil { @@ -74,6 +78,8 @@ func (nDB *NetworkDB) SetPrimaryKey(key []byte) { // can't be the primary key func (nDB *NetworkDB) RemoveKey(key []byte) { logrus.Debugf("Remove Key %s", hex.EncodeToString(key)[0:5]) + nDB.Lock() + defer nDB.Unlock() for i, dbKey := range nDB.config.Keys { if bytes.Equal(key, dbKey) { nDB.config.Keys = append(nDB.config.Keys[:i], nDB.config.Keys[i+1:]...) diff --git a/vendor/github.com/docker/libnetwork/resolver.go b/vendor/github.com/docker/libnetwork/resolver.go index 68e4831d53..ca3850c58e 100644 --- a/vendor/github.com/docker/libnetwork/resolver.go +++ b/vendor/github.com/docker/libnetwork/resolver.go @@ -418,8 +418,12 @@ func (r *resolver) ServeDNS(w dns.ResponseWriter, query *dns.Msg) { } execErr := r.backend.ExecFunc(extConnect) - if execErr != nil || err != nil { - logrus.Debugf("Connect failed, %s", err) + if execErr != nil { + logrus.Warn(execErr) + continue + } + if err != nil { + logrus.Warnf("Connect failed: %s", err) continue } logrus.Debugf("Query %s[%d] from %s, forwarding to %s:%s", name, query.Question[0].Qtype, diff --git a/vendor/github.com/docker/libnetwork/service_common.go b/vendor/github.com/docker/libnetwork/service_common.go index a0172f5944..b43c6403f9 100644 --- a/vendor/github.com/docker/libnetwork/service_common.go +++ b/vendor/github.com/docker/libnetwork/service_common.go @@ -156,11 +156,10 @@ func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, in c.Lock() s, ok := c.serviceBindings[skey] + c.Unlock() if !ok { - c.Unlock() return nil } - c.Unlock() s.Lock() lb, ok := s.loadBalancers[nid] @@ -188,7 +187,9 @@ func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, in if len(s.loadBalancers) == 0 { // All loadbalancers for the service removed. Time to // remove the service itself. + c.Lock() delete(c.serviceBindings, skey) + c.Unlock() } // Remove loadbalancer service(if needed) and backend in all diff --git a/vendor/github.com/docker/libnetwork/service_linux.go b/vendor/github.com/docker/libnetwork/service_linux.go index be8dc84d3e..fbcc89d22e 100644 --- a/vendor/github.com/docker/libnetwork/service_linux.go +++ b/vendor/github.com/docker/libnetwork/service_linux.go @@ -34,8 +34,8 @@ func init() { func (n *network) connectedLoadbalancers() []*loadBalancer { c := n.getController() - serviceBindings := make([]*service, 0, len(c.serviceBindings)) c.Lock() + serviceBindings := make([]*service, 0, len(c.serviceBindings)) for _, s := range c.serviceBindings { serviceBindings = append(serviceBindings, s) }