mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
5f5dad3c02
Currently, if there is any transient gossip failure in any node, the recovery process depends on other nodes propagating the information indirectly. If these transient failures affect all the nodes that this node has in its memberlist, then this node will be permanently cut off from the gossip channel. Added node state management code in networkdb to address these problems by trying to rejoin the cluster via the failed nodes when there is a failure. This also necessitates adding new messages, called node event messages, to differentiate between node leave and node failure. Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
437 lines
9.5 KiB
Go
437 lines
9.5 KiB
Go
package networkdb
|
|
|
|
import (
|
|
"fmt"
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/gogo/protobuf/proto"
|
|
)
|
|
|
|
type delegate struct {
|
|
nDB *NetworkDB
|
|
}
|
|
|
|
func (d *delegate) NodeMeta(limit int) []byte {
|
|
return []byte{}
|
|
}
|
|
|
|
func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
|
|
nDB.Lock()
|
|
defer nDB.Unlock()
|
|
|
|
for _, nodes := range []map[string]*node{
|
|
nDB.failedNodes,
|
|
nDB.leftNodes,
|
|
nDB.nodes,
|
|
} {
|
|
if n, ok := nodes[nEvent.NodeName]; ok {
|
|
if n.ltime >= nEvent.LTime {
|
|
return nil
|
|
}
|
|
|
|
delete(nDB.failedNodes, n.Name)
|
|
return n
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
|
|
// Update our local clock if the received messages has newer
|
|
// time.
|
|
nDB.networkClock.Witness(nEvent.LTime)
|
|
|
|
n := nDB.checkAndGetNode(nEvent)
|
|
if n == nil {
|
|
return false
|
|
}
|
|
|
|
n.ltime = nEvent.LTime
|
|
|
|
switch nEvent.Type {
|
|
case NodeEventTypeJoin:
|
|
nDB.Lock()
|
|
nDB.nodes[n.Name] = n
|
|
nDB.Unlock()
|
|
return true
|
|
case NodeEventTypeLeave:
|
|
nDB.Lock()
|
|
nDB.leftNodes[n.Name] = n
|
|
nDB.Unlock()
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
|
|
// Update our local clock if the received messages has newer
|
|
// time.
|
|
nDB.networkClock.Witness(nEvent.LTime)
|
|
|
|
nDB.Lock()
|
|
defer nDB.Unlock()
|
|
|
|
if nEvent.NodeName == nDB.config.NodeName {
|
|
return false
|
|
}
|
|
|
|
nodeNetworks, ok := nDB.networks[nEvent.NodeName]
|
|
if !ok {
|
|
// We haven't heard about this node at all. Ignore the leave
|
|
if nEvent.Type == NetworkEventTypeLeave {
|
|
return false
|
|
}
|
|
|
|
nodeNetworks = make(map[string]*network)
|
|
nDB.networks[nEvent.NodeName] = nodeNetworks
|
|
}
|
|
|
|
if n, ok := nodeNetworks[nEvent.NetworkID]; ok {
|
|
// We have the latest state. Ignore the event
|
|
// since it is stale.
|
|
if n.ltime >= nEvent.LTime {
|
|
return false
|
|
}
|
|
|
|
n.ltime = nEvent.LTime
|
|
n.leaving = nEvent.Type == NetworkEventTypeLeave
|
|
if n.leaving {
|
|
n.leaveTime = time.Now()
|
|
}
|
|
|
|
nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
|
|
return true
|
|
}
|
|
|
|
if nEvent.Type == NetworkEventTypeLeave {
|
|
return false
|
|
}
|
|
|
|
// This remote network join is being seen the first time.
|
|
nodeNetworks[nEvent.NetworkID] = &network{
|
|
id: nEvent.NetworkID,
|
|
ltime: nEvent.LTime,
|
|
}
|
|
|
|
nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
|
|
return true
|
|
}
|
|
|
|
func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
|
|
// Update our local clock if the received messages has newer
|
|
// time.
|
|
nDB.tableClock.Witness(tEvent.LTime)
|
|
|
|
// Ignore the table events for networks that are in the process of going away
|
|
nDB.RLock()
|
|
networks := nDB.networks[nDB.config.NodeName]
|
|
network, ok := networks[tEvent.NetworkID]
|
|
nDB.RUnlock()
|
|
if !ok || network.leaving {
|
|
return true
|
|
}
|
|
|
|
e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
|
|
if err != nil && tEvent.Type == TableEventTypeDelete {
|
|
// If it is a delete event and we don't have the entry here nothing to do.
|
|
return false
|
|
}
|
|
|
|
if err == nil {
|
|
// We have the latest state. Ignore the event
|
|
// since it is stale.
|
|
if e.ltime >= tEvent.LTime {
|
|
return false
|
|
}
|
|
}
|
|
|
|
e = &entry{
|
|
ltime: tEvent.LTime,
|
|
node: tEvent.NodeName,
|
|
value: tEvent.Value,
|
|
deleting: tEvent.Type == TableEventTypeDelete,
|
|
}
|
|
|
|
if e.deleting {
|
|
e.deleteTime = time.Now()
|
|
}
|
|
|
|
nDB.Lock()
|
|
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), e)
|
|
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e)
|
|
nDB.Unlock()
|
|
|
|
var op opType
|
|
switch tEvent.Type {
|
|
case TableEventTypeCreate:
|
|
op = opCreate
|
|
case TableEventTypeUpdate:
|
|
op = opUpdate
|
|
case TableEventTypeDelete:
|
|
op = opDelete
|
|
}
|
|
|
|
nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
|
|
return true
|
|
}
|
|
|
|
func (nDB *NetworkDB) handleCompound(buf []byte, isBulkSync bool) {
|
|
// Decode the parts
|
|
parts, err := decodeCompoundMessage(buf)
|
|
if err != nil {
|
|
logrus.Errorf("Failed to decode compound request: %v", err)
|
|
return
|
|
}
|
|
|
|
// Handle each message
|
|
for _, part := range parts {
|
|
nDB.handleMessage(part, isBulkSync)
|
|
}
|
|
}
|
|
|
|
func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
|
|
var tEvent TableEvent
|
|
if err := proto.Unmarshal(buf, &tEvent); err != nil {
|
|
logrus.Errorf("Error decoding table event message: %v", err)
|
|
return
|
|
}
|
|
|
|
// Ignore messages that this node generated.
|
|
if tEvent.NodeName == nDB.config.NodeName {
|
|
return
|
|
}
|
|
|
|
// Do not rebroadcast a bulk sync
|
|
if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast && !isBulkSync {
|
|
var err error
|
|
buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
|
|
if err != nil {
|
|
logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
|
|
return
|
|
}
|
|
|
|
nDB.RLock()
|
|
n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID]
|
|
nDB.RUnlock()
|
|
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
broadcastQ := n.tableBroadcasts
|
|
|
|
if broadcastQ == nil {
|
|
return
|
|
}
|
|
|
|
broadcastQ.QueueBroadcast(&tableEventMessage{
|
|
msg: buf,
|
|
id: tEvent.NetworkID,
|
|
tname: tEvent.TableName,
|
|
key: tEvent.Key,
|
|
node: nDB.config.NodeName,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (nDB *NetworkDB) handleNodeMessage(buf []byte) {
|
|
var nEvent NodeEvent
|
|
if err := proto.Unmarshal(buf, &nEvent); err != nil {
|
|
logrus.Errorf("Error decoding node event message: %v", err)
|
|
return
|
|
}
|
|
|
|
if rebroadcast := nDB.handleNodeEvent(&nEvent); rebroadcast {
|
|
var err error
|
|
buf, err = encodeRawMessage(MessageTypeNodeEvent, buf)
|
|
if err != nil {
|
|
logrus.Errorf("Error marshalling gossip message for node event rebroadcast: %v", err)
|
|
return
|
|
}
|
|
|
|
nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{
|
|
msg: buf,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (nDB *NetworkDB) handleNetworkMessage(buf []byte) {
|
|
var nEvent NetworkEvent
|
|
if err := proto.Unmarshal(buf, &nEvent); err != nil {
|
|
logrus.Errorf("Error decoding network event message: %v", err)
|
|
return
|
|
}
|
|
|
|
if rebroadcast := nDB.handleNetworkEvent(&nEvent); rebroadcast {
|
|
var err error
|
|
buf, err = encodeRawMessage(MessageTypeNetworkEvent, buf)
|
|
if err != nil {
|
|
logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
|
|
return
|
|
}
|
|
|
|
nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{
|
|
msg: buf,
|
|
id: nEvent.NetworkID,
|
|
node: nEvent.NodeName,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (nDB *NetworkDB) handleBulkSync(buf []byte) {
|
|
var bsm BulkSyncMessage
|
|
if err := proto.Unmarshal(buf, &bsm); err != nil {
|
|
logrus.Errorf("Error decoding bulk sync message: %v", err)
|
|
return
|
|
}
|
|
|
|
if bsm.LTime > 0 {
|
|
nDB.tableClock.Witness(bsm.LTime)
|
|
}
|
|
|
|
nDB.handleMessage(bsm.Payload, true)
|
|
|
|
// Don't respond to a bulk sync which was not unsolicited
|
|
if !bsm.Unsolicited {
|
|
nDB.RLock()
|
|
ch, ok := nDB.bulkSyncAckTbl[bsm.NodeName]
|
|
nDB.RUnlock()
|
|
if ok {
|
|
close(ch)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
var nodeAddr net.IP
|
|
nDB.RLock()
|
|
if node, ok := nDB.nodes[bsm.NodeName]; ok {
|
|
nodeAddr = node.Addr
|
|
}
|
|
nDB.RUnlock()
|
|
|
|
if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil {
|
|
logrus.Errorf("Error in responding to bulk sync from node %s: %v", nodeAddr, err)
|
|
}
|
|
}
|
|
|
|
func (nDB *NetworkDB) handleMessage(buf []byte, isBulkSync bool) {
|
|
mType, data, err := decodeMessage(buf)
|
|
if err != nil {
|
|
logrus.Errorf("Error decoding gossip message to get message type: %v", err)
|
|
return
|
|
}
|
|
|
|
switch mType {
|
|
case MessageTypeNodeEvent:
|
|
nDB.handleNodeMessage(data)
|
|
case MessageTypeNetworkEvent:
|
|
nDB.handleNetworkMessage(data)
|
|
case MessageTypeTableEvent:
|
|
nDB.handleTableMessage(data, isBulkSync)
|
|
case MessageTypeBulkSync:
|
|
nDB.handleBulkSync(data)
|
|
case MessageTypeCompound:
|
|
nDB.handleCompound(data, isBulkSync)
|
|
default:
|
|
logrus.Errorf("%s: unknown message type %d", nDB.config.NodeName, mType)
|
|
}
|
|
}
|
|
|
|
func (d *delegate) NotifyMsg(buf []byte) {
|
|
if len(buf) == 0 {
|
|
return
|
|
}
|
|
|
|
d.nDB.handleMessage(buf, false)
|
|
}
|
|
|
|
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
|
|
msgs := d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit)
|
|
msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)...)
|
|
return msgs
|
|
}
|
|
|
|
func (d *delegate) LocalState(join bool) []byte {
|
|
d.nDB.RLock()
|
|
defer d.nDB.RUnlock()
|
|
|
|
pp := NetworkPushPull{
|
|
LTime: d.nDB.networkClock.Time(),
|
|
NodeName: d.nDB.config.NodeName,
|
|
}
|
|
|
|
for name, nn := range d.nDB.networks {
|
|
for _, n := range nn {
|
|
pp.Networks = append(pp.Networks, &NetworkEntry{
|
|
LTime: n.ltime,
|
|
NetworkID: n.id,
|
|
NodeName: name,
|
|
Leaving: n.leaving,
|
|
})
|
|
}
|
|
}
|
|
|
|
buf, err := encodeMessage(MessageTypePushPull, &pp)
|
|
if err != nil {
|
|
logrus.Errorf("Failed to encode local network state: %v", err)
|
|
return nil
|
|
}
|
|
|
|
return buf
|
|
}
|
|
|
|
func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
|
|
if len(buf) == 0 {
|
|
logrus.Error("zero byte remote network state received")
|
|
return
|
|
}
|
|
|
|
var gMsg GossipMessage
|
|
err := proto.Unmarshal(buf, &gMsg)
|
|
if err != nil {
|
|
logrus.Errorf("Error unmarshalling push pull messsage: %v", err)
|
|
return
|
|
}
|
|
|
|
if gMsg.Type != MessageTypePushPull {
|
|
logrus.Errorf("Invalid message type %v received from remote", buf[0])
|
|
}
|
|
|
|
pp := NetworkPushPull{}
|
|
if err := proto.Unmarshal(gMsg.Data, &pp); err != nil {
|
|
logrus.Errorf("Failed to decode remote network state: %v", err)
|
|
return
|
|
}
|
|
|
|
if pp.LTime > 0 {
|
|
d.nDB.networkClock.Witness(pp.LTime)
|
|
}
|
|
|
|
nodeEvent := &NodeEvent{
|
|
LTime: pp.LTime,
|
|
NodeName: pp.NodeName,
|
|
Type: NodeEventTypeJoin,
|
|
}
|
|
d.nDB.handleNodeEvent(nodeEvent)
|
|
|
|
for _, n := range pp.Networks {
|
|
nEvent := &NetworkEvent{
|
|
LTime: n.LTime,
|
|
NodeName: n.NodeName,
|
|
NetworkID: n.NetworkID,
|
|
Type: NetworkEventTypeJoin,
|
|
}
|
|
|
|
if n.Leaving {
|
|
nEvent.Type = NetworkEventTypeLeave
|
|
}
|
|
|
|
d.nDB.handleNetworkEvent(nEvent)
|
|
}
|
|
|
|
}
|