mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Use protobuf in networkdb core messages
Convert all networkdb core message types from go message types to protobuf message types. This faciliates future modification of the message structure without breaking backward compatibility. Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
This commit is contained in:
parent
9dd822b8cc
commit
77abea9c1e
8 changed files with 2578 additions and 215 deletions
|
@ -47,7 +47,7 @@ check: ${build_image}.created
|
|||
|
||||
check-code:
|
||||
@echo "Checking code... "
|
||||
test -z "$$(golint ./... | tee /dev/stderr)"
|
||||
test -z "$$(golint ./... | grep -v .pb.go: | tee /dev/stderr)"
|
||||
go vet ./...
|
||||
@echo "Done checking code"
|
||||
|
||||
|
|
|
@ -5,20 +5,6 @@ import (
|
|||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
type networkEventType uint8
|
||||
|
||||
const (
|
||||
networkJoin networkEventType = 1 + iota
|
||||
networkLeave
|
||||
)
|
||||
|
||||
type networkEventData struct {
|
||||
Event networkEventType
|
||||
LTime serf.LamportTime
|
||||
NodeName string
|
||||
NetworkID string
|
||||
}
|
||||
|
||||
type networkEventMessage struct {
|
||||
id string
|
||||
node string
|
||||
|
@ -37,15 +23,15 @@ func (m *networkEventMessage) Message() []byte {
|
|||
func (m *networkEventMessage) Finished() {
|
||||
}
|
||||
|
||||
func (nDB *NetworkDB) sendNetworkEvent(nid string, event networkEventType, ltime serf.LamportTime) error {
|
||||
nEvent := networkEventData{
|
||||
Event: event,
|
||||
func (nDB *NetworkDB) sendNetworkEvent(nid string, event NetworkEvent_Type, ltime serf.LamportTime) error {
|
||||
nEvent := NetworkEvent{
|
||||
Type: event,
|
||||
LTime: ltime,
|
||||
NodeName: nDB.config.NodeName,
|
||||
NetworkID: nid,
|
||||
}
|
||||
|
||||
raw, err := encodeMessage(networkEventMsg, &nEvent)
|
||||
raw, err := encodeMessage(MessageTypeNetworkEvent, &nEvent)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -58,24 +44,6 @@ func (nDB *NetworkDB) sendNetworkEvent(nid string, event networkEventType, ltime
|
|||
return nil
|
||||
}
|
||||
|
||||
type tableEventType uint8
|
||||
|
||||
const (
|
||||
tableEntryCreate tableEventType = 1 + iota
|
||||
tableEntryUpdate
|
||||
tableEntryDelete
|
||||
)
|
||||
|
||||
type tableEventData struct {
|
||||
Event tableEventType
|
||||
LTime serf.LamportTime
|
||||
NetworkID string
|
||||
TableName string
|
||||
NodeName string
|
||||
Value []byte
|
||||
Key string
|
||||
}
|
||||
|
||||
type tableEventMessage struct {
|
||||
id string
|
||||
tname string
|
||||
|
@ -96,9 +64,9 @@ func (m *tableEventMessage) Message() []byte {
|
|||
func (m *tableEventMessage) Finished() {
|
||||
}
|
||||
|
||||
func (nDB *NetworkDB) sendTableEvent(event tableEventType, nid string, tname string, key string, entry *entry) error {
|
||||
tEvent := tableEventData{
|
||||
Event: event,
|
||||
func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname string, key string, entry *entry) error {
|
||||
tEvent := TableEvent{
|
||||
Type: event,
|
||||
LTime: entry.ltime,
|
||||
NodeName: nDB.config.NodeName,
|
||||
NetworkID: nid,
|
||||
|
@ -107,7 +75,7 @@ func (nDB *NetworkDB) sendTableEvent(event tableEventType, nid string, tname str
|
|||
Value: entry.value,
|
||||
}
|
||||
|
||||
raw, err := encodeMessage(tableEventMsg, &tEvent)
|
||||
raw, err := encodeMessage(MessageTypeTableEvent, &tEvent)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -10,7 +10,6 @@ import (
|
|||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/hashicorp/memberlist"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
const reapInterval = 2 * time.Second
|
||||
|
@ -222,21 +221,13 @@ func (nDB *NetworkDB) gossip() {
|
|||
}
|
||||
|
||||
// Send the compound message
|
||||
if err := nDB.memberlist.SendToUDP(mnode, compound.Bytes()); err != nil {
|
||||
if err := nDB.memberlist.SendToUDP(mnode, compound); err != nil {
|
||||
logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type bulkSyncMessage struct {
|
||||
LTime serf.LamportTime
|
||||
Unsolicited bool
|
||||
NodeName string
|
||||
Networks []string
|
||||
Payload []byte
|
||||
}
|
||||
|
||||
func (nDB *NetworkDB) bulkSyncTables() {
|
||||
var networks []string
|
||||
nDB.RLock()
|
||||
|
@ -331,8 +322,8 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
|
|||
}
|
||||
|
||||
params := strings.Split(path[1:], "/")
|
||||
tEvent := tableEventData{
|
||||
Event: tableEntryCreate,
|
||||
tEvent := TableEvent{
|
||||
Type: TableEventTypeCreate,
|
||||
LTime: entry.ltime,
|
||||
NodeName: entry.node,
|
||||
NetworkID: nid,
|
||||
|
@ -341,7 +332,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
|
|||
Value: entry.value,
|
||||
}
|
||||
|
||||
msg, err := encodeMessage(tableEventMsg, &tEvent)
|
||||
msg, err := encodeMessage(MessageTypeTableEvent, &tEvent)
|
||||
if err != nil {
|
||||
logrus.Errorf("Encode failure during bulk sync: %#v", tEvent)
|
||||
return false
|
||||
|
@ -356,15 +347,15 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
|
|||
// Create a compound message
|
||||
compound := makeCompoundMessage(msgs)
|
||||
|
||||
bsm := bulkSyncMessage{
|
||||
bsm := BulkSyncMessage{
|
||||
LTime: nDB.tableClock.Time(),
|
||||
Unsolicited: unsolicited,
|
||||
NodeName: nDB.config.NodeName,
|
||||
Networks: networks,
|
||||
Payload: compound.Bytes(),
|
||||
Payload: compound,
|
||||
}
|
||||
|
||||
buf, err := encodeMessage(bulkSyncMsg, &bsm)
|
||||
buf, err := encodeMessage(MessageTypeBulkSync, &bsm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to encode bulk sync message: %v", err)
|
||||
}
|
||||
|
|
|
@ -5,21 +5,9 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
)
|
||||
|
||||
type networkData struct {
|
||||
LTime serf.LamportTime
|
||||
ID string
|
||||
NodeName string
|
||||
Leaving bool
|
||||
}
|
||||
|
||||
type networkPushPull struct {
|
||||
LTime serf.LamportTime
|
||||
Networks []networkData
|
||||
}
|
||||
|
||||
type delegate struct {
|
||||
nDB *NetworkDB
|
||||
}
|
||||
|
@ -28,7 +16,7 @@ func (d *delegate) NodeMeta(limit int) []byte {
|
|||
return []byte{}
|
||||
}
|
||||
|
||||
func (nDB *NetworkDB) handleNetworkEvent(nEvent *networkEventData) bool {
|
||||
func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
|
||||
// Update our local clock if the received messages has newer
|
||||
// time.
|
||||
nDB.networkClock.Witness(nEvent.LTime)
|
||||
|
@ -39,7 +27,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *networkEventData) bool {
|
|||
nodeNetworks, ok := nDB.networks[nEvent.NodeName]
|
||||
if !ok {
|
||||
// We haven't heard about this node at all. Ignore the leave
|
||||
if nEvent.Event == networkLeave {
|
||||
if nEvent.Type == NetworkEventTypeLeave {
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -55,7 +43,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *networkEventData) bool {
|
|||
}
|
||||
|
||||
n.ltime = nEvent.LTime
|
||||
n.leaving = nEvent.Event == networkLeave
|
||||
n.leaving = nEvent.Type == NetworkEventTypeLeave
|
||||
if n.leaving {
|
||||
n.leaveTime = time.Now()
|
||||
}
|
||||
|
@ -63,7 +51,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *networkEventData) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
if nEvent.Event == networkLeave {
|
||||
if nEvent.Type == NetworkEventTypeLeave {
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -77,7 +65,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *networkEventData) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func (nDB *NetworkDB) handleTableEvent(tEvent *tableEventData) bool {
|
||||
func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
|
||||
// Update our local clock if the received messages has newer
|
||||
// time.
|
||||
nDB.tableClock.Witness(tEvent.LTime)
|
||||
|
@ -94,7 +82,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *tableEventData) bool {
|
|||
ltime: tEvent.LTime,
|
||||
node: tEvent.NodeName,
|
||||
value: tEvent.Value,
|
||||
deleting: tEvent.Event == tableEntryDelete,
|
||||
deleting: tEvent.Type == TableEventTypeDelete,
|
||||
}
|
||||
|
||||
if entry.deleting {
|
||||
|
@ -107,12 +95,12 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *tableEventData) bool {
|
|||
nDB.Unlock()
|
||||
|
||||
var op opType
|
||||
switch tEvent.Event {
|
||||
case tableEntryCreate:
|
||||
switch tEvent.Type {
|
||||
case TableEventTypeCreate:
|
||||
op = opCreate
|
||||
case tableEntryUpdate:
|
||||
case TableEventTypeUpdate:
|
||||
op = opUpdate
|
||||
case tableEntryDelete:
|
||||
case TableEventTypeDelete:
|
||||
op = opDelete
|
||||
}
|
||||
|
||||
|
@ -122,17 +110,12 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *tableEventData) bool {
|
|||
|
||||
func (nDB *NetworkDB) handleCompound(buf []byte) {
|
||||
// Decode the parts
|
||||
trunc, parts, err := decodeCompoundMessage(buf[1:])
|
||||
parts, err := decodeCompoundMessage(buf)
|
||||
if err != nil {
|
||||
logrus.Errorf("Failed to decode compound request: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Log any truncation
|
||||
if trunc > 0 {
|
||||
logrus.Warnf("Compound request had %d truncated messages", trunc)
|
||||
}
|
||||
|
||||
// Handle each message
|
||||
for _, part := range parts {
|
||||
nDB.handleMessage(part)
|
||||
|
@ -140,16 +123,19 @@ func (nDB *NetworkDB) handleCompound(buf []byte) {
|
|||
}
|
||||
|
||||
func (nDB *NetworkDB) handleTableMessage(buf []byte) {
|
||||
var tEvent tableEventData
|
||||
if err := decodeMessage(buf[1:], &tEvent); err != nil {
|
||||
var tEvent TableEvent
|
||||
if err := proto.Unmarshal(buf, &tEvent); err != nil {
|
||||
logrus.Errorf("Error decoding table event message: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast {
|
||||
// Copy the buffer since we cannot rely on the slice not changing
|
||||
newBuf := make([]byte, len(buf))
|
||||
copy(newBuf, buf)
|
||||
var err error
|
||||
buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
|
||||
if err != nil {
|
||||
logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
nDB.RLock()
|
||||
n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID]
|
||||
|
@ -161,7 +147,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte) {
|
|||
|
||||
broadcastQ := n.tableBroadcasts
|
||||
broadcastQ.QueueBroadcast(&tableEventMessage{
|
||||
msg: newBuf,
|
||||
msg: buf,
|
||||
id: tEvent.NetworkID,
|
||||
tname: tEvent.TableName,
|
||||
key: tEvent.Key,
|
||||
|
@ -171,19 +157,22 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte) {
|
|||
}
|
||||
|
||||
func (nDB *NetworkDB) handleNetworkMessage(buf []byte) {
|
||||
var nEvent networkEventData
|
||||
if err := decodeMessage(buf[1:], &nEvent); err != nil {
|
||||
var nEvent NetworkEvent
|
||||
if err := proto.Unmarshal(buf, &nEvent); err != nil {
|
||||
logrus.Errorf("Error decoding network event message: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if rebroadcast := nDB.handleNetworkEvent(&nEvent); rebroadcast {
|
||||
// Copy the buffer since it we cannot rely on the slice not changing
|
||||
newBuf := make([]byte, len(buf))
|
||||
copy(newBuf, buf)
|
||||
var err error
|
||||
buf, err = encodeRawMessage(MessageTypeNetworkEvent, buf)
|
||||
if err != nil {
|
||||
logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{
|
||||
msg: newBuf,
|
||||
msg: buf,
|
||||
id: nEvent.NetworkID,
|
||||
node: nEvent.NodeName,
|
||||
})
|
||||
|
@ -191,8 +180,8 @@ func (nDB *NetworkDB) handleNetworkMessage(buf []byte) {
|
|||
}
|
||||
|
||||
func (nDB *NetworkDB) handleBulkSync(buf []byte) {
|
||||
var bsm bulkSyncMessage
|
||||
if err := decodeMessage(buf[1:], &bsm); err != nil {
|
||||
var bsm BulkSyncMessage
|
||||
if err := proto.Unmarshal(buf, &bsm); err != nil {
|
||||
logrus.Errorf("Error decoding bulk sync message: %v", err)
|
||||
return
|
||||
}
|
||||
|
@ -221,19 +210,23 @@ func (nDB *NetworkDB) handleBulkSync(buf []byte) {
|
|||
}
|
||||
|
||||
func (nDB *NetworkDB) handleMessage(buf []byte) {
|
||||
msgType := messageType(buf[0])
|
||||
mType, data, err := decodeMessage(buf)
|
||||
if err != nil {
|
||||
logrus.Errorf("Error decoding gossip message to get message type: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
switch msgType {
|
||||
case networkEventMsg:
|
||||
nDB.handleNetworkMessage(buf)
|
||||
case tableEventMsg:
|
||||
nDB.handleTableMessage(buf)
|
||||
case compoundMsg:
|
||||
nDB.handleCompound(buf)
|
||||
case bulkSyncMsg:
|
||||
nDB.handleBulkSync(buf)
|
||||
switch mType {
|
||||
case MessageTypeNetworkEvent:
|
||||
nDB.handleNetworkMessage(data)
|
||||
case MessageTypeTableEvent:
|
||||
nDB.handleTableMessage(data)
|
||||
case MessageTypeBulkSync:
|
||||
nDB.handleBulkSync(data)
|
||||
case MessageTypeCompound:
|
||||
nDB.handleCompound(data)
|
||||
default:
|
||||
logrus.Errorf("%s: unknown message type %d payload = %v", nDB.config.NodeName, msgType, buf[:8])
|
||||
logrus.Errorf("%s: unknown message type %d", nDB.config.NodeName, mType)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -253,22 +246,22 @@ func (d *delegate) LocalState(join bool) []byte {
|
|||
d.nDB.RLock()
|
||||
defer d.nDB.RUnlock()
|
||||
|
||||
pp := networkPushPull{
|
||||
pp := NetworkPushPull{
|
||||
LTime: d.nDB.networkClock.Time(),
|
||||
}
|
||||
|
||||
for name, nn := range d.nDB.networks {
|
||||
for _, n := range nn {
|
||||
pp.Networks = append(pp.Networks, networkData{
|
||||
LTime: n.ltime,
|
||||
ID: n.id,
|
||||
NodeName: name,
|
||||
Leaving: n.leaving,
|
||||
pp.Networks = append(pp.Networks, &NetworkEntry{
|
||||
LTime: n.ltime,
|
||||
NetworkID: n.id,
|
||||
NodeName: name,
|
||||
Leaving: n.leaving,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
buf, err := encodeMessage(networkPushPullMsg, &pp)
|
||||
buf, err := encodeMessage(MessageTypePushPull, &pp)
|
||||
if err != nil {
|
||||
logrus.Errorf("Failed to encode local network state: %v", err)
|
||||
return nil
|
||||
|
@ -283,12 +276,19 @@ func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
|
|||
return
|
||||
}
|
||||
|
||||
if messageType(buf[0]) != networkPushPullMsg {
|
||||
var gMsg GossipMessage
|
||||
err := proto.Unmarshal(buf, &gMsg)
|
||||
if err != nil {
|
||||
logrus.Errorf("Error unmarshalling push pull messsage: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if gMsg.Type != MessageTypePushPull {
|
||||
logrus.Errorf("Invalid message type %v received from remote", buf[0])
|
||||
}
|
||||
|
||||
pp := networkPushPull{}
|
||||
if err := decodeMessage(buf[1:], &pp); err != nil {
|
||||
pp := NetworkPushPull{}
|
||||
if err := proto.Unmarshal(gMsg.Data, &pp); err != nil {
|
||||
logrus.Errorf("Failed to decode remote network state: %v", err)
|
||||
return
|
||||
}
|
||||
|
@ -298,15 +298,15 @@ func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
|
|||
}
|
||||
|
||||
for _, n := range pp.Networks {
|
||||
nEvent := &networkEventData{
|
||||
nEvent := &NetworkEvent{
|
||||
LTime: n.LTime,
|
||||
NodeName: n.NodeName,
|
||||
NetworkID: n.ID,
|
||||
Event: networkJoin,
|
||||
NetworkID: n.NetworkID,
|
||||
Type: NetworkEventTypeJoin,
|
||||
}
|
||||
|
||||
if n.Leaving {
|
||||
nEvent.Event = networkLeave
|
||||
nEvent.Type = NetworkEventTypeLeave
|
||||
}
|
||||
|
||||
d.nDB.handleNetworkEvent(nEvent)
|
||||
|
|
|
@ -1,32 +1,6 @@
|
|||
package networkdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
)
|
||||
|
||||
type messageType uint8
|
||||
|
||||
const (
|
||||
// For network join/leave event message
|
||||
networkEventMsg messageType = 1 + iota
|
||||
|
||||
// For pushing/pulling network/node association state
|
||||
networkPushPullMsg
|
||||
|
||||
// For table entry CRUD event message
|
||||
tableEventMsg
|
||||
|
||||
// For building a compound message which packs many different
|
||||
// message types together
|
||||
compoundMsg
|
||||
|
||||
// For syncing table entries in bulk b/w nodes.
|
||||
bulkSyncMsg
|
||||
)
|
||||
import "github.com/gogo/protobuf/proto"
|
||||
|
||||
const (
|
||||
// Max udp message size chosen to avoid network packet
|
||||
|
@ -37,86 +11,92 @@ const (
|
|||
// bytes (num messages)
|
||||
compoundHeaderOverhead = 5
|
||||
|
||||
// Overhead for each embedded message in a compound message 2
|
||||
// Overhead for each embedded message in a compound message 4
|
||||
// bytes (len of embedded message)
|
||||
compoundOverhead = 2
|
||||
compoundOverhead = 4
|
||||
)
|
||||
|
||||
func decodeMessage(buf []byte, out interface{}) error {
|
||||
var handle codec.MsgpackHandle
|
||||
return codec.NewDecoder(bytes.NewReader(buf), &handle).Decode(out)
|
||||
func encodeRawMessage(t MessageType, raw []byte) ([]byte, error) {
|
||||
gMsg := GossipMessage{
|
||||
Type: t,
|
||||
Data: raw,
|
||||
}
|
||||
|
||||
buf, err := proto.Marshal(&gMsg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
func encodeMessage(t messageType, msg interface{}) ([]byte, error) {
|
||||
buf := bytes.NewBuffer(nil)
|
||||
buf.WriteByte(uint8(t))
|
||||
func encodeMessage(t MessageType, msg interface{}) ([]byte, error) {
|
||||
buf, err := proto.Marshal(msg.(proto.Message))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
handle := codec.MsgpackHandle{}
|
||||
encoder := codec.NewEncoder(buf, &handle)
|
||||
err := encoder.Encode(msg)
|
||||
return buf.Bytes(), err
|
||||
buf, err = encodeRawMessage(t, buf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
func decodeMessage(buf []byte) (MessageType, []byte, error) {
|
||||
var gMsg GossipMessage
|
||||
|
||||
err := proto.Unmarshal(buf, &gMsg)
|
||||
if err != nil {
|
||||
return MessageTypeInvalid, nil, err
|
||||
}
|
||||
|
||||
return gMsg.Type, gMsg.Data, nil
|
||||
}
|
||||
|
||||
// makeCompoundMessage takes a list of messages and generates
|
||||
// a single compound message containing all of them
|
||||
func makeCompoundMessage(msgs [][]byte) *bytes.Buffer {
|
||||
// Create a local buffer
|
||||
buf := bytes.NewBuffer(nil)
|
||||
func makeCompoundMessage(msgs [][]byte) []byte {
|
||||
cMsg := CompoundMessage{}
|
||||
|
||||
// Write out the type
|
||||
buf.WriteByte(uint8(compoundMsg))
|
||||
|
||||
// Write out the number of message
|
||||
binary.Write(buf, binary.BigEndian, uint32(len(msgs)))
|
||||
|
||||
// Add the message lengths
|
||||
cMsg.Messages = make([]*CompoundMessage_SimpleMessage, 0, len(msgs))
|
||||
for _, m := range msgs {
|
||||
binary.Write(buf, binary.BigEndian, uint16(len(m)))
|
||||
cMsg.Messages = append(cMsg.Messages, &CompoundMessage_SimpleMessage{
|
||||
Payload: m,
|
||||
})
|
||||
}
|
||||
|
||||
// Append the messages
|
||||
for _, m := range msgs {
|
||||
buf.Write(m)
|
||||
buf, err := proto.Marshal(&cMsg)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
gMsg := GossipMessage{
|
||||
Type: MessageTypeCompound,
|
||||
Data: buf,
|
||||
}
|
||||
|
||||
buf, err = proto.Marshal(&gMsg)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return buf
|
||||
}
|
||||
|
||||
// decodeCompoundMessage splits a compound message and returns
|
||||
// the slices of individual messages. Also returns the number
|
||||
// of truncated messages and any potential error
|
||||
func decodeCompoundMessage(buf []byte) (trunc int, parts [][]byte, err error) {
|
||||
if len(buf) < 1 {
|
||||
err = fmt.Errorf("missing compound length byte")
|
||||
return
|
||||
}
|
||||
numParts := binary.BigEndian.Uint32(buf[0:4])
|
||||
buf = buf[4:]
|
||||
|
||||
// Check we have enough bytes
|
||||
if len(buf) < int(numParts*2) {
|
||||
err = fmt.Errorf("truncated len slice")
|
||||
return
|
||||
// the slices of individual messages. Returns any potential error.
|
||||
func decodeCompoundMessage(buf []byte) ([][]byte, error) {
|
||||
var cMsg CompoundMessage
|
||||
if err := proto.Unmarshal(buf, &cMsg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Decode the lengths
|
||||
lengths := make([]uint16, numParts)
|
||||
for i := 0; i < int(numParts); i++ {
|
||||
lengths[i] = binary.BigEndian.Uint16(buf[i*2 : i*2+2])
|
||||
parts := make([][]byte, 0, len(cMsg.Messages))
|
||||
for _, m := range cMsg.Messages {
|
||||
parts = append(parts, m.Payload)
|
||||
}
|
||||
buf = buf[numParts*2:]
|
||||
|
||||
// Split each message
|
||||
for idx, msgLen := range lengths {
|
||||
if len(buf) < int(msgLen) {
|
||||
trunc = int(numParts) - idx
|
||||
return
|
||||
}
|
||||
|
||||
// Extract the slice, seek past on the buffer
|
||||
slice := buf[:msgLen]
|
||||
buf = buf[msgLen:]
|
||||
parts = append(parts, slice)
|
||||
}
|
||||
return
|
||||
return parts, nil
|
||||
}
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
package networkdb
|
||||
|
||||
//go:generate protoc -I.:../Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/networkdb,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. networkdb.proto
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
@ -206,7 +208,7 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
|
|||
value: value,
|
||||
}
|
||||
|
||||
if err := nDB.sendTableEvent(tableEntryCreate, nid, tname, key, entry); err != nil {
|
||||
if err := nDB.sendTableEvent(TableEventTypeCreate, nid, tname, key, entry); err != nil {
|
||||
return fmt.Errorf("cannot send table create event: %v", err)
|
||||
}
|
||||
|
||||
|
@ -234,7 +236,7 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
|
|||
value: value,
|
||||
}
|
||||
|
||||
if err := nDB.sendTableEvent(tableEntryUpdate, nid, tname, key, entry); err != nil {
|
||||
if err := nDB.sendTableEvent(TableEventTypeUpdate, nid, tname, key, entry); err != nil {
|
||||
return fmt.Errorf("cannot send table update event: %v", err)
|
||||
}
|
||||
|
||||
|
@ -264,7 +266,7 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
|
|||
deleteTime: time.Now(),
|
||||
}
|
||||
|
||||
if err := nDB.sendTableEvent(tableEntryDelete, nid, tname, key, entry); err != nil {
|
||||
if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
|
||||
return fmt.Errorf("cannot send table delete event: %v", err)
|
||||
}
|
||||
|
||||
|
@ -352,7 +354,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
|
|||
nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nDB.config.NodeName)
|
||||
nDB.Unlock()
|
||||
|
||||
if err := nDB.sendNetworkEvent(nid, networkJoin, ltime); err != nil {
|
||||
if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil {
|
||||
return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
|
||||
}
|
||||
|
||||
|
@ -371,7 +373,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
|
|||
// network.
|
||||
func (nDB *NetworkDB) LeaveNetwork(nid string) error {
|
||||
ltime := nDB.networkClock.Increment()
|
||||
if err := nDB.sendNetworkEvent(nid, networkLeave, ltime); err != nil {
|
||||
if err := nDB.sendNetworkEvent(nid, NetworkEventTypeLeave, ltime); err != nil {
|
||||
return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
|
||||
}
|
||||
|
||||
|
|
2266
libnetwork/networkdb/networkdb.pb.go
Normal file
2266
libnetwork/networkdb/networkdb.pb.go
Normal file
File diff suppressed because it is too large
Load diff
156
libnetwork/networkdb/networkdb.proto
Normal file
156
libnetwork/networkdb/networkdb.proto
Normal file
|
@ -0,0 +1,156 @@
|
|||
syntax = "proto3";
|
||||
|
||||
import "gogoproto/gogo.proto";
|
||||
|
||||
package networkdb;
|
||||
|
||||
option (gogoproto.marshaler_all) = true;
|
||||
option (gogoproto.unmarshaler_all) = true;
|
||||
option (gogoproto.stringer_all) = true;
|
||||
option (gogoproto.gostring_all) = true;
|
||||
option (gogoproto.sizer_all) = true;
|
||||
option (gogoproto.goproto_stringer_all) = false;
|
||||
|
||||
// MessageType enum defines all the core message types that networkdb
|
||||
// uses to communicate to peers.
|
||||
enum MessageType {
|
||||
option (gogoproto.goproto_enum_prefix) = false;
|
||||
option (gogoproto.enum_customname) = "MessageType";
|
||||
|
||||
INVALID = 0 [(gogoproto.enumvalue_customname) = "MessageTypeInvalid"];
|
||||
|
||||
// NetworEvent message type is used to communicate network
|
||||
// attachments on the node.
|
||||
NETWORK_EVENT = 1 [(gogoproto.enumvalue_customname) = "MessageTypeNetworkEvent"];
|
||||
|
||||
// TableEvent message type is used to communicate any table
|
||||
// CRUD event that happened on the node.
|
||||
TABLE_EVENT = 2 [(gogoproto.enumvalue_customname) = "MessageTypeTableEvent"];
|
||||
|
||||
// PushPull message type is used to syncup all network
|
||||
// attachments on a peer node either during startup of this
|
||||
// node or with a random peer node periodically thereafter.
|
||||
PUSH_PULL = 3 [(gogoproto.enumvalue_customname) = "MessageTypePushPull"];
|
||||
|
||||
// BulkSync message is used to bulksync the whole networkdb
|
||||
// state with a peer node during startup of this node or with
|
||||
// a random peer node periodically thereafter.
|
||||
BULK_SYNC = 4 [(gogoproto.enumvalue_customname) = "MessageTypeBulkSync"];
|
||||
|
||||
// Compound message type is used to form a compound message
|
||||
// which is a pack of many message of above types, packed into
|
||||
// a single compound message.
|
||||
COMPOUND = 5 [(gogoproto.enumvalue_customname) = "MessageTypeCompound"];
|
||||
}
|
||||
|
||||
// GossipMessage is a basic message header used by all messages types.
|
||||
message GossipMessage {
|
||||
MessageType type = 1; // type defines one of the message types defined above.
|
||||
bytes data = 2; // Payload of the message of any type defined here.
|
||||
}
|
||||
|
||||
// NetworkEvent message payload definition.
|
||||
message NetworkEvent {
|
||||
enum Type {
|
||||
option (gogoproto.goproto_enum_prefix) = false;
|
||||
option (gogoproto.enum_customname) = "Type";
|
||||
|
||||
INVALID = 0 [(gogoproto.enumvalue_customname) = "NetworkEventTypeInvalid"];
|
||||
// Join event is generated when this node joins a network.
|
||||
JOIN = 1 [(gogoproto.enumvalue_customname) = "NetworkEventTypeJoin"];;
|
||||
// Leave event is generated when this node leaves a network.
|
||||
LEAVE = 2 [(gogoproto.enumvalue_customname) = "NetworkEventTypeLeave"];;
|
||||
}
|
||||
|
||||
Type type = 1;
|
||||
|
||||
// Lamport time using a network lamport clock indicating the
|
||||
// time this event was generated on the node where it was
|
||||
// generated.
|
||||
uint64 l_time = 2 [(gogoproto.customtype) = "github.com/hashicorp/serf/serf.LamportTime", (gogoproto.nullable) = false];
|
||||
// Source node name.
|
||||
string node_name = 3;
|
||||
// ID of the network for which the event is generated.
|
||||
string network_id = 4 [(gogoproto.customname) = "NetworkID"];
|
||||
}
|
||||
|
||||
// NetworkEntry for push pull of networks.
|
||||
message NetworkEntry {
|
||||
// ID of the network
|
||||
string network_id = 1 [(gogoproto.customname) = "NetworkID"];
|
||||
// Latest lamport time of the network attachment when this
|
||||
// network event was recorded.
|
||||
uint64 l_time = 2 [(gogoproto.customtype) = "github.com/hashicorp/serf/serf.LamportTime", (gogoproto.nullable) = false];
|
||||
// Source node name where this network attachment happened.
|
||||
string node_name = 3;
|
||||
// Indicates if a leave from this network is in progress.
|
||||
bool leaving = 4;
|
||||
}
|
||||
|
||||
// NetworkPushpull message payload definition.
|
||||
message NetworkPushPull {
|
||||
// Lamport time when this push pull was initiated.
|
||||
uint64 l_time = 1 [(gogoproto.customtype) = "github.com/hashicorp/serf/serf.LamportTime", (gogoproto.nullable) = false];
|
||||
repeated NetworkEntry networks = 2;
|
||||
}
|
||||
|
||||
// TableEvent message payload definition.
|
||||
message TableEvent {
|
||||
enum Type {
|
||||
option (gogoproto.goproto_enum_prefix) = false;
|
||||
option (gogoproto.enum_customname) = "Type";
|
||||
|
||||
INVALID = 0 [(gogoproto.enumvalue_customname) = "TableEventTypeInvalid"];
|
||||
// Create signifies that this table entry was just
|
||||
// created.
|
||||
CREATE = 1 [(gogoproto.enumvalue_customname) = "TableEventTypeCreate"];
|
||||
// Update signifies that this table entry was just
|
||||
// updated.
|
||||
UPDATE = 2 [(gogoproto.enumvalue_customname) = "TableEventTypeUpdate"];
|
||||
// Delete signifies that this table entry was just
|
||||
// updated.
|
||||
DELETE = 3 [(gogoproto.enumvalue_customname) = "TableEventTypeDelete"];
|
||||
}
|
||||
|
||||
Type type = 1;
|
||||
// Lamport time when this event was generated.
|
||||
uint64 l_time = 2 [(gogoproto.customtype) = "github.com/hashicorp/serf/serf.LamportTime", (gogoproto.nullable) = false];
|
||||
// Node name where this event originated.
|
||||
string node_name = 3;
|
||||
// ID of the network to which this table entry belongs.
|
||||
string network_id = 4 [(gogoproto.customname) = "NetworkID"];
|
||||
// Name of the table to which this table entry belongs.
|
||||
string table_name = 5;
|
||||
// Entry key.
|
||||
string key = 6;
|
||||
// Entry value.
|
||||
bytes value = 7;
|
||||
}
|
||||
|
||||
// BulkSync message payload definition.
|
||||
message BulkSyncMessage {
|
||||
// Lamport time when this bulk sync was initiated.
|
||||
uint64 l_time = 1 [(gogoproto.customtype) = "github.com/hashicorp/serf/serf.LamportTime", (gogoproto.nullable) = false];
|
||||
// Indicates if this bulksync is a response to a bulk sync
|
||||
// request from a peer node.
|
||||
bool unsolicited = 2;
|
||||
// Name of the node which is producing this bulk sync message.
|
||||
string node_name = 3;
|
||||
// List of network names whose table entries are getting
|
||||
// bulksynced as part of the bulksync.
|
||||
repeated string networks = 4;
|
||||
// Bulksync payload
|
||||
bytes payload = 5;
|
||||
}
|
||||
|
||||
// Compound message payload definition.
|
||||
message CompoundMessage {
|
||||
message SimpleMessage {
|
||||
// Bytestring payload of a message constructed using
|
||||
// other message type definitions.
|
||||
bytes Payload = 1;
|
||||
}
|
||||
|
||||
// A list of simple messages.
|
||||
repeated SimpleMessage messages = 1;
|
||||
}
|
Loading…
Reference in a new issue