Service discovery race on serviceBindings delete. Bug on IP reuse (#1808)

* Correct SetMatrix documentation

The SetMatrix is a generic data structure, so the description
should not be tight to any specific use

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>

* Service Discovery reuse name and serviceBindings deletion

- Added logic to handle name reuse from different services
- Moved the deletion from the serviceBindings map at the end
  of the rmServiceBindings body to avoid race with new services

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>

* Avoid race on network cleanup

Use the locker to avoid the race between the network
deletion and new endpoints being created

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>

* CleanupServiceBindings to clean the SD records

Allow the cleanupServicebindings to take care of the service discovery
cleanup. Also avoid to trigger the cleanup for each endpoint from an SD
point of view
LB and SD will be separated in the future

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>

* Addressed comments

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>

* NetworkDB deleteEntry has to happen

If there is an error locally guarantee that the delete entry
on network DB is still honored

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
This commit is contained in:
Flavio Crisciani 2017-06-18 05:25:58 -07:00 committed by Madhu Venugopal
parent 0dd3fd69a1
commit f969f26966
7 changed files with 314 additions and 143 deletions

View File

@ -648,13 +648,13 @@ func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
TaskAliases: ep.myAliases, TaskAliases: ep.myAliases,
EndpointIP: ep.Iface().Address().IP.String(), EndpointIP: ep.Iface().Address().IP.String(),
}) })
if err != nil { if err != nil {
return err return err
} }
if agent != nil { if agent != nil {
if err := agent.networkDB.CreateEntry(libnetworkEPTable, n.ID(), ep.ID(), buf); err != nil { if err := agent.networkDB.CreateEntry(libnetworkEPTable, n.ID(), ep.ID(), buf); err != nil {
logrus.Warnf("addServiceInfoToCluster NetworkDB CreateEntry failed for %s %s err:%s", ep.id, n.id, err)
return err return err
} }
} }
@ -686,6 +686,13 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err
name = ep.MyAliases()[0] name = ep.MyAliases()[0]
} }
if agent != nil {
// First delete from networkDB then locally
if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
logrus.Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err)
}
}
if ep.Iface().Address() != nil { if ep.Iface().Address() != nil {
if ep.svcID != "" { if ep.svcID != "" {
// This is a task part of a service // This is a task part of a service
@ -693,7 +700,7 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err
if n.ingress { if n.ingress {
ingressPorts = ep.ingressPorts ingressPorts = ep.ingressPorts
} }
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil { if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true); err != nil {
return err return err
} }
} else { } else {
@ -704,12 +711,6 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err
} }
} }
if agent != nil {
if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
return err
}
}
logrus.Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID()) logrus.Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID())
return nil return nil
@ -900,7 +901,7 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
logrus.Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec) logrus.Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec)
if svcID != "" { if svcID != "" {
// This is a remote task part of a service // This is a remote task part of a service
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil { if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true); err != nil {
logrus.Errorf("failed removing service binding for %s epRec:%v err:%s", eid, epRec, err) logrus.Errorf("failed removing service binding for %s epRec:%v err:%s", eid, epRec, err)
return return
} }

View File

@ -10,24 +10,26 @@ import (
type SetMatrix interface { type SetMatrix interface {
// Get returns the members of the set for a specific key as a slice. // Get returns the members of the set for a specific key as a slice.
Get(key string) ([]interface{}, bool) Get(key string) ([]interface{}, bool)
// Contains is used to verify is an element is in a set for a specific key // Contains is used to verify if an element is in a set for a specific key
// returns true if the element is in the set // returns true if the element is in the set
// returns true if there is a set for the key // returns true if there is a set for the key
Contains(key string, value interface{}) (bool, bool) Contains(key string, value interface{}) (bool, bool)
// Insert inserts the mapping between the IP and the endpoint identifier // Insert inserts the value in the set of a key
// returns true if the mapping was not present, false otherwise // returns true if the value is inserted (was not already in the set), false otherwise
// returns also the number of endpoints associated to the IP // returns also the length of the set for the key
Insert(key string, value interface{}) (bool, int) Insert(key string, value interface{}) (bool, int)
// Remove removes the mapping between the IP and the endpoint identifier // Remove removes the value in the set for a specific key
// returns true if the mapping was deleted, false otherwise // returns true if the value is deleted, false otherwise
// returns also the number of endpoints associated to the IP // returns also the length of the set for the key
Remove(key string, value interface{}) (bool, int) Remove(key string, value interface{}) (bool, int)
// Cardinality returns the number of elements in the set of a specific key // Cardinality returns the number of elements in the set for a key
// returns false if the key is not in the map // returns false if the set is not present
Cardinality(key string) (int, bool) Cardinality(key string) (int, bool)
// String returns the string version of the set, empty otherwise // String returns the string version of the set, empty otherwise
// returns false if the key is not in the map // returns false if the set is not present
String(key string) (string, bool) String(key string) (string, bool)
// Returns all the keys in the map
Keys() []string
} }
type setMatrix struct { type setMatrix struct {
@ -121,3 +123,13 @@ func (s *setMatrix) String(key string) (string, bool) {
} }
return set.String(), ok return set.String(), ok
} }
func (s *setMatrix) Keys() []string {
s.Lock()
defer s.Unlock()
keys := make([]string, 0, len(s.matrix))
for k := range s.matrix {
keys = append(keys, k)
}
return keys
}

View File

@ -13,6 +13,7 @@ import (
"github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/ipamapi" "github.com/docker/libnetwork/ipamapi"
"github.com/docker/libnetwork/netlabel" "github.com/docker/libnetwork/netlabel"
"github.com/docker/libnetwork/netutils"
"github.com/docker/libnetwork/testutils" "github.com/docker/libnetwork/testutils"
"github.com/docker/libnetwork/types" "github.com/docker/libnetwork/types"
) )
@ -382,8 +383,8 @@ func TestSRVServiceQuery(t *testing.T) {
} }
sr := svcInfo{ sr := svcInfo{
svcMap: make(map[string][]net.IP), svcMap: common.NewSetMatrix(),
svcIPv6Map: make(map[string][]net.IP), svcIPv6Map: common.NewSetMatrix(),
ipMap: common.NewSetMatrix(), ipMap: common.NewSetMatrix(),
service: make(map[string][]servicePorts), service: make(map[string][]servicePorts),
} }
@ -440,6 +441,119 @@ func TestSRVServiceQuery(t *testing.T) {
} }
} }
func TestServiceVIPReuse(t *testing.T) {
c, err := New()
if err != nil {
t.Fatal(err)
}
defer c.Stop()
n, err := c.NewNetwork("bridge", "net1", "", nil)
if err != nil {
t.Fatal(err)
}
defer func() {
if err := n.Delete(); err != nil {
t.Fatal(err)
}
}()
ep, err := n.CreateEndpoint("testep")
if err != nil {
t.Fatal(err)
}
sb, err := c.NewSandbox("c1")
if err != nil {
t.Fatal(err)
}
defer func() {
if err := sb.Delete(); err != nil {
t.Fatal(err)
}
}()
err = ep.Join(sb)
if err != nil {
t.Fatal(err)
}
// Add 2 services with same name but different service ID to share the same VIP
n.(*network).addSvcRecords("ep1", "service_test", "serviceID1", net.ParseIP("192.168.0.1"), net.IP{}, true, "test")
n.(*network).addSvcRecords("ep2", "service_test", "serviceID2", net.ParseIP("192.168.0.1"), net.IP{}, true, "test")
ipToResolve := netutils.ReverseIP("192.168.0.1")
ipList, _ := n.(*network).ResolveName("service_test", types.IPv4)
if len(ipList) == 0 {
t.Fatal("There must be the VIP")
}
if len(ipList) != 1 {
t.Fatal("It must return only 1 VIP")
}
if ipList[0].String() != "192.168.0.1" {
t.Fatal("The service VIP is 192.168.0.1")
}
name := n.(*network).ResolveIP(ipToResolve)
if name == "" {
t.Fatal("It must return a name")
}
if name != "service_test.net1" {
t.Fatalf("It must return the service_test.net1 != %s", name)
}
// Delete service record for one of the services, the IP should remain because one service is still associated with it
n.(*network).deleteSvcRecords("ep1", "service_test", "serviceID1", net.ParseIP("192.168.0.1"), net.IP{}, true, "test")
ipList, _ = n.(*network).ResolveName("service_test", types.IPv4)
if len(ipList) == 0 {
t.Fatal("There must be the VIP")
}
if len(ipList) != 1 {
t.Fatal("It must return only 1 VIP")
}
if ipList[0].String() != "192.168.0.1" {
t.Fatal("The service VIP is 192.168.0.1")
}
name = n.(*network).ResolveIP(ipToResolve)
if name == "" {
t.Fatal("It must return a name")
}
if name != "service_test.net1" {
t.Fatalf("It must return the service_test.net1 != %s", name)
}
// Delete again the service using the previous service ID, nothing should happen
n.(*network).deleteSvcRecords("ep2", "service_test", "serviceID1", net.ParseIP("192.168.0.1"), net.IP{}, true, "test")
ipList, _ = n.(*network).ResolveName("service_test", types.IPv4)
if len(ipList) == 0 {
t.Fatal("There must be the VIP")
}
if len(ipList) != 1 {
t.Fatal("It must return only 1 VIP")
}
if ipList[0].String() != "192.168.0.1" {
t.Fatal("The service VIP is 192.168.0.1")
}
name = n.(*network).ResolveIP(ipToResolve)
if name == "" {
t.Fatal("It must return a name")
}
if name != "service_test.net1" {
t.Fatalf("It must return the service_test.net1 != %s", name)
}
// Delete now using the second service ID, now all the entries should be gone
n.(*network).deleteSvcRecords("ep2", "service_test", "serviceID2", net.ParseIP("192.168.0.1"), net.IP{}, true, "test")
ipList, _ = n.(*network).ResolveName("service_test", types.IPv4)
if len(ipList) != 0 {
t.Fatal("All the VIPs should be gone now")
}
name = n.(*network).ResolveIP(ipToResolve)
if name != "" {
t.Fatalf("It must return empty no more services associated, instead:%s", name)
}
}
func TestIpamReleaseOnNetDriverFailures(t *testing.T) { func TestIpamReleaseOnNetDriverFailures(t *testing.T) {
if !testutils.IsRunningInContainer() { if !testutils.IsRunningInContainer() {
defer testutils.SetupTestOSContext(t)() defer testutils.SetupTestOSContext(t)()

View File

@ -92,12 +92,20 @@ type EndpointWalker func(ep Endpoint) bool
// Its an indication to defer PTR queries also to that external server. // Its an indication to defer PTR queries also to that external server.
type ipInfo struct { type ipInfo struct {
name string name string
serviceID string
extResolver bool extResolver bool
} }
// svcMapEntry is the body of the element into the svcMap
// The ip is a string because the SetMatrix does not accept non hashable values
type svcMapEntry struct {
ip string
serviceID string
}
type svcInfo struct { type svcInfo struct {
svcMap map[string][]net.IP svcMap common.SetMatrix
svcIPv6Map map[string][]net.IP svcIPv6Map common.SetMatrix
ipMap common.SetMatrix ipMap common.SetMatrix
service map[string][]servicePorts service map[string][]servicePorts
} }
@ -933,6 +941,9 @@ func (n *network) delete(force bool) error {
id := n.id id := n.id
n.Unlock() n.Unlock()
c.networkLocker.Lock(id)
defer c.networkLocker.Unlock(id)
n, err := c.getNetworkFromStore(id) n, err := c.getNetworkFromStore(id)
if err != nil { if err != nil {
return &UnknownNetworkError{name: name, id: id} return &UnknownNetworkError{name: name, id: id}
@ -991,12 +1002,6 @@ func (n *network) delete(force bool) error {
c.cleanupServiceBindings(n.ID()) c.cleanupServiceBindings(n.ID())
// The network had been left, the service discovery can be cleaned up
c.Lock()
logrus.Debugf("network %s delete, clean svcRecords", n.id)
delete(c.svcRecords, n.id)
c.Unlock()
removeFromStore: removeFromStore:
// deleteFromStore performs an atomic delete operation and the // deleteFromStore performs an atomic delete operation and the
// network.epCnt will help prevent any possible // network.epCnt will help prevent any possible
@ -1070,6 +1075,9 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi
ep := &endpoint{name: name, generic: make(map[string]interface{}), iface: &endpointInterface{}} ep := &endpoint{name: name, generic: make(map[string]interface{}), iface: &endpointInterface{}}
ep.id = stringid.GenerateRandomID() ep.id = stringid.GenerateRandomID()
n.ctrlr.networkLocker.Lock(n.id)
defer n.ctrlr.networkLocker.Unlock(n.id)
// Initialize ep.network with a possibly stale copy of n. We need this to get network from // Initialize ep.network with a possibly stale copy of n. We need this to get network from
// store. But once we get it from store we will have the most uptodate copy possibly. // store. But once we get it from store we will have the most uptodate copy possibly.
ep.network = n ep.network = n
@ -1228,75 +1236,77 @@ func (n *network) updateSvcRecord(ep *endpoint, localEps []*endpoint, isAdd bool
ipv6 = iface.AddressIPv6().IP ipv6 = iface.AddressIPv6().IP
} }
serviceID := ep.svcID
if serviceID == "" {
serviceID = ep.ID()
}
if isAdd { if isAdd {
// If anonymous endpoint has an alias use the first alias // If anonymous endpoint has an alias use the first alias
// for ip->name mapping. Not having the reverse mapping // for ip->name mapping. Not having the reverse mapping
// breaks some apps // breaks some apps
if ep.isAnonymous() { if ep.isAnonymous() {
if len(myAliases) > 0 { if len(myAliases) > 0 {
n.addSvcRecords(ep.ID(), myAliases[0], iface.Address().IP, ipv6, true, "updateSvcRecord") n.addSvcRecords(ep.ID(), myAliases[0], serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord")
} }
} else { } else {
n.addSvcRecords(ep.ID(), epName, iface.Address().IP, ipv6, true, "updateSvcRecord") n.addSvcRecords(ep.ID(), epName, serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord")
} }
for _, alias := range myAliases { for _, alias := range myAliases {
n.addSvcRecords(ep.ID(), alias, iface.Address().IP, ipv6, false, "updateSvcRecord") n.addSvcRecords(ep.ID(), alias, serviceID, iface.Address().IP, ipv6, false, "updateSvcRecord")
} }
} else { } else {
if ep.isAnonymous() { if ep.isAnonymous() {
if len(myAliases) > 0 { if len(myAliases) > 0 {
n.deleteSvcRecords(ep.ID(), myAliases[0], iface.Address().IP, ipv6, true, "updateSvcRecord") n.deleteSvcRecords(ep.ID(), myAliases[0], serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord")
} }
} else { } else {
n.deleteSvcRecords(ep.ID(), epName, iface.Address().IP, ipv6, true, "updateSvcRecord") n.deleteSvcRecords(ep.ID(), epName, serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord")
} }
for _, alias := range myAliases { for _, alias := range myAliases {
n.deleteSvcRecords(ep.ID(), alias, iface.Address().IP, ipv6, false, "updateSvcRecord") n.deleteSvcRecords(ep.ID(), alias, serviceID, iface.Address().IP, ipv6, false, "updateSvcRecord")
} }
} }
} }
} }
func addIPToName(ipMap common.SetMatrix, name string, ip net.IP) { func addIPToName(ipMap common.SetMatrix, name, serviceID string, ip net.IP) {
reverseIP := netutils.ReverseIP(ip.String()) reverseIP := netutils.ReverseIP(ip.String())
ipMap.Insert(reverseIP, ipInfo{ ipMap.Insert(reverseIP, ipInfo{
name: name, name: name,
serviceID: serviceID,
}) })
} }
func addNameToIP(svcMap map[string][]net.IP, name string, epIP net.IP) { func delIPToName(ipMap common.SetMatrix, name, serviceID string, ip net.IP) {
ipList := svcMap[name] reverseIP := netutils.ReverseIP(ip.String())
for _, ip := range ipList { ipMap.Remove(reverseIP, ipInfo{
if ip.Equal(epIP) { name: name,
return serviceID: serviceID,
} })
}
svcMap[name] = append(svcMap[name], epIP)
} }
func delNameToIP(svcMap map[string][]net.IP, name string, epIP net.IP) { func addNameToIP(svcMap common.SetMatrix, name, serviceID string, epIP net.IP) {
ipList := svcMap[name] svcMap.Insert(name, svcMapEntry{
for i, ip := range ipList { ip: epIP.String(),
if ip.Equal(epIP) { serviceID: serviceID,
ipList = append(ipList[:i], ipList[i+1:]...) })
break
}
}
svcMap[name] = ipList
if len(ipList) == 0 {
delete(svcMap, name)
}
} }
func (n *network) addSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) { func delNameToIP(svcMap common.SetMatrix, name, serviceID string, epIP net.IP) {
svcMap.Remove(name, svcMapEntry{
ip: epIP.String(),
serviceID: serviceID,
})
}
func (n *network) addSvcRecords(eID, name, serviceID string, epIP, epIPv6 net.IP, ipMapUpdate bool, method string) {
// Do not add service names for ingress network as this is a // Do not add service names for ingress network as this is a
// routing only network // routing only network
if n.ingress { if n.ingress {
return return
} }
logrus.Debugf("%s (%s).addSvcRecords(%s, %s, %s, %t) %s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method) logrus.Debugf("%s (%s).addSvcRecords(%s, %s, %s, %t) %s sid:%s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method, serviceID)
c := n.getController() c := n.getController()
c.Lock() c.Lock()
@ -1305,34 +1315,34 @@ func (n *network) addSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ip
sr, ok := c.svcRecords[n.ID()] sr, ok := c.svcRecords[n.ID()]
if !ok { if !ok {
sr = svcInfo{ sr = svcInfo{
svcMap: make(map[string][]net.IP), svcMap: common.NewSetMatrix(),
svcIPv6Map: make(map[string][]net.IP), svcIPv6Map: common.NewSetMatrix(),
ipMap: common.NewSetMatrix(), ipMap: common.NewSetMatrix(),
} }
c.svcRecords[n.ID()] = sr c.svcRecords[n.ID()] = sr
} }
if ipMapUpdate { if ipMapUpdate {
addIPToName(sr.ipMap, name, epIP) addIPToName(sr.ipMap, name, serviceID, epIP)
if epIPv6 != nil { if epIPv6 != nil {
addIPToName(sr.ipMap, name, epIPv6) addIPToName(sr.ipMap, name, serviceID, epIPv6)
} }
} }
addNameToIP(sr.svcMap, name, epIP) addNameToIP(sr.svcMap, name, serviceID, epIP)
if epIPv6 != nil { if epIPv6 != nil {
addNameToIP(sr.svcIPv6Map, name, epIPv6) addNameToIP(sr.svcIPv6Map, name, serviceID, epIPv6)
} }
} }
func (n *network) deleteSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) { func (n *network) deleteSvcRecords(eID, name, serviceID string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) {
// Do not delete service names from ingress network as this is a // Do not delete service names from ingress network as this is a
// routing only network // routing only network
if n.ingress { if n.ingress {
return return
} }
logrus.Debugf("%s (%s).deleteSvcRecords(%s, %s, %s, %t) %s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method) logrus.Debugf("%s (%s).deleteSvcRecords(%s, %s, %s, %t) %s sid:%s ", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method, serviceID)
c := n.getController() c := n.getController()
c.Lock() c.Lock()
@ -1344,21 +1354,17 @@ func (n *network) deleteSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP,
} }
if ipMapUpdate { if ipMapUpdate {
sr.ipMap.Remove(netutils.ReverseIP(epIP.String()), ipInfo{ delIPToName(sr.ipMap, name, serviceID, epIP)
name: name,
})
if epIPv6 != nil { if epIPv6 != nil {
sr.ipMap.Remove(netutils.ReverseIP(epIPv6.String()), ipInfo{ delIPToName(sr.ipMap, name, serviceID, epIPv6)
name: name,
})
} }
} }
delNameToIP(sr.svcMap, name, epIP) delNameToIP(sr.svcMap, name, serviceID, epIP)
if epIPv6 != nil { if epIPv6 != nil {
delNameToIP(sr.svcIPv6Map, name, epIPv6) delNameToIP(sr.svcIPv6Map, name, serviceID, epIPv6)
} }
} }
@ -1376,19 +1382,31 @@ func (n *network) getSvcRecords(ep *endpoint) []etchosts.Record {
n.ctrlr.Lock() n.ctrlr.Lock()
defer n.ctrlr.Unlock() defer n.ctrlr.Unlock()
sr, _ := n.ctrlr.svcRecords[n.id] sr, ok := n.ctrlr.svcRecords[n.id]
if !ok || sr.svcMap == nil {
return nil
}
for h, ip := range sr.svcMap { svcMapKeys := sr.svcMap.Keys()
if strings.Split(h, ".")[0] == epName { // Loop on service names on this network
for _, k := range svcMapKeys {
if strings.Split(k, ".")[0] == epName {
continue continue
} }
if len(ip) == 0 { // Get all the IPs associated to this service
logrus.Warnf("Found empty list of IP addresses for service %s on network %s (%s)", h, n.name, n.id) mapEntryList, ok := sr.svcMap.Get(k)
if !ok {
// The key got deleted
continue continue
} }
if len(mapEntryList) == 0 {
logrus.Warnf("Found empty list of IP addresses for service %s on network %s (%s)", k, n.name, n.id)
continue
}
recs = append(recs, etchosts.Record{ recs = append(recs, etchosts.Record{
Hosts: h, Hosts: k,
IP: ip[0].String(), IP: mapEntryList[0].(svcMapEntry).ip,
}) })
} }
@ -1845,8 +1863,7 @@ func (n *network) ResolveName(req string, ipType int) ([]net.IP, bool) {
} }
req = strings.TrimSuffix(req, ".") req = strings.TrimSuffix(req, ".")
var ip []net.IP ipSet, ok := sr.svcMap.Get(req)
ip, ok = sr.svcMap[req]
if ipType == types.IPv6 { if ipType == types.IPv6 {
// If the name resolved to v4 address then its a valid name in // If the name resolved to v4 address then its a valid name in
@ -1856,13 +1873,20 @@ func (n *network) ResolveName(req string, ipType int) ([]net.IP, bool) {
if ok && n.enableIPv6 == false { if ok && n.enableIPv6 == false {
ipv6Miss = true ipv6Miss = true
} }
ip = sr.svcIPv6Map[req] ipSet, ok = sr.svcIPv6Map.Get(req)
} }
if ip != nil { if ok && len(ipSet) > 0 {
ipLocal := make([]net.IP, len(ip)) // this map is to avoid IP duplicates, this can happen during a transition period where 2 services are using the same IP
copy(ipLocal, ip) noDup := make(map[string]bool)
return ipLocal, false var ipLocal []net.IP
for _, ip := range ipSet {
if _, dup := noDup[ip.(svcMapEntry).ip]; !dup {
noDup[ip.(svcMapEntry).ip] = true
ipLocal = append(ipLocal, net.ParseIP(ip.(svcMapEntry).ip))
}
}
return ipLocal, ok
} }
return nil, ipv6Miss return nil, ipv6Miss

View File

@ -85,14 +85,8 @@ type loadBalancer struct {
// Map of backend IPs backing this loadbalancer on this // Map of backend IPs backing this loadbalancer on this
// network. It is keyed with endpoint ID. // network. It is keyed with endpoint ID.
backEnds map[string]loadBalancerBackend backEnds map[string]net.IP
// Back pointer to service to which the loadbalancer belongs. // Back pointer to service to which the loadbalancer belongs.
service *service service *service
} }
type loadBalancerBackend struct {
ip net.IP
containerName string
taskAliases []string
}

View File

@ -15,29 +15,35 @@ func (c *controller) addEndpointNameResolution(svcName, svcID, nID, eID, contain
return err return err
} }
logrus.Debugf("addEndpointNameResolution %s %s add_service:%t", eID, svcName, addService) logrus.Debugf("addEndpointNameResolution %s %s add_service:%t sAliases:%v tAliases:%v", eID, svcName, addService, serviceAliases, taskAliases)
// Add container resolution mappings // Add container resolution mappings
c.addContainerNameResolution(nID, eID, containerName, taskAliases, ip, method) c.addContainerNameResolution(nID, eID, containerName, taskAliases, ip, method)
serviceID := svcID
if serviceID == "" {
// This is the case of a normal container not part of a service
serviceID = eID
}
// Add endpoint IP to special "tasks.svc_name" so that the applications have access to DNS RR. // Add endpoint IP to special "tasks.svc_name" so that the applications have access to DNS RR.
n.(*network).addSvcRecords(eID, "tasks."+svcName, ip, nil, false, method) n.(*network).addSvcRecords(eID, "tasks."+svcName, serviceID, ip, nil, false, method)
for _, alias := range serviceAliases { for _, alias := range serviceAliases {
n.(*network).addSvcRecords(eID, "tasks."+alias, ip, nil, false, method) n.(*network).addSvcRecords(eID, "tasks."+alias, serviceID, ip, nil, false, method)
} }
// Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR // Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR
if len(vip) == 0 { if len(vip) == 0 {
n.(*network).addSvcRecords(eID, svcName, ip, nil, false, method) n.(*network).addSvcRecords(eID, svcName, serviceID, ip, nil, false, method)
for _, alias := range serviceAliases { for _, alias := range serviceAliases {
n.(*network).addSvcRecords(eID, alias, ip, nil, false, method) n.(*network).addSvcRecords(eID, alias, serviceID, ip, nil, false, method)
} }
} }
if addService && len(vip) != 0 { if addService && len(vip) != 0 {
n.(*network).addSvcRecords(eID, svcName, vip, nil, false, method) n.(*network).addSvcRecords(eID, svcName, serviceID, vip, nil, false, method)
for _, alias := range serviceAliases { for _, alias := range serviceAliases {
n.(*network).addSvcRecords(eID, alias, vip, nil, false, method) n.(*network).addSvcRecords(eID, alias, serviceID, vip, nil, false, method)
} }
} }
@ -52,11 +58,11 @@ func (c *controller) addContainerNameResolution(nID, eID, containerName string,
logrus.Debugf("addContainerNameResolution %s %s", eID, containerName) logrus.Debugf("addContainerNameResolution %s %s", eID, containerName)
// Add resolution for container name // Add resolution for container name
n.(*network).addSvcRecords(eID, containerName, ip, nil, true, method) n.(*network).addSvcRecords(eID, containerName, eID, ip, nil, true, method)
// Add resolution for taskaliases // Add resolution for taskaliases
for _, alias := range taskAliases { for _, alias := range taskAliases {
n.(*network).addSvcRecords(eID, alias, ip, nil, true, method) n.(*network).addSvcRecords(eID, alias, eID, ip, nil, true, method)
} }
return nil return nil
@ -68,32 +74,38 @@ func (c *controller) deleteEndpointNameResolution(svcName, svcID, nID, eID, cont
return err return err
} }
logrus.Debugf("deleteEndpointNameResolution %s %s rm_service:%t suppress:%t", eID, svcName, rmService, multipleEntries) logrus.Debugf("deleteEndpointNameResolution %s %s rm_service:%t suppress:%t sAliases:%v tAliases:%v", eID, svcName, rmService, multipleEntries, serviceAliases, taskAliases)
// Delete container resolution mappings // Delete container resolution mappings
c.delContainerNameResolution(nID, eID, containerName, taskAliases, ip, method) c.delContainerNameResolution(nID, eID, containerName, taskAliases, ip, method)
serviceID := svcID
if serviceID == "" {
// This is the case of a normal container not part of a service
serviceID = eID
}
// Delete the special "tasks.svc_name" backend record. // Delete the special "tasks.svc_name" backend record.
if !multipleEntries { if !multipleEntries {
n.(*network).deleteSvcRecords(eID, "tasks."+svcName, ip, nil, false, method) n.(*network).deleteSvcRecords(eID, "tasks."+svcName, serviceID, ip, nil, false, method)
for _, alias := range serviceAliases { for _, alias := range serviceAliases {
n.(*network).deleteSvcRecords(eID, "tasks."+alias, ip, nil, false, method) n.(*network).deleteSvcRecords(eID, "tasks."+alias, serviceID, ip, nil, false, method)
} }
} }
// If we are doing DNS RR delete the endpoint IP from DNS record right away. // If we are doing DNS RR delete the endpoint IP from DNS record right away.
if !multipleEntries && len(vip) == 0 { if !multipleEntries && len(vip) == 0 {
n.(*network).deleteSvcRecords(eID, svcName, ip, nil, false, method) n.(*network).deleteSvcRecords(eID, svcName, serviceID, ip, nil, false, method)
for _, alias := range serviceAliases { for _, alias := range serviceAliases {
n.(*network).deleteSvcRecords(eID, alias, ip, nil, false, method) n.(*network).deleteSvcRecords(eID, alias, serviceID, ip, nil, false, method)
} }
} }
// Remove the DNS record for VIP only if we are removing the service // Remove the DNS record for VIP only if we are removing the service
if rmService && len(vip) != 0 && !multipleEntries { if rmService && len(vip) != 0 && !multipleEntries {
n.(*network).deleteSvcRecords(eID, svcName, vip, nil, false, method) n.(*network).deleteSvcRecords(eID, svcName, serviceID, vip, nil, false, method)
for _, alias := range serviceAliases { for _, alias := range serviceAliases {
n.(*network).deleteSvcRecords(eID, alias, vip, nil, false, method) n.(*network).deleteSvcRecords(eID, alias, serviceID, vip, nil, false, method)
} }
} }
@ -108,11 +120,11 @@ func (c *controller) delContainerNameResolution(nID, eID, containerName string,
logrus.Debugf("delContainerNameResolution %s %s", eID, containerName) logrus.Debugf("delContainerNameResolution %s %s", eID, containerName)
// Delete resolution for container name // Delete resolution for container name
n.(*network).deleteSvcRecords(eID, containerName, ip, nil, true, method) n.(*network).deleteSvcRecords(eID, containerName, eID, ip, nil, true, method)
// Delete resolution for taskaliases // Delete resolution for taskaliases
for _, alias := range taskAliases { for _, alias := range taskAliases {
n.(*network).deleteSvcRecords(eID, alias, ip, nil, true, method) n.(*network).deleteSvcRecords(eID, alias, eID, ip, nil, true, method)
} }
return nil return nil
@ -152,6 +164,7 @@ func (c *controller) getLBIndex(sid, nid string, ingressPorts []*PortConfig) int
func (c *controller) cleanupServiceBindings(cleanupNID string) { func (c *controller) cleanupServiceBindings(cleanupNID string) {
var cleanupFuncs []func() var cleanupFuncs []func()
logrus.Debugf("cleanupServiceBindings for %s", cleanupNID)
c.Lock() c.Lock()
services := make([]*service, 0, len(c.serviceBindings)) services := make([]*service, 0, len(c.serviceBindings))
for _, s := range c.serviceBindings { for _, s := range c.serviceBindings {
@ -171,16 +184,27 @@ func (c *controller) cleanupServiceBindings(cleanupNID string) {
continue continue
} }
for eid, be := range lb.backEnds { // The network is being deleted, erase all the associated service discovery records
// TODO(fcrisciani) separate the Load Balancer from the Service discovery, this operation
// can be done safely here, but the rmServiceBinding is still keeping consistency in the
// data structures that are tracking the endpoint to IP mapping.
c.Lock()
logrus.Debugf("cleanupServiceBindings erasing the svcRecords for %s", nid)
delete(c.svcRecords, nid)
c.Unlock()
for eid, ip := range lb.backEnds {
epID := eid
epIP := ip
service := s service := s
loadBalancer := lb loadBalancer := lb
networkID := nid networkID := nid
epID := eid
epIP := be.ip
cleanupFuncs = append(cleanupFuncs, func() { cleanupFuncs = append(cleanupFuncs, func() {
if err := c.rmServiceBinding(service.name, service.id, networkID, epID, be.containerName, loadBalancer.vip, // ContainerName and taskAliases are not available here, this is still fine because the Service discovery
service.ingressPorts, service.aliases, be.taskAliases, epIP, "cleanupServiceBindings"); err != nil { // cleanup already happened before. The only thing that rmServiceBinding is still doing here a part from the Load
// Balancer bookeeping, is to keep consistent the mapping of endpoint to IP.
if err := c.rmServiceBinding(service.name, service.id, networkID, epID, "", loadBalancer.vip,
service.ingressPorts, service.aliases, []string{}, epIP, "cleanupServiceBindings", false); err != nil {
logrus.Errorf("Failed to remove service bindings for service %s network %s endpoint %s while cleanup: %v", logrus.Errorf("Failed to remove service bindings for service %s network %s endpoint %s while cleanup: %v",
service.id, networkID, epID, err) service.id, networkID, epID, err)
} }
@ -228,8 +252,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s
} }
s.Unlock() s.Unlock()
} }
logrus.Debugf("addServiceBinding from %s START for %s %s", method, svcName, eID) logrus.Debugf("addServiceBinding from %s START for %s %s p:%p nid:%s skey:%v", method, svcName, eID, s, nID, skey)
defer s.Unlock() defer s.Unlock()
lb, ok := s.loadBalancers[nID] lb, ok := s.loadBalancers[nID]
@ -242,7 +265,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s
lb = &loadBalancer{ lb = &loadBalancer{
vip: vip, vip: vip,
fwMark: fwMarkCtr, fwMark: fwMarkCtr,
backEnds: make(map[string]loadBalancerBackend), backEnds: make(map[string]net.IP),
service: s, service: s,
} }
@ -253,9 +276,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s
addService = true addService = true
} }
lb.backEnds[eID] = loadBalancerBackend{ip: ip, lb.backEnds[eID] = ip
containerName: containerName,
taskAliases: taskAliases}
ok, entries := s.assignIPToEndpoint(ip.String(), eID) ok, entries := s.assignIPToEndpoint(ip.String(), eID)
if !ok || entries > 1 { if !ok || entries > 1 {
@ -277,7 +298,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s
return nil return nil
} }
func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases []string, taskAliases []string, ip net.IP, method string) error { func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases []string, taskAliases []string, ip net.IP, method string, deleteSvcRecords bool) error {
var rmService bool var rmService bool
@ -294,7 +315,6 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st
c.Lock() c.Lock()
s, ok := c.serviceBindings[skey] s, ok := c.serviceBindings[skey]
c.Unlock() c.Unlock()
logrus.Debugf("rmServiceBinding from %s START for %s %s", method, svcName, eID)
if !ok { if !ok {
logrus.Warnf("rmServiceBinding %s %s %s aborted c.serviceBindings[skey] !ok", method, svcName, eID) logrus.Warnf("rmServiceBinding %s %s %s aborted c.serviceBindings[skey] !ok", method, svcName, eID)
return nil return nil
@ -302,6 +322,7 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
logrus.Debugf("rmServiceBinding from %s START for %s %s p:%p nid:%s sKey:%v deleteSvc:%t", method, svcName, eID, s, nID, skey, deleteSvcRecords)
lb, ok := s.loadBalancers[nID] lb, ok := s.loadBalancers[nID]
if !ok { if !ok {
logrus.Warnf("rmServiceBinding %s %s %s aborted s.loadBalancers[nid] !ok", method, svcName, eID) logrus.Warnf("rmServiceBinding %s %s %s aborted s.loadBalancers[nid] !ok", method, svcName, eID)
@ -322,17 +343,7 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st
rmService = true rmService = true
delete(s.loadBalancers, nID) delete(s.loadBalancers, nID)
} logrus.Debugf("rmServiceBinding %s delete %s, p:%p in loadbalancers len:%d", eID, nID, lb, len(s.loadBalancers))
if len(s.loadBalancers) == 0 {
// All loadbalancers for the service removed. Time to
// remove the service itself.
c.Lock()
// Mark the object as deleted so that the add won't use it wrongly
s.deleted = true
delete(c.serviceBindings, skey)
c.Unlock()
} }
ok, entries := s.removeIPToEndpoint(ip.String(), eID) ok, entries := s.removeIPToEndpoint(ip.String(), eID)
@ -348,7 +359,22 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st
} }
// Delete the name resolutions // Delete the name resolutions
c.deleteEndpointNameResolution(svcName, svcID, nID, eID, containerName, vip, serviceAliases, taskAliases, ip, rmService, entries > 0, "rmServiceBinding") if deleteSvcRecords {
c.deleteEndpointNameResolution(svcName, svcID, nID, eID, containerName, vip, serviceAliases, taskAliases, ip, rmService, entries > 0, "rmServiceBinding")
}
if len(s.loadBalancers) == 0 {
// All loadbalancers for the service removed. Time to
// remove the service itself.
c.Lock()
// Mark the object as deleted so that the add won't use it wrongly
s.deleted = true
// NOTE The delete from the serviceBindings map has to be the last operation else we are allowing a race between this service
// that is getting deleted and a new service that will be created if the entry is not anymore there
delete(c.serviceBindings, skey)
c.Unlock()
}
logrus.Debugf("rmServiceBinding from %s END for %s %s", method, svcName, eID) logrus.Debugf("rmServiceBinding from %s END for %s %s", method, svcName, eID)
return nil return nil

View File

@ -102,8 +102,8 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
} }
lb.service.Lock() lb.service.Lock()
for _, l := range lb.backEnds { for _, ip := range lb.backEnds {
sb.addLBBackend(l.ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress) sb.addLBBackend(ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress)
} }
lb.service.Unlock() lb.service.Unlock()
} }