From b8d514432db9b03370be3ad6a23c4fc336860d0c Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Sat, 5 Aug 2017 12:04:20 -0700 Subject: [PATCH 1/3] Revert "Avoid peerUpdate logic in swarm mode" This reverts commit b65a4ee10e2d34ba41d7d3447fd42c21a26b1ebb. Signed-off-by: Flavio Crisciani --- libnetwork/drivers/overlay/peerdb.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/libnetwork/drivers/overlay/peerdb.go b/libnetwork/drivers/overlay/peerdb.go index 510c6138d4..21cd1fbe3d 100644 --- a/libnetwork/drivers/overlay/peerdb.go +++ b/libnetwork/drivers/overlay/peerdb.go @@ -207,12 +207,6 @@ func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPM } func (d *driver) peerDbUpdateSandbox(nid string) { - // The following logic is useful only in non swarm mode - // In swarm mode instead the programmig will come directly from networkDB - if !d.isSerfAlive() { - return - } - d.peerDb.Lock() pMap, ok := d.peerDb.mp[nid] if !ok { From 5c52ff49e053a47145907f73e2a28c29f0133a0e Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Thu, 27 Jul 2017 11:43:13 -0700 Subject: [PATCH 2/3] Funnel peerAdd and peerDelete in a channel Remove the need for the wait group and avoid new locks Added utility to print the method name and the caller name Signed-off-by: Flavio Crisciani --- libnetwork/common/caller.go | 29 +++++++ libnetwork/common/caller_test.go | 49 ++++++++++++ libnetwork/drivers/overlay/joinleave.go | 5 +- libnetwork/drivers/overlay/ov_network.go | 5 +- libnetwork/drivers/overlay/ov_serf.go | 11 +-- libnetwork/drivers/overlay/overlay.go | 20 ++++- libnetwork/drivers/overlay/peerdb.go | 96 ++++++++++++++++++------ 7 files changed, 175 insertions(+), 40 deletions(-) create mode 100644 libnetwork/common/caller.go create mode 100644 libnetwork/common/caller_test.go diff --git a/libnetwork/common/caller.go b/libnetwork/common/caller.go new file mode 100644 index 0000000000..0dec3bc0bc --- /dev/null +++ b/libnetwork/common/caller.go @@ -0,0 +1,29 
@@ +package common + +import ( + "runtime" + "strings" +) + +func callerInfo(i int) string { + ptr, _, _, ok := runtime.Caller(i) + fName := "unknown" + if ok { + f := runtime.FuncForPC(ptr) + if f != nil { + // f.Name() is like: github.com/docker/libnetwork/common.MethodName + tmp := strings.Split(f.Name(), ".") + if len(tmp) > 0 { + fName = tmp[len(tmp)-1] + } + } + } + + return fName +} + +// CallerName returns the name of the function at the specified level +// level == 0 means current method name +func CallerName(level int) string { + return callerInfo(2 + level) +} diff --git a/libnetwork/common/caller_test.go b/libnetwork/common/caller_test.go new file mode 100644 index 0000000000..babfbb7bdb --- /dev/null +++ b/libnetwork/common/caller_test.go @@ -0,0 +1,49 @@ +package common + +import "testing" + +func fun1() string { + return CallerName(0) +} + +func fun2() string { + return CallerName(1) +} + +func fun3() string { + return fun4() +} + +func fun4() string { + return CallerName(0) +} + +func fun5() string { + return fun6() +} + +func fun6() string { + return CallerName(1) +} + +func TestCaller(t *testing.T) { + funName := fun1() + if funName != "fun1" { + t.Fatalf("error on fun1 caller %s", funName) + } + + funName = fun2() + if funName != "TestCaller" { + t.Fatalf("error on fun2 caller %s", funName) + } + + funName = fun3() + if funName != "fun4" { + t.Fatalf("error on fun2 caller %s", funName) + } + + funName = fun5() + if funName != "fun5" { + t.Fatalf("error on fun5 caller %s", funName) + } +} diff --git a/libnetwork/drivers/overlay/joinleave.go b/libnetwork/drivers/overlay/joinleave.go index cdbb428281..31c311f4fc 100644 --- a/libnetwork/drivers/overlay/joinleave.go +++ b/libnetwork/drivers/overlay/joinleave.go @@ -120,8 +120,7 @@ func (d *driver) Join(nid, eid string, sboxKey string, jinfo driverapi.JoinInfo, } } - d.peerDbAdd(nid, eid, ep.addr.IP, ep.addr.Mask, ep.mac, - net.ParseIP(d.advertiseAddress), true) + d.peerAdd(nid, eid, ep.addr.IP, 
ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true, false, false, true) if err := d.checkEncryption(nid, nil, n.vxlanID(s), true, true); err != nil { logrus.Warn(err) @@ -205,7 +204,7 @@ func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key stri return } - d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true, false, false) + d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true, false, false, false) } // Leave method is invoked when a Sandbox detaches from an endpoint. diff --git a/libnetwork/drivers/overlay/ov_network.go b/libnetwork/drivers/overlay/ov_network.go index 01f6287bed..01b53ac71c 100644 --- a/libnetwork/drivers/overlay/ov_network.go +++ b/libnetwork/drivers/overlay/ov_network.go @@ -765,10 +765,7 @@ func (n *network) watchMiss(nlSock *nl.NetlinkSocket) { logrus.Errorf("could not resolve peer %q: %v", ip, err) continue } - - if err := n.driver.peerAdd(n.id, "dummy", ip, IPmask, mac, vtep, true, l2Miss, l3Miss); err != nil { - logrus.Errorf("could not add neighbor entry for missed peer %q: %v", ip, err) - } + n.driver.peerAdd(n.id, "dummy", ip, IPmask, mac, vtep, true, l2Miss, l3Miss, false) } else { // If the gc_thresh values are lower kernel might knock off the neighor entries. 
// When we get a L3 miss check if its a valid peer and reprogram the neighbor diff --git a/libnetwork/drivers/overlay/ov_serf.go b/libnetwork/drivers/overlay/ov_serf.go index 9002bce6b7..20954ef237 100644 --- a/libnetwork/drivers/overlay/ov_serf.go +++ b/libnetwork/drivers/overlay/ov_serf.go @@ -120,15 +120,10 @@ func (d *driver) processEvent(u serf.UserEvent) { switch action { case "join": - if err := d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, - net.ParseIP(vtepStr), true, false, false); err != nil { - logrus.Errorf("Peer add failed in the driver: %v\n", err) - } + d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr), + true, false, false, false) case "leave": - if err := d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, - net.ParseIP(vtepStr), true); err != nil { - logrus.Errorf("Peer delete failed in the driver: %v\n", err) - } + d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr), true) } } diff --git a/libnetwork/drivers/overlay/overlay.go b/libnetwork/drivers/overlay/overlay.go index 8d19b2e1d4..11eda6781b 100644 --- a/libnetwork/drivers/overlay/overlay.go +++ b/libnetwork/drivers/overlay/overlay.go @@ -3,6 +3,7 @@ package overlay //go:generate protoc -I.:../../Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/drivers/overlay,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. 
overlay.proto import ( + "context" "fmt" "net" "sync" @@ -50,6 +51,8 @@ type driver struct { joinOnce sync.Once localJoinOnce sync.Once keys []*key + peerOpCh chan *peerOperation + peerOpCancel context.CancelFunc sync.Mutex } @@ -64,10 +67,16 @@ func Init(dc driverapi.DriverCallback, config map[string]interface{}) error { peerDb: peerNetworkMap{ mp: map[string]*peerMap{}, }, - secMap: &encrMap{nodes: map[string][]*spi{}}, - config: config, + secMap: &encrMap{nodes: map[string][]*spi{}}, + config: config, + peerOpCh: make(chan *peerOperation), } + // Launch the go routine for processing peer operations + ctx, cancel := context.WithCancel(context.Background()) + d.peerOpCancel = cancel + go d.peerOpRoutine(ctx, d.peerOpCh) + if data, ok := config[netlabel.GlobalKVClient]; ok { var err error dsc, ok := data.(discoverapi.DatastoreConfigData) @@ -161,7 +170,7 @@ func (d *driver) restoreEndpoints() error { } n.incEndpointCount() - d.peerDbAdd(ep.nid, ep.id, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true) + d.peerAdd(ep.nid, ep.id, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true, false, false, true) } return nil } @@ -170,6 +179,11 @@ func (d *driver) restoreEndpoints() error { func Fini(drv driverapi.Driver) { d := drv.(*driver) + // Notify the peer go routine to return + if d.peerOpCancel != nil { + d.peerOpCancel() + } + if d.exitCh != nil { waitCh := make(chan struct{}) diff --git a/libnetwork/drivers/overlay/peerdb.go b/libnetwork/drivers/overlay/peerdb.go index 21cd1fbe3d..e9f249cfa4 100644 --- a/libnetwork/drivers/overlay/peerdb.go +++ b/libnetwork/drivers/overlay/peerdb.go @@ -1,12 +1,14 @@ package overlay import ( + "context" "fmt" "net" "sync" "syscall" "github.com/Sirupsen/logrus" + "github.com/docker/libnetwork/common" ) const ovPeerTable = "overlay_peer_table" @@ -59,8 +61,6 @@ func (pKey *peerKey) Scan(state fmt.ScanState, verb rune) error { return nil } -var peerDbWg sync.WaitGroup - func (d *driver) 
peerDbWalk(f func(string, *peerKey, *peerEntry) bool) error { d.peerDb.Lock() nids := []string{} @@ -141,8 +141,6 @@ func (d *driver) peerDbSearch(nid string, peerIP net.IP) (net.HardwareAddr, net. func (d *driver) peerDbAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, isLocal bool) { - peerDbWg.Wait() - d.peerDb.Lock() pMap, ok := d.peerDb.mp[nid] if !ok { @@ -173,7 +171,6 @@ func (d *driver) peerDbAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP) peerEntry { - peerDbWg.Wait() d.peerDb.Lock() pMap, ok := d.peerDb.mp[nid] @@ -215,9 +212,6 @@ func (d *driver) peerDbUpdateSandbox(nid string) { } d.peerDb.Unlock() - peerDbWg.Add(1) - - var peerOps []func() pMap.Lock() for pKeyStr, pEntry := range pMap.mp { var pKey peerKey @@ -233,28 +227,67 @@ func (d *driver) peerDbUpdateSandbox(nid string) { // pointing to the same memory location for every iteration. Make // a copy of pEntry before capturing it in the following closure. 
entry := pEntry - op := func() { - if err := d.peerAdd(nid, entry.eid, pKey.peerIP, entry.peerIPMask, - pKey.peerMac, entry.vtep, - false, false, false); err != nil { - logrus.Errorf("peerdbupdate in sandbox failed for ip %s and mac %s: %v", - pKey.peerIP, pKey.peerMac, err) - } - } - peerOps = append(peerOps, op) + d.peerAdd(nid, entry.eid, pKey.peerIP, entry.peerIPMask, pKey.peerMac, entry.vtep, false, false, false, false) } pMap.Unlock() +} - for _, op := range peerOps { - op() +type peerOperation struct { + isAdd bool + networkID string + endpointID string + peerIP net.IP + peerIPMask net.IPMask + peerMac net.HardwareAddr + vtepIP net.IP + updateDB bool + l2Miss bool + l3Miss bool + localPeer bool + callerName string +} + +func (d *driver) peerOpRoutine(ctx context.Context, ch chan *peerOperation) { + var err error + for { + select { + case <-ctx.Done(): + return + case op := <-ch: + if op.isAdd { + err = d.peerAddOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.updateDB, op.l2Miss, op.l3Miss, op.localPeer) + } else { + err = d.peerDeleteOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.localPeer) + } + if err != nil { + logrus.Warnf("Peer operation failed:%s op:%v", err, op) + } + } } - - peerDbWg.Done() } func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, - peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss bool) error { + peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss, localPeer bool) { + callerName := common.CallerName(1) + d.peerOpCh <- &peerOperation{ + isAdd: true, + networkID: nid, + endpointID: eid, + peerIP: peerIP, + peerIPMask: peerIPMask, + peerMac: peerMac, + vtepIP: vtep, + updateDB: updateDb, + l2Miss: l2Miss, + l3Miss: l3Miss, + localPeer: localPeer, + callerName: callerName, + } +} + +func (d *driver) peerAddOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, + peerMac net.HardwareAddr, vtep net.IP, updateDb, 
l2Miss, l3Miss, updateOnlyDB bool) error { if err := validateID(nid, eid); err != nil { return err @@ -262,6 +295,9 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, if updateDb { d.peerDbAdd(nid, eid, peerIP, peerIPMask, peerMac, vtep, false) + if updateOnlyDB { + return nil + } } n := d.network(nid) @@ -311,6 +347,22 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, } func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, + peerMac net.HardwareAddr, vtep net.IP, updateDb bool) { + callerName := common.CallerName(1) + d.peerOpCh <- &peerOperation{ + isAdd: false, + networkID: nid, + endpointID: eid, + peerIP: peerIP, + peerIPMask: peerIPMask, + peerMac: peerMac, + vtepIP: vtep, + updateDB: updateDb, + callerName: callerName, + } +} + +func (d *driver) peerDeleteOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error { if err := validateID(nid, eid); err != nil { From 2e38c53def80c6808b6203b86953e885dd56be14 Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Sat, 5 Aug 2017 08:42:20 -0700 Subject: [PATCH 3/3] PeerInit for the sandbox init Move the sandbox init logic into the go routine that handles peer operations. 
This is to avoid deadlocks in the use of the pMap.Lock for the network Signed-off-by: Flavio Crisciani --- libnetwork/agent.go | 7 +- libnetwork/drivers/overlay/ov_network.go | 4 +- libnetwork/drivers/overlay/peerdb.go | 87 +++++++++++++++--------- libnetwork/networkdb/networkdb.go | 4 +- 4 files changed, 63 insertions(+), 39 deletions(-) diff --git a/libnetwork/agent.go b/libnetwork/agent.go index 4877df1c34..0ccdd87de7 100644 --- a/libnetwork/agent.go +++ b/libnetwork/agent.go @@ -741,11 +741,12 @@ func (n *network) addDriverWatches() { return } - agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte) bool { - if nid == n.ID() { + agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte, deleted bool) bool { + // skip the entries that are marked for deletion, this is safe because this function is + // called at initialization time so there is no state to delete + if nid == n.ID() && !deleted { d.EventNotify(driverapi.Create, nid, table.name, key, value) } - return false }) } diff --git a/libnetwork/drivers/overlay/ov_network.go b/libnetwork/drivers/overlay/ov_network.go index 01b53ac71c..9575350bf6 100644 --- a/libnetwork/drivers/overlay/ov_network.go +++ b/libnetwork/drivers/overlay/ov_network.go @@ -683,10 +683,12 @@ func (n *network) initSandbox(restore bool) error { return fmt.Errorf("could not get network sandbox (oper %t): %v", restore, err) } + // this is needed to let the peerAdd configure the sandbox n.setSandbox(sbox) if !restore { - n.driver.peerDbUpdateSandbox(n.id) + // Initialize the sandbox with all the peers previously received from networkdb + n.driver.initSandboxPeerDB(n.id) } var nlSock *nl.NetlinkSocket diff --git a/libnetwork/drivers/overlay/peerdb.go b/libnetwork/drivers/overlay/peerdb.go index e9f249cfa4..4b41bf7658 100644 --- a/libnetwork/drivers/overlay/peerdb.go +++ b/libnetwork/drivers/overlay/peerdb.go @@ -203,38 +203,31 @@ func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPM
return pEntry } -func (d *driver) peerDbUpdateSandbox(nid string) { - d.peerDb.Lock() - pMap, ok := d.peerDb.mp[nid] - if !ok { - d.peerDb.Unlock() - return - } - d.peerDb.Unlock() - - pMap.Lock() - for pKeyStr, pEntry := range pMap.mp { - var pKey peerKey - if _, err := fmt.Sscan(pKeyStr, &pKey); err != nil { - logrus.Errorf("peer key scan failed: %v", err) - } - - if pEntry.isLocal { - continue - } - - // Go captures variables by reference. The pEntry could be - // pointing to the same memory location for every iteration. Make - // a copy of pEntry before capturing it in the following closure. - entry := pEntry - - d.peerAdd(nid, entry.eid, pKey.peerIP, entry.peerIPMask, pKey.peerMac, entry.vtep, false, false, false, false) - } - pMap.Unlock() +// The overlay uses a lazy initialization approach, this means that when a network is created +// and the driver is registered, the overlay does not allocate resources till the moment that a +// sandbox is actually created. +// At the moment of this call, which happens when a sandbox is initialized, it is possible that +// networkDB has already delivered some events of peers already available on remote nodes, +// these peers are saved into the peerDB and this function is used to properly configure +// the network sandbox with all those peers that got previously notified. +// Note also that this method sends a single message on the channel and the go routine on the +// other side, will atomically loop on the whole table of peers and will program their state +// in one single atomic operation. This is fundamental to guarantee consistency, and avoid that +// new peerAdd or peerDelete gets reordered during the sandbox init.
+func (d *driver) initSandboxPeerDB(nid string) { + d.peerInit(nid) } +type peerOperationType int32 + +const ( + peerOperationINIT peerOperationType = iota + peerOperationADD + peerOperationDELETE +) + type peerOperation struct { - isAdd bool + opType peerOperationType networkID string endpointID string peerIP net.IP @@ -255,9 +248,12 @@ func (d *driver) peerOpRoutine(ctx context.Context, ch chan *peerOperation) { case <-ctx.Done(): return case op := <-ch: - if op.isAdd { + switch op.opType { + case peerOperationINIT: + err = d.peerInitOp(op.networkID) + case peerOperationADD: err = d.peerAddOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.updateDB, op.l2Miss, op.l3Miss, op.localPeer) - } else { + case peerOperationDELETE: err = d.peerDeleteOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.localPeer) } if err != nil { @@ -267,11 +263,33 @@ func (d *driver) peerOpRoutine(ctx context.Context, ch chan *peerOperation) { } } +func (d *driver) peerInit(nid string) { + callerName := common.CallerName(1) + d.peerOpCh <- &peerOperation{ + opType: peerOperationINIT, + networkID: nid, + callerName: callerName, + } +} + +func (d *driver) peerInitOp(nid string) error { + return d.peerDbNetworkWalk(nid, func(pKey *peerKey, pEntry *peerEntry) bool { + // Local entries do not need to be added + if pEntry.isLocal { + return false + } + + d.peerAddOp(nid, pEntry.eid, pKey.peerIP, pEntry.peerIPMask, pKey.peerMac, pEntry.vtep, false, false, false, false) + // return false to loop on all entries + return false + }) +} + func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss, localPeer bool) { callerName := common.CallerName(1) d.peerOpCh <- &peerOperation{ - isAdd: true, + opType: peerOperationADD, networkID: nid, endpointID: eid, peerIP: peerIP, @@ -307,6 +325,9 @@ func (d *driver) peerAddOp(nid, eid string, peerIP net.IP, 
peerIPMask net.IPMask sbox := n.sandbox() if sbox == nil { + // We are hitting this case for all the events that arrive before the sandbox + // is created. The peer has already been added into the database and the sandbox init will + // call initSandboxPeerDB, which will configure all these peers from the database return nil } @@ -350,7 +371,7 @@ func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMas peerMac net.HardwareAddr, vtep net.IP, updateDb bool) { callerName := common.CallerName(1) d.peerOpCh <- &peerOperation{ - isAdd: false, + opType: peerOperationDELETE, networkID: nid, endpointID: eid, peerIP: peerIP, diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go index 6447a16357..4f1860ff91 100644 --- a/libnetwork/networkdb/networkdb.go +++ b/libnetwork/networkdb/networkdb.go @@ -524,7 +524,7 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) { // WalkTable walks a single table in NetworkDB and invokes the passed // function for each entry in the table passing the network, key, // value. The walk stops if the passed function returns a true. -func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte) bool) error { +func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte, bool) bool) error { nDB.RLock() values := make(map[string]interface{}) nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s", tname), func(path string, v interface{}) bool { @@ -537,7 +537,7 @@ func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte) bo params := strings.Split(k[1:], "/") nid := params[1] key := params[2] - if fn(nid, key, v.(*entry).value) { + if fn(nid, key, v.(*entry).value, v.(*entry).deleting) { return nil } }