package networkdb import ( "fmt" "time" "github.com/Sirupsen/logrus" "github.com/hashicorp/serf/serf" ) type networkData struct { LTime serf.LamportTime ID string NodeName string Leaving bool } type networkPushPull struct { LTime serf.LamportTime Networks []networkData } type delegate struct { nDB *NetworkDB } func (d *delegate) NodeMeta(limit int) []byte { return []byte{} } func (nDB *NetworkDB) handleNetworkEvent(nEvent *networkEventData) bool { // Update our local clock if the received messages has newer // time. nDB.networkClock.Witness(nEvent.LTime) nDB.Lock() defer nDB.Unlock() nodeNetworks, ok := nDB.networks[nEvent.NodeName] if !ok { // We haven't heard about this node at all. Ignore the leave if nEvent.Event == networkLeave { return false } nodeNetworks = make(map[string]*network) nDB.networks[nEvent.NodeName] = nodeNetworks } if n, ok := nodeNetworks[nEvent.NetworkID]; ok { // We have the latest state. Ignore the event // since it is stale. if n.ltime >= nEvent.LTime { return false } n.ltime = nEvent.LTime n.leaving = nEvent.Event == networkLeave if n.leaving { n.leaveTime = time.Now() } return true } if nEvent.Event == networkLeave { return false } // This remote network join is being seen the first time. nodeNetworks[nEvent.NetworkID] = &network{ id: nEvent.NetworkID, ltime: nEvent.LTime, } nDB.networkNodes[nEvent.NetworkID] = append(nDB.networkNodes[nEvent.NetworkID], nEvent.NodeName) return true } func (nDB *NetworkDB) handleTableEvent(tEvent *tableEventData) bool { // Update our local clock if the received messages has newer // time. nDB.tableClock.Witness(tEvent.LTime) if entry, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key); err == nil { // We have the latest state. Ignore the event // since it is stale. if entry.ltime >= tEvent.LTime { return false } } entry := &entry{ ltime: tEvent.LTime, node: tEvent.NodeName, value: tEvent.Value, deleting: tEvent.Event == tableEntryDelete, } if entry.deleting { entry.deleteTime = time.Now() } nDB.Lock() nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), entry) nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), entry) nDB.Unlock() var op opType switch tEvent.Event { case tableEntryCreate: op = opCreate case tableEntryUpdate: op = opUpdate case tableEntryDelete: op = opDelete } nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value)) return true } func (nDB *NetworkDB) handleCompound(buf []byte) { // Decode the parts trunc, parts, err := decodeCompoundMessage(buf[1:]) if err != nil { logrus.Errorf("Failed to decode compound request: %v", err) return } // Log any truncation if trunc > 0 { logrus.Warnf("Compound request had %d truncated messages", trunc) } // Handle each message for _, part := range parts { nDB.handleMessage(part) } } func (nDB *NetworkDB) handleTableMessage(buf []byte) { var tEvent tableEventData if err := decodeMessage(buf[1:], &tEvent); err != nil { logrus.Errorf("Error decoding table event message: %v", err) return } if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast { // Copy the buffer since we cannot rely on the slice not changing newBuf := make([]byte, len(buf)) copy(newBuf, buf) nDB.RLock() n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID] nDB.RUnlock() if !ok { return } broadcastQ := n.tableBroadcasts broadcastQ.QueueBroadcast(&tableEventMessage{ msg: newBuf, id: tEvent.NetworkID, tname: tEvent.TableName, key: tEvent.Key, node: nDB.config.NodeName, }) } } func (nDB *NetworkDB) handleNetworkMessage(buf []byte) { var nEvent networkEventData if err := decodeMessage(buf[1:], &nEvent); err != nil { logrus.Errorf("Error decoding network event message: %v", err) return } if rebroadcast := nDB.handleNetworkEvent(&nEvent); rebroadcast { // Copy the buffer since it we cannot rely on the slice not changing newBuf := make([]byte, len(buf)) copy(newBuf, buf) nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{ msg: newBuf, id: nEvent.NetworkID, node: nEvent.NodeName, }) } } func (nDB *NetworkDB) handleBulkSync(buf []byte) { var bsm bulkSyncMessage if err := decodeMessage(buf[1:], &bsm); err != nil { logrus.Errorf("Error decoding bulk sync message: %v", err) return } if bsm.LTime > 0 { nDB.tableClock.Witness(bsm.LTime) } nDB.handleMessage(bsm.Payload) // Don't respond to a bulk sync which was not unsolicited if !bsm.Unsolicited { nDB.RLock() ch, ok := nDB.bulkSyncAckTbl[bsm.NodeName] nDB.RUnlock() if ok { close(ch) } return } if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil { logrus.Errorf("Error in responding to bulk sync from node %s: %v", nDB.nodes[bsm.NodeName].Addr, err) } } func (nDB *NetworkDB) handleMessage(buf []byte) { msgType := messageType(buf[0]) switch msgType { case networkEventMsg: nDB.handleNetworkMessage(buf) case tableEventMsg: nDB.handleTableMessage(buf) case compoundMsg: nDB.handleCompound(buf) case bulkSyncMsg: nDB.handleBulkSync(buf) default: logrus.Errorf("%s: unknown message type %d payload = %v", nDB.config.NodeName, msgType, buf[:8]) } } func (d *delegate) NotifyMsg(buf []byte) { if len(buf) == 0 { return } d.nDB.handleMessage(buf) } func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { return d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit) } func (d *delegate) LocalState(join bool) []byte { d.nDB.RLock() defer d.nDB.RUnlock() pp := networkPushPull{ LTime: d.nDB.networkClock.Time(), } for name, nn := range d.nDB.networks { for _, n := range nn { pp.Networks = append(pp.Networks, networkData{ LTime: n.ltime, ID: n.id, NodeName: name, Leaving: n.leaving, }) } } buf, err := encodeMessage(networkPushPullMsg, &pp) if err != nil { logrus.Errorf("Failed to encode local network state: %v", err) return nil } return buf } func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) { if len(buf) == 0 { logrus.Error("zero byte remote network state received") return } if messageType(buf[0]) != networkPushPullMsg { logrus.Errorf("Invalid message type %v received from remote", buf[0]) } pp := networkPushPull{} if err := decodeMessage(buf[1:], &pp); err != nil { logrus.Errorf("Failed to decode remote network state: %v", err) return } if pp.LTime > 0 { d.nDB.networkClock.Witness(pp.LTime) } for _, n := range pp.Networks { nEvent := &networkEventData{ LTime: n.LTime, NodeName: n.NodeName, NetworkID: n.ID, Event: networkJoin, } if n.Leaving { nEvent.Event = networkLeave } d.nDB.handleNetworkEvent(nEvent) } }