From 60b5add4af9e971f4b5a23a1caeae18ca42373b8 Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Mon, 10 Jul 2017 12:05:58 -0700 Subject: [PATCH] NetworkDB allow setting PacketSize - Introduce the possibility to specify the max buffer length in network DB. This allows using the whole MTU limit of the interface - Add queue stats per network, which can be handy to identify the node's throughput per network and to spot an imbalance between nodes that can point to an MTU misconfiguration Signed-off-by: Flavio Crisciani --- libnetwork/agent.go | 23 +++++++++----- libnetwork/config/config.go | 35 ++++++++++++++------- libnetwork/networkdb/cluster.go | 37 ++++++++++++++++++---- libnetwork/networkdb/message.go | 4 --- libnetwork/networkdb/networkdb.go | 43 ++++++++++++++++++++++++-- libnetwork/networkdb/networkdb_test.go | 8 ++--- 6 files changed, 113 insertions(+), 37 deletions(-) diff --git a/libnetwork/agent.go b/libnetwork/agent.go index b93ca288e0..4877df1c34 100644 --- a/libnetwork/agent.go +++ b/libnetwork/agent.go @@ -214,8 +214,8 @@ func (c *controller) agentSetup(clusterProvider cluster.Provider) error { listen := clusterProvider.GetListenAddress() listenAddr, _, _ := net.SplitHostPort(listen) - logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v", - listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList) + logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v MTU=%d", + listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList, c.Config().Daemon.NetworkControlPlaneMTU) if advAddr != "" && agent == nil { if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil { logrus.Errorf("error in agentInit: %v", err) @@ -286,12 +286,19 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID()) logrus.Info("Gossip cluster 
hostname ", nodeName) - nDB, err := networkdb.New(&networkdb.Config{ - BindAddr: listenAddr, - AdvertiseAddr: advertiseAddr, - NodeName: nodeName, - Keys: keys, - }) + netDBConf := networkdb.DefaultConfig() + netDBConf.NodeName = nodeName + netDBConf.BindAddr = listenAddr + netDBConf.AdvertiseAddr = advertiseAddr + netDBConf.Keys = keys + if c.Config().Daemon.NetworkControlPlaneMTU != 0 { + // Consider the MTU remove the IP hdr (IPv4 or IPv6) and the TCP/UDP hdr. + // To be on the safe side let's cut 100 bytes + netDBConf.PacketBufferSize = (c.Config().Daemon.NetworkControlPlaneMTU - 100) + logrus.Debugf("Control plane MTU: %d will initialize NetworkDB with: %d", + c.Config().Daemon.NetworkControlPlaneMTU, netDBConf.PacketBufferSize) + } + nDB, err := networkdb.New(netDBConf) if err != nil { return err diff --git a/libnetwork/config/config.go b/libnetwork/config/config.go index 3acb4320c4..f3c49e89b1 100644 --- a/libnetwork/config/config.go +++ b/libnetwork/config/config.go @@ -26,14 +26,15 @@ type Config struct { // DaemonCfg represents libnetwork core configuration type DaemonCfg struct { - Debug bool - Experimental bool - DataDir string - DefaultNetwork string - DefaultDriver string - Labels []string - DriverCfg map[string]interface{} - ClusterProvider cluster.Provider + Debug bool + Experimental bool + DataDir string + DefaultNetwork string + DefaultDriver string + Labels []string + DriverCfg map[string]interface{} + ClusterProvider cluster.Provider + NetworkControlPlaneMTU int } // ClusterCfg represents cluster configuration @@ -221,6 +222,19 @@ func OptionExperimental(exp bool) Option { } } +// OptionNetworkControlPlaneMTU returns an option setter that stores exp as the network control plane MTU, logging a warning when exp is below 1500 +func OptionNetworkControlPlaneMTU(exp int) Option { + return func(c *Config) { + logrus.Debugf("Network Control Plane MTU: %d", exp) + if exp < 1500 { + // if exp == 0 the value won't be used + logrus.Warnf("Received a MTU of %d, this value is very low, "+ + "the network control plane 
can misbehave", exp) + } + c.Daemon.NetworkControlPlaneMTU = exp + } +} + // ProcessOptions processes options and stores it in config func (c *Config) ProcessOptions(options ...Option) { for _, opt := range options { @@ -232,10 +246,7 @@ func (c *Config) ProcessOptions(options ...Option) { // IsValidName validates configuration objects supported by libnetwork func IsValidName(name string) bool { - if strings.TrimSpace(name) == "" { - return false - } - return true + return strings.TrimSpace(name) != "" } // OptionLocalKVProvider function returns an option setter for kvstore provider diff --git a/libnetwork/networkdb/cluster.go b/libnetwork/networkdb/cluster.go index 01dab7e1ec..e011a5336e 100644 --- a/libnetwork/networkdb/cluster.go +++ b/libnetwork/networkdb/cluster.go @@ -98,10 +98,14 @@ func (nDB *NetworkDB) RemoveKey(key []byte) { } func (nDB *NetworkDB) clusterInit() error { + nDB.lastStatsTimestamp = time.Now() + nDB.lastHealthTimestamp = nDB.lastStatsTimestamp + config := memberlist.DefaultLANConfig() config.Name = nDB.config.NodeName config.BindAddr = nDB.config.BindAddr config.AdvertiseAddr = nDB.config.AdvertiseAddr + config.UDPBufferSize = nDB.config.PacketBufferSize if nDB.config.BindPort != 0 { config.BindPort = nDB.config.BindPort @@ -199,9 +203,8 @@ func (nDB *NetworkDB) clusterJoin(members []string) error { mlist := nDB.memberlist if _, err := mlist.Join(members); err != nil { - // Incase of failure, keep retrying join until it succeeds or the cluster is shutdown. + // In case of failure, keep retrying join until it succeeds or the cluster is shutdown. 
go nDB.retryJoin(members, nDB.stopCh) - return fmt.Errorf("could not join node to memberlist: %v", err) } @@ -372,11 +375,21 @@ func (nDB *NetworkDB) gossip() { networkNodes[nid] = nDB.networkNodes[nid] } + printStats := time.Since(nDB.lastStatsTimestamp) >= nDB.config.StatsPrintPeriod + printHealth := time.Since(nDB.lastHealthTimestamp) >= nDB.config.HealthPrintPeriod nDB.RUnlock() + if printHealth { + healthScore := nDB.memberlist.GetHealthScore() + if healthScore != 0 { + logrus.Warnf("NetworkDB stats - healthscore:%d (connectivity issues)", healthScore) + } + nDB.lastHealthTimestamp = time.Now() + } + for nid, nodes := range networkNodes { mNodes := nDB.mRandomNodes(3, nodes) - bytesAvail := udpSendBuf - compoundHeaderOverhead + bytesAvail := nDB.config.PacketBufferSize - compoundHeaderOverhead nDB.RLock() network, ok := thisNodeNetworks[nid] @@ -397,6 +410,14 @@ func (nDB *NetworkDB) gossip() { } msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail) + // Collect stats and print the queue info, note this code is here also to have a view of the queues empty + network.qMessagesSent += len(msgs) + if printStats { + logrus.Infof("NetworkDB stats - Queue net:%s qLen:%d netPeers:%d netMsg/s:%d", + nid, broadcastQ.NumQueued(), broadcastQ.NumNodes(), network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second))) + network.qMessagesSent = 0 + } + if len(msgs) == 0 { continue } @@ -414,11 +435,15 @@ func (nDB *NetworkDB) gossip() { } // Send the compound message - if err := nDB.memberlist.SendToUDP(&mnode.Node, compound); err != nil { + if err := nDB.memberlist.SendBestEffort(&mnode.Node, compound); err != nil { logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err) } } } + // Reset the stats + if printStats { + nDB.lastStatsTimestamp = time.Now() + } } func (nDB *NetworkDB) bulkSyncTables() { @@ -589,7 +614,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b nDB.bulkSyncAckTbl[node] = ch nDB.Unlock() - err = 
nDB.memberlist.SendToTCP(&mnode.Node, buf) + err = nDB.memberlist.SendReliable(&mnode.Node, buf) if err != nil { nDB.Lock() delete(nDB.bulkSyncAckTbl, node) @@ -606,7 +631,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b case <-t.C: logrus.Errorf("Bulk sync to node %s timed out", node) case <-ch: - logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Now().Sub(startTime)) + logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Since(startTime)) } t.Stop() } diff --git a/libnetwork/networkdb/message.go b/libnetwork/networkdb/message.go index a861752bd4..81a6d832a6 100644 --- a/libnetwork/networkdb/message.go +++ b/libnetwork/networkdb/message.go @@ -3,10 +3,6 @@ package networkdb import "github.com/gogo/protobuf/proto" const ( - // Max udp message size chosen to avoid network packet - // fragmentation. - udpSendBuf = 1400 - // Compound message header overhead 1 byte(message type) + 4 // bytes (num messages) compoundHeaderOverhead = 5 diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go index 98903c89ba..e4cc9df12c 100644 --- a/libnetwork/networkdb/networkdb.go +++ b/libnetwork/networkdb/networkdb.go @@ -5,6 +5,7 @@ package networkdb import ( "fmt" "net" + "os" "strings" "sync" "time" @@ -93,6 +94,12 @@ type NetworkDB struct { // bootStrapIP is the list of IPs that can be used to bootstrap // the gossip. bootStrapIP []net.IP + + // lastStatsTimestamp is the last timestamp when the stats got printed + lastStatsTimestamp time.Time + + // lastHealthTimestamp is the last timestamp when the health score got printed + lastHealthTimestamp time.Time } // PeerInfo represents the peer (gossip cluster) nodes of a network @@ -126,6 +133,9 @@ type network struct { // The broadcast queue for table event gossip. This is only // initialized for this node's network attachment entries. 
tableBroadcasts *memberlist.TransmitLimitedQueue + + // Number of gossip messages sent related to this network during the last stats collection period + qMessagesSent int } // Config represents the configuration of the networdb instance and @@ -149,6 +159,21 @@ type Config struct { // Keys to be added to the Keyring of the memberlist. Key at index // 0 is the primary key Keys [][]byte + + // PacketBufferSize is the maximum number of bytes that memberlist will + // put in a packet (this will be for UDP packets by default with a NetTransport). + // A safe value for this is typically 1400 bytes (which is what DefaultConfig sets). However, + // depending on your network's MTU (Maximum Transmission Unit) you may + // be able to increase this to get more content into each gossip packet. + PacketBufferSize int + + // StatsPrintPeriod is the period between prints of the per-network queue stats. + // DefaultConfig sets it to 5 minutes. + StatsPrintPeriod time.Duration + + // HealthPrintPeriod is the period between checks that print the health score. + // DefaultConfig sets it to 1 minute. + HealthPrintPeriod time.Duration } // entry defines a table entry @@ -171,6 +196,18 @@ type entry struct { reapTime time.Duration } +// DefaultConfig returns a Config with NodeName set to the OS hostname (the lookup error is discarded), BindAddr "0.0.0.0", PacketBufferSize 1400, StatsPrintPeriod 5 minutes and HealthPrintPeriod 1 minute. +func DefaultConfig() *Config { + hostname, _ := os.Hostname() + return &Config{ + NodeName: hostname, + BindAddr: "0.0.0.0", + PacketBufferSize: 1400, + StatsPrintPeriod: 5 * time.Minute, + HealthPrintPeriod: 1 * time.Minute, + } +} + // New creates a new instance of NetworkDB using the Config passed by // the caller. 
func New(c *Config) (*NetworkDB, error) { @@ -200,6 +237,7 @@ func New(c *Config) (*NetworkDB, error) { // instances passed by the caller in the form of addr:port func (nDB *NetworkDB) Join(members []string) error { nDB.Lock() + nDB.bootStrapIP = make([]net.IP, 0, len(members)) for _, m := range members { nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m)) } @@ -481,9 +519,8 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{ NumNodes: func() int { nDB.RLock() - num := len(nDB.networkNodes[nid]) - nDB.RUnlock() - return num + defer nDB.RUnlock() + return len(nDB.networkNodes[nid]) }, RetransmitMult: 4, } diff --git a/libnetwork/networkdb/networkdb_test.go b/libnetwork/networkdb/networkdb_test.go index cde693f77c..cb93eefd08 100644 --- a/libnetwork/networkdb/networkdb_test.go +++ b/libnetwork/networkdb/networkdb_test.go @@ -30,10 +30,10 @@ func TestMain(m *testing.M) { func createNetworkDBInstances(t *testing.T, num int, namePrefix string) []*NetworkDB { var dbs []*NetworkDB for i := 0; i < num; i++ { - db, err := New(&Config{ - NodeName: fmt.Sprintf("%s%d", namePrefix, i+1), - BindPort: int(atomic.AddInt32(&dbPort, 1)), - }) + conf := DefaultConfig() + conf.NodeName = fmt.Sprintf("%s%d", namePrefix, i+1) + conf.BindPort = int(atomic.AddInt32(&dbPort, 1)) + db, err := New(conf) require.NoError(t, err) if i != 0 {