Properly purge node networks when node goes away

When a node goes away purge all the network attachments from the node
and make sure we don't attempt bulk syncing to that node once removed.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
This commit is contained in:
Jana Radhakrishnan 2016-06-14 12:39:38 -07:00
parent 0654c113d9
commit f5f576ad34
3 changed files with 31 additions and 7 deletions

View File

@ -311,7 +311,16 @@ func (nDB *NetworkDB) bulkSyncTables() {
nid := networks[0]
networks = networks[1:]
completed, err := nDB.bulkSync(nid, false)
nDB.RLock()
nodes := nDB.networkNodes[nid]
nDB.RUnlock()
// No peer nodes on this network. Move on.
if len(nodes) == 0 {
continue
}
completed, err := nDB.bulkSync(nid, nodes, false)
if err != nil {
logrus.Errorf("periodic bulk sync failure for network %s: %v", nid, err)
continue
@ -334,11 +343,7 @@ func (nDB *NetworkDB) bulkSyncTables() {
}
}
func (nDB *NetworkDB) bulkSync(nid string, all bool) ([]string, error) {
nDB.RLock()
nodes := nDB.networkNodes[nid]
nDB.RUnlock()
func (nDB *NetworkDB) bulkSync(nid string, nodes []string, all bool) ([]string, error) {
if !all {
// If not all, then just pick one.
nodes = nDB.mRandomNodes(1, nodes)

View File

@ -14,6 +14,7 @@ func (e *eventDelegate) NotifyJoin(n *memberlist.Node) {
func (e *eventDelegate) NotifyLeave(n *memberlist.Node) {
e.nDB.deleteNodeTableEntries(n.Name)
e.nDB.deleteNetworkNodeEntries(n.Name)
e.nDB.Lock()
delete(e.nDB.nodes, n.Name)
e.nDB.Unlock()

View File

@ -286,6 +286,23 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
return nil
}
func (nDB *NetworkDB) deleteNetworkNodeEntries(deletedNode string) {
nDB.Lock()
for nid, nodes := range nDB.networkNodes {
updatedNodes := make([]string, 0, len(nodes))
for _, node := range nodes {
if node == deletedNode {
continue
}
updatedNodes = append(updatedNodes, node)
}
nDB.networkNodes[nid] = updatedNodes
}
nDB.Unlock()
}
func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
nDB.Lock()
nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
@ -359,6 +376,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
RetransmitMult: 4,
}
nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nDB.config.NodeName)
networkNodes := nDB.networkNodes[nid]
nDB.Unlock()
if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil {
@ -366,7 +384,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
}
logrus.Debugf("%s: joined network %s", nDB.config.NodeName, nid)
if _, err := nDB.bulkSync(nid, true); err != nil {
if _, err := nDB.bulkSync(nid, networkNodes, true); err != nil {
logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err)
}