diff --git a/libnetwork/networkdb/delegate.go b/libnetwork/networkdb/delegate.go index 2f8ca48686..2096ea622e 100644 --- a/libnetwork/networkdb/delegate.go +++ b/libnetwork/networkdb/delegate.go @@ -88,12 +88,25 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool { } func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { + var flushEntries bool // Update our local clock if the received messages has newer // time. nDB.networkClock.Witness(nEvent.LTime) nDB.Lock() - defer nDB.Unlock() + defer func() { + nDB.Unlock() + // When a node leaves a network on the last task removal cleanup the + // local entries for this network & node combination. When the tasks + // on a network are removed we could have missed the gossip updates. + // Not doing this cleanup can leave stale entries because bulksyncs + // from the node will no longer include this network state. + // + // deleteNodeNetworkEntries takes nDB lock. + if flushEntries { + nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName) + } + }() if nEvent.NodeName == nDB.config.NodeName { return false @@ -121,6 +134,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { n.leaving = nEvent.Type == NetworkEventTypeLeave if n.leaving { n.reapTime = reapInterval + flushEntries = true } nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName) diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go index 9e5e61caef..86b0128b60 100644 --- a/libnetwork/networkdb/networkdb.go +++ b/libnetwork/networkdb/networkdb.go @@ -372,6 +372,37 @@ func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) { nDB.Unlock() } +func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) { + nDB.Lock() + nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), + func(path string, v interface{}) bool { + oldEntry := v.(*entry) + params := strings.Split(path[1:], "/") + nid := params[0] + tname := params[1] + key := params[2] + + if oldEntry.node != node { + return false + } + + entry := &entry{ + ltime: oldEntry.ltime, + node: node, + value: oldEntry.value, + deleting: true, + reapTime: reapInterval, + } + + nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) + nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) + + nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value)) + return false + }) + nDB.Unlock() +} + func (nDB *NetworkDB) deleteNodeTableEntries(node string) { nDB.Lock() nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {