Diagnose framework for networkDB

This commit introduces the possibility to enable a debug mode
for the networkDB, this will allow the opening of a tcp port
on localhost that will expose the networkDB api for debugging
purposes.

The API can be discovered using curl localhost:<port>/help
It support json output if passed json as URL query parameter
option and pretty printing if passing json=pretty

All the binaries values are serialized in base64 encoding, this
can be skip passing the unsafe option as url query parameter

A simple go client will follow up

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
This commit is contained in:
Flavio Crisciani 2017-12-01 10:13:01 -08:00
parent 37cedaa072
commit b578cdce86
9 changed files with 593 additions and 187 deletions

View File

@ -293,11 +293,13 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
c.Config().Daemon.NetworkControlPlaneMTU, netDBConf.PacketBufferSize)
}
nDB, err := networkdb.New(netDBConf)
if err != nil {
return err
}
// Register the diagnose handlers
c.DiagnoseServer.RegisterHandler(nDB, networkdb.NetDbPaths2Func)
var cancelList []func()
ch, cancel := nDB.Watch(libnetworkEPTable, "", "")
cancelList = append(cancelList, cancel)
@ -436,7 +438,7 @@ func (n *network) Services() map[string]ServiceInfo {
for eid, value := range entries {
var epRec EndpointRecord
nid := n.ID()
if err := proto.Unmarshal(value.([]byte), &epRec); err != nil {
if err := proto.Unmarshal(value.Value, &epRec); err != nil {
logrus.Errorf("Unmarshal of libnetworkEPTable failed for endpoint %s in network %s, %v", eid, nid, err)
continue
}
@ -461,7 +463,7 @@ func (n *network) Services() map[string]ServiceInfo {
}
entries := agent.networkDB.GetTableByNetwork(table.name, n.id)
for key, value := range entries {
epID, info := d.DecodeTableEntry(table.name, key, value.([]byte))
epID, info := d.DecodeTableEntry(table.name, key, value.Value)
if ep, ok := eps[epID]; !ok {
logrus.Errorf("Inconsistent driver and libnetwork state for endpoint %s", epID)
} else {

View File

@ -60,6 +60,7 @@ import (
"github.com/docker/libnetwork/cluster"
"github.com/docker/libnetwork/config"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/diagnose"
"github.com/docker/libnetwork/discoverapi"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/drvregistry"
@ -133,6 +134,13 @@ type NetworkController interface {
// SetKeys configures the encryption key for gossip and overlay data path
SetKeys(keys []*types.EncryptionKey) error
// StartDiagnose start the network diagnose mode
StartDiagnose(port int)
// StopDiagnose start the network diagnose mode
StopDiagnose()
// IsDiagnoseEnabled returns true if the diagnose is enabled
IsDiagnoseEnabled() bool
}
// NetworkWalker is a client provided function which will be used to walk the Networks.
@ -167,6 +175,7 @@ type controller struct {
agentStopDone chan struct{}
keys []*types.EncryptionKey
clusterConfigAvailable bool
DiagnoseServer *diagnose.Server
sync.Mutex
}
@ -185,7 +194,9 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
serviceBindings: make(map[serviceKey]*service),
agentInitDone: make(chan struct{}),
networkLocker: locker.New(),
DiagnoseServer: diagnose.New(),
}
c.DiagnoseServer.Init()
if err := c.initStores(); err != nil {
return nil, err
@ -1291,3 +1302,29 @@ func (c *controller) Stop() {
c.stopExternalKeyListener()
osl.GC()
}
// StartDiagnose start the network diagnose mode
func (c *controller) StartDiagnose(port int) {
c.Lock()
defer c.Unlock()
if !c.DiagnoseServer.IsDebugEnable() {
logrus.Errorf("StartDiagnose received the port %d", port)
c.DiagnoseServer.EnableDebug("127.0.0.1", port)
}
}
// StopDiagnose start the network diagnose mode
func (c *controller) StopDiagnose() {
c.Lock()
defer c.Unlock()
if c.DiagnoseServer.IsDebugEnable() {
c.DiagnoseServer.DisableDebug()
}
}
// IsDiagnoseEnabled returns true if the diagnose is enabled
func (c *controller) IsDiagnoseEnabled() bool {
c.Lock()
defer c.Unlock()
return c.DiagnoseServer.IsDebugEnable()
}

View File

@ -1,133 +0,0 @@
package diagnose
import (
"fmt"
"net"
"net/http"
"sync"
"github.com/sirupsen/logrus"
)
// HTTPHandlerFunc TODO
type HTTPHandlerFunc func(interface{}, http.ResponseWriter, *http.Request)
type httpHandlerCustom struct {
ctx interface{}
F func(interface{}, http.ResponseWriter, *http.Request)
}
// ServeHTTP TODO
func (h httpHandlerCustom) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.F(h.ctx, w, r)
}
var diagPaths2Func = map[string]HTTPHandlerFunc{
"/": notImplemented,
"/help": help,
"/ready": ready,
}
// Server when the debug is enabled exposes a
// This data structure is protected by the Agent mutex so does not require and additional mutex here
type Server struct {
sk net.Listener
port int
mux *http.ServeMux
registeredHanders []string
sync.Mutex
}
// Init TODO
func (n *Server) Init() {
n.mux = http.NewServeMux()
// Register local handlers
n.RegisterHandler(n, diagPaths2Func)
}
// RegisterHandler TODO
func (n *Server) RegisterHandler(ctx interface{}, hdlrs map[string]HTTPHandlerFunc) {
n.Lock()
defer n.Unlock()
for path, fun := range hdlrs {
n.mux.Handle(path, httpHandlerCustom{ctx, fun})
n.registeredHanders = append(n.registeredHanders, path)
}
}
// EnableDebug opens a TCP socket to debug the passed network DB
func (n *Server) EnableDebug(ip string, port int) {
n.Lock()
defer n.Unlock()
n.port = port
logrus.SetLevel(logrus.DebugLevel)
if n.sk != nil {
logrus.Infof("The server is already up and running")
return
}
logrus.Infof("Starting the server listening on %d for commands", port)
// // Create the socket
// var err error
// n.sk, err = net.Listen("tcp", listeningAddr)
// if err != nil {
// log.Fatal(err)
// }
//
// go func() {
// http.Serve(n.sk, n.mux)
// }()
http.ListenAndServe(fmt.Sprintf(":%d", port), n.mux)
}
// DisableDebug stop the dubug and closes the tcp socket
func (n *Server) DisableDebug() {
n.Lock()
defer n.Unlock()
n.sk.Close()
n.sk = nil
}
// IsDebugEnable returns true when the debug is enabled
func (n *Server) IsDebugEnable() bool {
n.Lock()
defer n.Unlock()
return n.sk != nil
}
func notImplemented(ctx interface{}, w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "URL path: %s no method implemented check /help\n", r.URL.Path)
}
func help(ctx interface{}, w http.ResponseWriter, r *http.Request) {
n, ok := ctx.(*Server)
if ok {
for _, path := range n.registeredHanders {
fmt.Fprintf(w, "%s\n", path)
}
}
}
func ready(ctx interface{}, w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "OK\n")
}
// DebugHTTPForm TODO
func DebugHTTPForm(r *http.Request) {
r.ParseForm()
for k, v := range r.Form {
logrus.Debugf("Form[%q] = %q\n", k, v)
}
}
// HTTPReplyError TODO
func HTTPReplyError(w http.ResponseWriter, message, usage string) {
fmt.Fprintf(w, "%s\n", message)
if usage != "" {
fmt.Fprintf(w, "Usage: %s\n", usage)
}
}

View File

@ -0,0 +1,216 @@
package diagnose
import (
"encoding/json"
"fmt"
"log"
"net"
"net/http"
"sync"
stackdump "github.com/docker/docker/pkg/signal"
"github.com/docker/libnetwork/common"
"github.com/sirupsen/logrus"
)
// HTTPHandlerFunc TODO
type HTTPHandlerFunc func(interface{}, http.ResponseWriter, *http.Request)
type httpHandlerCustom struct {
ctx interface{}
F func(interface{}, http.ResponseWriter, *http.Request)
}
// ServeHTTP TODO
func (h httpHandlerCustom) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.F(h.ctx, w, r)
}
var diagPaths2Func = map[string]HTTPHandlerFunc{
"/": notImplemented,
"/help": help,
"/ready": ready,
"/stackdump": stackTrace,
}
// Server when the debug is enabled exposes a
// This data structure is protected by the Agent mutex so does not require and additional mutex here
type Server struct {
sk net.Listener
port int
mux *http.ServeMux
registeredHanders map[string]bool
sync.Mutex
}
// New creates a new diagnose server
func New() *Server {
return &Server{
registeredHanders: make(map[string]bool),
}
}
// Init initialize the mux for the http handling and register the base hooks
func (n *Server) Init() {
n.mux = http.NewServeMux()
// Register local handlers
n.RegisterHandler(n, diagPaths2Func)
}
// RegisterHandler allows to register new handlers to the mux and to a specific path
func (n *Server) RegisterHandler(ctx interface{}, hdlrs map[string]HTTPHandlerFunc) {
n.Lock()
defer n.Unlock()
for path, fun := range hdlrs {
if _, ok := n.registeredHanders[path]; ok {
continue
}
n.mux.Handle(path, httpHandlerCustom{ctx, fun})
n.registeredHanders[path] = true
}
}
// EnableDebug opens a TCP socket to debug the passed network DB
func (n *Server) EnableDebug(ip string, port int) {
n.Lock()
defer n.Unlock()
n.port = port
if n.sk != nil {
logrus.Info("The server is already up and running")
return
}
logrus.Infof("Starting the server listening on %d for commands", port)
// Create the socket
var err error
n.sk, err = net.Listen("tcp", fmt.Sprintf("%s:%d", ip, port))
if err != nil {
log.Fatal(err)
}
go func() {
http.Serve(n.sk, n.mux)
}()
}
// DisableDebug stop the dubug and closes the tcp socket
func (n *Server) DisableDebug() {
n.Lock()
defer n.Unlock()
n.sk.Close()
n.sk = nil
}
// IsDebugEnable returns true when the debug is enabled
func (n *Server) IsDebugEnable() bool {
n.Lock()
defer n.Unlock()
return n.sk != nil
}
func notImplemented(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
_, json := ParseHTTPFormOptions(r)
rsp := WrongCommand("not implemented", fmt.Sprintf("URL path: %s no method implemented check /help\n", r.URL.Path))
// audit logs
log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
log.Info("command not implemented done")
HTTPReply(w, rsp, json)
}
func help(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
_, json := ParseHTTPFormOptions(r)
// audit logs
log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
log.Info("help done")
n, ok := ctx.(*Server)
var result string
if ok {
for path := range n.registeredHanders {
result += fmt.Sprintf("%s\n", path)
}
HTTPReply(w, CommandSucceed(&StringCmd{Info: result}), json)
}
}
func ready(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
_, json := ParseHTTPFormOptions(r)
// audit logs
log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
log.Info("ready done")
HTTPReply(w, CommandSucceed(&StringCmd{Info: "OK"}), json)
}
func stackTrace(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
_, json := ParseHTTPFormOptions(r)
// audit logs
log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
log.Info("stack trace")
path, err := stackdump.DumpStacks("/tmp/")
if err != nil {
log.WithError(err).Error("failed to write goroutines dump")
HTTPReply(w, FailCommand(err), json)
} else {
log.Info("stack trace done")
HTTPReply(w, CommandSucceed(&StringCmd{Info: fmt.Sprintf("goroutine stacks written to %s", path)}), json)
}
}
// DebugHTTPForm helper to print the form url parameters
func DebugHTTPForm(r *http.Request) {
for k, v := range r.Form {
logrus.Debugf("Form[%q] = %q\n", k, v)
}
}
// JSONOutput contains details on JSON output printing
type JSONOutput struct {
enable bool
prettyPrint bool
}
// ParseHTTPFormOptions easily parse the JSON printing options
func ParseHTTPFormOptions(r *http.Request) (bool, *JSONOutput) {
_, unsafe := r.Form["unsafe"]
v, json := r.Form["json"]
var pretty bool
if len(v) > 0 {
pretty = v[0] == "pretty"
}
return unsafe, &JSONOutput{enable: json, prettyPrint: pretty}
}
// HTTPReply helper function that takes care of sending the message out
func HTTPReply(w http.ResponseWriter, r *HTTPResult, j *JSONOutput) (int, error) {
var response []byte
if j.enable {
var err error
if j.prettyPrint {
response, err = json.MarshalIndent(r, "", " ")
if err != nil {
response, _ = json.MarshalIndent(FailCommand(err), "", " ")
}
} else {
response, err = json.Marshal(r)
if err != nil {
response, _ = json.Marshal(FailCommand(err))
}
}
} else {
response = []byte(r.String())
}
return fmt.Fprint(w, string(response))
}

View File

@ -0,0 +1,122 @@
package diagnose
import "fmt"
// StringInterface interface that has to be implemented by messages
type StringInterface interface {
String() string
}
// CommandSucceed creates a success message
func CommandSucceed(result StringInterface) *HTTPResult {
return &HTTPResult{
Message: "OK",
Details: result,
}
}
// FailCommand creates a failure message with error
func FailCommand(err error) *HTTPResult {
return &HTTPResult{
Message: "FAIL",
Details: &ErrorCmd{Error: err.Error()},
}
}
// WrongCommand creates a wrong command response
func WrongCommand(message, usage string) *HTTPResult {
return &HTTPResult{
Message: message,
Details: &UsageCmd{Usage: usage},
}
}
// HTTPResult Diagnose Server HTTP result operation
type HTTPResult struct {
Message string `json:"message"`
Details StringInterface `json:"details"`
}
func (h *HTTPResult) String() string {
rsp := h.Message
if h.Details != nil {
rsp += "\n" + h.Details.String()
}
return rsp
}
// UsageCmd command with usage field
type UsageCmd struct {
Usage string `json:"usage"`
}
func (u *UsageCmd) String() string {
return "Usage: " + u.Usage
}
// StringCmd command with info string
type StringCmd struct {
Info string `json:"info"`
}
func (s *StringCmd) String() string {
return s.Info
}
// ErrorCmd command with error
type ErrorCmd struct {
Error string `json:"error"`
}
func (e *ErrorCmd) String() string {
return "Error: " + e.Error
}
// TableObj network db table object
type TableObj struct {
Length int `json:"size"`
Elements []StringInterface `json:"entries"`
}
func (t *TableObj) String() string {
output := fmt.Sprintf("total entries: %d\n", t.Length)
for _, e := range t.Elements {
output += e.String()
}
return output
}
// PeerEntryObj entry in the networkdb peer table
type PeerEntryObj struct {
Index int `json:"-"`
Name string `json:"-=name"`
IP string `json:"ip"`
}
func (p *PeerEntryObj) String() string {
return fmt.Sprintf("%d) %s -> %s\n", p.Index, p.Name, p.IP)
}
// TableEntryObj network db table entry object
type TableEntryObj struct {
Index int `json:"-"`
Key string `json:"key"`
Value string `json:"value"`
Owner string `json:"owner"`
}
func (t *TableEntryObj) String() string {
return fmt.Sprintf("%d) k:`%s` -> v:`%s` owner:`%s`\n", t.Index, t.Key, t.Value, t.Owner)
}
// TableEndpointsResult fully typed message for proper unmarshaling on the client side
type TableEndpointsResult struct {
TableObj
Elements []TableEntryObj `json:"entries"`
}
// TablePeersResult fully typed message for proper unmarshaling on the client side
type TablePeersResult struct {
TableObj
Elements []PeerEntryObj `json:"entries"`
}

View File

@ -401,17 +401,23 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
return nil
}
// TableElem elem
type TableElem struct {
Value []byte
owner string
}
// GetTableByNetwork walks the networkdb by the give table and network id and
// returns a map of keys and values
func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]interface{} {
entries := make(map[string]interface{})
func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]*TableElem {
entries := make(map[string]*TableElem)
nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s/%s", tname, nid), func(k string, v interface{}) bool {
entry := v.(*entry)
if entry.deleting {
return false
}
key := k[strings.LastIndex(k, "/")+1:]
entries[key] = entry.value
entries[key] = &TableElem{Value: entry.value, owner: entry.node}
return false
})
return entries

