Fix reapTime logic in NetworkDB

- Added the ResidualReapTime field to the table event.
  Without it, a node that had no state for the element would
  mark the element for deletion with the maximum reapTime.
  This made it possible for the entry to keep being resynced
  between nodes forever, defeating the purpose of the reap time
  itself.

- On broadcast of the table event, the owner was being rewritten
  with the local node name. This was not correct: the owner
  should remain the original sender of the message.
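
A self-contained sketch of the receiver-side behavior both fixes imply; the
type definitions and the 30-minute reapInterval below are simplified
stand-ins for illustration, not the package's real declarations:

    package main

    import (
        "fmt"
        "time"
    )

    // Minimal stand-ins for the networkdb types involved in the fix.
    type tableEvent struct {
        isDelete         bool
        nodeName         string
        residualReapTime int32 // seconds of life left, as carried on the wire
    }

    type entry struct {
        node     string
        deleting bool
        reapTime time.Duration
    }

    const reapInterval = 30 * time.Minute // hypothetical value, for illustration only

    // applyTableEvent mirrors the fixed receiver logic: adopt the sender's
    // residual reap time instead of resetting it to the maximum, and keep
    // the original owner instead of overwriting it with the local node name.
    func applyTableEvent(ev tableEvent) entry {
        e := entry{
            node:     ev.nodeName, // owner stays the original sender
            deleting: ev.isDelete,
            reapTime: time.Duration(ev.residualReapTime) * time.Second,
        }
        if e.deleting && e.reapTime == 0 {
            // Older engines do not send the field; fall back to the full
            // interval rather than keeping the tombstone alive forever.
            e.reapTime = reapInterval
        }
        return e
    }

    func main() {
        fmt.Printf("%+v\n", applyTableEvent(tableEvent{isDelete: true, nodeName: "node-1", residualReapTime: 42}))
    }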

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
Flavio Crisciani 2017-09-19 13:42:35 -07:00
parent e621b9cd7a
commit 2d2a2bc568
8 changed files with 574 additions and 354 deletions


@@ -38,3 +38,4 @@ cmd/dnet/dnet
libnetworkbuild.created
test/networkDb/testMain
test/networkDb/gossipdb


@@ -134,6 +134,8 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
TableName: tname,
Key: key,
Value: entry.value,
// The duration in seconds is a float that is truncated below
ResidualReapTime: int32(entry.reapTime.Seconds()),
}
raw, err := encodeMessage(MessageTypeTableEvent, &tEvent)
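
The comment above refers to the int32 conversion of entry.reapTime.Seconds();
a standalone illustration of the truncation (not part of the patch):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // Seconds() returns a float64 and the int32 conversion truncates
        // toward zero, so 1.9s of residual life serializes as 1.
        d := 1900 * time.Millisecond
        fmt.Println(d.Seconds())        // 1.9
        fmt.Println(int32(d.Seconds())) // 1
    }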


@@ -349,11 +349,11 @@ func (nDB *NetworkDB) reapTableEntries() {
nid := params[1]
key := params[2]
if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok {
okTable, okNetwork := nDB.deleteEntry(nid, tname, key)
if !okTable {
logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
}
if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
if !okNetwork {
logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
}
}
@@ -406,8 +406,9 @@ func (nDB *NetworkDB) gossip() {
// Collect stats and print the queue info; note this code is also here to give a view of the queues even when they are empty
network.qMessagesSent += len(msgs)
if printStats {
logrus.Infof("NetworkDB stats - Queue net:%s qLen:%d netPeers:%d netMsg/s:%d",
nid, broadcastQ.NumQueued(), broadcastQ.NumNodes(), network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
logrus.Infof("NetworkDB stats - net:%s Entries:%d Queue qLen:%d netPeers:%d netMsg/s:%d",
nid, network.entriesNumber, broadcastQ.NumQueued(), broadcastQ.NumNodes(),
network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
network.qMessagesSent = 0
}
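
The netMsg/s figure printed above is the per-period counter divided by the
period length in seconds; a standalone illustration with assumed example
values (StatsPrintPeriod is not defined in this hunk):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        statsPrintPeriod := 5 * time.Minute // assumed example value
        qMessagesSent := 900
        fmt.Println(qMessagesSent / int(statsPrintPeriod/time.Second)) // 900/300 = 3 msg/s
    }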
@@ -572,6 +573,8 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
TableName: params[1],
Key: params[2],
Value: entry.value,
// The duration in seconds is a float that is truncated below
ResidualReapTime: int32(entry.reapTime.Seconds()),
}
msg, err := encodeMessage(MessageTypeTableEvent, &tEvent)


@@ -1,9 +1,9 @@
package networkdb
import (
"fmt"
"net"
"strings"
"time"
"github.com/gogo/protobuf/proto"
"github.com/sirupsen/logrus"
@@ -198,8 +198,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
}
func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
// Update our local clock if the received messages has newer
// time.
// Update our local clock if the received message has a newer time.
nDB.tableClock.Witness(tEvent.LTime)
// Ignore the table events for networks that are in the process of going away
@@ -235,20 +234,26 @@
node: tEvent.NodeName,
value: tEvent.Value,
deleting: tEvent.Type == TableEventTypeDelete,
reapTime: time.Duration(tEvent.ResidualReapTime) * time.Second,
}
if e.deleting {
// All the entries marked for deletion should have a reapTime set greater than 0.
// This case can happen if the cluster is running different versions of the engine and the old version does not have the
// field. In both cases we should raise a warning message.
if e.deleting && e.reapTime == 0 {
logrus.Warnf("handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?", tEvent)
e.reapTime = reapInterval
}
nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), e)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e)
nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e)
nDB.Unlock()
if err != nil && tEvent.Type == TableEventTypeDelete {
// If it is a delete event and we didn't have the entry here don't repropagate
return true
// If it is a delete event and we did not have a state for it, don't propagate to the application
// If the residual reapTime is lower than 1/6 of the total reapTime, don't bother broadcasting it around:
// most likely the cluster is already aware of it; if not, whoever syncs with this node will catch the state too.
return e.reapTime >= reapPeriod/6
}
var op opType
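
The boolean returned above is what handleTableMessage (next hunk) uses to
decide whether to queue the event for further gossip; a sketch of the cutoff,
with a hypothetical 30-minute reap window for illustration:

    package main

    import (
        "fmt"
        "time"
    )

    // shouldRebroadcast mirrors the check above: suppress further gossip for
    // entries in the last sixth of their reap window, on the theory that the
    // cluster already knows about them and bulk sync covers any stragglers.
    func shouldRebroadcast(residual, reapWindow time.Duration) bool {
        return residual >= reapWindow/6
    }

    func main() {
        reapWindow := 30 * time.Minute // hypothetical value; cutoff is 5m
        fmt.Println(shouldRebroadcast(10*time.Minute, reapWindow)) // true
        fmt.Println(shouldRebroadcast(2*time.Minute, reapWindow))  // false
    }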
@@ -303,22 +308,17 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID]
nDB.RUnlock()
if !ok {
// if the network is not there anymore, OR we are leaving the network OR the broadcast queue is not present
if !ok || n.leaving || n.tableBroadcasts == nil {
return
}
broadcastQ := n.tableBroadcasts
if broadcastQ == nil {
return
}
broadcastQ.QueueBroadcast(&tableEventMessage{
n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
msg: buf,
id: tEvent.NetworkID,
tname: tEvent.TableName,
key: tEvent.Key,
node: nDB.config.NodeName,
node: tEvent.NodeName,
})
}
}


@@ -141,6 +141,11 @@ type network struct {
// Number of gossip messages sent related to this network during the last stats collection period
qMessagesSent int
// Number of entries on the network. This value is the sum of all the entries of all the tables of a specific network.
// Its use is for statistics purposes. It keeps track of the database size and is printed per network every
// StatsPrintPeriod interval
entriesNumber int
}
// Config represents the configuration of the networkdb instance and
@@ -338,8 +343,7 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
}
nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()
return nil
@@ -365,8 +369,7 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
}
nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()
return nil
@@ -410,8 +413,7 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
}
nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()
return nil
@@ -488,12 +490,10 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
// without doing a delete of all the objects
entry.ltime++
}
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.createOrUpdateEntry(nid, tname, key, entry)
} else {
// the local node is leaving the network, all the entries of remote nodes can be safely removed
nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
nDB.deleteEntry(nid, tname, key)
}
nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
@@ -513,8 +513,7 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
nid := params[1]
key := params[2]
nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
nDB.deleteEntry(nid, tname, key)
nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, oldEntry.value))
return false
@@ -558,7 +557,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
nodeNetworks = make(map[string]*network)
nDB.networks[nDB.config.NodeName] = nodeNetworks
}
nodeNetworks[nid] = &network{id: nid, ltime: ltime}
n, ok := nodeNetworks[nid]
var entries int
if ok {
entries = n.entriesNumber
}
nodeNetworks[nid] = &network{id: nid, ltime: ltime, entriesNumber: entries}
nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
nDB.RLock()
@@ -567,6 +571,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
},
RetransmitMult: 4,
}
nDB.addNetworkNode(nid, nDB.config.NodeName)
networkNodes := nDB.networkNodes[nid]
nDB.Unlock()
@@ -679,3 +684,29 @@ func (nDB *NetworkDB) updateLocalNetworkTime() {
n.ltime = ltime
}
}
// createOrUpdateEntry handles the creation or update of entries in the local
// tree store. It also keeps the entries number of the network in sync (all tables are aggregated)
func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interface{}) (bool, bool) {
_, okTable := nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
_, okNetwork := nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
if !okNetwork {
// Add only if it is an insert, not an update
n := nDB.networks[nDB.config.NodeName][nid]
n.entriesNumber++
}
return okTable, okNetwork
}
// deleteEntry handles the deletion of entries from the local tree store.
// It also keeps the entries number of the network in sync (all tables are aggregated)
func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (bool, bool) {
_, okTable := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
_, okNetwork := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
if okNetwork {
// Remove only if the delete is successful
n := nDB.networks[nDB.config.NodeName][nid]
n.entriesNumber--
}
return okTable, okNetwork
}
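
A hypothetical walk-through of the invariant the two helpers maintain:
entriesNumber moves only on true inserts and true deletes, because the radix
tree reports whether an existing value was replaced or removed.

    // Sequence against one network, entriesNumber starting at 0:
    //   createOrUpdateEntry(nid, "t1", "k1", e) // fresh insert -> 1
    //   createOrUpdateEntry(nid, "t1", "k1", e) // update       -> 1 (okNetwork true, no increment)
    //   deleteEntry(nid, "t1", "k1")            // real delete  -> 0 (okNetwork true, decrement)
    //   deleteEntry(nid, "t1", "k1")            // no-op        -> 0 (okNetwork false, unchanged)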

File diff suppressed because it is too large


@@ -109,7 +109,7 @@ message NetworkEntry {
// network event was recorded.
uint64 l_time = 2 [(gogoproto.customtype) = "github.com/hashicorp/serf/serf.LamportTime", (gogoproto.nullable) = false];
// Source node name where this network attachment happened.
string node_name = 3;
string node_name = 3 [(gogoproto.customname) = "NodeName"];
// Indicates if a leave from this network is in progress.
bool leaving = 4;
}
@@ -119,6 +119,8 @@ message NetworkPushPull {
// Lamport time when this push pull was initiated.
uint64 l_time = 1 [(gogoproto.customtype) = "github.com/hashicorp/serf/serf.LamportTime", (gogoproto.nullable) = false];
repeated NetworkEntry networks = 2;
// Name of the node sending this push pull payload.
string node_name = 3 [(gogoproto.customname) = "NodeName"];
}
// TableEvent message payload definition.
@@ -152,6 +154,8 @@ message TableEvent {
string key = 6;
// Entry value.
bytes value = 7;
// Residual reap time, in seconds, before the entry gets deleted
int32 residual_reap_time = 8 [(gogoproto.customname) = "ResidualReapTime"];
}
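
With the gogoproto.customname option, the generated Go code exposes the new
field as ResidualReapTime; an abridged sketch of the generated struct (the
real one lives in networkdb.pb.go and carries protobuf tags, elided here):

    type TableEvent struct {
        // ... earlier fields elided ...
        Key              string
        Value            []byte
        ResidualReapTime int32 // residual_reap_time = 8
    }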
// BulkSync message payload definition.
@@ -180,4 +184,4 @@ message CompoundMessage {
// A list of simple messages.
repeated SimpleMessage messages = 1;
}
}


@@ -1,7 +1,7 @@
SERVER
cd test/networkdb
env GOOS=linux go build -v server/ndbTester.go && docker build -t fcrisciani/networkdb-test -f server/Dockerfile .
env GOOS=linux go build -v server/testMain.go && docker build -t fcrisciani/networkdb-test .
(only for testkit case) docker push fcrisciani/networkdb-test
Run server: docker service create --name testdb --network net1 --replicas 3 --env TASK_ID="{{.Task.ID}}" -p mode=host,target=8000 fcrisciani/networkdb-test server 8000