diff --git a/libnetwork/cmd/networkdb-test/dbclient/ndbClient.go b/libnetwork/cmd/networkdb-test/dbclient/ndbClient.go
index 609b581b23..f2f669f7b3 100644
--- a/libnetwork/cmd/networkdb-test/dbclient/ndbClient.go
+++ b/libnetwork/cmd/networkdb-test/dbclient/ndbClient.go
@@ -2,6 +2,7 @@ package dbclient
 
 import (
 	"context"
+	"fmt"
 	"io/ioutil"
 	"log"
 	"net"
@@ -25,17 +26,10 @@ type resultTuple struct {
 }
 
 func httpGetFatalError(ip, port, path string) {
-	// for {
 	body, err := httpGet(ip, port, path)
 	if err != nil || !strings.Contains(string(body), "OK") {
-		// if strings.Contains(err.Error(), "EOF") {
-		// 	logrus.Warnf("Got EOF path:%s err:%s", path, err)
-		// 	continue
-		// }
 		log.Fatalf("[%s] error %s %s", path, err, body)
 	}
-	// break
-	// }
 }
 
 func httpGet(ip, port, path string) ([]byte, error) {
@@ -87,7 +81,7 @@ func clusterPeersNumber(ip, port string, doneCh chan resultTuple) {
 	body, err := httpGet(ip, port, "/clusterpeers")
 
 	if err != nil {
-		logrus.Errorf("clusterPeers %s there was an error: %s\n", ip, err)
+		logrus.Errorf("clusterPeers %s there was an error: %s", ip, err)
 		doneCh <- resultTuple{id: ip, result: -1}
 		return
 	}
@@ -101,7 +95,7 @@ func networkPeersNumber(ip, port, networkName string, doneCh chan resultTuple) {
 	body, err := httpGet(ip, port, "/networkpeers?nid="+networkName)
 
 	if err != nil {
-		logrus.Errorf("networkPeersNumber %s there was an error: %s\n", ip, err)
+		logrus.Errorf("networkPeersNumber %s there was an error: %s", ip, err)
 		doneCh <- resultTuple{id: ip, result: -1}
 		return
 	}
@@ -115,7 +109,7 @@ func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan r
 	body, err := httpGet(ip, port, "/gettable?nid="+networkName+"&tname="+tableName)
 
 	if err != nil {
-		logrus.Errorf("tableEntriesNumber %s there was an error: %s\n", ip, err)
+		logrus.Errorf("tableEntriesNumber %s there was an error: %s", ip, err)
 		doneCh <- resultTuple{id: ip, result: -1}
 		return
 	}
@@ -124,6 +118,32 @@ func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan r
 	doneCh <- resultTuple{id: ip, result: entriesNum}
 }
 
+func dbEntriesNumber(ip, port, networkName string, doneCh chan resultTuple) {
+	body, err := httpGet(ip, port, "/networkstats?nid="+networkName)
+
+	if err != nil {
+		logrus.Errorf("entriesNumber %s there was an error: %s", ip, err)
+		doneCh <- resultTuple{id: ip, result: -1}
+		return
+	}
+	elementsRegexp := regexp.MustCompile(`entries: ([0-9]+)`)
+	entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
+	doneCh <- resultTuple{id: ip, result: entriesNum}
+}
+
+func dbQueueLength(ip, port, networkName string, doneCh chan resultTuple) {
+	body, err := httpGet(ip, port, "/networkstats?nid="+networkName)
+
+	if err != nil {
+		logrus.Errorf("queueLength %s there was an error: %s", ip, err)
+		doneCh <- resultTuple{id: ip, result: -1}
+		return
+	}
+	elementsRegexp := regexp.MustCompile(`qlen: ([0-9]+)`)
+	entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
+	doneCh <- resultTuple{id: ip, result: entriesNum}
+}
+
 func clientWatchTable(ip, port, networkName, tableName string, doneCh chan resultTuple) {
 	httpGetFatalError(ip, port, "/watchtable?nid="+networkName+"&tname="+tableName)
 	if doneCh != nil {
@@ -135,7 +155,7 @@ func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh ch
 	body, err := httpGet(ip, port, "/watchedtableentries?nid="+networkName+"&tname="+tableName)
 
 	if err != nil {
-		logrus.Errorf("clientTableEntriesNumber %s there was an error: %s\n", ip, err)
+		logrus.Errorf("clientTableEntriesNumber %s there was an error: %s", ip, err)
logrus.Errorf("clientTableEntriesNumber %s there was an error: %s", ip, err) doneCh <- resultTuple{id: ip, result: -1} return } @@ -144,6 +164,26 @@ func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh ch doneCh <- resultTuple{id: ip, result: entriesNum} } +func writeKeysNumber(ip, port, networkName, tableName, key string, number int, doneCh chan resultTuple) { + x := 0 + for ; x < number; x++ { + k := key + strconv.Itoa(x) + // write key + writeTableKey(ip, port, networkName, tableName, k) + } + doneCh <- resultTuple{id: ip, result: x} +} + +func deleteKeysNumber(ip, port, networkName, tableName, key string, number int, doneCh chan resultTuple) { + x := 0 + for ; x < number; x++ { + k := key + strconv.Itoa(x) + // write key + deleteTableKey(ip, port, networkName, tableName, k) + } + doneCh <- resultTuple{id: ip, result: x} +} + func writeUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) { for x := 0; ; x++ { select { @@ -215,17 +255,18 @@ func ready(ip, port string, doneCh chan resultTuple) { doneCh <- resultTuple{id: ip, result: 0} } -func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) { +func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) (opTime time.Duration) { startTime := time.Now().UnixNano() var successTime int64 - // Loop for 2 minutes to guartee that the result is stable + // Loop for 2 minutes to guarantee that the result is stable for { select { case <-ctx.Done(): // Validate test success, if the time is set means that all the tables are empty if successTime != 0 { - logrus.Infof("Check table passed, the cluster converged in %d msec", time.Duration(successTime-startTime)/time.Millisecond) + opTime = time.Duration(successTime-startTime) / time.Millisecond + logrus.Infof("Check table passed, the cluster converged in %d msec", opTime) return } log.Fatal("Test failed, there is still entries in the tables of the nodes") @@ -403,6 +444,107 @@ func doNetworkPeers(ips []string, args []string) { close(doneCh) } +// network-stats-queue networkName queueSize +func doNetworkStatsQueue(ips []string, args []string) { + doneCh := make(chan resultTuple, len(ips)) + networkName := args[0] + comparison := args[1] + size, _ := strconv.Atoi(args[2]) + + // check all the nodes + for _, ip := range ips { + go dbQueueLength(ip, servicePort, networkName, doneCh) + } + + var avgQueueSize int + // wait for the readiness of all nodes + for i := len(ips); i > 0; i-- { + node := <-doneCh + switch comparison { + case "lt": + if node.result > size { + log.Fatalf("Expected queue size from %s to be %d < %d", node.id, node.result, size) + } + case "gt": + if node.result < size { + log.Fatalf("Expected queue size from %s to be %d > %d", node.id, node.result, size) + } + default: + log.Fatal("unknown comparison operator") + } + avgQueueSize += node.result + } + close(doneCh) + avgQueueSize /= len(ips) + fmt.Fprintf(os.Stderr, "doNetworkStatsQueue succeeded with avg queue:%d", avgQueueSize) +} + +// write-keys networkName tableName parallelWriters numberOfKeysEach +func doWriteKeys(ips []string, args []string) { + networkName := args[0] + tableName := args[1] + parallelWriters, _ := strconv.Atoi(args[2]) + numberOfKeys, _ := strconv.Atoi(args[3]) + + doneCh := make(chan resultTuple, parallelWriters) + // 
+	for i := 0; i < parallelWriters; i++ {
+		go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
+	}
+	waitWriters(parallelWriters, false, doneCh)
+
+	// Start parallel writers that will create keys
+	defer close(doneCh)
+	for i := 0; i < parallelWriters; i++ {
+		key := "key-" + strconv.Itoa(i) + "-"
+		logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
+		go writeKeysNumber(ips[i], servicePort, networkName, tableName, key, numberOfKeys, doneCh)
+	}
+
+	// Sync with all the writers
+	keyMap := waitWriters(parallelWriters, true, doneCh)
+	logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
+
+	// check table entries for 3 minutes
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
+	opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
+	cancel()
+	fmt.Fprintf(os.Stderr, "doWriteKeys succeeded in %d msec", opTime)
+}
+
+// delete-keys networkName tableName parallelWriters numberOfKeysEach
+func doDeleteKeys(ips []string, args []string) {
+	networkName := args[0]
+	tableName := args[1]
+	parallelWriters, _ := strconv.Atoi(args[2])
+	numberOfKeys, _ := strconv.Atoi(args[3])
+
+	doneCh := make(chan resultTuple, parallelWriters)
+	// Enable watch of tables from clients
+	for i := 0; i < parallelWriters; i++ {
+		go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
+	}
+	waitWriters(parallelWriters, false, doneCh)
+
+	// Start parallel writers that will delete keys
+	defer close(doneCh)
+	for i := 0; i < parallelWriters; i++ {
+		key := "key-" + strconv.Itoa(i) + "-"
+		logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
+		go deleteKeysNumber(ips[i], servicePort, networkName, tableName, key, numberOfKeys, doneCh)
+	}
+
+	// Sync with all the writers
+	keyMap := waitWriters(parallelWriters, true, doneCh)
+	logrus.Infof("Deleted a total of %d keys on the cluster", keyMap[totalWrittenKeys])
+
+	// check table entries for 3 minutes
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
+	opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+	cancel()
+	fmt.Fprintf(os.Stderr, "doDeleteKeys succeeded in %d msec", opTime)
+}
+
 // write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
 func doWriteDeleteUniqueKeys(ips []string, args []string) {
 	networkName := args[0]
@@ -432,11 +574,12 @@ func doWriteDeleteUniqueKeys(ips []string, args []string) {
 
 	// check table entries for 2 minutes
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
-	checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+	opDBTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
 	cancel()
 	ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
-	checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber)
+	opClientTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber)
 	cancel()
+	fmt.Fprintf(os.Stderr, "doWriteDeleteUniqueKeys succeeded in %d msec and client %d msec", opDBTime, opClientTime)
 }
 
 // write-unique-keys networkName tableName numParallelWriters writeTimeSec
@@ -469,8 +612,9 @@ func doWriteUniqueKeys(ips []string, args []string) {
 
 	// check table entries for 2 minutes
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
-	checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
+	opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
 	cancel()
+	fmt.Fprintf(os.Stderr, "doWriteUniqueKeys succeeded in %d msec", opTime)
 }
 
 // write-delete-leave-join networkName tableName numParallelWriters writeTimeSec
@@ -497,8 +641,9 @@ func doWriteDeleteLeaveJoin(ips []string, args []string) {
 
 	// check table entries for 2 minutes
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
-	checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+	opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
 	cancel()
+	fmt.Fprintf(os.Stderr, "doWriteDeleteLeaveJoin succeeded in %d msec", opTime)
 }
 
 // write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec
@@ -542,8 +687,9 @@ func doWriteDeleteWaitLeaveJoin(ips []string, args []string) {
 
 	// check table entries for 2 minutes
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
-	checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+	opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
 	cancel()
+	fmt.Fprintf(os.Stderr, "doWriteDeleteWaitLeaveJoin succeeded in %d msec", opTime)
 }
 
 // write-wait-leave networkName tableName numParallelWriters writeTimeSec
@@ -577,8 +723,9 @@ func doWriteWaitLeave(ips []string, args []string) {
 
 	// check table entries for 2 minutes
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
-	checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+	opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
 	cancel()
+	fmt.Fprintf(os.Stderr, "doWriteWaitLeave succeeded in %d msec", opTime)
 }
 
 // write-wait-leave-join networkName tableName numParallelWriters writeTimeSec numParallelLeaver
@@ -626,8 +773,9 @@ func doWriteWaitLeaveJoin(ips []string, args []string) {
 
 	// check table entries for 2 minutes
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
-	checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber)
+	opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber)
 	cancel()
+	fmt.Fprintf(os.Stderr, "doWriteWaitLeaveJoin succeeded in %d msec", opTime)
 }
 
 var cmdArgChec = map[string]int{
@@ -687,9 +835,21 @@ func Client(args []string) {
 		// leave-network networkName
 		doLeaveNetwork(ips, commandArgs)
 	case "network-peers":
-		// network-peers networkName maxRetry
+		// network-peers networkName expectedNumberPeers maxRetry
 		doNetworkPeers(ips, commandArgs)
+	// case "network-stats-entries":
+	// 	// network-stats-entries networkName maxRetry
+	// 	doNetworkPeers(ips, commandArgs)
+	case "network-stats-queue":
+		// network-stats-queue networkName <lt/gt> queueSize
+		doNetworkStatsQueue(ips, commandArgs)
+	case "write-keys":
+		// write-keys networkName tableName parallelWriters numberOfKeysEach
+		doWriteKeys(ips, commandArgs)
+	case "delete-keys":
+		// delete-keys networkName tableName parallelWriters numberOfKeysEach
+		doDeleteKeys(ips, commandArgs)
 	case "write-unique-keys":
 		// write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
 		doWriteUniqueKeys(ips, commandArgs)
diff --git a/libnetwork/cmd/networkdb-test/testMain.go b/libnetwork/cmd/networkdb-test/testMain.go
index 0cd8c29942..76cc406af1 100644
--- a/libnetwork/cmd/networkdb-test/testMain.go
+++ b/libnetwork/cmd/networkdb-test/testMain.go
@@ -10,6 +10,10 @@ import (
 )
 
 func main() {
+	formatter := &logrus.TextFormatter{
+		FullTimestamp: true,
+	}
+	logrus.SetFormatter(formatter)
 	logrus.Infof("Starting the image with these args: %v", os.Args)
 	if len(os.Args) < 1 {
 		log.Fatal("You need at least 1 argument [client/server]")
diff --git a/libnetwork/diagnostic/types.go b/libnetwork/diagnostic/types.go
index 4eb4ca0d9f..e6b4831263 100644
--- a/libnetwork/diagnostic/types.go
+++ b/libnetwork/diagnostic/types.go
@@ -120,3 +120,13 @@ type TablePeersResult struct {
 	TableObj
 	Elements []PeerEntryObj `json:"entries"`
 }
+
+// NetworkStatsResult network db stats related to entries and queue len for a network
+type NetworkStatsResult struct {
+	Entries  int `json:"entries"`
+	QueueLen int `json:"qlen"`
+}
+
+func (n *NetworkStatsResult) String() string {
+	return fmt.Sprintf("entries: %d, qlen: %d\n", n.Entries, n.QueueLen)
+}
diff --git a/libnetwork/networkdb/broadcast.go b/libnetwork/networkdb/broadcast.go
index 174023b22b..efcfcc2426 100644
--- a/libnetwork/networkdb/broadcast.go
+++ b/libnetwork/networkdb/broadcast.go
@@ -110,7 +110,6 @@ type tableEventMessage struct {
 	tname string
 	key   string
 	msg   []byte
-	node  string
 }
 
 func (m *tableEventMessage) Invalidates(other memberlist.Broadcast) bool {
@@ -168,7 +167,6 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
 		id:    nid,
 		tname: tname,
 		key:   key,
-		node:  nDB.config.NodeID,
 	})
 	return nil
 }
diff --git a/libnetwork/networkdb/cluster.go b/libnetwork/networkdb/cluster.go
index 600f4ccf4c..0a64787df9 100644
--- a/libnetwork/networkdb/cluster.go
+++ b/libnetwork/networkdb/cluster.go
@@ -24,6 +24,9 @@ const (
 	retryInterval       = 1 * time.Second
 	nodeReapInterval    = 24 * time.Hour
 	nodeReapPeriod      = 2 * time.Hour
+	// considering a cluster with > 20 nodes and a drain speed of 100 msg/s
+	// the following is roughly 1 minute
+	maxQueueLenBroadcastOnSync = 500
 )
 
 type logWriter struct{}
@@ -572,6 +575,7 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {
 
 	var err error
 	var networks []string
+	var success bool
 	for _, node := range nodes {
 		if node == nDB.config.NodeID {
 			continue
@@ -579,21 +583,25 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {
 		logrus.Debugf("%v(%v): Initiating bulk sync with node %v", nDB.config.Hostname, nDB.config.NodeID, node)
 		networks = nDB.findCommonNetworks(node)
 		err = nDB.bulkSyncNode(networks, node, true)
-		// if its periodic bulksync stop after the first successful sync
-		if !all && err == nil {
-			break
-		}
 		if err != nil {
 			err = fmt.Errorf("bulk sync to node %s failed: %v", node, err)
 			logrus.Warn(err.Error())
+		} else {
+			// bulk sync succeeded
+			success = true
+			// if it's a periodic bulksync, stop after the first successful sync
+			if !all {
+				break
+			}
 		}
 	}
 
-	if err != nil {
-		return nil, err
+	if success {
+		// if at least one node sync succeeded
+		return networks, nil
 	}
 
-	return networks, nil
+	return nil, err
 }
 
 // Bulk sync all the table entries belonging to a set of networks to a
diff --git a/libnetwork/networkdb/delegate.go b/libnetwork/networkdb/delegate.go
index 6cd827ee26..14e19bbdd7 100644
--- a/libnetwork/networkdb/delegate.go
+++ b/libnetwork/networkdb/delegate.go
@@ -142,7 +142,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
 	return true
 }
 
-func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
+func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool {
 	// Update our local clock if the received messages has newer time.
 	nDB.tableClock.Witness(tEvent.LTime)
 
@@ -175,6 +175,14 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
 			nDB.Unlock()
 			return false
 		}
+	} else if tEvent.Type == TableEventTypeDelete && !isBulkSync {
+		nDB.Unlock()
+		// We don't know the entry, the entry is being deleted, and the message is an async message.
+		// In this case the safest approach is to ignore it; it is possible that the queue grew so much as to
+		// exceed the garbage collection time (the residual reap time that is in the message is not being
+		// updated, to avoid inserting too many messages in the queue).
+		// Instead, the messages coming from TCP bulk sync are safe with the latest value for the garbage collection time
+		return false
 	}
 
 	e = &entry{
@@ -197,11 +205,17 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
 	nDB.Unlock()
 
 	if err != nil && tEvent.Type == TableEventTypeDelete {
-		// If it is a delete event and we did not have a state for it, don't propagate to the application
+		// Again we don't know the entry, but this is coming from a TCP sync so the message body is up to date.
+		// We had saved the state to speed up convergence and be able to avoid accepting create events.
+		// Now we will rebroadcast the message if 2 conditions are met:
+		// 1) we had already synced this network (during the network join)
+		// 2) the residual reapTime is higher than 1/6 of the total reapTime.
 		// If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around
-		// most likely the cluster is already aware of it, if not who will sync with this node will catch the state too.
-		// This also avoids that deletion of entries close to their garbage collection ends up circuling around forever
-		return e.reapTime > nDB.config.reapEntryInterval/6
+		// most likely the cluster is already aware of it.
+		// This also reduces the possibility that deletion of entries close to their garbage collection ends up circulating
+		// around forever.
+		//logrus.Infof("exiting on delete not knowing the obj with rebroadcast:%t", network.inSync)
+		return network.inSync && e.reapTime > nDB.config.reapEntryInterval/6
 	}
 
 	var op opType
@@ -215,7 +229,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
 	}
 
 	nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
-	return true
+	return network.inSync
 }
 
 func (nDB *NetworkDB) handleCompound(buf []byte, isBulkSync bool) {
@@ -244,7 +258,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
 		return
 	}
 
-	if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast {
+	if rebroadcast := nDB.handleTableEvent(&tEvent, isBulkSync); rebroadcast {
 		var err error
 		buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
 		if err != nil {
@@ -261,12 +275,16 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
 			return
 		}
 
+		// if the queue is over the threshold, avoid distributing information coming from TCP sync
+		if isBulkSync && n.tableBroadcasts.NumQueued() > maxQueueLenBroadcastOnSync {
+			return
+		}
+
 		n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
 			msg:   buf,
 			id:    tEvent.NetworkID,
 			tname: tEvent.TableName,
 			key:   tEvent.Key,
-			node:  tEvent.NodeName,
 		})
 	}
 }
diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go
index ec07a6eb81..b79f346eea 100644
--- a/libnetwork/networkdb/networkdb.go
+++ b/libnetwork/networkdb/networkdb.go
@@ -130,6 +130,9 @@ type network struct {
 	// Lamport time for the latest state of the entry.
 	ltime serf.LamportTime
 
+	// Gets set to true after the first bulk sync happens
+	inSync bool
+
 	// Node leave is in progress.
 	leaving bool
 
@@ -616,6 +619,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
 	}
 	nDB.addNetworkNode(nid, nDB.config.NodeID)
 	networkNodes := nDB.networkNodes[nid]
+	n = nodeNetworks[nid]
 	nDB.Unlock()
 
 	if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil {
@@ -627,6 +631,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
 		logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err)
 	}
 
+	// Mark the network as being synced
+	// note this is a best effort, we are not checking the result of the bulk sync
+	nDB.Lock()
+	n.inSync = true
+	nDB.Unlock()
+
 	return nil
 }
 
diff --git a/libnetwork/networkdb/networkdbdiagnostic.go b/libnetwork/networkdb/networkdbdiagnostic.go
index ffeb98d607..a0e9598799 100644
--- a/libnetwork/networkdb/networkdbdiagnostic.go
+++ b/libnetwork/networkdb/networkdbdiagnostic.go
@@ -28,6 +28,7 @@ var NetDbPaths2Func = map[string]diagnostic.HTTPHandlerFunc{
 	"/deleteentry":  dbDeleteEntry,
 	"/getentry":     dbGetEntry,
 	"/gettable":     dbGetTable,
+	"/networkstats": dbNetworkStats,
 }
 
 func dbJoin(ctx interface{}, w http.ResponseWriter, r *http.Request) {
@@ -411,3 +412,41 @@ func dbGetTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
 	}
 	diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
 }
+
+func dbNetworkStats(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+	r.ParseForm()
+	diagnostic.DebugHTTPForm(r)
+	_, json := diagnostic.ParseHTTPFormOptions(r)
+
+	// audit logs
+	log := logrus.WithFields(logrus.Fields{"component": "diagnostic", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
+	log.Info("network stats")
+
+	if len(r.Form["nid"]) < 1 {
+		rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?nid=test", r.URL.Path))
+		log.Error("network stats failed, wrong input")
+		diagnostic.HTTPReply(w, rsp, json)
+		return
+	}
+
+	nDB, ok := ctx.(*NetworkDB)
+	if ok {
+		nDB.RLock()
+		networks := nDB.networks[nDB.config.NodeID]
+		network, ok := networks[r.Form["nid"][0]]
+
+		entries := -1
+		qLen := -1
+		if ok {
+			entries = network.entriesNumber
+			qLen = network.tableBroadcasts.NumQueued()
+		}
+		nDB.RUnlock()
+
+		rsp := diagnostic.CommandSucceed(&diagnostic.NetworkStatsResult{Entries: entries, QueueLen: qLen})
+		log.WithField("response", fmt.Sprintf("%+v", rsp)).Info("network stats done")
+		diagnostic.HTTPReply(w, rsp, json)
+		return
+	}
+	diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
+}
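For reference, the new `/networkstats` endpoint replies with the text produced by `NetworkStatsResult.String()` ("entries: N, qlen: M"), and the test client above scrapes that text with regular expressions rather than decoding JSON. Below is a minimal standalone sketch of such a consumer; the address, port, and network ID (`127.0.0.1:2000`, `test`) are placeholder assumptions, not values taken from the test harness, which uses its own `httpGet`/`servicePort` helpers in `dbEntriesNumber` and `dbQueueLength`.

```go
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"regexp"
	"strconv"
)

// statsURL is an assumed address of a node with the networkdb diagnostic
// server enabled; adjust host, port, and nid for a real cluster.
const statsURL = "http://127.0.0.1:2000/networkstats?nid=test"

var (
	entriesRe = regexp.MustCompile(`entries: ([0-9]+)`)
	qlenRe    = regexp.MustCompile(`qlen: ([0-9]+)`)
)

func main() {
	resp, err := http.Get(statsURL)
	if err != nil {
		log.Fatalf("networkstats request failed: %v", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatalf("reading networkstats reply failed: %v", err)
	}

	// The reply embeds "entries: N, qlen: M", so the same regexes used by
	// dbEntriesNumber and dbQueueLength in the test client work here.
	entriesMatch := entriesRe.FindStringSubmatch(string(body))
	qlenMatch := qlenRe.FindStringSubmatch(string(body))
	if entriesMatch == nil || qlenMatch == nil {
		log.Fatalf("unexpected networkstats reply: %s", body)
	}
	entries, _ := strconv.Atoi(entriesMatch[1])
	qlen, _ := strconv.Atoi(qlenMatch[1])
	fmt.Printf("network entries=%d queue length=%d\n", entries, qlen)
}
```

A test driver would gate on the same numbers through the new client commands, e.g. `network-stats-queue networkName <lt/gt> queueSize` as handled by `doNetworkStatsQueue` above.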