package networkdb

import (
	"net"
	"time"

	"github.com/gogo/protobuf/proto"
	"github.com/sirupsen/logrus"
)
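
// delegate implements the memberlist.Delegate interface on top of NetworkDB,
// routing gossip payloads and push/pull state exchanges received from
// memberlist into NetworkDB's handlers.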
type delegate struct {
	nDB *NetworkDB
}
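
// NodeMeta is called by memberlist to retrieve metadata about this node.
// NetworkDB does not use node metadata, so an empty slice is returned.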
func (d *delegate) NodeMeta(limit int) []byte {
	return []byte{}
}
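
// getNode returns the node identified by nEvent from whichever state table
// (failed, left, or active) currently holds it. It returns nil if the node is
// unknown, or if our recorded logical time for the node is at least as new as
// the event's, in which case the event is stale.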
func (nDB *NetworkDB) getNode(nEvent *NodeEvent) *node {
	nDB.Lock()
	defer nDB.Unlock()

	for _, nodes := range []map[string]*node{
		nDB.failedNodes,
		nDB.leftNodes,
		nDB.nodes,
	} {
		if n, ok := nodes[nEvent.NodeName]; ok {
			if n.ltime >= nEvent.LTime {
				return nil
			}
			return n
		}
	}
	return nil
}
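
// checkAndGetNode behaves like getNode but additionally removes the node from
// the table it was found in, so that the caller can re-insert it into the
// table matching the new event (see handleNodeEvent).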
func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
	nDB.Lock()
	defer nDB.Unlock()

	for _, nodes := range []map[string]*node{
		nDB.failedNodes,
		nDB.leftNodes,
		nDB.nodes,
	} {
		if n, ok := nodes[nEvent.NodeName]; ok {
			if n.ltime >= nEvent.LTime {
				return nil
			}

			delete(nodes, n.Name)
			return n
		}
	}

	return nil
}
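
// handleNodeEvent processes a join or leave event for a cluster node and
// moves the node into the state table matching the event. It returns true if
// the event changed local state and should be rebroadcast to the cluster.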
func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
	// Update our local clock if the received message has a newer time.
	nDB.networkClock.Witness(nEvent.LTime)

	n := nDB.getNode(nEvent)
	if n == nil {
		return false
	}

	// If it's a node leave event for a manager and this is the only manager
	// we know of, we want the reconnect logic to kick in. In a single-manager
	// cluster the manager's gossip can't be bootstrapped unless some other
	// node connects to it.
	if len(nDB.bootStrapIP) == 1 && nEvent.Type == NodeEventTypeLeave {
		for _, ip := range nDB.bootStrapIP {
			if ip.Equal(n.Addr) {
				n.ltime = nEvent.LTime
				return true
			}
		}
	}

	n = nDB.checkAndGetNode(nEvent)
	if n == nil {
		return false
	}

	n.ltime = nEvent.LTime

	switch nEvent.Type {
	case NodeEventTypeJoin:
		nDB.Lock()
		_, found := nDB.nodes[n.Name]
		nDB.nodes[n.Name] = n
		nDB.Unlock()
		if !found {
			logrus.Infof("Node join event for %s/%s", n.Name, n.Addr)
		}
		return true
	case NodeEventTypeLeave:
		nDB.Lock()
		nDB.leftNodes[n.Name] = n
		nDB.Unlock()
		logrus.Infof("Node leave event for %s/%s", n.Name, n.Addr)
		return true
	}

	return false
}
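
// handleNetworkEvent updates the per-node network membership state from a
// remote network join/leave event. It returns true if the event was newer
// than the local state and should be rebroadcast.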
func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
	// Update our local clock if the received message has a newer time.
	nDB.networkClock.Witness(nEvent.LTime)

	nDB.Lock()
	defer nDB.Unlock()

	if nEvent.NodeName == nDB.config.NodeID {
		return false
	}

	nodeNetworks, ok := nDB.networks[nEvent.NodeName]
	if !ok {
		// We haven't heard about this node at all. Ignore the leave.
		if nEvent.Type == NetworkEventTypeLeave {
			return false
		}

		nodeNetworks = make(map[string]*network)
		nDB.networks[nEvent.NodeName] = nodeNetworks
	}

	if n, ok := nodeNetworks[nEvent.NetworkID]; ok {
		// We have the latest state. Ignore the event
		// since it is stale.
		if n.ltime >= nEvent.LTime {
			return false
		}

		n.ltime = nEvent.LTime
		n.leaving = nEvent.Type == NetworkEventTypeLeave
		if n.leaving {
			n.reapTime = reapNetworkInterval

			// The remote node is leaving the network, but not the gossip
			// cluster. Mark all its entries as deleted; this guarantees that
			// if some node bulk-syncs with us, the deleted state of these
			// entries will be propagated.
			nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName)
		}

		if nEvent.Type == NetworkEventTypeLeave {
			nDB.deleteNetworkNode(nEvent.NetworkID, nEvent.NodeName)
		} else {
			nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
		}

		return true
	}

	if nEvent.Type == NetworkEventTypeLeave {
		return false
	}

	// This remote network join is being seen for the first time.
	nodeNetworks[nEvent.NetworkID] = &network{
		id:    nEvent.NetworkID,
		ltime: nEvent.LTime,
	}

	nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
	return true
}
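
// handleTableEvent applies a create/update/delete event for a table entry
// owned by a remote node. It returns true if the event should be rebroadcast
// to the rest of the cluster.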
func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
	// Update our local clock if the received message has a newer time.
	nDB.tableClock.Witness(tEvent.LTime)

	// Ignore table events for networks that are in the process of going away.
	nDB.RLock()
	networks := nDB.networks[nDB.config.NodeID]
	network, ok := networks[tEvent.NetworkID]
	// Check if the owner of the event is still part of the network.
	nodes := nDB.networkNodes[tEvent.NetworkID]
	var nodePresent bool
	for _, node := range nodes {
		if node == tEvent.NodeName {
			nodePresent = true
			break
		}
	}
	nDB.RUnlock()
	if !ok || network.leaving || !nodePresent {
		// We are out of the network, or the event owner is no longer part of
		// the network, so do not propagate.
		return false
	}

	e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
	if err == nil {
		// We have the latest state. Ignore the event
		// since it is stale.
		if e.ltime >= tEvent.LTime {
			return false
		}
	}

	e = &entry{
		ltime:    tEvent.LTime,
		node:     tEvent.NodeName,
		value:    tEvent.Value,
		deleting: tEvent.Type == TableEventTypeDelete,
		reapTime: time.Duration(tEvent.ResidualReapTime) * time.Second,
	}

	// All entries marked for deletion should have a reapTime greater than
	// zero. This case can happen if the cluster is running different versions
	// of the engine, where an older version does not have the field. If that
	// is not the case, it may be a bug.
	if e.deleting && e.reapTime == 0 {
		logrus.Warnf("handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?", tEvent)
		e.reapTime = reapEntryInterval
	}

	nDB.Lock()
	nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e)
	nDB.Unlock()

	if err != nil && tEvent.Type == TableEventTypeDelete {
		// This is a delete event for an entry we had no state for, so don't
		// propagate it to the application. If the residual reapTime is lower
		// than or equal to 1/6 of the total reapTime, don't bother
		// rebroadcasting it either: most likely the cluster is already aware
		// of it, and if not, whoever bulk-syncs with this node will catch the
		// state too. This also avoids deletions of entries close to their
		// garbage collection circulating around forever.
		return e.reapTime > reapEntryInterval/6
	}

	var op opType
	switch tEvent.Type {
	case TableEventTypeCreate:
		op = opCreate
	case TableEventTypeUpdate:
		op = opUpdate
	case TableEventTypeDelete:
		op = opDelete
	}

	nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
	return true
}
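
// handleCompound splits a compound message into its parts and dispatches each
// one through handleMessage.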
func (nDB *NetworkDB) handleCompound(buf []byte, isBulkSync bool) {
	// Decode the parts
	parts, err := decodeCompoundMessage(buf)
	if err != nil {
		logrus.Errorf("Failed to decode compound request: %v", err)
		return
	}

	// Handle each message
	for _, part := range parts {
		nDB.handleMessage(part, isBulkSync)
	}
}
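
// handleTableMessage decodes a gossiped table event, applies it, and, if it
// produced a state change, queues it for rebroadcast on the corresponding
// network's broadcast queue.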
func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
	var tEvent TableEvent
	if err := proto.Unmarshal(buf, &tEvent); err != nil {
		logrus.Errorf("Error decoding table event message: %v", err)
		return
	}

	// Ignore messages that this node generated.
	if tEvent.NodeName == nDB.config.NodeID {
		return
	}

	if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast {
		var err error
		buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
		if err != nil {
			logrus.Errorf("Error marshalling gossip message for table event rebroadcast: %v", err)
			return
		}

		nDB.RLock()
		n, ok := nDB.networks[nDB.config.NodeID][tEvent.NetworkID]
		nDB.RUnlock()

		// If the network is not there anymore, we are leaving the network, or
		// the broadcast queue is not present, don't rebroadcast.
		if !ok || n.leaving || n.tableBroadcasts == nil {
			return
		}

		n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
			msg:   buf,
			id:    tEvent.NetworkID,
			tname: tEvent.TableName,
			key:   tEvent.Key,
			node:  tEvent.NodeName,
		})
	}
}
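
// handleNodeMessage decodes a gossiped node event, applies it, and queues it
// for rebroadcast if it changed local state.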
func (nDB *NetworkDB) handleNodeMessage(buf []byte) {
	var nEvent NodeEvent
	if err := proto.Unmarshal(buf, &nEvent); err != nil {
		logrus.Errorf("Error decoding node event message: %v", err)
		return
	}

	if rebroadcast := nDB.handleNodeEvent(&nEvent); rebroadcast {
		var err error
		buf, err = encodeRawMessage(MessageTypeNodeEvent, buf)
		if err != nil {
			logrus.Errorf("Error marshalling gossip message for node event rebroadcast: %v", err)
			return
		}

		nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{
			msg: buf,
		})
	}
}
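
// handleNetworkMessage decodes a gossiped network event, applies it, and
// queues it for rebroadcast if it changed local state.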
func (nDB *NetworkDB) handleNetworkMessage(buf []byte) {
	var nEvent NetworkEvent
	if err := proto.Unmarshal(buf, &nEvent); err != nil {
		logrus.Errorf("Error decoding network event message: %v", err)
		return
	}

	if rebroadcast := nDB.handleNetworkEvent(&nEvent); rebroadcast {
		var err error
		buf, err = encodeRawMessage(MessageTypeNetworkEvent, buf)
		if err != nil {
			logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
			return
		}

		nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{
			msg:  buf,
			id:   nEvent.NetworkID,
			node: nEvent.NodeName,
		})
	}
}
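
// handleBulkSync applies the payload of a bulk sync message. For an
// unsolicited sync it responds with this node's own state for the same
// networks; for a sync that answers one of our requests it only signals the
// waiting requester.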
func (nDB *NetworkDB) handleBulkSync(buf []byte) {
	var bsm BulkSyncMessage
	if err := proto.Unmarshal(buf, &bsm); err != nil {
		logrus.Errorf("Error decoding bulk sync message: %v", err)
		return
	}

	if bsm.LTime > 0 {
		nDB.tableClock.Witness(bsm.LTime)
	}

	nDB.handleMessage(bsm.Payload, true)

	// Don't respond to a bulk sync that was itself a response to one of our
	// own requests (i.e. not unsolicited); just unblock the waiting requester.
	if !bsm.Unsolicited {
		nDB.Lock()
		ch, ok := nDB.bulkSyncAckTbl[bsm.NodeName]
		if ok {
			close(ch)
			delete(nDB.bulkSyncAckTbl, bsm.NodeName)
		}
		nDB.Unlock()

		return
	}

	var nodeAddr net.IP
	nDB.RLock()
	if node, ok := nDB.nodes[bsm.NodeName]; ok {
		nodeAddr = node.Addr
	}
	nDB.RUnlock()

	if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil {
		logrus.Errorf("Error in responding to bulk sync from node %s: %v", nodeAddr, err)
	}
}
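
// handleMessage decodes a gossip message envelope and dispatches it to the
// handler for its message type.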
func (nDB *NetworkDB) handleMessage(buf []byte, isBulkSync bool) {
	mType, data, err := decodeMessage(buf)
	if err != nil {
		logrus.Errorf("Error decoding gossip message to get message type: %v", err)
		return
	}

	switch mType {
	case MessageTypeNodeEvent:
		nDB.handleNodeMessage(data)
	case MessageTypeNetworkEvent:
		nDB.handleNetworkMessage(data)
	case MessageTypeTableEvent:
		nDB.handleTableMessage(data, isBulkSync)
	case MessageTypeBulkSync:
		nDB.handleBulkSync(data)
	case MessageTypeCompound:
		nDB.handleCompound(data, isBulkSync)
	default:
		logrus.Errorf("%v(%v): unknown message type %d", nDB.config.Hostname, nDB.config.NodeID, mType)
	}
}
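
// NotifyMsg is called by memberlist whenever a user-data gossip message is
// received from a peer.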
func (d *delegate) NotifyMsg(buf []byte) {
	if len(buf) == 0 {
		return
	}

	d.nDB.handleMessage(buf, false)
}
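
// GetBroadcasts is called by memberlist to retrieve pending network and node
// event broadcasts, subject to the given per-message overhead and total size
// limit.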
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
	msgs := d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit)
	msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)...)
	return msgs
}
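
// LocalState is called by memberlist during push/pull exchanges. It encodes
// this node's view of network membership as a NetworkPushPull message.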
func (d *delegate) LocalState(join bool) []byte {
	if join {
		// Update all the local node/network state to a new time to force an
		// update on the node we are trying to rejoin, just in case that node
		// still has these in the leaving state. This is to facilitate fast
		// convergence after recovering from a gossip failure.
		d.nDB.updateLocalNetworkTime()
	}

	d.nDB.RLock()
	defer d.nDB.RUnlock()

	pp := NetworkPushPull{
		LTime:    d.nDB.networkClock.Time(),
		NodeName: d.nDB.config.NodeID,
	}

	for name, nn := range d.nDB.networks {
		for _, n := range nn {
			pp.Networks = append(pp.Networks, &NetworkEntry{
				LTime:     n.ltime,
				NetworkID: n.id,
				NodeName:  name,
				Leaving:   n.leaving,
			})
		}
	}

	buf, err := encodeMessage(MessageTypePushPull, &pp)
	if err != nil {
		logrus.Errorf("Failed to encode local network state: %v", err)
		return nil
	}

	return buf
}
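
// MergeRemoteState is called by memberlist with the remote side's push/pull
// state. It replays the remote node and network entries as join/leave events
// against the local state.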
func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
	if len(buf) == 0 {
		logrus.Error("zero byte remote network state received")
		return
	}

	var gMsg GossipMessage
	err := proto.Unmarshal(buf, &gMsg)
	if err != nil {
		logrus.Errorf("Error unmarshalling push-pull message: %v", err)
		return
	}

	if gMsg.Type != MessageTypePushPull {
		logrus.Errorf("Invalid message type %v received from remote", gMsg.Type)
		return
	}

	pp := NetworkPushPull{}
	if err := proto.Unmarshal(gMsg.Data, &pp); err != nil {
		logrus.Errorf("Failed to decode remote network state: %v", err)
		return
	}

	nodeEvent := &NodeEvent{
		LTime:    pp.LTime,
		NodeName: pp.NodeName,
		Type:     NodeEventTypeJoin,
	}
	d.nDB.handleNodeEvent(nodeEvent)

	for _, n := range pp.Networks {
		nEvent := &NetworkEvent{
			LTime:     n.LTime,
			NodeName:  n.NodeName,
			NetworkID: n.NetworkID,
			Type:      NetworkEventTypeJoin,
		}

		if n.Leaving {
			nEvent.Type = NetworkEventTypeLeave
		}

		d.nDB.handleNetworkEvent(nEvent)
	}
}