mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
28f4561e3f
Network DB is a network-scoped gossip database built on top of hashicorp/memberlist providing an eventually consistent state store. It limits the scope of the gossip and periodic bulk syncing for table entries to only the nodes which participate in the network to which the gossip belongs. This design makes the gossip layer scale better and only consumes resources for the network state that the node participates in. Since the complete state for a network is maintained by all nodes participating in the network, all nodes will eventually converge to the same state. NetworkDB also provides facilities for the users of the package to watch any table (or all tables) and get notified of state changes of interest that happened anywhere in the cluster, once that state change eventually finds its way to the watcher's node. Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
315 lines
6.8 KiB
Go
315 lines
6.8 KiB
Go
package networkdb
|
|
|
|
import (
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/hashicorp/serf/serf"
|
|
)
|
|
|
|
type networkData struct {
|
|
LTime serf.LamportTime
|
|
ID string
|
|
NodeName string
|
|
Leaving bool
|
|
}
|
|
|
|
type networkPushPull struct {
|
|
LTime serf.LamportTime
|
|
Networks []networkData
|
|
}
|
|
|
|
type delegate struct {
|
|
nDB *NetworkDB
|
|
}
|
|
|
|
func (d *delegate) NodeMeta(limit int) []byte {
|
|
return []byte{}
|
|
}
|
|
|
|
// handleNetworkEvent applies a remote network join/leave event to the local
// record of which nodes participate in which networks.
//
// It returns true when the event changed local state (the caller uses this to
// decide whether to rebroadcast it) and false when the event was stale or
// carried no new information.
func (nDB *NetworkDB) handleNetworkEvent(nEvent *networkEventData) bool {
	// Pull our network clock forward if the event carries a newer time.
	nDB.networkClock.Witness(nEvent.LTime)

	nDB.Lock()
	defer nDB.Unlock()

	isLeave := nEvent.Event == networkLeave

	networksOfNode, known := nDB.networks[nEvent.NodeName]
	if !known {
		// A leave from a node we have never heard of tells us nothing.
		if isLeave {
			return false
		}
		networksOfNode = make(map[string]*network)
		nDB.networks[nEvent.NodeName] = networksOfNode
	}

	existing, found := networksOfNode[nEvent.NetworkID]
	switch {
	case found && existing.ltime >= nEvent.LTime:
		// Our record is at least as new as the event: it is stale.
		return false
	case found:
		existing.ltime = nEvent.LTime
		existing.leaving = isLeave
		if isLeave {
			existing.leaveTime = time.Now()
		}
		return true
	case isLeave:
		// Leave for a network we never saw this node join.
		return false
	}

	// First time we see this node join this network.
	networksOfNode[nEvent.NetworkID] = &network{
		id:    nEvent.NetworkID,
		ltime: nEvent.LTime,
	}
	nDB.networkNodes[nEvent.NetworkID] = append(nDB.networkNodes[nEvent.NetworkID], nEvent.NodeName)
	return true
}
|
|
|
|
// handleTableEvent applies a remote table create/update/delete event to both
// table indexes and notifies local watchers through the broadcaster.
//
// It returns true when the event was newer than the locally stored entry and
// was applied, and false when it was stale.
func (nDB *NetworkDB) handleTableEvent(tEvent *tableEventData) bool {
	// Pull our table clock forward if the event carries a newer time.
	nDB.tableClock.Witness(tEvent.LTime)

	// Drop the event if our copy of the entry is at least as recent.
	if cur, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key); err == nil && cur.ltime >= tEvent.LTime {
		return false
	}

	isDelete := tEvent.Event == tableEntryDelete
	e := &entry{
		ltime:    tEvent.LTime,
		node:     tEvent.NodeName,
		value:    tEvent.Value,
		deleting: isDelete,
	}
	if isDelete {
		e.deleteTime = time.Now()
	}

	// The same entry is stored under two key layouts: table-first and
	// network-first.
	tableKey := fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key)
	networkKey := fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key)

	nDB.Lock()
	nDB.indexes[byTable].Insert(tableKey, e)
	nDB.indexes[byNetwork].Insert(networkKey, e)
	nDB.Unlock()

	var op opType
	switch tEvent.Event {
	case tableEntryCreate:
		op = opCreate
	case tableEntryUpdate:
		op = opUpdate
	case tableEntryDelete:
		op = opDelete
	}

	nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
	return true
}
|
|
|
|
func (nDB *NetworkDB) handleCompound(buf []byte) {
|
|
// Decode the parts
|
|
trunc, parts, err := decodeCompoundMessage(buf[1:])
|
|
if err != nil {
|
|
logrus.Errorf("Failed to decode compound request: %v", err)
|
|
return
|
|
}
|
|
|
|
// Log any truncation
|
|
if trunc > 0 {
|
|
logrus.Warnf("Compound request had %d truncated messages", trunc)
|
|
}
|
|
|
|
// Handle each message
|
|
for _, part := range parts {
|
|
nDB.handleMessage(part)
|
|
}
|
|
}
|
|
|
|
func (nDB *NetworkDB) handleTableMessage(buf []byte) {
|
|
var tEvent tableEventData
|
|
if err := decodeMessage(buf[1:], &tEvent); err != nil {
|
|
logrus.Errorf("Error decoding table event message: %v", err)
|
|
return
|
|
}
|
|
|
|
if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast {
|
|
// Copy the buffer since we cannot rely on the slice not changing
|
|
newBuf := make([]byte, len(buf))
|
|
copy(newBuf, buf)
|
|
|
|
nDB.RLock()
|
|
n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID]
|
|
nDB.RUnlock()
|
|
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
broadcastQ := n.tableBroadcasts
|
|
broadcastQ.QueueBroadcast(&tableEventMessage{
|
|
msg: newBuf,
|
|
id: tEvent.NetworkID,
|
|
tname: tEvent.TableName,
|
|
key: tEvent.Key,
|
|
node: nDB.config.NodeName,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (nDB *NetworkDB) handleNetworkMessage(buf []byte) {
|
|
var nEvent networkEventData
|
|
if err := decodeMessage(buf[1:], &nEvent); err != nil {
|
|
logrus.Errorf("Error decoding network event message: %v", err)
|
|
return
|
|
}
|
|
|
|
if rebroadcast := nDB.handleNetworkEvent(&nEvent); rebroadcast {
|
|
// Copy the buffer since it we cannot rely on the slice not changing
|
|
newBuf := make([]byte, len(buf))
|
|
copy(newBuf, buf)
|
|
|
|
nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{
|
|
msg: newBuf,
|
|
id: nEvent.NetworkID,
|
|
node: nEvent.NodeName,
|
|
})
|
|
}
|
|
}
|
|
|
|
// handleBulkSync processes a bulk sync message from a peer (buf[0] is the
// message-type byte).
//
// The embedded payload is dispatched through handleMessage. What happens next
// depends on whether the sync was solicited:
//   - Solicited (a reply to a bulk sync we sent): the ack channel registered
//     for the peer in bulkSyncAckTbl is closed to wake the waiter, and no
//     response is sent.
//   - Unsolicited: we answer with a bulk sync of our own for the same
//     networks.
func (nDB *NetworkDB) handleBulkSync(buf []byte) {
	var bsm bulkSyncMessage
	if err := decodeMessage(buf[1:], &bsm); err != nil {
		logrus.Errorf("Error decoding bulk sync message: %v", err)
		return
	}

	if bsm.LTime > 0 {
		nDB.tableClock.Witness(bsm.LTime)
	}

	nDB.handleMessage(bsm.Payload)

	// Don't respond to a bulk sync which was not unsolicited.
	if !bsm.Unsolicited {
		// Look up, close and remove the ack channel under the write lock.
		// Otherwise a duplicate or concurrent reply from the same peer would
		// close the same channel twice, which panics.
		nDB.Lock()
		if ch, ok := nDB.bulkSyncAckTbl[bsm.NodeName]; ok {
			close(ch)
			delete(nDB.bulkSyncAckTbl, bsm.NodeName)
		}
		nDB.Unlock()
		return
	}

	if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil {
		// Identify the peer by address when we still know it and fall back to
		// its name otherwise. The nodes map is read under the lock, and a
		// missing entry must not be dereferenced.
		peer := bsm.NodeName
		nDB.RLock()
		if node, ok := nDB.nodes[bsm.NodeName]; ok {
			peer = fmt.Sprint(node.Addr)
		}
		nDB.RUnlock()
		logrus.Errorf("Error in responding to bulk sync from node %s: %v", peer, err)
	}
}
|
|
|
|
func (nDB *NetworkDB) handleMessage(buf []byte) {
|
|
msgType := messageType(buf[0])
|
|
|
|
switch msgType {
|
|
case networkEventMsg:
|
|
nDB.handleNetworkMessage(buf)
|
|
case tableEventMsg:
|
|
nDB.handleTableMessage(buf)
|
|
case compoundMsg:
|
|
nDB.handleCompound(buf)
|
|
case bulkSyncMsg:
|
|
nDB.handleBulkSync(buf)
|
|
default:
|
|
logrus.Errorf("%s: unknown message type %d payload = %v", nDB.config.NodeName, msgType, buf[:8])
|
|
}
|
|
}
|
|
|
|
func (d *delegate) NotifyMsg(buf []byte) {
|
|
if len(buf) == 0 {
|
|
return
|
|
}
|
|
|
|
d.nDB.handleMessage(buf)
|
|
}
|
|
|
|
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
|
|
return d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit)
|
|
}
|
|
|
|
func (d *delegate) LocalState(join bool) []byte {
|
|
d.nDB.RLock()
|
|
defer d.nDB.RUnlock()
|
|
|
|
pp := networkPushPull{
|
|
LTime: d.nDB.networkClock.Time(),
|
|
}
|
|
|
|
for name, nn := range d.nDB.networks {
|
|
for _, n := range nn {
|
|
pp.Networks = append(pp.Networks, networkData{
|
|
LTime: n.ltime,
|
|
ID: n.id,
|
|
NodeName: name,
|
|
Leaving: n.leaving,
|
|
})
|
|
}
|
|
}
|
|
|
|
buf, err := encodeMessage(networkPushPullMsg, &pp)
|
|
if err != nil {
|
|
logrus.Errorf("Failed to encode local network state: %v", err)
|
|
return nil
|
|
}
|
|
|
|
return buf
|
|
}
|
|
|
|
func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
|
|
if len(buf) == 0 {
|
|
logrus.Error("zero byte remote network state received")
|
|
return
|
|
}
|
|
|
|
if messageType(buf[0]) != networkPushPullMsg {
|
|
logrus.Errorf("Invalid message type %v received from remote", buf[0])
|
|
}
|
|
|
|
pp := networkPushPull{}
|
|
if err := decodeMessage(buf[1:], &pp); err != nil {
|
|
logrus.Errorf("Failed to decode remote network state: %v", err)
|
|
return
|
|
}
|
|
|
|
if pp.LTime > 0 {
|
|
d.nDB.networkClock.Witness(pp.LTime)
|
|
}
|
|
|
|
for _, n := range pp.Networks {
|
|
nEvent := &networkEventData{
|
|
LTime: n.LTime,
|
|
NodeName: n.NodeName,
|
|
NetworkID: n.ID,
|
|
Event: networkJoin,
|
|
}
|
|
|
|
if n.Leaving {
|
|
nEvent.Event = networkLeave
|
|
}
|
|
|
|
d.nDB.handleNetworkEvent(nEvent)
|
|
}
|
|
|
|
}
|