1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Add loadbalancer support

This PR adds support for loadbalancing across a group of endpoints that
share the same service configuration as passed in by
`OptionService`. The loadbalancer is implemented using ipvs with just
round robin scheduling supported for now.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
This commit is contained in:
Jana Radhakrishnan 2016-05-24 22:46:18 -07:00
parent d7946ec4d8
commit d05adebf30
10 changed files with 506 additions and 108 deletions

View file

@ -167,14 +167,17 @@ func (ep *endpoint) addToCluster() error {
c := n.getController() c := n.getController()
if !ep.isAnonymous() && ep.Iface().Address() != nil { if !ep.isAnonymous() && ep.Iface().Address() != nil {
if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.Iface().Address().IP); err != nil { if ep.svcID != "" {
return err if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ep.Iface().Address().IP); err != nil {
return err
}
} }
buf, err := proto.Marshal(&EndpointRecord{ buf, err := proto.Marshal(&EndpointRecord{
Name: ep.Name(), Name: ep.Name(),
ServiceName: ep.svcName, ServiceName: ep.svcName,
ServiceID: ep.svcID, ServiceID: ep.svcID,
VirtualIP: ep.virtualIP.String(),
EndpointIP: ep.Iface().Address().IP.String(), EndpointIP: ep.Iface().Address().IP.String(),
}) })
@ -204,8 +207,8 @@ func (ep *endpoint) deleteFromCluster() error {
c := n.getController() c := n.getController()
if !ep.isAnonymous() { if !ep.isAnonymous() {
if ep.Iface().Address() != nil { if ep.svcID != "" && ep.Iface().Address() != nil {
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.Iface().Address().IP); err != nil { if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ep.Iface().Address().IP); err != nil {
return err return err
} }
} }
@ -357,6 +360,7 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
name := epRec.Name name := epRec.Name
svcName := epRec.ServiceName svcName := epRec.ServiceName
svcID := epRec.ServiceID svcID := epRec.ServiceID
vip := net.ParseIP(epRec.VirtualIP)
ip := net.ParseIP(epRec.EndpointIP) ip := net.ParseIP(epRec.EndpointIP)
if name == "" || ip == nil { if name == "" || ip == nil {
@ -365,16 +369,20 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
} }
if isAdd { if isAdd {
if err := c.addServiceBinding(svcName, svcID, nid, eid, ip); err != nil { if svcID != "" {
logrus.Errorf("Failed adding service binding for value %s: %v", value, err) if err := c.addServiceBinding(svcName, svcID, nid, eid, vip, ip); err != nil {
return logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
return
}
} }
n.addSvcRecords(name, ip, nil, true) n.addSvcRecords(name, ip, nil, true)
} else { } else {
if err := c.rmServiceBinding(svcName, svcID, nid, eid, ip); err != nil { if svcID != "" {
logrus.Errorf("Failed adding service binding for value %s: %v", value, err) if err := c.rmServiceBinding(svcName, svcID, nid, eid, vip, ip); err != nil {
return logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
return
}
} }
n.deleteSvcRecords(name, ip, nil, true) n.deleteSvcRecords(name, ip, nil, true)

View file

@ -39,7 +39,8 @@ type EndpointRecord struct {
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
ServiceName string `protobuf:"bytes,2,opt,name=service_name,json=serviceName,proto3" json:"service_name,omitempty"` ServiceName string `protobuf:"bytes,2,opt,name=service_name,json=serviceName,proto3" json:"service_name,omitempty"`
ServiceID string `protobuf:"bytes,3,opt,name=service_id,json=serviceId,proto3" json:"service_id,omitempty"` ServiceID string `protobuf:"bytes,3,opt,name=service_id,json=serviceId,proto3" json:"service_id,omitempty"`
EndpointIP string `protobuf:"bytes,4,opt,name=endpoint_ip,json=endpointIp,proto3" json:"endpoint_ip,omitempty"` VirtualIP string `protobuf:"bytes,4,opt,name=virtual_ip,json=virtualIp,proto3" json:"virtual_ip,omitempty"`
EndpointIP string `protobuf:"bytes,5,opt,name=endpoint_ip,json=endpointIp,proto3" json:"endpoint_ip,omitempty"`
} }
func (m *EndpointRecord) Reset() { *m = EndpointRecord{} } func (m *EndpointRecord) Reset() { *m = EndpointRecord{} }
@ -53,11 +54,12 @@ func (this *EndpointRecord) GoString() string {
if this == nil { if this == nil {
return "nil" return "nil"
} }
s := make([]string, 0, 8) s := make([]string, 0, 9)
s = append(s, "&libnetwork.EndpointRecord{") s = append(s, "&libnetwork.EndpointRecord{")
s = append(s, "Name: "+fmt.Sprintf("%#v", this.Name)+",\n") s = append(s, "Name: "+fmt.Sprintf("%#v", this.Name)+",\n")
s = append(s, "ServiceName: "+fmt.Sprintf("%#v", this.ServiceName)+",\n") s = append(s, "ServiceName: "+fmt.Sprintf("%#v", this.ServiceName)+",\n")
s = append(s, "ServiceID: "+fmt.Sprintf("%#v", this.ServiceID)+",\n") s = append(s, "ServiceID: "+fmt.Sprintf("%#v", this.ServiceID)+",\n")
s = append(s, "VirtualIP: "+fmt.Sprintf("%#v", this.VirtualIP)+",\n")
s = append(s, "EndpointIP: "+fmt.Sprintf("%#v", this.EndpointIP)+",\n") s = append(s, "EndpointIP: "+fmt.Sprintf("%#v", this.EndpointIP)+",\n")
s = append(s, "}") s = append(s, "}")
return strings.Join(s, "") return strings.Join(s, "")
@ -120,9 +122,15 @@ func (m *EndpointRecord) MarshalTo(data []byte) (int, error) {
i = encodeVarintAgent(data, i, uint64(len(m.ServiceID))) i = encodeVarintAgent(data, i, uint64(len(m.ServiceID)))
i += copy(data[i:], m.ServiceID) i += copy(data[i:], m.ServiceID)
} }
if len(m.EndpointIP) > 0 { if len(m.VirtualIP) > 0 {
data[i] = 0x22 data[i] = 0x22
i++ i++
i = encodeVarintAgent(data, i, uint64(len(m.VirtualIP)))
i += copy(data[i:], m.VirtualIP)
}
if len(m.EndpointIP) > 0 {
data[i] = 0x2a
i++
i = encodeVarintAgent(data, i, uint64(len(m.EndpointIP))) i = encodeVarintAgent(data, i, uint64(len(m.EndpointIP)))
i += copy(data[i:], m.EndpointIP) i += copy(data[i:], m.EndpointIP)
} }
@ -171,6 +179,10 @@ func (m *EndpointRecord) Size() (n int) {
if l > 0 { if l > 0 {
n += 1 + l + sovAgent(uint64(l)) n += 1 + l + sovAgent(uint64(l))
} }
l = len(m.VirtualIP)
if l > 0 {
n += 1 + l + sovAgent(uint64(l))
}
l = len(m.EndpointIP) l = len(m.EndpointIP)
if l > 0 { if l > 0 {
n += 1 + l + sovAgent(uint64(l)) n += 1 + l + sovAgent(uint64(l))
@ -199,6 +211,7 @@ func (this *EndpointRecord) String() string {
`Name:` + fmt.Sprintf("%v", this.Name) + `,`, `Name:` + fmt.Sprintf("%v", this.Name) + `,`,
`ServiceName:` + fmt.Sprintf("%v", this.ServiceName) + `,`, `ServiceName:` + fmt.Sprintf("%v", this.ServiceName) + `,`,
`ServiceID:` + fmt.Sprintf("%v", this.ServiceID) + `,`, `ServiceID:` + fmt.Sprintf("%v", this.ServiceID) + `,`,
`VirtualIP:` + fmt.Sprintf("%v", this.VirtualIP) + `,`,
`EndpointIP:` + fmt.Sprintf("%v", this.EndpointIP) + `,`, `EndpointIP:` + fmt.Sprintf("%v", this.EndpointIP) + `,`,
`}`, `}`,
}, "") }, "")
@ -329,6 +342,35 @@ func (m *EndpointRecord) Unmarshal(data []byte) error {
m.ServiceID = string(data[iNdEx:postIndex]) m.ServiceID = string(data[iNdEx:postIndex])
iNdEx = postIndex iNdEx = postIndex
case 4: case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field VirtualIP", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowAgent
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthAgent
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.VirtualIP = string(data[iNdEx:postIndex])
iNdEx = postIndex
case 5:
if wireType != 2 { if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field EndpointIP", wireType) return fmt.Errorf("proto: wrong wireType = %d for field EndpointIP", wireType)
} }
@ -484,18 +526,20 @@ var (
) )
var fileDescriptorAgent = []byte{ var fileDescriptorAgent = []byte{
// 204 bytes of a gzipped FileDescriptorProto // 228 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0x4c, 0x4f, 0xcd, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0x4c, 0x4f, 0xcd,
0x2b, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0xca, 0xc9, 0x4c, 0xca, 0x4b, 0x2d, 0x29, 0x2b, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0xca, 0xc9, 0x4c, 0xca, 0x4b, 0x2d, 0x29,
0xcf, 0x2f, 0xca, 0x96, 0x12, 0x49, 0xcf, 0x4f, 0xcf, 0x07, 0x0b, 0xeb, 0x83, 0x58, 0x10, 0x15, 0xcf, 0x2f, 0xca, 0x96, 0x12, 0x49, 0xcf, 0x4f, 0xcf, 0x07, 0x0b, 0xeb, 0x83, 0x58, 0x10, 0x15,
0x4a, 0xcb, 0x18, 0xb9, 0xf8, 0x5c, 0xf3, 0x52, 0x0a, 0xf2, 0x33, 0xf3, 0x4a, 0x82, 0x52, 0x93, 0x4a, 0x57, 0x18, 0xb9, 0xf8, 0x5c, 0xf3, 0x52, 0x0a, 0xf2, 0x33, 0xf3, 0x4a, 0x82, 0x52, 0x93,
0xf3, 0x8b, 0x52, 0x84, 0x84, 0xb8, 0x58, 0xf2, 0x12, 0x73, 0x53, 0x25, 0x18, 0x15, 0x18, 0x35, 0xf3, 0x8b, 0x52, 0x84, 0x84, 0xb8, 0x58, 0xf2, 0x12, 0x73, 0x53, 0x25, 0x18, 0x15, 0x18, 0x35,
0x38, 0x83, 0xc0, 0x6c, 0x21, 0x45, 0x2e, 0x9e, 0xe2, 0xd4, 0xa2, 0xb2, 0xcc, 0xe4, 0xd4, 0x78, 0x38, 0x83, 0xc0, 0x6c, 0x21, 0x45, 0x2e, 0x9e, 0xe2, 0xd4, 0xa2, 0xb2, 0xcc, 0xe4, 0xd4, 0x78,
0xb0, 0x1c, 0x13, 0x58, 0x8e, 0x1b, 0x2a, 0xe6, 0x07, 0x52, 0xa2, 0xc3, 0xc5, 0x05, 0x53, 0x92, 0xb0, 0x1c, 0x13, 0x58, 0x8e, 0x1b, 0x2a, 0xe6, 0x07, 0x52, 0xa2, 0xc3, 0xc5, 0x05, 0x53, 0x92,
0x99, 0x22, 0xc1, 0x0c, 0x52, 0xe0, 0xc4, 0xfb, 0xe8, 0x9e, 0x3c, 0x67, 0x30, 0x44, 0xd4, 0xd3, 0x99, 0x22, 0xc1, 0x0c, 0x52, 0xe0, 0xc4, 0xfb, 0xe8, 0x9e, 0x3c, 0x67, 0x30, 0x44, 0xd4, 0xd3,
0x25, 0x88, 0x13, 0xaa, 0xc0, 0x33, 0x45, 0x48, 0x9f, 0x8b, 0x3b, 0x15, 0x6a, 0x6d, 0x7c, 0x66, 0x25, 0x88, 0x13, 0xaa, 0xc0, 0x33, 0x05, 0xa4, 0xba, 0x2c, 0xb3, 0xa8, 0xa4, 0x34, 0x31, 0x27,
0x81, 0x04, 0x0b, 0x58, 0x39, 0x1f, 0x50, 0x39, 0x17, 0xcc, 0x35, 0x9e, 0x01, 0x41, 0x5c, 0x30, 0x3e, 0xb3, 0x40, 0x82, 0x05, 0xa1, 0x3a, 0x0c, 0x22, 0xea, 0x19, 0x10, 0xc4, 0x09, 0x55, 0xe0,
0x25, 0x9e, 0x05, 0x4e, 0x12, 0x37, 0x1e, 0xca, 0x31, 0x7c, 0x78, 0x28, 0xc7, 0xd8, 0xf0, 0x48, 0x59, 0x20, 0xa4, 0xcf, 0xc5, 0x9d, 0x0a, 0x75, 0x24, 0x48, 0x39, 0x2b, 0x58, 0x39, 0x1f, 0x50,
0x8e, 0xf1, 0x04, 0x10, 0x5f, 0x00, 0xe2, 0x07, 0x40, 0x9c, 0xc4, 0x06, 0xf6, 0x89, 0x31, 0x20, 0x39, 0x17, 0xcc, 0xed, 0x40, 0xf5, 0x5c, 0x30, 0x25, 0x9e, 0x05, 0x4e, 0x12, 0x37, 0x1e, 0xca,
0x00, 0x00, 0xff, 0xff, 0x94, 0x78, 0x3e, 0xce, 0xfa, 0x00, 0x00, 0x00, 0x31, 0x7c, 0x78, 0x28, 0xc7, 0xd8, 0xf0, 0x48, 0x8e, 0xf1, 0x04, 0x10, 0x5f, 0x00, 0xe2, 0x07,
0x40, 0x9c, 0xc4, 0x06, 0xf6, 0xb7, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0xae, 0x11, 0xc5, 0x8d,
0x28, 0x01, 0x00, 0x00,
} }

View file

@ -15,5 +15,6 @@ message EndpointRecord {
string name = 1; string name = 1;
string service_name = 2; string service_name = 2;
string service_id = 3 [(gogoproto.customname) = "ServiceID"]; string service_id = 3 [(gogoproto.customname) = "ServiceID"];
string endpoint_ip = 4 [(gogoproto.customname) = "EndpointIP"]; string virtual_ip = 4 [(gogoproto.customname) = "VirtualIP"];
string endpoint_ip = 5 [(gogoproto.customname) = "EndpointIP"];
} }

View file

@ -69,6 +69,7 @@ type endpoint struct {
myAliases []string myAliases []string
svcID string svcID string
svcName string svcName string
virtualIP net.IP
dbIndex uint64 dbIndex uint64
dbExists bool dbExists bool
sync.Mutex sync.Mutex
@ -93,6 +94,7 @@ func (ep *endpoint) MarshalJSON() ([]byte, error) {
epMap["myAliases"] = ep.myAliases epMap["myAliases"] = ep.myAliases
epMap["svcName"] = ep.svcName epMap["svcName"] = ep.svcName
epMap["svcID"] = ep.svcID epMap["svcID"] = ep.svcID
epMap["virtualIP"] = ep.virtualIP.String()
return json.Marshal(epMap) return json.Marshal(epMap)
} }
@ -186,6 +188,10 @@ func (ep *endpoint) UnmarshalJSON(b []byte) (err error) {
ep.svcID = si.(string) ep.svcID = si.(string)
} }
if vip, ok := epMap["virtualIP"]; ok {
ep.virtualIP = net.ParseIP(vip.(string))
}
ma, _ := json.Marshal(epMap["myAliases"]) ma, _ := json.Marshal(epMap["myAliases"])
var myAliases []string var myAliases []string
json.Unmarshal(ma, &myAliases) json.Unmarshal(ma, &myAliases)
@ -212,6 +218,7 @@ func (ep *endpoint) CopyTo(o datastore.KVObject) error {
dstEp.disableResolution = ep.disableResolution dstEp.disableResolution = ep.disableResolution
dstEp.svcName = ep.svcName dstEp.svcName = ep.svcName
dstEp.svcID = ep.svcID dstEp.svcID = ep.svcID
dstEp.virtualIP = ep.virtualIP
if ep.iface != nil { if ep.iface != nil {
dstEp.iface = &endpointInterface{} dstEp.iface = &endpointInterface{}
@ -892,10 +899,11 @@ func CreateOptionAlias(name string, alias string) EndpointOption {
} }
// CreateOptionService function returns an option setter for setting service binding configuration // CreateOptionService function returns an option setter for setting service binding configuration
func CreateOptionService(name, id string) EndpointOption { func CreateOptionService(name, id string, vip net.IP) EndpointOption {
return func(ep *endpoint) { return func(ep *endpoint) {
ep.svcName = name ep.svcName = name
ep.svcID = id ep.svcID = id
ep.virtualIP = vip
} }
} }

View file

@ -218,7 +218,7 @@ func TestDestination(t *testing.T) {
i, err := New("") i, err := New("")
require.NoError(t, err) require.NoError(t, err)
for _, protocol := range []string{"TCP"} { for _, protocol := range protocols {
var serviceAddress string var serviceAddress string
s := Service{ s := Service{

View file

@ -77,6 +77,19 @@ type svcInfo struct {
service map[string][]servicePorts service map[string][]servicePorts
} }
// backing container or host's info
type serviceTarget struct {
name string
ip net.IP
port uint16
}
type servicePorts struct {
portName string
proto string
target []serviceTarget
}
// IpamConf contains all the ipam related configurations for a network // IpamConf contains all the ipam related configurations for a network
type IpamConf struct { type IpamConf struct {
// The master address pool for containers and network interfaces // The master address pool for containers and network interfaces

View file

@ -745,6 +745,12 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error {
} }
} }
// Populate load balancer only after updating all the other
// information including gateway and other routes so that
// loadbalancers are populated all the network state is in
// place in the sandbox.
sb.populateLoadbalancers(ep)
// Only update the store if we did not come here as part of // Only update the store if we did not come here as part of
// sandbox delete. If we came here as part of delete then do // sandbox delete. If we came here as part of delete then do
// not bother updating the store. The sandbox object will be // not bother updating the store. The sandbox object will be

View file

@ -1,93 +1,32 @@
package libnetwork package libnetwork
import "net" import (
"net"
"sync"
)
// backing container or host's info var (
type serviceTarget struct { // A global monotonic counter to assign firewall marks to
name string // services.
ip net.IP fwMarkCtr uint32 = 256
port uint16 fwMarkCtrMu sync.Mutex
} )
type servicePorts struct {
portName string
proto string
target []serviceTarget
}
type service struct { type service struct {
name string name string // Service Name
id string id string // Service ID
backEnds map[string]map[string]net.IP
// Map of loadbalancers for the service one-per attached
// network. It is keyed with network ID.
loadBalancers map[string]*loadBalancer
sync.Mutex
} }
func newService(name string, id string) *service { type loadBalancer struct {
return &service{ vip net.IP
name: name, fwMark uint32
id: id,
backEnds: make(map[string]map[string]net.IP), // Map of backend IPs backing this loadbalancer on this
} // network. It is keyed with endpoint ID.
} backEnds map[string]net.IP
func (c *controller) addServiceBinding(name, sid, nid, eid string, ip net.IP) error {
var s *service
n, err := c.NetworkByID(nid)
if err != nil {
return err
}
c.Lock()
s, ok := c.serviceBindings[sid]
if !ok {
s = newService(name, sid)
}
netBackEnds, ok := s.backEnds[nid]
if !ok {
netBackEnds = make(map[string]net.IP)
s.backEnds[nid] = netBackEnds
}
netBackEnds[eid] = ip
c.serviceBindings[sid] = s
c.Unlock()
n.(*network).addSvcRecords(name, ip, nil, false)
return nil
}
func (c *controller) rmServiceBinding(name, sid, nid, eid string, ip net.IP) error {
n, err := c.NetworkByID(nid)
if err != nil {
return err
}
c.Lock()
s, ok := c.serviceBindings[sid]
if !ok {
c.Unlock()
return nil
}
netBackEnds, ok := s.backEnds[nid]
if !ok {
c.Unlock()
return nil
}
delete(netBackEnds, eid)
if len(netBackEnds) == 0 {
delete(s.backEnds, nid)
}
if len(s.backEnds) == 0 {
delete(c.serviceBindings, sid)
}
c.Unlock()
n.(*network).deleteSvcRecords(name, ip, nil, false)
return err
} }

360
libnetwork/service_linux.go Normal file
View file

@ -0,0 +1,360 @@
package libnetwork
import (
"fmt"
"net"
"os"
"os/exec"
"runtime"
"strconv"
"strings"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/reexec"
"github.com/docker/libnetwork/iptables"
"github.com/docker/libnetwork/ipvs"
"github.com/vishvananda/netlink/nl"
"github.com/vishvananda/netns"
)
func init() {
reexec.Register("fwmarker", fwMarker)
}
func newService(name string, id string) *service {
return &service{
name: name,
id: id,
loadBalancers: make(map[string]*loadBalancer),
}
}
func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ip net.IP) error {
var (
s *service
addService bool
)
n, err := c.NetworkByID(nid)
if err != nil {
return err
}
c.Lock()
s, ok := c.serviceBindings[sid]
if !ok {
// Create a new service if we are seeing this service
// for the first time.
s = newService(name, sid)
c.serviceBindings[sid] = s
}
c.Unlock()
s.Lock()
lb, ok := s.loadBalancers[nid]
if !ok {
// Create a new load balancer if we are seeing this
// network attachment on the service for the first
// time.
lb = &loadBalancer{
vip: vip,
fwMark: fwMarkCtr,
backEnds: make(map[string]net.IP),
}
fwMarkCtrMu.Lock()
fwMarkCtr++
fwMarkCtrMu.Unlock()
s.loadBalancers[nid] = lb
// Since we just created this load balancer make sure
// we add a new service service in IPVS rules.
addService = true
// Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR
svcIP := vip
if len(svcIP) == 0 {
svcIP = ip
}
n.(*network).addSvcRecords(name, svcIP, nil, false)
}
lb.backEnds[eid] = ip
s.Unlock()
// Add endpoint IP to special "tasks.svc_name" so that the
// applications have access to DNS RR.
n.(*network).addSvcRecords("tasks."+name, ip, nil, false)
// Add loadbalancer service and backend in all sandboxes in
// the network only if vip is valid.
if len(vip) != 0 {
n.(*network).addLBBackend(ip, vip, lb.fwMark, addService)
}
return nil
}
func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, ip net.IP) error {
var rmService bool
n, err := c.NetworkByID(nid)
if err != nil {
return err
}
c.Lock()
s, ok := c.serviceBindings[sid]
if !ok {
c.Unlock()
return nil
}
c.Unlock()
s.Lock()
lb, ok := s.loadBalancers[nid]
if !ok {
s.Unlock()
return nil
}
// Delete the special "tasks.svc_name" backend record.
n.(*network).deleteSvcRecords("tasks."+name, ip, nil, false)
delete(lb.backEnds, eid)
if len(lb.backEnds) == 0 {
// All the backends for this service have been
// removed. Time to remove the load balancer and also
// remove the service entry in IPVS.
rmService = true
// Make sure to remove the right IP since if vip is
// not valid we would have added a DNS RR record.
svcIP := vip
if len(svcIP) == 0 {
svcIP = ip
}
n.(*network).deleteSvcRecords(name, svcIP, nil, false)
delete(s.loadBalancers, nid)
}
if len(s.loadBalancers) == 0 {
// All loadbalancers for the service removed. Time to
// remove the service itself.
delete(c.serviceBindings, sid)
}
s.Unlock()
// Remove loadbalancer service(if needed) and backend in all
// sandboxes in the network only if the vip is valid.
if len(vip) != 0 {
n.(*network).rmLBBackend(ip, vip, lb.fwMark, rmService)
}
return nil
}
// Get all loadbalancers on this network that is currently discovered
// on this node..
func (n *network) connectedLoadbalancers() []*loadBalancer {
c := n.getController()
c.Lock()
defer c.Unlock()
var lbs []*loadBalancer
for _, s := range c.serviceBindings {
if lb, ok := s.loadBalancers[n.ID()]; ok {
lbs = append(lbs, lb)
}
}
return lbs
}
// Populate all loadbalancers on the network that the passed endpoint
// belongs to, into this sandbox.
func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
n := ep.getNetwork()
for _, lb := range n.connectedLoadbalancers() {
// Skip if vip is not valid.
if len(lb.vip) == 0 {
continue
}
addService := true
for _, ip := range lb.backEnds {
sb.addLBBackend(ip, lb.vip, lb.fwMark, addService)
addService = false
}
}
}
// Add loadbalancer backend to all sandboxes which has a connection to
// this network. If needed add the service as well, as specified by
// the addService bool.
func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, addService bool) {
n.WalkEndpoints(func(e Endpoint) bool {
ep := e.(*endpoint)
if sb, ok := ep.getSandbox(); ok {
sb.addLBBackend(ip, vip, fwMark, addService)
}
return false
})
}
// Remove loadbalancer backend from all sandboxes which has a
// connection to this network. If needed remove the service entry as
// well, as specified by the rmService bool.
func (n *network) rmLBBackend(ip, vip net.IP, fwMark uint32, rmService bool) {
n.WalkEndpoints(func(e Endpoint) bool {
ep := e.(*endpoint)
if sb, ok := ep.getSandbox(); ok {
sb.rmLBBackend(ip, vip, fwMark, rmService)
}
return false
})
}
// Add loadbalancer backend into one connected sandbox.
func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, addService bool) {
i, err := ipvs.New(sb.Key())
if err != nil {
logrus.Errorf("Failed to create a ipvs handle for sbox %s: %v", sb.Key(), err)
return
}
defer i.Close()
s := &ipvs.Service{
AddressFamily: nl.FAMILY_V4,
FWMark: fwMark,
SchedName: ipvs.RoundRobin,
}
if addService {
logrus.Debugf("Creating service for vip %s fwMark %d", vip, fwMark)
if err := invokeFWMarker(sb.Key(), vip, fwMark, false); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
return
}
if err := i.NewService(s); err != nil {
logrus.Errorf("Failed to create a new service for vip %s fwmark %d: %v", vip, fwMark, err)
return
}
}
d := &ipvs.Destination{
AddressFamily: nl.FAMILY_V4,
Address: ip,
Weight: 1,
}
// Remove the sched name before using the service to add
// destination.
s.SchedName = ""
if err := i.NewDestination(s, d); err != nil {
logrus.Errorf("Failed to create real server %s for vip %s fwmark %d: %v", ip, vip, fwMark, err)
}
}
// Remove loadbalancer backend from one connected sandbox.
func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, rmService bool) {
i, err := ipvs.New(sb.Key())
if err != nil {
logrus.Errorf("Failed to create a ipvs handle for sbox %s: %v", sb.Key(), err)
return
}
defer i.Close()
s := &ipvs.Service{
AddressFamily: nl.FAMILY_V4,
FWMark: fwMark,
}
d := &ipvs.Destination{
AddressFamily: nl.FAMILY_V4,
Address: ip,
Weight: 1,
}
if err := i.DelDestination(s, d); err != nil {
logrus.Errorf("Failed to delete real server %s for vip %s fwmark %d: %v", ip, vip, fwMark, err)
return
}
if rmService {
s.SchedName = ipvs.RoundRobin
if err := i.DelService(s); err != nil {
logrus.Errorf("Failed to create a new service for vip %s fwmark %d: %v", vip, fwMark, err)
return
}
if err := invokeFWMarker(sb.Key(), vip, fwMark, true); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
return
}
}
}
// Invoke fwmarker reexec routine to mark vip destined packets with
// the passed firewall mark.
func invokeFWMarker(path string, vip net.IP, fwMark uint32, isDelete bool) error {
addDelOpt := "-A"
if isDelete {
addDelOpt = "-D"
}
cmd := &exec.Cmd{
Path: reexec.Self(),
Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt),
Stdout: os.Stdout,
Stderr: os.Stderr,
}
if err := cmd.Run(); err != nil {
return fmt.Errorf("reexec failed: %v", err)
}
return nil
}
// Firewall marker reexec function.
func fwMarker() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
if len(os.Args) < 5 {
logrus.Error("invalid number of arguments..")
os.Exit(1)
}
vip := os.Args[2]
fwMark, err := strconv.ParseUint(os.Args[3], 10, 32)
if err != nil {
logrus.Errorf("bad fwmark value(%s) passed: %v", os.Args[3], err)
os.Exit(2)
}
addDelOpt := os.Args[4]
ns, err := netns.GetFromPath(os.Args[1])
if err != nil {
logrus.Errorf("failed get network namespace %q: %v", os.Args[1], err)
os.Exit(3)
}
defer ns.Close()
if err := netns.Set(ns); err != nil {
logrus.Errorf("setting into container net ns %v failed, %v", os.Args[1], err)
os.Exit(4)
}
rule := strings.Fields(fmt.Sprintf("-t mangle %s OUTPUT -d %s/32 -j MARK --set-mark %d", addDelOpt, vip, fwMark))
if err := iptables.RawCombinedOutputNative(rule...); err != nil {
logrus.Errorf("setting up rule failed, %v: %v", rule, err)
os.Exit(5)
}
}

View file

@ -0,0 +1,19 @@
// +build !linux
package libnetwork
import (
"fmt"
"net"
)
func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ip net.IP) error {
return fmt.Errorf("not supported")
}
func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, ip net.IP) error {
return fmt.Errorf("not supported")
}
func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
}