mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Merge pull request #2270 from ctelfer/lbdsr
Use direct server return in east-west overlay load balancing
This commit is contained in:
commit
e0d1cdd3d4
7 changed files with 164 additions and 49 deletions
|
@ -700,6 +700,9 @@ func (c *controller) RegisterDriver(networkType string, driver driverapi.Driver,
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// XXX This should be made driver agnostic. See comment below.
|
||||||
|
const overlayDSROptionString = "dsr"
|
||||||
|
|
||||||
// NewNetwork creates a new network of the specified network type. The options
|
// NewNetwork creates a new network of the specified network type. The options
|
||||||
// are network specific and modeled in a generic way.
|
// are network specific and modeled in a generic way.
|
||||||
func (c *controller) NewNetwork(networkType, name string, id string, options ...NetworkOption) (Network, error) {
|
func (c *controller) NewNetwork(networkType, name string, id string, options ...NetworkOption) (Network, error) {
|
||||||
|
@ -723,15 +726,16 @@ func (c *controller) NewNetwork(networkType, name string, id string, options ...
|
||||||
defaultIpam := defaultIpamForNetworkType(networkType)
|
defaultIpam := defaultIpamForNetworkType(networkType)
|
||||||
// Construct the network object
|
// Construct the network object
|
||||||
network := &network{
|
network := &network{
|
||||||
name: name,
|
name: name,
|
||||||
networkType: networkType,
|
networkType: networkType,
|
||||||
generic: map[string]interface{}{netlabel.GenericData: make(map[string]string)},
|
generic: map[string]interface{}{netlabel.GenericData: make(map[string]string)},
|
||||||
ipamType: defaultIpam,
|
ipamType: defaultIpam,
|
||||||
id: id,
|
id: id,
|
||||||
created: time.Now(),
|
created: time.Now(),
|
||||||
ctrlr: c,
|
ctrlr: c,
|
||||||
persist: true,
|
persist: true,
|
||||||
drvOnce: &sync.Once{},
|
drvOnce: &sync.Once{},
|
||||||
|
loadBalancerMode: loadBalancerModeDefault,
|
||||||
}
|
}
|
||||||
|
|
||||||
network.processOptions(options...)
|
network.processOptions(options...)
|
||||||
|
@ -829,6 +833,21 @@ func (c *controller) NewNetwork(networkType, name string, id string, options ...
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// XXX If the driver type is "overlay" check the options for DSR
|
||||||
|
// being set. If so, set the network's load balancing mode to DSR.
|
||||||
|
// This should really be done in a network option, but due to
|
||||||
|
// time pressure to get this in without adding changes to moby,
|
||||||
|
// swarm and CLI, it is being implemented as a driver-specific
|
||||||
|
// option. Unfortunately, drivers can't influence the core
|
||||||
|
// "libnetwork.network" data type. Hence we need this hack code
|
||||||
|
// to implement in this manner.
|
||||||
|
if gval, ok := network.generic[netlabel.GenericData]; ok && network.networkType == "overlay" {
|
||||||
|
optMap := gval.(map[string]string)
|
||||||
|
if _, ok := optMap[overlayDSROptionString]; ok {
|
||||||
|
network.loadBalancerMode = loadBalancerModeDSR
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
addToStore:
|
addToStore:
|
||||||
// First store the endpoint count, then the network. To avoid to
|
// First store the endpoint count, then the network. To avoid to
|
||||||
// end up with a datastore containing a network and not an epCnt,
|
// end up with a datastore containing a network and not an epCnt,
|
||||||
|
|
|
@ -145,3 +145,23 @@ const (
|
||||||
// addresses.
|
// addresses.
|
||||||
SourceHashing = "sh"
|
SourceHashing = "sh"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// ConnFwdMask is a mask for the fwd methods
|
||||||
|
ConnFwdMask = 0x0007
|
||||||
|
|
||||||
|
// ConnFwdMasq denotes forwarding via masquerading/NAT
|
||||||
|
ConnFwdMasq = 0x0000
|
||||||
|
|
||||||
|
// ConnFwdLocalNode denotes forwarding to a local node
|
||||||
|
ConnFwdLocalNode = 0x0001
|
||||||
|
|
||||||
|
// ConnFwdTunnel denotes forwarding via a tunnel
|
||||||
|
ConnFwdTunnel = 0x0002
|
||||||
|
|
||||||
|
// ConnFwdDirectRoute denotes forwarding via direct routing
|
||||||
|
ConnFwdDirectRoute = 0x0003
|
||||||
|
|
||||||
|
// ConnFwdBypass denotes forwarding while bypassing the cache
|
||||||
|
ConnFwdBypass = 0x0004
|
||||||
|
)
|
||||||
|
|
|
@ -199,43 +199,50 @@ func (i *IpamInfo) UnmarshalJSON(data []byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
type network struct {
|
type network struct {
|
||||||
ctrlr *controller
|
ctrlr *controller
|
||||||
name string
|
name string
|
||||||
networkType string
|
networkType string
|
||||||
id string
|
id string
|
||||||
created time.Time
|
created time.Time
|
||||||
scope string // network data scope
|
scope string // network data scope
|
||||||
labels map[string]string
|
labels map[string]string
|
||||||
ipamType string
|
ipamType string
|
||||||
ipamOptions map[string]string
|
ipamOptions map[string]string
|
||||||
addrSpace string
|
addrSpace string
|
||||||
ipamV4Config []*IpamConf
|
ipamV4Config []*IpamConf
|
||||||
ipamV6Config []*IpamConf
|
ipamV6Config []*IpamConf
|
||||||
ipamV4Info []*IpamInfo
|
ipamV4Info []*IpamInfo
|
||||||
ipamV6Info []*IpamInfo
|
ipamV6Info []*IpamInfo
|
||||||
enableIPv6 bool
|
enableIPv6 bool
|
||||||
postIPv6 bool
|
postIPv6 bool
|
||||||
epCnt *endpointCnt
|
epCnt *endpointCnt
|
||||||
generic options.Generic
|
generic options.Generic
|
||||||
dbIndex uint64
|
dbIndex uint64
|
||||||
dbExists bool
|
dbExists bool
|
||||||
persist bool
|
persist bool
|
||||||
stopWatchCh chan struct{}
|
stopWatchCh chan struct{}
|
||||||
drvOnce *sync.Once
|
drvOnce *sync.Once
|
||||||
resolverOnce sync.Once
|
resolverOnce sync.Once
|
||||||
resolver []Resolver
|
resolver []Resolver
|
||||||
internal bool
|
internal bool
|
||||||
attachable bool
|
attachable bool
|
||||||
inDelete bool
|
inDelete bool
|
||||||
ingress bool
|
ingress bool
|
||||||
driverTables []networkDBTable
|
driverTables []networkDBTable
|
||||||
dynamic bool
|
dynamic bool
|
||||||
configOnly bool
|
configOnly bool
|
||||||
configFrom string
|
configFrom string
|
||||||
loadBalancerIP net.IP
|
loadBalancerIP net.IP
|
||||||
|
loadBalancerMode string
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
loadBalancerModeNAT = "NAT"
|
||||||
|
loadBalancerModeDSR = "DSR"
|
||||||
|
loadBalancerModeDefault = loadBalancerModeNAT
|
||||||
|
)
|
||||||
|
|
||||||
func (n *network) Name() string {
|
func (n *network) Name() string {
|
||||||
n.Lock()
|
n.Lock()
|
||||||
defer n.Unlock()
|
defer n.Unlock()
|
||||||
|
@ -475,6 +482,7 @@ func (n *network) CopyTo(o datastore.KVObject) error {
|
||||||
dstN.configOnly = n.configOnly
|
dstN.configOnly = n.configOnly
|
||||||
dstN.configFrom = n.configFrom
|
dstN.configFrom = n.configFrom
|
||||||
dstN.loadBalancerIP = n.loadBalancerIP
|
dstN.loadBalancerIP = n.loadBalancerIP
|
||||||
|
dstN.loadBalancerMode = n.loadBalancerMode
|
||||||
|
|
||||||
// copy labels
|
// copy labels
|
||||||
if dstN.labels == nil {
|
if dstN.labels == nil {
|
||||||
|
@ -592,6 +600,7 @@ func (n *network) MarshalJSON() ([]byte, error) {
|
||||||
netMap["configOnly"] = n.configOnly
|
netMap["configOnly"] = n.configOnly
|
||||||
netMap["configFrom"] = n.configFrom
|
netMap["configFrom"] = n.configFrom
|
||||||
netMap["loadBalancerIP"] = n.loadBalancerIP
|
netMap["loadBalancerIP"] = n.loadBalancerIP
|
||||||
|
netMap["loadBalancerMode"] = n.loadBalancerMode
|
||||||
return json.Marshal(netMap)
|
return json.Marshal(netMap)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -705,6 +714,10 @@ func (n *network) UnmarshalJSON(b []byte) (err error) {
|
||||||
if v, ok := netMap["loadBalancerIP"]; ok {
|
if v, ok := netMap["loadBalancerIP"]; ok {
|
||||||
n.loadBalancerIP = net.ParseIP(v.(string))
|
n.loadBalancerIP = net.ParseIP(v.(string))
|
||||||
}
|
}
|
||||||
|
n.loadBalancerMode = loadBalancerModeDefault
|
||||||
|
if v, ok := netMap["loadBalancerMode"]; ok {
|
||||||
|
n.loadBalancerMode = v.(string)
|
||||||
|
}
|
||||||
// Reconcile old networks with the recently added `--ipv6` flag
|
// Reconcile old networks with the recently added `--ipv6` flag
|
||||||
if !n.enableIPv6 {
|
if !n.enableIPv6 {
|
||||||
n.enableIPv6 = len(n.ipamV6Info) > 0
|
n.enableIPv6 = len(n.ipamV6Info) > 0
|
||||||
|
|
|
@ -384,6 +384,36 @@ func (n *networkNamespace) RemoveAliasIP(ifName string, ip *net.IPNet) error {
|
||||||
return n.nlHandle.AddrDel(iface, &netlink.Addr{IPNet: ip})
|
return n.nlHandle.AddrDel(iface, &netlink.Addr{IPNet: ip})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *networkNamespace) DisableARPForVIP(srcName string) (Err error) {
|
||||||
|
dstName := ""
|
||||||
|
for _, i := range n.Interfaces() {
|
||||||
|
if i.SrcName() == srcName {
|
||||||
|
dstName = i.DstName()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if dstName == "" {
|
||||||
|
return fmt.Errorf("failed to find interface %s in sandbox", srcName)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := n.InvokeFunc(func() {
|
||||||
|
path := filepath.Join("/proc/sys/net/ipv4/conf", dstName, "arp_ignore")
|
||||||
|
if err := ioutil.WriteFile(path, []byte{'1', '\n'}, 0644); err != nil {
|
||||||
|
Err = fmt.Errorf("Failed to set %s to 1: %v", path, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
path = filepath.Join("/proc/sys/net/ipv4/conf", dstName, "arp_announce")
|
||||||
|
if err := ioutil.WriteFile(path, []byte{'2', '\n'}, 0644); err != nil {
|
||||||
|
Err = fmt.Errorf("Failed to set %s to 2: %v", path, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (n *networkNamespace) InvokeFunc(f func()) error {
|
func (n *networkNamespace) InvokeFunc(f func()) error {
|
||||||
return nsInvoke(n.nsPath(), func(nsFD int) error { return nil }, func(callerFD int) error {
|
return nsInvoke(n.nsPath(), func(nsFD int) error { return nil }, func(callerFD int) error {
|
||||||
f()
|
f()
|
||||||
|
|
|
@ -51,6 +51,10 @@ type Sandbox interface {
|
||||||
// RemoveAliasIP removes the passed IP address from the named interface
|
// RemoveAliasIP removes the passed IP address from the named interface
|
||||||
RemoveAliasIP(ifName string, ip *net.IPNet) error
|
RemoveAliasIP(ifName string, ip *net.IPNet) error
|
||||||
|
|
||||||
|
// DisableARPForVIP disables ARP replies and requests for VIP addresses
|
||||||
|
// on a particular interface
|
||||||
|
DisableARPForVIP(ifName string) error
|
||||||
|
|
||||||
// Add a static route to the sandbox.
|
// Add a static route to the sandbox.
|
||||||
AddStaticRoute(*types.StaticRoute) error
|
AddStaticRoute(*types.StaticRoute) error
|
||||||
|
|
||||||
|
|
|
@ -742,8 +742,17 @@ func releaseOSSboxResources(osSbox osl.Sandbox, ep *endpoint) {
|
||||||
|
|
||||||
ep.Lock()
|
ep.Lock()
|
||||||
joinInfo := ep.joinInfo
|
joinInfo := ep.joinInfo
|
||||||
|
vip := ep.virtualIP
|
||||||
|
lbModeIsDSR := ep.network.loadBalancerMode == loadBalancerModeDSR
|
||||||
ep.Unlock()
|
ep.Unlock()
|
||||||
|
|
||||||
|
if len(vip) > 0 && lbModeIsDSR {
|
||||||
|
ipNet := &net.IPNet{IP: vip, Mask: net.CIDRMask(32, 32)}
|
||||||
|
if err := osSbox.RemoveAliasIP(osSbox.GetLoopbackIfaceName(), ipNet); err != nil {
|
||||||
|
logrus.WithError(err).Debugf("failed to remove virtual ip %v to loopback", ipNet)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if joinInfo == nil {
|
if joinInfo == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -831,6 +840,7 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error {
|
||||||
ep.Lock()
|
ep.Lock()
|
||||||
joinInfo := ep.joinInfo
|
joinInfo := ep.joinInfo
|
||||||
i := ep.iface
|
i := ep.iface
|
||||||
|
lbModeIsDSR := ep.network.loadBalancerMode == loadBalancerModeDSR
|
||||||
ep.Unlock()
|
ep.Unlock()
|
||||||
|
|
||||||
if ep.needResolver() {
|
if ep.needResolver() {
|
||||||
|
@ -854,6 +864,18 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error {
|
||||||
if err := sb.osSbox.AddInterface(i.srcName, i.dstPrefix, ifaceOptions...); err != nil {
|
if err := sb.osSbox.AddInterface(i.srcName, i.dstPrefix, ifaceOptions...); err != nil {
|
||||||
return fmt.Errorf("failed to add interface %s to sandbox: %v", i.srcName, err)
|
return fmt.Errorf("failed to add interface %s to sandbox: %v", i.srcName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(ep.virtualIP) > 0 && lbModeIsDSR {
|
||||||
|
if sb.loadBalancerNID == "" {
|
||||||
|
if err := sb.osSbox.DisableARPForVIP(i.srcName); err != nil {
|
||||||
|
return fmt.Errorf("failed disable ARP for VIP: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ipNet := &net.IPNet{IP: ep.virtualIP, Mask: net.CIDRMask(32, 32)}
|
||||||
|
if err := sb.osSbox.AddAliasIP(sb.osSbox.GetLoopbackIfaceName(), ipNet); err != nil {
|
||||||
|
return fmt.Errorf("failed to add virtual ip %v to loopback: %v", ipNet, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if joinInfo != nil {
|
if joinInfo != nil {
|
||||||
|
|
|
@ -142,7 +142,7 @@ func (n *network) addLBBackend(ip net.IP, lb *loadBalancer) {
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v in sbox %.7s (%.7s)", lb.vip, lb.fwMark, lb.service.ingressPorts, sb.ID(), sb.ContainerID())
|
logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v in sbox %.7s (%.7s)", lb.vip, lb.fwMark, lb.service.ingressPorts, sb.ID(), sb.ContainerID())
|
||||||
if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, false); err != nil {
|
if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, false, n.loadBalancerMode); err != nil {
|
||||||
logrus.Errorf("Failed to add firewall mark rule in sbox %.7s (%.7s): %v", sb.ID(), sb.ContainerID(), err)
|
logrus.Errorf("Failed to add firewall mark rule in sbox %.7s (%.7s): %v", sb.ID(), sb.ContainerID(), err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -158,6 +158,9 @@ func (n *network) addLBBackend(ip net.IP, lb *loadBalancer) {
|
||||||
Address: ip,
|
Address: ip,
|
||||||
Weight: 1,
|
Weight: 1,
|
||||||
}
|
}
|
||||||
|
if n.loadBalancerMode == loadBalancerModeDSR {
|
||||||
|
d.ConnectionFlags = ipvs.ConnFwdDirectRoute
|
||||||
|
}
|
||||||
|
|
||||||
// Remove the sched name before using the service to add
|
// Remove the sched name before using the service to add
|
||||||
// destination.
|
// destination.
|
||||||
|
@ -203,6 +206,9 @@ func (n *network) rmLBBackend(ip net.IP, lb *loadBalancer, rmService bool, fullR
|
||||||
Address: ip,
|
Address: ip,
|
||||||
Weight: 1,
|
Weight: 1,
|
||||||
}
|
}
|
||||||
|
if n.loadBalancerMode == loadBalancerModeDSR {
|
||||||
|
d.ConnectionFlags = ipvs.ConnFwdDirectRoute
|
||||||
|
}
|
||||||
|
|
||||||
if fullRemove {
|
if fullRemove {
|
||||||
if err := i.DelDestination(s, d); err != nil && err != syscall.ENOENT {
|
if err := i.DelDestination(s, d); err != nil && err != syscall.ENOENT {
|
||||||
|
@ -231,7 +237,7 @@ func (n *network) rmLBBackend(ip net.IP, lb *loadBalancer, rmService bool, fullR
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, true); err != nil {
|
if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, true, n.loadBalancerMode); err != nil {
|
||||||
logrus.Errorf("Failed to delete firewall mark rule in sbox %.7s (%.7s): %v", sb.ID(), sb.ContainerID(), err)
|
logrus.Errorf("Failed to delete firewall mark rule in sbox %.7s (%.7s): %v", sb.ID(), sb.ContainerID(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -566,7 +572,7 @@ func readPortsFromFile(fileName string) ([]*PortConfig, error) {
|
||||||
|
|
||||||
// Invoke fwmarker reexec routine to mark vip destined packets with
|
// Invoke fwmarker reexec routine to mark vip destined packets with
|
||||||
// the passed firewall mark.
|
// the passed firewall mark.
|
||||||
func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error {
|
func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool, lbMode string) error {
|
||||||
var ingressPortsFile string
|
var ingressPortsFile string
|
||||||
|
|
||||||
if len(ingressPorts) != 0 {
|
if len(ingressPorts) != 0 {
|
||||||
|
@ -586,7 +592,7 @@ func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*Port
|
||||||
|
|
||||||
cmd := &exec.Cmd{
|
cmd := &exec.Cmd{
|
||||||
Path: reexec.Self(),
|
Path: reexec.Self(),
|
||||||
Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.String()),
|
Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.String(), lbMode),
|
||||||
Stdout: os.Stdout,
|
Stdout: os.Stdout,
|
||||||
Stderr: os.Stderr,
|
Stderr: os.Stderr,
|
||||||
}
|
}
|
||||||
|
@ -603,7 +609,7 @@ func fwMarker() {
|
||||||
runtime.LockOSThread()
|
runtime.LockOSThread()
|
||||||
defer runtime.UnlockOSThread()
|
defer runtime.UnlockOSThread()
|
||||||
|
|
||||||
if len(os.Args) < 7 {
|
if len(os.Args) < 8 {
|
||||||
logrus.Error("invalid number of arguments..")
|
logrus.Error("invalid number of arguments..")
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
@ -645,7 +651,8 @@ func fwMarker() {
|
||||||
os.Exit(5)
|
os.Exit(5)
|
||||||
}
|
}
|
||||||
|
|
||||||
if addDelOpt == "-A" {
|
lbMode := os.Args[7]
|
||||||
|
if addDelOpt == "-A" && lbMode == loadBalancerModeNAT {
|
||||||
eIP, subnet, err := net.ParseCIDR(os.Args[6])
|
eIP, subnet, err := net.ParseCIDR(os.Args[6])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[6], err)
|
logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[6], err)
|
||||||
|
|
Loading…
Reference in a new issue