From d05adebf30b837154c3d7959f66170ada7128863 Mon Sep 17 00:00:00 2001 From: Jana Radhakrishnan Date: Tue, 24 May 2016 22:46:18 -0700 Subject: [PATCH] Add loadbalancer support This PR adds support for loadbalancing across a group of endpoints that share the same service configuration as passed in by `OptionService`. The loadbalancer is implemented using ipvs with just round robin scheduling supported for now. Signed-off-by: Jana Radhakrishnan --- libnetwork/agent.go | 28 ++- libnetwork/agent.pb.go | 64 +++++- libnetwork/agent.proto | 3 +- libnetwork/endpoint.go | 10 +- libnetwork/ipvs/ipvs_test.go | 2 +- libnetwork/network.go | 13 ++ libnetwork/sandbox.go | 6 + libnetwork/service.go | 109 ++------- libnetwork/service_linux.go | 360 ++++++++++++++++++++++++++++++ libnetwork/service_unsupported.go | 19 ++ 10 files changed, 506 insertions(+), 108 deletions(-) create mode 100644 libnetwork/service_linux.go create mode 100644 libnetwork/service_unsupported.go diff --git a/libnetwork/agent.go b/libnetwork/agent.go index a8198ad4ed..a942c24467 100644 --- a/libnetwork/agent.go +++ b/libnetwork/agent.go @@ -167,14 +167,17 @@ func (ep *endpoint) addToCluster() error { c := n.getController() if !ep.isAnonymous() && ep.Iface().Address() != nil { - if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.Iface().Address().IP); err != nil { - return err + if ep.svcID != "" { + if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ep.Iface().Address().IP); err != nil { + return err + } } buf, err := proto.Marshal(&EndpointRecord{ Name: ep.Name(), ServiceName: ep.svcName, ServiceID: ep.svcID, + VirtualIP: ep.virtualIP.String(), EndpointIP: ep.Iface().Address().IP.String(), }) @@ -204,8 +207,8 @@ func (ep *endpoint) deleteFromCluster() error { c := n.getController() if !ep.isAnonymous() { - if ep.Iface().Address() != nil { - if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.Iface().Address().IP); err != nil { + if ep.svcID != "" && ep.Iface().Address() != nil { + if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ep.Iface().Address().IP); err != nil { return err } } @@ -357,6 +360,7 @@ func (c *controller) handleEpTableEvent(ev events.Event) { name := epRec.Name svcName := epRec.ServiceName svcID := epRec.ServiceID + vip := net.ParseIP(epRec.VirtualIP) ip := net.ParseIP(epRec.EndpointIP) if name == "" || ip == nil { @@ -365,16 +369,20 @@ func (c *controller) handleEpTableEvent(ev events.Event) { } if isAdd { - if err := c.addServiceBinding(svcName, svcID, nid, eid, ip); err != nil { - logrus.Errorf("Failed adding service binding for value %s: %v", value, err) - return + if svcID != "" { + if err := c.addServiceBinding(svcName, svcID, nid, eid, vip, ip); err != nil { + logrus.Errorf("Failed adding service binding for value %s: %v", value, err) + return + } } n.addSvcRecords(name, ip, nil, true) } else { - if err := c.rmServiceBinding(svcName, svcID, nid, eid, ip); err != nil { - logrus.Errorf("Failed adding service binding for value %s: %v", value, err) - return + if svcID != "" { + if err := c.rmServiceBinding(svcName, svcID, nid, eid, vip, ip); err != nil { + logrus.Errorf("Failed adding service binding for value %s: %v", value, err) + return + } } n.deleteSvcRecords(name, ip, nil, true) diff --git a/libnetwork/agent.pb.go b/libnetwork/agent.pb.go index 7c0dd3f296..e6b8143743 100644 --- a/libnetwork/agent.pb.go +++ b/libnetwork/agent.pb.go @@ -39,7 +39,8 @@ type EndpointRecord struct { Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` ServiceName string `protobuf:"bytes,2,opt,name=service_name,json=serviceName,proto3" json:"service_name,omitempty"` ServiceID string `protobuf:"bytes,3,opt,name=service_id,json=serviceId,proto3" json:"service_id,omitempty"` - EndpointIP string `protobuf:"bytes,4,opt,name=endpoint_ip,json=endpointIp,proto3" json:"endpoint_ip,omitempty"` + VirtualIP string `protobuf:"bytes,4,opt,name=virtual_ip,json=virtualIp,proto3" json:"virtual_ip,omitempty"` + EndpointIP string `protobuf:"bytes,5,opt,name=endpoint_ip,json=endpointIp,proto3" json:"endpoint_ip,omitempty"` } func (m *EndpointRecord) Reset() { *m = EndpointRecord{} } @@ -53,11 +54,12 @@ func (this *EndpointRecord) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 8) + s := make([]string, 0, 9) s = append(s, "&libnetwork.EndpointRecord{") s = append(s, "Name: "+fmt.Sprintf("%#v", this.Name)+",\n") s = append(s, "ServiceName: "+fmt.Sprintf("%#v", this.ServiceName)+",\n") s = append(s, "ServiceID: "+fmt.Sprintf("%#v", this.ServiceID)+",\n") + s = append(s, "VirtualIP: "+fmt.Sprintf("%#v", this.VirtualIP)+",\n") s = append(s, "EndpointIP: "+fmt.Sprintf("%#v", this.EndpointIP)+",\n") s = append(s, "}") return strings.Join(s, "") @@ -120,9 +122,15 @@ func (m *EndpointRecord) MarshalTo(data []byte) (int, error) { i = encodeVarintAgent(data, i, uint64(len(m.ServiceID))) i += copy(data[i:], m.ServiceID) } - if len(m.EndpointIP) > 0 { + if len(m.VirtualIP) > 0 { data[i] = 0x22 i++ + i = encodeVarintAgent(data, i, uint64(len(m.VirtualIP))) + i += copy(data[i:], m.VirtualIP) + } + if len(m.EndpointIP) > 0 { + data[i] = 0x2a + i++ i = encodeVarintAgent(data, i, uint64(len(m.EndpointIP))) i += copy(data[i:], m.EndpointIP) } @@ -171,6 +179,10 @@ func (m *EndpointRecord) Size() (n int) { if l > 0 { n += 1 + l + sovAgent(uint64(l)) } + l = len(m.VirtualIP) + if l > 0 { + n += 1 + l + sovAgent(uint64(l)) + } l = len(m.EndpointIP) if l > 0 { n += 1 + l + sovAgent(uint64(l)) @@ -199,6 +211,7 @@ func (this *EndpointRecord) String() string { `Name:` + fmt.Sprintf("%v", this.Name) + `,`, `ServiceName:` + fmt.Sprintf("%v", this.ServiceName) + `,`, `ServiceID:` + fmt.Sprintf("%v", this.ServiceID) + `,`, + `VirtualIP:` + fmt.Sprintf("%v", this.VirtualIP) + `,`, `EndpointIP:` + fmt.Sprintf("%v", this.EndpointIP) + `,`, `}`, }, "") @@ -329,6 +342,35 @@ func (m *EndpointRecord) Unmarshal(data []byte) error { m.ServiceID = string(data[iNdEx:postIndex]) iNdEx = postIndex case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field VirtualIP", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAgent + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthAgent + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.VirtualIP = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 5: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field EndpointIP", wireType) } @@ -484,18 +526,20 @@ var ( ) var fileDescriptorAgent = []byte{ - // 204 bytes of a gzipped FileDescriptorProto + // 228 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0x4c, 0x4f, 0xcd, 0x2b, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0xca, 0xc9, 0x4c, 0xca, 0x4b, 0x2d, 0x29, 0xcf, 0x2f, 0xca, 0x96, 0x12, 0x49, 0xcf, 0x4f, 0xcf, 0x07, 0x0b, 0xeb, 0x83, 0x58, 0x10, 0x15, - 0x4a, 0xcb, 0x18, 0xb9, 0xf8, 0x5c, 0xf3, 0x52, 0x0a, 0xf2, 0x33, 0xf3, 0x4a, 0x82, 0x52, 0x93, + 0x4a, 0x57, 0x18, 0xb9, 0xf8, 0x5c, 0xf3, 0x52, 0x0a, 0xf2, 0x33, 0xf3, 0x4a, 0x82, 0x52, 0x93, 0xf3, 0x8b, 0x52, 0x84, 0x84, 0xb8, 0x58, 0xf2, 0x12, 0x73, 0x53, 0x25, 0x18, 0x15, 0x18, 0x35, 0x38, 0x83, 0xc0, 0x6c, 0x21, 0x45, 0x2e, 0x9e, 0xe2, 0xd4, 0xa2, 0xb2, 0xcc, 0xe4, 0xd4, 0x78, 0xb0, 0x1c, 0x13, 0x58, 0x8e, 0x1b, 0x2a, 0xe6, 0x07, 0x52, 0xa2, 0xc3, 0xc5, 0x05, 0x53, 0x92, 0x99, 0x22, 0xc1, 0x0c, 0x52, 0xe0, 0xc4, 0xfb, 0xe8, 0x9e, 0x3c, 0x67, 0x30, 0x44, 0xd4, 0xd3, - 0x25, 0x88, 0x13, 0xaa, 0xc0, 0x33, 0x45, 0x48, 0x9f, 0x8b, 0x3b, 0x15, 0x6a, 0x6d, 0x7c, 0x66, - 0x81, 0x04, 0x0b, 0x58, 0x39, 0x1f, 0x50, 0x39, 0x17, 0xcc, 0x35, 0x9e, 0x01, 0x41, 0x5c, 0x30, - 0x25, 0x9e, 0x05, 0x4e, 0x12, 0x37, 0x1e, 0xca, 0x31, 0x7c, 0x78, 0x28, 0xc7, 0xd8, 0xf0, 0x48, - 0x8e, 0xf1, 0x04, 0x10, 0x5f, 0x00, 0xe2, 0x07, 0x40, 0x9c, 0xc4, 0x06, 0xf6, 0x89, 0x31, 0x20, - 0x00, 0x00, 0xff, 0xff, 0x94, 0x78, 0x3e, 0xce, 0xfa, 0x00, 0x00, 0x00, + 0x25, 0x88, 0x13, 0xaa, 0xc0, 0x33, 0x05, 0xa4, 0xba, 0x2c, 0xb3, 0xa8, 0xa4, 0x34, 0x31, 0x27, + 0x3e, 0xb3, 0x40, 0x82, 0x05, 0xa1, 0x3a, 0x0c, 0x22, 0xea, 0x19, 0x10, 0xc4, 0x09, 0x55, 0xe0, + 0x59, 0x20, 0xa4, 0xcf, 0xc5, 0x9d, 0x0a, 0x75, 0x24, 0x48, 0x39, 0x2b, 0x58, 0x39, 0x1f, 0x50, + 0x39, 0x17, 0xcc, 0xed, 0x40, 0xf5, 0x5c, 0x30, 0x25, 0x9e, 0x05, 0x4e, 0x12, 0x37, 0x1e, 0xca, + 0x31, 0x7c, 0x78, 0x28, 0xc7, 0xd8, 0xf0, 0x48, 0x8e, 0xf1, 0x04, 0x10, 0x5f, 0x00, 0xe2, 0x07, + 0x40, 0x9c, 0xc4, 0x06, 0xf6, 0xb7, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0xae, 0x11, 0xc5, 0x8d, + 0x28, 0x01, 0x00, 0x00, } diff --git a/libnetwork/agent.proto b/libnetwork/agent.proto index 0d99285c2b..0dbc1aaab5 100644 --- a/libnetwork/agent.proto +++ b/libnetwork/agent.proto @@ -15,5 +15,6 @@ message EndpointRecord { string name = 1; string service_name = 2; string service_id = 3 [(gogoproto.customname) = "ServiceID"]; - string endpoint_ip = 4 [(gogoproto.customname) = "EndpointIP"]; + string virtual_ip = 4 [(gogoproto.customname) = "VirtualIP"]; + string endpoint_ip = 5 [(gogoproto.customname) = "EndpointIP"]; } \ No newline at end of file diff --git a/libnetwork/endpoint.go b/libnetwork/endpoint.go index 5335945690..c4c6a0e1dc 100644 --- a/libnetwork/endpoint.go +++ b/libnetwork/endpoint.go @@ -69,6 +69,7 @@ type endpoint struct { myAliases []string svcID string svcName string + virtualIP net.IP dbIndex uint64 dbExists bool sync.Mutex @@ -93,6 +94,7 @@ func (ep *endpoint) MarshalJSON() ([]byte, error) { epMap["myAliases"] = ep.myAliases epMap["svcName"] = ep.svcName epMap["svcID"] = ep.svcID + epMap["virtualIP"] = ep.virtualIP.String() return json.Marshal(epMap) } @@ -186,6 +188,10 @@ func (ep *endpoint) UnmarshalJSON(b []byte) (err error) { ep.svcID = si.(string) } + if vip, ok := epMap["virtualIP"]; ok { + ep.virtualIP = net.ParseIP(vip.(string)) + } + ma, _ := json.Marshal(epMap["myAliases"]) var myAliases []string json.Unmarshal(ma, &myAliases) @@ -212,6 +218,7 @@ func (ep *endpoint) CopyTo(o datastore.KVObject) error { dstEp.disableResolution = ep.disableResolution dstEp.svcName = ep.svcName dstEp.svcID = ep.svcID + dstEp.virtualIP = ep.virtualIP if ep.iface != nil { dstEp.iface = &endpointInterface{} @@ -892,10 +899,11 @@ func CreateOptionAlias(name string, alias string) EndpointOption { } // CreateOptionService function returns an option setter for setting service binding configuration -func CreateOptionService(name, id string) EndpointOption { +func CreateOptionService(name, id string, vip net.IP) EndpointOption { return func(ep *endpoint) { ep.svcName = name ep.svcID = id + ep.virtualIP = vip } } diff --git a/libnetwork/ipvs/ipvs_test.go b/libnetwork/ipvs/ipvs_test.go index e0d110dd14..d9d955a9b6 100644 --- a/libnetwork/ipvs/ipvs_test.go +++ b/libnetwork/ipvs/ipvs_test.go @@ -218,7 +218,7 @@ func TestDestination(t *testing.T) { i, err := New("") require.NoError(t, err) - for _, protocol := range []string{"TCP"} { + for _, protocol := range protocols { var serviceAddress string s := Service{ diff --git a/libnetwork/network.go b/libnetwork/network.go index b47b56e012..7cad9f0dc7 100644 --- a/libnetwork/network.go +++ b/libnetwork/network.go @@ -77,6 +77,19 @@ type svcInfo struct { service map[string][]servicePorts } +// backing container or host's info +type serviceTarget struct { + name string + ip net.IP + port uint16 +} + +type servicePorts struct { + portName string + proto string + target []serviceTarget +} + // IpamConf contains all the ipam related configurations for a network type IpamConf struct { // The master address pool for containers and network interfaces diff --git a/libnetwork/sandbox.go b/libnetwork/sandbox.go index 1635e6193c..36cbac6627 100644 --- a/libnetwork/sandbox.go +++ b/libnetwork/sandbox.go @@ -745,6 +745,12 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error { } } + // Populate load balancer only after updating all the other + // information including gateway and other routes so that + // loadbalancers are populated all the network state is in + // place in the sandbox. + sb.populateLoadbalancers(ep) + // Only update the store if we did not come here as part of // sandbox delete. If we came here as part of delete then do // not bother updating the store. The sandbox object will be diff --git a/libnetwork/service.go b/libnetwork/service.go index 0731235c40..4f90fab1de 100644 --- a/libnetwork/service.go +++ b/libnetwork/service.go @@ -1,93 +1,32 @@ package libnetwork -import "net" +import ( + "net" + "sync" +) -// backing container or host's info -type serviceTarget struct { - name string - ip net.IP - port uint16 -} - -type servicePorts struct { - portName string - proto string - target []serviceTarget -} +var ( + // A global monotonic counter to assign firewall marks to + // services. + fwMarkCtr uint32 = 256 + fwMarkCtrMu sync.Mutex +) type service struct { - name string - id string - backEnds map[string]map[string]net.IP + name string // Service Name + id string // Service ID + + // Map of loadbalancers for the service one-per attached + // network. It is keyed with network ID. + loadBalancers map[string]*loadBalancer + sync.Mutex } -func newService(name string, id string) *service { - return &service{ - name: name, - id: id, - backEnds: make(map[string]map[string]net.IP), - } -} - -func (c *controller) addServiceBinding(name, sid, nid, eid string, ip net.IP) error { - var s *service - - n, err := c.NetworkByID(nid) - if err != nil { - return err - } - - c.Lock() - s, ok := c.serviceBindings[sid] - if !ok { - s = newService(name, sid) - } - - netBackEnds, ok := s.backEnds[nid] - if !ok { - netBackEnds = make(map[string]net.IP) - s.backEnds[nid] = netBackEnds - } - - netBackEnds[eid] = ip - c.serviceBindings[sid] = s - c.Unlock() - - n.(*network).addSvcRecords(name, ip, nil, false) - return nil -} - -func (c *controller) rmServiceBinding(name, sid, nid, eid string, ip net.IP) error { - n, err := c.NetworkByID(nid) - if err != nil { - return err - } - - c.Lock() - s, ok := c.serviceBindings[sid] - if !ok { - c.Unlock() - return nil - } - - netBackEnds, ok := s.backEnds[nid] - if !ok { - c.Unlock() - return nil - } - - delete(netBackEnds, eid) - - if len(netBackEnds) == 0 { - delete(s.backEnds, nid) - } - - if len(s.backEnds) == 0 { - delete(c.serviceBindings, sid) - } - c.Unlock() - - n.(*network).deleteSvcRecords(name, ip, nil, false) - - return err +type loadBalancer struct { + vip net.IP + fwMark uint32 + + // Map of backend IPs backing this loadbalancer on this + // network. It is keyed with endpoint ID. + backEnds map[string]net.IP } diff --git a/libnetwork/service_linux.go b/libnetwork/service_linux.go new file mode 100644 index 0000000000..be163d268d --- /dev/null +++ b/libnetwork/service_linux.go @@ -0,0 +1,360 @@ +package libnetwork + +import ( + "fmt" + "net" + "os" + "os/exec" + "runtime" + "strconv" + "strings" + + "github.com/Sirupsen/logrus" + "github.com/docker/docker/pkg/reexec" + "github.com/docker/libnetwork/iptables" + "github.com/docker/libnetwork/ipvs" + "github.com/vishvananda/netlink/nl" + "github.com/vishvananda/netns" +) + +func init() { + reexec.Register("fwmarker", fwMarker) +} + +func newService(name string, id string) *service { + return &service{ + name: name, + id: id, + loadBalancers: make(map[string]*loadBalancer), + } +} + +func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ip net.IP) error { + var ( + s *service + addService bool + ) + + n, err := c.NetworkByID(nid) + if err != nil { + return err + } + + c.Lock() + s, ok := c.serviceBindings[sid] + if !ok { + // Create a new service if we are seeing this service + // for the first time. + s = newService(name, sid) + c.serviceBindings[sid] = s + } + c.Unlock() + + s.Lock() + lb, ok := s.loadBalancers[nid] + if !ok { + // Create a new load balancer if we are seeing this + // network attachment on the service for the first + // time. + lb = &loadBalancer{ + vip: vip, + fwMark: fwMarkCtr, + backEnds: make(map[string]net.IP), + } + + fwMarkCtrMu.Lock() + fwMarkCtr++ + fwMarkCtrMu.Unlock() + + s.loadBalancers[nid] = lb + + // Since we just created this load balancer make sure + // we add a new service service in IPVS rules. + addService = true + + // Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR + svcIP := vip + if len(svcIP) == 0 { + svcIP = ip + } + + n.(*network).addSvcRecords(name, svcIP, nil, false) + } + + lb.backEnds[eid] = ip + s.Unlock() + + // Add endpoint IP to special "tasks.svc_name" so that the + // applications have access to DNS RR. + n.(*network).addSvcRecords("tasks."+name, ip, nil, false) + + // Add loadbalancer service and backend in all sandboxes in + // the network only if vip is valid. + if len(vip) != 0 { + n.(*network).addLBBackend(ip, vip, lb.fwMark, addService) + } + + return nil +} + +func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, ip net.IP) error { + var rmService bool + + n, err := c.NetworkByID(nid) + if err != nil { + return err + } + + c.Lock() + s, ok := c.serviceBindings[sid] + if !ok { + c.Unlock() + return nil + } + c.Unlock() + + s.Lock() + lb, ok := s.loadBalancers[nid] + if !ok { + s.Unlock() + return nil + } + + // Delete the special "tasks.svc_name" backend record. + n.(*network).deleteSvcRecords("tasks."+name, ip, nil, false) + delete(lb.backEnds, eid) + + if len(lb.backEnds) == 0 { + // All the backends for this service have been + // removed. Time to remove the load balancer and also + // remove the service entry in IPVS. + rmService = true + + // Make sure to remove the right IP since if vip is + // not valid we would have added a DNS RR record. + svcIP := vip + if len(svcIP) == 0 { + svcIP = ip + } + + n.(*network).deleteSvcRecords(name, svcIP, nil, false) + delete(s.loadBalancers, nid) + } + + if len(s.loadBalancers) == 0 { + // All loadbalancers for the service removed. Time to + // remove the service itself. + delete(c.serviceBindings, sid) + } + s.Unlock() + + // Remove loadbalancer service(if needed) and backend in all + // sandboxes in the network only if the vip is valid. + if len(vip) != 0 { + n.(*network).rmLBBackend(ip, vip, lb.fwMark, rmService) + } + + return nil +} + +// Get all loadbalancers on this network that is currently discovered +// on this node.. +func (n *network) connectedLoadbalancers() []*loadBalancer { + c := n.getController() + + c.Lock() + defer c.Unlock() + + var lbs []*loadBalancer + for _, s := range c.serviceBindings { + if lb, ok := s.loadBalancers[n.ID()]; ok { + lbs = append(lbs, lb) + } + } + + return lbs +} + +// Populate all loadbalancers on the network that the passed endpoint +// belongs to, into this sandbox. +func (sb *sandbox) populateLoadbalancers(ep *endpoint) { + n := ep.getNetwork() + for _, lb := range n.connectedLoadbalancers() { + // Skip if vip is not valid. + if len(lb.vip) == 0 { + continue + } + + addService := true + for _, ip := range lb.backEnds { + sb.addLBBackend(ip, lb.vip, lb.fwMark, addService) + addService = false + } + } +} + +// Add loadbalancer backend to all sandboxes which has a connection to +// this network. If needed add the service as well, as specified by +// the addService bool. +func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, addService bool) { + n.WalkEndpoints(func(e Endpoint) bool { + ep := e.(*endpoint) + if sb, ok := ep.getSandbox(); ok { + sb.addLBBackend(ip, vip, fwMark, addService) + } + + return false + }) +} + +// Remove loadbalancer backend from all sandboxes which has a +// connection to this network. If needed remove the service entry as +// well, as specified by the rmService bool. +func (n *network) rmLBBackend(ip, vip net.IP, fwMark uint32, rmService bool) { + n.WalkEndpoints(func(e Endpoint) bool { + ep := e.(*endpoint) + if sb, ok := ep.getSandbox(); ok { + sb.rmLBBackend(ip, vip, fwMark, rmService) + } + + return false + }) +} + +// Add loadbalancer backend into one connected sandbox. +func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, addService bool) { + i, err := ipvs.New(sb.Key()) + if err != nil { + logrus.Errorf("Failed to create a ipvs handle for sbox %s: %v", sb.Key(), err) + return + } + defer i.Close() + + s := &ipvs.Service{ + AddressFamily: nl.FAMILY_V4, + FWMark: fwMark, + SchedName: ipvs.RoundRobin, + } + + if addService { + logrus.Debugf("Creating service for vip %s fwMark %d", vip, fwMark) + if err := invokeFWMarker(sb.Key(), vip, fwMark, false); err != nil { + logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err) + return + } + + if err := i.NewService(s); err != nil { + logrus.Errorf("Failed to create a new service for vip %s fwmark %d: %v", vip, fwMark, err) + return + } + } + + d := &ipvs.Destination{ + AddressFamily: nl.FAMILY_V4, + Address: ip, + Weight: 1, + } + + // Remove the sched name before using the service to add + // destination. + s.SchedName = "" + if err := i.NewDestination(s, d); err != nil { + logrus.Errorf("Failed to create real server %s for vip %s fwmark %d: %v", ip, vip, fwMark, err) + } +} + +// Remove loadbalancer backend from one connected sandbox. +func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, rmService bool) { + i, err := ipvs.New(sb.Key()) + if err != nil { + logrus.Errorf("Failed to create a ipvs handle for sbox %s: %v", sb.Key(), err) + return + } + defer i.Close() + + s := &ipvs.Service{ + AddressFamily: nl.FAMILY_V4, + FWMark: fwMark, + } + + d := &ipvs.Destination{ + AddressFamily: nl.FAMILY_V4, + Address: ip, + Weight: 1, + } + + if err := i.DelDestination(s, d); err != nil { + logrus.Errorf("Failed to delete real server %s for vip %s fwmark %d: %v", ip, vip, fwMark, err) + return + } + + if rmService { + s.SchedName = ipvs.RoundRobin + if err := i.DelService(s); err != nil { + logrus.Errorf("Failed to create a new service for vip %s fwmark %d: %v", vip, fwMark, err) + return + } + + if err := invokeFWMarker(sb.Key(), vip, fwMark, true); err != nil { + logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err) + return + } + } +} + +// Invoke fwmarker reexec routine to mark vip destined packets with +// the passed firewall mark. +func invokeFWMarker(path string, vip net.IP, fwMark uint32, isDelete bool) error { + addDelOpt := "-A" + if isDelete { + addDelOpt = "-D" + } + + cmd := &exec.Cmd{ + Path: reexec.Self(), + Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt), + Stdout: os.Stdout, + Stderr: os.Stderr, + } + if err := cmd.Run(); err != nil { + return fmt.Errorf("reexec failed: %v", err) + } + return nil +} + +// Firewall marker reexec function. +func fwMarker() { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + if len(os.Args) < 5 { + logrus.Error("invalid number of arguments..") + os.Exit(1) + } + + vip := os.Args[2] + fwMark, err := strconv.ParseUint(os.Args[3], 10, 32) + if err != nil { + logrus.Errorf("bad fwmark value(%s) passed: %v", os.Args[3], err) + os.Exit(2) + } + addDelOpt := os.Args[4] + + ns, err := netns.GetFromPath(os.Args[1]) + if err != nil { + logrus.Errorf("failed get network namespace %q: %v", os.Args[1], err) + os.Exit(3) + } + defer ns.Close() + + if err := netns.Set(ns); err != nil { + logrus.Errorf("setting into container net ns %v failed, %v", os.Args[1], err) + os.Exit(4) + } + + rule := strings.Fields(fmt.Sprintf("-t mangle %s OUTPUT -d %s/32 -j MARK --set-mark %d", addDelOpt, vip, fwMark)) + if err := iptables.RawCombinedOutputNative(rule...); err != nil { + logrus.Errorf("setting up rule failed, %v: %v", rule, err) + os.Exit(5) + } +} diff --git a/libnetwork/service_unsupported.go b/libnetwork/service_unsupported.go new file mode 100644 index 0000000000..af0347271f --- /dev/null +++ b/libnetwork/service_unsupported.go @@ -0,0 +1,19 @@ +// +build !linux + +package libnetwork + +import ( + "fmt" + "net" +) + +func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ip net.IP) error { + return fmt.Errorf("not supported") +} + +func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, ip net.IP) error { + return fmt.Errorf("not supported") +} + +func (sb *sandbox) populateLoadbalancers(ep *endpoint) { +}