Merge pull request #1936 from fcrisciani/netdb-nodeid

NetworkDB create NodeID for cluster nodes
This commit is contained in:
Madhu Venugopal 2017-09-28 02:13:40 -07:00 committed by GitHub
commit c8c13b47d2
7 changed files with 70 additions and 94 deletions

View File

@ -6,11 +6,9 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net" "net"
"os"
"sort" "sort"
"sync" "sync"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/go-events" "github.com/docker/go-events"
"github.com/docker/libnetwork/cluster" "github.com/docker/libnetwork/cluster"
"github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/datastore"
@ -282,12 +280,8 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
} }
keys, _ := c.getKeys(subsysGossip) keys, _ := c.getKeys(subsysGossip)
hostname, _ := os.Hostname()
nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
logrus.Info("Gossip cluster hostname ", nodeName)
netDBConf := networkdb.DefaultConfig() netDBConf := networkdb.DefaultConfig()
netDBConf.NodeName = nodeName
netDBConf.BindAddr = listenAddr netDBConf.BindAddr = listenAddr
netDBConf.AdvertiseAddr = advertiseAddr netDBConf.AdvertiseAddr = advertiseAddr
netDBConf.Keys = keys netDBConf.Keys = keys

View File

@ -32,7 +32,7 @@ func (nDB *NetworkDB) sendNetworkEvent(nid string, event NetworkEvent_Type, ltim
nEvent := NetworkEvent{ nEvent := NetworkEvent{
Type: event, Type: event,
LTime: ltime, LTime: ltime,
NodeName: nDB.config.NodeName, NodeName: nDB.config.NodeID,
NetworkID: nid, NetworkID: nid,
} }
@ -44,7 +44,7 @@ func (nDB *NetworkDB) sendNetworkEvent(nid string, event NetworkEvent_Type, ltim
nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{ nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{
msg: raw, msg: raw,
id: nid, id: nid,
node: nDB.config.NodeName, node: nDB.config.NodeID,
}) })
return nil return nil
} }
@ -72,7 +72,7 @@ func (nDB *NetworkDB) sendNodeEvent(event NodeEvent_Type) error {
nEvent := NodeEvent{ nEvent := NodeEvent{
Type: event, Type: event,
LTime: nDB.networkClock.Increment(), LTime: nDB.networkClock.Increment(),
NodeName: nDB.config.NodeName, NodeName: nDB.config.NodeID,
} }
raw, err := encodeMessage(MessageTypeNodeEvent, &nEvent) raw, err := encodeMessage(MessageTypeNodeEvent, &nEvent)
@ -129,7 +129,7 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
tEvent := TableEvent{ tEvent := TableEvent{
Type: event, Type: event,
LTime: entry.ltime, LTime: entry.ltime,
NodeName: nDB.config.NodeName, NodeName: nDB.config.NodeID,
NetworkID: nid, NetworkID: nid,
TableName: tname, TableName: tname,
Key: key, Key: key,
@ -145,7 +145,7 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
var broadcastQ *memberlist.TransmitLimitedQueue var broadcastQ *memberlist.TransmitLimitedQueue
nDB.RLock() nDB.RLock()
thisNodeNetworks, ok := nDB.networks[nDB.config.NodeName] thisNodeNetworks, ok := nDB.networks[nDB.config.NodeID]
if ok { if ok {
// The network may have been removed // The network may have been removed
network, networkOk := thisNodeNetworks[nid] network, networkOk := thisNodeNetworks[nid]
@ -168,7 +168,7 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
id: nid, id: nid,
tname: tname, tname: tname,
key: key, key: key,
node: nDB.config.NodeName, node: nDB.config.NodeID,
}) })
return nil return nil
} }

View File

