diff --git a/libnetwork/networkdb/cluster.go b/libnetwork/networkdb/cluster.go
index eb20524cc5..a8231481df 100644
--- a/libnetwork/networkdb/cluster.go
+++ b/libnetwork/networkdb/cluster.go
@@ -321,43 +321,51 @@ func (nDB *NetworkDB) reapNetworks() {
 }
 
 func (nDB *NetworkDB) reapTableEntries() {
-	var paths []string
-
+	var nodeNetworks []string
+	// This is best effort, if the list of network changes will be picked up in the next cycle
 	nDB.RLock()
-	nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
-		entry, ok := v.(*entry)
-		if !ok {
-			return false
-		}
-
-		if !entry.deleting {
-			return false
-		}
-		if entry.reapTime > 0 {
-			entry.reapTime -= reapPeriod
-			return false
-		}
-		paths = append(paths, path)
-		return false
-	})
+	for nid := range nDB.networks[nDB.config.NodeName] {
+		nodeNetworks = append(nodeNetworks, nid)
+	}
 	nDB.RUnlock()
 
-	nDB.Lock()
-	for _, path := range paths {
-		params := strings.Split(path[1:], "/")
-		tname := params[0]
-		nid := params[1]
-		key := params[2]
+	cycleStart := time.Now()
+	// In order to avoid blocking the database for a long time, apply the garbage collection logic by network
+	// The lock is taken at the beginning of the cycle and the deletion is inline
+	for _, nid := range nodeNetworks {
+		nDB.Lock()
+		nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), func(path string, v interface{}) bool {
+			// timeCompensation compensate in case the lock took some time to be released
+			timeCompensation := time.Since(cycleStart)
+			entry, ok := v.(*entry)
+			if !ok || !entry.deleting {
+				return false
+			}
 
-		okTable, okNetwork := nDB.deleteEntry(nid, tname, key)
-		if !okTable {
-			logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
-		}
-		if !okNetwork {
-			logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
-		}
+			// In this check we are adding an extra 1 second to guarantee that when the number is truncated to int32 to fit the packet
+			// for the tableEvent the number is always strictly > 1 and never 0
+			if entry.reapTime > reapPeriod+timeCompensation+time.Second {
+				entry.reapTime -= reapPeriod + timeCompensation
+				return false
+			}
+
+			params := strings.Split(path[1:], "/")
+			nid := params[0]
+			tname := params[1]
+			key := params[2]
+
+			okTable, okNetwork := nDB.deleteEntry(nid, tname, key)
+			if !okTable {
+				logrus.Errorf("Table tree delete failed, entry with key:%s does not exists in the table:%s network:%s", key, tname, nid)
+			}
+			if !okNetwork {
+				logrus.Errorf("Network tree delete failed, entry with key:%s does not exists in the network:%s table:%s", key, nid, tname)
+			}
+
+			return false
+		})
+		nDB.Unlock()
 	}
