From b190ee3ccff63d27e42f246d3cf910cd84c79257 Mon Sep 17 00:00:00 2001
From: Flavio Crisciani
Date: Tue, 12 Dec 2017 08:23:49 -0800
Subject: [PATCH] Cleanup node management logic

Created a method to handle node state changes with the
associated cleanup operations.
Realigned the testing client with the new diagnostic interface.

Signed-off-by: Flavio Crisciani
---
 libnetwork/diagnose/server.go             |   3 +-
 libnetwork/networkdb/delegate.go          |  77 +++------
 libnetwork/networkdb/event_delegate.go    |  61 ++-----
 libnetwork/networkdb/networkdb_test.go    | 155 ++++++++++++++++++
 libnetwork/networkdb/networkdbdiagnose.go |   1 +
 libnetwork/networkdb/nodemgmt.go          | 120 ++++++++++++++
 .../test/networkDb/dbclient/ndbClient.go  |   8 +-
 .../test/networkDb/dbserver/ndbServer.go  |   2 +
 8 files changed, 323 insertions(+), 104 deletions(-)
 create mode 100644 libnetwork/networkdb/nodemgmt.go

diff --git a/libnetwork/diagnose/server.go b/libnetwork/diagnose/server.go
index c841e51966..2330b655f8 100644
--- a/libnetwork/diagnose/server.go
+++ b/libnetwork/diagnose/server.go
@@ -91,7 +91,7 @@ func (s *Server) EnableDebug(ip string, port int) {
 	}
 
 	logrus.Infof("Starting the diagnose server listening on %d for commands", port)
-	srv := &http.Server{Addr: fmt.Sprintf("127.0.0.1:%d", port), Handler: s}
+	srv := &http.Server{Addr: fmt.Sprintf("%s:%d", ip, port), Handler: s}
 	s.srv = srv
 	s.enable = 1
 	go func(n *Server) {
@@ -101,7 +101,6 @@ func (s *Server) EnableDebug(ip string, port int) {
 			atomic.SwapInt32(&n.enable, 0)
 		}
 	}(s)
-
 }
 
 // DisableDebug stop the dubug and closes the tcp socket
diff --git a/libnetwork/networkdb/delegate.go b/libnetwork/networkdb/delegate.go
index 21c3bc0dcf..072c6221e5 100644
--- a/libnetwork/networkdb/delegate.go
+++ b/libnetwork/networkdb/delegate.go
@@ -16,46 +16,28 @@ func (d *delegate) NodeMeta(limit int) []byte {
 	return []byte{}
 }
 
-// getNode searches the node inside the tables
-// returns true if the node was respectively in the active list, explicit node leave list or failed list
-func (nDB *NetworkDB) getNode(nEvent *NodeEvent, extract bool) (bool, bool, bool, *node) {
-	var active bool
-	var left bool
-	var failed bool
-
-	for _, nodes := range []map[string]*node{
-		nDB.failedNodes,
-		nDB.leftNodes,
-		nDB.nodes,
-	} {
-		if n, ok := nodes[nEvent.NodeName]; ok {
-			active = &nodes == &nDB.nodes
-			left = &nodes == &nDB.leftNodes
-			failed = &nodes == &nDB.failedNodes
-			if n.ltime >= nEvent.LTime {
-				return active, left, failed, nil
-			}
-			if extract {
-				delete(nodes, n.Name)
-			}
-			return active, left, failed, n
-		}
-	}
-	return active, left, failed, nil
-}
-
 func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
 	// Update our local clock if the received messages has newer
 	// time.
 	nDB.networkClock.Witness(nEvent.LTime)
 
 	nDB.RLock()
-	active, left, _, n := nDB.getNode(nEvent, false)
+	defer nDB.RUnlock()
+
+	// check if the node exists
+	n, _, _ := nDB.findNode(nEvent.NodeName)
 	if n == nil {
-		nDB.RUnlock()
 		return false
 	}
-	nDB.RUnlock()
+
+	// check if the event is fresher than the state we already have
+	if n.ltime >= nEvent.LTime {
+		return false
+	}
+
+	// If we are here it means that the event is fresher and the node is known. Update the Lamport time
+	n.ltime = nEvent.LTime
+
 	// If it is a node leave event for a manager and this is the only manager we
 	// know of we want the reconnect logic to kick in. In a single manager
 	// cluster manager's gossip can't be bootstrapped unless some other node
@@ -63,45 +45,32 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
 	if len(nDB.bootStrapIP) == 1 && nEvent.Type == NodeEventTypeLeave {
 		for _, ip := range nDB.bootStrapIP {
 			if ip.Equal(n.Addr) {
-				n.ltime = nEvent.LTime
 				return true
 			}
 		}
 	}
 
-	n.ltime = nEvent.LTime
-
 	switch nEvent.Type {
 	case NodeEventTypeJoin:
-		if active {
-			// the node is already marked as active nothing to do
+		moved, err := nDB.changeNodeState(n.Name, nodeActiveState)
+		if err != nil {
+			logrus.WithError(err).Error("unable to find the node to move")
 			return false
 		}
-		nDB.Lock()
-		// Because the lock got released on the previous check we have to do it again and re verify the status of the node
-		// All of this is to avoid a big lock on the function
-		if active, _, _, n = nDB.getNode(nEvent, true); !active && n != nil {
-			n.reapTime = 0
-			nDB.nodes[n.Name] = n
+		if moved {
 			logrus.Infof("%v(%v): Node join event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
 		}
-		nDB.Unlock()
-		return true
+		return moved
 	case NodeEventTypeLeave:
-		if left {
-			// the node is already marked as left nothing to do.
+		moved, err := nDB.changeNodeState(n.Name, nodeLeftState)
+		if err != nil {
+			logrus.WithError(err).Error("unable to find the node to move")
 			return false
 		}
-		nDB.Lock()
-		// Because the lock got released on the previous check we have to do it again and re verify the status of the node
-		// All of this is to avoid a big lock on the function
-		if _, left, _, n = nDB.getNode(nEvent, true); !left && n != nil {
-			n.reapTime = nodeReapInterval
-			nDB.leftNodes[n.Name] = n
+		if moved {
 			logrus.Infof("%v(%v): Node leave event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
 		}
-		nDB.Unlock()
-		return true
+		return moved
 	}
 
 	return false
diff --git a/libnetwork/networkdb/event_delegate.go b/libnetwork/networkdb/event_delegate.go
index 6e11805610..89aa7c470c 100644
--- a/libnetwork/networkdb/event_delegate.go
+++ b/libnetwork/networkdb/event_delegate.go
@@ -21,24 +21,6 @@ func (e *eventDelegate) broadcastNodeEvent(addr net.IP, op opType) {
 	}
 }
 
-func (e *eventDelegate) purgeReincarnation(mn *memberlist.Node) {
-	for name, node := range e.nDB.failedNodes {
-		if node.Addr.Equal(mn.Addr) {
-			logrus.Infof("Node %s/%s, is the new incarnation of the failed node %s/%s", mn.Name, mn.Addr, name, node.Addr)
-			delete(e.nDB.failedNodes, name)
-			return
-		}
-	}
-
-	for name, node := range e.nDB.leftNodes {
-		if node.Addr.Equal(mn.Addr) {
-			logrus.Infof("Node %s/%s, is the new incarnation of the shutdown node %s/%s", mn.Name, mn.Addr, name, node.Addr)
-			delete(e.nDB.leftNodes, name)
-			return
-		}
-	}
-}
-
 func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
 	logrus.Infof("Node %s/%s, joined gossip cluster", mn.Name, mn.Addr)
 	e.broadcastNodeEvent(mn.Addr, opCreate)
@@ -57,44 +39,35 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
 	// Every node has a unique ID
 	// Check on the base of the IP address if the new node that joined is actually a new incarnation of a previous
 	// failed or shutdown one
-	e.purgeReincarnation(mn)
+	e.nDB.purgeReincarnation(mn)
 
 	e.nDB.nodes[mn.Name] = &node{Node: *mn}
 	logrus.Infof("Node %s/%s, added to nodes list", mn.Name, mn.Addr)
 }
 
 func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
-	var failed bool
 	logrus.Infof("Node %s/%s, left gossip cluster", mn.Name, mn.Addr)
 	e.broadcastNodeEvent(mn.Addr, opDelete)
-	// The node left or failed, delete all the entries created by it.
-	// If the node was temporary down, deleting the entries will guarantee that the CREATE events will be accepted
-	// If the node instead left because was going down, then it makes sense to just delete all its state
+
 	e.nDB.Lock()
 	defer e.nDB.Unlock()
-	e.nDB.deleteNodeFromNetworks(mn.Name)
-	e.nDB.deleteNodeTableEntries(mn.Name)
-	if n, ok := e.nDB.nodes[mn.Name]; ok {
-		delete(e.nDB.nodes, mn.Name)
-
-		// Check if a new incarnation of the same node already joined
-		// In that case this node can simply be removed and no further action are needed
-		for name, node := range e.nDB.nodes {
-			if node.Addr.Equal(mn.Addr) {
-				logrus.Infof("Node %s/%s, is the new incarnation of the failed node %s/%s", name, node.Addr, mn.Name, mn.Addr)
-				return
-			}
-		}
-
-		// In case of node failure, keep retrying to reconnect every retryInterval (1sec) for nodeReapInterval (24h)
-		// Explicit leave will have already removed the node from the list of nodes (nDB.nodes) and put it into the leftNodes map
-		n.reapTime = nodeReapInterval
-		e.nDB.failedNodes[mn.Name] = n
-		failed = true
+	n, currState, _ := e.nDB.findNode(mn.Name)
+	if n == nil {
+		logrus.Errorf("Node %s/%s not found in the node lists", mn.Name, mn.Addr)
+		return
 	}
-
-	if failed {
-		logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
+	// If the node was still active it means that it did not send the leave cluster message, so it
+	// probably failed. Otherwise it would already be in the left list and nothing else needs to be done
+	if currState == nodeActiveState {
+		moved, err := e.nDB.changeNodeState(mn.Name, nodeFailedState)
+		if err != nil {
+			logrus.WithError(err).Errorf("impossible condition, node %s/%s not present in the list", mn.Name, mn.Addr)
+			return
+		}
+		if moved {
+			logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
+		}
 	}
 }
diff --git a/libnetwork/networkdb/networkdb_test.go b/libnetwork/networkdb/networkdb_test.go
index 01dee06769..6fea56193e 100644
--- a/libnetwork/networkdb/networkdb_test.go
+++ b/libnetwork/networkdb/networkdb_test.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"io/ioutil"
 	"log"
+	"net"
 	"os"
 	"sync/atomic"
 	"testing"
@@ -12,6 +13,7 @@
 
 	"github.com/docker/docker/pkg/stringid"
 	"github.com/docker/go-events"
+	"github.com/hashicorp/memberlist"
 	"github.com/sirupsen/logrus"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -580,3 +582,156 @@ func TestNetworkDBGarbageCollection(t *testing.T) {
 
 	closeNetworkDBInstances(dbs)
 }
+
+func TestFindNode(t *testing.T) {
+	dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
+
+	dbs[0].nodes["active"] = &node{Node: memberlist.Node{Name: "active"}}
+	dbs[0].failedNodes["failed"] = &node{Node: memberlist.Node{Name: "failed"}}
+	dbs[0].leftNodes["left"] = &node{Node: memberlist.Node{Name: "left"}}
+
+	// the active node count is 2 because the testing node itself is in the list
+	assert.Equal(t, 2, len(dbs[0].nodes))
+	assert.Equal(t, 1, len(dbs[0].failedNodes))
+	assert.Equal(t, 1, len(dbs[0].leftNodes))
+
+	n, currState, m := dbs[0].findNode("active")
+	assert.NotNil(t, n)
+	assert.Equal(t, "active", n.Name)
+	assert.Equal(t, nodeActiveState, currState)
+	assert.NotNil(t, m)
+	// delete the entry manually
+	delete(m, "active")
+
+	// verify that it can no longer be found
+	n, currState, m = dbs[0].findNode("active")
+	assert.Nil(t, n)
+	assert.Equal(t, nodeNotFound, currState)
+	assert.Nil(t, m)
+
+	n, currState, m = dbs[0].findNode("failed")
+	assert.NotNil(t, n)
+	assert.Equal(t, "failed", n.Name)
+	assert.Equal(t, nodeFailedState, currState)
+	assert.NotNil(t, m)
+
+	// find and remove
+	n, currState, m = dbs[0].findNode("left")
+	assert.NotNil(t, n)
+	assert.Equal(t, "left", n.Name)
+	assert.Equal(t, nodeLeftState, currState)
+	assert.NotNil(t, m)
+	delete(m, "left")
+
+	n, currState, m = dbs[0].findNode("left")
+	assert.Nil(t, n)
+	assert.Equal(t, nodeNotFound, currState)
+	assert.Nil(t, m)
+
+	closeNetworkDBInstances(dbs)
+}
+
+func TestChangeNodeState(t *testing.T) {
+	dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
+
+	dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1"}}
+	dbs[0].nodes["node2"] = &node{Node: memberlist.Node{Name: "node2"}}
+	dbs[0].nodes["node3"] = &node{Node: memberlist.Node{Name: "node3"}}
+
+	// the active node count is 4 because the testing node itself is in the list
+	assert.Equal(t, 4, len(dbs[0].nodes))
+
+	n, currState, m := dbs[0].findNode("node1")
+	assert.NotNil(t, n)
+	assert.Equal(t, nodeActiveState, currState)
+	assert.Equal(t, "node1", n.Name)
+	assert.NotNil(t, m)
+
+	// node1 to failed
+	dbs[0].changeNodeState("node1", nodeFailedState)
+
+	n, currState, m = dbs[0].findNode("node1")
+	assert.NotNil(t, n)
+	assert.Equal(t, nodeFailedState, currState)
+	assert.Equal(t, "node1", n.Name)
+	assert.NotNil(t, m)
+	assert.NotEqual(t, time.Duration(0), n.reapTime)
+
+	// node1 back to active
+	dbs[0].changeNodeState("node1", nodeActiveState)
+
+	n, currState, m = dbs[0].findNode("node1")
+	assert.NotNil(t, n)
+	assert.Equal(t, nodeActiveState, currState)
+	assert.Equal(t, "node1", n.Name)
+	assert.NotNil(t, m)
+	assert.Equal(t, time.Duration(0), n.reapTime)
+
+	// node1, node2 and node3 to left
+	dbs[0].changeNodeState("node1", nodeLeftState)
+	dbs[0].changeNodeState("node2", nodeLeftState)
+	dbs[0].changeNodeState("node3", nodeLeftState)
+
+	n, currState, m = dbs[0].findNode("node1")
+	assert.NotNil(t, n)
+	assert.Equal(t, nodeLeftState, currState)
+	assert.Equal(t, "node1", n.Name)
+	assert.NotNil(t, m)
+	assert.NotEqual(t, time.Duration(0), n.reapTime)
+
+	n, currState, m = dbs[0].findNode("node2")
+	assert.NotNil(t, n)
+	assert.Equal(t, nodeLeftState, currState)
+	assert.Equal(t, "node2", n.Name)
+	assert.NotNil(t, m)
+	assert.NotEqual(t, time.Duration(0), n.reapTime)
+
+	n, currState, m = dbs[0].findNode("node3")
+	assert.NotNil(t, n)
+	assert.Equal(t, nodeLeftState, currState)
+	assert.Equal(t, "node3", n.Name)
+	assert.NotNil(t, m)
+	assert.NotEqual(t, time.Duration(0), n.reapTime)
+
+	// the active node count is 1 because the testing node itself is in the list
+	assert.Equal(t, 1, len(dbs[0].nodes))
+	assert.Equal(t, 0, len(dbs[0].failedNodes))
+	assert.Equal(t, 3, len(dbs[0].leftNodes))
+
+	closeNetworkDBInstances(dbs)
+}
+
+func TestNodeReincarnation(t *testing.T) {
+	dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
+
+	dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1", Addr: net.ParseIP("192.168.1.1")}}
+	dbs[0].leftNodes["node2"] = &node{Node: memberlist.Node{Name: "node2", Addr: net.ParseIP("192.168.1.2")}}
+	dbs[0].failedNodes["node3"] = &node{Node: memberlist.Node{Name: "node3", Addr: net.ParseIP("192.168.1.3")}}
+
+	// the active node count is 2 because the testing node itself is in the list
+	assert.Equal(t, 2, len(dbs[0].nodes))
+	assert.Equal(t, 1, len(dbs[0].failedNodes))
+	assert.Equal(t, 1, len(dbs[0].leftNodes))
+
+	b := dbs[0].purgeReincarnation(&memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")})
+	assert.True(t, b)
+	dbs[0].nodes["node4"] = &node{Node: memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")}}
+
+	b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.2")})
+	assert.True(t, b)
+	dbs[0].nodes["node5"] = &node{Node: memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.1")}}
+
+	b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.3")})
+	assert.True(t, b)
+	dbs[0].nodes["node6"] = &node{Node: memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.1")}}
+
+	b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.10")})
+	assert.False(t, b)
+
+	// the active node count is 4 because the testing node itself is in the list
+	assert.Equal(t, 4, len(dbs[0].nodes))
+	assert.Equal(t, 0, len(dbs[0].failedNodes))
+	assert.Equal(t, 3, len(dbs[0].leftNodes))
+
+	closeNetworkDBInstances(dbs)
+}
diff --git a/libnetwork/networkdb/networkdbdiagnose.go b/libnetwork/networkdb/networkdbdiagnose.go
index 3c6032b4ea..a4443bcbff 100644
--- a/libnetwork/networkdb/networkdbdiagnose.go
+++ b/libnetwork/networkdb/networkdbdiagnose.go
@@ -399,6 +399,7 @@ func dbGetTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
 			Value: encodedValue,
 			Owner: v.owner,
 		})
+		i++
 	}
 	log.WithField("response", fmt.Sprintf("%+v", rsp)).Info("get table done")
 	diagnose.HTTPReply(w, diagnose.CommandSucceed(rsp), json)
diff --git a/libnetwork/networkdb/nodemgmt.go b/libnetwork/networkdb/nodemgmt.go
new file mode 100644
index 0000000000..f5a7498522
--- /dev/null
+++ b/libnetwork/networkdb/nodemgmt.go
@@ -0,0 +1,120 @@
+package networkdb
+
+import (
+	"fmt"
+
+	"github.com/hashicorp/memberlist"
+	"github.com/sirupsen/logrus"
+)
+
+type nodeState int
+
+const (
+	nodeNotFound    nodeState = -1
+	nodeActiveState nodeState = 0
+	nodeLeftState   nodeState = 1
+	nodeFailedState nodeState = 2
+)
+
+var nodeStateName = map[nodeState]string{
+	-1: "NodeNotFound",
+	0:  "NodeActive",
+	1:  "NodeLeft",
+	2:  "NodeFailed",
+}
+
+// findNode searches for the node in the 3 node lists and returns the node pointer, its state
+// and the list in which it was found
+func (nDB *NetworkDB) findNode(nodeName string) (*node, nodeState, map[string]*node) {
+	for i, nodes := range []map[string]*node{
+		nDB.nodes,
+		nDB.leftNodes,
+		nDB.failedNodes,
+	} {
+		if n, ok := nodes[nodeName]; ok {
+			return n, nodeState(i), nodes
+		}
+	}
+	return nil, nodeNotFound, nil
+}
+
+// changeNodeState changes the state of the specified node. It returns true if the node was moved,
+// false if there was no need to change the node state. An error is returned if the node does not
+// exist
+func (nDB *NetworkDB) changeNodeState(nodeName string, newState nodeState) (bool, error) {
+	n, currState, m := nDB.findNode(nodeName)
+	if n == nil {
+		return false, fmt.Errorf("node %s not found", nodeName)
+	}
+
+	switch newState {
+	case nodeActiveState:
+		if currState == nodeActiveState {
+			return false, nil
+		}
+
+		delete(m, nodeName)
+		// reset the node reap time
+		n.reapTime = 0
+		nDB.nodes[nodeName] = n
+	case nodeLeftState:
+		if currState == nodeLeftState {
+			return false, nil
+		}
+
+		delete(m, nodeName)
+		nDB.leftNodes[nodeName] = n
+	case nodeFailedState:
+		if currState == nodeFailedState {
+			return false, nil
+		}
+
+		delete(m, nodeName)
+		nDB.failedNodes[nodeName] = n
+	}
+
+	logrus.Infof("Node %s change state %s --> %s", nodeName, nodeStateName[currState], nodeStateName[newState])
+
+	if newState == nodeLeftState || newState == nodeFailedState {
+		// set the node reap time, if not already set
+		// It is possible that a node passes from failed to left and the reap time was already set, so keep that value
+		if n.reapTime == 0 {
+			n.reapTime = nodeReapInterval
+		}
+		// The node left or failed, so delete all the entries created by it.
+		// If the node was temporarily down, deleting the entries guarantees that the CREATE events will be accepted
+		// If the node instead left because it was going down, then it makes sense to just delete all its state
+		nDB.deleteNodeFromNetworks(n.Name)
+		nDB.deleteNodeTableEntries(n.Name)
+	}
+
+	return true, nil
+}
+
+func (nDB *NetworkDB) purgeReincarnation(mn *memberlist.Node) bool {
+	for name, node := range nDB.nodes {
+		if node.Addr.Equal(mn.Addr) && node.Port == mn.Port && mn.Name != name {
+			logrus.Infof("Node %s/%s, is the new incarnation of the active node %s/%s", mn.Name, mn.Addr, name, node.Addr)
+			nDB.changeNodeState(name, nodeLeftState)
+			return true
+		}
+	}
+
+	for name, node := range nDB.failedNodes {
+		if node.Addr.Equal(mn.Addr) && node.Port == mn.Port && mn.Name != name {
+			logrus.Infof("Node %s/%s, is the new incarnation of the failed node %s/%s", mn.Name, mn.Addr, name, node.Addr)
+			nDB.changeNodeState(name, nodeLeftState)
+			return true
+		}
+	}
+
+	for name, node := range nDB.leftNodes {
+		if node.Addr.Equal(mn.Addr) && node.Port == mn.Port && mn.Name != name {
+			logrus.Infof("Node %s/%s, is the new incarnation of the shutdown node %s/%s", mn.Name, mn.Addr, name, node.Addr)
+			nDB.changeNodeState(name, nodeLeftState)
+			return true
+		}
+	}
+
+	return false
+}
diff --git a/libnetwork/test/networkDb/dbclient/ndbClient.go b/libnetwork/test/networkDb/dbclient/ndbClient.go
index aaf2dec4d8..e2574fc3cd 100644
--- a/libnetwork/test/networkDb/dbclient/ndbClient.go
+++ b/libnetwork/test/networkDb/dbclient/ndbClient.go
@@ -74,7 +74,7 @@ func leaveNetwork(ip, port, network string, doneCh chan resultTuple) {
 }
 
 func writeTableKey(ip, port, networkName, tableName, key string) {
-	createPath := "/createentry?nid=" + networkName + "&tname=" + tableName + "&value=v&key="
+	createPath := "/createentry?unsafe&nid=" + networkName + "&tname=" + tableName + "&value=v&key="
 	httpGetFatalError(ip, port, createPath+key)
 }
 
@@ -91,7 +91,7 @@ func clusterPeersNumber(ip, port string, doneCh chan resultTuple) {
 		doneCh <- resultTuple{id: ip, result: -1}
 		return
 	}
-	peersRegexp := regexp.MustCompile(`Total peers: ([0-9]+)`)
+	peersRegexp := regexp.MustCompile(`total entries: ([0-9]+)`)
 	peersNum, _ := strconv.Atoi(peersRegexp.FindStringSubmatch(string(body))[1])
 
 	doneCh <- resultTuple{id: ip, result: peersNum}
@@ -105,7 +105,7 @@ func networkPeersNumber(ip, port, networkName string, doneCh chan resultTuple) {
 		doneCh <- resultTuple{id: ip, result: -1}
 		return
 	}
-	peersRegexp := regexp.MustCompile(`Total peers: ([0-9]+)`)
+	peersRegexp := regexp.MustCompile(`total entries: ([0-9]+)`)
 	peersNum, _ := strconv.Atoi(peersRegexp.FindStringSubmatch(string(body))[1])
 
 	doneCh <- resultTuple{id: ip, result: peersNum}
@@ -119,7 +119,7 @@ func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan resultTuple) {
 		doneCh <- resultTuple{id: ip, result: -1}
 		return
 	}
-	elementsRegexp := regexp.MustCompile(`total elements: ([0-9]+)`)
+	elementsRegexp := regexp.MustCompile(`total entries: ([0-9]+)`)
 	entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
 	doneCh <- resultTuple{id: ip, result: entriesNum}
 }
diff --git a/libnetwork/test/networkDb/dbserver/ndbServer.go b/libnetwork/test/networkDb/dbserver/ndbServer.go
index 55b0d52bd4..3bdd36dc2e 100644
--- a/libnetwork/test/networkDb/dbserver/ndbServer.go
+++ b/libnetwork/test/networkDb/dbserver/ndbServer.go
@@ -66,6 +66,8 @@ func Server(args []string) {
 	server.RegisterHandler(nil, testerPaths2Func)
 	server.RegisterHandler(nDB, dummyclient.DummyClientPaths2Func)
 	server.EnableDebug("", port)
+	// block here
+	select {}
 }
 
 func getIPInterface(name string) (string, error) {
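
A note on the pattern introduced by libnetwork/networkdb/nodemgmt.go above: the three
node maps (nodes, leftNodes, failedNodes) now behave as a small state machine with one
lookup (findNode) and one transition function (changeNodeState) that owns the cleanup
previously duplicated across delegate.go and event_delegate.go. The sketch below is a
minimal, self-contained illustration of that pattern, not the NetworkDB API: db, dbNode,
changeState and the 24h literal are simplified stand-ins for NetworkDB, node,
changeNodeState and nodeReapInterval, and the real code additionally holds the NetworkDB
lock and deletes the node's network/table entries on a transition to left or failed.

package main

import (
	"errors"
	"fmt"
	"time"
)

type state int

const (
	notFound state = -1
	active   state = 0
	left     state = 1
	failed   state = 2
)

type dbNode struct {
	name     string
	reapTime time.Duration
}

// db mirrors NetworkDB's three node lists: one map per state.
type db struct {
	active, left, failed map[string]*dbNode
}

// find returns the node, its current state and the map that currently holds it,
// mapping the slice index directly onto the state constants (0, 1, 2).
func (d *db) find(name string) (*dbNode, state, map[string]*dbNode) {
	for i, m := range []map[string]*dbNode{d.active, d.left, d.failed} {
		if n, ok := m[name]; ok {
			return n, state(i), m
		}
	}
	return nil, notFound, nil
}

// changeState moves a node between the lists. Like changeNodeState it is a
// no-op for same-state transitions and owns the cleanup (the reap countdown)
// instead of leaving it to every caller.
func (d *db) changeState(name string, newState state) (bool, error) {
	n, cur, m := d.find(name)
	if n == nil {
		return false, errors.New("node " + name + " not found")
	}
	if cur == newState {
		return false, nil
	}
	delete(m, name)
	switch newState {
	case active:
		n.reapTime = 0 // the node is back: stop the reap countdown
		d.active[name] = n
	case left:
		d.left[name] = n
	case failed:
		d.failed[name] = n
	}
	if newState == left || newState == failed {
		// keep an already-running countdown (e.g. a failed -> left move)
		if n.reapTime == 0 {
			n.reapTime = 24 * time.Hour // stand-in for nodeReapInterval
		}
		// the real code also drops the node's network and table entries here
	}
	return true, nil
}

func main() {
	d := &db{
		active: map[string]*dbNode{"node1": {name: "node1"}},
		left:   map[string]*dbNode{},
		failed: map[string]*dbNode{},
	}
	moved, _ := d.changeState("node1", failed)
	fmt.Println("moved to failed:", moved) // true
	moved, _ = d.changeState("node1", failed)
	fmt.Println("moved again:", moved) // false: already failed, no-op
	_, cur, _ := d.find("node1")
	fmt.Println("state:", cur, "reapTime:", d.failed["node1"].reapTime)
}

Because the transition is idempotent (a same-state move returns false and does nothing),
duplicate gossip events collapse into no-ops, which is what lets handleNodeEvent and
NotifyLeave in this patch shrink to a changeNodeState call plus logging.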