@ -106,7 +106,7 @@ func (nDB *NetworkDB) clusterInit() error {
nDB.lastHealthTimestamp = nDB.lastStatsTimestamp nDB.lastHealthTimestamp = nDB.lastStatsTimestamp
config := memberlist.DefaultLANConfig() config := memberlist.DefaultLANConfig()
config.Name = nDB.config.NodeName config.Name = nDB.config.NodeID
config.BindAddr = nDB.config.BindAddr config.BindAddr = nDB.config.BindAddr
config.AdvertiseAddr = nDB.config.AdvertiseAddr config.AdvertiseAddr = nDB.config.AdvertiseAddr
config.UDPBufferSize = nDB.config.PacketBufferSize config.UDPBufferSize = nDB.config.PacketBufferSize
@ -329,7 +329,7 @@ func (nDB *NetworkDB) reapTableEntries() {
var nodeNetworks []string var nodeNetworks []string
// This is best effort, if the list of network changes will be picked up in the next cycle // This is best effort, if the list of network changes will be picked up in the next cycle
nDB.RLock() nDB.RLock()
for nid := range nDB.networks[nDB.config.NodeName] { for nid := range nDB.networks[nDB.config.NodeID] {
nodeNetworks = append(nodeNetworks, nid) nodeNetworks = append(nodeNetworks, nid)
} }
nDB.RUnlock() nDB.RUnlock()
@ -376,7 +376,7 @@ func (nDB *NetworkDB) reapTableEntries() {
func (nDB *NetworkDB) gossip() { func (nDB *NetworkDB) gossip() {
networkNodes := make(map[string][]string) networkNodes := make(map[string][]string)
nDB.RLock() nDB.RLock()
thisNodeNetworks := nDB.networks[nDB.config.NodeName] thisNodeNetworks := nDB.networks[nDB.config.NodeID]
for nid := range thisNodeNetworks { for nid := range thisNodeNetworks {
networkNodes[nid] = nDB.networkNodes[nid] networkNodes[nid] = nDB.networkNodes[nid]
@ -388,7 +388,7 @@ func (nDB *NetworkDB) gossip() {
if printHealth { if printHealth {
healthScore := nDB.memberlist.GetHealthScore() healthScore := nDB.memberlist.GetHealthScore()
if healthScore != 0 { if healthScore != 0 {
logrus.Warnf("NetworkDB stats - healthscore:%d (connectivity issues)", healthScore) logrus.Warnf("NetworkDB stats %v(%v) - healthscore:%d (connectivity issues)", nDB.config.Hostname, nDB.config.NodeID, healthScore)
} }
nDB.lastHealthTimestamp = time.Now() nDB.lastHealthTimestamp = time.Now()
} }
@ -419,7 +419,8 @@ func (nDB *NetworkDB) gossip() {
// Collect stats and print the queue info, note this code is here also to have a view of the queues empty // Collect stats and print the queue info, note this code is here also to have a view of the queues empty
network.qMessagesSent += len(msgs) network.qMessagesSent += len(msgs)
if printStats { if printStats {
logrus.Infof("NetworkDB stats - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d netMsg/s:%d", logrus.Infof("NetworkDB stats %v(%v) - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d netMsg/s:%d",
nDB.config.Hostname, nDB.config.NodeID,
nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber, broadcastQ.NumQueued(), nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber, broadcastQ.NumQueued(),
network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second))) network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
network.qMessagesSent = 0 network.qMessagesSent = 0
@ -456,7 +457,7 @@ func (nDB *NetworkDB) gossip() {
func (nDB *NetworkDB) bulkSyncTables() { func (nDB *NetworkDB) bulkSyncTables() {
var networks []string var networks []string
nDB.RLock() nDB.RLock()
for nid, network := range nDB.networks[nDB.config.NodeName] { for nid, network := range nDB.networks[nDB.config.NodeID] {
if network.leaving { if network.leaving {
continue continue
} }
@ -522,10 +523,10 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {
var err error var err error
var networks []string var networks []string
for _, node := range nodes { for _, node := range nodes {
if node == nDB.config.NodeName { if node == nDB.config.NodeID {
continue continue
} }
logrus.Debugf("%s: Initiating bulk sync with node %v", nDB.config.NodeName, node) logrus.Debugf("%v(%v): Initiating bulk sync with node %v", nDB.config.Hostname, nDB.config.NodeID, node)
networks = nDB.findCommonNetworks(node) networks = nDB.findCommonNetworks(node)
err = nDB.bulkSyncNode(networks, node, true) err = nDB.bulkSyncNode(networks, node, true)
// if its periodic bulksync stop after the first successful sync // if its periodic bulksync stop after the first successful sync
@ -556,7 +557,8 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
unsolMsg = "unsolicited" unsolMsg = "unsolicited"
} }
logrus.Debugf("%s: Initiating %s bulk sync for networks %v with node %s", nDB.config.NodeName, unsolMsg, networks, node) logrus.Debugf("%v(%v): Initiating %s bulk sync for networks %v with node %s",
nDB.config.Hostname, nDB.config.NodeID, unsolMsg, networks, node)
nDB.RLock() nDB.RLock()
mnode := nDB.nodes[node] mnode := nDB.nodes[node]
@ -608,7 +610,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
bsm := BulkSyncMessage{ bsm := BulkSyncMessage{
LTime: nDB.tableClock.Time(), LTime: nDB.tableClock.Time(),
Unsolicited: unsolicited, Unsolicited: unsolicited,
NodeName: nDB.config.NodeName, NodeName: nDB.config.NodeID,
Networks: networks, Networks: networks,
Payload: compound, Payload: compound,
} }
@ -640,7 +642,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
case <-t.C: case <-t.C:
logrus.Errorf("Bulk sync to node %s timed out", node) logrus.Errorf("Bulk sync to node %s timed out", node)
case <-ch: case <-ch:
logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Since(startTime)) logrus.Debugf("%v(%v): Bulk sync to node %s took %s", nDB.config.Hostname, nDB.config.NodeID, node, time.Since(startTime))
} }
t.Stop() t.Stop()
} }
@ -677,7 +679,7 @@ OUTER:
idx := randomOffset(n) idx := randomOffset(n)
node := nodes[idx] node := nodes[idx]
if node == nDB.config.NodeName { if node == nDB.config.NodeID {
continue continue
} }

View File

@ -2,7 +2,6 @@ package networkdb
import ( import (
"net" "net"
"strings"
"time" "time"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
@ -58,29 +57,6 @@ func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
return nil return nil
} }
func (nDB *NetworkDB) purgeSameNode(n *node) {
nDB.Lock()
defer nDB.Unlock()
prefix := strings.Split(n.Name, "-")[0]
for _, nodes := range []map[string]*node{
nDB.failedNodes,
nDB.leftNodes,
nDB.nodes,
} {
var nodeNames []string
for name, node := range nodes {
if strings.HasPrefix(name, prefix) && n.Addr.Equal(node.Addr) {
nodeNames = append(nodeNames, name)
}
}
for _, name := range nodeNames {
delete(nodes, name)
}
}
}
func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool { func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
// Update our local clock if the received messages has newer // Update our local clock if the received messages has newer
// time. // time.
@ -108,7 +84,6 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
return false return false
} }
nDB.purgeSameNode(n)
n.ltime = nEvent.LTime n.ltime = nEvent.LTime
switch nEvent.Type { switch nEvent.Type {
@ -140,7 +115,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
nDB.Lock() nDB.Lock()
defer nDB.Unlock() defer nDB.Unlock()
if nEvent.NodeName == nDB.config.NodeName { if nEvent.NodeName == nDB.config.NodeID {
return false return false
} }
@ -203,7 +178,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
// Ignore the table events for networks that are in the process of going away // Ignore the table events for networks that are in the process of going away
nDB.RLock() nDB.RLock()
networks := nDB.networks[nDB.config.NodeName] networks := nDB.networks[nDB.config.NodeID]
network, ok := networks[tEvent.NetworkID] network, ok := networks[tEvent.NetworkID]
// Check if the owner of the event is still part of the network // Check if the owner of the event is still part of the network
nodes := nDB.networkNodes[tEvent.NetworkID] nodes := nDB.networkNodes[tEvent.NetworkID]
@ -292,7 +267,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
} }
// Ignore messages that this node generated. // Ignore messages that this node generated.
if tEvent.NodeName == nDB.config.NodeName { if tEvent.NodeName == nDB.config.NodeID {
return return
} }
@ -305,7 +280,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
} }
nDB.RLock() nDB.RLock()
n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID] n, ok := nDB.networks[nDB.config.NodeID][tEvent.NetworkID]
nDB.RUnlock() nDB.RUnlock()
// if the network is not there anymore, OR we are leaving the network OR the broadcast queue is not present // if the network is not there anymore, OR we are leaving the network OR the broadcast queue is not present
@ -424,7 +399,7 @@ func (nDB *NetworkDB) handleMessage(buf []byte, isBulkSync bool) {
case MessageTypeCompound: case MessageTypeCompound:
nDB.handleCompound(data, isBulkSync) nDB.handleCompound(data, isBulkSync)
default: default:
logrus.Errorf("%s: unknown message type %d", nDB.config.NodeName, mType) logrus.Errorf("%v(%v): unknown message type %d", nDB.config.Hostname, nDB.config.NodeID, mType)
} }
} }
@ -457,7 +432,7 @@ func (d *delegate) LocalState(join bool) []byte {
pp := NetworkPushPull{ pp := NetworkPushPull{
LTime: d.nDB.networkClock.Time(), LTime: d.nDB.networkClock.Time(),
NodeName: d.nDB.config.NodeName, NodeName: d.nDB.config.NodeID,
} }
for name, nn := range d.nDB.networks { for name, nn := range d.nDB.networks {

View File

@ -11,6 +11,7 @@ import (
"time" "time"
"github.com/armon/go-radix" "github.com/armon/go-radix"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/go-events" "github.com/docker/go-events"
"github.com/docker/libnetwork/types" "github.com/docker/libnetwork/types"
"github.com/hashicorp/memberlist" "github.com/hashicorp/memberlist"
@ -151,8 +152,11 @@ type network struct {
// Config represents the configuration of the networdb instance and // Config represents the configuration of the networdb instance and
// can be passed by the caller. // can be passed by the caller.
type Config struct { type Config struct {
// NodeName is the cluster wide unique name for this node. // NodeID is the node unique identifier of the node when is part of the cluster
NodeName string NodeID string
// Hostname is the node hostname.
Hostname string
// BindAddr is the IP on which networkdb listens. It can be // BindAddr is the IP on which networkdb listens. It can be
// 0.0.0.0 to listen on all addresses on the host. // 0.0.0.0 to listen on all addresses on the host.
@ -210,7 +214,8 @@ type entry struct {
func DefaultConfig() *Config { func DefaultConfig() *Config {
hostname, _ := os.Hostname() hostname, _ := os.Hostname()
return &Config{ return &Config{
NodeName: hostname, NodeID: stringid.TruncateID(stringid.GenerateRandomID()),
Hostname: hostname,
BindAddr: "0.0.0.0", BindAddr: "0.0.0.0",
PacketBufferSize: 1400, PacketBufferSize: 1400,
StatsPrintPeriod: 5 * time.Minute, StatsPrintPeriod: 5 * time.Minute,
@ -236,6 +241,7 @@ func New(c *Config) (*NetworkDB, error) {
nDB.indexes[byTable] = radix.New() nDB.indexes[byTable] = radix.New()
nDB.indexes[byNetwork] = radix.New() nDB.indexes[byNetwork] = radix.New()
logrus.Debugf("New memberlist node - Node:%v will use memberlist nodeID:%v", c.Hostname, c.NodeID)
if err := nDB.clusterInit(); err != nil { if err := nDB.clusterInit(); err != nil {
return nil, err return nil, err
} }
@ -259,7 +265,7 @@ func (nDB *NetworkDB) Join(members []string) error {
// stopping timers, canceling goroutines etc. // stopping timers, canceling goroutines etc.
func (nDB *NetworkDB) Close() { func (nDB *NetworkDB) Close() {
if err := nDB.clusterLeave(); err != nil { if err := nDB.clusterLeave(); err != nil {
logrus.Errorf("Could not close DB %s: %v", nDB.config.NodeName, err) logrus.Errorf("%v(%v) Could not close DB: %v", nDB.config.Hostname, nDB.config.NodeID, err)
} }
} }
@ -334,7 +340,7 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
entry := &entry{ entry := &entry{
ltime: nDB.tableClock.Increment(), ltime: nDB.tableClock.Increment(),
node: nDB.config.NodeName, node: nDB.config.NodeID,
value: value, value: value,
} }
@ -360,7 +366,7 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
entry := &entry{ entry := &entry{
ltime: nDB.tableClock.Increment(), ltime: nDB.tableClock.Increment(),
node: nDB.config.NodeName, node: nDB.config.NodeID,
value: value, value: value,
} }
@ -402,7 +408,7 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
entry := &entry{ entry := &entry{
ltime: nDB.tableClock.Increment(), ltime: nDB.tableClock.Increment(),
node: nDB.config.NodeName, node: nDB.config.NodeID,
value: value, value: value,
deleting: true, deleting: true,
reapTime: reapEntryInterval, reapTime: reapEntryInterval,
@ -451,7 +457,7 @@ func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
// entries owned by remote nodes, we will accept them and we notify the application // entries owned by remote nodes, we will accept them and we notify the application
func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) { func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
// Indicates if the delete is triggered for the local node // Indicates if the delete is triggered for the local node
isNodeLocal := node == nDB.config.NodeName isNodeLocal := node == nDB.config.NodeID
nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid),
func(path string, v interface{}) bool { func(path string, v interface{}) bool {
@ -552,10 +558,10 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
ltime := nDB.networkClock.Increment() ltime := nDB.networkClock.Increment()
nDB.Lock() nDB.Lock()
nodeNetworks, ok := nDB.networks[nDB.config.NodeName] nodeNetworks, ok := nDB.networks[nDB.config.NodeID]
if !ok { if !ok {
nodeNetworks = make(map[string]*network) nodeNetworks = make(map[string]*network)
nDB.networks[nDB.config.NodeName] = nodeNetworks nDB.networks[nDB.config.NodeID] = nodeNetworks
} }
n, ok := nodeNetworks[nid] n, ok := nodeNetworks[nid]
var entries int var entries int
@ -571,8 +577,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
}, },
RetransmitMult: 4, RetransmitMult: 4,
} }
nDB.addNetworkNode(nid, nDB.config.NodeID)
nDB.addNetworkNode(nid, nDB.config.NodeName)
networkNodes := nDB.networkNodes[nid] networkNodes := nDB.networkNodes[nid]
nDB.Unlock() nDB.Unlock()
@ -580,7 +585,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
return fmt.Errorf("failed to send leave network event for %s: %v", nid, err) return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
} }
logrus.Debugf("%s: joined network %s", nDB.config.NodeName, nid) logrus.Debugf("%v(%v): joined network %s", nDB.config.Hostname, nDB.config.NodeID, nid)
if _, err := nDB.bulkSync(networkNodes, true); err != nil { if _, err := nDB.bulkSync(networkNodes, true); err != nil {
logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err) logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err)
} }
@ -604,12 +609,12 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {
defer nDB.Unlock() defer nDB.Unlock()
// Remove myself from the list of the nodes participating to the network // Remove myself from the list of the nodes participating to the network
nDB.deleteNetworkNode(nid, nDB.config.NodeName) nDB.deleteNetworkNode(nid, nDB.config.NodeID)
// Update all the local entries marking them for deletion and delete all the remote entries // Update all the local entries marking them for deletion and delete all the remote entries
nDB.deleteNodeNetworkEntries(nid, nDB.config.NodeName) nDB.deleteNodeNetworkEntries(nid, nDB.config.NodeID)
nodeNetworks, ok := nDB.networks[nDB.config.NodeName] nodeNetworks, ok := nDB.networks[nDB.config.NodeID]
if !ok { if !ok {
return fmt.Errorf("could not find self node for network %s while trying to leave", nid) return fmt.Errorf("could not find self node for network %s while trying to leave", nid)
} }
@ -619,7 +624,7 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {
return fmt.Errorf("could not find network %s while trying to leave", nid) return fmt.Errorf("could not find network %s while trying to leave", nid)
} }
logrus.Debugf("%s: leaving network %s", nDB.config.NodeName, nid) logrus.Debugf("%v(%v): leaving network %s", nDB.config.Hostname, nDB.config.NodeID, nid)
n.ltime = ltime n.ltime = ltime
n.reapTime = reapNetworkInterval n.reapTime = reapNetworkInterval
n.leaving = true n.leaving = true
@ -665,7 +670,7 @@ func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
defer nDB.RUnlock() defer nDB.RUnlock()
var networks []string var networks []string
for nid := range nDB.networks[nDB.config.NodeName] { for nid := range nDB.networks[nDB.config.NodeID] {
if n, ok := nDB.networks[nodeName][nid]; ok { if n, ok := nDB.networks[nodeName][nid]; ok {
if !n.leaving { if !n.leaving {
networks = append(networks, nid) networks = append(networks, nid)
@ -681,7 +686,7 @@ func (nDB *NetworkDB) updateLocalNetworkTime() {
defer nDB.Unlock() defer nDB.Unlock()
ltime := nDB.networkClock.Increment() ltime := nDB.networkClock.Increment()
for _, n := range nDB.networks[nDB.config.NodeName] { for _, n := range nDB.networks[nDB.config.NodeID] {
n.ltime = ltime n.ltime = ltime
} }
} }
@ -693,7 +698,7 @@ func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interfac
_, okNetwork := nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) _, okNetwork := nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
if !okNetwork { if !okNetwork {
// Add only if it is an insert not an update // Add only if it is an insert not an update
n, ok := nDB.networks[nDB.config.NodeName][nid] n, ok := nDB.networks[nDB.config.NodeID][nid]
if ok { if ok {
n.entriesNumber++ n.entriesNumber++
} }
@ -708,7 +713,7 @@ func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (bool, bool) {
_, okNetwork := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)) _, okNetwork := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
if okNetwork { if okNetwork {
// Remove only if the delete is successful // Remove only if the delete is successful
n, ok := nDB.networks[nDB.config.NodeName][nid] n, ok := nDB.networks[nDB.config.NodeID][nid]
if ok { if ok {
n.entriesNumber-- n.entriesNumber--
} }

View File

@ -31,7 +31,7 @@ func createNetworkDBInstances(t *testing.T, num int, namePrefix string) []*Netwo
var dbs []*NetworkDB var dbs []*NetworkDB
for i := 0; i < num; i++ { for i := 0; i < num; i++ {
conf := DefaultConfig() conf := DefaultConfig()
conf.NodeName = fmt.Sprintf("%s%d", namePrefix, i+1) conf.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
conf.BindPort = int(atomic.AddInt32(&dbPort, 1)) conf.BindPort = int(atomic.AddInt32(&dbPort, 1))
db, err := New(conf) db, err := New(conf)
require.NoError(t, err) require.NoError(t, err)
@ -69,7 +69,7 @@ func (db *NetworkDB) verifyNodeExistence(t *testing.T, node string, present bool
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
} }
assert.Fail(t, fmt.Sprintf("%s: Node existence verification for node %s failed", db.config.NodeName, node)) assert.Fail(t, fmt.Sprintf("%v(%v): Node existence verification for node %s failed", db.config.Hostname, db.config.NodeID, node))
} }
func (db *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string, present bool) { func (db *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string, present bool) {
@ -117,7 +117,7 @@ func (db *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value s
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
} }
assert.Fail(t, fmt.Sprintf("Entry existence verification test failed for %s", db.config.NodeName)) assert.Fail(t, fmt.Sprintf("Entry existence verification test failed for %v(%v)", db.config.Hostname, db.config.NodeID))
} }
func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) { func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) {
@ -157,12 +157,12 @@ func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
err := dbs[0].JoinNetwork("network1") err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err) assert.NoError(t, err)
dbs[1].verifyNetworkExistence(t, "node1", "network1", true) dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
err = dbs[0].LeaveNetwork("network1") err = dbs[0].LeaveNetwork("network1")
assert.NoError(t, err) assert.NoError(t, err)
dbs[1].verifyNetworkExistence(t, "node1", "network1", false) dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", false)
closeNetworkDBInstances(dbs) closeNetworkDBInstances(dbs)
} }
@ -181,11 +181,11 @@ func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
} }
for i := 1; i <= n; i++ { for i := 1; i <= n; i++ {
dbs[1].verifyNetworkExistence(t, "node1", fmt.Sprintf("network0%d", i), true) dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), true)
} }
for i := 1; i <= n; i++ { for i := 1; i <= n; i++ {
dbs[0].verifyNetworkExistence(t, "node2", fmt.Sprintf("network1%d", i), true) dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), true)
} }
for i := 1; i <= n; i++ { for i := 1; i <= n; i++ {
@ -199,11 +199,11 @@ func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
} }
for i := 1; i <= n; i++ { for i := 1; i <= n; i++ {
dbs[1].verifyNetworkExistence(t, "node1", fmt.Sprintf("network0%d", i), false) dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), false)
} }
for i := 1; i <= n; i++ { for i := 1; i <= n; i++ {
dbs[0].verifyNetworkExistence(t, "node2", fmt.Sprintf("network1%d", i), false) dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), false)
} }
closeNetworkDBInstances(dbs) closeNetworkDBInstances(dbs)
@ -215,7 +215,7 @@ func TestNetworkDBCRUDTableEntry(t *testing.T) {
err := dbs[0].JoinNetwork("network1") err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err) assert.NoError(t, err)
dbs[1].verifyNetworkExistence(t, "node1", "network1", true) dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
err = dbs[1].JoinNetwork("network1") err = dbs[1].JoinNetwork("network1")
assert.NoError(t, err) assert.NoError(t, err)
@ -245,7 +245,7 @@ func TestNetworkDBCRUDTableEntries(t *testing.T) {
err := dbs[0].JoinNetwork("network1") err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err) assert.NoError(t, err)
dbs[1].verifyNetworkExistence(t, "node1", "network1", true) dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
err = dbs[1].JoinNetwork("network1") err = dbs[1].JoinNetwork("network1")
assert.NoError(t, err) assert.NoError(t, err)
@ -361,7 +361,7 @@ func TestNetworkDBBulkSync(t *testing.T) {
err := dbs[0].JoinNetwork("network1") err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err) assert.NoError(t, err)
dbs[1].verifyNetworkExistence(t, "node1", "network1", true) dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
n := 1000 n := 1000
for i := 1; i <= n; i++ { for i := 1; i <= n; i++ {
@ -374,7 +374,7 @@ func TestNetworkDBBulkSync(t *testing.T) {
err = dbs[1].JoinNetwork("network1") err = dbs[1].JoinNetwork("network1")
assert.NoError(t, err) assert.NoError(t, err)
dbs[0].verifyNetworkExistence(t, "node2", "network1", true) dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
for i := 1; i <= n; i++ { for i := 1; i <= n; i++ {
dbs[1].verifyEntryExistence(t, "test_table", "network1", dbs[1].verifyEntryExistence(t, "test_table", "network1",
@ -397,7 +397,7 @@ func TestNetworkDBCRUDMediumCluster(t *testing.T) {
continue continue
} }
dbs[i].verifyNodeExistence(t, fmt.Sprintf("node%d", j+1), true) dbs[i].verifyNodeExistence(t, dbs[j].config.NodeID, true)
} }
} }
@ -408,7 +408,7 @@ func TestNetworkDBCRUDMediumCluster(t *testing.T) {
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
for j := 0; j < n; j++ { for j := 0; j < n; j++ {
dbs[i].verifyNetworkExistence(t, fmt.Sprintf("node%d", j+1), "network1", true) dbs[i].verifyNetworkExistence(t, dbs[j].config.NodeID, "network1", true)
} }
} }
@ -473,7 +473,7 @@ func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
if len(dbs[0].networkNodes["network1"]) != 2 { if len(dbs[0].networkNodes["network1"]) != 2 {
t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[0].networkNodes["network1"]), dbs[0].networkNodes["network1"]) t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[0].networkNodes["network1"]), dbs[0].networkNodes["network1"])
} }
if n, ok := dbs[0].networks[dbs[0].config.NodeName]["network1"]; !ok || n.leaving { if n, ok := dbs[0].networks[dbs[0].config.NodeID]["network1"]; !ok || n.leaving {
t.Fatalf("The network should not be marked as leaving:%t", n.leaving) t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
} }
@ -487,7 +487,7 @@ func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
if len(dbs[1].networkNodes["network1"]) != 2 { if len(dbs[1].networkNodes["network1"]) != 2 {
t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"]) t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"])
} }
if n, ok := dbs[1].networks[dbs[1].config.NodeName]["network1"]; !ok || n.leaving { if n, ok := dbs[1].networks[dbs[1].config.NodeID]["network1"]; !ok || n.leaving {
t.Fatalf("The network should not be marked as leaving:%t", n.leaving) t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
} }

View File

@ -52,7 +52,7 @@ func Server(args []string) {
server = diagnose.Server{} server = diagnose.Server{}
server.Init() server.Init()
conf := networkdb.DefaultConfig() conf := networkdb.DefaultConfig()
conf.NodeName = localNodeName conf.Hostname = localNodeName
conf.AdvertiseAddr = ipAddr conf.AdvertiseAddr = ipAddr
conf.BindAddr = ipAddr conf.BindAddr = ipAddr
nDB, err = networkdb.New(conf) nDB, err = networkdb.New(conf)