Mirror of https://github.com/moby/moby.git (synced 2022-11-09 12:21:53 -05:00)

Merge pull request #2216 from fcrisciani/netdb-qlen-issue

NetworkDB qlen optimization

Commit: b0a0059237

8 changed files with 286 additions and 39 deletions
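The optimization has two parts, both visible in the hunks below: NetworkDB now records, per network, whether the first bulk sync has completed (the new inSync flag), and it stops re-queueing table events that arrive over TCP bulk sync once the per-network broadcast queue exceeds maxQueueLenBroadcastOnSync (500). The sketch below is a minimal standalone illustration of that gating idea only; the queue type is a stand-in for memberlist's broadcast queue and everything outside the threshold check is simplified, so it is not NetworkDB code.

package main

import "fmt"

// maxQueueLenBroadcastOnSync mirrors the constant added in this diff.
const maxQueueLenBroadcastOnSync = 500

// broadcastQueue is a stand-in for the per-network broadcast queue.
type broadcastQueue struct{ msgs [][]byte }

func (q *broadcastQueue) NumQueued() int          { return len(q.msgs) }
func (q *broadcastQueue) QueueBroadcast(m []byte) { q.msgs = append(q.msgs, m) }

// maybeRebroadcast re-queues an event for gossip unless it arrived via TCP
// bulk sync while the queue is already above the threshold.
func maybeRebroadcast(q *broadcastQueue, msg []byte, isBulkSync bool) bool {
	if isBulkSync && q.NumQueued() > maxQueueLenBroadcastOnSync {
		// The peer we bulk-synced with already has this state; dropping the
		// rebroadcast keeps the queue length (qlen) bounded.
		return false
	}
	q.QueueBroadcast(msg)
	return true
}

func main() {
	q := &broadcastQueue{}
	for i := 0; i < 600; i++ {
		maybeRebroadcast(q, []byte("event"), true)
	}
	fmt.Println("queued:", q.NumQueued()) // settles just above the threshold: 501
}

Running the sketch shows the queue settling at 501 entries: once the backlog passes the threshold, events received over bulk sync are no longer rebroadcast, which is what keeps qlen bounded on large clusters.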
@@ -2,6 +2,7 @@ package dbclient

 import (
 	"context"
+	"fmt"
 	"io/ioutil"
 	"log"
 	"net"
@@ -25,17 +26,10 @@ type resultTuple struct {
 }

 func httpGetFatalError(ip, port, path string) {
-	// for {
 	body, err := httpGet(ip, port, path)
 	if err != nil || !strings.Contains(string(body), "OK") {
-		// if strings.Contains(err.Error(), "EOF") {
-		// 	logrus.Warnf("Got EOF path:%s err:%s", path, err)
-		// 	continue
-		// }
 		log.Fatalf("[%s] error %s %s", path, err, body)
 	}
-	// break
-	// }
 }

 func httpGet(ip, port, path string) ([]byte, error) {
@@ -87,7 +81,7 @@ func clusterPeersNumber(ip, port string, doneCh chan resultTuple) {
 	body, err := httpGet(ip, port, "/clusterpeers")

 	if err != nil {
-		logrus.Errorf("clusterPeers %s there was an error: %s\n", ip, err)
+		logrus.Errorf("clusterPeers %s there was an error: %s", ip, err)
 		doneCh <- resultTuple{id: ip, result: -1}
 		return
 	}
@@ -101,7 +95,7 @@ func networkPeersNumber(ip, port, networkName string, doneCh chan resultTuple) {
 	body, err := httpGet(ip, port, "/networkpeers?nid="+networkName)

 	if err != nil {
-		logrus.Errorf("networkPeersNumber %s there was an error: %s\n", ip, err)
+		logrus.Errorf("networkPeersNumber %s there was an error: %s", ip, err)
 		doneCh <- resultTuple{id: ip, result: -1}
 		return
 	}
@@ -115,7 +109,7 @@ func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan r
 	body, err := httpGet(ip, port, "/gettable?nid="+networkName+"&tname="+tableName)

 	if err != nil {
-		logrus.Errorf("tableEntriesNumber %s there was an error: %s\n", ip, err)
+		logrus.Errorf("tableEntriesNumber %s there was an error: %s", ip, err)
 		doneCh <- resultTuple{id: ip, result: -1}
 		return
 	}
@@ -124,6 +118,32 @@ func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan r
 	doneCh <- resultTuple{id: ip, result: entriesNum}
 }

+func dbEntriesNumber(ip, port, networkName string, doneCh chan resultTuple) {
+	body, err := httpGet(ip, port, "/networkstats?nid="+networkName)
+
+	if err != nil {
+		logrus.Errorf("entriesNumber %s there was an error: %s", ip, err)
+		doneCh <- resultTuple{id: ip, result: -1}
+		return
+	}
+	elementsRegexp := regexp.MustCompile(`entries: ([0-9]+)`)
+	entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
+	doneCh <- resultTuple{id: ip, result: entriesNum}
+}
+
+func dbQueueLength(ip, port, networkName string, doneCh chan resultTuple) {
+	body, err := httpGet(ip, port, "/networkstats?nid="+networkName)
+
+	if err != nil {
+		logrus.Errorf("queueLength %s there was an error: %s", ip, err)
+		doneCh <- resultTuple{id: ip, result: -1}
+		return
+	}
+	elementsRegexp := regexp.MustCompile(`qlen: ([0-9]+)`)
+	entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
+	doneCh <- resultTuple{id: ip, result: entriesNum}
+}
+
 func clientWatchTable(ip, port, networkName, tableName string, doneCh chan resultTuple) {
 	httpGetFatalError(ip, port, "/watchtable?nid="+networkName+"&tname="+tableName)
 	if doneCh != nil {
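The two helpers added above (dbEntriesNumber and dbQueueLength) scrape the new /networkstats endpoint with regular expressions keyed on the "entries:" and "qlen:" fields. A minimal sketch of that parsing, assuming the response body is the string produced by NetworkStatsResult.String() later in this diff (for example "entries: 10, qlen: 3"):

package main

import (
	"fmt"
	"regexp"
	"strconv"
)

func main() {
	// Example body as formatted by NetworkStatsResult.String(): "entries: %d, qlen: %d\n"
	body := "entries: 10, qlen: 3\n"

	entriesRe := regexp.MustCompile(`entries: ([0-9]+)`)
	qlenRe := regexp.MustCompile(`qlen: ([0-9]+)`)

	entries, _ := strconv.Atoi(entriesRe.FindStringSubmatch(body)[1])
	qlen, _ := strconv.Atoi(qlenRe.FindStringSubmatch(body)[1])

	fmt.Println(entries, qlen) // 10 3
}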
@@ -135,7 +155,7 @@ func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh ch
 	body, err := httpGet(ip, port, "/watchedtableentries?nid="+networkName+"&tname="+tableName)

 	if err != nil {
-		logrus.Errorf("clientTableEntriesNumber %s there was an error: %s\n", ip, err)
+		logrus.Errorf("clientTableEntriesNumber %s there was an error: %s", ip, err)
 		doneCh <- resultTuple{id: ip, result: -1}
 		return
 	}
@@ -144,6 +164,26 @@ func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh ch
 	doneCh <- resultTuple{id: ip, result: entriesNum}
 }

+func writeKeysNumber(ip, port, networkName, tableName, key string, number int, doneCh chan resultTuple) {
+	x := 0
+	for ; x < number; x++ {
+		k := key + strconv.Itoa(x)
+		// write key
+		writeTableKey(ip, port, networkName, tableName, k)
+	}
+	doneCh <- resultTuple{id: ip, result: x}
+}
+
+func deleteKeysNumber(ip, port, networkName, tableName, key string, number int, doneCh chan resultTuple) {
+	x := 0
+	for ; x < number; x++ {
+		k := key + strconv.Itoa(x)
+		// delete key
+		deleteTableKey(ip, port, networkName, tableName, k)
+	}
+	doneCh <- resultTuple{id: ip, result: x}
+}
+
 func writeUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
 	for x := 0; ; x++ {
 		select {
@@ -215,17 +255,18 @@ func ready(ip, port string, doneCh chan resultTuple) {
 	doneCh <- resultTuple{id: ip, result: 0}
 }

-func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) {
+func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) (opTime time.Duration) {
 	startTime := time.Now().UnixNano()
 	var successTime int64

-	// Loop for 2 minutes to guartee that the result is stable
+	// Loop for 2 minutes to guarantee that the result is stable
 	for {
 		select {
 		case <-ctx.Done():
 			// Validate test success, if the time is set means that all the tables are empty
 			if successTime != 0 {
-				logrus.Infof("Check table passed, the cluster converged in %d msec", time.Duration(successTime-startTime)/time.Millisecond)
+				opTime = time.Duration(successTime-startTime) / time.Millisecond
+				logrus.Infof("Check table passed, the cluster converged in %d msec", opTime)
 				return
 			}
 			log.Fatal("Test failed, there is still entries in the tables of the nodes")
@@ -403,6 +444,107 @@ func doNetworkPeers(ips []string, args []string) {
 	close(doneCh)
 }

+// network-stats-queue networkName <gt/lt> queueSize
+func doNetworkStatsQueue(ips []string, args []string) {
+	doneCh := make(chan resultTuple, len(ips))
+	networkName := args[0]
+	comparison := args[1]
+	size, _ := strconv.Atoi(args[2])
+
+	// check all the nodes
+	for _, ip := range ips {
+		go dbQueueLength(ip, servicePort, networkName, doneCh)
+	}
+
+	var avgQueueSize int
+	// wait for the readiness of all nodes
+	for i := len(ips); i > 0; i-- {
+		node := <-doneCh
+		switch comparison {
+		case "lt":
+			if node.result > size {
+				log.Fatalf("Expected queue size from %s to be %d < %d", node.id, node.result, size)
+			}
+		case "gt":
+			if node.result < size {
+				log.Fatalf("Expected queue size from %s to be %d > %d", node.id, node.result, size)
+			}
+		default:
+			log.Fatal("unknown comparison operator")
+		}
+		avgQueueSize += node.result
+	}
+	close(doneCh)
+	avgQueueSize /= len(ips)
+	fmt.Fprintf(os.Stderr, "doNetworkStatsQueue succeeded with avg queue:%d", avgQueueSize)
+}
+
+// write-keys networkName tableName parallelWriters numberOfKeysEach
+func doWriteKeys(ips []string, args []string) {
+	networkName := args[0]
+	tableName := args[1]
+	parallelWriters, _ := strconv.Atoi(args[2])
+	numberOfKeys, _ := strconv.Atoi(args[3])
+
+	doneCh := make(chan resultTuple, parallelWriters)
+	// Enable watch of tables from clients
+	for i := 0; i < parallelWriters; i++ {
+		go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
+	}
+	waitWriters(parallelWriters, false, doneCh)
+
+	// Start parallel writers that will create and delete unique keys
+	defer close(doneCh)
+	for i := 0; i < parallelWriters; i++ {
+		key := "key-" + strconv.Itoa(i) + "-"
+		logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
+		go writeKeysNumber(ips[i], servicePort, networkName, tableName, key, numberOfKeys, doneCh)
+	}
+
+	// Sync with all the writers
+	keyMap := waitWriters(parallelWriters, true, doneCh)
+	logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
+
+	// check table entries for 2 minutes
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
+	opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
+	cancel()
+	fmt.Fprintf(os.Stderr, "doWriteKeys succeeded in %d msec", opTime)
+}
+
+// delete-keys networkName tableName parallelWriters numberOfKeysEach
+func doDeleteKeys(ips []string, args []string) {
+	networkName := args[0]
+	tableName := args[1]
+	parallelWriters, _ := strconv.Atoi(args[2])
+	numberOfKeys, _ := strconv.Atoi(args[3])
+
+	doneCh := make(chan resultTuple, parallelWriters)
+	// Enable watch of tables from clients
+	for i := 0; i < parallelWriters; i++ {
+		go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
+	}
+	waitWriters(parallelWriters, false, doneCh)
+
+	// Start parallel writers that will create and delete unique keys
+	defer close(doneCh)
+	for i := 0; i < parallelWriters; i++ {
+		key := "key-" + strconv.Itoa(i) + "-"
+		logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
+		go deleteKeysNumber(ips[i], servicePort, networkName, tableName, key, numberOfKeys, doneCh)
+	}
+
+	// Sync with all the writers
+	keyMap := waitWriters(parallelWriters, true, doneCh)
+	logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
+
+	// check table entries for 2 minutes
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
+	opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+	cancel()
+	fmt.Fprintf(os.Stderr, "doDeletekeys succeeded in %d msec", opTime)
+}
+
 // write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
 func doWriteDeleteUniqueKeys(ips []string, args []string) {
 	networkName := args[0]
@@ -432,11 +574,12 @@ func doWriteDeleteUniqueKeys(ips []string, args []string) {

 	// check table entries for 2 minutes
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
-	checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+	opDBTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
 	cancel()
 	ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
-	checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber)
+	opClientTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber)
 	cancel()
+	fmt.Fprintf(os.Stderr, "doWriteDeleteUniqueKeys succeeded in %d msec and client %d msec", opDBTime, opClientTime)
 }

 // write-unique-keys networkName tableName numParallelWriters writeTimeSec
@@ -469,8 +612,9 @@ func doWriteUniqueKeys(ips []string, args []string) {

 	// check table entries for 2 minutes
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
-	checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
+	opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
 	cancel()
+	fmt.Fprintf(os.Stderr, "doWriteUniqueKeys succeeded in %d msec", opTime)
 }

 // write-delete-leave-join networkName tableName numParallelWriters writeTimeSec
@@ -497,8 +641,9 @@ func doWriteDeleteLeaveJoin(ips []string, args []string) {

 	// check table entries for 2 minutes
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
-	checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+	opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
 	cancel()
+	fmt.Fprintf(os.Stderr, "doWriteDeleteLeaveJoin succeeded in %d msec", opTime)
 }

 // write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec
@@ -542,8 +687,9 @@ func doWriteDeleteWaitLeaveJoin(ips []string, args []string) {

 	// check table entries for 2 minutes
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
-	checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+	opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
 	cancel()
+	fmt.Fprintf(os.Stderr, "doWriteDeleteWaitLeaveJoin succeeded in %d msec", opTime)
 }

 // write-wait-leave networkName tableName numParallelWriters writeTimeSec
@@ -577,8 +723,9 @@ func doWriteWaitLeave(ips []string, args []string) {

 	// check table entries for 2 minutes
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
-	checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
+	opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
 	cancel()
+	fmt.Fprintf(os.Stderr, "doWriteLeaveJoin succeeded in %d msec", opTime)
 }

 // write-wait-leave-join networkName tableName numParallelWriters writeTimeSec numParallelLeaver
@@ -626,8 +773,9 @@ func doWriteWaitLeaveJoin(ips []string, args []string) {

 	// check table entries for 2 minutes
 	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
-	checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber)
+	opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber)
 	cancel()
+	fmt.Fprintf(os.Stderr, "doWriteWaitLeaveJoin succeeded in %d msec", opTime)
 }

 var cmdArgChec = map[string]int{
@@ -687,9 +835,21 @@ func Client(args []string) {
 		// leave-network networkName
 		doLeaveNetwork(ips, commandArgs)
 	case "network-peers":
-		// network-peers networkName maxRetry
+		// network-peers networkName expectedNumberPeers maxRetry
 		doNetworkPeers(ips, commandArgs)
+	// case "network-stats-entries":
+	// 	// network-stats-entries networkName maxRetry
+	// 	doNetworkPeers(ips, commandArgs)
+	case "network-stats-queue":
+		// network-stats-queue networkName <lt/gt> queueSize
+		doNetworkStatsQueue(ips, commandArgs)
+
+	case "write-keys":
+		// write-keys networkName tableName parallelWriters numberOfKeysEach
+		doWriteKeys(ips, commandArgs)
+	case "delete-keys":
+		// delete-keys networkName tableName parallelWriters numberOfKeysEach
+		doDeleteKeys(ips, commandArgs)
 	case "write-unique-keys":
 		// write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
 		doWriteUniqueKeys(ips, commandArgs)
@@ -10,6 +10,10 @@ import (
 )

 func main() {
+	formatter := &logrus.TextFormatter{
+		FullTimestamp: true,
+	}
+	logrus.SetFormatter(formatter)
 	logrus.Infof("Starting the image with these args: %v", os.Args)
 	if len(os.Args) < 1 {
 		log.Fatal("You need at least 1 argument [client/server]")
@@ -120,3 +120,13 @@ type TablePeersResult struct {
 	TableObj
 	Elements []PeerEntryObj `json:"entries"`
 }
+
+// NetworkStatsResult network db stats related to entries and queue len for a network
+type NetworkStatsResult struct {
+	Entries  int `json:"entries"`
+	QueueLen int `json:"qlen"`
+}
+
+func (n *NetworkStatsResult) String() string {
+	return fmt.Sprintf("entries: %d, qlen: %d\n", n.Entries, n.QueueLen)
+}
@@ -110,7 +110,6 @@ type tableEventMessage struct {
 	tname string
 	key   string
 	msg   []byte
-	node  string
 }

 func (m *tableEventMessage) Invalidates(other memberlist.Broadcast) bool {
@@ -168,7 +167,6 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
 		id:    nid,
 		tname: tname,
 		key:   key,
-		node:  nDB.config.NodeID,
 	})
 	return nil
 }
@@ -24,6 +24,9 @@ const (
 	retryInterval    = 1 * time.Second
 	nodeReapInterval = 24 * time.Hour
 	nodeReapPeriod   = 2 * time.Hour
+	// considering a cluster with > 20 nodes and a drain speed of 100 msg/s
+	// the following is roughly 1 minute
+	maxQueueLenBroadcastOnSync = 500
 )

 type logWriter struct{}
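The "roughly 1 minute" estimate in the new comment is easier to see once retransmissions are counted. The back-of-envelope below is my own reconstruction, not part of the diff: it assumes the queue behaves like memberlist's TransmitLimitedQueue with its default RetransmitMult of 4, so each queued event is sent about 4*ceil(log10(n+1)) times before it is dropped from the queue.

package main

import (
	"fmt"
	"math"
)

func main() {
	const (
		queueLen   = 500.0 // maxQueueLenBroadcastOnSync
		drainSpeed = 100.0 // msg/s, from the comment above
		nodes      = 21.0  // "> 20 nodes"
		retransmit = 4.0   // assumed memberlist RetransmitMult default
	)

	// Assumed retransmission behaviour: each broadcast is sent retransmit*ceil(log10(n+1)) times.
	perMsg := retransmit * math.Ceil(math.Log10(nodes+1))
	drainSeconds := queueLen * perMsg / drainSpeed

	fmt.Printf("about %.0f transmissions per message, about %.0f s to drain the queue\n", perMsg, drainSeconds)
	// Prints about 8 transmissions per message and about 40 s, in the ballpark
	// of the "roughly 1 minute" figure in the comment.
}

Under those assumptions a full queue of 500 events takes on the order of 40-60 seconds to drain, which is the window the new threshold is meant to cap.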
@@ -572,6 +575,7 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {

 	var err error
 	var networks []string
+	var success bool
 	for _, node := range nodes {
 		if node == nDB.config.NodeID {
 			continue
@@ -579,21 +583,25 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {
 		logrus.Debugf("%v(%v): Initiating bulk sync with node %v", nDB.config.Hostname, nDB.config.NodeID, node)
 		networks = nDB.findCommonNetworks(node)
 		err = nDB.bulkSyncNode(networks, node, true)
-		// if its periodic bulksync stop after the first successful sync
-		if !all && err == nil {
-			break
-		}
 		if err != nil {
 			err = fmt.Errorf("bulk sync to node %s failed: %v", node, err)
 			logrus.Warn(err.Error())
+		} else {
+			// bulk sync succeeded
+			success = true
+			// if its periodic bulksync stop after the first successful sync
+			if !all {
+				break
+			}
 		}
 	}

-	if err != nil {
-		return nil, err
+	if success {
+		// if at least one node sync succeeded
+		return networks, nil
 	}

-	return networks, nil
+	return nil, err
 }

 // Bulk sync all the table entries belonging to a set of networks to a
@@ -142,7 +142,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
 	return true
 }

-func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
+func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool {
 	// Update our local clock if the received messages has newer time.
 	nDB.tableClock.Witness(tEvent.LTime)

@@ -175,6 +175,14 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
 			nDB.Unlock()
 			return false
 		}
+	} else if tEvent.Type == TableEventTypeDelete && !isBulkSync {
+		nDB.Unlock()
+		// We don't know the entry, the entry is being deleted and the message is an async message
+		// In this case the safest approach is to ignore it, it is possible that the queue grew so much to
+		// exceed the garbage collection time (the residual reap time that is in the message is not being
+		// updated, to avoid inserting too many messages in the queue).
+		// Instead the messages coming from TCP bulk sync are safe with the latest value for the garbage collection time
+		return false
 	}

 	e = &entry{
@@ -197,11 +205,17 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
 	nDB.Unlock()

 	if err != nil && tEvent.Type == TableEventTypeDelete {
-		// If it is a delete event and we did not have a state for it, don't propagate to the application
+		// Again we don't know the entry but this is coming from a TCP sync so the message body is up to date.
+		// We had saved the state so to speed up convergence and be able to avoid accepting create events.
+		// Now we will rebroadcast the message if 2 conditions are met:
+		// 1) we had already synced this network (during the network join)
+		// 2) the residual reapTime is higher than 1/6 of the total reapTime.
 		// If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around
-		// most likely the cluster is already aware of it, if not who will sync with this node will catch the state too.
-		// This also avoids that deletion of entries close to their garbage collection ends up circuling around forever
-		return e.reapTime > nDB.config.reapEntryInterval/6
+		// most likely the cluster is already aware of it
+		// This also reduce the possibility that deletion of entries close to their garbage collection ends up circuling around
+		// forever
+		//logrus.Infof("exiting on delete not knowing the obj with rebroadcast:%t", network.inSync)
+		return network.inSync && e.reapTime > nDB.config.reapEntryInterval/6
 	}

 	var op opType
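The comment block above reduces to a single predicate: rebroadcast a delete for an unknown entry only if the network has finished its first bulk sync and the entry still has more than 1/6 of its garbage-collection window left. A standalone restatement of that decision (the 30-minute reapEntryInterval below is an illustrative value, not taken from this diff):

package main

import (
	"fmt"
	"time"
)

// shouldRebroadcastDelete mirrors the decision described above: only rebroadcast
// a delete for an unknown entry if the network finished its first bulk sync and
// the entry still has more than 1/6 of its garbage-collection time left.
func shouldRebroadcastDelete(inSync bool, residualReapTime, reapEntryInterval time.Duration) bool {
	return inSync && residualReapTime > reapEntryInterval/6
}

func main() {
	reapEntryInterval := 30 * time.Minute // illustrative value only

	fmt.Println(shouldRebroadcastDelete(true, 20*time.Minute, reapEntryInterval))  // true
	fmt.Println(shouldRebroadcastDelete(true, 4*time.Minute, reapEntryInterval))   // false: below the 1/6 cut-off (5 min)
	fmt.Println(shouldRebroadcastDelete(false, 20*time.Minute, reapEntryInterval)) // false: network not synced yet
}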
@@ -215,7 +229,7 @@
 	}

 	nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
-	return true
+	return network.inSync
 }

 func (nDB *NetworkDB) handleCompound(buf []byte, isBulkSync bool) {
@@ -244,7 +258,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
 		return
 	}

-	if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast {
+	if rebroadcast := nDB.handleTableEvent(&tEvent, isBulkSync); rebroadcast {
 		var err error
 		buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
 		if err != nil {
@@ -261,12 +275,16 @@
 			return
 		}

+		// if the queue is over the threshold, avoid distributing information coming from TCP sync
+		if isBulkSync && n.tableBroadcasts.NumQueued() > maxQueueLenBroadcastOnSync {
+			return
+		}
+
 		n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
 			msg:   buf,
 			id:    tEvent.NetworkID,
 			tname: tEvent.TableName,
 			key:   tEvent.Key,
-			node:  tEvent.NodeName,
 		})
 	}
 }
@@ -130,6 +130,9 @@ type network struct {
 	// Lamport time for the latest state of the entry.
 	ltime serf.LamportTime

+	// Gets set to true after the first bulk sync happens
+	inSync bool
+
 	// Node leave is in progress.
 	leaving bool

@@ -616,6 +619,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
 	}
 	nDB.addNetworkNode(nid, nDB.config.NodeID)
 	networkNodes := nDB.networkNodes[nid]
+	n = nodeNetworks[nid]
 	nDB.Unlock()

 	if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil {
@@ -627,6 +631,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
 		logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err)
 	}

+	// Mark the network as being synced
+	// note this is a best effort, we are not checking the result of the bulk sync
+	nDB.Lock()
+	n.inSync = true
+	nDB.Unlock()
+
 	return nil
 }

@@ -28,6 +28,7 @@ var NetDbPaths2Func = map[string]diagnostic.HTTPHandlerFunc{
 	"/deleteentry":  dbDeleteEntry,
 	"/getentry":     dbGetEntry,
 	"/gettable":     dbGetTable,
+	"/networkstats": dbNetworkStats,
 }

 func dbJoin(ctx interface{}, w http.ResponseWriter, r *http.Request) {
@@ -411,3 +412,41 @@ func dbGetTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
 	}
 	diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
 }
+
+func dbNetworkStats(ctx interface{}, w http.ResponseWriter, r *http.Request) {
+	r.ParseForm()
+	diagnostic.DebugHTTPForm(r)
+	_, json := diagnostic.ParseHTTPFormOptions(r)
+
+	// audit logs
+	log := logrus.WithFields(logrus.Fields{"component": "diagnostic", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
+	log.Info("network stats")
+
+	if len(r.Form["nid"]) < 1 {
+		rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?nid=test", r.URL.Path))
+		log.Error("network stats failed, wrong input")
+		diagnostic.HTTPReply(w, rsp, json)
+		return
+	}
+
+	nDB, ok := ctx.(*NetworkDB)
+	if ok {
+		nDB.RLock()
+		networks := nDB.networks[nDB.config.NodeID]
+		network, ok := networks[r.Form["nid"][0]]
+
+		entries := -1
+		qLen := -1
+		if ok {
+			entries = network.entriesNumber
+			qLen = network.tableBroadcasts.NumQueued()
+		}
+		nDB.RUnlock()
+
+		rsp := diagnostic.CommandSucceed(&diagnostic.NetworkStatsResult{Entries: entries, QueueLen: qLen})
+		log.WithField("response", fmt.Sprintf("%+v", rsp)).Info("network stats done")
+		diagnostic.HTTPReply(w, rsp, json)
+		return
+	}
+	diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
+}
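For manual checks outside the test client, the new route can be queried like any other diagnostic endpoint. A hypothetical client sketch follows; the 127.0.0.1:2000 address and the "test" network ID are placeholders for wherever your diagnostic server listens, and are not part of this diff.

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// Placeholder diagnostic-server address and network ID.
	url := "http://127.0.0.1:2000/networkstats?nid=test"

	resp, err := http.Get(url)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()

	body, _ := ioutil.ReadAll(resp.Body)
	// The reply wraps NetworkStatsResult, e.g. "entries: 10, qlen: 3".
	fmt.Println(string(body))
}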