mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Merge pull request #1839 from fcrisciani/datapath-mtu
NetworkDB to honor the Network Control Plane MTU
This commit is contained in:
commit
1fea0ffc8e
6 changed files with 113 additions and 37 deletions
|
@ -214,8 +214,8 @@ func (c *controller) agentSetup(clusterProvider cluster.Provider) error {
|
||||||
listen := clusterProvider.GetListenAddress()
|
listen := clusterProvider.GetListenAddress()
|
||||||
listenAddr, _, _ := net.SplitHostPort(listen)
|
listenAddr, _, _ := net.SplitHostPort(listen)
|
||||||
|
|
||||||
logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v",
|
logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v MTU=%d",
|
||||||
listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList)
|
listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList, c.Config().Daemon.NetworkControlPlaneMTU)
|
||||||
if advAddr != "" && agent == nil {
|
if advAddr != "" && agent == nil {
|
||||||
if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
|
if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
|
||||||
logrus.Errorf("error in agentInit: %v", err)
|
logrus.Errorf("error in agentInit: %v", err)
|
||||||
|
@ -286,12 +286,19 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
|
||||||
nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
|
nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
|
||||||
logrus.Info("Gossip cluster hostname ", nodeName)
|
logrus.Info("Gossip cluster hostname ", nodeName)
|
||||||
|
|
||||||
nDB, err := networkdb.New(&networkdb.Config{
|
netDBConf := networkdb.DefaultConfig()
|
||||||
BindAddr: listenAddr,
|
netDBConf.NodeName = nodeName
|
||||||
AdvertiseAddr: advertiseAddr,
|
netDBConf.BindAddr = listenAddr
|
||||||
NodeName: nodeName,
|
netDBConf.AdvertiseAddr = advertiseAddr
|
||||||
Keys: keys,
|
netDBConf.Keys = keys
|
||||||
})
|
if c.Config().Daemon.NetworkControlPlaneMTU != 0 {
|
||||||
|
// Starting from the MTU, account for the IP header (IPv4 or IPv6) and the TCP/UDP header.
|
||||||
|
// To be on the safe side let's cut 100 bytes
|
||||||
|
netDBConf.PacketBufferSize = (c.Config().Daemon.NetworkControlPlaneMTU - 100)
|
||||||
|
logrus.Debugf("Control plane MTU: %d will initialize NetworkDB with: %d",
|
||||||
|
c.Config().Daemon.NetworkControlPlaneMTU, netDBConf.PacketBufferSize)
|
||||||
|
}
|
||||||
|
nDB, err := networkdb.New(netDBConf)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -26,14 +26,15 @@ type Config struct {
|
||||||
|
|
||||||
// DaemonCfg represents libnetwork core configuration
|
// DaemonCfg represents libnetwork core configuration
|
||||||
type DaemonCfg struct {
|
type DaemonCfg struct {
|
||||||
Debug bool
|
Debug bool
|
||||||
Experimental bool
|
Experimental bool
|
||||||
DataDir string
|
DataDir string
|
||||||
DefaultNetwork string
|
DefaultNetwork string
|
||||||
DefaultDriver string
|
DefaultDriver string
|
||||||
Labels []string
|
Labels []string
|
||||||
DriverCfg map[string]interface{}
|
DriverCfg map[string]interface{}
|
||||||
ClusterProvider cluster.Provider
|
ClusterProvider cluster.Provider
|
||||||
|
NetworkControlPlaneMTU int
|
||||||
}
|
}
|
||||||
|
|
||||||
// ClusterCfg represents cluster configuration
|
// ClusterCfg represents cluster configuration
|
||||||
|
@ -221,6 +222,19 @@ func OptionExperimental(exp bool) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OptionNetworkControlPlaneMTU function returns an option setter for control plane MTU
|
||||||
|
func OptionNetworkControlPlaneMTU(exp int) Option {
|
||||||
|
return func(c *Config) {
|
||||||
|
logrus.Debugf("Network Control Plane MTU: %d", exp)
|
||||||
|
if exp < 1500 {
|
||||||
|
// if exp == 0 the value won't be used
|
||||||
|
logrus.Warnf("Received a MTU of %d, this value is very low,",
|
||||||
|
"the network control plane can misbehave", exp)
|
||||||
|
}
|
||||||
|
c.Daemon.NetworkControlPlaneMTU = exp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ProcessOptions processes options and stores it in config
|
// ProcessOptions processes options and stores it in config
|
||||||
func (c *Config) ProcessOptions(options ...Option) {
|
func (c *Config) ProcessOptions(options ...Option) {
|
||||||
for _, opt := range options {
|
for _, opt := range options {
|
||||||
|
@ -232,10 +246,7 @@ func (c *Config) ProcessOptions(options ...Option) {
|
||||||
|
|
||||||
// IsValidName validates configuration objects supported by libnetwork
|
// IsValidName validates configuration objects supported by libnetwork
|
||||||
func IsValidName(name string) bool {
|
func IsValidName(name string) bool {
|
||||||
if strings.TrimSpace(name) == "" {
|
return strings.TrimSpace(name) != ""
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// OptionLocalKVProvider function returns an option setter for kvstore provider
|
// OptionLocalKVProvider function returns an option setter for kvstore provider
|
||||||
|
|
|
@ -98,10 +98,14 @@ func (nDB *NetworkDB) RemoveKey(key []byte) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nDB *NetworkDB) clusterInit() error {
|
func (nDB *NetworkDB) clusterInit() error {
|
||||||
|
nDB.lastStatsTimestamp = time.Now()
|
||||||
|
nDB.lastHealthTimestamp = nDB.lastStatsTimestamp
|
||||||
|
|
||||||
config := memberlist.DefaultLANConfig()
|
config := memberlist.DefaultLANConfig()
|
||||||
config.Name = nDB.config.NodeName
|
config.Name = nDB.config.NodeName
|
||||||
config.BindAddr = nDB.config.BindAddr
|
config.BindAddr = nDB.config.BindAddr
|
||||||
config.AdvertiseAddr = nDB.config.AdvertiseAddr
|
config.AdvertiseAddr = nDB.config.AdvertiseAddr
|
||||||
|
config.UDPBufferSize = nDB.config.PacketBufferSize
|
||||||
|
|
||||||
if nDB.config.BindPort != 0 {
|
if nDB.config.BindPort != 0 {
|
||||||
config.BindPort = nDB.config.BindPort
|
config.BindPort = nDB.config.BindPort
|
||||||
|
@ -199,9 +203,8 @@ func (nDB *NetworkDB) clusterJoin(members []string) error {
|
||||||
mlist := nDB.memberlist
|
mlist := nDB.memberlist
|
||||||
|
|
||||||
if _, err := mlist.Join(members); err != nil {
|
if _, err := mlist.Join(members); err != nil {
|
||||||
// Incase of failure, keep retrying join until it succeeds or the cluster is shutdown.
|
// In case of failure, keep retrying join until it succeeds or the cluster is shutdown.
|
||||||
go nDB.retryJoin(members, nDB.stopCh)
|
go nDB.retryJoin(members, nDB.stopCh)
|
||||||
|
|
||||||
return fmt.Errorf("could not join node to memberlist: %v", err)
|
return fmt.Errorf("could not join node to memberlist: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -372,11 +375,21 @@ func (nDB *NetworkDB) gossip() {
|
||||||
networkNodes[nid] = nDB.networkNodes[nid]
|
networkNodes[nid] = nDB.networkNodes[nid]
|
||||||
|
|
||||||
}
|
}
|
||||||
|
printStats := time.Since(nDB.lastStatsTimestamp) >= nDB.config.StatsPrintPeriod
|
||||||
|
printHealth := time.Since(nDB.lastHealthTimestamp) >= nDB.config.HealthPrintPeriod
|
||||||
nDB.RUnlock()
|
nDB.RUnlock()
|
||||||
|
|
||||||
|
if printHealth {
|
||||||
|
healthScore := nDB.memberlist.GetHealthScore()
|
||||||
|
if healthScore != 0 {
|
||||||
|
logrus.Warnf("NetworkDB stats - healthscore:%d (connectivity issues)", healthScore)
|
||||||
|
}
|
||||||
|
nDB.lastHealthTimestamp = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
for nid, nodes := range networkNodes {
|
for nid, nodes := range networkNodes {
|
||||||
mNodes := nDB.mRandomNodes(3, nodes)
|
mNodes := nDB.mRandomNodes(3, nodes)
|
||||||
bytesAvail := udpSendBuf - compoundHeaderOverhead
|
bytesAvail := nDB.config.PacketBufferSize - compoundHeaderOverhead
|
||||||
|
|
||||||
nDB.RLock()
|
nDB.RLock()
|
||||||
network, ok := thisNodeNetworks[nid]
|
network, ok := thisNodeNetworks[nid]
|
||||||
|
@ -397,6 +410,14 @@ func (nDB *NetworkDB) gossip() {
|
||||||
}
|
}
|
||||||
|
|
||||||
msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
|
msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
|
||||||
|
// Collect stats and print the queue info. Note: this code runs before the empty-queue check below so that empty queues are reported as well.
|
||||||
|
network.qMessagesSent += len(msgs)
|
||||||
|
if printStats {
|
||||||
|
logrus.Infof("NetworkDB stats - Queue net:%s qLen:%d netPeers:%d netMsg/s:%d",
|
||||||
|
nid, broadcastQ.NumQueued(), broadcastQ.NumNodes(), network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
|
||||||
|
network.qMessagesSent = 0
|
||||||
|
}
|
||||||
|
|
||||||
if len(msgs) == 0 {
|
if len(msgs) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -414,11 +435,15 @@ func (nDB *NetworkDB) gossip() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send the compound message
|
// Send the compound message
|
||||||
if err := nDB.memberlist.SendToUDP(&mnode.Node, compound); err != nil {
|
if err := nDB.memberlist.SendBestEffort(&mnode.Node, compound); err != nil {
|
||||||
logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err)
|
logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Reset the stats
|
||||||
|
if printStats {
|
||||||
|
nDB.lastStatsTimestamp = time.Now()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nDB *NetworkDB) bulkSyncTables() {
|
func (nDB *NetworkDB) bulkSyncTables() {
|
||||||
|
@ -589,7 +614,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
|
||||||
nDB.bulkSyncAckTbl[node] = ch
|
nDB.bulkSyncAckTbl[node] = ch
|
||||||
nDB.Unlock()
|
nDB.Unlock()
|
||||||
|
|
||||||
err = nDB.memberlist.SendToTCP(&mnode.Node, buf)
|
err = nDB.memberlist.SendReliable(&mnode.Node, buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
nDB.Lock()
|
nDB.Lock()
|
||||||
delete(nDB.bulkSyncAckTbl, node)
|
delete(nDB.bulkSyncAckTbl, node)
|
||||||
|
@ -606,7 +631,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
logrus.Errorf("Bulk sync to node %s timed out", node)
|
logrus.Errorf("Bulk sync to node %s timed out", node)
|
||||||
case <-ch:
|
case <-ch:
|
||||||
logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Now().Sub(startTime))
|
logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Since(startTime))
|
||||||
}
|
}
|
||||||
t.Stop()
|
t.Stop()
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,10 +3,6 @@ package networkdb
|
||||||
import "github.com/gogo/protobuf/proto"
|
import "github.com/gogo/protobuf/proto"
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// Max udp message size chosen to avoid network packet
|
|
||||||
// fragmentation.
|
|
||||||
udpSendBuf = 1400
|
|
||||||
|
|
||||||
// Compound message header overhead 1 byte(message type) + 4
|
// Compound message header overhead 1 byte(message type) + 4
|
||||||
// bytes (num messages)
|
// bytes (num messages)
|
||||||
compoundHeaderOverhead = 5
|
compoundHeaderOverhead = 5
|
||||||
|
|
|
@ -5,6 +5,7 @@ package networkdb
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -93,6 +94,12 @@ type NetworkDB struct {
|
||||||
// bootStrapIP is the list of IPs that can be used to bootstrap
|
// bootStrapIP is the list of IPs that can be used to bootstrap
|
||||||
// the gossip.
|
// the gossip.
|
||||||
bootStrapIP []net.IP
|
bootStrapIP []net.IP
|
||||||
|
|
||||||
|
// lastStatsTimestamp is the last timestamp when the stats got printed
|
||||||
|
lastStatsTimestamp time.Time
|
||||||
|
|
||||||
|
// lastHealthTimestamp is the last timestamp when the health score got printed
|
||||||
|
lastHealthTimestamp time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// PeerInfo represents the peer (gossip cluster) nodes of a network
|
// PeerInfo represents the peer (gossip cluster) nodes of a network
|
||||||
|
@ -126,6 +133,9 @@ type network struct {
|
||||||
// The broadcast queue for table event gossip. This is only
|
// The broadcast queue for table event gossip. This is only
|
||||||
// initialized for this node's network attachment entries.
|
// initialized for this node's network attachment entries.
|
||||||
tableBroadcasts *memberlist.TransmitLimitedQueue
|
tableBroadcasts *memberlist.TransmitLimitedQueue
|
||||||
|
|
||||||
|
// Number of gossip messages sent related to this network during the last stats collection period
|
||||||
|
qMessagesSent int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config represents the configuration of the networkdb instance and
|
// Config represents the configuration of the networkdb instance and
|
||||||
|
@ -149,6 +159,21 @@ type Config struct {
|
||||||
// Keys to be added to the Keyring of the memberlist. Key at index
|
// Keys to be added to the Keyring of the memberlist. Key at index
|
||||||
// 0 is the primary key
|
// 0 is the primary key
|
||||||
Keys [][]byte
|
Keys [][]byte
|
||||||
|
|
||||||
|
// PacketBufferSize is the maximum number of bytes that memberlist will
|
||||||
|
// put in a packet (this will be for UDP packets by default with a NetTransport).
|
||||||
|
// A safe value for this is typically 1400 bytes (which is the default). However,
|
||||||
|
// depending on your network's MTU (Maximum Transmission Unit) you may
|
||||||
|
// be able to increase this to get more content into each gossip packet.
|
||||||
|
PacketBufferSize int
|
||||||
|
|
||||||
|
// StatsPrintPeriod is the period at which the queue stats are printed.
|
||||||
|
// Default is 5min
|
||||||
|
StatsPrintPeriod time.Duration
|
||||||
|
|
||||||
|
// HealthPrintPeriod is the period at which the health score is printed.
|
||||||
|
// Default is 1min
|
||||||
|
HealthPrintPeriod time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// entry defines a table entry
|
// entry defines a table entry
|
||||||
|
@ -171,6 +196,18 @@ type entry struct {
|
||||||
reapTime time.Duration
|
reapTime time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DefaultConfig returns a NetworkDB config with default values
|
||||||
|
func DefaultConfig() *Config {
|
||||||
|
hostname, _ := os.Hostname()
|
||||||
|
return &Config{
|
||||||
|
NodeName: hostname,
|
||||||
|
BindAddr: "0.0.0.0",
|
||||||
|
PacketBufferSize: 1400,
|
||||||
|
StatsPrintPeriod: 5 * time.Minute,
|
||||||
|
HealthPrintPeriod: 1 * time.Minute,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// New creates a new instance of NetworkDB using the Config passed by
|
// New creates a new instance of NetworkDB using the Config passed by
|
||||||
// the caller.
|
// the caller.
|
||||||
func New(c *Config) (*NetworkDB, error) {
|
func New(c *Config) (*NetworkDB, error) {
|
||||||
|
@ -200,6 +237,7 @@ func New(c *Config) (*NetworkDB, error) {
|
||||||
// instances passed by the caller in the form of addr:port
|
// instances passed by the caller in the form of addr:port
|
||||||
func (nDB *NetworkDB) Join(members []string) error {
|
func (nDB *NetworkDB) Join(members []string) error {
|
||||||
nDB.Lock()
|
nDB.Lock()
|
||||||
|
nDB.bootStrapIP = make([]net.IP, 0, len(members))
|
||||||
for _, m := range members {
|
for _, m := range members {
|
||||||
nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m))
|
nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m))
|
||||||
}
|
}
|
||||||
|
@ -481,9 +519,8 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
|
||||||
nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
|
nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
|
||||||
NumNodes: func() int {
|
NumNodes: func() int {
|
||||||
nDB.RLock()
|
nDB.RLock()
|
||||||
num := len(nDB.networkNodes[nid])
|
defer nDB.RUnlock()
|
||||||
nDB.RUnlock()
|
return len(nDB.networkNodes[nid])
|
||||||
return num
|
|
||||||
},
|
},
|
||||||
RetransmitMult: 4,
|
RetransmitMult: 4,
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,10 +30,10 @@ func TestMain(m *testing.M) {
|
||||||
func createNetworkDBInstances(t *testing.T, num int, namePrefix string) []*NetworkDB {
|
func createNetworkDBInstances(t *testing.T, num int, namePrefix string) []*NetworkDB {
|
||||||
var dbs []*NetworkDB
|
var dbs []*NetworkDB
|
||||||
for i := 0; i < num; i++ {
|
for i := 0; i < num; i++ {
|
||||||
db, err := New(&Config{
|
conf := DefaultConfig()
|
||||||
NodeName: fmt.Sprintf("%s%d", namePrefix, i+1),
|
conf.NodeName = fmt.Sprintf("%s%d", namePrefix, i+1)
|
||||||
BindPort: int(atomic.AddInt32(&dbPort, 1)),
|
conf.BindPort = int(atomic.AddInt32(&dbPort, 1))
|
||||||
})
|
db, err := New(conf)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
if i != 0 {
|
if i != 0 {
|
||||||
|
|
Loading…
Reference in a new issue