mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Merge pull request #1834 from fcrisciani/network-db-infra
NetworkDB testing infra
This commit is contained in:
commit
b1bfc4d919
11 changed files with 1364 additions and 0 deletions
1
libnetwork/.gitignore
vendored
1
libnetwork/.gitignore
vendored
|
@ -37,3 +37,4 @@ cmd/dnet/dnet
|
|||
.settings/
|
||||
|
||||
libnetworkbuild.created
|
||||
test/networkDb/testMain
|
||||
|
|
133
libnetwork/diagnose/diagnose.go
Normal file
133
libnetwork/diagnose/diagnose.go
Normal file
|
@ -0,0 +1,133 @@
|
|||
package diagnose
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
)
|
||||
|
||||
// HTTPHandlerFunc TODO
|
||||
type HTTPHandlerFunc func(interface{}, http.ResponseWriter, *http.Request)
|
||||
|
||||
type httpHandlerCustom struct {
|
||||
ctx interface{}
|
||||
F func(interface{}, http.ResponseWriter, *http.Request)
|
||||
}
|
||||
|
||||
// ServeHTTP TODO
|
||||
func (h httpHandlerCustom) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
h.F(h.ctx, w, r)
|
||||
}
|
||||
|
||||
var diagPaths2Func = map[string]HTTPHandlerFunc{
|
||||
"/": notImplemented,
|
||||
"/help": help,
|
||||
"/ready": ready,
|
||||
}
|
||||
|
||||
// Server when the debug is enabled exposes a
|
||||
// This data structure is protected by the Agent mutex so does not require and additional mutex here
|
||||
type Server struct {
|
||||
sk net.Listener
|
||||
port int
|
||||
mux *http.ServeMux
|
||||
registeredHanders []string
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
// Init TODO
|
||||
func (n *Server) Init() {
|
||||
n.mux = http.NewServeMux()
|
||||
|
||||
// Register local handlers
|
||||
n.RegisterHandler(n, diagPaths2Func)
|
||||
}
|
||||
|
||||
// RegisterHandler TODO
|
||||
func (n *Server) RegisterHandler(ctx interface{}, hdlrs map[string]HTTPHandlerFunc) {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
for path, fun := range hdlrs {
|
||||
n.mux.Handle(path, httpHandlerCustom{ctx, fun})
|
||||
n.registeredHanders = append(n.registeredHanders, path)
|
||||
}
|
||||
}
|
||||
|
||||
// EnableDebug opens a TCP socket to debug the passed network DB
|
||||
func (n *Server) EnableDebug(ip string, port int) {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
n.port = port
|
||||
logrus.SetLevel(logrus.DebugLevel)
|
||||
|
||||
if n.sk != nil {
|
||||
logrus.Infof("The server is already up and running")
|
||||
return
|
||||
}
|
||||
|
||||
logrus.Infof("Starting the server listening on %d for commands", port)
|
||||
|
||||
// // Create the socket
|
||||
// var err error
|
||||
// n.sk, err = net.Listen("tcp", listeningAddr)
|
||||
// if err != nil {
|
||||
// log.Fatal(err)
|
||||
// }
|
||||
//
|
||||
// go func() {
|
||||
// http.Serve(n.sk, n.mux)
|
||||
// }()
|
||||
http.ListenAndServe(":8000", n.mux)
|
||||
}
|
||||
|
||||
// DisableDebug stop the dubug and closes the tcp socket
|
||||
func (n *Server) DisableDebug() {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
n.sk.Close()
|
||||
n.sk = nil
|
||||
}
|
||||
|
||||
// IsDebugEnable returns true when the debug is enabled
|
||||
func (n *Server) IsDebugEnable() bool {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
return n.sk != nil
|
||||
}
|
||||
|
||||
func notImplemented(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
fmt.Fprintf(w, "URL path: %s no method implemented check /help\n", r.URL.Path)
|
||||
}
|
||||
|
||||
func help(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
n, ok := ctx.(*Server)
|
||||
if ok {
|
||||
for _, path := range n.registeredHanders {
|
||||
fmt.Fprintf(w, "%s\n", path)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func ready(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
fmt.Fprintf(w, "OK\n")
|
||||
}
|
||||
|
||||
// DebugHTTPForm TODO
|
||||
func DebugHTTPForm(r *http.Request) {
|
||||
r.ParseForm()
|
||||
for k, v := range r.Form {
|
||||
logrus.Debugf("Form[%q] = %q\n", k, v)
|
||||
}
|
||||
}
|
||||
|
||||
// HTTPReplyError TODO
|
||||
func HTTPReplyError(w http.ResponseWriter, message, usage string) {
|
||||
fmt.Fprintf(w, "%s\n", message)
|
||||
if usage != "" {
|
||||
fmt.Fprintf(w, "Usage: %s\n", usage)
|
||||
}
|
||||
}
|
|
@ -104,6 +104,9 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
|
|||
}
|
||||
|
||||
n = nDB.checkAndGetNode(nEvent)
|
||||
if n == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
nDB.purgeSameNode(n)
|
||||
n.ltime = nEvent.LTime
|
||||
|
|
|
@ -108,6 +108,11 @@ type PeerInfo struct {
|
|||
IP string
|
||||
}
|
||||
|
||||
// PeerClusterInfo represents the peer (gossip cluster) nodes
|
||||
type PeerClusterInfo struct {
|
||||
PeerInfo
|
||||
}
|
||||
|
||||
type node struct {
|
||||
memberlist.Node
|
||||
ltime serf.LamportTime
|
||||
|
@ -253,6 +258,20 @@ func (nDB *NetworkDB) Close() {
|
|||
}
|
||||
}
|
||||
|
||||
// ClusterPeers returns all the gossip cluster peers.
|
||||
func (nDB *NetworkDB) ClusterPeers() []PeerInfo {
|
||||
nDB.RLock()
|
||||
defer nDB.RUnlock()
|
||||
peers := make([]PeerInfo, 0, len(nDB.nodes))
|
||||
for _, node := range nDB.nodes {
|
||||
peers = append(peers, PeerInfo{
|
||||
Name: node.Name,
|
||||
IP: node.Node.Addr.String(),
|
||||
})
|
||||
}
|
||||
return peers
|
||||
}
|
||||
|
||||
// Peers returns the gossip peers for a given network.
|
||||
func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
|
||||
nDB.RLock()
|
||||
|
|
242
libnetwork/networkdb/networkdbdiagnose.go
Normal file
242
libnetwork/networkdb/networkdbdiagnose.go
Normal file
|
@ -0,0 +1,242 @@
|
|||
package networkdb
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/docker/libnetwork/diagnose"
|
||||
)
|
||||
|
||||
const (
|
||||
missingParameter = "missing parameter"
|
||||
)
|
||||
|
||||
// NetDbPaths2Func TODO
|
||||
var NetDbPaths2Func = map[string]diagnose.HTTPHandlerFunc{
|
||||
"/join": dbJoin,
|
||||
"/networkpeers": dbPeers,
|
||||
"/clusterpeers": dbClusterPeers,
|
||||
"/joinnetwork": dbJoinNetwork,
|
||||
"/leavenetwork": dbLeaveNetwork,
|
||||
"/createentry": dbCreateEntry,
|
||||
"/updateentry": dbUpdateEntry,
|
||||
"/deleteentry": dbDeleteEntry,
|
||||
"/getentry": dbGetEntry,
|
||||
"/gettable": dbGetTable,
|
||||
}
|
||||
|
||||
func dbJoin(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
r.ParseForm()
|
||||
diagnose.DebugHTTPForm(r)
|
||||
if len(r.Form["members"]) < 1 {
|
||||
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?members=ip1,ip2,...", r.URL.Path))
|
||||
return
|
||||
}
|
||||
|
||||
nDB, ok := ctx.(*NetworkDB)
|
||||
if ok {
|
||||
err := nDB.Join(strings.Split(r.Form["members"][0], ","))
|
||||
if err != nil {
|
||||
fmt.Fprintf(w, "%s error in the DB join %s\n", r.URL.Path, err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Fprintf(w, "OK\n")
|
||||
}
|
||||
}
|
||||
|
||||
func dbPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
r.ParseForm()
|
||||
diagnose.DebugHTTPForm(r)
|
||||
if len(r.Form["nid"]) < 1 {
|
||||
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?nid=test", r.URL.Path))
|
||||
return
|
||||
}
|
||||
|
||||
nDB, ok := ctx.(*NetworkDB)
|
||||
if ok {
|
||||
peers := nDB.Peers(r.Form["nid"][0])
|
||||
fmt.Fprintf(w, "Network:%s Total peers: %d\n", r.Form["nid"], len(peers))
|
||||
for i, peerInfo := range peers {
|
||||
fmt.Fprintf(w, "%d) %s -> %s\n", i, peerInfo.Name, peerInfo.IP)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func dbClusterPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
nDB, ok := ctx.(*NetworkDB)
|
||||
if ok {
|
||||
peers := nDB.ClusterPeers()
|
||||
fmt.Fprintf(w, "Total peers: %d\n", len(peers))
|
||||
for i, peerInfo := range peers {
|
||||
fmt.Fprintf(w, "%d) %s -> %s\n", i, peerInfo.Name, peerInfo.IP)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func dbCreateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
r.ParseForm()
|
||||
diagnose.DebugHTTPForm(r)
|
||||
if len(r.Form["tname"]) < 1 ||
|
||||
len(r.Form["nid"]) < 1 ||
|
||||
len(r.Form["key"]) < 1 ||
|
||||
len(r.Form["value"]) < 1 {
|
||||
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k&value=v", r.URL.Path))
|
||||
return
|
||||
}
|
||||
|
||||
tname := r.Form["tname"][0]
|
||||
nid := r.Form["nid"][0]
|
||||
key := r.Form["key"][0]
|
||||
value := r.Form["value"][0]
|
||||
|
||||
nDB, ok := ctx.(*NetworkDB)
|
||||
if ok {
|
||||
if err := nDB.CreateEntry(tname, nid, key, []byte(value)); err != nil {
|
||||
diagnose.HTTPReplyError(w, err.Error(), "")
|
||||
return
|
||||
}
|
||||
fmt.Fprintf(w, "OK\n")
|
||||
}
|
||||
}
|
||||
|
||||
func dbUpdateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
r.ParseForm()
|
||||
diagnose.DebugHTTPForm(r)
|
||||
if len(r.Form["tname"]) < 1 ||
|
||||
len(r.Form["nid"]) < 1 ||
|
||||
len(r.Form["key"]) < 1 ||
|
||||
len(r.Form["value"]) < 1 {
|
||||
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k&value=v", r.URL.Path))
|
||||
return
|
||||
}
|
||||
|
||||
tname := r.Form["tname"][0]
|
||||
nid := r.Form["nid"][0]
|
||||
key := r.Form["key"][0]
|
||||
value := r.Form["value"][0]
|
||||
|
||||
nDB, ok := ctx.(*NetworkDB)
|
||||
if ok {
|
||||
if err := nDB.UpdateEntry(tname, nid, key, []byte(value)); err != nil {
|
||||
diagnose.HTTPReplyError(w, err.Error(), "")
|
||||
return
|
||||
}
|
||||
fmt.Fprintf(w, "OK\n")
|
||||
}
|
||||
}
|
||||
|
||||
func dbDeleteEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
r.ParseForm()
|
||||
diagnose.DebugHTTPForm(r)
|
||||
if len(r.Form["tname"]) < 1 ||
|
||||
len(r.Form["nid"]) < 1 ||
|
||||
len(r.Form["key"]) < 1 {
|
||||
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k", r.URL.Path))
|
||||
return
|
||||
}
|
||||
|
||||
tname := r.Form["tname"][0]
|
||||
nid := r.Form["nid"][0]
|
||||
key := r.Form["key"][0]
|
||||
|
||||
nDB, ok := ctx.(*NetworkDB)
|
||||
if ok {
|
||||
err := nDB.DeleteEntry(tname, nid, key)
|
||||
if err != nil {
|
||||
diagnose.HTTPReplyError(w, err.Error(), "")
|
||||
return
|
||||
}
|
||||
fmt.Fprintf(w, "OK\n")
|
||||
}
|
||||
}
|
||||
|
||||
func dbGetEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
r.ParseForm()
|
||||
diagnose.DebugHTTPForm(r)
|
||||
if len(r.Form["tname"]) < 1 ||
|
||||
len(r.Form["nid"]) < 1 ||
|
||||
len(r.Form["key"]) < 1 {
|
||||
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k", r.URL.Path))
|
||||
return
|
||||
}
|
||||
|
||||
tname := r.Form["tname"][0]
|
||||
nid := r.Form["nid"][0]
|
||||
key := r.Form["key"][0]
|
||||
|
||||
nDB, ok := ctx.(*NetworkDB)
|
||||
if ok {
|
||||
value, err := nDB.GetEntry(tname, nid, key)
|
||||
if err != nil {
|
||||
diagnose.HTTPReplyError(w, err.Error(), "")
|
||||
return
|
||||
}
|
||||
fmt.Fprintf(w, "key:`%s` value:`%s`\n", key, string(value))
|
||||
}
|
||||
}
|
||||
|
||||
func dbJoinNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
r.ParseForm()
|
||||
diagnose.DebugHTTPForm(r)
|
||||
if len(r.Form["nid"]) < 1 {
|
||||
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?nid=network_id", r.URL.Path))
|
||||
return
|
||||
}
|
||||
|
||||
nid := r.Form["nid"][0]
|
||||
|
||||
nDB, ok := ctx.(*NetworkDB)
|
||||
if ok {
|
||||
if err := nDB.JoinNetwork(nid); err != nil {
|
||||
diagnose.HTTPReplyError(w, err.Error(), "")
|
||||
return
|
||||
}
|
||||
fmt.Fprintf(w, "OK\n")
|
||||
}
|
||||
}
|
||||
|
||||
func dbLeaveNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
r.ParseForm()
|
||||
diagnose.DebugHTTPForm(r)
|
||||
if len(r.Form["nid"]) < 1 {
|
||||
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?nid=network_id", r.URL.Path))
|
||||
return
|
||||
}
|
||||
|
||||
nid := r.Form["nid"][0]
|
||||
|
||||
nDB, ok := ctx.(*NetworkDB)
|
||||
if ok {
|
||||
if err := nDB.LeaveNetwork(nid); err != nil {
|
||||
diagnose.HTTPReplyError(w, err.Error(), "")
|
||||
return
|
||||
}
|
||||
fmt.Fprintf(w, "OK\n")
|
||||
}
|
||||
}
|
||||
|
||||
func dbGetTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
r.ParseForm()
|
||||
diagnose.DebugHTTPForm(r)
|
||||
if len(r.Form["tname"]) < 1 ||
|
||||
len(r.Form["nid"]) < 1 {
|
||||
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id", r.URL.Path))
|
||||
return
|
||||
}
|
||||
|
||||
tname := r.Form["tname"][0]
|
||||
nid := r.Form["nid"][0]
|
||||
|
||||
nDB, ok := ctx.(*NetworkDB)
|
||||
if ok {
|
||||
table := nDB.GetTableByNetwork(tname, nid)
|
||||
fmt.Fprintf(w, "total elements: %d\n", len(table))
|
||||
i := 0
|
||||
for k, v := range table {
|
||||
fmt.Fprintf(w, "%d) k:`%s` -> v:`%s`\n", i, k, string(v.([]byte)))
|
||||
i++
|
||||
}
|
||||
}
|
||||
}
|
7
libnetwork/test/networkDb/Dockerfile
Normal file
7
libnetwork/test/networkDb/Dockerfile
Normal file
|
@ -0,0 +1,7 @@
|
|||
FROM alpine
|
||||
|
||||
COPY testMain /app/
|
||||
|
||||
WORKDIR app
|
||||
|
||||
ENTRYPOINT ["/app/testMain"]
|
15
libnetwork/test/networkDb/README
Normal file
15
libnetwork/test/networkDb/README
Normal file
|
@ -0,0 +1,15 @@
|
|||
SERVER
|
||||
|
||||
cd test/networkdb
|
||||
env GOOS=linux go build -v server/ndbTester.go && docker build -t fcrisciani/networkdb-test -f server/Dockerfile .
|
||||
(only for testkit case) docker push fcrisciani/networkdb-test
|
||||
|
||||
Run server: docker service create --name testdb --network net1 --replicas 3 --env TASK_ID="{{.Task.ID}}" -p mode=host,target=8000 fcrisciani/networkdb-test server 8000
|
||||
|
||||
CLIENT
|
||||
|
||||
cd test/networkdb
|
||||
Join cluster: docker run -it --network net1 fcrisciani/networkdb-test client join testdb 8000
|
||||
Join network: docker run -it --network net1 fcrisciani/networkdb-test client join-network testdb 8000 test
|
||||
Run test: docker run -it --network net1 fcrisciani/networkdb-test client write-delete-unique-keys testdb 8000 test tableBla 3 10
|
||||
check table: curl "localhost:32768/gettable?nid=test&tname=table_name"
|
693
libnetwork/test/networkDb/dbclient/ndbClient.go
Normal file
693
libnetwork/test/networkDb/dbclient/ndbClient.go
Normal file
|
@ -0,0 +1,693 @@
|
|||
package dbclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
)
|
||||
|
||||
var servicePort string
|
||||
|
||||
const totalWrittenKeys string = "totalKeys"
|
||||
|
||||
type resultTuple struct {
|
||||
id string
|
||||
result int
|
||||
}
|
||||
|
||||
func httpGetFatalError(ip, port, path string) {
|
||||
// for {
|
||||
body, err := httpGet(ip, port, path)
|
||||
if err != nil || !strings.Contains(string(body), "OK") {
|
||||
// if strings.Contains(err.Error(), "EOF") {
|
||||
// logrus.Warnf("Got EOF path:%s err:%s", path, err)
|
||||
// continue
|
||||
// }
|
||||
log.Fatalf("[%s] error %s %s", path, err, body)
|
||||
}
|
||||
// break
|
||||
// }
|
||||
}
|
||||
|
||||
func httpGet(ip, port, path string) ([]byte, error) {
|
||||
resp, err := http.Get("http://" + ip + ":" + port + path)
|
||||
if err != nil {
|
||||
logrus.Errorf("httpGet error:%s", err)
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
return body, err
|
||||
}
|
||||
|
||||
func joinCluster(ip, port string, members []string, doneCh chan resultTuple) {
|
||||
httpGetFatalError(ip, port, "/join?members="+strings.Join(members, ","))
|
||||
|
||||
if doneCh != nil {
|
||||
doneCh <- resultTuple{id: ip, result: 0}
|
||||
}
|
||||
}
|
||||
|
||||
func joinNetwork(ip, port, network string, doneCh chan resultTuple) {
|
||||
httpGetFatalError(ip, port, "/joinnetwork?nid="+network)
|
||||
|
||||
if doneCh != nil {
|
||||
doneCh <- resultTuple{id: ip, result: 0}
|
||||
}
|
||||
}
|
||||
|
||||
func leaveNetwork(ip, port, network string, doneCh chan resultTuple) {
|
||||
httpGetFatalError(ip, port, "/leavenetwork?nid="+network)
|
||||
|
||||
if doneCh != nil {
|
||||
doneCh <- resultTuple{id: ip, result: 0}
|
||||
}
|
||||
}
|
||||
|
||||
func writeTableKey(ip, port, networkName, tableName, key string) {
|
||||
createPath := "/createentry?nid=" + networkName + "&tname=" + tableName + "&value=v&key="
|
||||
httpGetFatalError(ip, port, createPath+key)
|
||||
}
|
||||
|
||||
func deleteTableKey(ip, port, networkName, tableName, key string) {
|
||||
deletePath := "/deleteentry?nid=" + networkName + "&tname=" + tableName + "&key="
|
||||
httpGetFatalError(ip, port, deletePath+key)
|
||||
}
|
||||
|
||||
func clusterPeersNumber(ip, port string, doneCh chan resultTuple) {
|
||||
body, err := httpGet(ip, port, "/clusterpeers")
|
||||
|
||||
if err != nil {
|
||||
logrus.Errorf("clusterPeers %s there was an error: %s\n", ip, err)
|
||||
doneCh <- resultTuple{id: ip, result: -1}
|
||||
return
|
||||
}
|
||||
peersRegexp := regexp.MustCompile(`Total peers: ([0-9]+)`)
|
||||
peersNum, _ := strconv.Atoi(peersRegexp.FindStringSubmatch(string(body))[1])
|
||||
|
||||
doneCh <- resultTuple{id: ip, result: peersNum}
|
||||
}
|
||||
|
||||
func networkPeersNumber(ip, port, networkName string, doneCh chan resultTuple) {
|
||||
body, err := httpGet(ip, port, "/networkpeers?nid="+networkName)
|
||||
|
||||
if err != nil {
|
||||
logrus.Errorf("networkPeersNumber %s there was an error: %s\n", ip, err)
|
||||
doneCh <- resultTuple{id: ip, result: -1}
|
||||
return
|
||||
}
|
||||
peersRegexp := regexp.MustCompile(`Total peers: ([0-9]+)`)
|
||||
peersNum, _ := strconv.Atoi(peersRegexp.FindStringSubmatch(string(body))[1])
|
||||
|
||||
doneCh <- resultTuple{id: ip, result: peersNum}
|
||||
}
|
||||
|
||||
func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan resultTuple) {
|
||||
body, err := httpGet(ip, port, "/gettable?nid="+networkName+"&tname="+tableName)
|
||||
|
||||
if err != nil {
|
||||
logrus.Errorf("tableEntriesNumber %s there was an error: %s\n", ip, err)
|
||||
doneCh <- resultTuple{id: ip, result: -1}
|
||||
return
|
||||
}
|
||||
elementsRegexp := regexp.MustCompile(`total elements: ([0-9]+)`)
|
||||
entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
|
||||
doneCh <- resultTuple{id: ip, result: entriesNum}
|
||||
}
|
||||
|
||||
func clientWatchTable(ip, port, networkName, tableName string, doneCh chan resultTuple) {
|
||||
httpGetFatalError(ip, port, "/watchtable?nid="+networkName+"&tname="+tableName)
|
||||
if doneCh != nil {
|
||||
doneCh <- resultTuple{id: ip, result: 0}
|
||||
}
|
||||
}
|
||||
|
||||
func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan resultTuple) {
|
||||
body, err := httpGet(ip, port, "/watchedtableentries?nid="+networkName+"&tname="+tableName)
|
||||
|
||||
if err != nil {
|
||||
logrus.Errorf("clientTableEntriesNumber %s there was an error: %s\n", ip, err)
|
||||
doneCh <- resultTuple{id: ip, result: -1}
|
||||
return
|
||||
}
|
||||
elementsRegexp := regexp.MustCompile(`total elements: ([0-9]+)`)
|
||||
entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
|
||||
doneCh <- resultTuple{id: ip, result: entriesNum}
|
||||
}
|
||||
|
||||
func writeUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
|
||||
for x := 0; ; x++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
doneCh <- resultTuple{id: ip, result: x}
|
||||
return
|
||||
default:
|
||||
k := key + strconv.Itoa(x)
|
||||
// write key
|
||||
writeTableKey(ip, port, networkName, tableName, k)
|
||||
// give time to send out key writes
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func writeDeleteUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
|
||||
for x := 0; ; x++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
doneCh <- resultTuple{id: ip, result: x}
|
||||
return
|
||||
default:
|
||||
k := key + strconv.Itoa(x)
|
||||
// write key
|
||||
writeTableKey(ip, port, networkName, tableName, k)
|
||||
// give time to send out key writes
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
// delete key
|
||||
deleteTableKey(ip, port, networkName, tableName, k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func writeDeleteLeaveJoin(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
|
||||
for x := 0; ; x++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
doneCh <- resultTuple{id: ip, result: x}
|
||||
return
|
||||
default:
|
||||
k := key + strconv.Itoa(x)
|
||||
// write key
|
||||
writeTableKey(ip, port, networkName, tableName, k)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
// delete key
|
||||
deleteTableKey(ip, port, networkName, tableName, k)
|
||||
// give some time
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
// leave network
|
||||
leaveNetwork(ip, port, networkName, nil)
|
||||
// join network
|
||||
joinNetwork(ip, port, networkName, nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func ready(ip, port string, doneCh chan resultTuple) {
|
||||
for {
|
||||
body, err := httpGet(ip, port, "/ready")
|
||||
if err != nil || !strings.Contains(string(body), "OK") {
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
// success
|
||||
break
|
||||
}
|
||||
// notify the completion
|
||||
doneCh <- resultTuple{id: ip, result: 0}
|
||||
}
|
||||
|
||||
func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) {
|
||||
startTime := time.Now().UnixNano()
|
||||
var successTime int64
|
||||
|
||||
// Loop for 2 minutes to guartee that the result is stable
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Validate test success, if the time is set means that all the tables are empty
|
||||
if successTime != 0 {
|
||||
logrus.Infof("Check table passed, the cluster converged in %d msec", time.Duration(successTime-startTime)/time.Millisecond)
|
||||
return
|
||||
}
|
||||
log.Fatal("Test failed, there is still entries in the tables of the nodes")
|
||||
default:
|
||||
logrus.Infof("Checking table %s expected %d", tableName, expectedEntries)
|
||||
doneCh := make(chan resultTuple, len(ips))
|
||||
for _, ip := range ips {
|
||||
go fn(ip, servicePort, networkName, tableName, doneCh)
|
||||
}
|
||||
|
||||
nodesWithCorrectEntriesNum := 0
|
||||
for i := len(ips); i > 0; i-- {
|
||||
tableEntries := <-doneCh
|
||||
logrus.Infof("Node %s has %d entries", tableEntries.id, tableEntries.result)
|
||||
if tableEntries.result == expectedEntries {
|
||||
nodesWithCorrectEntriesNum++
|
||||
}
|
||||
}
|
||||
close(doneCh)
|
||||
if nodesWithCorrectEntriesNum == len(ips) {
|
||||
if successTime == 0 {
|
||||
successTime = time.Now().UnixNano()
|
||||
logrus.Infof("Success after %d msec", time.Duration(successTime-startTime)/time.Millisecond)
|
||||
}
|
||||
} else {
|
||||
successTime = 0
|
||||
}
|
||||
time.Sleep(10 * time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func waitWriters(parallelWriters int, mustWrite bool, doneCh chan resultTuple) map[string]int {
|
||||
var totalKeys int
|
||||
resultTable := make(map[string]int)
|
||||
for i := 0; i < parallelWriters; i++ {
|
||||
logrus.Infof("Waiting for %d workers", parallelWriters-i)
|
||||
workerReturn := <-doneCh
|
||||
totalKeys += workerReturn.result
|
||||
if mustWrite && workerReturn.result == 0 {
|
||||
log.Fatalf("The worker %s did not write any key %d == 0", workerReturn.id, workerReturn.result)
|
||||
}
|
||||
if !mustWrite && workerReturn.result != 0 {
|
||||
log.Fatalf("The worker %s was supposed to return 0 instead %d != 0", workerReturn.id, workerReturn.result)
|
||||
}
|
||||
if mustWrite {
|
||||
resultTable[workerReturn.id] = workerReturn.result
|
||||
logrus.Infof("The worker %s wrote %d keys", workerReturn.id, workerReturn.result)
|
||||
}
|
||||
}
|
||||
resultTable[totalWrittenKeys] = totalKeys
|
||||
return resultTable
|
||||
}
|
||||
|
||||
// ready
|
||||
func doReady(ips []string) {
|
||||
doneCh := make(chan resultTuple, len(ips))
|
||||
// check all the nodes
|
||||
for _, ip := range ips {
|
||||
go ready(ip, servicePort, doneCh)
|
||||
}
|
||||
// wait for the readiness of all nodes
|
||||
for i := len(ips); i > 0; i-- {
|
||||
<-doneCh
|
||||
}
|
||||
close(doneCh)
|
||||
}
|
||||
|
||||
// join
|
||||
func doJoin(ips []string) {
|
||||
doneCh := make(chan resultTuple, len(ips))
|
||||
// check all the nodes
|
||||
for i, ip := range ips {
|
||||
members := append([]string(nil), ips[:i]...)
|
||||
members = append(members, ips[i+1:]...)
|
||||
go joinCluster(ip, servicePort, members, doneCh)
|
||||
}
|
||||
// wait for the readiness of all nodes
|
||||
for i := len(ips); i > 0; i-- {
|
||||
<-doneCh
|
||||
}
|
||||
close(doneCh)
|
||||
}
|
||||
|
||||
// cluster-peers expectedNumberPeers
|
||||
func doClusterPeers(ips []string, args []string) {
|
||||
doneCh := make(chan resultTuple, len(ips))
|
||||
expectedPeers, _ := strconv.Atoi(args[0])
|
||||
// check all the nodes
|
||||
for _, ip := range ips {
|
||||
go clusterPeersNumber(ip, servicePort, doneCh)
|
||||
}
|
||||
// wait for the readiness of all nodes
|
||||
for i := len(ips); i > 0; i-- {
|
||||
node := <-doneCh
|
||||
if node.result != expectedPeers {
|
||||
log.Fatalf("Expected peers from %s missmatch %d != %d", node.id, expectedPeers, node.result)
|
||||
}
|
||||
}
|
||||
close(doneCh)
|
||||
}
|
||||
|
||||
// join-network networkName
|
||||
func doJoinNetwork(ips []string, args []string) {
|
||||
doneCh := make(chan resultTuple, len(ips))
|
||||
// check all the nodes
|
||||
for _, ip := range ips {
|
||||
go joinNetwork(ip, servicePort, args[0], doneCh)
|
||||
}
|
||||
// wait for the readiness of all nodes
|
||||
for i := len(ips); i > 0; i-- {
|
||||
<-doneCh
|
||||
}
|
||||
close(doneCh)
|
||||
}
|
||||
|
||||
// leave-network networkName
|
||||
func doLeaveNetwork(ips []string, args []string) {
|
||||
doneCh := make(chan resultTuple, len(ips))
|
||||
// check all the nodes
|
||||
for _, ip := range ips {
|
||||
go leaveNetwork(ip, servicePort, args[0], doneCh)
|
||||
}
|
||||
// wait for the readiness of all nodes
|
||||
for i := len(ips); i > 0; i-- {
|
||||
<-doneCh
|
||||
}
|
||||
close(doneCh)
|
||||
}
|
||||
|
||||
// cluster-peers networkName expectedNumberPeers maxRetry
|
||||
func doNetworkPeers(ips []string, args []string) {
|
||||
doneCh := make(chan resultTuple, len(ips))
|
||||
networkName := args[0]
|
||||
expectedPeers, _ := strconv.Atoi(args[1])
|
||||
maxRetry, _ := strconv.Atoi(args[2])
|
||||
for retry := 0; retry < maxRetry; retry++ {
|
||||
// check all the nodes
|
||||
for _, ip := range ips {
|
||||
go networkPeersNumber(ip, servicePort, networkName, doneCh)
|
||||
}
|
||||
// wait for the readiness of all nodes
|
||||
for i := len(ips); i > 0; i-- {
|
||||
node := <-doneCh
|
||||
if node.result != expectedPeers {
|
||||
if retry == maxRetry-1 {
|
||||
log.Fatalf("Expected peers from %s missmatch %d != %d", node.id, expectedPeers, node.result)
|
||||
} else {
|
||||
logrus.Warnf("Expected peers from %s missmatch %d != %d", node.id, expectedPeers, node.result)
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
close(doneCh)
|
||||
}
|
||||
|
||||
// write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
|
||||
func doWriteDeleteUniqueKeys(ips []string, args []string) {
|
||||
networkName := args[0]
|
||||
tableName := args[1]
|
||||
parallelWriters, _ := strconv.Atoi(args[2])
|
||||
writeTimeSec, _ := strconv.Atoi(args[3])
|
||||
|
||||
doneCh := make(chan resultTuple, parallelWriters)
|
||||
// Enable watch of tables from clients
|
||||
for i := 0; i < parallelWriters; i++ {
|
||||
go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
|
||||
}
|
||||
waitWriters(parallelWriters, false, doneCh)
|
||||
|
||||
// Start parallel writers that will create and delete unique keys
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
|
||||
for i := 0; i < parallelWriters; i++ {
|
||||
key := "key-" + strconv.Itoa(i) + "-"
|
||||
logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
|
||||
go writeDeleteUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
|
||||
}
|
||||
|
||||
// Sync with all the writers
|
||||
keyMap := waitWriters(parallelWriters, true, doneCh)
|
||||
cancel()
|
||||
logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
|
||||
|
||||
// check table entries for 2 minutes
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
|
||||
cancel()
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
|
||||
checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber)
|
||||
cancel()
|
||||
}
|
||||
|
||||
// write-unique-keys networkName tableName numParallelWriters writeTimeSec
|
||||
func doWriteUniqueKeys(ips []string, args []string) {
|
||||
networkName := args[0]
|
||||
tableName := args[1]
|
||||
parallelWriters, _ := strconv.Atoi(args[2])
|
||||
writeTimeSec, _ := strconv.Atoi(args[3])
|
||||
|
||||
doneCh := make(chan resultTuple, parallelWriters)
|
||||
// Enable watch of tables from clients
|
||||
for i := 0; i < parallelWriters; i++ {
|
||||
go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
|
||||
}
|
||||
waitWriters(parallelWriters, false, doneCh)
|
||||
|
||||
// Start parallel writers that will create and delete unique keys
|
||||
defer close(doneCh)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
|
||||
for i := 0; i < parallelWriters; i++ {
|
||||
key := "key-" + strconv.Itoa(i) + "-"
|
||||
logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
|
||||
go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
|
||||
}
|
||||
|
||||
// Sync with all the writers
|
||||
keyMap := waitWriters(parallelWriters, true, doneCh)
|
||||
cancel()
|
||||
logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
|
||||
|
||||
// check table entries for 2 minutes
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
|
||||
cancel()
|
||||
}
|
||||
|
||||
// write-delete-leave-join networkName tableName numParallelWriters writeTimeSec
|
||||
func doWriteDeleteLeaveJoin(ips []string, args []string) {
|
||||
networkName := args[0]
|
||||
tableName := args[1]
|
||||
parallelWriters, _ := strconv.Atoi(args[2])
|
||||
writeTimeSec, _ := strconv.Atoi(args[3])
|
||||
|
||||
// Start parallel writers that will create and delete unique keys
|
||||
doneCh := make(chan resultTuple, parallelWriters)
|
||||
defer close(doneCh)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
|
||||
for i := 0; i < parallelWriters; i++ {
|
||||
key := "key-" + strconv.Itoa(i) + "-"
|
||||
logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
|
||||
go writeDeleteLeaveJoin(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
|
||||
}
|
||||
|
||||
// Sync with all the writers
|
||||
keyMap := waitWriters(parallelWriters, true, doneCh)
|
||||
cancel()
|
||||
logrus.Infof("Written a total of %d keys on the cluster", keyMap["totalKeys"])
|
||||
|
||||
// check table entries for 2 minutes
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
|
||||
cancel()
|
||||
}
|
||||
|
||||
// write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec
|
||||
func doWriteDeleteWaitLeaveJoin(ips []string, args []string) {
|
||||
networkName := args[0]
|
||||
tableName := args[1]
|
||||
parallelWriters, _ := strconv.Atoi(args[2])
|
||||
writeTimeSec, _ := strconv.Atoi(args[3])
|
||||
|
||||
// Start parallel writers that will create and delete unique keys
|
||||
doneCh := make(chan resultTuple, parallelWriters)
|
||||
defer close(doneCh)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
|
||||
for i := 0; i < parallelWriters; i++ {
|
||||
key := "key-" + strconv.Itoa(i) + "-"
|
||||
logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
|
||||
go writeDeleteUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
|
||||
}
|
||||
|
||||
// Sync with all the writers
|
||||
keyMap := waitWriters(parallelWriters, true, doneCh)
|
||||
cancel()
|
||||
logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
|
||||
|
||||
// The writers will leave the network
|
||||
for i := 0; i < parallelWriters; i++ {
|
||||
logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
|
||||
go leaveNetwork(ips[i], servicePort, networkName, doneCh)
|
||||
}
|
||||
waitWriters(parallelWriters, false, doneCh)
|
||||
|
||||
// Give some time
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// The writers will join the network
|
||||
for i := 0; i < parallelWriters; i++ {
|
||||
logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i])
|
||||
go joinNetwork(ips[i], servicePort, networkName, doneCh)
|
||||
}
|
||||
waitWriters(parallelWriters, false, doneCh)
|
||||
|
||||
// check table entries for 2 minutes
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
|
||||
cancel()
|
||||
}
|
||||
|
||||
// write-wait-leave networkName tableName numParallelWriters writeTimeSec
|
||||
func doWriteWaitLeave(ips []string, args []string) {
|
||||
networkName := args[0]
|
||||
tableName := args[1]
|
||||
parallelWriters, _ := strconv.Atoi(args[2])
|
||||
writeTimeSec, _ := strconv.Atoi(args[3])
|
||||
|
||||
// Start parallel writers that will create and delete unique keys
|
||||
doneCh := make(chan resultTuple, parallelWriters)
|
||||
defer close(doneCh)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
|
||||
for i := 0; i < parallelWriters; i++ {
|
||||
key := "key-" + strconv.Itoa(i) + "-"
|
||||
logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
|
||||
go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
|
||||
}
|
||||
|
||||
// Sync with all the writers
|
||||
keyMap := waitWriters(parallelWriters, true, doneCh)
|
||||
cancel()
|
||||
logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
|
||||
|
||||
// The writers will leave the network
|
||||
for i := 0; i < parallelWriters; i++ {
|
||||
logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
|
||||
go leaveNetwork(ips[i], servicePort, networkName, doneCh)
|
||||
}
|
||||
waitWriters(parallelWriters, false, doneCh)
|
||||
|
||||
// check table entries for 2 minutes
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
|
||||
cancel()
|
||||
}
|
||||
|
||||
// write-wait-leave-join networkName tableName numParallelWriters writeTimeSec numParallelLeaver
|
||||
func doWriteWaitLeaveJoin(ips []string, args []string) {
|
||||
networkName := args[0]
|
||||
tableName := args[1]
|
||||
parallelWriters, _ := strconv.Atoi(args[2])
|
||||
writeTimeSec, _ := strconv.Atoi(args[3])
|
||||
parallerlLeaver, _ := strconv.Atoi(args[4])
|
||||
|
||||
// Start parallel writers that will create and delete unique keys
|
||||
doneCh := make(chan resultTuple, parallelWriters)
|
||||
defer close(doneCh)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
|
||||
for i := 0; i < parallelWriters; i++ {
|
||||
key := "key-" + strconv.Itoa(i) + "-"
|
||||
logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
|
||||
go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
|
||||
}
|
||||
|
||||
// Sync with all the writers
|
||||
keyMap := waitWriters(parallelWriters, true, doneCh)
|
||||
cancel()
|
||||
logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
|
||||
|
||||
keysExpected := keyMap[totalWrittenKeys]
|
||||
// The Leavers will leave the network
|
||||
for i := 0; i < parallerlLeaver; i++ {
|
||||
logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
|
||||
go leaveNetwork(ips[i], servicePort, networkName, doneCh)
|
||||
// Once a node leave all the keys written previously will be deleted, so the expected keys will consider that as removed
|
||||
keysExpected -= keyMap[ips[i]]
|
||||
}
|
||||
waitWriters(parallerlLeaver, false, doneCh)
|
||||
|
||||
// Give some time
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// The writers will join the network
|
||||
for i := 0; i < parallerlLeaver; i++ {
|
||||
logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i])
|
||||
go joinNetwork(ips[i], servicePort, networkName, doneCh)
|
||||
}
|
||||
waitWriters(parallerlLeaver, false, doneCh)
|
||||
|
||||
// check table entries for 2 minutes
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber)
|
||||
cancel()
|
||||
}
|
||||
|
||||
var cmdArgChec = map[string]int{
|
||||
"debug": 0,
|
||||
"fail": 0,
|
||||
"ready": 2,
|
||||
"join": 2,
|
||||
"leave": 2,
|
||||
"join-network": 3,
|
||||
"leave-network": 3,
|
||||
"cluster-peers": 3,
|
||||
"write-delete-unique-keys": 4,
|
||||
}
|
||||
|
||||
// Client is a client
|
||||
func Client(args []string) {
|
||||
logrus.Infof("[CLIENT] Starting with arguments %v", args)
|
||||
command := args[0]
|
||||
|
||||
if len(args) < cmdArgChec[command] {
|
||||
log.Fatalf("Command %s requires %d arguments, aborting...", command, cmdArgChec[command])
|
||||
}
|
||||
|
||||
switch command {
|
||||
case "debug":
|
||||
time.Sleep(1 * time.Hour)
|
||||
os.Exit(0)
|
||||
case "fail":
|
||||
log.Fatalf("Test error condition with message: error error error")
|
||||
}
|
||||
|
||||
serviceName := args[1]
|
||||
ips, _ := net.LookupHost("tasks." + serviceName)
|
||||
logrus.Infof("got the ips %v", ips)
|
||||
if len(ips) == 0 {
|
||||
log.Fatalf("Cannot resolve any IP for the service tasks.%s", serviceName)
|
||||
}
|
||||
servicePort = args[2]
|
||||
commandArgs := args[3:]
|
||||
logrus.Infof("Executing %s with args:%v", command, commandArgs)
|
||||
switch command {
|
||||
case "ready":
|
||||
doReady(ips)
|
||||
case "join":
|
||||
doJoin(ips)
|
||||
case "leave":
|
||||
|
||||
case "cluster-peers":
|
||||
// cluster-peers
|
||||
doClusterPeers(ips, commandArgs)
|
||||
|
||||
case "join-network":
|
||||
// join-network networkName
|
||||
doJoinNetwork(ips, commandArgs)
|
||||
case "leave-network":
|
||||
// leave-network networkName
|
||||
doLeaveNetwork(ips, commandArgs)
|
||||
case "network-peers":
|
||||
// network-peers networkName maxRetry
|
||||
doNetworkPeers(ips, commandArgs)
|
||||
|
||||
case "write-unique-keys":
|
||||
// write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
|
||||
doWriteUniqueKeys(ips, commandArgs)
|
||||
case "write-delete-unique-keys":
|
||||
// write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
|
||||
doWriteDeleteUniqueKeys(ips, commandArgs)
|
||||
case "write-delete-leave-join":
|
||||
// write-delete-leave-join networkName tableName numParallelWriters writeTimeSec
|
||||
doWriteDeleteLeaveJoin(ips, commandArgs)
|
||||
case "write-delete-wait-leave-join":
|
||||
// write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec
|
||||
doWriteDeleteWaitLeaveJoin(ips, commandArgs)
|
||||
case "write-wait-leave":
|
||||
// write-wait-leave networkName tableName numParallelWriters writeTimeSec
|
||||
doWriteWaitLeave(ips, commandArgs)
|
||||
case "write-wait-leave-join":
|
||||
// write-wait-leave networkName tableName numParallelWriters writeTimeSec
|
||||
doWriteWaitLeaveJoin(ips, commandArgs)
|
||||
default:
|
||||
log.Fatalf("Command %s not recognized", command)
|
||||
}
|
||||
}
|
109
libnetwork/test/networkDb/dbserver/ndbServer.go
Normal file
109
libnetwork/test/networkDb/dbserver/ndbServer.go
Normal file
|
@ -0,0 +1,109 @@
|
|||
package dbserver
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/docker/libnetwork/diagnose"
|
||||
"github.com/docker/libnetwork/networkdb"
|
||||
"github.com/docker/libnetwork/test/networkDb/dummyclient"
|
||||
)
|
||||
|
||||
var nDB *networkdb.NetworkDB
|
||||
var server diagnose.Server
|
||||
var ipAddr string
|
||||
|
||||
var testerPaths2Func = map[string]diagnose.HTTPHandlerFunc{
|
||||
"/myip": ipaddress,
|
||||
}
|
||||
|
||||
func ipaddress(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
fmt.Fprintf(w, "%s\n", ipAddr)
|
||||
}
|
||||
|
||||
// Server starts the server
|
||||
func Server(args []string) {
|
||||
logrus.Infof("[SERVER] Starting with arguments %v", args)
|
||||
if len(args) < 1 {
|
||||
log.Fatal("Port number is a mandatory argument, aborting...")
|
||||
}
|
||||
port, _ := strconv.Atoi(args[0])
|
||||
var localNodeName string
|
||||
var ok bool
|
||||
if localNodeName, ok = os.LookupEnv("TASK_ID"); !ok {
|
||||
log.Fatal("TASK_ID environment variable not set, aborting...")
|
||||
}
|
||||
logrus.Infof("[SERVER] Starting node %s on port %d", localNodeName, port)
|
||||
|
||||
ip, err := getIPInterface("eth0")
|
||||
if err != nil {
|
||||
logrus.Errorf("%s There was a problem with the IP %s\n", localNodeName, err)
|
||||
return
|
||||
}
|
||||
ipAddr = ip
|
||||
logrus.Infof("%s uses IP %s\n", localNodeName, ipAddr)
|
||||
|
||||
server = diagnose.Server{}
|
||||
server.Init()
|
||||
conf := networkdb.DefaultConfig()
|
||||
conf.NodeName = localNodeName
|
||||
conf.AdvertiseAddr = ipAddr
|
||||
conf.BindAddr = ipAddr
|
||||
nDB, err = networkdb.New(conf)
|
||||
if err != nil {
|
||||
logrus.Infof("%s error in the DB init %s\n", localNodeName, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Register network db handlers
|
||||
server.RegisterHandler(nDB, networkdb.NetDbPaths2Func)
|
||||
server.RegisterHandler(nil, testerPaths2Func)
|
||||
server.RegisterHandler(nDB, dummyclient.DummyClientPaths2Func)
|
||||
server.EnableDebug("", port)
|
||||
}
|
||||
|
||||
func getIPInterface(name string) (string, error) {
|
||||
ifaces, err := net.Interfaces()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
for _, iface := range ifaces {
|
||||
if iface.Name != name {
|
||||
continue // not the name specified
|
||||
}
|
||||
|
||||
if iface.Flags&net.FlagUp == 0 {
|
||||
return "", errors.New("Interfaces is down")
|
||||
}
|
||||
|
||||
addrs, err := iface.Addrs()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
for _, addr := range addrs {
|
||||
var ip net.IP
|
||||
switch v := addr.(type) {
|
||||
case *net.IPNet:
|
||||
ip = v.IP
|
||||
case *net.IPAddr:
|
||||
ip = v.IP
|
||||
}
|
||||
if ip == nil || ip.IsLoopback() {
|
||||
continue
|
||||
}
|
||||
ip = ip.To4()
|
||||
if ip == nil {
|
||||
continue
|
||||
}
|
||||
return ip.String(), nil
|
||||
}
|
||||
return "", errors.New("Interfaces does not have a valid IPv4")
|
||||
}
|
||||
return "", errors.New("Interface not found")
|
||||
}
|
118
libnetwork/test/networkDb/dummyclient/dummyClient.go
Normal file
118
libnetwork/test/networkDb/dummyclient/dummyClient.go
Normal file
|
@ -0,0 +1,118 @@
|
|||
package dummyclient
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
events "github.com/docker/go-events"
|
||||
"github.com/docker/libnetwork/diagnose"
|
||||
"github.com/docker/libnetwork/networkdb"
|
||||
)
|
||||
|
||||
// DummyClientPaths2Func exported paths for the client
|
||||
var DummyClientPaths2Func = map[string]diagnose.HTTPHandlerFunc{
|
||||
"/watchtable": watchTable,
|
||||
"/watchedtableentries": watchTableEntries,
|
||||
}
|
||||
|
||||
const (
|
||||
missingParameter = "missing parameter"
|
||||
)
|
||||
|
||||
type tableHandler struct {
|
||||
cancelWatch func()
|
||||
entries map[string]string
|
||||
}
|
||||
|
||||
var clientWatchTable = map[string]tableHandler{}
|
||||
|
||||
func watchTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
r.ParseForm()
|
||||
diagnose.DebugHTTPForm(r)
|
||||
if len(r.Form["tname"]) < 1 {
|
||||
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name", r.URL.Path))
|
||||
return
|
||||
}
|
||||
|
||||
tableName := r.Form["tname"][0]
|
||||
if _, ok := clientWatchTable[tableName]; ok {
|
||||
fmt.Fprintf(w, "OK\n")
|
||||
return
|
||||
}
|
||||
|
||||
nDB, ok := ctx.(*networkdb.NetworkDB)
|
||||
if ok {
|
||||
ch, cancel := nDB.Watch(tableName, "", "")
|
||||
clientWatchTable[tableName] = tableHandler{cancelWatch: cancel, entries: make(map[string]string)}
|
||||
go handleTableEvents(tableName, ch)
|
||||
|
||||
fmt.Fprintf(w, "OK\n")
|
||||
}
|
||||
}
|
||||
|
||||
func watchTableEntries(ctx interface{}, w http.ResponseWriter, r *http.Request) {
|
||||
r.ParseForm()
|
||||
diagnose.DebugHTTPForm(r)
|
||||
if len(r.Form["tname"]) < 1 {
|
||||
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name", r.URL.Path))
|
||||
return
|
||||
}
|
||||
|
||||
tableName := r.Form["tname"][0]
|
||||
table, ok := clientWatchTable[tableName]
|
||||
if !ok {
|
||||
fmt.Fprintf(w, "Table %s not watched\n", tableName)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Fprintf(w, "total elements: %d\n", len(table.entries))
|
||||
i := 0
|
||||
for k, v := range table.entries {
|
||||
fmt.Fprintf(w, "%d) k:`%s` -> v:`%s`\n", i, k, v)
|
||||
i++
|
||||
}
|
||||
}
|
||||
|
||||
func handleTableEvents(tableName string, ch *events.Channel) {
|
||||
var (
|
||||
// nid string
|
||||
eid string
|
||||
value []byte
|
||||
isAdd bool
|
||||
)
|
||||
|
||||
logrus.Infof("Started watching table:%s", tableName)
|
||||
for {
|
||||
select {
|
||||
case <-ch.Done():
|
||||
logrus.Infof("End watching %s", tableName)
|
||||
return
|
||||
|
||||
case evt := <-ch.C:
|
||||
logrus.Infof("Recevied new event on:%s", tableName)
|
||||
switch event := evt.(type) {
|
||||
case networkdb.CreateEvent:
|
||||
// nid = event.NetworkID
|
||||
eid = event.Key
|
||||
value = event.Value
|
||||
isAdd = true
|
||||
case networkdb.DeleteEvent:
|
||||
// nid = event.NetworkID
|
||||
eid = event.Key
|
||||
value = event.Value
|
||||
isAdd = false
|
||||
default:
|
||||
log.Fatalf("Unexpected table event = %#v", event)
|
||||
}
|
||||
if isAdd {
|
||||
// logrus.Infof("Add %s %s", tableName, eid)
|
||||
clientWatchTable[tableName].entries[eid] = string(value)
|
||||
} else {
|
||||
// logrus.Infof("Del %s %s", tableName, eid)
|
||||
delete(clientWatchTable[tableName].entries, eid)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
24
libnetwork/test/networkDb/testMain.go
Normal file
24
libnetwork/test/networkDb/testMain.go
Normal file
|
@ -0,0 +1,24 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/docker/libnetwork/test/networkDb/dbclient"
|
||||
"github.com/docker/libnetwork/test/networkDb/dbserver"
|
||||
)
|
||||
|
||||
func main() {
|
||||
logrus.Infof("Starting the image with these args: %v", os.Args)
|
||||
if len(os.Args) < 1 {
|
||||
log.Fatal("You need at least 1 argument [client/server]")
|
||||
}
|
||||
|
||||
switch os.Args[1] {
|
||||
case "server":
|
||||
dbserver.Server(os.Args[2:])
|
||||
case "client":
|
||||
dbclient.Client(os.Args[2:])
|
||||
}
|
||||
}
|
Loading…
Add table
Reference in a new issue