From f0fcb0bbe662ed019a9e564d4566e6858f0cb0bd Mon Sep 17 00:00:00 2001
From: Flavio Crisciani
Date: Thu, 16 Nov 2017 16:30:27 -0800
Subject: [PATCH] Fixed race on quick node fail/join

The previous logic was not properly handling the case of a node that
failed and joined back within a short period of time.

The issue was in the handling of the network messages. When a node
joins, it syncs with other nodes, and these pass it the whole list of
nodes that, to the best of their knowledge, are part of a network. At
this point, if the node receives the information that node A is part of
the network, it saves it before having received the notification that
node A is actually alive (coming from memberlist). If node A failed,
the source node will receive that notification, while the newly joined
node won't, because memberlist never advertised node A as available. In
this case the new node will never purge node A from its state and,
worse, will accept any table notification where node A is the owner,
ending up out of sync with the rest of the cluster.

This commit also contains some code cleanup around the area of node
management.

Signed-off-by: Flavio Crisciani
---
 libnetwork/diagnose/diagnose.go        |  3 +-
 libnetwork/networkdb/cluster.go        | 18 ++++--
 libnetwork/networkdb/delegate.go       | 90 ++++++++++++++------------
 libnetwork/networkdb/event_delegate.go | 38 ++++++++++-
 libnetwork/networkdb/networkdb.go      |  7 ++
 5 files changed, 102 insertions(+), 54 deletions(-)

diff --git a/libnetwork/diagnose/diagnose.go b/libnetwork/diagnose/diagnose.go
index 7e299cc254..9682fff78b 100644
--- a/libnetwork/diagnose/diagnose.go
+++ b/libnetwork/diagnose/diagnose.go
@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"net"
 	"net/http"
-	"strconv"
 	"sync"
 
 	"github.com/sirupsen/logrus"
@@ -82,7 +81,7 @@ func (n *Server) EnableDebug(ip string, port int) {
 	// go func() {
 	// 	http.Serve(n.sk, n.mux)
 	// }()
-	http.ListenAndServe(":"+strconv.Itoa(port), n.mux)
+	http.ListenAndServe(fmt.Sprintf(":%d", port), n.mux)
 }
 
 // DisableDebug stop the dubug and closes the tcp socket
diff --git a/libnetwork/networkdb/cluster.go b/libnetwork/networkdb/cluster.go
index 8ff2129517..198caceeb8 100644
--- a/libnetwork/networkdb/cluster.go
+++ b/libnetwork/networkdb/cluster.go
@@ -255,13 +255,18 @@ func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, sto
 func (nDB *NetworkDB) reapDeadNode() {
 	nDB.Lock()
 	defer nDB.Unlock()
-	for id, n := range nDB.failedNodes {
-		if n.reapTime > 0 {
-			n.reapTime -= nodeReapPeriod
-			continue
+	for _, nodeMap := range []map[string]*node{
+		nDB.failedNodes,
+		nDB.leftNodes,
+	} {
+		for id, n := range nodeMap {
+			if n.reapTime > nodeReapPeriod {
+				n.reapTime -= nodeReapPeriod
+				continue
+			}
+			logrus.Debugf("Garbage collect node %v", n.Name)
+			delete(nodeMap, id)
 		}
-		logrus.Debugf("Removing failed node %v from gossip cluster", n.Name)
-		delete(nDB.failedNodes, id)
 	}
 }
 
@@ -374,7 +379,6 @@ func (nDB *NetworkDB) gossip() {
 	thisNodeNetworks := nDB.networks[nDB.config.NodeID]
 	for nid := range thisNodeNetworks {
 		networkNodes[nid] = nDB.networkNodes[nid]
-
 	}
 	printStats := time.Since(nDB.lastStatsTimestamp) >= nDB.config.StatsPrintPeriod
 	printHealth := time.Since(nDB.lastHealthTimestamp) >= nDB.config.HealthPrintPeriod
diff --git a/libnetwork/networkdb/delegate.go b/libnetwork/networkdb/delegate.go
index 03fcfe1d9d..21c3bc0dcf 100644
--- a/libnetwork/networkdb/delegate.go
+++ b/libnetwork/networkdb/delegate.go
@@ -16,9 +16,12 @@ func (d *delegate) NodeMeta(limit int) []byte {
 	return []byte{}
 }
 
-func (nDB *NetworkDB) getNode(nEvent *NodeEvent) *node {
-	nDB.Lock()
-	defer nDB.Unlock()
+// getNode searches for the node inside the tables
+// returns true if the node was respectively in the active list, explicit node leave list or failed list
+func (nDB *NetworkDB) getNode(nEvent *NodeEvent, extract bool) (bool, bool, bool, *node) {
+	var active bool
+	var left bool
+	var failed bool
 
 	for _, nodes := range []map[string]*node{
 		nDB.failedNodes,
@@ -26,35 +29,19 @@ func (nDB *NetworkDB) getNode(nEvent *NodeEvent) *node {
 		nDB.nodes,
 	} {
 		if n, ok := nodes[nEvent.NodeName]; ok {
+			active = &nodes == &nDB.nodes
+			left = &nodes == &nDB.leftNodes
+			failed = &nodes == &nDB.failedNodes
 			if n.ltime >= nEvent.LTime {
-				return nil
+				return active, left, failed, nil
 			}
-			return n
+			if extract {
+				delete(nodes, n.Name)
+			}
+			return active, left, failed, n
 		}
 	}
-	return nil
-}
-
-func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
-	nDB.Lock()
-	defer nDB.Unlock()
-
-	for _, nodes := range []map[string]*node{
-		nDB.failedNodes,
-		nDB.leftNodes,
-		nDB.nodes,
-	} {
-		if n, ok := nodes[nEvent.NodeName]; ok {
-			if n.ltime >= nEvent.LTime {
-				return nil
-			}
-
-			delete(nodes, n.Name)
-			return n
-		}
-	}
-
-	return nil
+	return active, left, failed, nil
 }
 
 func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
@@ -62,11 +49,14 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
 	// time.
 	nDB.networkClock.Witness(nEvent.LTime)
 
-	n := nDB.getNode(nEvent)
+	nDB.RLock()
+	active, left, _, n := nDB.getNode(nEvent, false)
 	if n == nil {
+		nDB.RUnlock()
 		return false
 	}
-	// If its a node leave event for a manager and this is the only manager we
+	nDB.RUnlock()
+	// If it is a node leave event for a manager and this is the only manager we
 	// know of we want the reconnect logic to kick in. In a single manager
 	// cluster manager's gossip can't be bootstrapped unless some other node
 	// connects to it.
@@ -79,28 +69,38 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
 		}
 	}
 
-	n = nDB.checkAndGetNode(nEvent)
-	if n == nil {
-		return false
-	}
-
 	n.ltime = nEvent.LTime
 
 	switch nEvent.Type {
 	case NodeEventTypeJoin:
+		if active {
+			// the node is already marked as active, nothing to do
+			return false
+		}
 		nDB.Lock()
-		_, found := nDB.nodes[n.Name]
-		nDB.nodes[n.Name] = n
-		nDB.Unlock()
-		if !found {
+		// Because the lock got released on the previous check we have to do it again and re-verify the status of the node
+		// All of this is to avoid a big lock on the function
+		if active, _, _, n = nDB.getNode(nEvent, true); !active && n != nil {
+			n.reapTime = 0
+			nDB.nodes[n.Name] = n
 			logrus.Infof("%v(%v): Node join event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
 		}
+		nDB.Unlock()
 		return true
 	case NodeEventTypeLeave:
+		if left {
+			// the node is already marked as left, nothing to do
+			return false
+		}
 		nDB.Lock()
-		nDB.leftNodes[n.Name] = n
+		// Because the lock got released on the previous check we have to do it again and re-verify the status of the node
+		// All of this is to avoid a big lock on the function
+		if _, left, _, n = nDB.getNode(nEvent, true); !left && n != nil {
+			n.reapTime = nodeReapInterval
+			nDB.leftNodes[n.Name] = n
+			logrus.Infof("%v(%v): Node leave event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
+		}
 		nDB.Unlock()
-		logrus.Infof("%v(%v): Node leave event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
 		return true
 	}
 
@@ -162,6 +162,12 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
 		return false
 	}
 
+	// If the node is not known from memberlist we cannot save any state of it, because if it actually
+	// dies we won't receive any notification and we will remain stuck with it
+	if _, ok := nDB.nodes[nEvent.NodeName]; !ok {
+		return false
+	}
+
 	// This remote network join is being seen the first time.
 	nodeNetworks[nEvent.NetworkID] = &network{
 		id:    nEvent.NetworkID,
@@ -466,7 +472,7 @@ func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
 	var gMsg GossipMessage
 	err := proto.Unmarshal(buf, &gMsg)
 	if err != nil {
-		logrus.Errorf("Error unmarshalling push pull messsage: %v", err)
+		logrus.Errorf("Error unmarshalling push pull message: %v", err)
 		return
 	}
 
diff --git a/libnetwork/networkdb/event_delegate.go b/libnetwork/networkdb/event_delegate.go
index 74aa465294..6075718a63 100644
--- a/libnetwork/networkdb/event_delegate.go
+++ b/libnetwork/networkdb/event_delegate.go
@@ -21,10 +21,29 @@ func (e *eventDelegate) broadcastNodeEvent(addr net.IP, op opType) {
 	}
 }
 
+func (e *eventDelegate) purgeReincarnation(mn *memberlist.Node) {
+	for name, node := range e.nDB.failedNodes {
+		if node.Addr.Equal(mn.Addr) {
+			logrus.Infof("Node %s/%s, is the new incarnation of the failed node %s/%s", mn.Name, mn.Addr, name, node.Addr)
+			delete(e.nDB.failedNodes, name)
+			return
+		}
+	}
+
+	for name, node := range e.nDB.leftNodes {
+		if node.Addr.Equal(mn.Addr) {
+			logrus.Infof("Node %s/%s, is the new incarnation of the shutdown node %s/%s", mn.Name, mn.Addr, name, node.Addr)
+			delete(e.nDB.leftNodes, name)
+			return
+		}
+	}
+}
+
 func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
 	logrus.Infof("Node %s/%s, joined gossip cluster", mn.Name, mn.Addr)
 	e.broadcastNodeEvent(mn.Addr, opCreate)
 	e.nDB.Lock()
+	defer e.nDB.Unlock()
 	// In case the node is rejoining after a failure or leave,
 	// wait until an explicit join message arrives before adding
 	// it to the nodes just to make sure this is not a stale
@@ -32,12 +51,15 @@
 	_, fOk := e.nDB.failedNodes[mn.Name]
 	_, lOk := e.nDB.leftNodes[mn.Name]
 	if fOk || lOk {
-		e.nDB.Unlock()
 		return
 	}
 
+	// Every node has a unique ID
+	// Check on the basis of the IP address whether the new node that joined is actually a new incarnation of a previous
+	// failed or shutdown one
+	e.purgeReincarnation(mn)
+
 	e.nDB.nodes[mn.Name] = &node{Node: *mn}
-	e.nDB.Unlock()
 	logrus.Infof("Node %s/%s, added to nodes list", mn.Name, mn.Addr)
 }
 
@@ -49,18 +71,28 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
 	// If the node was temporary down, deleting the entries will guarantee that the CREATE events will be accepted
 	// If the node instead left because was going down, then it makes sense to just delete all its state
 	e.nDB.Lock()
+	defer e.nDB.Unlock()
 	e.nDB.deleteNetworkEntriesForNode(mn.Name)
 	e.nDB.deleteNodeTableEntries(mn.Name)
 	if n, ok := e.nDB.nodes[mn.Name]; ok {
 		delete(e.nDB.nodes, mn.Name)
 
+		// Check if a new incarnation of the same node already joined
+		// In that case this node can simply be removed and no further action is needed
+		for name, node := range e.nDB.nodes {
+			if node.Addr.Equal(mn.Addr) {
+				logrus.Infof("Node %s/%s, is the new incarnation of the failed node %s/%s", name, node.Addr, mn.Name, mn.Addr)
+				return
+			}
+		}
+
 		// In case of node failure, keep retrying to reconnect every retryInterval (1sec) for nodeReapInterval (24h)
 		// Explicit leave will have already removed the node from the list of nodes (nDB.nodes) and put it into the leftNodes map
 		n.reapTime = nodeReapInterval
 		e.nDB.failedNodes[mn.Name] = n
 		failed = true
 	}
-	e.nDB.Unlock()
+
 	if failed {
 		logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
 	}
diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go
index 388a82c47b..025e0ca843 100644
--- a/libnetwork/networkdb/networkdb.go
+++ b/libnetwork/networkdb/networkdb.go
@@ -310,6 +310,10 @@ func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
 				Name: node.Name,
 				IP:   node.Addr.String(),
 			})
+		} else {
+			// Added for testing purposes; this condition should never happen, else it means that the network list
+			// is out of sync with the node list
+			peers = append(peers, PeerInfo{})
 		}
 	}
 	return peers
@@ -593,6 +597,9 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
 	nodeNetworks[nid] = &network{id: nid, ltime: ltime, entriesNumber: entries}
 	nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
 		NumNodes: func() int {
+			//TODO fcrisciani this can be optimized maybe avoiding the lock?
+			// this call is done for each GetBroadcasts call to evaluate the number of
+			// replicas for the message
 			nDB.RLock()
 			defer nDB.RUnlock()
 			return len(nDB.networkNodes[nid])
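What follows is a minimal, self-contained Go sketch of the two ideas this patch combines: reject network state owned by a node that memberlist never reported alive, and purge stale failed/left entries when the same address rejoins under a new incarnation. It is an illustration only and is not part of the patch; the tracker type and helper names below are hypothetical, not libnetwork's.

package main

import (
	"fmt"
	"net"
)

// node is a simplified stand-in for networkdb's bookkeeping; the real struct
// embeds memberlist.Node and carries lamport times and reap timers.
type node struct {
	name string
	addr net.IP
}

// tracker mimics the three maps NetworkDB keeps for cluster membership.
type tracker struct {
	active map[string]node // nodes memberlist reports as alive
	failed map[string]node // nodes that failed and are awaiting reap
	left   map[string]node // nodes that left the cluster explicitly
}

// acceptNetworkState mirrors the new guard in handleNetworkEvent: gossip about
// a network owned by a node we have never seen alive is rejected, because we
// would otherwise never observe that node's failure and never purge its state.
func (t *tracker) acceptNetworkState(owner string) bool {
	_, known := t.active[owner]
	return known
}

// purgeReincarnation drops any failed/left entry sharing an address with a
// freshly joined member: a rejoin under a new identity must not leave the old
// identity stuck in the failed or left lists forever.
func (t *tracker) purgeReincarnation(n node) {
	for name, old := range t.failed {
		if old.addr.Equal(n.addr) {
			fmt.Printf("%s is a new incarnation of failed node %s\n", n.name, name)
			delete(t.failed, name)
			return
		}
	}
	for name, old := range t.left {
		if old.addr.Equal(n.addr) {
			fmt.Printf("%s is a new incarnation of left node %s\n", n.name, name)
			delete(t.left, name)
			return
		}
	}
}

func main() {
	t := &tracker{
		active: map[string]node{},
		failed: map[string]node{"A-old": {name: "A-old", addr: net.ParseIP("10.0.0.5")}},
		left:   map[string]node{},
	}

	// Gossip claims "A-old" owns some network state, but memberlist never told
	// us it is alive: the state is rejected instead of being stored forever.
	fmt.Println("accept state owned by A-old?", t.acceptNetworkState("A-old")) // false

	// The same host rejoins under a new incarnation; the stale entry is purged
	// and the new identity is tracked as active.
	rejoined := node{name: "A-new", addr: net.ParseIP("10.0.0.5")}
	t.purgeReincarnation(rejoined)
	t.active[rejoined.name] = rejoined
	fmt.Println("accept state owned by A-new?", t.acceptNetworkState("A-new")) // true
}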