Enabling ILB/ELB on windows using per-node, per-network LB endpoint.

Signed-off-by: Pradip Dhara <pradipd@microsoft.com>
This commit is contained in:
Pradip Dhara 2017-08-28 23:35:31 -07:00
parent a0bdc52fd7
commit 43360c627f
6 changed files with 167 additions and 9 deletions

View File

@ -75,6 +75,7 @@ type endpoint struct {
dbIndex uint64
dbExists bool
serviceEnabled bool
loadBalancer bool
sync.Mutex
}
@ -101,6 +102,7 @@ func (ep *endpoint) MarshalJSON() ([]byte, error) {
epMap["virtualIP"] = ep.virtualIP.String()
epMap["ingressPorts"] = ep.ingressPorts
epMap["svcAliases"] = ep.svcAliases
epMap["loadBalancer"] = ep.loadBalancer
return json.Marshal(epMap)
}
@ -201,6 +203,10 @@ func (ep *endpoint) UnmarshalJSON(b []byte) (err error) {
ep.virtualIP = net.ParseIP(vip.(string))
}
if v, ok := epMap["loadBalancer"]; ok {
ep.loadBalancer = v.(bool)
}
sal, _ := json.Marshal(epMap["svcAliases"])
var svcAliases []string
json.Unmarshal(sal, &svcAliases)
@ -238,6 +244,7 @@ func (ep *endpoint) CopyTo(o datastore.KVObject) error {
dstEp.svcName = ep.svcName
dstEp.svcID = ep.svcID
dstEp.virtualIP = ep.virtualIP
dstEp.loadBalancer = ep.loadBalancer
dstEp.svcAliases = make([]string, len(ep.svcAliases))
copy(dstEp.svcAliases, ep.svcAliases)
@ -1013,6 +1020,13 @@ func CreateOptionMyAlias(alias string) EndpointOption {
}
}
//CreateOptionLoadBalancer function returns an option setter for denoting the endpoint is a load balancer for a network
func CreateOptionLoadBalancer() EndpointOption {
return func(ep *endpoint) {
ep.loadBalancer = true
}
}
// JoinOptionPriority function returns an option setter for priority option to
// be passed to the endpoint.Join() method.
func JoinOptionPriority(ep Endpoint, prio int) EndpointOption {

View File

@ -31,6 +31,9 @@ type EndpointInfo interface {
// Sandbox returns the attached sandbox if there, nil otherwise.
Sandbox() Sandbox
// LoadBalancer returns whether the endpoint is the load balancer endpoint for the network.
LoadBalancer() bool
}
// InterfaceInfo provides an interface to retrieve interface addresses bound to the endpoint.
@ -327,6 +330,12 @@ func (ep *endpoint) Sandbox() Sandbox {
return cnt
}
func (ep *endpoint) LoadBalancer() bool {
ep.Lock()
defer ep.Unlock()
return ep.loadBalancer
}
func (ep *endpoint) StaticRoutes() []*types.StaticRoute {
ep.Lock()
defer ep.Unlock()

View File

@ -89,4 +89,5 @@ type loadBalancer struct {
// Back pointer to service to which the loadbalancer belongs.
service *service
sync.Mutex
}

View File

@ -287,7 +287,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s
// Add loadbalancer service and backend in all sandboxes in
// the network only if vip is valid.
if len(vip) != 0 {
n.(*network).addLBBackend(ip, vip, lb.fwMark, ingressPorts)
n.(*network).addLBBackend(ip, vip, lb, ingressPorts)
}
// Add the appropriate name resolutions
@ -355,7 +355,7 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st
// Remove loadbalancer service(if needed) and backend in all
// sandboxes in the network only if the vip is valid.
if len(vip) != 0 && entries == 0 {
n.(*network).rmLBBackend(ip, vip, lb.fwMark, ingressPorts, rmService)
n.(*network).rmLBBackend(ip, vip, lb, ingressPorts, rmService)
}
// Delete the name resolutions

View File

@ -111,7 +111,7 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
// Add loadbalancer backend to all sandboxes which has a connection to
// this network. If needed add the service as well.
func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig) {
func (n *network) addLBBackend(ip, vip net.IP, lb *loadBalancer, ingressPorts []*PortConfig) {
n.WalkEndpoints(func(e Endpoint) bool {
ep := e.(*endpoint)
if sb, ok := ep.getSandbox(); ok {
@ -124,7 +124,7 @@ func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
gwIP = ep.Iface().Address().IP
}
sb.addLBBackend(ip, vip, fwMark, ingressPorts, ep.Iface().Address(), gwIP, n.ingress)
sb.addLBBackend(ip, vip, lb.fwMark, ingressPorts, ep.Iface().Address(), gwIP, n.ingress)
}
return false
@ -134,7 +134,7 @@ func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
// Remove loadbalancer backend from all sandboxes which has a
// connection to this network. If needed remove the service entry as
// well, as specified by the rmService bool.
func (n *network) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, rmService bool) {
func (n *network) rmLBBackend(ip, vip net.IP, lb *loadBalancer, ingressPorts []*PortConfig, rmService bool) {
n.WalkEndpoints(func(e Endpoint) bool {
ep := e.(*endpoint)
if sb, ok := ep.getSandbox(); ok {
@ -147,7 +147,7 @@ func (n *network) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Por
gwIP = ep.Iface().Address().IP
}
sb.rmLBBackend(ip, vip, fwMark, ingressPorts, ep.Iface().Address(), gwIP, rmService, n.ingress)
sb.rmLBBackend(ip, vip, lb.fwMark, ingressPorts, ep.Iface().Address(), gwIP, rmService, n.ingress)
}
return false

View File

@ -1,11 +1,145 @@
package libnetwork
import "net"
import (
"net"
func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig) {
"github.com/Microsoft/hcsshim"
"github.com/docker/docker/pkg/system"
"github.com/sirupsen/logrus"
)
type policyLists struct {
ilb *hcsshim.PolicyList
elb *hcsshim.PolicyList
}
func (n *network) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, rmService bool) {
var lbPolicylistMap map[*loadBalancer]*policyLists
func init() {
lbPolicylistMap = make(map[*loadBalancer]*policyLists)
}
func (n *network) addLBBackend(ip, vip net.IP, lb *loadBalancer, ingressPorts []*PortConfig) {
if system.GetOSVersion().Build > 16236 {
lb.Lock()
defer lb.Unlock()
//find the load balancer IP for the network.
var sourceVIP string
for _, e := range n.Endpoints() {
epInfo := e.Info()
if epInfo == nil {
continue
}
if epInfo.LoadBalancer() {
sourceVIP = epInfo.Iface().Address().IP.String()
break
}
}
if sourceVIP == "" {
logrus.Errorf("Failed to find load balancer IP for network %s", n.Name())
return
}
var endpoints []hcsshim.HNSEndpoint
for eid := range lb.backEnds {
//Call HNS to get back ID (GUID) corresponding to the endpoint.
hnsEndpoint, err := hcsshim.GetHNSEndpointByName(eid)
if err != nil {
logrus.Errorf("Failed to find HNS ID for endpoint %v: %v", eid, err)
return
}
endpoints = append(endpoints, *hnsEndpoint)
}
if policies, ok := lbPolicylistMap[lb]; ok {
if policies.ilb != nil {
policies.ilb.Delete()
policies.ilb = nil
}
if policies.elb != nil {
policies.elb.Delete()
policies.elb = nil
}
delete(lbPolicylistMap, lb)
}
ilbPolicy, err := hcsshim.AddLoadBalancer(endpoints, true, sourceVIP, vip.String(), 0, 0, 0)
if err != nil {
logrus.Errorf("Failed to add ILB policy for service %s (%s) with endpoints %v using load balancer IP %s on network %s: %v",
lb.service.name, vip.String(), endpoints, sourceVIP, n.Name(), err)
}
lbPolicylistMap[lb] = &policyLists{
ilb: ilbPolicy,
}
publishedPorts := make(map[uint32]uint32)
for i, port := range ingressPorts {
protocol := uint16(6)
// Skip already published port
if publishedPorts[port.PublishedPort] == port.TargetPort {
continue
}
if port.Protocol == ProtocolUDP {
protocol = 17
}
// check if already has udp matching to add wild card publishing
for j := i + 1; j < len(ingressPorts); j++ {
if ingressPorts[j].TargetPort == port.TargetPort &&
ingressPorts[j].PublishedPort == port.PublishedPort {
protocol = 0
}
}
publishedPorts[port.PublishedPort] = port.TargetPort
lbPolicylistMap[lb].elb, err = hcsshim.AddLoadBalancer(endpoints, false, sourceVIP, "", protocol, uint16(port.TargetPort), uint16(port.PublishedPort))
if err != nil {
logrus.Errorf("Failed to add ELB policy for service %s (ip:%s target port:%v published port:%v) with endpoints %v using load balancer IP %s on network %s: %v",
lb.service.name, vip.String(), uint16(port.TargetPort), uint16(port.PublishedPort), endpoints, sourceVIP, n.Name(), err)
return
}
}
}
}
func (n *network) rmLBBackend(ip, vip net.IP, lb *loadBalancer, ingressPorts []*PortConfig, rmService bool) {
if system.GetOSVersion().Build > 16236 {
if len(lb.backEnds) > 0 {
//Reprogram HNS (actually VFP) with the existing backends.
n.addLBBackend(ip, vip, lb, ingressPorts)
} else {
lb.Lock()
defer lb.Unlock()
logrus.Debugf("No more backends for service %s (ip:%s). Removing all policies", lb.service.name, lb.vip.String())
if policyLists, ok := lbPolicylistMap[lb]; ok {
if policyLists.ilb != nil {
policyLists.ilb.Delete()
policyLists.ilb = nil
}
if policyLists.elb != nil {
policyLists.elb.Delete()
policyLists.elb = nil
}
delete(lbPolicylistMap, lb)
} else {
logrus.Errorf("Failed to find policies for service %s (%s)", lb.service.name, lb.vip.String())
}
}
}
}
func (sb *sandbox) populateLoadbalancers(ep *endpoint) {