From 2d2a2bc56810e9b1aa97a920fe488303cf775bf2 Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Tue, 19 Sep 2017 13:42:35 -0700 Subject: [PATCH 1/3] Fix reapTime logic in NetworkDB - Added remainingReapTime field in the table event. Wihtout it a node that did not have a state for the element was marking the element for deletion setting the max reapTime. This was creating the possibility to keep the entry being resync between nodes forever avoding the purpose of the reap time itself. - On broadcast of the table event the node owner was rewritten with the local node name, this was not correct because the owner should continue to remain the original one of the message Signed-off-by: Flavio Crisciani --- libnetwork/.gitignore | 1 + libnetwork/networkdb/broadcast.go | 2 + libnetwork/networkdb/cluster.go | 13 +- libnetwork/networkdb/delegate.go | 34 +- libnetwork/networkdb/networkdb.go | 57 +- libnetwork/networkdb/networkdb.pb.go | 811 ++++++++++++++++----------- libnetwork/networkdb/networkdb.proto | 8 +- libnetwork/test/networkDb/README | 2 +- 8 files changed, 574 insertions(+), 354 deletions(-) diff --git a/libnetwork/.gitignore b/libnetwork/.gitignore index d1b1d4cbc6..f5e6e52d7f 100644 --- a/libnetwork/.gitignore +++ b/libnetwork/.gitignore @@ -38,3 +38,4 @@ cmd/dnet/dnet libnetworkbuild.created test/networkDb/testMain +test/networkDb/gossipdb diff --git a/libnetwork/networkdb/broadcast.go b/libnetwork/networkdb/broadcast.go index 52e96ec639..8317ed03f6 100644 --- a/libnetwork/networkdb/broadcast.go +++ b/libnetwork/networkdb/broadcast.go @@ -134,6 +134,8 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st TableName: tname, Key: key, Value: entry.value, + // The duration in second is a float that below would be truncated + ResidualReapTime: int32(entry.reapTime.Seconds()), } raw, err := encodeMessage(MessageTypeTableEvent, &tEvent) diff --git a/libnetwork/networkdb/cluster.go b/libnetwork/networkdb/cluster.go index d15a5767ff..eb20524cc5 100644 --- a/libnetwork/networkdb/cluster.go +++ b/libnetwork/networkdb/cluster.go @@ -349,11 +349,11 @@ func (nDB *NetworkDB) reapTableEntries() { nid := params[1] key := params[2] - if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok { + okTable, okNetwork := nDB.deleteEntry(nid, tname, key) + if !okTable { logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key) } - - if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok { + if !okNetwork { logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key) } } @@ -406,8 +406,9 @@ func (nDB *NetworkDB) gossip() { // Collect stats and print the queue info, note this code is here also to have a view of the queues empty network.qMessagesSent += len(msgs) if printStats { - logrus.Infof("NetworkDB stats - Queue net:%s qLen:%d netPeers:%d netMsg/s:%d", - nid, broadcastQ.NumQueued(), broadcastQ.NumNodes(), network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second))) + logrus.Infof("NetworkDB stats - net:%s Entries:%d Queue qLen:%d netPeers:%d netMsg/s:%d", + nid, network.entriesNumber, broadcastQ.NumQueued(), broadcastQ.NumNodes(), + network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second))) network.qMessagesSent = 0 } @@ -572,6 +573,8 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b TableName: params[1], Key: params[2], Value: entry.value, + // The duration in second is a float that below would be truncated + ResidualReapTime: int32(entry.reapTime.Seconds()), } msg, err := encodeMessage(MessageTypeTableEvent, &tEvent) diff --git a/libnetwork/networkdb/delegate.go b/libnetwork/networkdb/delegate.go index ffaf94e8c8..9f97e4d3ee 100644 --- a/libnetwork/networkdb/delegate.go +++ b/libnetwork/networkdb/delegate.go @@ -1,9 +1,9 @@ package networkdb import ( - "fmt" "net" "strings" + "time" "github.com/gogo/protobuf/proto" "github.com/sirupsen/logrus" @@ -198,8 +198,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { } func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { - // Update our local clock if the received messages has newer - // time. + // Update our local clock if the received messages has newer time. nDB.tableClock.Witness(tEvent.LTime) // Ignore the table events for networks that are in the process of going away @@ -235,20 +234,26 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { node: tEvent.NodeName, value: tEvent.Value, deleting: tEvent.Type == TableEventTypeDelete, + reapTime: time.Duration(tEvent.ResidualReapTime) * time.Second, } - if e.deleting { + // All the entries marked for deletion should have a reapTime set greater than 0 + // This case can happens if the cluster is running different versions of the engine where the old version does not have the + // field. In both cases we should raise a warning message + if e.deleting && e.reapTime == 0 { + logrus.Warnf("handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?", tEvent) e.reapTime = reapInterval } nDB.Lock() - nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), e) - nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e) + nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e) nDB.Unlock() if err != nil && tEvent.Type == TableEventTypeDelete { - // If it is a delete event and we didn't have the entry here don't repropagate - return true + // If it is a delete event and we did not have a state for it, don't propagate to the application + // If the residual reapTime is lower than 1/6 of the total reapTime don't bother broadcasting it around + // most likely the cluster is already aware of it, if not who will sync with this node will catch the state too. + return e.reapTime >= reapPeriod/6 } var op opType @@ -303,22 +308,17 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) { n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID] nDB.RUnlock() - if !ok { + // if the network is not there anymore, OR we are leaving the network OR the broadcast queue is not present + if !ok || n.leaving || n.tableBroadcasts == nil { return } - broadcastQ := n.tableBroadcasts - - if broadcastQ == nil { - return - } - - broadcastQ.QueueBroadcast(&tableEventMessage{ + n.tableBroadcasts.QueueBroadcast(&tableEventMessage{ msg: buf, id: tEvent.NetworkID, tname: tEvent.TableName, key: tEvent.Key, - node: nDB.config.NodeName, + node: tEvent.NodeName, }) } } diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go index 73dd999097..cf3671d0f9 100644 --- a/libnetwork/networkdb/networkdb.go +++ b/libnetwork/networkdb/networkdb.go @@ -141,6 +141,11 @@ type network struct { // Number of gossip messages sent related to this network during the last stats collection period qMessagesSent int + + // Number of entries on the network. This value is the sum of all the entries of all the tables of a specific network. + // Its use is for statistics purposes. It keep tracks of database size and is printed per network every StatsPrintPeriod + // interval + entriesNumber int } // Config represents the configuration of the networdb instance and @@ -338,8 +343,7 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error { } nDB.Lock() - nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) - nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) + nDB.createOrUpdateEntry(nid, tname, key, entry) nDB.Unlock() return nil @@ -365,8 +369,7 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error { } nDB.Lock() - nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) - nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) + nDB.createOrUpdateEntry(nid, tname, key, entry) nDB.Unlock() return nil @@ -410,8 +413,7 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error { } nDB.Lock() - nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) - nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) + nDB.createOrUpdateEntry(nid, tname, key, entry) nDB.Unlock() return nil @@ -488,12 +490,10 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) { // without doing a delete of all the objects entry.ltime++ } - nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) - nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) + nDB.createOrUpdateEntry(nid, tname, key, entry) } else { // the local node is leaving the network, all the entries of remote nodes can be safely removed - nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)) - nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)) + nDB.deleteEntry(nid, tname, key) } nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value)) @@ -513,8 +513,7 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) { nid := params[1] key := params[2] - nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)) - nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)) + nDB.deleteEntry(nid, tname, key) nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, oldEntry.value)) return false @@ -558,7 +557,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { nodeNetworks = make(map[string]*network) nDB.networks[nDB.config.NodeName] = nodeNetworks } - nodeNetworks[nid] = &network{id: nid, ltime: ltime} + n, ok := nodeNetworks[nid] + var entries int + if ok { + entries = n.entriesNumber + } + nodeNetworks[nid] = &network{id: nid, ltime: ltime, entriesNumber: entries} nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{ NumNodes: func() int { nDB.RLock() @@ -567,6 +571,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { }, RetransmitMult: 4, } + nDB.addNetworkNode(nid, nDB.config.NodeName) networkNodes := nDB.networkNodes[nid] nDB.Unlock() @@ -679,3 +684,29 @@ func (nDB *NetworkDB) updateLocalNetworkTime() { n.ltime = ltime } } + +// createOrUpdateEntry this function handles the creation or update of entries into the local +// tree store. It is also used to keep in sync the entries number of the network (all tables are aggregated) +func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interface{}) (bool, bool) { + _, okTable := nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) + _, okNetwork := nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) + if !okNetwork { + // Add only if it is an insert not an update + n := nDB.networks[nDB.config.NodeName][nid] + n.entriesNumber++ + } + return okTable, okNetwork +} + +// deleteEntry this function handles the deletion of entries into the local tree store. +// It is also used to keep in sync the entries number of the network (all tables are aggregated) +func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (bool, bool) { + _, okTable := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)) + _, okNetwork := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)) + if okNetwork { + // Remove only if the delete is successful + n := nDB.networks[nDB.config.NodeName][nid] + n.entriesNumber-- + } + return okTable, okNetwork +} diff --git a/libnetwork/networkdb/networkdb.pb.go b/libnetwork/networkdb/networkdb.pb.go index dfbc7131fb..7087a57ca0 100644 --- a/libnetwork/networkdb/networkdb.pb.go +++ b/libnetwork/networkdb/networkdb.pb.go @@ -1,6 +1,5 @@ -// Code generated by protoc-gen-gogo. +// Code generated by protoc-gen-gogo. DO NOT EDIT. // source: networkdb.proto -// DO NOT EDIT! /* Package networkdb is a generated protocol buffer package. @@ -28,9 +27,6 @@ import _ "github.com/gogo/protobuf/gogoproto" import github_com_hashicorp_serf_serf "github.com/hashicorp/serf/serf" import strings "strings" -import github_com_gogo_protobuf_proto "github.com/gogo/protobuf/proto" -import sort "sort" -import strconv "strconv" import reflect "reflect" import io "io" @@ -42,7 +38,9 @@ var _ = math.Inf // This is a compile-time assertion to ensure that this generated file // is compatible with the proto package it is being compiled against. -const _ = proto.GoGoProtoPackageIsVersion1 +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package // MessageType enum defines all the core message types that networkdb // uses to communicate to peers. @@ -192,6 +190,20 @@ func (m *GossipMessage) Reset() { *m = GossipMessage{} } func (*GossipMessage) ProtoMessage() {} func (*GossipMessage) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{0} } +func (m *GossipMessage) GetType() MessageType { + if m != nil { + return m.Type + } + return MessageTypeInvalid +} + +func (m *GossipMessage) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + // NodeEvent message payload definition. type NodeEvent struct { Type NodeEvent_Type `protobuf:"varint,1,opt,name=type,proto3,enum=networkdb.NodeEvent_Type" json:"type,omitempty"` @@ -207,6 +219,20 @@ func (m *NodeEvent) Reset() { *m = NodeEvent{} } func (*NodeEvent) ProtoMessage() {} func (*NodeEvent) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{1} } +func (m *NodeEvent) GetType() NodeEvent_Type { + if m != nil { + return m.Type + } + return NodeEventTypeInvalid +} + +func (m *NodeEvent) GetNodeName() string { + if m != nil { + return m.NodeName + } + return "" +} + // NetworkEvent message payload definition. type NetworkEvent struct { Type NetworkEvent_Type `protobuf:"varint,1,opt,name=type,proto3,enum=networkdb.NetworkEvent_Type" json:"type,omitempty"` @@ -224,6 +250,27 @@ func (m *NetworkEvent) Reset() { *m = NetworkEvent{} } func (*NetworkEvent) ProtoMessage() {} func (*NetworkEvent) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{2} } +func (m *NetworkEvent) GetType() NetworkEvent_Type { + if m != nil { + return m.Type + } + return NetworkEventTypeInvalid +} + +func (m *NetworkEvent) GetNodeName() string { + if m != nil { + return m.NodeName + } + return "" +} + +func (m *NetworkEvent) GetNetworkID() string { + if m != nil { + return m.NetworkID + } + return "" +} + // NetworkEntry for push pull of networks. type NetworkEntry struct { // ID of the network @@ -241,6 +288,27 @@ func (m *NetworkEntry) Reset() { *m = NetworkEntry{} } func (*NetworkEntry) ProtoMessage() {} func (*NetworkEntry) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{3} } +func (m *NetworkEntry) GetNetworkID() string { + if m != nil { + return m.NetworkID + } + return "" +} + +func (m *NetworkEntry) GetNodeName() string { + if m != nil { + return m.NodeName + } + return "" +} + +func (m *NetworkEntry) GetLeaving() bool { + if m != nil { + return m.Leaving + } + return false +} + // NetworkPushpull message payload definition. type NetworkPushPull struct { // Lamport time when this push pull was initiated. @@ -261,6 +329,13 @@ func (m *NetworkPushPull) GetNetworks() []*NetworkEntry { return nil } +func (m *NetworkPushPull) GetNodeName() string { + if m != nil { + return m.NodeName + } + return "" +} + // TableEvent message payload definition. type TableEvent struct { Type TableEvent_Type `protobuf:"varint,1,opt,name=type,proto3,enum=networkdb.TableEvent_Type" json:"type,omitempty"` @@ -276,12 +351,63 @@ type TableEvent struct { Key string `protobuf:"bytes,6,opt,name=key,proto3" json:"key,omitempty"` // Entry value. Value []byte `protobuf:"bytes,7,opt,name=value,proto3" json:"value,omitempty"` + // Residual reap time for the entry before getting deleted in seconds + ResidualReapTime int32 `protobuf:"varint,8,opt,name=residual_reap_time,json=residualReapTime,proto3" json:"residual_reap_time,omitempty"` } func (m *TableEvent) Reset() { *m = TableEvent{} } func (*TableEvent) ProtoMessage() {} func (*TableEvent) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{5} } +func (m *TableEvent) GetType() TableEvent_Type { + if m != nil { + return m.Type + } + return TableEventTypeInvalid +} + +func (m *TableEvent) GetNodeName() string { + if m != nil { + return m.NodeName + } + return "" +} + +func (m *TableEvent) GetNetworkID() string { + if m != nil { + return m.NetworkID + } + return "" +} + +func (m *TableEvent) GetTableName() string { + if m != nil { + return m.TableName + } + return "" +} + +func (m *TableEvent) GetKey() string { + if m != nil { + return m.Key + } + return "" +} + +func (m *TableEvent) GetValue() []byte { + if m != nil { + return m.Value + } + return nil +} + +func (m *TableEvent) GetResidualReapTime() int32 { + if m != nil { + return m.ResidualReapTime + } + return 0 +} + // BulkSync message payload definition. type BulkSyncMessage struct { // Lamport time when this bulk sync was initiated. @@ -302,6 +428,34 @@ func (m *BulkSyncMessage) Reset() { *m = BulkSyncMessage{} } func (*BulkSyncMessage) ProtoMessage() {} func (*BulkSyncMessage) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{6} } +func (m *BulkSyncMessage) GetUnsolicited() bool { + if m != nil { + return m.Unsolicited + } + return false +} + +func (m *BulkSyncMessage) GetNodeName() string { + if m != nil { + return m.NodeName + } + return "" +} + +func (m *BulkSyncMessage) GetNetworks() []string { + if m != nil { + return m.Networks + } + return nil +} + +func (m *BulkSyncMessage) GetPayload() []byte { + if m != nil { + return m.Payload + } + return nil +} + // Compound message payload definition. type CompoundMessage struct { // A list of simple messages. @@ -322,7 +476,7 @@ func (m *CompoundMessage) GetMessages() []*CompoundMessage_SimpleMessage { type CompoundMessage_SimpleMessage struct { // Bytestring payload of a message constructed using // other message type definitions. - Payload []byte `protobuf:"bytes,1,opt,name=Payload,json=payload,proto3" json:"Payload,omitempty"` + Payload []byte `protobuf:"bytes,1,opt,name=Payload,proto3" json:"Payload,omitempty"` } func (m *CompoundMessage_SimpleMessage) Reset() { *m = CompoundMessage_SimpleMessage{} } @@ -331,6 +485,13 @@ func (*CompoundMessage_SimpleMessage) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{7, 0} } +func (m *CompoundMessage_SimpleMessage) GetPayload() []byte { + if m != nil { + return m.Payload + } + return nil +} + func init() { proto.RegisterType((*GossipMessage)(nil), "networkdb.GossipMessage") proto.RegisterType((*NodeEvent)(nil), "networkdb.NodeEvent") @@ -413,7 +574,7 @@ func (this *TableEvent) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 11) + s := make([]string, 0, 12) s = append(s, "&networkdb.TableEvent{") s = append(s, "Type: "+fmt.Sprintf("%#v", this.Type)+",\n") s = append(s, "LTime: "+fmt.Sprintf("%#v", this.LTime)+",\n") @@ -422,6 +583,7 @@ func (this *TableEvent) GoString() string { s = append(s, "TableName: "+fmt.Sprintf("%#v", this.TableName)+",\n") s = append(s, "Key: "+fmt.Sprintf("%#v", this.Key)+",\n") s = append(s, "Value: "+fmt.Sprintf("%#v", this.Value)+",\n") + s = append(s, "ResidualReapTime: "+fmt.Sprintf("%#v", this.ResidualReapTime)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -469,197 +631,180 @@ func valueToGoStringNetworkdb(v interface{}, typ string) string { pv := reflect.Indirect(rv).Interface() return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) } -func extensionToGoStringNetworkdb(e map[int32]github_com_gogo_protobuf_proto.Extension) string { - if e == nil { - return "nil" - } - s := "map[int32]proto.Extension{" - keys := make([]int, 0, len(e)) - for k := range e { - keys = append(keys, int(k)) - } - sort.Ints(keys) - ss := []string{} - for _, k := range keys { - ss = append(ss, strconv.Itoa(k)+": "+e[int32(k)].GoString()) - } - s += strings.Join(ss, ",") + "}" - return s -} -func (m *GossipMessage) Marshal() (data []byte, err error) { +func (m *GossipMessage) Marshal() (dAtA []byte, err error) { size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) if err != nil { return nil, err } - return data[:n], nil + return dAtA[:n], nil } -func (m *GossipMessage) MarshalTo(data []byte) (int, error) { +func (m *GossipMessage) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int _ = l if m.Type != 0 { - data[i] = 0x8 + dAtA[i] = 0x8 i++ - i = encodeVarintNetworkdb(data, i, uint64(m.Type)) + i = encodeVarintNetworkdb(dAtA, i, uint64(m.Type)) } if len(m.Data) > 0 { - data[i] = 0x12 + dAtA[i] = 0x12 i++ - i = encodeVarintNetworkdb(data, i, uint64(len(m.Data))) - i += copy(data[i:], m.Data) + i = encodeVarintNetworkdb(dAtA, i, uint64(len(m.Data))) + i += copy(dAtA[i:], m.Data) } return i, nil } -func (m *NodeEvent) Marshal() (data []byte, err error) { +func (m *NodeEvent) Marshal() (dAtA []byte, err error) { size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) if err != nil { return nil, err } - return data[:n], nil + return dAtA[:n], nil } -func (m *NodeEvent) MarshalTo(data []byte) (int, error) { +func (m *NodeEvent) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int _ = l if m.Type != 0 { - data[i] = 0x8 + dAtA[i] = 0x8 i++ - i = encodeVarintNetworkdb(data, i, uint64(m.Type)) + i = encodeVarintNetworkdb(dAtA, i, uint64(m.Type)) } if m.LTime != 0 { - data[i] = 0x10 + dAtA[i] = 0x10 i++ - i = encodeVarintNetworkdb(data, i, uint64(m.LTime)) + i = encodeVarintNetworkdb(dAtA, i, uint64(m.LTime)) } if len(m.NodeName) > 0 { - data[i] = 0x1a + dAtA[i] = 0x1a i++ - i = encodeVarintNetworkdb(data, i, uint64(len(m.NodeName))) - i += copy(data[i:], m.NodeName) + i = encodeVarintNetworkdb(dAtA, i, uint64(len(m.NodeName))) + i += copy(dAtA[i:], m.NodeName) } return i, nil } -func (m *NetworkEvent) Marshal() (data []byte, err error) { +func (m *NetworkEvent) Marshal() (dAtA []byte, err error) { size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) if err != nil { return nil, err } - return data[:n], nil + return dAtA[:n], nil } -func (m *NetworkEvent) MarshalTo(data []byte) (int, error) { +func (m *NetworkEvent) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int _ = l if m.Type != 0 { - data[i] = 0x8 + dAtA[i] = 0x8 i++ - i = encodeVarintNetworkdb(data, i, uint64(m.Type)) + i = encodeVarintNetworkdb(dAtA, i, uint64(m.Type)) } if m.LTime != 0 { - data[i] = 0x10 + dAtA[i] = 0x10 i++ - i = encodeVarintNetworkdb(data, i, uint64(m.LTime)) + i = encodeVarintNetworkdb(dAtA, i, uint64(m.LTime)) } if len(m.NodeName) > 0 { - data[i] = 0x1a + dAtA[i] = 0x1a i++ - i = encodeVarintNetworkdb(data, i, uint64(len(m.NodeName))) - i += copy(data[i:], m.NodeName) + i = encodeVarintNetworkdb(dAtA, i, uint64(len(m.NodeName))) + i += copy(dAtA[i:], m.NodeName) } if len(m.NetworkID) > 0 { - data[i] = 0x22 + dAtA[i] = 0x22 i++ - i = encodeVarintNetworkdb(data, i, uint64(len(m.NetworkID))) - i += copy(data[i:], m.NetworkID) + i = encodeVarintNetworkdb(dAtA, i, uint64(len(m.NetworkID))) + i += copy(dAtA[i:], m.NetworkID) } return i, nil } -func (m *NetworkEntry) Marshal() (data []byte, err error) { +func (m *NetworkEntry) Marshal() (dAtA []byte, err error) { size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) if err != nil { return nil, err } - return data[:n], nil + return dAtA[:n], nil } -func (m *NetworkEntry) MarshalTo(data []byte) (int, error) { +func (m *NetworkEntry) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int _ = l if len(m.NetworkID) > 0 { - data[i] = 0xa + dAtA[i] = 0xa i++ - i = encodeVarintNetworkdb(data, i, uint64(len(m.NetworkID))) - i += copy(data[i:], m.NetworkID) + i = encodeVarintNetworkdb(dAtA, i, uint64(len(m.NetworkID))) + i += copy(dAtA[i:], m.NetworkID) } if m.LTime != 0 { - data[i] = 0x10 + dAtA[i] = 0x10 i++ - i = encodeVarintNetworkdb(data, i, uint64(m.LTime)) + i = encodeVarintNetworkdb(dAtA, i, uint64(m.LTime)) } if len(m.NodeName) > 0 { - data[i] = 0x1a + dAtA[i] = 0x1a i++ - i = encodeVarintNetworkdb(data, i, uint64(len(m.NodeName))) - i += copy(data[i:], m.NodeName) + i = encodeVarintNetworkdb(dAtA, i, uint64(len(m.NodeName))) + i += copy(dAtA[i:], m.NodeName) } if m.Leaving { - data[i] = 0x20 + dAtA[i] = 0x20 i++ if m.Leaving { - data[i] = 1 + dAtA[i] = 1 } else { - data[i] = 0 + dAtA[i] = 0 } i++ } return i, nil } -func (m *NetworkPushPull) Marshal() (data []byte, err error) { +func (m *NetworkPushPull) Marshal() (dAtA []byte, err error) { size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) if err != nil { return nil, err } - return data[:n], nil + return dAtA[:n], nil } -func (m *NetworkPushPull) MarshalTo(data []byte) (int, error) { +func (m *NetworkPushPull) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int _ = l if m.LTime != 0 { - data[i] = 0x8 + dAtA[i] = 0x8 i++ - i = encodeVarintNetworkdb(data, i, uint64(m.LTime)) + i = encodeVarintNetworkdb(dAtA, i, uint64(m.LTime)) } if len(m.Networks) > 0 { for _, msg := range m.Networks { - data[i] = 0x12 + dAtA[i] = 0x12 i++ - i = encodeVarintNetworkdb(data, i, uint64(msg.Size())) - n, err := msg.MarshalTo(data[i:]) + i = encodeVarintNetworkdb(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) if err != nil { return 0, err } @@ -667,153 +812,158 @@ func (m *NetworkPushPull) MarshalTo(data []byte) (int, error) { } } if len(m.NodeName) > 0 { - data[i] = 0x1a + dAtA[i] = 0x1a i++ - i = encodeVarintNetworkdb(data, i, uint64(len(m.NodeName))) - i += copy(data[i:], m.NodeName) + i = encodeVarintNetworkdb(dAtA, i, uint64(len(m.NodeName))) + i += copy(dAtA[i:], m.NodeName) } return i, nil } -func (m *TableEvent) Marshal() (data []byte, err error) { +func (m *TableEvent) Marshal() (dAtA []byte, err error) { size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) if err != nil { return nil, err } - return data[:n], nil + return dAtA[:n], nil } -func (m *TableEvent) MarshalTo(data []byte) (int, error) { +func (m *TableEvent) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int _ = l if m.Type != 0 { - data[i] = 0x8 + dAtA[i] = 0x8 i++ - i = encodeVarintNetworkdb(data, i, uint64(m.Type)) + i = encodeVarintNetworkdb(dAtA, i, uint64(m.Type)) } if m.LTime != 0 { - data[i] = 0x10 + dAtA[i] = 0x10 i++ - i = encodeVarintNetworkdb(data, i, uint64(m.LTime)) + i = encodeVarintNetworkdb(dAtA, i, uint64(m.LTime)) } if len(m.NodeName) > 0 { - data[i] = 0x1a + dAtA[i] = 0x1a i++ - i = encodeVarintNetworkdb(data, i, uint64(len(m.NodeName))) - i += copy(data[i:], m.NodeName) + i = encodeVarintNetworkdb(dAtA, i, uint64(len(m.NodeName))) + i += copy(dAtA[i:], m.NodeName) } if len(m.NetworkID) > 0 { - data[i] = 0x22 + dAtA[i] = 0x22 i++ - i = encodeVarintNetworkdb(data, i, uint64(len(m.NetworkID))) - i += copy(data[i:], m.NetworkID) + i = encodeVarintNetworkdb(dAtA, i, uint64(len(m.NetworkID))) + i += copy(dAtA[i:], m.NetworkID) } if len(m.TableName) > 0 { - data[i] = 0x2a + dAtA[i] = 0x2a i++ - i = encodeVarintNetworkdb(data, i, uint64(len(m.TableName))) - i += copy(data[i:], m.TableName) + i = encodeVarintNetworkdb(dAtA, i, uint64(len(m.TableName))) + i += copy(dAtA[i:], m.TableName) } if len(m.Key) > 0 { - data[i] = 0x32 + dAtA[i] = 0x32 i++ - i = encodeVarintNetworkdb(data, i, uint64(len(m.Key))) - i += copy(data[i:], m.Key) + i = encodeVarintNetworkdb(dAtA, i, uint64(len(m.Key))) + i += copy(dAtA[i:], m.Key) } if len(m.Value) > 0 { - data[i] = 0x3a + dAtA[i] = 0x3a i++ - i = encodeVarintNetworkdb(data, i, uint64(len(m.Value))) - i += copy(data[i:], m.Value) + i = encodeVarintNetworkdb(dAtA, i, uint64(len(m.Value))) + i += copy(dAtA[i:], m.Value) + } + if m.ResidualReapTime != 0 { + dAtA[i] = 0x40 + i++ + i = encodeVarintNetworkdb(dAtA, i, uint64(m.ResidualReapTime)) } return i, nil } -func (m *BulkSyncMessage) Marshal() (data []byte, err error) { +func (m *BulkSyncMessage) Marshal() (dAtA []byte, err error) { size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) if err != nil { return nil, err } - return data[:n], nil + return dAtA[:n], nil } -func (m *BulkSyncMessage) MarshalTo(data []byte) (int, error) { +func (m *BulkSyncMessage) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int _ = l if m.LTime != 0 { - data[i] = 0x8 + dAtA[i] = 0x8 i++ - i = encodeVarintNetworkdb(data, i, uint64(m.LTime)) + i = encodeVarintNetworkdb(dAtA, i, uint64(m.LTime)) } if m.Unsolicited { - data[i] = 0x10 + dAtA[i] = 0x10 i++ if m.Unsolicited { - data[i] = 1 + dAtA[i] = 1 } else { - data[i] = 0 + dAtA[i] = 0 } i++ } if len(m.NodeName) > 0 { - data[i] = 0x1a + dAtA[i] = 0x1a i++ - i = encodeVarintNetworkdb(data, i, uint64(len(m.NodeName))) - i += copy(data[i:], m.NodeName) + i = encodeVarintNetworkdb(dAtA, i, uint64(len(m.NodeName))) + i += copy(dAtA[i:], m.NodeName) } if len(m.Networks) > 0 { for _, s := range m.Networks { - data[i] = 0x22 + dAtA[i] = 0x22 i++ l = len(s) for l >= 1<<7 { - data[i] = uint8(uint64(l)&0x7f | 0x80) + dAtA[i] = uint8(uint64(l)&0x7f | 0x80) l >>= 7 i++ } - data[i] = uint8(l) + dAtA[i] = uint8(l) i++ - i += copy(data[i:], s) + i += copy(dAtA[i:], s) } } if len(m.Payload) > 0 { - data[i] = 0x2a + dAtA[i] = 0x2a i++ - i = encodeVarintNetworkdb(data, i, uint64(len(m.Payload))) - i += copy(data[i:], m.Payload) + i = encodeVarintNetworkdb(dAtA, i, uint64(len(m.Payload))) + i += copy(dAtA[i:], m.Payload) } return i, nil } -func (m *CompoundMessage) Marshal() (data []byte, err error) { +func (m *CompoundMessage) Marshal() (dAtA []byte, err error) { size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) if err != nil { return nil, err } - return data[:n], nil + return dAtA[:n], nil } -func (m *CompoundMessage) MarshalTo(data []byte) (int, error) { +func (m *CompoundMessage) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int _ = l if len(m.Messages) > 0 { for _, msg := range m.Messages { - data[i] = 0xa + dAtA[i] = 0xa i++ - i = encodeVarintNetworkdb(data, i, uint64(msg.Size())) - n, err := msg.MarshalTo(data[i:]) + i = encodeVarintNetworkdb(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) if err != nil { return 0, err } @@ -823,55 +973,55 @@ func (m *CompoundMessage) MarshalTo(data []byte) (int, error) { return i, nil } -func (m *CompoundMessage_SimpleMessage) Marshal() (data []byte, err error) { +func (m *CompoundMessage_SimpleMessage) Marshal() (dAtA []byte, err error) { size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) if err != nil { return nil, err } - return data[:n], nil + return dAtA[:n], nil } -func (m *CompoundMessage_SimpleMessage) MarshalTo(data []byte) (int, error) { +func (m *CompoundMessage_SimpleMessage) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int _ = l if len(m.Payload) > 0 { - data[i] = 0xa + dAtA[i] = 0xa i++ - i = encodeVarintNetworkdb(data, i, uint64(len(m.Payload))) - i += copy(data[i:], m.Payload) + i = encodeVarintNetworkdb(dAtA, i, uint64(len(m.Payload))) + i += copy(dAtA[i:], m.Payload) } return i, nil } -func encodeFixed64Networkdb(data []byte, offset int, v uint64) int { - data[offset] = uint8(v) - data[offset+1] = uint8(v >> 8) - data[offset+2] = uint8(v >> 16) - data[offset+3] = uint8(v >> 24) - data[offset+4] = uint8(v >> 32) - data[offset+5] = uint8(v >> 40) - data[offset+6] = uint8(v >> 48) - data[offset+7] = uint8(v >> 56) +func encodeFixed64Networkdb(dAtA []byte, offset int, v uint64) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) + dAtA[offset+4] = uint8(v >> 32) + dAtA[offset+5] = uint8(v >> 40) + dAtA[offset+6] = uint8(v >> 48) + dAtA[offset+7] = uint8(v >> 56) return offset + 8 } -func encodeFixed32Networkdb(data []byte, offset int, v uint32) int { - data[offset] = uint8(v) - data[offset+1] = uint8(v >> 8) - data[offset+2] = uint8(v >> 16) - data[offset+3] = uint8(v >> 24) +func encodeFixed32Networkdb(dAtA []byte, offset int, v uint32) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) return offset + 4 } -func encodeVarintNetworkdb(data []byte, offset int, v uint64) int { +func encodeVarintNetworkdb(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { - data[offset] = uint8(v&0x7f | 0x80) + dAtA[offset] = uint8(v&0x7f | 0x80) v >>= 7 offset++ } - data[offset] = uint8(v) + dAtA[offset] = uint8(v) return offset + 1 } func (m *GossipMessage) Size() (n int) { @@ -991,6 +1141,9 @@ func (m *TableEvent) Size() (n int) { if l > 0 { n += 1 + l + sovNetworkdb(uint64(l)) } + if m.ResidualReapTime != 0 { + n += 1 + sovNetworkdb(uint64(m.ResidualReapTime)) + } return n } @@ -1128,6 +1281,7 @@ func (this *TableEvent) String() string { `TableName:` + fmt.Sprintf("%v", this.TableName) + `,`, `Key:` + fmt.Sprintf("%v", this.Key) + `,`, `Value:` + fmt.Sprintf("%v", this.Value) + `,`, + `ResidualReapTime:` + fmt.Sprintf("%v", this.ResidualReapTime) + `,`, `}`, }, "") return s @@ -1174,8 +1328,8 @@ func valueToStringNetworkdb(v interface{}) string { pv := reflect.Indirect(rv).Interface() return fmt.Sprintf("*%v", pv) } -func (m *GossipMessage) Unmarshal(data []byte) error { - l := len(data) +func (m *GossipMessage) Unmarshal(dAtA []byte) error { + l := len(dAtA) iNdEx := 0 for iNdEx < l { preIndex := iNdEx @@ -1187,7 +1341,7 @@ func (m *GossipMessage) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ wire |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -1215,7 +1369,7 @@ func (m *GossipMessage) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ m.Type |= (MessageType(b) & 0x7F) << shift if b < 0x80 { @@ -1234,7 +1388,7 @@ func (m *GossipMessage) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ byteLen |= (int(b) & 0x7F) << shift if b < 0x80 { @@ -1248,14 +1402,14 @@ func (m *GossipMessage) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Data = append(m.Data[:0], data[iNdEx:postIndex]...) + m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...) if m.Data == nil { m.Data = []byte{} } iNdEx = postIndex default: iNdEx = preIndex - skippy, err := skipNetworkdb(data[iNdEx:]) + skippy, err := skipNetworkdb(dAtA[iNdEx:]) if err != nil { return err } @@ -1274,8 +1428,8 @@ func (m *GossipMessage) Unmarshal(data []byte) error { } return nil } -func (m *NodeEvent) Unmarshal(data []byte) error { - l := len(data) +func (m *NodeEvent) Unmarshal(dAtA []byte) error { + l := len(dAtA) iNdEx := 0 for iNdEx < l { preIndex := iNdEx @@ -1287,7 +1441,7 @@ func (m *NodeEvent) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ wire |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -1315,7 +1469,7 @@ func (m *NodeEvent) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ m.Type |= (NodeEvent_Type(b) & 0x7F) << shift if b < 0x80 { @@ -1334,7 +1488,7 @@ func (m *NodeEvent) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ m.LTime |= (github_com_hashicorp_serf_serf.LamportTime(b) & 0x7F) << shift if b < 0x80 { @@ -1353,7 +1507,7 @@ func (m *NodeEvent) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -1368,11 +1522,11 @@ func (m *NodeEvent) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.NodeName = string(data[iNdEx:postIndex]) + m.NodeName = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex default: iNdEx = preIndex - skippy, err := skipNetworkdb(data[iNdEx:]) + skippy, err := skipNetworkdb(dAtA[iNdEx:]) if err != nil { return err } @@ -1391,8 +1545,8 @@ func (m *NodeEvent) Unmarshal(data []byte) error { } return nil } -func (m *NetworkEvent) Unmarshal(data []byte) error { - l := len(data) +func (m *NetworkEvent) Unmarshal(dAtA []byte) error { + l := len(dAtA) iNdEx := 0 for iNdEx < l { preIndex := iNdEx @@ -1404,7 +1558,7 @@ func (m *NetworkEvent) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ wire |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -1432,7 +1586,7 @@ func (m *NetworkEvent) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ m.Type |= (NetworkEvent_Type(b) & 0x7F) << shift if b < 0x80 { @@ -1451,7 +1605,7 @@ func (m *NetworkEvent) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ m.LTime |= (github_com_hashicorp_serf_serf.LamportTime(b) & 0x7F) << shift if b < 0x80 { @@ -1470,7 +1624,7 @@ func (m *NetworkEvent) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -1485,7 +1639,7 @@ func (m *NetworkEvent) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.NodeName = string(data[iNdEx:postIndex]) + m.NodeName = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 4: if wireType != 2 { @@ -1499,7 +1653,7 @@ func (m *NetworkEvent) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -1514,11 +1668,11 @@ func (m *NetworkEvent) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.NetworkID = string(data[iNdEx:postIndex]) + m.NetworkID = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex default: iNdEx = preIndex - skippy, err := skipNetworkdb(data[iNdEx:]) + skippy, err := skipNetworkdb(dAtA[iNdEx:]) if err != nil { return err } @@ -1537,8 +1691,8 @@ func (m *NetworkEvent) Unmarshal(data []byte) error { } return nil } -func (m *NetworkEntry) Unmarshal(data []byte) error { - l := len(data) +func (m *NetworkEntry) Unmarshal(dAtA []byte) error { + l := len(dAtA) iNdEx := 0 for iNdEx < l { preIndex := iNdEx @@ -1550,7 +1704,7 @@ func (m *NetworkEntry) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ wire |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -1578,7 +1732,7 @@ func (m *NetworkEntry) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -1593,7 +1747,7 @@ func (m *NetworkEntry) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.NetworkID = string(data[iNdEx:postIndex]) + m.NetworkID = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 2: if wireType != 0 { @@ -1607,7 +1761,7 @@ func (m *NetworkEntry) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ m.LTime |= (github_com_hashicorp_serf_serf.LamportTime(b) & 0x7F) << shift if b < 0x80 { @@ -1626,7 +1780,7 @@ func (m *NetworkEntry) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -1641,7 +1795,7 @@ func (m *NetworkEntry) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.NodeName = string(data[iNdEx:postIndex]) + m.NodeName = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 4: if wireType != 0 { @@ -1655,7 +1809,7 @@ func (m *NetworkEntry) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ v |= (int(b) & 0x7F) << shift if b < 0x80 { @@ -1665,7 +1819,7 @@ func (m *NetworkEntry) Unmarshal(data []byte) error { m.Leaving = bool(v != 0) default: iNdEx = preIndex - skippy, err := skipNetworkdb(data[iNdEx:]) + skippy, err := skipNetworkdb(dAtA[iNdEx:]) if err != nil { return err } @@ -1684,8 +1838,8 @@ func (m *NetworkEntry) Unmarshal(data []byte) error { } return nil } -func (m *NetworkPushPull) Unmarshal(data []byte) error { - l := len(data) +func (m *NetworkPushPull) Unmarshal(dAtA []byte) error { + l := len(dAtA) iNdEx := 0 for iNdEx < l { preIndex := iNdEx @@ -1697,7 +1851,7 @@ func (m *NetworkPushPull) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ wire |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -1725,7 +1879,7 @@ func (m *NetworkPushPull) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ m.LTime |= (github_com_hashicorp_serf_serf.LamportTime(b) & 0x7F) << shift if b < 0x80 { @@ -1744,7 +1898,7 @@ func (m *NetworkPushPull) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ msglen |= (int(b) & 0x7F) << shift if b < 0x80 { @@ -1759,7 +1913,7 @@ func (m *NetworkPushPull) Unmarshal(data []byte) error { return io.ErrUnexpectedEOF } m.Networks = append(m.Networks, &NetworkEntry{}) - if err := m.Networks[len(m.Networks)-1].Unmarshal(data[iNdEx:postIndex]); err != nil { + if err := m.Networks[len(m.Networks)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex @@ -1775,7 +1929,7 @@ func (m *NetworkPushPull) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -1790,11 +1944,11 @@ func (m *NetworkPushPull) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.NodeName = string(data[iNdEx:postIndex]) + m.NodeName = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex default: iNdEx = preIndex - skippy, err := skipNetworkdb(data[iNdEx:]) + skippy, err := skipNetworkdb(dAtA[iNdEx:]) if err != nil { return err } @@ -1813,8 +1967,8 @@ func (m *NetworkPushPull) Unmarshal(data []byte) error { } return nil } -func (m *TableEvent) Unmarshal(data []byte) error { - l := len(data) +func (m *TableEvent) Unmarshal(dAtA []byte) error { + l := len(dAtA) iNdEx := 0 for iNdEx < l { preIndex := iNdEx @@ -1826,7 +1980,7 @@ func (m *TableEvent) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ wire |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -1854,7 +2008,7 @@ func (m *TableEvent) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ m.Type |= (TableEvent_Type(b) & 0x7F) << shift if b < 0x80 { @@ -1873,7 +2027,7 @@ func (m *TableEvent) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ m.LTime |= (github_com_hashicorp_serf_serf.LamportTime(b) & 0x7F) << shift if b < 0x80 { @@ -1892,7 +2046,7 @@ func (m *TableEvent) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -1907,7 +2061,7 @@ func (m *TableEvent) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.NodeName = string(data[iNdEx:postIndex]) + m.NodeName = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 4: if wireType != 2 { @@ -1921,7 +2075,7 @@ func (m *TableEvent) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -1936,7 +2090,7 @@ func (m *TableEvent) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.NetworkID = string(data[iNdEx:postIndex]) + m.NetworkID = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 5: if wireType != 2 { @@ -1950,7 +2104,7 @@ func (m *TableEvent) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -1965,7 +2119,7 @@ func (m *TableEvent) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.TableName = string(data[iNdEx:postIndex]) + m.TableName = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 6: if wireType != 2 { @@ -1979,7 +2133,7 @@ func (m *TableEvent) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -1994,7 +2148,7 @@ func (m *TableEvent) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Key = string(data[iNdEx:postIndex]) + m.Key = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 7: if wireType != 2 { @@ -2008,7 +2162,7 @@ func (m *TableEvent) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ byteLen |= (int(b) & 0x7F) << shift if b < 0x80 { @@ -2022,14 +2176,33 @@ func (m *TableEvent) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Value = append(m.Value[:0], data[iNdEx:postIndex]...) + m.Value = append(m.Value[:0], dAtA[iNdEx:postIndex]...) if m.Value == nil { m.Value = []byte{} } iNdEx = postIndex + case 8: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ResidualReapTime", wireType) + } + m.ResidualReapTime = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNetworkdb + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ResidualReapTime |= (int32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex - skippy, err := skipNetworkdb(data[iNdEx:]) + skippy, err := skipNetworkdb(dAtA[iNdEx:]) if err != nil { return err } @@ -2048,8 +2221,8 @@ func (m *TableEvent) Unmarshal(data []byte) error { } return nil } -func (m *BulkSyncMessage) Unmarshal(data []byte) error { - l := len(data) +func (m *BulkSyncMessage) Unmarshal(dAtA []byte) error { + l := len(dAtA) iNdEx := 0 for iNdEx < l { preIndex := iNdEx @@ -2061,7 +2234,7 @@ func (m *BulkSyncMessage) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ wire |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -2089,7 +2262,7 @@ func (m *BulkSyncMessage) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ m.LTime |= (github_com_hashicorp_serf_serf.LamportTime(b) & 0x7F) << shift if b < 0x80 { @@ -2108,7 +2281,7 @@ func (m *BulkSyncMessage) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ v |= (int(b) & 0x7F) << shift if b < 0x80 { @@ -2128,7 +2301,7 @@ func (m *BulkSyncMessage) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -2143,7 +2316,7 @@ func (m *BulkSyncMessage) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.NodeName = string(data[iNdEx:postIndex]) + m.NodeName = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 4: if wireType != 2 { @@ -2157,7 +2330,7 @@ func (m *BulkSyncMessage) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -2172,7 +2345,7 @@ func (m *BulkSyncMessage) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Networks = append(m.Networks, string(data[iNdEx:postIndex])) + m.Networks = append(m.Networks, string(dAtA[iNdEx:postIndex])) iNdEx = postIndex case 5: if wireType != 2 { @@ -2186,7 +2359,7 @@ func (m *BulkSyncMessage) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ byteLen |= (int(b) & 0x7F) << shift if b < 0x80 { @@ -2200,14 +2373,14 @@ func (m *BulkSyncMessage) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Payload = append(m.Payload[:0], data[iNdEx:postIndex]...) + m.Payload = append(m.Payload[:0], dAtA[iNdEx:postIndex]...) if m.Payload == nil { m.Payload = []byte{} } iNdEx = postIndex default: iNdEx = preIndex - skippy, err := skipNetworkdb(data[iNdEx:]) + skippy, err := skipNetworkdb(dAtA[iNdEx:]) if err != nil { return err } @@ -2226,8 +2399,8 @@ func (m *BulkSyncMessage) Unmarshal(data []byte) error { } return nil } -func (m *CompoundMessage) Unmarshal(data []byte) error { - l := len(data) +func (m *CompoundMessage) Unmarshal(dAtA []byte) error { + l := len(dAtA) iNdEx := 0 for iNdEx < l { preIndex := iNdEx @@ -2239,7 +2412,7 @@ func (m *CompoundMessage) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ wire |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -2267,7 +2440,7 @@ func (m *CompoundMessage) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ msglen |= (int(b) & 0x7F) << shift if b < 0x80 { @@ -2282,13 +2455,13 @@ func (m *CompoundMessage) Unmarshal(data []byte) error { return io.ErrUnexpectedEOF } m.Messages = append(m.Messages, &CompoundMessage_SimpleMessage{}) - if err := m.Messages[len(m.Messages)-1].Unmarshal(data[iNdEx:postIndex]); err != nil { + if err := m.Messages[len(m.Messages)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex default: iNdEx = preIndex - skippy, err := skipNetworkdb(data[iNdEx:]) + skippy, err := skipNetworkdb(dAtA[iNdEx:]) if err != nil { return err } @@ -2307,8 +2480,8 @@ func (m *CompoundMessage) Unmarshal(data []byte) error { } return nil } -func (m *CompoundMessage_SimpleMessage) Unmarshal(data []byte) error { - l := len(data) +func (m *CompoundMessage_SimpleMessage) Unmarshal(dAtA []byte) error { + l := len(dAtA) iNdEx := 0 for iNdEx < l { preIndex := iNdEx @@ -2320,7 +2493,7 @@ func (m *CompoundMessage_SimpleMessage) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ wire |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -2348,7 +2521,7 @@ func (m *CompoundMessage_SimpleMessage) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ byteLen |= (int(b) & 0x7F) << shift if b < 0x80 { @@ -2362,14 +2535,14 @@ func (m *CompoundMessage_SimpleMessage) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Payload = append(m.Payload[:0], data[iNdEx:postIndex]...) + m.Payload = append(m.Payload[:0], dAtA[iNdEx:postIndex]...) if m.Payload == nil { m.Payload = []byte{} } iNdEx = postIndex default: iNdEx = preIndex - skippy, err := skipNetworkdb(data[iNdEx:]) + skippy, err := skipNetworkdb(dAtA[iNdEx:]) if err != nil { return err } @@ -2388,8 +2561,8 @@ func (m *CompoundMessage_SimpleMessage) Unmarshal(data []byte) error { } return nil } -func skipNetworkdb(data []byte) (n int, err error) { - l := len(data) +func skipNetworkdb(dAtA []byte) (n int, err error) { + l := len(dAtA) iNdEx := 0 for iNdEx < l { var wire uint64 @@ -2400,7 +2573,7 @@ func skipNetworkdb(data []byte) (n int, err error) { if iNdEx >= l { return 0, io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ wire |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -2418,7 +2591,7 @@ func skipNetworkdb(data []byte) (n int, err error) { return 0, io.ErrUnexpectedEOF } iNdEx++ - if data[iNdEx-1] < 0x80 { + if dAtA[iNdEx-1] < 0x80 { break } } @@ -2435,7 +2608,7 @@ func skipNetworkdb(data []byte) (n int, err error) { if iNdEx >= l { return 0, io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ length |= (int(b) & 0x7F) << shift if b < 0x80 { @@ -2458,7 +2631,7 @@ func skipNetworkdb(data []byte) (n int, err error) { if iNdEx >= l { return 0, io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ innerWire |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -2469,7 +2642,7 @@ func skipNetworkdb(data []byte) (n int, err error) { if innerWireType == 4 { break } - next, err := skipNetworkdb(data[start:]) + next, err := skipNetworkdb(dAtA[start:]) if err != nil { return 0, err } @@ -2493,62 +2666,68 @@ var ( ErrIntOverflowNetworkdb = fmt.Errorf("proto: integer overflow") ) +func init() { proto.RegisterFile("networkdb.proto", fileDescriptorNetworkdb) } + var fileDescriptorNetworkdb = []byte{ - // 887 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xcc, 0x96, 0xc1, 0x6e, 0xe3, 0x44, - 0x18, 0xc7, 0xeb, 0xc4, 0x49, 0xe3, 0xaf, 0x0d, 0x1b, 0xbc, 0xdd, 0xad, 0xd7, 0x0b, 0x49, 0x31, - 0xcb, 0x2a, 0x44, 0xe0, 0xa2, 0xee, 0x13, 0x24, 0xb1, 0x05, 0xd9, 0xf5, 0x3a, 0x91, 0x93, 0x14, - 0x71, 0x8a, 0x9c, 0x78, 0x48, 0xac, 0x3a, 0xb6, 0x15, 0x3b, 0x45, 0x39, 0x81, 0x38, 0xad, 0x78, - 0x07, 0x4e, 0xcb, 0x99, 0x07, 0xe0, 0xc0, 0x89, 0xc3, 0x8a, 0x13, 0xdc, 0x10, 0x87, 0x8a, 0xee, - 0x13, 0xf0, 0x08, 0x8c, 0xc7, 0x76, 0x32, 0x4e, 0xa3, 0x5e, 0x40, 0xc0, 0xc1, 0xad, 0x67, 0xe6, - 0xe7, 0xcf, 0xdf, 0xf7, 0x9f, 0xff, 0xe7, 0x09, 0xdc, 0x71, 0x51, 0xf8, 0x85, 0xb7, 0xb8, 0xb0, - 0xc6, 0xb2, 0xbf, 0xf0, 0x42, 0x8f, 0xe7, 0xd6, 0x13, 0xe2, 0xd1, 0xd4, 0x9b, 0x7a, 0x64, 0xf6, - 0x34, 0xba, 0x8b, 0x01, 0xa9, 0x0b, 0xe5, 0x8f, 0xbd, 0x20, 0xb0, 0xfd, 0xe7, 0x28, 0x08, 0xcc, - 0x29, 0xe2, 0x1b, 0xc0, 0x86, 0x2b, 0x1f, 0x09, 0xcc, 0x09, 0x53, 0x7f, 0xe3, 0xec, 0xbe, 0xbc, - 0x89, 0x98, 0x10, 0x03, 0xbc, 0x6a, 0x10, 0x86, 0xe7, 0x81, 0xb5, 0xcc, 0xd0, 0x14, 0x72, 0x98, - 0x3d, 0x34, 0xc8, 0xbd, 0xf4, 0x32, 0x07, 0x9c, 0xee, 0x59, 0x48, 0xbd, 0x44, 0x6e, 0xc8, 0x7f, - 0x98, 0x89, 0xf6, 0x80, 0x8a, 0xb6, 0x66, 0x64, 0x2a, 0x60, 0x07, 0x8a, 0xce, 0x28, 0xb4, 0xe7, - 0x88, 0x84, 0x64, 0x5b, 0x67, 0xaf, 0xae, 0x6a, 0x7b, 0xbf, 0x5f, 0xd5, 0x1a, 0x53, 0x3b, 0x9c, - 0x2d, 0xc7, 0xf2, 0xc4, 0x9b, 0x9f, 0xce, 0xcc, 0x60, 0x66, 0x4f, 0xbc, 0x85, 0x7f, 0x1a, 0xa0, - 0xc5, 0xe7, 0xe4, 0x8f, 0xac, 0x99, 0x73, 0xdf, 0x5b, 0x84, 0x03, 0xfc, 0xa4, 0x51, 0x70, 0xa2, - 0x7f, 0xfc, 0x43, 0xe0, 0x5c, 0xfc, 0x8a, 0x91, 0x6b, 0xe2, 0x68, 0x79, 0x1c, 0x8d, 0x33, 0x4a, - 0xd1, 0x84, 0x8e, 0xc7, 0xd2, 0x97, 0xc0, 0x46, 0x6f, 0xe5, 0xdf, 0x83, 0xfd, 0x8e, 0x7e, 0xde, - 0xd4, 0x3a, 0x4a, 0x65, 0x4f, 0x14, 0xbe, 0xf9, 0xf6, 0xe4, 0x68, 0x9d, 0x56, 0xb4, 0xde, 0x71, - 0x2f, 0x4d, 0xc7, 0xb6, 0xf8, 0x1a, 0xb0, 0x4f, 0xbb, 0x1d, 0xbd, 0xc2, 0x88, 0xf7, 0x30, 0xf3, - 0x66, 0x86, 0x79, 0xea, 0xd9, 0x2e, 0xff, 0x0e, 0x14, 0x34, 0xb5, 0x79, 0xae, 0x56, 0x72, 0xe2, - 0x7d, 0x4c, 0xf0, 0x19, 0x42, 0x43, 0xe6, 0x25, 0x12, 0x0f, 0x5f, 0xbc, 0xac, 0xee, 0xfd, 0xf0, - 0x5d, 0x95, 0xbc, 0x58, 0xba, 0xce, 0xc1, 0xa1, 0x1e, 0x6b, 0x11, 0x0b, 0xf5, 0x51, 0x46, 0xa8, - 0xb7, 0x68, 0xa1, 0x28, 0xec, 0x3f, 0xd0, 0x8a, 0xff, 0x00, 0x20, 0x49, 0x66, 0x64, 0x5b, 0x02, - 0x1b, 0xad, 0xb6, 0xca, 0xaf, 0xaf, 0x6a, 0x5c, 0x92, 0x58, 0x47, 0x31, 0x52, 0x97, 0x75, 0x2c, - 0xe9, 0x05, 0x93, 0x48, 0x5b, 0xa7, 0xa5, 0x7d, 0x88, 0x45, 0x39, 0xa6, 0x0b, 0xa1, 0xd5, 0x95, - 0xd6, 0xea, 0xc6, 0x3b, 0xb0, 0x85, 0x11, 0x81, 0x1f, 0x6d, 0x04, 0x7e, 0x80, 0xa1, 0x7b, 0xdb, - 0xd0, 0x2e, 0x8d, 0x7f, 0x64, 0x36, 0x1a, 0xbb, 0xe1, 0x62, 0xb5, 0x55, 0x09, 0x73, 0x7b, 0x25, - 0xff, 0x9a, 0xbe, 0x02, 0xec, 0x3b, 0x38, 0x7b, 0xdb, 0x9d, 0x12, 0x71, 0x4b, 0x46, 0x3a, 0x94, - 0xbe, 0x67, 0xe0, 0x4e, 0x92, 0x5a, 0x6f, 0x19, 0xcc, 0x7a, 0x4b, 0xc7, 0xa1, 0xb2, 0x62, 0xfe, - 0x6e, 0x56, 0x4f, 0xa0, 0x94, 0x54, 0x1b, 0xe0, 0x12, 0xf3, 0xf5, 0x83, 0xb3, 0xe3, 0x1d, 0xb6, - 0x8b, 0x94, 0x33, 0xd6, 0xe0, 0xed, 0x6d, 0xf5, 0x73, 0x1e, 0x60, 0x60, 0x8e, 0x9d, 0xa4, 0xf9, - 0xe5, 0x8c, 0xa7, 0x45, 0x2a, 0xf8, 0x06, 0xfa, 0xdf, 0x3b, 0x9a, 0x7f, 0x1b, 0x20, 0x8c, 0xd2, - 0x8d, 0x63, 0x15, 0x48, 0x2c, 0x8e, 0xcc, 0x90, 0x60, 0x15, 0xc8, 0x5f, 0xa0, 0x95, 0x50, 0x24, - 0xf3, 0xd1, 0x2d, 0x7f, 0x04, 0x05, 0x6c, 0xec, 0x25, 0x12, 0xf6, 0xc9, 0x67, 0x31, 0x1e, 0x44, - 0x9b, 0x19, 0x37, 0xc6, 0x63, 0xba, 0x31, 0x88, 0x99, 0x37, 0x6a, 0xd0, 0x6d, 0xf1, 0x08, 0x8a, - 0x6d, 0x43, 0x6d, 0x0e, 0xd4, 0xb4, 0x31, 0xb2, 0x58, 0x7b, 0x81, 0xcc, 0x10, 0x45, 0xd4, 0xb0, - 0xa7, 0x44, 0x54, 0x6e, 0x17, 0x35, 0xf4, 0xad, 0x84, 0x52, 0x54, 0x4d, 0xc5, 0x54, 0x7e, 0x17, - 0xa5, 0x20, 0x07, 0x85, 0xdb, 0xed, 0xf3, 0x2b, 0x76, 0x5f, 0x6b, 0xe9, 0x5c, 0xf4, 0x57, 0xee, - 0x24, 0x3d, 0x1c, 0xfe, 0x41, 0xf7, 0x9d, 0xc0, 0xc1, 0xd2, 0x0d, 0x3c, 0xc7, 0x9e, 0xd8, 0x21, - 0xb2, 0xc8, 0x8e, 0x97, 0x0c, 0x7a, 0xea, 0xf6, 0x3d, 0x14, 0x29, 0xf3, 0xb2, 0xd8, 0xbc, 0x1c, - 0xe5, 0x51, 0xdc, 0x51, 0xbe, 0xb9, 0x72, 0x3c, 0xd3, 0x22, 0xdb, 0x75, 0x68, 0xa4, 0x43, 0xe9, - 0x6b, 0x5c, 0x53, 0xdb, 0xc3, 0xb9, 0x2c, 0x5d, 0x2b, 0xad, 0x49, 0x81, 0xd2, 0x3c, 0xbe, 0x0d, - 0x70, 0x55, 0x51, 0x1b, 0xd4, 0x29, 0xa7, 0x6e, 0xd1, 0x72, 0xdf, 0x9e, 0xfb, 0x0e, 0x4a, 0x46, - 0xc6, 0xfa, 0x49, 0xf1, 0x7d, 0x28, 0x67, 0x96, 0xa2, 0x24, 0x7a, 0x49, 0x12, 0x4c, 0x26, 0x89, - 0xc6, 0x4f, 0x39, 0x38, 0xa0, 0xce, 0x52, 0xfe, 0x5d, 0xda, 0x10, 0xe4, 0xf8, 0xa0, 0x56, 0x53, - 0x37, 0xc8, 0x50, 0xd6, 0xd5, 0xc1, 0xa7, 0x5d, 0xe3, 0xd9, 0x48, 0x3d, 0x57, 0xf5, 0x01, 0x36, - 0x05, 0xf9, 0xa8, 0x52, 0x68, 0xe6, 0x3c, 0x69, 0xc0, 0xc1, 0xa0, 0xd9, 0xd2, 0xd4, 0x84, 0x4e, - 0x3e, 0x9b, 0x14, 0x4d, 0xf5, 0xe9, 0x63, 0xe0, 0x7a, 0xc3, 0xfe, 0x27, 0xa3, 0xde, 0x50, 0xd3, - 0xb0, 0x41, 0x8e, 0x31, 0x79, 0x97, 0x22, 0xd7, 0xdf, 0x1e, 0xcc, 0xb5, 0x86, 0xda, 0xb3, 0x51, - 0xff, 0x33, 0xbd, 0x5d, 0x61, 0x6f, 0x70, 0xa9, 0x59, 0xf0, 0xa9, 0x5a, 0x6a, 0x77, 0x9f, 0xf7, - 0xba, 0x43, 0x5d, 0xa9, 0x14, 0x6e, 0x60, 0xa9, 0xa2, 0xf8, 0x84, 0x00, 0xbd, 0xab, 0xa4, 0x19, - 0x16, 0x63, 0x63, 0xd2, 0xf5, 0xa4, 0x87, 0xa8, 0x78, 0x37, 0x31, 0x26, 0x2d, 0x5b, 0x4b, 0xf8, - 0xed, 0xba, 0xba, 0xf7, 0xe7, 0x75, 0x95, 0xf9, 0xea, 0x75, 0x95, 0x79, 0x85, 0xaf, 0x5f, 0xf0, - 0xf5, 0x07, 0xbe, 0xc6, 0x45, 0xf2, 0xd3, 0xe6, 0xc9, 0x5f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x21, - 0x78, 0x72, 0xc3, 0x0e, 0x09, 0x00, 0x00, + // 953 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x96, 0xcd, 0x6e, 0xe3, 0x54, + 0x14, 0xc7, 0x7b, 0xf3, 0xd5, 0xe4, 0x34, 0xa5, 0xe6, 0x4e, 0x67, 0xc6, 0xe3, 0x81, 0xc4, 0x98, + 0x99, 0x2a, 0x53, 0x41, 0x8a, 0x3a, 0x4f, 0xd0, 0x24, 0x16, 0x64, 0x26, 0xe3, 0x44, 0x6e, 0x52, + 0xc4, 0x2a, 0xba, 0xad, 0x2f, 0xa9, 0x55, 0xc7, 0xb6, 0x6c, 0x27, 0x28, 0x2b, 0x10, 0xab, 0x51, + 0x16, 0xbc, 0x41, 0x56, 0xc3, 0x9a, 0x07, 0x40, 0x2c, 0x59, 0xcc, 0x82, 0x05, 0xec, 0x10, 0x8b, + 0x88, 0xe6, 0x09, 0x78, 0x04, 0xe4, 0x6b, 0x3b, 0xb9, 0x49, 0xab, 0x91, 0x10, 0x23, 0xc1, 0x26, + 0xb9, 0x1f, 0xbf, 0x1c, 0x9f, 0xf3, 0xf7, 0xff, 0xdc, 0x1b, 0xd8, 0xb3, 0x69, 0xf0, 0x95, 0xe3, + 0x5d, 0x19, 0xe7, 0x55, 0xd7, 0x73, 0x02, 0x07, 0x17, 0x96, 0x0b, 0xd2, 0xfe, 0xc0, 0x19, 0x38, + 0x6c, 0xf5, 0x28, 0x1c, 0x45, 0x80, 0xd2, 0x86, 0xdd, 0x4f, 0x1d, 0xdf, 0x37, 0xdd, 0x17, 0xd4, + 0xf7, 0xc9, 0x80, 0xe2, 0x43, 0xc8, 0x04, 0x13, 0x97, 0x8a, 0x48, 0x46, 0x95, 0x77, 0x8e, 0xef, + 0x55, 0x57, 0x11, 0x63, 0xa2, 0x3b, 0x71, 0xa9, 0xce, 0x18, 0x8c, 0x21, 0x63, 0x90, 0x80, 0x88, + 0x29, 0x19, 0x55, 0x8a, 0x3a, 0x1b, 0x2b, 0xaf, 0x52, 0x50, 0xd0, 0x1c, 0x83, 0xaa, 0x63, 0x6a, + 0x07, 0xf8, 0xe3, 0xb5, 0x68, 0x0f, 0xb8, 0x68, 0x4b, 0xa6, 0xca, 0x05, 0x6c, 0x42, 0xce, 0xea, + 0x07, 0xe6, 0x90, 0xb2, 0x90, 0x99, 0xda, 0xf1, 0xeb, 0x79, 0x79, 0xeb, 0x8f, 0x79, 0xf9, 0x70, + 0x60, 0x06, 0x97, 0xa3, 0xf3, 0xea, 0x85, 0x33, 0x3c, 0xba, 0x24, 0xfe, 0xa5, 0x79, 0xe1, 0x78, + 0xee, 0x91, 0x4f, 0xbd, 0x2f, 0xd9, 0x47, 0xb5, 0x45, 0x86, 0xae, 0xe3, 0x05, 0x5d, 0x73, 0x48, + 0xf5, 0xac, 0x15, 0x7e, 0xe1, 0x87, 0x50, 0xb0, 0x1d, 0x83, 0xf6, 0x6d, 0x32, 0xa4, 0x62, 0x5a, + 0x46, 0x95, 0x82, 0x9e, 0x0f, 0x17, 0x34, 0x32, 0xa4, 0xca, 0xd7, 0x90, 0x09, 0x9f, 0x8a, 0x1f, + 0xc3, 0x76, 0x53, 0x3b, 0x3b, 0x69, 0x35, 0x1b, 0xc2, 0x96, 0x24, 0x4e, 0x67, 0xf2, 0xfe, 0x32, + 0xad, 0x70, 0xbf, 0x69, 0x8f, 0x89, 0x65, 0x1a, 0xb8, 0x0c, 0x99, 0x67, 0xed, 0xa6, 0x26, 0x20, + 0xe9, 0xee, 0x74, 0x26, 0xbf, 0xbb, 0xc6, 0x3c, 0x73, 0x4c, 0x1b, 0x7f, 0x00, 0xd9, 0x96, 0x7a, + 0x72, 0xa6, 0x0a, 0x29, 0xe9, 0xde, 0x74, 0x26, 0xe3, 0x35, 0xa2, 0x45, 0xc9, 0x98, 0x4a, 0xc5, + 0x97, 0xaf, 0x4a, 0x5b, 0x3f, 0x7e, 0x5f, 0x62, 0x0f, 0x56, 0xae, 0x53, 0x50, 0xd4, 0x22, 0x2d, + 0x22, 0xa1, 0x3e, 0x59, 0x13, 0xea, 0x3d, 0x5e, 0x28, 0x0e, 0xfb, 0x0f, 0xb4, 0xc2, 0x1f, 0x01, + 0xc4, 0xc9, 0xf4, 0x4d, 0x43, 0xcc, 0x84, 0xbb, 0xb5, 0xdd, 0xc5, 0xbc, 0x5c, 0x88, 0x13, 0x6b, + 0x36, 0xf4, 0xc4, 0x65, 0x4d, 0x43, 0x79, 0x89, 0x62, 0x69, 0x2b, 0xbc, 0xb4, 0x0f, 0xa7, 0x33, + 0xf9, 0x3e, 0x5f, 0x08, 0xaf, 0xae, 0xb2, 0x54, 0x37, 0x7a, 0x03, 0x1b, 0x18, 0x13, 0xf8, 0xd1, + 0x4a, 0xe0, 0x07, 0xd3, 0x99, 0x7c, 0x77, 0x13, 0xba, 0x4d, 0xe3, 0x5f, 0xd0, 0x4a, 0x63, 0x3b, + 0xf0, 0x26, 0x1b, 0x95, 0xa0, 0x37, 0x57, 0xf2, 0x36, 0xf5, 0x7d, 0x72, 0x43, 0xdf, 0x5a, 0x71, + 0x31, 0x2f, 0xe7, 0xb5, 0x58, 0x63, 0x4e, 0x6d, 0x11, 0xb6, 0x2d, 0x4a, 0xc6, 0xa6, 0x3d, 0x60, + 0x52, 0xe7, 0xf5, 0x64, 0xaa, 0xfc, 0x84, 0x60, 0x2f, 0x4e, 0xb4, 0x33, 0xf2, 0x2f, 0x3b, 0x23, + 0xcb, 0xe2, 0x72, 0x44, 0xff, 0x36, 0xc7, 0xa7, 0x90, 0x8f, 0x6b, 0xf7, 0xc5, 0x94, 0x9c, 0xae, + 0xec, 0x1c, 0xdf, 0xbf, 0xc5, 0x84, 0xa1, 0x8e, 0xfa, 0x12, 0xfc, 0x07, 0x85, 0x29, 0xdf, 0x65, + 0x00, 0xba, 0xe4, 0xdc, 0x8a, 0x0f, 0x86, 0xea, 0x9a, 0xdf, 0x25, 0xee, 0x51, 0x2b, 0xe8, 0x7f, + 0xef, 0x76, 0xfc, 0x3e, 0x40, 0x10, 0xa6, 0x1b, 0xc5, 0xca, 0xb2, 0x58, 0x05, 0xb6, 0xc2, 0x82, + 0x09, 0x90, 0xbe, 0xa2, 0x13, 0x31, 0xc7, 0xd6, 0xc3, 0x21, 0xde, 0x87, 0xec, 0x98, 0x58, 0x23, + 0x2a, 0x6e, 0xb3, 0x23, 0x33, 0x9a, 0xe0, 0x1a, 0x60, 0x8f, 0xfa, 0xa6, 0x31, 0x22, 0x56, 0xdf, + 0xa3, 0xc4, 0x8d, 0x0a, 0xcd, 0xcb, 0xa8, 0x92, 0xad, 0xed, 0x2f, 0xe6, 0x65, 0x41, 0x8f, 0x77, + 0x75, 0x4a, 0x5c, 0x56, 0x8a, 0xe0, 0x6d, 0xac, 0x28, 0x3f, 0x24, 0x8d, 0x77, 0xc0, 0x37, 0x1e, + 0x6b, 0x96, 0x95, 0xa2, 0x7c, 0xdb, 0x3d, 0x82, 0x5c, 0x5d, 0x57, 0x4f, 0xba, 0x6a, 0xd2, 0x78, + 0xeb, 0x58, 0xdd, 0xa3, 0x24, 0xa0, 0x21, 0xd5, 0xeb, 0x34, 0x42, 0x2a, 0x75, 0x1b, 0xd5, 0x73, + 0x8d, 0x98, 0x6a, 0xa8, 0x2d, 0xb5, 0xab, 0x0a, 0xe9, 0xdb, 0xa8, 0x06, 0xb5, 0x68, 0xb0, 0xd9, + 0x9e, 0xbf, 0x21, 0xd8, 0xab, 0x8d, 0xac, 0xab, 0xd3, 0x89, 0x7d, 0x91, 0x5c, 0x3e, 0x6f, 0xd1, + 0xcf, 0x32, 0xec, 0x8c, 0x6c, 0xdf, 0xb1, 0xcc, 0x0b, 0x33, 0xa0, 0x06, 0x73, 0x4d, 0x5e, 0xe7, + 0x97, 0xde, 0xec, 0x03, 0x89, 0x6b, 0x87, 0x8c, 0x9c, 0x66, 0x7b, 0x89, 0xeb, 0x45, 0xd8, 0x76, + 0xc9, 0xc4, 0x72, 0x88, 0xc1, 0x5e, 0x79, 0x51, 0x4f, 0xa6, 0xca, 0xb7, 0x08, 0xf6, 0xea, 0xce, + 0xd0, 0x75, 0x46, 0xb6, 0x91, 0xd4, 0xd4, 0x80, 0xfc, 0x30, 0x1a, 0xfa, 0x22, 0x62, 0x8d, 0x55, + 0xe1, 0xdc, 0xbe, 0x41, 0x57, 0x4f, 0xcd, 0xa1, 0x6b, 0xd1, 0x78, 0xa6, 0x2f, 0x7f, 0x29, 0x3d, + 0x81, 0xdd, 0xb5, 0xad, 0x30, 0x89, 0x4e, 0x9c, 0x04, 0x8a, 0x92, 0x88, 0xa7, 0x87, 0x3f, 0xa7, + 0x60, 0x87, 0xbb, 0xab, 0xf1, 0x87, 0xbc, 0x21, 0xd8, 0xf5, 0xc4, 0xed, 0x26, 0x6e, 0xa8, 0xc2, + 0xae, 0xa6, 0x76, 0x3f, 0x6f, 0xeb, 0xcf, 0xfb, 0xea, 0x99, 0xaa, 0x75, 0x05, 0x14, 0x1d, 0xda, + 0x1c, 0xba, 0x76, 0x5f, 0x1d, 0xc2, 0x4e, 0xf7, 0xa4, 0xd6, 0x52, 0x63, 0x3a, 0x3e, 0x96, 0x39, + 0x9a, 0xeb, 0xf5, 0x03, 0x28, 0x74, 0x7a, 0xa7, 0x9f, 0xf5, 0x3b, 0xbd, 0x56, 0x4b, 0x48, 0x4b, + 0xf7, 0xa7, 0x33, 0xf9, 0x0e, 0x47, 0x2e, 0x4f, 0xb3, 0x03, 0x28, 0xd4, 0x7a, 0xad, 0xe7, 0xfd, + 0xd3, 0x2f, 0xb4, 0xba, 0x90, 0xb9, 0xc1, 0x25, 0x66, 0xc1, 0x8f, 0x21, 0x5f, 0x6f, 0xbf, 0xe8, + 0xb4, 0x7b, 0x5a, 0x43, 0xc8, 0xde, 0xc0, 0x12, 0x45, 0x71, 0x05, 0x40, 0x6b, 0x37, 0x92, 0x0c, + 0x73, 0x91, 0x31, 0xf9, 0x7a, 0x92, 0x4b, 0x5a, 0xba, 0x13, 0x1b, 0x93, 0x97, 0xad, 0x26, 0xfe, + 0x7e, 0x5d, 0xda, 0xfa, 0xeb, 0xba, 0x84, 0xbe, 0x59, 0x94, 0xd0, 0xeb, 0x45, 0x09, 0xfd, 0xba, + 0x28, 0xa1, 0x3f, 0x17, 0x25, 0x74, 0x9e, 0x63, 0x7f, 0x9d, 0x9e, 0xfe, 0x1d, 0x00, 0x00, 0xff, + 0xff, 0x92, 0x82, 0xdb, 0x1a, 0x6e, 0x09, 0x00, 0x00, } diff --git a/libnetwork/networkdb/networkdb.proto b/libnetwork/networkdb/networkdb.proto index 7df1b42dca..0b8490be7a 100644 --- a/libnetwork/networkdb/networkdb.proto +++ b/libnetwork/networkdb/networkdb.proto @@ -109,7 +109,7 @@ message NetworkEntry { // network event was recorded. uint64 l_time = 2 [(gogoproto.customtype) = "github.com/hashicorp/serf/serf.LamportTime", (gogoproto.nullable) = false]; // Source node name where this network attachment happened. - string node_name = 3; + string node_name = 3 [(gogoproto.customname) = "NodeName"]; // Indicates if a leave from this network is in progress. bool leaving = 4; } @@ -119,6 +119,8 @@ message NetworkPushPull { // Lamport time when this push pull was initiated. uint64 l_time = 1 [(gogoproto.customtype) = "github.com/hashicorp/serf/serf.LamportTime", (gogoproto.nullable) = false]; repeated NetworkEntry networks = 2; + // Name of the node sending this push pull payload. + string node_name = 3 [(gogoproto.customname) = "NodeName"]; } // TableEvent message payload definition. @@ -152,6 +154,8 @@ message TableEvent { string key = 6; // Entry value. bytes value = 7; + // Residual reap time for the entry before getting deleted in seconds + int32 residual_reap_time = 8 [(gogoproto.customname) = "ResidualReapTime"];; } // BulkSync message payload definition. @@ -180,4 +184,4 @@ message CompoundMessage { // A list of simple messages. repeated SimpleMessage messages = 1; -} \ No newline at end of file +} diff --git a/libnetwork/test/networkDb/README b/libnetwork/test/networkDb/README index 17da65aa4d..72a08cee5c 100644 --- a/libnetwork/test/networkDb/README +++ b/libnetwork/test/networkDb/README @@ -1,7 +1,7 @@ SERVER cd test/networkdb -env GOOS=linux go build -v server/ndbTester.go && docker build -t fcrisciani/networkdb-test -f server/Dockerfile . +env GOOS=linux go build -v server/testMain.go && docker build -t fcrisciani/networkdb-test . (only for testkit case) docker push fcrisciani/networkdb-test Run server: docker service create --name testdb --network net1 --replicas 3 --env TASK_ID="{{.Task.ID}}" -p mode=host,target=8000 fcrisciani/networkdb-test server 8000 From 053a534ab10b652c90700dd9b449eaf9d01aaf0c Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Tue, 19 Sep 2017 17:36:00 -0700 Subject: [PATCH 2/3] Changed ReapTable logic - Changed the loop per network. Previous implementation was taking a ReadLock to update the reapTime but now with the residualReapTime also the bulkSync is using the same ReadLock creating possible issues in concurrent read and update of the value. The new logic fetches the list of networks and proceed to the cleanup network by network locking the database and releasing it after each network. This should ensure a fair locking avoiding to keep the database blocked for too much time. Note: The ticker does not guarantee that the reap logic runs precisely every reapTimePeriod, actually documentation says that if the routine is too long will skip ticks. In case of slowdown of the process itself it is possible that the lifetime of the deleted entries increases, it still should not be a huge problem because now the residual reaptime is propagated among all the nodes a slower node will let the deleted entry being repropagate multiple times but the state will still remain consistent. Signed-off-by: Flavio Crisciani --- libnetwork/networkdb/cluster.go | 74 ++++++++++--------- libnetwork/networkdb/delegate.go | 8 +- libnetwork/networkdb/networkdb.go | 14 ++-- libnetwork/networkdb/networkdb_test.go | 6 ++ libnetwork/test/networkDb/README | 2 +- .../test/networkDb/dbclient/ndbClient.go | 10 +-- 6 files changed, 66 insertions(+), 48 deletions(-) diff --git a/libnetwork/networkdb/cluster.go b/libnetwork/networkdb/cluster.go index eb20524cc5..a8231481df 100644 --- a/libnetwork/networkdb/cluster.go +++ b/libnetwork/networkdb/cluster.go @@ -321,43 +321,51 @@ func (nDB *NetworkDB) reapNetworks() { } func (nDB *NetworkDB) reapTableEntries() { - var paths []string - + var nodeNetworks []string + // This is best effort, if the list of network changes will be picked up in the next cycle nDB.RLock() - nDB.indexes[byTable].Walk(func(path string, v interface{}) bool { - entry, ok := v.(*entry) - if !ok { - return false - } - - if !entry.deleting { - return false - } - if entry.reapTime > 0 { - entry.reapTime -= reapPeriod - return false - } - paths = append(paths, path) - return false - }) + for nid := range nDB.networks[nDB.config.NodeName] { + nodeNetworks = append(nodeNetworks, nid) + } nDB.RUnlock() - nDB.Lock() - for _, path := range paths { - params := strings.Split(path[1:], "/") - tname := params[0] - nid := params[1] - key := params[2] + cycleStart := time.Now() + // In order to avoid blocking the database for a long time, apply the garbage collection logic by network + // The lock is taken at the beginning of the cycle and the deletion is inline + for _, nid := range nodeNetworks { + nDB.Lock() + nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), func(path string, v interface{}) bool { + // timeCompensation compensate in case the lock took some time to be released + timeCompensation := time.Since(cycleStart) + entry, ok := v.(*entry) + if !ok || !entry.deleting { + return false + } - okTable, okNetwork := nDB.deleteEntry(nid, tname, key) - if !okTable { - logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key) - } - if !okNetwork { - logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key) - } + // In this check we are adding an extra 1 second to guarantee that when the number is truncated to int32 to fit the packet + // for the tableEvent the number is always strictly > 1 and never 0 + if entry.reapTime > reapPeriod+timeCompensation+time.Second { + entry.reapTime -= reapPeriod + timeCompensation + return false + } + + params := strings.Split(path[1:], "/") + nid := params[0] + tname := params[1] + key := params[2] + + okTable, okNetwork := nDB.deleteEntry(nid, tname, key) + if !okTable { + logrus.Errorf("Table tree delete failed, entry with key:%s does not exists in the table:%s network:%s", key, tname, nid) + } + if !okNetwork { + logrus.Errorf("Network tree delete failed, entry with key:%s does not exists in the network:%s table:%s", key, nid, tname) + } + + return false + }) + nDB.Unlock() } - nDB.Unlock() } func (nDB *NetworkDB) gossip() { @@ -406,7 +414,7 @@ func (nDB *NetworkDB) gossip() { // Collect stats and print the queue info, note this code is here also to have a view of the queues empty network.qMessagesSent += len(msgs) if printStats { - logrus.Infof("NetworkDB stats - net:%s Entries:%d Queue qLen:%d netPeers:%d netMsg/s:%d", + logrus.Infof("NetworkDB stats - net:%s Entries:%d Queue qLen:%d netPeers:%d netMsg/s:%d", nid, network.entriesNumber, broadcastQ.NumQueued(), broadcastQ.NumNodes(), network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second))) network.qMessagesSent = 0 diff --git a/libnetwork/networkdb/delegate.go b/libnetwork/networkdb/delegate.go index 9f97e4d3ee..bcddc9014b 100644 --- a/libnetwork/networkdb/delegate.go +++ b/libnetwork/networkdb/delegate.go @@ -238,8 +238,8 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { } // All the entries marked for deletion should have a reapTime set greater than 0 - // This case can happens if the cluster is running different versions of the engine where the old version does not have the - // field. In both cases we should raise a warning message + // This case can happen if the cluster is running different versions of the engine where the old version does not have the + // field. If that is not the case, this can be a BUG if e.deleting && e.reapTime == 0 { logrus.Warnf("handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?", tEvent) e.reapTime = reapInterval @@ -251,9 +251,9 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { if err != nil && tEvent.Type == TableEventTypeDelete { // If it is a delete event and we did not have a state for it, don't propagate to the application - // If the residual reapTime is lower than 1/6 of the total reapTime don't bother broadcasting it around + // If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around // most likely the cluster is already aware of it, if not who will sync with this node will catch the state too. - return e.reapTime >= reapPeriod/6 + return e.reapTime > reapPeriod/6 } var op opType diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go index cf3671d0f9..caa3cfc5a6 100644 --- a/libnetwork/networkdb/networkdb.go +++ b/libnetwork/networkdb/networkdb.go @@ -475,7 +475,7 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) { entry := &entry{ ltime: oldEntry.ltime, - node: node, + node: oldEntry.node, value: oldEntry.value, deleting: true, reapTime: reapInterval, @@ -692,8 +692,10 @@ func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interfac _, okNetwork := nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) if !okNetwork { // Add only if it is an insert not an update - n := nDB.networks[nDB.config.NodeName][nid] - n.entriesNumber++ + n, ok := nDB.networks[nDB.config.NodeName][nid] + if ok { + n.entriesNumber++ + } } return okTable, okNetwork } @@ -705,8 +707,10 @@ func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (bool, bool) { _, okNetwork := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)) if okNetwork { // Remove only if the delete is successful - n := nDB.networks[nDB.config.NodeName][nid] - n.entriesNumber-- + n, ok := nDB.networks[nDB.config.NodeName][nid] + if ok { + n.entriesNumber-- + } } return okTable, okNetwork } diff --git a/libnetwork/networkdb/networkdb_test.go b/libnetwork/networkdb/networkdb_test.go index 326aaadecd..3b290e3e09 100644 --- a/libnetwork/networkdb/networkdb_test.go +++ b/libnetwork/networkdb/networkdb_test.go @@ -473,6 +473,9 @@ func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) { if len(dbs[0].networkNodes["network1"]) != 2 { t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[0].networkNodes["network1"]), dbs[0].networkNodes["network1"]) } + if n, ok := dbs[0].networks[dbs[0].config.NodeName]["network1"]; !ok || n.leaving { + t.Fatalf("The network should not be marked as leaving:%t", n.leaving) + } // Wait for the propagation on db[1] for i := 0; i < maxRetry; i++ { @@ -484,6 +487,9 @@ func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) { if len(dbs[1].networkNodes["network1"]) != 2 { t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"]) } + if n, ok := dbs[1].networks[dbs[1].config.NodeName]["network1"]; !ok || n.leaving { + t.Fatalf("The network should not be marked as leaving:%t", n.leaving) + } // Try a quick leave/join err = dbs[0].LeaveNetwork("network1") diff --git a/libnetwork/test/networkDb/README b/libnetwork/test/networkDb/README index 72a08cee5c..dbc2d1884d 100644 --- a/libnetwork/test/networkDb/README +++ b/libnetwork/test/networkDb/README @@ -1,7 +1,7 @@ SERVER cd test/networkdb -env GOOS=linux go build -v server/testMain.go && docker build -t fcrisciani/networkdb-test . +env GOOS=linux go build -v testMain.go && docker build -t fcrisciani/networkdb-test . (only for testkit case) docker push fcrisciani/networkdb-test Run server: docker service create --name testdb --network net1 --replicas 3 --env TASK_ID="{{.Task.ID}}" -p mode=host,target=8000 fcrisciani/networkdb-test server 8000 diff --git a/libnetwork/test/networkDb/dbclient/ndbClient.go b/libnetwork/test/networkDb/dbclient/ndbClient.go index beeee2a4c6..aaf2dec4d8 100644 --- a/libnetwork/test/networkDb/dbclient/ndbClient.go +++ b/libnetwork/test/networkDb/dbclient/ndbClient.go @@ -567,7 +567,7 @@ func doWriteWaitLeaveJoin(ips []string, args []string) { tableName := args[1] parallelWriters, _ := strconv.Atoi(args[2]) writeTimeSec, _ := strconv.Atoi(args[3]) - parallerlLeaver, _ := strconv.Atoi(args[4]) + parallelLeaver, _ := strconv.Atoi(args[4]) // Start parallel writers that will create and delete unique keys doneCh := make(chan resultTuple, parallelWriters) @@ -586,23 +586,23 @@ func doWriteWaitLeaveJoin(ips []string, args []string) { keysExpected := keyMap[totalWrittenKeys] // The Leavers will leave the network - for i := 0; i < parallerlLeaver; i++ { + for i := 0; i < parallelLeaver; i++ { logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i]) go leaveNetwork(ips[i], servicePort, networkName, doneCh) // Once a node leave all the keys written previously will be deleted, so the expected keys will consider that as removed keysExpected -= keyMap[ips[i]] } - waitWriters(parallerlLeaver, false, doneCh) + waitWriters(parallelLeaver, false, doneCh) // Give some time time.Sleep(100 * time.Millisecond) // The writers will join the network - for i := 0; i < parallerlLeaver; i++ { + for i := 0; i < parallelLeaver; i++ { logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i]) go joinNetwork(ips[i], servicePort, networkName, doneCh) } - waitWriters(parallerlLeaver, false, doneCh) + waitWriters(parallelLeaver, false, doneCh) // check table entries for 2 minutes ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) From a4e64d05c1073022f43c606d2eb8d0fa4d3e947d Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Fri, 22 Sep 2017 10:23:07 -0700 Subject: [PATCH 3/3] Avoid alignment of reapNetwork and tableEntries Make sure that the network is garbage collected after the entries. Entries to be deleted requires that the network is present. Signed-off-by: Flavio Crisciani --- libnetwork/networkdb/cluster.go | 21 +++++++++++++-------- libnetwork/networkdb/delegate.go | 4 ++-- libnetwork/networkdb/networkdb.go | 7 ++++--- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/libnetwork/networkdb/cluster.go b/libnetwork/networkdb/cluster.go index a8231481df..af6f5d9f7b 100644 --- a/libnetwork/networkdb/cluster.go +++ b/libnetwork/networkdb/cluster.go @@ -17,11 +17,15 @@ import ( ) const ( - reapInterval = 30 * time.Minute - reapPeriod = 5 * time.Second - retryInterval = 1 * time.Second - nodeReapInterval = 24 * time.Hour - nodeReapPeriod = 2 * time.Hour + // The garbage collection logic for entries leverage the presence of the network. + // For this reason the expiration time of the network is put slightly higher than the entry expiration so that + // there is at least 5 extra cycle to make sure that all the entries are properly deleted before deleting the network. + reapEntryInterval = 30 * time.Minute + reapNetworkInterval = reapEntryInterval + 5*reapPeriod + reapPeriod = 5 * time.Second + retryInterval = 1 * time.Second + nodeReapInterval = 24 * time.Hour + nodeReapPeriod = 2 * time.Hour ) type logWriter struct{} @@ -300,8 +304,9 @@ func (nDB *NetworkDB) reconnectNode() { // the reaper runs. NOTE nDB.reapTableEntries updates the reapTime with a readlock. This // is safe as long as no other concurrent path touches the reapTime field. func (nDB *NetworkDB) reapState() { - nDB.reapNetworks() + // The reapTableEntries leverage the presence of the network so garbage collect entries first nDB.reapTableEntries() + nDB.reapNetworks() } func (nDB *NetworkDB) reapNetworks() { @@ -414,8 +419,8 @@ func (nDB *NetworkDB) gossip() { // Collect stats and print the queue info, note this code is here also to have a view of the queues empty network.qMessagesSent += len(msgs) if printStats { - logrus.Infof("NetworkDB stats - net:%s Entries:%d Queue qLen:%d netPeers:%d netMsg/s:%d", - nid, network.entriesNumber, broadcastQ.NumQueued(), broadcastQ.NumNodes(), + logrus.Infof("NetworkDB stats - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d netMsg/s:%d", + nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber, broadcastQ.NumQueued(), network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second))) network.qMessagesSent = 0 } diff --git a/libnetwork/networkdb/delegate.go b/libnetwork/networkdb/delegate.go index bcddc9014b..28919cf3d2 100644 --- a/libnetwork/networkdb/delegate.go +++ b/libnetwork/networkdb/delegate.go @@ -165,7 +165,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { n.ltime = nEvent.LTime n.leaving = nEvent.Type == NetworkEventTypeLeave if n.leaving { - n.reapTime = reapInterval + n.reapTime = reapNetworkInterval // The remote node is leaving the network, but not the gossip cluster. // Mark all its entries in deleted state, this will guarantee that @@ -242,7 +242,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { // field. If that is not the case, this can be a BUG if e.deleting && e.reapTime == 0 { logrus.Warnf("handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?", tEvent) - e.reapTime = reapInterval + e.reapTime = reapEntryInterval } nDB.Lock() diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go index caa3cfc5a6..afdf32e2c2 100644 --- a/libnetwork/networkdb/networkdb.go +++ b/libnetwork/networkdb/networkdb.go @@ -405,7 +405,7 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error { node: nDB.config.NodeName, value: value, deleting: true, - reapTime: reapInterval, + reapTime: reapEntryInterval, } if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil { @@ -478,7 +478,7 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) { node: oldEntry.node, value: oldEntry.value, deleting: true, - reapTime: reapInterval, + reapTime: reapEntryInterval, } // we arrived at this point in 2 cases: @@ -619,8 +619,9 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error { return fmt.Errorf("could not find network %s while trying to leave", nid) } + logrus.Debugf("%s: leaving network %s", nDB.config.NodeName, nid) n.ltime = ltime - n.reapTime = reapInterval + n.reapTime = reapNetworkInterval n.leaving = true return nil }