View File

@ -1,17 +1,19 @@
package networkdb
import (
"encoding/base64"
"fmt"
"net/http"
"strings"
stackdump "github.com/docker/docker/pkg/signal"
"github.com/docker/libnetwork/common"
"github.com/docker/libnetwork/diagnose"
"github.com/sirupsen/logrus"
)
const (
missingParameter = "missing parameter"
dbNotAvailable = "database not available"
)
// NetDbPaths2Func TODO
@ -26,14 +28,21 @@ var NetDbPaths2Func = map[string]diagnose.HTTPHandlerFunc{
"/deleteentry": dbDeleteEntry,
"/getentry": dbGetEntry,
"/gettable": dbGetTable,
"/dump": dbStackTrace,
}
func dbJoin(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
diagnose.DebugHTTPForm(r)
_, json := diagnose.ParseHTTPFormOptions(r)
// audit logs
log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
log.Info("join cluster")
if len(r.Form["members"]) < 1 {
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?members=ip1,ip2,...", r.URL.Path))
rsp := diagnose.WrongCommand(missingParameter, fmt.Sprintf("%s?members=ip1,ip2,...", r.URL.Path))
log.Error("join cluster failed, wrong input")
diagnose.HTTPReply(w, rsp, json)
return
}
@ -41,51 +50,88 @@ func dbJoin(ctx interface{}, w http.ResponseWriter, r *http.Request) {
if ok {
err := nDB.Join(strings.Split(r.Form["members"][0], ","))
if err != nil {
fmt.Fprintf(w, "%s error in the DB join %s\n", r.URL.Path, err)
rsp := diagnose.FailCommand(fmt.Errorf("%s error in the DB join %s", r.URL.Path, err))
log.WithError(err).Error("join cluster failed")
diagnose.HTTPReply(w, rsp, json)
return
}
fmt.Fprintf(w, "OK\n")
log.Info("join cluster done")
diagnose.HTTPReply(w, diagnose.CommandSucceed(nil), json)
return
}
diagnose.HTTPReply(w, diagnose.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
}
func dbPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
diagnose.DebugHTTPForm(r)
_, json := diagnose.ParseHTTPFormOptions(r)
// audit logs
log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
log.Info("network peers")
if len(r.Form["nid"]) < 1 {
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?nid=test", r.URL.Path))
rsp := diagnose.WrongCommand(missingParameter, fmt.Sprintf("%s?nid=test", r.URL.Path))
log.Error("network peers failed, wrong input")
diagnose.HTTPReply(w, rsp, json)
return
}
nDB, ok := ctx.(*NetworkDB)
if ok {
peers := nDB.Peers(r.Form["nid"][0])
fmt.Fprintf(w, "Network:%s Total peers: %d\n", r.Form["nid"], len(peers))
rsp := &diagnose.TableObj{Length: len(peers)}
for i, peerInfo := range peers {
fmt.Fprintf(w, "%d) %s -> %s\n", i, peerInfo.Name, peerInfo.IP)
rsp.Elements = append(rsp.Elements, &diagnose.PeerEntryObj{Index: i, Name: peerInfo.Name, IP: peerInfo.IP})
}
log.WithField("response", fmt.Sprintf("%+v", rsp)).Info("network peers done")
diagnose.HTTPReply(w, diagnose.CommandSucceed(rsp), json)
return
}
diagnose.HTTPReply(w, diagnose.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
}
func dbClusterPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
diagnose.DebugHTTPForm(r)
_, json := diagnose.ParseHTTPFormOptions(r)
// audit logs
log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
log.Info("cluster peers")
nDB, ok := ctx.(*NetworkDB)
if ok {
peers := nDB.ClusterPeers()
fmt.Fprintf(w, "Total peers: %d\n", len(peers))
rsp := &diagnose.TableObj{Length: len(peers)}
for i, peerInfo := range peers {
fmt.Fprintf(w, "%d) %s -> %s\n", i, peerInfo.Name, peerInfo.IP)
rsp.Elements = append(rsp.Elements, &diagnose.PeerEntryObj{Index: i, Name: peerInfo.Name, IP: peerInfo.IP})
}
log.WithField("response", fmt.Sprintf("%+v", rsp)).Info("cluster peers done")
diagnose.HTTPReply(w, diagnose.CommandSucceed(rsp), json)
return
}
diagnose.HTTPReply(w, diagnose.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
}
func dbCreateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
diagnose.DebugHTTPForm(r)
unsafe, json := diagnose.ParseHTTPFormOptions(r)
// audit logs
log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
log.Info("create entry")
if len(r.Form["tname"]) < 1 ||
len(r.Form["nid"]) < 1 ||
len(r.Form["key"]) < 1 ||
len(r.Form["value"]) < 1 {
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k&value=v", r.URL.Path))
rsp := diagnose.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k&value=v", r.URL.Path))
log.Error("create entry failed, wrong input")
diagnose.HTTPReply(w, rsp, json)
return
}
@ -93,25 +139,48 @@ func dbCreateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
nid := r.Form["nid"][0]
key := r.Form["key"][0]
value := r.Form["value"][0]
decodedValue := []byte(value)
if !unsafe {
var err error
decodedValue, err = base64.StdEncoding.DecodeString(value)
if err != nil {
log.WithError(err).Error("create entry failed")
diagnose.HTTPReply(w, diagnose.FailCommand(err), json)
return
}
}
nDB, ok := ctx.(*NetworkDB)
if ok {
if err := nDB.CreateEntry(tname, nid, key, []byte(value)); err != nil {
diagnose.HTTPReplyError(w, err.Error(), "")
if err := nDB.CreateEntry(tname, nid, key, decodedValue); err != nil {
rsp := diagnose.FailCommand(err)
diagnose.HTTPReply(w, rsp, json)
log.WithError(err).Error("create entry failed")
return
}
fmt.Fprintf(w, "OK\n")
log.Info("create entry done")
diagnose.HTTPReply(w, diagnose.CommandSucceed(nil), json)
return
}
diagnose.HTTPReply(w, diagnose.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
}
func dbUpdateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
diagnose.DebugHTTPForm(r)
unsafe, json := diagnose.ParseHTTPFormOptions(r)
// audit logs
log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
log.Info("update entry")
if len(r.Form["tname"]) < 1 ||
len(r.Form["nid"]) < 1 ||
len(r.Form["key"]) < 1 ||
len(r.Form["value"]) < 1 {
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k&value=v", r.URL.Path))
rsp := diagnose.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k&value=v", r.URL.Path))
log.Error("update entry failed, wrong input")
diagnose.HTTPReply(w, rsp, json)
return
}
@ -119,24 +188,46 @@ func dbUpdateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
nid := r.Form["nid"][0]
key := r.Form["key"][0]
value := r.Form["value"][0]
decodedValue := []byte(value)
if !unsafe {
var err error
decodedValue, err = base64.StdEncoding.DecodeString(value)
if err != nil {
log.WithError(err).Error("update entry failed")
diagnose.HTTPReply(w, diagnose.FailCommand(err), json)
return
}
}
nDB, ok := ctx.(*NetworkDB)
if ok {
if err := nDB.UpdateEntry(tname, nid, key, []byte(value)); err != nil {
diagnose.HTTPReplyError(w, err.Error(), "")
if err := nDB.UpdateEntry(tname, nid, key, decodedValue); err != nil {
log.WithError(err).Error("update entry failed")
diagnose.HTTPReply(w, diagnose.FailCommand(err), json)
return
}
fmt.Fprintf(w, "OK\n")
log.Info("update entry done")
diagnose.HTTPReply(w, diagnose.CommandSucceed(nil), json)
return
}
diagnose.HTTPReply(w, diagnose.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
}
func dbDeleteEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
diagnose.DebugHTTPForm(r)
_, json := diagnose.ParseHTTPFormOptions(r)
// audit logs
log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
log.Info("delete entry")
if len(r.Form["tname"]) < 1 ||
len(r.Form["nid"]) < 1 ||
len(r.Form["key"]) < 1 {
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k", r.URL.Path))
rsp := diagnose.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k", r.URL.Path))
log.Error("delete entry failed, wrong input")
diagnose.HTTPReply(w, rsp, json)
return
}
@ -148,20 +239,32 @@ func dbDeleteEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
if ok {
err := nDB.DeleteEntry(tname, nid, key)
if err != nil {
diagnose.HTTPReplyError(w, err.Error(), "")
log.WithError(err).Error("delete entry failed")
diagnose.HTTPReply(w, diagnose.FailCommand(err), json)
return
}
fmt.Fprintf(w, "OK\n")
log.Info("delete entry done")
diagnose.HTTPReply(w, diagnose.CommandSucceed(nil), json)
return
}
diagnose.HTTPReply(w, diagnose.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
}
func dbGetEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
diagnose.DebugHTTPForm(r)
unsafe, json := diagnose.ParseHTTPFormOptions(r)
// audit logs
log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
log.Info("get entry")
if len(r.Form["tname"]) < 1 ||
len(r.Form["nid"]) < 1 ||
len(r.Form["key"]) < 1 {
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k", r.URL.Path))
rsp := diagnose.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k", r.URL.Path))
log.Error("get entry failed, wrong input")
diagnose.HTTPReply(w, rsp, json)
return
}
@ -173,18 +276,39 @@ func dbGetEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) {
if ok {
value, err := nDB.GetEntry(tname, nid, key)
if err != nil {
diagnose.HTTPReplyError(w, err.Error(), "")
log.WithError(err).Error("get entry failed")
diagnose.HTTPReply(w, diagnose.FailCommand(err), json)
return
}
fmt.Fprintf(w, "key:`%s` value:`%s`\n", key, string(value))
var encodedValue string
if unsafe {
encodedValue = string(value)
} else {
encodedValue = base64.StdEncoding.EncodeToString(value)
}
rsp := &diagnose.TableEntryObj{Key: key, Value: encodedValue}
log.WithField("response", fmt.Sprintf("%+v", rsp)).Info("update entry done")
diagnose.HTTPReply(w, diagnose.CommandSucceed(rsp), json)
return
}
diagnose.HTTPReply(w, diagnose.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
}
func dbJoinNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
diagnose.DebugHTTPForm(r)
_, json := diagnose.ParseHTTPFormOptions(r)
// audit logs
log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
log.Info("join network")
if len(r.Form["nid"]) < 1 {
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?nid=network_id", r.URL.Path))
rsp := diagnose.WrongCommand(missingParameter, fmt.Sprintf("%s?nid=network_id", r.URL.Path))
log.Error("join network failed, wrong input")
diagnose.HTTPReply(w, rsp, json)
return
}
@ -193,18 +317,30 @@ func dbJoinNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) {
nDB, ok := ctx.(*NetworkDB)
if ok {
if err := nDB.JoinNetwork(nid); err != nil {
diagnose.HTTPReplyError(w, err.Error(), "")
log.WithError(err).Error("join network failed")
diagnose.HTTPReply(w, diagnose.FailCommand(err), json)
return
}
fmt.Fprintf(w, "OK\n")
log.Info("join network done")
diagnose.HTTPReply(w, diagnose.CommandSucceed(nil), json)
return
}
diagnose.HTTPReply(w, diagnose.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
}
func dbLeaveNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
diagnose.DebugHTTPForm(r)
_, json := diagnose.ParseHTTPFormOptions(r)
// audit logs
log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
log.Info("leave network")
if len(r.Form["nid"]) < 1 {
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?nid=network_id", r.URL.Path))
rsp := diagnose.WrongCommand(missingParameter, fmt.Sprintf("%s?nid=network_id", r.URL.Path))
log.Error("leave network failed, wrong input")
diagnose.HTTPReply(w, rsp, json)
return
}
@ -213,19 +349,31 @@ func dbLeaveNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) {
nDB, ok := ctx.(*NetworkDB)
if ok {
if err := nDB.LeaveNetwork(nid); err != nil {
diagnose.HTTPReplyError(w, err.Error(), "")
log.WithError(err).Error("leave network failed")
diagnose.HTTPReply(w, diagnose.FailCommand(err), json)
return
}
fmt.Fprintf(w, "OK\n")
log.Info("leave network done")
diagnose.HTTPReply(w, diagnose.CommandSucceed(nil), json)
return
}
diagnose.HTTPReply(w, diagnose.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
}
func dbGetTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
diagnose.DebugHTTPForm(r)
unsafe, json := diagnose.ParseHTTPFormOptions(r)
// audit logs
log := logrus.WithFields(logrus.Fields{"component": "diagnose", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()})
log.Info("get table")
if len(r.Form["tname"]) < 1 ||
len(r.Form["nid"]) < 1 {
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id", r.URL.Path))
rsp := diagnose.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id", r.URL.Path))
log.Error("get table failed, wrong input")
diagnose.HTTPReply(w, rsp, json)
return
}
@ -235,20 +383,26 @@ func dbGetTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
nDB, ok := ctx.(*NetworkDB)
if ok {
table := nDB.GetTableByNetwork(tname, nid)
fmt.Fprintf(w, "total elements: %d\n", len(table))
i := 0
rsp := &diagnose.TableObj{Length: len(table)}
var i = 0
for k, v := range table {
fmt.Fprintf(w, "%d) k:`%s` -> v:`%s`\n", i, k, string(v.([]byte)))
i++
var encodedValue string
if unsafe {
encodedValue = string(v.Value)
} else {
encodedValue = base64.StdEncoding.EncodeToString(v.Value)
}
rsp.Elements = append(rsp.Elements,
&diagnose.TableEntryObj{
Index: i,
Key: k,
Value: encodedValue,
Owner: v.owner,
})
}
log.WithField("response", fmt.Sprintf("%+v", rsp)).Info("get table done")
diagnose.HTTPReply(w, diagnose.CommandSucceed(rsp), json)
return
}
}
func dbStackTrace(ctx interface{}, w http.ResponseWriter, r *http.Request) {
path, err := stackdump.DumpStacks("/tmp/")
if err != nil {
logrus.WithError(err).Error("failed to write goroutines dump")
} else {
fmt.Fprintf(w, "goroutine stacks written to %s", path)
}
diagnose.HTTPReply(w, diagnose.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json)
}

View File

@ -16,7 +16,7 @@ import (
)
var nDB *networkdb.NetworkDB
var server diagnose.Server
var server *diagnose.Server
var ipAddr string
var testerPaths2Func = map[string]diagnose.HTTPHandlerFunc{
@ -49,7 +49,7 @@ func Server(args []string) {
ipAddr = ip
logrus.Infof("%s uses IP %s\n", localNodeName, ipAddr)
server = diagnose.Server{}
server = diagnose.New()
server.Init()
conf := networkdb.DefaultConfig()
conf.Hostname = localNodeName

View File

@ -32,7 +32,8 @@ func watchTable(ctx interface{}, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
diagnose.DebugHTTPForm(r)
if len(r.Form["tname"]) < 1 {
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name", r.URL.Path))
rsp := diagnose.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name", r.URL.Path))
diagnose.HTTPReply(w, rsp, &diagnose.JSONOutput{})
return
}
@ -56,7 +57,8 @@ func watchTableEntries(ctx interface{}, w http.ResponseWriter, r *http.Request)
r.ParseForm()
diagnose.DebugHTTPForm(r)
if len(r.Form["tname"]) < 1 {
diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name", r.URL.Path))
rsp := diagnose.WrongCommand(missingParameter, fmt.Sprintf("%s?tname=table_name", r.URL.Path))
diagnose.HTTPReply(w, rsp, &diagnose.JSONOutput{})
return
}