-	nDB.Unlock()
 }
 
 func (nDB *NetworkDB) gossip() {
@@ -406,7 +414,7 @@ func (nDB *NetworkDB) gossip() {
 		// Collect stats and print the queue info, note this code is here also to have a view of the queues empty
 		network.qMessagesSent += len(msgs)
 		if printStats {
-			logrus.Infof("NetworkDB stats - net:%s Entries:%d Queue qLen:%d netPeers:%d netMsg/s:%d",
+			logrus.Infof("NetworkDB stats - net:%s Entries:%d Queue qLen:%d netPeers:%d netMsg/s:%d",
 				nid, network.entriesNumber, broadcastQ.NumQueued(), broadcastQ.NumNodes(),
 				network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
 			network.qMessagesSent = 0
diff --git a/libnetwork/networkdb/delegate.go b/libnetwork/networkdb/delegate.go
index 9f97e4d3ee..bcddc9014b 100644
--- a/libnetwork/networkdb/delegate.go
+++ b/libnetwork/networkdb/delegate.go
@@ -238,8 +238,8 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
 	}
 
 	// All the entries marked for deletion should have a reapTime set greater than 0
-	// This case can happens if the cluster is running different versions of the engine where the old version does not have the
-	// field. In both cases we should raise a warning message
+	// This case can happen if the cluster is running different versions of the engine where the old version does not have the
+	// field. If that is not the case, this can be a BUG
 	if e.deleting && e.reapTime == 0 {
 		logrus.Warnf("handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?", tEvent)
 		e.reapTime = reapInterval
@@ -251,9 +251,9 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
 
 	if err != nil && tEvent.Type == TableEventTypeDelete {
 		// If it is a delete event and we did not have a state for it, don't propagate to the application
-		// If the residual reapTime is lower than 1/6 of the total reapTime don't bother broadcasting it around
+		// If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around
 		// most likely the cluster is already aware of it, if not who will sync with this node will catch the state too.
-		return e.reapTime >= reapPeriod/6
+		return e.reapTime > reapPeriod/6
 	}
 
 	var op opType
diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go
index cf3671d0f9..caa3cfc5a6 100644
--- a/libnetwork/networkdb/networkdb.go
+++ b/libnetwork/networkdb/networkdb.go
@@ -475,7 +475,7 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
 
 			entry := &entry{
 				ltime:    oldEntry.ltime,
-				node:     node,
+				node:     oldEntry.node,
 				value:    oldEntry.value,
 				deleting: true,
 				reapTime: reapInterval,
@@ -692,8 +692,10 @@ func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interfac
 	_, okNetwork := nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
 	if !okNetwork {
 		// Add only if it is an insert not an update
-		n := nDB.networks[nDB.config.NodeName][nid]
-		n.entriesNumber++
+		n, ok := nDB.networks[nDB.config.NodeName][nid]
+		if ok {
+			n.entriesNumber++
+		}
 	}
 	return okTable, okNetwork
 }
@@ -705,8 +707,10 @@ func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (bool, bool) {
 	_, okNetwork := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
 	if okNetwork {
 		// Remove only if the delete is successful
-		n := nDB.networks[nDB.config.NodeName][nid]
-		n.entriesNumber--
+		n, ok := nDB.networks[nDB.config.NodeName][nid]
+		if ok {
+			n.entriesNumber--
+		}
 	}
 	return okTable, okNetwork
 }
diff --git a/libnetwork/networkdb/networkdb_test.go b/libnetwork/networkdb/networkdb_test.go
index 326aaadecd..3b290e3e09 100644
--- a/libnetwork/networkdb/networkdb_test.go
+++ b/libnetwork/networkdb/networkdb_test.go
@@ -473,6 +473,9 @@ func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
 	if len(dbs[0].networkNodes["network1"]) != 2 {
 		t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[0].networkNodes["network1"]), dbs[0].networkNodes["network1"])
 	}
+	if n, ok := dbs[0].networks[dbs[0].config.NodeName]["network1"]; !ok || n.leaving {
+		t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
+	}
 
 	// Wait for the propagation on db[1]
 	for i := 0; i < maxRetry; i++ {
@@ -484,6 +487,9 @@ func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
 	if len(dbs[1].networkNodes["network1"]) != 2 {
 		t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"])
 	}
+	if n, ok := dbs[1].networks[dbs[1].config.NodeName]["network1"]; !ok || n.leaving {
+		t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
+	}
 
 	// Try a quick leave/join
 	err = dbs[0].LeaveNetwork("network1")
diff --git a/libnetwork/test/networkDb/README b/libnetwork/test/networkDb/README
index 72a08cee5c..dbc2d1884d 100644
--- a/libnetwork/test/networkDb/README
+++ b/libnetwork/test/networkDb/README
@@ -1,7 +1,7 @@
 SERVER
 
 cd test/networkdb
-env GOOS=linux go build -v server/testMain.go && docker build -t fcrisciani/networkdb-test .
+env GOOS=linux go build -v testMain.go && docker build -t fcrisciani/networkdb-test .
 (only for testkit case) docker push fcrisciani/networkdb-test
 Run server:
 docker service create --name testdb --network net1 --replicas 3 --env TASK_ID="{{.Task.ID}}" -p mode=host,target=8000 fcrisciani/networkdb-test server 8000
diff --git a/libnetwork/test/networkDb/dbclient/ndbClient.go b/libnetwork/test/networkDb/dbclient/ndbClient.go
index beeee2a4c6..aaf2dec4d8 100644
--- a/libnetwork/test/networkDb/dbclient/ndbClient.go
+++ b/libnetwork/test/networkDb/dbclient/ndbClient.go
@@ -567,7 +567,7 @@ func doWriteWaitLeaveJoin(ips []string, args []string) {
 	tableName := args[1]
 	parallelWriters, _ := strconv.Atoi(args[2])
 	writeTimeSec, _ := strconv.Atoi(args[3])
-	parallerlLeaver, _ := strconv.Atoi(args[4])
+	parallelLeaver, _ := strconv.Atoi(args[4])
 
 	// Start parallel writers that will create and delete unique keys
 	doneCh := make(chan resultTuple, parallelWriters)
@@ -586,23 +586,23 @@ func doWriteWaitLeaveJoin(ips []string, args []string) {
 	keysExpected := keyMap[totalWrittenKeys]
 
 	// The Leavers will leave the network
-	for i := 0; i < parallerlLeaver; i++ {
+	for i := 0; i < parallelLeaver; i++ {
 		logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
 		go leaveNetwork(ips[i], servicePort, networkName, doneCh)
 		// Once a node leave all the keys written previously will be deleted, so the expected keys will consider that as removed
 		keysExpected -= keyMap[ips[i]]
 	}
-	waitWriters(parallerlLeaver, false, doneCh)
+	waitWriters(parallelLeaver, false, doneCh)
 
 	// Give some time
 	time.Sleep(100 * time.Millisecond)
 
 	// The writers will join the network
-	for i := 0; i < parallerlLeaver; i++ {
+	for i := 0; i < parallelLeaver; i++ {
 		logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i])
 		go joinNetwork(ips[i], servicePort, networkName, doneCh)
 	}
-	waitWriters(parallerlLeaver, false, doneCh)
+	waitWriters(parallelLeaver, false, doneCh)
 
 	// check table entries for 2 minutes
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)