diff --git a/libnetwork/cmd/networkdb-test/dbclient/ndbClient.go b/libnetwork/cmd/networkdb-test/dbclient/ndbClient.go index 609b581b23..f2f669f7b3 100644 --- a/libnetwork/cmd/networkdb-test/dbclient/ndbClient.go +++ b/libnetwork/cmd/networkdb-test/dbclient/ndbClient.go @@ -2,6 +2,7 @@ package dbclient import ( "context" + "fmt" "io/ioutil" "log" "net" @@ -25,17 +26,10 @@ type resultTuple struct { } func httpGetFatalError(ip, port, path string) { - // for { body, err := httpGet(ip, port, path) if err != nil || !strings.Contains(string(body), "OK") { - // if strings.Contains(err.Error(), "EOF") { - // logrus.Warnf("Got EOF path:%s err:%s", path, err) - // continue - // } log.Fatalf("[%s] error %s %s", path, err, body) } - // break - // } } func httpGet(ip, port, path string) ([]byte, error) { @@ -87,7 +81,7 @@ func clusterPeersNumber(ip, port string, doneCh chan resultTuple) { body, err := httpGet(ip, port, "/clusterpeers") if err != nil { - logrus.Errorf("clusterPeers %s there was an error: %s\n", ip, err) + logrus.Errorf("clusterPeers %s there was an error: %s", ip, err) doneCh <- resultTuple{id: ip, result: -1} return } @@ -101,7 +95,7 @@ func networkPeersNumber(ip, port, networkName string, doneCh chan resultTuple) { body, err := httpGet(ip, port, "/networkpeers?nid="+networkName) if err != nil { - logrus.Errorf("networkPeersNumber %s there was an error: %s\n", ip, err) + logrus.Errorf("networkPeersNumber %s there was an error: %s", ip, err) doneCh <- resultTuple{id: ip, result: -1} return } @@ -115,7 +109,7 @@ func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan r body, err := httpGet(ip, port, "/gettable?nid="+networkName+"&tname="+tableName) if err != nil { - logrus.Errorf("tableEntriesNumber %s there was an error: %s\n", ip, err) + logrus.Errorf("tableEntriesNumber %s there was an error: %s", ip, err) doneCh <- resultTuple{id: ip, result: -1} return } @@ -124,6 +118,32 @@ func dbTableEntriesNumber(ip, port, networkName, tableName 
string, doneCh chan r doneCh <- resultTuple{id: ip, result: entriesNum} } +func dbEntriesNumber(ip, port, networkName string, doneCh chan resultTuple) { + body, err := httpGet(ip, port, "/networkstats?nid="+networkName) + + if err != nil { + logrus.Errorf("entriesNumber %s there was an error: %s", ip, err) + doneCh <- resultTuple{id: ip, result: -1} + return + } + elementsRegexp := regexp.MustCompile(`entries: ([0-9]+)`) + entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1]) + doneCh <- resultTuple{id: ip, result: entriesNum} +} + +func dbQueueLength(ip, port, networkName string, doneCh chan resultTuple) { + body, err := httpGet(ip, port, "/networkstats?nid="+networkName) + + if err != nil { + logrus.Errorf("queueLength %s there was an error: %s", ip, err) + doneCh <- resultTuple{id: ip, result: -1} + return + } + elementsRegexp := regexp.MustCompile(`qlen: ([0-9]+)`) + entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1]) + doneCh <- resultTuple{id: ip, result: entriesNum} +} + func clientWatchTable(ip, port, networkName, tableName string, doneCh chan resultTuple) { httpGetFatalError(ip, port, "/watchtable?nid="+networkName+"&tname="+tableName) if doneCh != nil { @@ -135,7 +155,7 @@ func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh ch body, err := httpGet(ip, port, "/watchedtableentries?nid="+networkName+"&tname="+tableName) if err != nil { - logrus.Errorf("clientTableEntriesNumber %s there was an error: %s\n", ip, err) + logrus.Errorf("clientTableEntriesNumber %s there was an error: %s", ip, err) doneCh <- resultTuple{id: ip, result: -1} return } @@ -144,6 +164,26 @@ func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh ch doneCh <- resultTuple{id: ip, result: entriesNum} } +func writeKeysNumber(ip, port, networkName, tableName, key string, number int, doneCh chan resultTuple) { + x := 0 + for ; x < number; x++ { + k := key + strconv.Itoa(x) + // write key 
+ writeTableKey(ip, port, networkName, tableName, k) + } + doneCh <- resultTuple{id: ip, result: x} +} + +func deleteKeysNumber(ip, port, networkName, tableName, key string, number int, doneCh chan resultTuple) { + x := 0 + for ; x < number; x++ { + k := key + strconv.Itoa(x) + // delete key + deleteTableKey(ip, port, networkName, tableName, k) + } + doneCh <- resultTuple{id: ip, result: x} +} + +func writeUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) { for x := 0; ; x++ { select { @@ -215,17 +255,18 @@ func ready(ip, port string, doneCh chan resultTuple) { doneCh <- resultTuple{id: ip, result: 0} } -func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) { +func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) (opTime time.Duration) { startTime := time.Now().UnixNano() var successTime int64 - // Loop for 2 minutes to guartee that the result is stable + // Loop for 2 minutes to guarantee that the result is stable for { select { case <-ctx.Done(): // Validate test success, if the time is set means that all the tables are empty if successTime != 0 { - logrus.Infof("Check table passed, the cluster converged in %d msec", time.Duration(successTime-startTime)/time.Millisecond) + opTime = time.Duration(successTime-startTime) / time.Millisecond + logrus.Infof("Check table passed, the cluster converged in %d msec", opTime) return } log.Fatal("Test failed, there is still entries in the tables of the nodes") @@ -403,6 +444,107 @@ func doNetworkPeers(ips []string, args []string) { close(doneCh) } +// network-stats-queue networkName queueSize +func doNetworkStatsQueue(ips []string, args []string) { + doneCh := make(chan resultTuple, len(ips)) + networkName := args[0] + comparison := args[1] + size, _ 
:= strconv.Atoi(args[2]) + + // check all the nodes + for _, ip := range ips { + go dbQueueLength(ip, servicePort, networkName, doneCh) + } + + var avgQueueSize int + // wait for the readiness of all nodes + for i := len(ips); i > 0; i-- { + node := <-doneCh + switch comparison { + case "lt": + if node.result > size { + log.Fatalf("Expected queue size from %s to be %d < %d", node.id, node.result, size) + } + case "gt": + if node.result < size { + log.Fatalf("Expected queue size from %s to be %d > %d", node.id, node.result, size) + } + default: + log.Fatal("unknown comparison operator") + } + avgQueueSize += node.result + } + close(doneCh) + avgQueueSize /= len(ips) + fmt.Fprintf(os.Stderr, "doNetworkStatsQueue succeeded with avg queue:%d", avgQueueSize) +} + +// write-keys networkName tableName parallelWriters numberOfKeysEach +func doWriteKeys(ips []string, args []string) { + networkName := args[0] + tableName := args[1] + parallelWriters, _ := strconv.Atoi(args[2]) + numberOfKeys, _ := strconv.Atoi(args[3]) + + doneCh := make(chan resultTuple, parallelWriters) + // Enable watch of tables from clients + for i := 0; i < parallelWriters; i++ { + go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh) + } + waitWriters(parallelWriters, false, doneCh) + + // Start parallel writers that will create and delete unique keys + defer close(doneCh) + for i := 0; i < parallelWriters; i++ { + key := "key-" + strconv.Itoa(i) + "-" + logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i]) + go writeKeysNumber(ips[i], servicePort, networkName, tableName, key, numberOfKeys, doneCh) + } + + // Sync with all the writers + keyMap := waitWriters(parallelWriters, true, doneCh) + logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys]) + + // check table entries for 2 minutes + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], 
dbTableEntriesNumber) + cancel() + fmt.Fprintf(os.Stderr, "doWriteKeys succeeded in %d msec", opTime) +} + +// delete-keys networkName tableName parallelWriters numberOfKeysEach +func doDeleteKeys(ips []string, args []string) { + networkName := args[0] + tableName := args[1] + parallelWriters, _ := strconv.Atoi(args[2]) + numberOfKeys, _ := strconv.Atoi(args[3]) + + doneCh := make(chan resultTuple, parallelWriters) + // Enable watch of tables from clients + for i := 0; i < parallelWriters; i++ { + go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh) + } + waitWriters(parallelWriters, false, doneCh) + + // Start parallel writers that will create and delete unique keys + defer close(doneCh) + for i := 0; i < parallelWriters; i++ { + key := "key-" + strconv.Itoa(i) + "-" + logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i]) + go deleteKeysNumber(ips[i], servicePort, networkName, tableName, key, numberOfKeys, doneCh) + } + + // Sync with all the writers + keyMap := waitWriters(parallelWriters, true, doneCh) + logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys]) + + // check table entries for 2 minutes + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber) + cancel() + fmt.Fprintf(os.Stderr, "doDeletekeys succeeded in %d msec", opTime) +} + // write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec func doWriteDeleteUniqueKeys(ips []string, args []string) { networkName := args[0] @@ -432,11 +574,12 @@ func doWriteDeleteUniqueKeys(ips []string, args []string) { // check table entries for 2 minutes ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) - checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber) + opDBTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber) cancel() ctx, cancel = 
context.WithTimeout(context.Background(), 30*time.Second) - checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber) + opClientTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber) cancel() + fmt.Fprintf(os.Stderr, "doWriteDeleteUniqueKeys succeeded in %d msec and client %d msec", opDBTime, opClientTime) } // write-unique-keys networkName tableName numParallelWriters writeTimeSec @@ -469,8 +612,9 @@ func doWriteUniqueKeys(ips []string, args []string) { // check table entries for 2 minutes ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) - checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber) + opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber) cancel() + fmt.Fprintf(os.Stderr, "doWriteUniqueKeys succeeded in %d msec", opTime) } // write-delete-leave-join networkName tableName numParallelWriters writeTimeSec @@ -497,8 +641,9 @@ func doWriteDeleteLeaveJoin(ips []string, args []string) { // check table entries for 2 minutes ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) - checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber) + opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber) cancel() + fmt.Fprintf(os.Stderr, "doWriteDeleteLeaveJoin succeeded in %d msec", opTime) } // write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec @@ -542,8 +687,9 @@ func doWriteDeleteWaitLeaveJoin(ips []string, args []string) { // check table entries for 2 minutes ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) - checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber) + opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber) cancel() + fmt.Fprintf(os.Stderr, 
"doWriteDeleteWaitLeaveJoin succeeded in %d msec", opTime) } // write-wait-leave networkName tableName numParallelWriters writeTimeSec @@ -577,8 +723,9 @@ func doWriteWaitLeave(ips []string, args []string) { // check table entries for 2 minutes ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) - checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber) + opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber) cancel() + fmt.Fprintf(os.Stderr, "doWriteWaitLeave succeeded in %d msec", opTime) } // write-wait-leave-join networkName tableName numParallelWriters writeTimeSec numParallelLeaver @@ -626,8 +773,9 @@ func doWriteWaitLeaveJoin(ips []string, args []string) { // check table entries for 2 minutes ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) - checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber) + opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber) cancel() + fmt.Fprintf(os.Stderr, "doWriteWaitLeaveJoin succeeded in %d msec", opTime) } var cmdArgChec = map[string]int{ @@ -687,9 +835,21 @@ func Client(args []string) { // leave-network networkName doLeaveNetwork(ips, commandArgs) case "network-peers": - // network-peers networkName maxRetry + // network-peers networkName expectedNumberPeers maxRetry doNetworkPeers(ips, commandArgs) + // case "network-stats-entries": + // // network-stats-entries networkName maxRetry + // doNetworkPeers(ips, commandArgs) + case "network-stats-queue": + // network-stats-queue networkName queueSize + doNetworkStatsQueue(ips, commandArgs) + case "write-keys": + // write-keys networkName tableName parallelWriters numberOfKeysEach + doWriteKeys(ips, commandArgs) + case "delete-keys": + // delete-keys networkName tableName parallelWriters numberOfKeysEach + doDeleteKeys(ips, commandArgs) case "write-unique-keys": // write-delete-unique-keys 
networkName tableName numParallelWriters writeTimeSec doWriteUniqueKeys(ips, commandArgs) diff --git a/libnetwork/cmd/networkdb-test/testMain.go b/libnetwork/cmd/networkdb-test/testMain.go index 0cd8c29942..76cc406af1 100644 --- a/libnetwork/cmd/networkdb-test/testMain.go +++ b/libnetwork/cmd/networkdb-test/testMain.go @@ -10,6 +10,10 @@ import ( ) func main() { + formatter := &logrus.TextFormatter{ + FullTimestamp: true, + } + logrus.SetFormatter(formatter) logrus.Infof("Starting the image with these args: %v", os.Args) if len(os.Args) < 1 { log.Fatal("You need at least 1 argument [client/server]") diff --git a/libnetwork/diagnostic/types.go b/libnetwork/diagnostic/types.go index 4eb4ca0d9f..e6b4831263 100644 --- a/libnetwork/diagnostic/types.go +++ b/libnetwork/diagnostic/types.go @@ -120,3 +120,13 @@ type TablePeersResult struct { TableObj Elements []PeerEntryObj `json:"entries"` } + +// NetworkStatsResult network db stats related to entries and queue len for a network +type NetworkStatsResult struct { + Entries int `json:"entries"` + QueueLen int `json:"qlen"` +} + +func (n *NetworkStatsResult) String() string { + return fmt.Sprintf("entries: %d, qlen: %d\n", n.Entries, n.QueueLen) +} diff --git a/libnetwork/networkdb/networkdbdiagnostic.go b/libnetwork/networkdb/networkdbdiagnostic.go index ffeb98d607..a0e9598799 100644 --- a/libnetwork/networkdb/networkdbdiagnostic.go +++ b/libnetwork/networkdb/networkdbdiagnostic.go @@ -28,6 +28,7 @@ var NetDbPaths2Func = map[string]diagnostic.HTTPHandlerFunc{ "/deleteentry": dbDeleteEntry, "/getentry": dbGetEntry, "/gettable": dbGetTable, + "/networkstats": dbNetworkStats, } func dbJoin(ctx interface{}, w http.ResponseWriter, r *http.Request) { @@ -411,3 +412,41 @@ func dbGetTable(ctx interface{}, w http.ResponseWriter, r *http.Request) { } diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json) } + +func dbNetworkStats(ctx interface{}, w http.ResponseWriter, r *http.Request) { + 
r.ParseForm() + diagnostic.DebugHTTPForm(r) + _, json := diagnostic.ParseHTTPFormOptions(r) + + // audit logs + log := logrus.WithFields(logrus.Fields{"component": "diagnostic", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()}) + log.Info("network stats") + + if len(r.Form["nid"]) < 1 { + rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?nid=test", r.URL.Path)) + log.Error("network stats failed, wrong input") + diagnostic.HTTPReply(w, rsp, json) + return + } + + nDB, ok := ctx.(*NetworkDB) + if ok { + nDB.RLock() + networks := nDB.networks[nDB.config.NodeID] + network, ok := networks[r.Form["nid"][0]] + + entries := -1 + qLen := -1 + if ok { + entries = network.entriesNumber + qLen = network.tableBroadcasts.NumQueued() + } + nDB.RUnlock() + + rsp := diagnostic.CommandSucceed(&diagnostic.NetworkStatsResult{Entries: entries, QueueLen: qLen}) + log.WithField("response", fmt.Sprintf("%+v", rsp)).Info("network stats done") + diagnostic.HTTPReply(w, rsp, json) + return + } + diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json) +}