Cleanup node management logic
Created method to handle the node state change with cleanup operation associated. Realign testing client with the new diagnostic interface Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
This commit is contained in:
parent
3e99131f55
commit
b190ee3ccf
|
@ -91,7 +91,7 @@ func (s *Server) EnableDebug(ip string, port int) {
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Infof("Starting the diagnose server listening on %d for commands", port)
|
logrus.Infof("Starting the diagnose server listening on %d for commands", port)
|
||||||
srv := &http.Server{Addr: fmt.Sprintf("127.0.0.1:%d", port), Handler: s}
|
srv := &http.Server{Addr: fmt.Sprintf("%s:%d", ip, port), Handler: s}
|
||||||
s.srv = srv
|
s.srv = srv
|
||||||
s.enable = 1
|
s.enable = 1
|
||||||
go func(n *Server) {
|
go func(n *Server) {
|
||||||
|
@ -101,7 +101,6 @@ func (s *Server) EnableDebug(ip string, port int) {
|
||||||
atomic.SwapInt32(&n.enable, 0)
|
atomic.SwapInt32(&n.enable, 0)
|
||||||
}
|
}
|
||||||
}(s)
|
}(s)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// DisableDebug stop the dubug and closes the tcp socket
|
// DisableDebug stop the dubug and closes the tcp socket
|
||||||
|
|
|
@ -16,46 +16,28 @@ func (d *delegate) NodeMeta(limit int) []byte {
|
||||||
return []byte{}
|
return []byte{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getNode searches the node inside the tables
|
|
||||||
// returns true if the node was respectively in the active list, explicit node leave list or failed list
|
|
||||||
func (nDB *NetworkDB) getNode(nEvent *NodeEvent, extract bool) (bool, bool, bool, *node) {
|
|
||||||
var active bool
|
|
||||||
var left bool
|
|
||||||
var failed bool
|
|
||||||
|
|
||||||
for _, nodes := range []map[string]*node{
|
|
||||||
nDB.failedNodes,
|
|
||||||
nDB.leftNodes,
|
|
||||||
nDB.nodes,
|
|
||||||
} {
|
|
||||||
if n, ok := nodes[nEvent.NodeName]; ok {
|
|
||||||
active = &nodes == &nDB.nodes
|
|
||||||
left = &nodes == &nDB.leftNodes
|
|
||||||
failed = &nodes == &nDB.failedNodes
|
|
||||||
if n.ltime >= nEvent.LTime {
|
|
||||||
return active, left, failed, nil
|
|
||||||
}
|
|
||||||
if extract {
|
|
||||||
delete(nodes, n.Name)
|
|
||||||
}
|
|
||||||
return active, left, failed, n
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return active, left, failed, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
|
func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
|
||||||
// Update our local clock if the received messages has newer
|
// Update our local clock if the received messages has newer
|
||||||
// time.
|
// time.
|
||||||
nDB.networkClock.Witness(nEvent.LTime)
|
nDB.networkClock.Witness(nEvent.LTime)
|
||||||
|
|
||||||
nDB.RLock()
|
nDB.RLock()
|
||||||
active, left, _, n := nDB.getNode(nEvent, false)
|
defer nDB.RUnlock()
|
||||||
|
|
||||||
|
// check if the node exists
|
||||||
|
n, _, _ := nDB.findNode(nEvent.NodeName)
|
||||||
if n == nil {
|
if n == nil {
|
||||||
nDB.RUnlock()
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
nDB.RUnlock()
|
|
||||||
|
// check if the event is fresh
|
||||||
|
if n.ltime >= nEvent.LTime {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we are here means that the event is fresher and the node is known. Update the laport time
|
||||||
|
n.ltime = nEvent.LTime
|
||||||
|
|
||||||
// If it is a node leave event for a manager and this is the only manager we
|
// If it is a node leave event for a manager and this is the only manager we
|
||||||
// know of we want the reconnect logic to kick in. In a single manager
|
// know of we want the reconnect logic to kick in. In a single manager
|
||||||
// cluster manager's gossip can't be bootstrapped unless some other node
|
// cluster manager's gossip can't be bootstrapped unless some other node
|
||||||
|
@ -63,45 +45,32 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
|
||||||
if len(nDB.bootStrapIP) == 1 && nEvent.Type == NodeEventTypeLeave {
|
if len(nDB.bootStrapIP) == 1 && nEvent.Type == NodeEventTypeLeave {
|
||||||
for _, ip := range nDB.bootStrapIP {
|
for _, ip := range nDB.bootStrapIP {
|
||||||
if ip.Equal(n.Addr) {
|
if ip.Equal(n.Addr) {
|
||||||
n.ltime = nEvent.LTime
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
n.ltime = nEvent.LTime
|
|
||||||
|
|
||||||
switch nEvent.Type {
|
switch nEvent.Type {
|
||||||
case NodeEventTypeJoin:
|
case NodeEventTypeJoin:
|
||||||
if active {
|
moved, err := nDB.changeNodeState(n.Name, nodeActiveState)
|
||||||
// the node is already marked as active nothing to do
|
if err != nil {
|
||||||
|
logrus.WithError(err).Error("unable to find the node to move")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
nDB.Lock()
|
if moved {
|
||||||
// Because the lock got released on the previous check we have to do it again and re verify the status of the node
|
|
||||||
// All of this is to avoid a big lock on the function
|
|
||||||
if active, _, _, n = nDB.getNode(nEvent, true); !active && n != nil {
|
|
||||||
n.reapTime = 0
|
|
||||||
nDB.nodes[n.Name] = n
|
|
||||||
logrus.Infof("%v(%v): Node join event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
|
logrus.Infof("%v(%v): Node join event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
|
||||||
}
|
}
|
||||||
nDB.Unlock()
|
return moved
|
||||||
return true
|
|
||||||
case NodeEventTypeLeave:
|
case NodeEventTypeLeave:
|
||||||
if left {
|
moved, err := nDB.changeNodeState(n.Name, nodeLeftState)
|
||||||
// the node is already marked as left nothing to do.
|
if err != nil {
|
||||||
|
logrus.WithError(err).Error("unable to find the node to move")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
nDB.Lock()
|
if moved {
|
||||||
// Because the lock got released on the previous check we have to do it again and re verify the status of the node
|
|
||||||
// All of this is to avoid a big lock on the function
|
|
||||||
if _, left, _, n = nDB.getNode(nEvent, true); !left && n != nil {
|
|
||||||
n.reapTime = nodeReapInterval
|
|
||||||
nDB.leftNodes[n.Name] = n
|
|
||||||
logrus.Infof("%v(%v): Node leave event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
|
logrus.Infof("%v(%v): Node leave event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
|
||||||
}
|
}
|
||||||
nDB.Unlock()
|
return moved
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return false
|
return false
|
||||||
|
|
|
@ -21,24 +21,6 @@ func (e *eventDelegate) broadcastNodeEvent(addr net.IP, op opType) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *eventDelegate) purgeReincarnation(mn *memberlist.Node) {
|
|
||||||
for name, node := range e.nDB.failedNodes {
|
|
||||||
if node.Addr.Equal(mn.Addr) {
|
|
||||||
logrus.Infof("Node %s/%s, is the new incarnation of the failed node %s/%s", mn.Name, mn.Addr, name, node.Addr)
|
|
||||||
delete(e.nDB.failedNodes, name)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for name, node := range e.nDB.leftNodes {
|
|
||||||
if node.Addr.Equal(mn.Addr) {
|
|
||||||
logrus.Infof("Node %s/%s, is the new incarnation of the shutdown node %s/%s", mn.Name, mn.Addr, name, node.Addr)
|
|
||||||
delete(e.nDB.leftNodes, name)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
|
func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
|
||||||
logrus.Infof("Node %s/%s, joined gossip cluster", mn.Name, mn.Addr)
|
logrus.Infof("Node %s/%s, joined gossip cluster", mn.Name, mn.Addr)
|
||||||
e.broadcastNodeEvent(mn.Addr, opCreate)
|
e.broadcastNodeEvent(mn.Addr, opCreate)
|
||||||
|
@ -57,44 +39,35 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
|
||||||
// Every node has a unique ID
|
// Every node has a unique ID
|
||||||
// Check on the base of the IP address if the new node that joined is actually a new incarnation of a previous
|
// Check on the base of the IP address if the new node that joined is actually a new incarnation of a previous
|
||||||
// failed or shutdown one
|
// failed or shutdown one
|
||||||
e.purgeReincarnation(mn)
|
e.nDB.purgeReincarnation(mn)
|
||||||
|
|
||||||
e.nDB.nodes[mn.Name] = &node{Node: *mn}
|
e.nDB.nodes[mn.Name] = &node{Node: *mn}
|
||||||
logrus.Infof("Node %s/%s, added to nodes list", mn.Name, mn.Addr)
|
logrus.Infof("Node %s/%s, added to nodes list", mn.Name, mn.Addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
|
func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
|
||||||
var failed bool
|
|
||||||
logrus.Infof("Node %s/%s, left gossip cluster", mn.Name, mn.Addr)
|
logrus.Infof("Node %s/%s, left gossip cluster", mn.Name, mn.Addr)
|
||||||
e.broadcastNodeEvent(mn.Addr, opDelete)
|
e.broadcastNodeEvent(mn.Addr, opDelete)
|
||||||
// The node left or failed, delete all the entries created by it.
|
|
||||||
// If the node was temporary down, deleting the entries will guarantee that the CREATE events will be accepted
|
|
||||||
// If the node instead left because was going down, then it makes sense to just delete all its state
|
|
||||||
e.nDB.Lock()
|
e.nDB.Lock()
|
||||||
defer e.nDB.Unlock()
|
defer e.nDB.Unlock()
|
||||||
e.nDB.deleteNodeFromNetworks(mn.Name)
|
|
||||||
e.nDB.deleteNodeTableEntries(mn.Name)
|
|
||||||
if n, ok := e.nDB.nodes[mn.Name]; ok {
|
|
||||||
delete(e.nDB.nodes, mn.Name)
|
|
||||||
|
|
||||||
// Check if a new incarnation of the same node already joined
|
n, currState, _ := e.nDB.findNode(mn.Name)
|
||||||
// In that case this node can simply be removed and no further action are needed
|
if n == nil {
|
||||||
for name, node := range e.nDB.nodes {
|
logrus.Errorf("Node %s/%s not found in the node lists", mn.Name, mn.Addr)
|
||||||
if node.Addr.Equal(mn.Addr) {
|
return
|
||||||
logrus.Infof("Node %s/%s, is the new incarnation of the failed node %s/%s", name, node.Addr, mn.Name, mn.Addr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// In case of node failure, keep retrying to reconnect every retryInterval (1sec) for nodeReapInterval (24h)
|
|
||||||
// Explicit leave will have already removed the node from the list of nodes (nDB.nodes) and put it into the leftNodes map
|
|
||||||
n.reapTime = nodeReapInterval
|
|
||||||
e.nDB.failedNodes[mn.Name] = n
|
|
||||||
failed = true
|
|
||||||
}
|
}
|
||||||
|
// if the node was active means that did not send the leave cluster message, so it's probable that
|
||||||
if failed {
|
// failed. Else would be already in the left list so nothing else has to be done
|
||||||
logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
|
if currState == nodeActiveState {
|
||||||
|
moved, err := e.nDB.changeNodeState(mn.Name, nodeFailedState)
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).Errorf("impossible condition, node %s/%s not present in the list", mn.Name, mn.Addr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if moved {
|
||||||
|
logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -12,6 +13,7 @@ import (
|
||||||
|
|
||||||
"github.com/docker/docker/pkg/stringid"
|
"github.com/docker/docker/pkg/stringid"
|
||||||
"github.com/docker/go-events"
|
"github.com/docker/go-events"
|
||||||
|
"github.com/hashicorp/memberlist"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -580,3 +582,156 @@ func TestNetworkDBGarbageCollection(t *testing.T) {
|
||||||
|
|
||||||
closeNetworkDBInstances(dbs)
|
closeNetworkDBInstances(dbs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFindNode(t *testing.T) {
|
||||||
|
dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
|
||||||
|
|
||||||
|
dbs[0].nodes["active"] = &node{Node: memberlist.Node{Name: "active"}}
|
||||||
|
dbs[0].failedNodes["failed"] = &node{Node: memberlist.Node{Name: "failed"}}
|
||||||
|
dbs[0].leftNodes["left"] = &node{Node: memberlist.Node{Name: "left"}}
|
||||||
|
|
||||||
|
// active nodes is 2 because the testing node is in the list
|
||||||
|
assert.Equal(t, 2, len(dbs[0].nodes))
|
||||||
|
assert.Equal(t, 1, len(dbs[0].failedNodes))
|
||||||
|
assert.Equal(t, 1, len(dbs[0].leftNodes))
|
||||||
|
|
||||||
|
n, currState, m := dbs[0].findNode("active")
|
||||||
|
assert.NotNil(t, n)
|
||||||
|
assert.Equal(t, "active", n.Name)
|
||||||
|
assert.Equal(t, nodeActiveState, currState)
|
||||||
|
assert.NotNil(t, m)
|
||||||
|
// delete the entry manually
|
||||||
|
delete(m, "active")
|
||||||
|
|
||||||
|
// test if can be still find
|
||||||
|
n, currState, m = dbs[0].findNode("active")
|
||||||
|
assert.Nil(t, n)
|
||||||
|
assert.Equal(t, nodeNotFound, currState)
|
||||||
|
assert.Nil(t, m)
|
||||||
|
|
||||||
|
n, currState, m = dbs[0].findNode("failed")
|
||||||
|
assert.NotNil(t, n)
|
||||||
|
assert.Equal(t, "failed", n.Name)
|
||||||
|
assert.Equal(t, nodeFailedState, currState)
|
||||||
|
assert.NotNil(t, m)
|
||||||
|
|
||||||
|
// find and remove
|
||||||
|
n, currState, m = dbs[0].findNode("left")
|
||||||
|
assert.NotNil(t, n)
|
||||||
|
assert.Equal(t, "left", n.Name)
|
||||||
|
assert.Equal(t, nodeLeftState, currState)
|
||||||
|
assert.NotNil(t, m)
|
||||||
|
delete(m, "left")
|
||||||
|
|
||||||
|
n, currState, m = dbs[0].findNode("left")
|
||||||
|
assert.Nil(t, n)
|
||||||
|
assert.Equal(t, nodeNotFound, currState)
|
||||||
|
assert.Nil(t, m)
|
||||||
|
|
||||||
|
closeNetworkDBInstances(dbs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestChangeNodeState(t *testing.T) {
|
||||||
|
dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
|
||||||
|
|
||||||
|
dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1"}}
|
||||||
|
dbs[0].nodes["node2"] = &node{Node: memberlist.Node{Name: "node2"}}
|
||||||
|
dbs[0].nodes["node3"] = &node{Node: memberlist.Node{Name: "node3"}}
|
||||||
|
|
||||||
|
// active nodes is 4 because the testing node is in the list
|
||||||
|
assert.Equal(t, 4, len(dbs[0].nodes))
|
||||||
|
|
||||||
|
n, currState, m := dbs[0].findNode("node1")
|
||||||
|
assert.NotNil(t, n)
|
||||||
|
assert.Equal(t, nodeActiveState, currState)
|
||||||
|
assert.Equal(t, "node1", n.Name)
|
||||||
|
assert.NotNil(t, m)
|
||||||
|
|
||||||
|
// node1 to failed
|
||||||
|
dbs[0].changeNodeState("node1", nodeFailedState)
|
||||||
|
|
||||||
|
n, currState, m = dbs[0].findNode("node1")
|
||||||
|
assert.NotNil(t, n)
|
||||||
|
assert.Equal(t, nodeFailedState, currState)
|
||||||
|
assert.Equal(t, "node1", n.Name)
|
||||||
|
assert.NotNil(t, m)
|
||||||
|
assert.NotEqual(t, time.Duration(0), n.reapTime)
|
||||||
|
|
||||||
|
// node1 back to active
|
||||||
|
dbs[0].changeNodeState("node1", nodeActiveState)
|
||||||
|
|
||||||
|
n, currState, m = dbs[0].findNode("node1")
|
||||||
|
assert.NotNil(t, n)
|
||||||
|
assert.Equal(t, nodeActiveState, currState)
|
||||||
|
assert.Equal(t, "node1", n.Name)
|
||||||
|
assert.NotNil(t, m)
|
||||||
|
assert.Equal(t, time.Duration(0), n.reapTime)
|
||||||
|
|
||||||
|
// node1 to left
|
||||||
|
dbs[0].changeNodeState("node1", nodeLeftState)
|
||||||
|
dbs[0].changeNodeState("node2", nodeLeftState)
|
||||||
|
dbs[0].changeNodeState("node3", nodeLeftState)
|
||||||
|
|
||||||
|
n, currState, m = dbs[0].findNode("node1")
|
||||||
|
assert.NotNil(t, n)
|
||||||
|
assert.Equal(t, nodeLeftState, currState)
|
||||||
|
assert.Equal(t, "node1", n.Name)
|
||||||
|
assert.NotNil(t, m)
|
||||||
|
assert.NotEqual(t, time.Duration(0), n.reapTime)
|
||||||
|
|
||||||
|
n, currState, m = dbs[0].findNode("node2")
|
||||||
|
assert.NotNil(t, n)
|
||||||
|
assert.Equal(t, nodeLeftState, currState)
|
||||||
|
assert.Equal(t, "node2", n.Name)
|
||||||
|
assert.NotNil(t, m)
|
||||||
|
assert.NotEqual(t, time.Duration(0), n.reapTime)
|
||||||
|
|
||||||
|
n, currState, m = dbs[0].findNode("node3")
|
||||||
|
assert.NotNil(t, n)
|
||||||
|
assert.Equal(t, nodeLeftState, currState)
|
||||||
|
assert.Equal(t, "node3", n.Name)
|
||||||
|
assert.NotNil(t, m)
|
||||||
|
assert.NotEqual(t, time.Duration(0), n.reapTime)
|
||||||
|
|
||||||
|
// active nodes is 1 because the testing node is in the list
|
||||||
|
assert.Equal(t, 1, len(dbs[0].nodes))
|
||||||
|
assert.Equal(t, 0, len(dbs[0].failedNodes))
|
||||||
|
assert.Equal(t, 3, len(dbs[0].leftNodes))
|
||||||
|
|
||||||
|
closeNetworkDBInstances(dbs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNodeReincarnation(t *testing.T) {
|
||||||
|
dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
|
||||||
|
|
||||||
|
dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1", Addr: net.ParseIP("192.168.1.1")}}
|
||||||
|
dbs[0].leftNodes["node2"] = &node{Node: memberlist.Node{Name: "node2", Addr: net.ParseIP("192.168.1.2")}}
|
||||||
|
dbs[0].failedNodes["node3"] = &node{Node: memberlist.Node{Name: "node3", Addr: net.ParseIP("192.168.1.3")}}
|
||||||
|
|
||||||
|
// active nodes is 2 because the testing node is in the list
|
||||||
|
assert.Equal(t, 2, len(dbs[0].nodes))
|
||||||
|
assert.Equal(t, 1, len(dbs[0].failedNodes))
|
||||||
|
assert.Equal(t, 1, len(dbs[0].leftNodes))
|
||||||
|
|
||||||
|
b := dbs[0].purgeReincarnation(&memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")})
|
||||||
|
assert.True(t, b)
|
||||||
|
dbs[0].nodes["node4"] = &node{Node: memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")}}
|
||||||
|
|
||||||
|
b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.2")})
|
||||||
|
assert.True(t, b)
|
||||||
|
dbs[0].nodes["node5"] = &node{Node: memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.1")}}
|
||||||
|
|
||||||
|
b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.3")})
|
||||||
|
assert.True(t, b)
|
||||||
|
dbs[0].nodes["node6"] = &node{Node: memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.1")}}
|
||||||
|
|
||||||
|
b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.10")})
|
||||||
|
assert.False(t, b)
|
||||||
|
|
||||||
|
// active nodes is 1 because the testing node is in the list
|
||||||
|
assert.Equal(t, 4, len(dbs[0].nodes))
|
||||||
|
assert.Equal(t, 0, len(dbs[0].failedNodes))
|
||||||
|
assert.Equal(t, 3, len(dbs[0].leftNodes))
|
||||||
|
|
||||||
|
closeNetworkDBInstances(dbs)
|
||||||
|
}
|
||||||
|
|
|
@ -399,6 +399,7 @@ func dbGetTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||||
Value: encodedValue,
|
Value: encodedValue,
|
||||||
Owner: v.owner,
|
Owner: v.owner,
|
||||||
})
|
})
|
||||||
|
i++
|
||||||
}
|
}
|
||||||
log.WithField("response", fmt.Sprintf("%+v", rsp)).Info("get table done")
|
log.WithField("response", fmt.Sprintf("%+v", rsp)).Info("get table done")
|
||||||
diagnose.HTTPReply(w, diagnose.CommandSucceed(rsp), json)
|
diagnose.HTTPReply(w, diagnose.CommandSucceed(rsp), json)
|
||||||
|
|
|
@ -0,0 +1,120 @@
|
||||||
|
package networkdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/hashicorp/memberlist"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
type nodeState int
|
||||||
|
|
||||||
|
const (
|
||||||
|
nodeNotFound nodeState = -1
|
||||||
|
nodeActiveState nodeState = 0
|
||||||
|
nodeLeftState nodeState = 1
|
||||||
|
nodeFailedState nodeState = 2
|
||||||
|
)
|
||||||
|
|
||||||
|
var nodeStateName = map[nodeState]string{
|
||||||
|
-1: "NodeNotFound",
|
||||||
|
0: "NodeActive",
|
||||||
|
1: "NodeLeft",
|
||||||
|
2: "NodeFailed",
|
||||||
|
}
|
||||||
|
|
||||||
|
// findNode search the node into the 3 node lists and returns the node pointer and the list
|
||||||
|
// where it got found
|
||||||
|
func (nDB *NetworkDB) findNode(nodeName string) (*node, nodeState, map[string]*node) {
|
||||||
|
for i, nodes := range []map[string]*node{
|
||||||
|
nDB.nodes,
|
||||||
|
nDB.leftNodes,
|
||||||
|
nDB.failedNodes,
|
||||||
|
} {
|
||||||
|
if n, ok := nodes[nodeName]; ok {
|
||||||
|
return n, nodeState(i), nodes
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, nodeNotFound, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// changeNodeState changes the state of the node specified, returns true if the node was moved,
|
||||||
|
// false if there was no need to change the node state. Error will be returned if the node does not
|
||||||
|
// exists
|
||||||
|
func (nDB *NetworkDB) changeNodeState(nodeName string, newState nodeState) (bool, error) {
|
||||||
|
n, currState, m := nDB.findNode(nodeName)
|
||||||
|
if n == nil {
|
||||||
|
return false, fmt.Errorf("node %s not found", nodeName)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch newState {
|
||||||
|
case nodeActiveState:
|
||||||
|
if currState == nodeActiveState {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(m, nodeName)
|
||||||
|
// reset the node reap time
|
||||||
|
n.reapTime = 0
|
||||||
|
nDB.nodes[nodeName] = n
|
||||||
|
case nodeLeftState:
|
||||||
|
if currState == nodeLeftState {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(m, nodeName)
|
||||||
|
nDB.leftNodes[nodeName] = n
|
||||||
|
case nodeFailedState:
|
||||||
|
if currState == nodeFailedState {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(m, nodeName)
|
||||||
|
nDB.failedNodes[nodeName] = n
|
||||||
|
}
|
||||||
|
|
||||||
|
logrus.Infof("Node %s change state %s --> %s", nodeName, nodeStateName[currState], nodeStateName[newState])
|
||||||
|
|
||||||
|
if newState == nodeLeftState || newState == nodeFailedState {
|
||||||
|
// set the node reap time, if not already set
|
||||||
|
// It is possible that a node passes from failed to left and the reaptime was already set so keep that value
|
||||||
|
if n.reapTime == 0 {
|
||||||
|
n.reapTime = nodeReapInterval
|
||||||
|
}
|
||||||
|
// The node leave or fails, delete all the entries created by it.
|
||||||
|
// If the node was temporary down, deleting the entries will guarantee that the CREATE events will be accepted
|
||||||
|
// If the node instead left because was going down, then it makes sense to just delete all its state
|
||||||
|
nDB.deleteNodeFromNetworks(n.Name)
|
||||||
|
nDB.deleteNodeTableEntries(n.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nDB *NetworkDB) purgeReincarnation(mn *memberlist.Node) bool {
|
||||||
|
for name, node := range nDB.nodes {
|
||||||
|
if node.Addr.Equal(mn.Addr) && node.Port == mn.Port && mn.Name != name {
|
||||||
|
logrus.Infof("Node %s/%s, is the new incarnation of the active node %s/%s", mn.Name, mn.Addr, name, node.Addr)
|
||||||
|
nDB.changeNodeState(name, nodeLeftState)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, node := range nDB.failedNodes {
|
||||||
|
if node.Addr.Equal(mn.Addr) && node.Port == mn.Port && mn.Name != name {
|
||||||
|
logrus.Infof("Node %s/%s, is the new incarnation of the failed node %s/%s", mn.Name, mn.Addr, name, node.Addr)
|
||||||
|
nDB.changeNodeState(name, nodeLeftState)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, node := range nDB.leftNodes {
|
||||||
|
if node.Addr.Equal(mn.Addr) && node.Port == mn.Port && mn.Name != name {
|
||||||
|
logrus.Infof("Node %s/%s, is the new incarnation of the shutdown node %s/%s", mn.Name, mn.Addr, name, node.Addr)
|
||||||
|
nDB.changeNodeState(name, nodeLeftState)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
|
@ -74,7 +74,7 @@ func leaveNetwork(ip, port, network string, doneCh chan resultTuple) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func writeTableKey(ip, port, networkName, tableName, key string) {
|
func writeTableKey(ip, port, networkName, tableName, key string) {
|
||||||
createPath := "/createentry?nid=" + networkName + "&tname=" + tableName + "&value=v&key="
|
createPath := "/createentry?unsafe&nid=" + networkName + "&tname=" + tableName + "&value=v&key="
|
||||||
httpGetFatalError(ip, port, createPath+key)
|
httpGetFatalError(ip, port, createPath+key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,7 +91,7 @@ func clusterPeersNumber(ip, port string, doneCh chan resultTuple) {
|
||||||
doneCh <- resultTuple{id: ip, result: -1}
|
doneCh <- resultTuple{id: ip, result: -1}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
peersRegexp := regexp.MustCompile(`Total peers: ([0-9]+)`)
|
peersRegexp := regexp.MustCompile(`total entries: ([0-9]+)`)
|
||||||
peersNum, _ := strconv.Atoi(peersRegexp.FindStringSubmatch(string(body))[1])
|
peersNum, _ := strconv.Atoi(peersRegexp.FindStringSubmatch(string(body))[1])
|
||||||
|
|
||||||
doneCh <- resultTuple{id: ip, result: peersNum}
|
doneCh <- resultTuple{id: ip, result: peersNum}
|
||||||
|
@ -105,7 +105,7 @@ func networkPeersNumber(ip, port, networkName string, doneCh chan resultTuple) {
|
||||||
doneCh <- resultTuple{id: ip, result: -1}
|
doneCh <- resultTuple{id: ip, result: -1}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
peersRegexp := regexp.MustCompile(`Total peers: ([0-9]+)`)
|
peersRegexp := regexp.MustCompile(`total entries: ([0-9]+)`)
|
||||||
peersNum, _ := strconv.Atoi(peersRegexp.FindStringSubmatch(string(body))[1])
|
peersNum, _ := strconv.Atoi(peersRegexp.FindStringSubmatch(string(body))[1])
|
||||||
|
|
||||||
doneCh <- resultTuple{id: ip, result: peersNum}
|
doneCh <- resultTuple{id: ip, result: peersNum}
|
||||||
|
@ -119,7 +119,7 @@ func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan r
|
||||||
doneCh <- resultTuple{id: ip, result: -1}
|
doneCh <- resultTuple{id: ip, result: -1}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
elementsRegexp := regexp.MustCompile(`total elements: ([0-9]+)`)
|
elementsRegexp := regexp.MustCompile(`total entries: ([0-9]+)`)
|
||||||
entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
|
entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
|
||||||
doneCh <- resultTuple{id: ip, result: entriesNum}
|
doneCh <- resultTuple{id: ip, result: entriesNum}
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,6 +66,8 @@ func Server(args []string) {
|
||||||
server.RegisterHandler(nil, testerPaths2Func)
|
server.RegisterHandler(nil, testerPaths2Func)
|
||||||
server.RegisterHandler(nDB, dummyclient.DummyClientPaths2Func)
|
server.RegisterHandler(nDB, dummyclient.DummyClientPaths2Func)
|
||||||
server.EnableDebug("", port)
|
server.EnableDebug("", port)
|
||||||
|
// block here
|
||||||
|
select {}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getIPInterface(name string) (string, error) {
|
func getIPInterface(name string) (string, error) {
|
||||||
|
|
Loading…
Reference in New Issue