package networkdb import ( "net" "time" "github.com/gogo/protobuf/proto" "github.com/sirupsen/logrus" ) type delegate struct { nDB *NetworkDB } func (d *delegate) NodeMeta(limit int) []byte { return []byte{} } func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool { // Update our local clock if the received messages has newer // time. nDB.networkClock.Witness(nEvent.LTime) nDB.Lock() defer nDB.Unlock() // check if the node exists n, _, _ := nDB.findNode(nEvent.NodeName) if n == nil { return false } // check if the event is fresh if n.ltime >= nEvent.LTime { return false } // If we are here means that the event is fresher and the node is known. Update the laport time n.ltime = nEvent.LTime // If the node is not known from memberlist we cannot process save any state of it else if it actually // dies we won't receive any notification and we will remain stuck with it if _, ok := nDB.nodes[nEvent.NodeName]; !ok { logrus.Error("node: %s is unknown to memberlist", nEvent.NodeName) return false } switch nEvent.Type { case NodeEventTypeJoin: moved, err := nDB.changeNodeState(n.Name, nodeActiveState) if err != nil { logrus.WithError(err).Error("unable to find the node to move") return false } if moved { logrus.Infof("%v(%v): Node join event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr) } return moved case NodeEventTypeLeave: moved, err := nDB.changeNodeState(n.Name, nodeLeftState) if err != nil { logrus.WithError(err).Error("unable to find the node to move") return false } if moved { logrus.Infof("%v(%v): Node leave event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr) } return moved } return false } func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { // Update our local clock if the received messages has newer // time. nDB.networkClock.Witness(nEvent.LTime) nDB.Lock() defer nDB.Unlock() if nEvent.NodeName == nDB.config.NodeID { return false } nodeNetworks, ok := nDB.networks[nEvent.NodeName] if !ok { // We haven't heard about this node at all. Ignore the leave if nEvent.Type == NetworkEventTypeLeave { return false } nodeNetworks = make(map[string]*network) nDB.networks[nEvent.NodeName] = nodeNetworks } if n, ok := nodeNetworks[nEvent.NetworkID]; ok { // We have the latest state. Ignore the event // since it is stale. if n.ltime >= nEvent.LTime { return false } n.ltime = nEvent.LTime n.leaving = nEvent.Type == NetworkEventTypeLeave if n.leaving { n.reapTime = nDB.config.reapNetworkInterval // The remote node is leaving the network, but not the gossip cluster. // Mark all its entries in deleted state, this will guarantee that // if some node bulk sync with us, the deleted state of // these entries will be propagated. nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName) } if nEvent.Type == NetworkEventTypeLeave { nDB.deleteNetworkNode(nEvent.NetworkID, nEvent.NodeName) } else { nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName) } return true } if nEvent.Type == NetworkEventTypeLeave { return false } // If the node is not known from memberlist we cannot process save any state of it else if it actually // dies we won't receive any notification and we will remain stuck with it if _, ok := nDB.nodes[nEvent.NodeName]; !ok { return false } // This remote network join is being seen the first time. nodeNetworks[nEvent.NetworkID] = &network{ id: nEvent.NetworkID, ltime: nEvent.LTime, } nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName) return true } func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { // Update our local clock if the received messages has newer time. nDB.tableClock.Witness(tEvent.LTime) // Ignore the table events for networks that are in the process of going away nDB.RLock() networks := nDB.networks[nDB.config.NodeID] network, ok := networks[tEvent.NetworkID] // Check if the owner of the event is still part of the network nodes := nDB.networkNodes[tEvent.NetworkID] var nodePresent bool for _, node := range nodes { if node == tEvent.NodeName { nodePresent = true break } } nDB.RUnlock() if !ok || network.leaving || !nodePresent { // I'm out of the network OR the event owner is not anymore part of the network so do not propagate return false } nDB.Lock() e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key) if err == nil { // We have the latest state. Ignore the event // since it is stale. if e.ltime >= tEvent.LTime { nDB.Unlock() return false } } e = &entry{ ltime: tEvent.LTime, node: tEvent.NodeName, value: tEvent.Value, deleting: tEvent.Type == TableEventTypeDelete, reapTime: time.Duration(tEvent.ResidualReapTime) * time.Second, } // All the entries marked for deletion should have a reapTime set greater than 0 // This case can happen if the cluster is running different versions of the engine where the old version does not have the // field. If that is not the case, this can be a BUG if e.deleting && e.reapTime == 0 { logrus.Warnf("%v(%v) handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?", nDB.config.Hostname, nDB.config.NodeID, tEvent) e.reapTime = nDB.config.reapEntryInterval } nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e) nDB.Unlock() if err != nil && tEvent.Type == TableEventTypeDelete { // If it is a delete event and we did not have a state for it, don't propagate to the application // If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around // most likely the cluster is already aware of it, if not who will sync with this node will catch the state too. // This also avoids that deletion of entries close to their garbage collection ends up circuling around forever return e.reapTime > nDB.config.reapEntryInterval/6 } var op opType switch tEvent.Type { case TableEventTypeCreate: op = opCreate case TableEventTypeUpdate: op = opUpdate case TableEventTypeDelete: op = opDelete } nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value)) return true } func (nDB *NetworkDB) handleCompound(buf []byte, isBulkSync bool) { // Decode the parts parts, err := decodeCompoundMessage(buf) if err != nil { logrus.Errorf("Failed to decode compound request: %v", err) return } // Handle each message for _, part := range parts { nDB.handleMessage(part, isBulkSync) } } func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) { var tEvent TableEvent if err := proto.Unmarshal(buf, &tEvent); err != nil { logrus.Errorf("Error decoding table event message: %v", err) return } // Ignore messages that this node generated. if tEvent.NodeName == nDB.config.NodeID { return } if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast { var err error buf, err = encodeRawMessage(MessageTypeTableEvent, buf) if err != nil { logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err) return } nDB.RLock() n, ok := nDB.networks[nDB.config.NodeID][tEvent.NetworkID] nDB.RUnlock() // if the network is not there anymore, OR we are leaving the network OR the broadcast queue is not present if !ok || n.leaving || n.tableBroadcasts == nil { return } n.tableBroadcasts.QueueBroadcast(&tableEventMessage{ msg: buf, id: tEvent.NetworkID, tname: tEvent.TableName, key: tEvent.Key, node: tEvent.NodeName, }) } } func (nDB *NetworkDB) handleNodeMessage(buf []byte) { var nEvent NodeEvent if err := proto.Unmarshal(buf, &nEvent); err != nil { logrus.Errorf("Error decoding node event message: %v", err) return } if rebroadcast := nDB.handleNodeEvent(&nEvent); rebroadcast { var err error buf, err = encodeRawMessage(MessageTypeNodeEvent, buf) if err != nil { logrus.Errorf("Error marshalling gossip message for node event rebroadcast: %v", err) return } nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{ msg: buf, }) } } func (nDB *NetworkDB) handleNetworkMessage(buf []byte) { var nEvent NetworkEvent if err := proto.Unmarshal(buf, &nEvent); err != nil { logrus.Errorf("Error decoding network event message: %v", err) return } if rebroadcast := nDB.handleNetworkEvent(&nEvent); rebroadcast { var err error buf, err = encodeRawMessage(MessageTypeNetworkEvent, buf) if err != nil { logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err) return } nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{ msg: buf, id: nEvent.NetworkID, node: nEvent.NodeName, }) } } func (nDB *NetworkDB) handleBulkSync(buf []byte) { var bsm BulkSyncMessage if err := proto.Unmarshal(buf, &bsm); err != nil { logrus.Errorf("Error decoding bulk sync message: %v", err) return } if bsm.LTime > 0 { nDB.tableClock.Witness(bsm.LTime) } nDB.handleMessage(bsm.Payload, true) // Don't respond to a bulk sync which was not unsolicited if !bsm.Unsolicited { nDB.Lock() ch, ok := nDB.bulkSyncAckTbl[bsm.NodeName] if ok { close(ch) delete(nDB.bulkSyncAckTbl, bsm.NodeName) } nDB.Unlock() return } var nodeAddr net.IP nDB.RLock() if node, ok := nDB.nodes[bsm.NodeName]; ok { nodeAddr = node.Addr } nDB.RUnlock() if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil { logrus.Errorf("Error in responding to bulk sync from node %s: %v", nodeAddr, err) } } func (nDB *NetworkDB) handleMessage(buf []byte, isBulkSync bool) { mType, data, err := decodeMessage(buf) if err != nil { logrus.Errorf("Error decoding gossip message to get message type: %v", err) return } switch mType { case MessageTypeNodeEvent: nDB.handleNodeMessage(data) case MessageTypeNetworkEvent: nDB.handleNetworkMessage(data) case MessageTypeTableEvent: nDB.handleTableMessage(data, isBulkSync) case MessageTypeBulkSync: nDB.handleBulkSync(data) case MessageTypeCompound: nDB.handleCompound(data, isBulkSync) default: logrus.Errorf("%v(%v): unknown message type %d", nDB.config.Hostname, nDB.config.NodeID, mType) } } func (d *delegate) NotifyMsg(buf []byte) { if len(buf) == 0 { return } d.nDB.handleMessage(buf, false) } func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { msgs := d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit) msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)...) return msgs } func (d *delegate) LocalState(join bool) []byte { if join { // Update all the local node/network state to a new time to // force update on the node we are trying to rejoin, just in // case that node has these in leaving state still. This is // facilitate fast convergence after recovering from a gossip // failure. d.nDB.updateLocalNetworkTime() } d.nDB.RLock() defer d.nDB.RUnlock() pp := NetworkPushPull{ LTime: d.nDB.networkClock.Time(), NodeName: d.nDB.config.NodeID, } for name, nn := range d.nDB.networks { for _, n := range nn { pp.Networks = append(pp.Networks, &NetworkEntry{ LTime: n.ltime, NetworkID: n.id, NodeName: name, Leaving: n.leaving, }) } } buf, err := encodeMessage(MessageTypePushPull, &pp) if err != nil { logrus.Errorf("Failed to encode local network state: %v", err) return nil } return buf } func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) { if len(buf) == 0 { logrus.Error("zero byte remote network state received") return } var gMsg GossipMessage err := proto.Unmarshal(buf, &gMsg) if err != nil { logrus.Errorf("Error unmarshalling push pull message: %v", err) return } if gMsg.Type != MessageTypePushPull { logrus.Errorf("Invalid message type %v received from remote", buf[0]) } pp := NetworkPushPull{} if err := proto.Unmarshal(gMsg.Data, &pp); err != nil { logrus.Errorf("Failed to decode remote network state: %v", err) return } nodeEvent := &NodeEvent{ LTime: pp.LTime, NodeName: pp.NodeName, Type: NodeEventTypeJoin, } d.nDB.handleNodeEvent(nodeEvent) for _, n := range pp.Networks { nEvent := &NetworkEvent{ LTime: n.LTime, NodeName: n.NodeName, NetworkID: n.NetworkID, Type: NetworkEventTypeJoin, } if n.Leaving { nEvent.Type = NetworkEventTypeLeave } d.nDB.handleNetworkEvent(nEvent) } }