From f969f26966cac9dffb38e5f3aad72e3846c152d8 Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Sun, 18 Jun 2017 05:25:58 -0700 Subject: [PATCH] Service discovery race on serviceBindings delete. Bug on IP reuse (#1808) * Correct SetMatrix documentation The SetMatrix is a generic data structure, so the description should not be tight to any specific use Signed-off-by: Flavio Crisciani * Service Discovery reuse name and serviceBindings deletion - Added logic to handle name reuse from different services - Moved the deletion from the serviceBindings map at the end of the rmServiceBindings body to avoid race with new services Signed-off-by: Flavio Crisciani * Avoid race on network cleanup Use the locker to avoid the race between the network deletion and new endpoints being created Signed-off-by: Flavio Crisciani * CleanupServiceBindings to clean the SD records Allow the cleanupServicebindings to take care of the service discovery cleanup. Also avoid to trigger the cleanup for each endpoint from an SD point of view LB and SD will be separated in the future Signed-off-by: Flavio Crisciani * Addressed comments Signed-off-by: Flavio Crisciani * NetworkDB deleteEntry has to happen If there is an error locally guarantee that the delete entry on network DB is still honored Signed-off-by: Flavio Crisciani --- libnetwork/agent.go | 19 +-- libnetwork/common/setmatrix.go | 32 +++-- libnetwork/libnetwork_internal_test.go | 118 +++++++++++++++++- libnetwork/network.go | 162 ++++++++++++++----------- libnetwork/service.go | 8 +- libnetwork/service_common.go | 114 ++++++++++------- libnetwork/service_linux.go | 4 +- 7 files changed, 314 insertions(+), 143 deletions(-) diff --git a/libnetwork/agent.go b/libnetwork/agent.go index a7b6911b31..a45a569500 100644 --- a/libnetwork/agent.go +++ b/libnetwork/agent.go @@ -648,13 +648,13 @@ func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error { TaskAliases: ep.myAliases, EndpointIP: ep.Iface().Address().IP.String(), }) - if err != nil { return err } if agent != nil { if err := agent.networkDB.CreateEntry(libnetworkEPTable, n.ID(), ep.ID(), buf); err != nil { + logrus.Warnf("addServiceInfoToCluster NetworkDB CreateEntry failed for %s %s err:%s", ep.id, n.id, err) return err } } @@ -686,6 +686,13 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err name = ep.MyAliases()[0] } + if agent != nil { + // First delete from networkDB then locally + if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil { + logrus.Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err) + } + } + if ep.Iface().Address() != nil { if ep.svcID != "" { // This is a task part of a service @@ -693,7 +700,7 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err if n.ingress { ingressPorts = ep.ingressPorts } - if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil { + if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true); err != nil { return err } } else { @@ -704,12 +711,6 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err } } - if agent != nil { - if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil { - return err - } - } - logrus.Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID()) return nil @@ -900,7 +901,7 @@ func (c *controller) handleEpTableEvent(ev events.Event) { logrus.Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec) if svcID != "" { // This is a remote task part of a service - if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil { + if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true); err != nil { logrus.Errorf("failed removing service binding for %s epRec:%v err:%s", eid, epRec, err) return } diff --git a/libnetwork/common/setmatrix.go b/libnetwork/common/setmatrix.go index 52c0d1f908..72be5bbbfc 100644 --- a/libnetwork/common/setmatrix.go +++ b/libnetwork/common/setmatrix.go @@ -10,24 +10,26 @@ import ( type SetMatrix interface { // Get returns the members of the set for a specific key as a slice. Get(key string) ([]interface{}, bool) - // Contains is used to verify is an element is in a set for a specific key + // Contains is used to verify if an element is in a set for a specific key // returns true if the element is in the set // returns true if there is a set for the key Contains(key string, value interface{}) (bool, bool) - // Insert inserts the mapping between the IP and the endpoint identifier - // returns true if the mapping was not present, false otherwise - // returns also the number of endpoints associated to the IP + // Insert inserts the value in the set of a key + // returns true if the value is inserted (was not already in the set), false otherwise + // returns also the length of the set for the key Insert(key string, value interface{}) (bool, int) - // Remove removes the mapping between the IP and the endpoint identifier - // returns true if the mapping was deleted, false otherwise - // returns also the number of endpoints associated to the IP + // Remove removes the value in the set for a specific key + // returns true if the value is deleted, false otherwise + // returns also the length of the set for the key Remove(key string, value interface{}) (bool, int) - // Cardinality returns the number of elements in the set of a specific key - // returns false if the key is not in the map + // Cardinality returns the number of elements in the set for a key + // returns false if the set is not present Cardinality(key string) (int, bool) // String returns the string version of the set, empty otherwise - // returns false if the key is not in the map + // returns false if the set is not present String(key string) (string, bool) + // Returns all the keys in the map + Keys() []string } type setMatrix struct { @@ -121,3 +123,13 @@ func (s *setMatrix) String(key string) (string, bool) { } return set.String(), ok } + +func (s *setMatrix) Keys() []string { + s.Lock() + defer s.Unlock() + keys := make([]string, 0, len(s.matrix)) + for k := range s.matrix { + keys = append(keys, k) + } + return keys +} diff --git a/libnetwork/libnetwork_internal_test.go b/libnetwork/libnetwork_internal_test.go index f6cb188ae8..58742cf5e1 100644 --- a/libnetwork/libnetwork_internal_test.go +++ b/libnetwork/libnetwork_internal_test.go @@ -13,6 +13,7 @@ import ( "github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/ipamapi" "github.com/docker/libnetwork/netlabel" + "github.com/docker/libnetwork/netutils" "github.com/docker/libnetwork/testutils" "github.com/docker/libnetwork/types" ) @@ -382,8 +383,8 @@ func TestSRVServiceQuery(t *testing.T) { } sr := svcInfo{ - svcMap: make(map[string][]net.IP), - svcIPv6Map: make(map[string][]net.IP), + svcMap: common.NewSetMatrix(), + svcIPv6Map: common.NewSetMatrix(), ipMap: common.NewSetMatrix(), service: make(map[string][]servicePorts), } @@ -440,6 +441,119 @@ func TestSRVServiceQuery(t *testing.T) { } } +func TestServiceVIPReuse(t *testing.T) { + c, err := New() + if err != nil { + t.Fatal(err) + } + defer c.Stop() + + n, err := c.NewNetwork("bridge", "net1", "", nil) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := n.Delete(); err != nil { + t.Fatal(err) + } + }() + + ep, err := n.CreateEndpoint("testep") + if err != nil { + t.Fatal(err) + } + + sb, err := c.NewSandbox("c1") + if err != nil { + t.Fatal(err) + } + defer func() { + if err := sb.Delete(); err != nil { + t.Fatal(err) + } + }() + + err = ep.Join(sb) + if err != nil { + t.Fatal(err) + } + + // Add 2 services with same name but different service ID to share the same VIP + n.(*network).addSvcRecords("ep1", "service_test", "serviceID1", net.ParseIP("192.168.0.1"), net.IP{}, true, "test") + n.(*network).addSvcRecords("ep2", "service_test", "serviceID2", net.ParseIP("192.168.0.1"), net.IP{}, true, "test") + + ipToResolve := netutils.ReverseIP("192.168.0.1") + + ipList, _ := n.(*network).ResolveName("service_test", types.IPv4) + if len(ipList) == 0 { + t.Fatal("There must be the VIP") + } + if len(ipList) != 1 { + t.Fatal("It must return only 1 VIP") + } + if ipList[0].String() != "192.168.0.1" { + t.Fatal("The service VIP is 192.168.0.1") + } + name := n.(*network).ResolveIP(ipToResolve) + if name == "" { + t.Fatal("It must return a name") + } + if name != "service_test.net1" { + t.Fatalf("It must return the service_test.net1 != %s", name) + } + + // Delete service record for one of the services, the IP should remain because one service is still associated with it + n.(*network).deleteSvcRecords("ep1", "service_test", "serviceID1", net.ParseIP("192.168.0.1"), net.IP{}, true, "test") + ipList, _ = n.(*network).ResolveName("service_test", types.IPv4) + if len(ipList) == 0 { + t.Fatal("There must be the VIP") + } + if len(ipList) != 1 { + t.Fatal("It must return only 1 VIP") + } + if ipList[0].String() != "192.168.0.1" { + t.Fatal("The service VIP is 192.168.0.1") + } + name = n.(*network).ResolveIP(ipToResolve) + if name == "" { + t.Fatal("It must return a name") + } + if name != "service_test.net1" { + t.Fatalf("It must return the service_test.net1 != %s", name) + } + + // Delete again the service using the previous service ID, nothing should happen + n.(*network).deleteSvcRecords("ep2", "service_test", "serviceID1", net.ParseIP("192.168.0.1"), net.IP{}, true, "test") + ipList, _ = n.(*network).ResolveName("service_test", types.IPv4) + if len(ipList) == 0 { + t.Fatal("There must be the VIP") + } + if len(ipList) != 1 { + t.Fatal("It must return only 1 VIP") + } + if ipList[0].String() != "192.168.0.1" { + t.Fatal("The service VIP is 192.168.0.1") + } + name = n.(*network).ResolveIP(ipToResolve) + if name == "" { + t.Fatal("It must return a name") + } + if name != "service_test.net1" { + t.Fatalf("It must return the service_test.net1 != %s", name) + } + + // Delete now using the second service ID, now all the entries should be gone + n.(*network).deleteSvcRecords("ep2", "service_test", "serviceID2", net.ParseIP("192.168.0.1"), net.IP{}, true, "test") + ipList, _ = n.(*network).ResolveName("service_test", types.IPv4) + if len(ipList) != 0 { + t.Fatal("All the VIPs should be gone now") + } + name = n.(*network).ResolveIP(ipToResolve) + if name != "" { + t.Fatalf("It must return empty no more services associated, instead:%s", name) + } +} + func TestIpamReleaseOnNetDriverFailures(t *testing.T) { if !testutils.IsRunningInContainer() { defer testutils.SetupTestOSContext(t)() diff --git a/libnetwork/network.go b/libnetwork/network.go index 3798a53900..9f99064e11 100644 --- a/libnetwork/network.go +++ b/libnetwork/network.go @@ -92,12 +92,20 @@ type EndpointWalker func(ep Endpoint) bool // Its an indication to defer PTR queries also to that external server. type ipInfo struct { name string + serviceID string extResolver bool } +// svcMapEntry is the body of the element into the svcMap +// The ip is a string because the SetMatrix does not accept non hashable values +type svcMapEntry struct { + ip string + serviceID string +} + type svcInfo struct { - svcMap map[string][]net.IP - svcIPv6Map map[string][]net.IP + svcMap common.SetMatrix + svcIPv6Map common.SetMatrix ipMap common.SetMatrix service map[string][]servicePorts } @@ -933,6 +941,9 @@ func (n *network) delete(force bool) error { id := n.id n.Unlock() + c.networkLocker.Lock(id) + defer c.networkLocker.Unlock(id) + n, err := c.getNetworkFromStore(id) if err != nil { return &UnknownNetworkError{name: name, id: id} @@ -991,12 +1002,6 @@ func (n *network) delete(force bool) error { c.cleanupServiceBindings(n.ID()) - // The network had been left, the service discovery can be cleaned up - c.Lock() - logrus.Debugf("network %s delete, clean svcRecords", n.id) - delete(c.svcRecords, n.id) - c.Unlock() - removeFromStore: // deleteFromStore performs an atomic delete operation and the // network.epCnt will help prevent any possible @@ -1070,6 +1075,9 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi ep := &endpoint{name: name, generic: make(map[string]interface{}), iface: &endpointInterface{}} ep.id = stringid.GenerateRandomID() + n.ctrlr.networkLocker.Lock(n.id) + defer n.ctrlr.networkLocker.Unlock(n.id) + // Initialize ep.network with a possibly stale copy of n. We need this to get network from // store. But once we get it from store we will have the most uptodate copy possibly. ep.network = n @@ -1228,75 +1236,77 @@ func (n *network) updateSvcRecord(ep *endpoint, localEps []*endpoint, isAdd bool ipv6 = iface.AddressIPv6().IP } + serviceID := ep.svcID + if serviceID == "" { + serviceID = ep.ID() + } if isAdd { // If anonymous endpoint has an alias use the first alias // for ip->name mapping. Not having the reverse mapping // breaks some apps if ep.isAnonymous() { if len(myAliases) > 0 { - n.addSvcRecords(ep.ID(), myAliases[0], iface.Address().IP, ipv6, true, "updateSvcRecord") + n.addSvcRecords(ep.ID(), myAliases[0], serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord") } } else { - n.addSvcRecords(ep.ID(), epName, iface.Address().IP, ipv6, true, "updateSvcRecord") + n.addSvcRecords(ep.ID(), epName, serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord") } for _, alias := range myAliases { - n.addSvcRecords(ep.ID(), alias, iface.Address().IP, ipv6, false, "updateSvcRecord") + n.addSvcRecords(ep.ID(), alias, serviceID, iface.Address().IP, ipv6, false, "updateSvcRecord") } } else { if ep.isAnonymous() { if len(myAliases) > 0 { - n.deleteSvcRecords(ep.ID(), myAliases[0], iface.Address().IP, ipv6, true, "updateSvcRecord") + n.deleteSvcRecords(ep.ID(), myAliases[0], serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord") } } else { - n.deleteSvcRecords(ep.ID(), epName, iface.Address().IP, ipv6, true, "updateSvcRecord") + n.deleteSvcRecords(ep.ID(), epName, serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord") } for _, alias := range myAliases { - n.deleteSvcRecords(ep.ID(), alias, iface.Address().IP, ipv6, false, "updateSvcRecord") + n.deleteSvcRecords(ep.ID(), alias, serviceID, iface.Address().IP, ipv6, false, "updateSvcRecord") } } } } -func addIPToName(ipMap common.SetMatrix, name string, ip net.IP) { +func addIPToName(ipMap common.SetMatrix, name, serviceID string, ip net.IP) { reverseIP := netutils.ReverseIP(ip.String()) ipMap.Insert(reverseIP, ipInfo{ - name: name, + name: name, + serviceID: serviceID, }) } -func addNameToIP(svcMap map[string][]net.IP, name string, epIP net.IP) { - ipList := svcMap[name] - for _, ip := range ipList { - if ip.Equal(epIP) { - return - } - } - svcMap[name] = append(svcMap[name], epIP) +func delIPToName(ipMap common.SetMatrix, name, serviceID string, ip net.IP) { + reverseIP := netutils.ReverseIP(ip.String()) + ipMap.Remove(reverseIP, ipInfo{ + name: name, + serviceID: serviceID, + }) } -func delNameToIP(svcMap map[string][]net.IP, name string, epIP net.IP) { - ipList := svcMap[name] - for i, ip := range ipList { - if ip.Equal(epIP) { - ipList = append(ipList[:i], ipList[i+1:]...) - break - } - } - svcMap[name] = ipList - - if len(ipList) == 0 { - delete(svcMap, name) - } +func addNameToIP(svcMap common.SetMatrix, name, serviceID string, epIP net.IP) { + svcMap.Insert(name, svcMapEntry{ + ip: epIP.String(), + serviceID: serviceID, + }) } -func (n *network) addSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) { +func delNameToIP(svcMap common.SetMatrix, name, serviceID string, epIP net.IP) { + svcMap.Remove(name, svcMapEntry{ + ip: epIP.String(), + serviceID: serviceID, + }) +} + +func (n *network) addSvcRecords(eID, name, serviceID string, epIP, epIPv6 net.IP, ipMapUpdate bool, method string) { // Do not add service names for ingress network as this is a // routing only network if n.ingress { return } - logrus.Debugf("%s (%s).addSvcRecords(%s, %s, %s, %t) %s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method) + logrus.Debugf("%s (%s).addSvcRecords(%s, %s, %s, %t) %s sid:%s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method, serviceID) c := n.getController() c.Lock() @@ -1305,34 +1315,34 @@ func (n *network) addSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ip sr, ok := c.svcRecords[n.ID()] if !ok { sr = svcInfo{ - svcMap: make(map[string][]net.IP), - svcIPv6Map: make(map[string][]net.IP), + svcMap: common.NewSetMatrix(), + svcIPv6Map: common.NewSetMatrix(), ipMap: common.NewSetMatrix(), } c.svcRecords[n.ID()] = sr } if ipMapUpdate { - addIPToName(sr.ipMap, name, epIP) + addIPToName(sr.ipMap, name, serviceID, epIP) if epIPv6 != nil { - addIPToName(sr.ipMap, name, epIPv6) + addIPToName(sr.ipMap, name, serviceID, epIPv6) } } - addNameToIP(sr.svcMap, name, epIP) + addNameToIP(sr.svcMap, name, serviceID, epIP) if epIPv6 != nil { - addNameToIP(sr.svcIPv6Map, name, epIPv6) + addNameToIP(sr.svcIPv6Map, name, serviceID, epIPv6) } } -func (n *network) deleteSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) { +func (n *network) deleteSvcRecords(eID, name, serviceID string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) { // Do not delete service names from ingress network as this is a // routing only network if n.ingress { return } - logrus.Debugf("%s (%s).deleteSvcRecords(%s, %s, %s, %t) %s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method) + logrus.Debugf("%s (%s).deleteSvcRecords(%s, %s, %s, %t) %s sid:%s ", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method, serviceID) c := n.getController() c.Lock() @@ -1344,21 +1354,17 @@ func (n *network) deleteSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, } if ipMapUpdate { - sr.ipMap.Remove(netutils.ReverseIP(epIP.String()), ipInfo{ - name: name, - }) + delIPToName(sr.ipMap, name, serviceID, epIP) if epIPv6 != nil { - sr.ipMap.Remove(netutils.ReverseIP(epIPv6.String()), ipInfo{ - name: name, - }) + delIPToName(sr.ipMap, name, serviceID, epIPv6) } } - delNameToIP(sr.svcMap, name, epIP) + delNameToIP(sr.svcMap, name, serviceID, epIP) if epIPv6 != nil { - delNameToIP(sr.svcIPv6Map, name, epIPv6) + delNameToIP(sr.svcIPv6Map, name, serviceID, epIPv6) } } @@ -1376,19 +1382,31 @@ func (n *network) getSvcRecords(ep *endpoint) []etchosts.Record { n.ctrlr.Lock() defer n.ctrlr.Unlock() - sr, _ := n.ctrlr.svcRecords[n.id] + sr, ok := n.ctrlr.svcRecords[n.id] + if !ok || sr.svcMap == nil { + return nil + } - for h, ip := range sr.svcMap { - if strings.Split(h, ".")[0] == epName { + svcMapKeys := sr.svcMap.Keys() + // Loop on service names on this network + for _, k := range svcMapKeys { + if strings.Split(k, ".")[0] == epName { continue } - if len(ip) == 0 { - logrus.Warnf("Found empty list of IP addresses for service %s on network %s (%s)", h, n.name, n.id) + // Get all the IPs associated to this service + mapEntryList, ok := sr.svcMap.Get(k) + if !ok { + // The key got deleted continue } + if len(mapEntryList) == 0 { + logrus.Warnf("Found empty list of IP addresses for service %s on network %s (%s)", k, n.name, n.id) + continue + } + recs = append(recs, etchosts.Record{ - Hosts: h, - IP: ip[0].String(), + Hosts: k, + IP: mapEntryList[0].(svcMapEntry).ip, }) } @@ -1845,8 +1863,7 @@ func (n *network) ResolveName(req string, ipType int) ([]net.IP, bool) { } req = strings.TrimSuffix(req, ".") - var ip []net.IP - ip, ok = sr.svcMap[req] + ipSet, ok := sr.svcMap.Get(req) if ipType == types.IPv6 { // If the name resolved to v4 address then its a valid name in @@ -1856,13 +1873,20 @@ func (n *network) ResolveName(req string, ipType int) ([]net.IP, bool) { if ok && n.enableIPv6 == false { ipv6Miss = true } - ip = sr.svcIPv6Map[req] + ipSet, ok = sr.svcIPv6Map.Get(req) } - if ip != nil { - ipLocal := make([]net.IP, len(ip)) - copy(ipLocal, ip) - return ipLocal, false + if ok && len(ipSet) > 0 { + // this map is to avoid IP duplicates, this can happen during a transition period where 2 services are using the same IP + noDup := make(map[string]bool) + var ipLocal []net.IP + for _, ip := range ipSet { + if _, dup := noDup[ip.(svcMapEntry).ip]; !dup { + noDup[ip.(svcMapEntry).ip] = true + ipLocal = append(ipLocal, net.ParseIP(ip.(svcMapEntry).ip)) + } + } + return ipLocal, ok } return nil, ipv6Miss diff --git a/libnetwork/service.go b/libnetwork/service.go index c890e01a39..5a0d7e0057 100644 --- a/libnetwork/service.go +++ b/libnetwork/service.go @@ -85,14 +85,8 @@ type loadBalancer struct { // Map of backend IPs backing this loadbalancer on this // network. It is keyed with endpoint ID. - backEnds map[string]loadBalancerBackend + backEnds map[string]net.IP // Back pointer to service to which the loadbalancer belongs. service *service } - -type loadBalancerBackend struct { - ip net.IP - containerName string - taskAliases []string -} diff --git a/libnetwork/service_common.go b/libnetwork/service_common.go index e57ce1498b..7e3367c71c 100644 --- a/libnetwork/service_common.go +++ b/libnetwork/service_common.go @@ -15,29 +15,35 @@ func (c *controller) addEndpointNameResolution(svcName, svcID, nID, eID, contain return err } - logrus.Debugf("addEndpointNameResolution %s %s add_service:%t", eID, svcName, addService) + logrus.Debugf("addEndpointNameResolution %s %s add_service:%t sAliases:%v tAliases:%v", eID, svcName, addService, serviceAliases, taskAliases) // Add container resolution mappings c.addContainerNameResolution(nID, eID, containerName, taskAliases, ip, method) + serviceID := svcID + if serviceID == "" { + // This is the case of a normal container not part of a service + serviceID = eID + } + // Add endpoint IP to special "tasks.svc_name" so that the applications have access to DNS RR. - n.(*network).addSvcRecords(eID, "tasks."+svcName, ip, nil, false, method) + n.(*network).addSvcRecords(eID, "tasks."+svcName, serviceID, ip, nil, false, method) for _, alias := range serviceAliases { - n.(*network).addSvcRecords(eID, "tasks."+alias, ip, nil, false, method) + n.(*network).addSvcRecords(eID, "tasks."+alias, serviceID, ip, nil, false, method) } // Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR if len(vip) == 0 { - n.(*network).addSvcRecords(eID, svcName, ip, nil, false, method) + n.(*network).addSvcRecords(eID, svcName, serviceID, ip, nil, false, method) for _, alias := range serviceAliases { - n.(*network).addSvcRecords(eID, alias, ip, nil, false, method) + n.(*network).addSvcRecords(eID, alias, serviceID, ip, nil, false, method) } } if addService && len(vip) != 0 { - n.(*network).addSvcRecords(eID, svcName, vip, nil, false, method) + n.(*network).addSvcRecords(eID, svcName, serviceID, vip, nil, false, method) for _, alias := range serviceAliases { - n.(*network).addSvcRecords(eID, alias, vip, nil, false, method) + n.(*network).addSvcRecords(eID, alias, serviceID, vip, nil, false, method) } } @@ -52,11 +58,11 @@ func (c *controller) addContainerNameResolution(nID, eID, containerName string, logrus.Debugf("addContainerNameResolution %s %s", eID, containerName) // Add resolution for container name - n.(*network).addSvcRecords(eID, containerName, ip, nil, true, method) + n.(*network).addSvcRecords(eID, containerName, eID, ip, nil, true, method) // Add resolution for taskaliases for _, alias := range taskAliases { - n.(*network).addSvcRecords(eID, alias, ip, nil, true, method) + n.(*network).addSvcRecords(eID, alias, eID, ip, nil, true, method) } return nil @@ -68,32 +74,38 @@ func (c *controller) deleteEndpointNameResolution(svcName, svcID, nID, eID, cont return err } - logrus.Debugf("deleteEndpointNameResolution %s %s rm_service:%t suppress:%t", eID, svcName, rmService, multipleEntries) + logrus.Debugf("deleteEndpointNameResolution %s %s rm_service:%t suppress:%t sAliases:%v tAliases:%v", eID, svcName, rmService, multipleEntries, serviceAliases, taskAliases) // Delete container resolution mappings c.delContainerNameResolution(nID, eID, containerName, taskAliases, ip, method) + serviceID := svcID + if serviceID == "" { + // This is the case of a normal container not part of a service + serviceID = eID + } + // Delete the special "tasks.svc_name" backend record. if !multipleEntries { - n.(*network).deleteSvcRecords(eID, "tasks."+svcName, ip, nil, false, method) + n.(*network).deleteSvcRecords(eID, "tasks."+svcName, serviceID, ip, nil, false, method) for _, alias := range serviceAliases { - n.(*network).deleteSvcRecords(eID, "tasks."+alias, ip, nil, false, method) + n.(*network).deleteSvcRecords(eID, "tasks."+alias, serviceID, ip, nil, false, method) } } // If we are doing DNS RR delete the endpoint IP from DNS record right away. if !multipleEntries && len(vip) == 0 { - n.(*network).deleteSvcRecords(eID, svcName, ip, nil, false, method) + n.(*network).deleteSvcRecords(eID, svcName, serviceID, ip, nil, false, method) for _, alias := range serviceAliases { - n.(*network).deleteSvcRecords(eID, alias, ip, nil, false, method) + n.(*network).deleteSvcRecords(eID, alias, serviceID, ip, nil, false, method) } } // Remove the DNS record for VIP only if we are removing the service if rmService && len(vip) != 0 && !multipleEntries { - n.(*network).deleteSvcRecords(eID, svcName, vip, nil, false, method) + n.(*network).deleteSvcRecords(eID, svcName, serviceID, vip, nil, false, method) for _, alias := range serviceAliases { - n.(*network).deleteSvcRecords(eID, alias, vip, nil, false, method) + n.(*network).deleteSvcRecords(eID, alias, serviceID, vip, nil, false, method) } } @@ -108,11 +120,11 @@ func (c *controller) delContainerNameResolution(nID, eID, containerName string, logrus.Debugf("delContainerNameResolution %s %s", eID, containerName) // Delete resolution for container name - n.(*network).deleteSvcRecords(eID, containerName, ip, nil, true, method) + n.(*network).deleteSvcRecords(eID, containerName, eID, ip, nil, true, method) // Delete resolution for taskaliases for _, alias := range taskAliases { - n.(*network).deleteSvcRecords(eID, alias, ip, nil, true, method) + n.(*network).deleteSvcRecords(eID, alias, eID, ip, nil, true, method) } return nil @@ -152,6 +164,7 @@ func (c *controller) getLBIndex(sid, nid string, ingressPorts []*PortConfig) int func (c *controller) cleanupServiceBindings(cleanupNID string) { var cleanupFuncs []func() + logrus.Debugf("cleanupServiceBindings for %s", cleanupNID) c.Lock() services := make([]*service, 0, len(c.serviceBindings)) for _, s := range c.serviceBindings { @@ -171,16 +184,27 @@ func (c *controller) cleanupServiceBindings(cleanupNID string) { continue } - for eid, be := range lb.backEnds { + // The network is being deleted, erase all the associated service discovery records + // TODO(fcrisciani) separate the Load Balancer from the Service discovery, this operation + // can be done safely here, but the rmServiceBinding is still keeping consistency in the + // data structures that are tracking the endpoint to IP mapping. + c.Lock() + logrus.Debugf("cleanupServiceBindings erasing the svcRecords for %s", nid) + delete(c.svcRecords, nid) + c.Unlock() + + for eid, ip := range lb.backEnds { + epID := eid + epIP := ip service := s loadBalancer := lb networkID := nid - epID := eid - epIP := be.ip - cleanupFuncs = append(cleanupFuncs, func() { - if err := c.rmServiceBinding(service.name, service.id, networkID, epID, be.containerName, loadBalancer.vip, - service.ingressPorts, service.aliases, be.taskAliases, epIP, "cleanupServiceBindings"); err != nil { + // ContainerName and taskAliases are not available here, this is still fine because the Service discovery + // cleanup already happened before. The only thing that rmServiceBinding is still doing here a part from the Load + // Balancer bookeeping, is to keep consistent the mapping of endpoint to IP. + if err := c.rmServiceBinding(service.name, service.id, networkID, epID, "", loadBalancer.vip, + service.ingressPorts, service.aliases, []string{}, epIP, "cleanupServiceBindings", false); err != nil { logrus.Errorf("Failed to remove service bindings for service %s network %s endpoint %s while cleanup: %v", service.id, networkID, epID, err) } @@ -228,8 +252,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s } s.Unlock() } - logrus.Debugf("addServiceBinding from %s START for %s %s", method, svcName, eID) - + logrus.Debugf("addServiceBinding from %s START for %s %s p:%p nid:%s skey:%v", method, svcName, eID, s, nID, skey) defer s.Unlock() lb, ok := s.loadBalancers[nID] @@ -242,7 +265,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s lb = &loadBalancer{ vip: vip, fwMark: fwMarkCtr, - backEnds: make(map[string]loadBalancerBackend), + backEnds: make(map[string]net.IP), service: s, } @@ -253,9 +276,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s addService = true } - lb.backEnds[eID] = loadBalancerBackend{ip: ip, - containerName: containerName, - taskAliases: taskAliases} + lb.backEnds[eID] = ip ok, entries := s.assignIPToEndpoint(ip.String(), eID) if !ok || entries > 1 { @@ -277,7 +298,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s return nil } -func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases []string, taskAliases []string, ip net.IP, method string) error { +func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases []string, taskAliases []string, ip net.IP, method string, deleteSvcRecords bool) error { var rmService bool @@ -294,7 +315,6 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st c.Lock() s, ok := c.serviceBindings[skey] c.Unlock() - logrus.Debugf("rmServiceBinding from %s START for %s %s", method, svcName, eID) if !ok { logrus.Warnf("rmServiceBinding %s %s %s aborted c.serviceBindings[skey] !ok", method, svcName, eID) return nil @@ -302,6 +322,7 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st s.Lock() defer s.Unlock() + logrus.Debugf("rmServiceBinding from %s START for %s %s p:%p nid:%s sKey:%v deleteSvc:%t", method, svcName, eID, s, nID, skey, deleteSvcRecords) lb, ok := s.loadBalancers[nID] if !ok { logrus.Warnf("rmServiceBinding %s %s %s aborted s.loadBalancers[nid] !ok", method, svcName, eID) @@ -322,17 +343,7 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st rmService = true delete(s.loadBalancers, nID) - } - - if len(s.loadBalancers) == 0 { - // All loadbalancers for the service removed. Time to - // remove the service itself. - c.Lock() - - // Mark the object as deleted so that the add won't use it wrongly - s.deleted = true - delete(c.serviceBindings, skey) - c.Unlock() + logrus.Debugf("rmServiceBinding %s delete %s, p:%p in loadbalancers len:%d", eID, nID, lb, len(s.loadBalancers)) } ok, entries := s.removeIPToEndpoint(ip.String(), eID) @@ -348,7 +359,22 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st } // Delete the name resolutions - c.deleteEndpointNameResolution(svcName, svcID, nID, eID, containerName, vip, serviceAliases, taskAliases, ip, rmService, entries > 0, "rmServiceBinding") + if deleteSvcRecords { + c.deleteEndpointNameResolution(svcName, svcID, nID, eID, containerName, vip, serviceAliases, taskAliases, ip, rmService, entries > 0, "rmServiceBinding") + } + + if len(s.loadBalancers) == 0 { + // All loadbalancers for the service removed. Time to + // remove the service itself. + c.Lock() + + // Mark the object as deleted so that the add won't use it wrongly + s.deleted = true + // NOTE The delete from the serviceBindings map has to be the last operation else we are allowing a race between this service + // that is getting deleted and a new service that will be created if the entry is not anymore there + delete(c.serviceBindings, skey) + c.Unlock() + } logrus.Debugf("rmServiceBinding from %s END for %s %s", method, svcName, eID) return nil diff --git a/libnetwork/service_linux.go b/libnetwork/service_linux.go index 9e41ba9b7a..1cf7ee91aa 100644 --- a/libnetwork/service_linux.go +++ b/libnetwork/service_linux.go @@ -102,8 +102,8 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) { } lb.service.Lock() - for _, l := range lb.backEnds { - sb.addLBBackend(l.ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress) + for _, ip := range lb.backEnds { + sb.addLBBackend(ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress) } lb.service.Unlock() }