diff --git a/libnetwork/agent.go b/libnetwork/agent.go index 485713171e..3fa5341481 100644 --- a/libnetwork/agent.go +++ b/libnetwork/agent.go @@ -583,7 +583,7 @@ func (ep *endpoint) deleteDriverInfoFromCluster() error { return nil } -func (ep *endpoint) addServiceInfoToCluster() error { +func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error { if ep.isAnonymous() && len(ep.myAliases) == 0 || ep.Iface().Address() == nil { return nil } @@ -593,26 +593,51 @@ func (ep *endpoint) addServiceInfoToCluster() error { return nil } + sb.Service.Lock() + defer sb.Service.Unlock() + logrus.Debugf("addServiceInfoToCluster START for %s %s", ep.svcName, ep.ID()) + + // Check that the endpoint is still present on the sandbox before adding it to the service discovery. + // This is to handle a race between the EnableService and the sbLeave + // It is possible that the EnableService starts, fetches the list of the endpoints and + // by the time the addServiceInfoToCluster is called the endpoint got removed from the sandbox + // The risk is that the deleteServiceInfoToCluster happens before the addServiceInfoToCluster. + // This check under the Service lock of the sandbox ensure the correct behavior. + // If the addServiceInfoToCluster arrives first may find or not the endpoint and will proceed or exit + // but in any case the deleteServiceInfoToCluster will follow doing the cleanup if needed. + // In case the deleteServiceInfoToCluster arrives first, this one is happening after the endpoint is + // removed from the list, in this situation the delete will bail out not finding any data to cleanup + // and the add will bail out not finding the endpoint on the sandbox. 
+ if e := sb.getEndpoint(ep.ID()); e == nil { + logrus.Warnf("addServiceInfoToCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID()) + return nil + } + c := n.getController() agent := c.getAgent() - var ingressPorts []*PortConfig - if ep.svcID != "" { - // Gossip ingress ports only in ingress network. - if n.ingress { - ingressPorts = ep.ingressPorts - } - - if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil { - return err - } - } - name := ep.Name() if ep.isAnonymous() { name = ep.MyAliases()[0] } + var ingressPorts []*PortConfig + if ep.svcID != "" { + // This is a task part of a service + // Gossip ingress ports only in ingress network. + if n.ingress { + ingressPorts = ep.ingressPorts + } + if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil { + return err + } + } else { + // This is a container simply attached to an attachable network + if err := c.addContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil { + return err + } + } + buf, err := proto.Marshal(&EndpointRecord{ Name: name, ServiceName: ep.svcName, @@ -634,10 +659,12 @@ func (ep *endpoint) addServiceInfoToCluster() error { } } + logrus.Debugf("addServiceInfoToCluster END for %s %s", ep.svcName, ep.ID()) + return nil } -func (ep *endpoint) deleteServiceInfoFromCluster() error { +func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) error { if ep.isAnonymous() && len(ep.myAliases) == 0 { return nil } @@ -647,17 +674,33 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error { return nil } + sb.Service.Lock() + defer sb.Service.Unlock() + logrus.Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID()) + c 
:= n.getController() agent := c.getAgent() - if ep.svcID != "" && ep.Iface().Address() != nil { - var ingressPorts []*PortConfig - if n.ingress { - ingressPorts = ep.ingressPorts - } + name := ep.Name() + if ep.isAnonymous() { + name = ep.MyAliases()[0] + } - if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil { - return err + if ep.Iface().Address() != nil { + if ep.svcID != "" { + // This is a task part of a service + var ingressPorts []*PortConfig + if n.ingress { + ingressPorts = ep.ingressPorts + } + if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil { + return err + } + } else { + // This is a container simply attached to an attachable network + if err := c.delContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil { + return err + } } } @@ -667,6 +710,8 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error { } } + logrus.Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID()) + return nil } @@ -814,58 +859,56 @@ func (c *controller) handleEpTableEvent(ev events.Event) { value = event.Value case networkdb.UpdateEvent: logrus.Errorf("Unexpected update service table event = %#v", event) - } - - nw, err := c.NetworkByID(nid) - if err != nil { - logrus.Errorf("Could not find network %s while handling service table event: %v", nid, err) return } - n := nw.(*network) - err = proto.Unmarshal(value, &epRec) + err := proto.Unmarshal(value, &epRec) if err != nil { logrus.Errorf("Failed to unmarshal service table value: %v", err) return } - name := epRec.Name + containerName := epRec.Name svcName := epRec.ServiceName svcID := epRec.ServiceID vip := net.ParseIP(epRec.VirtualIP) ip := net.ParseIP(epRec.EndpointIP) 
ingressPorts := epRec.IngressPorts - aliases := epRec.Aliases - taskaliases := epRec.TaskAliases + serviceAliases := epRec.Aliases + taskAliases := epRec.TaskAliases - if name == "" || ip == nil { + if containerName == "" || ip == nil { logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value) return } if isAdd { + logrus.Debugf("handleEpTableEvent ADD %s R:%v", isAdd, eid, epRec) if svcID != "" { - if err := c.addServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil { - logrus.Errorf("Failed adding service binding for value %s: %v", value, err) + // This is a remote task part of a service + if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil { + logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err) return } - } - - n.addSvcRecords(name, ip, nil, true) - for _, alias := range taskaliases { - n.addSvcRecords(alias, ip, nil, true) + } else { + // This is a remote container simply attached to an attachable network + if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil { + logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err) + } } } else { + logrus.Debugf("handleEpTableEvent DEL %s R:%v", isAdd, eid, epRec) if svcID != "" { - if err := c.rmServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil { - logrus.Errorf("Failed adding service binding for value %s: %v", value, err) + // This is a remote task part of a service + if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil { + logrus.Errorf("failed removing service binding for %s epRec:%v err:%s", eid, epRec, err) return } - } - - n.deleteSvcRecords(name, ip, nil, true) - for _, alias := range 
taskaliases { - n.deleteSvcRecords(alias, ip, nil, true) + } else { + // This is a remote container simply attached to an attachable network + if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil { + logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err) + } } } } diff --git a/libnetwork/agent.proto b/libnetwork/agent.proto index 0b7708e32b..54c71c0e2a 100644 --- a/libnetwork/agent.proto +++ b/libnetwork/agent.proto @@ -14,7 +14,7 @@ option (gogoproto.goproto_stringer_all) = false; // EndpointRecord specifies all the endpoint specific information that // needs to gossiped to nodes participating in the network. message EndpointRecord { - // Name of the endpoint + // Name of the container string name = 1; // Service name of the service to which this endpoint belongs. diff --git a/libnetwork/common/setmatrix.go b/libnetwork/common/setmatrix.go new file mode 100644 index 0000000000..0fdb542be4 --- /dev/null +++ b/libnetwork/common/setmatrix.go @@ -0,0 +1,123 @@ +package common + +import ( + "sync" + + mapset "github.com/deckarep/golang-set" +) + +// SetMatrix is a map of Sets +type SetMatrix interface { + // Get returns the members of the set for a specific key as a slice. 
+ Get(key string) ([]interface{}, bool) + // Contains is used to verify if an element is in a set for a specific key + // the first return value is true if the element is in the set + // the second return value is true if there is a set for the key + Contains(key string, value interface{}) (bool, bool) + // Insert inserts the value into the set of a specific key + // returns true if the value was not present, false otherwise + // returns also the number of elements in the set of the key + Insert(key string, value interface{}) (bool, int) + // Remove removes the value from the set of a specific key + // returns true if the value was deleted, false otherwise + // returns also the number of elements left in the set of the key + Remove(key string, value interface{}) (bool, int) + // Cardinality returns the number of elements in the set of a specific key + // returns false if the key is not in the map + Cardinality(key string) (int, bool) + // String returns the string version of the set, empty otherwise + // returns false if the key is not in the map + String(key string) (string, bool) +} + +type setMatrix struct { + matrix map[string]mapset.Set + + sync.Mutex +} + +// NewSetMatrix creates a new set matrix object +func NewSetMatrix() SetMatrix { + s := &setMatrix{} + s.init() + return s +} + +func (s *setMatrix) init() { + s.matrix = make(map[string]mapset.Set) +} + +func (s *setMatrix) Get(key string) ([]interface{}, bool) { + s.Lock() + defer s.Unlock() + set, ok := s.matrix[key] + if !ok { + return nil, ok + } + return set.ToSlice(), ok +} + +func (s *setMatrix) Contains(key string, value interface{}) (bool, bool) { + s.Lock() + defer s.Unlock() + set, ok := s.matrix[key] + if !ok { + return false, ok + } + return set.Contains(value), ok +} + +func (s *setMatrix) Insert(key string, value interface{}) (bool, int) { + s.Lock() + defer s.Unlock() + set, ok := s.matrix[key] + if !ok { + s.matrix[key] = mapset.NewSet() + s.matrix[key].Add(value) + return true, 1 + } + + return 
set.Add(value), set.Cardinality() +} + +func (s *setMatrix) Remove(key string, value interface{}) (bool, int) { + s.Lock() + defer s.Unlock() + set, ok := s.matrix[key] + if !ok { + return false, 0 + } + + var removed bool + if set.Contains(value) { + set.Remove(value) + removed = true + // If the set is empty remove it from the matrix + if set.Cardinality() == 0 { + delete(s.matrix, key) + } + } + + return removed, set.Cardinality() +} + +func (s *setMatrix) Cardinality(key string) (int, bool) { + s.Lock() + defer s.Unlock() + set, ok := s.matrix[key] + if !ok { + return 0, ok + } + + return set.Cardinality(), ok +} + +func (s *setMatrix) String(key string) (string, bool) { + s.Lock() + defer s.Unlock() + set, ok := s.matrix[key] + if !ok { + return "", ok + } + return set.String(), ok +} diff --git a/libnetwork/common/setmatrix_test.go b/libnetwork/common/setmatrix_test.go new file mode 100644 index 0000000000..d87ffc7dfe --- /dev/null +++ b/libnetwork/common/setmatrix_test.go @@ -0,0 +1,146 @@ +package common + +import ( + "context" + "strconv" + "testing" + "time" + + _ "github.com/docker/libnetwork/testutils" +) + +func TestSetSerialInsertDelete(t *testing.T) { + s := NewSetMatrix() + + b, i := s.Insert("a", "1") + if !b || i != 1 { + t.Fatalf("error in insert %t %d", b, i) + } + b, i = s.Insert("a", "1") + if b || i != 1 { + t.Fatalf("error in insert %t %d", b, i) + } + b, i = s.Insert("a", "2") + if !b || i != 2 { + t.Fatalf("error in insert %t %d", b, i) + } + b, i = s.Insert("a", "1") + if b || i != 2 { + t.Fatalf("error in insert %t %d", b, i) + } + b, i = s.Insert("a", "3") + if !b || i != 3 { + t.Fatalf("error in insert %t %d", b, i) + } + b, i = s.Insert("a", "2") + if b || i != 3 { + t.Fatalf("error in insert %t %d", b, i) + } + b, i = s.Insert("a", "3") + if b || i != 3 { + t.Fatalf("error in insert %t %d", b, i) + } + b, i = s.Insert("a", "4") + if !b || i != 4 { + t.Fatalf("error in insert %t %d", b, i) + } + + b, p := s.Contains("a", "1") + if !b 
|| !p { + t.Fatalf("error in contains %t %t", b, p) + } + b, p = s.Contains("a", "2") + if !b || !p { + t.Fatalf("error in contains %t %t", b, p) + } + b, p = s.Contains("a", "3") + if !b || !p { + t.Fatalf("error in contains %t %t", b, p) + } + b, p = s.Contains("a", "4") + if !b || !p { + t.Fatalf("error in contains %t %t", b, p) + } + + i, b = s.Cardinality("a") + if !b || i != 4 { + t.Fatalf("error in cardinality count %t %d", b, i) + } + + b, i = s.Remove("a", "1") + if !b || i != 3 { + t.Fatalf("error in remove %t %d", b, i) + } + b, i = s.Remove("a", "3") + if !b || i != 2 { + t.Fatalf("error in remove %t %d", b, i) + } + b, i = s.Remove("a", "1") + if b || i != 2 { + t.Fatalf("error in remove %t %d", b, i) + } + b, i = s.Remove("a", "4") + if !b || i != 1 { + t.Fatalf("error in remove %t %d", b, i) + } + b, i = s.Remove("a", "2") + if !b || i != 0 { + t.Fatalf("error in remove %t %d", b, i) + } + b, i = s.Remove("a", "2") + if b || i != 0 { + t.Fatalf("error in remove %t %d", b, i) + } + + i, b = s.Cardinality("a") + if b || i != 0 { + t.Fatalf("error in cardinality count %t %d", b, i) + } +} + +func insertDeleteRotuine(ctx context.Context, endCh chan int, s SetMatrix, key, value string) { + for { + select { + case <-ctx.Done(): + endCh <- 0 + return + default: + b, _ := s.Insert(key, value) + if !b { + endCh <- 1 + return + } + + b, _ = s.Remove(key, value) + if !b { + endCh <- 2 + return + } + } + } +} + +func TestSetParallelInsertDelete(t *testing.T) { + s := NewSetMatrix() + parallelRoutines := 6 + endCh := make(chan int) + // Let the routines running and competing for 10s + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + for i := 0; i < parallelRoutines; i++ { + go insertDeleteRotuine(ctx, endCh, s, "key-"+strconv.Itoa(i%3), strconv.Itoa(i)) + } + for parallelRoutines > 0 { + v := <-endCh + if v == 1 { + t.Fatalf("error one goroutine failed on the insert") + } + if v == 2 { + t.Fatalf("error one goroutine 
failed on the remove") + } + parallelRoutines-- + } + if i, b := s.Cardinality("key"); b || i > 0 { + t.Fatalf("error the set should be empty %t %d", b, i) + } +} diff --git a/libnetwork/endpoint.go b/libnetwork/endpoint.go index e607eddaf2..111b747352 100644 --- a/libnetwork/endpoint.go +++ b/libnetwork/endpoint.go @@ -597,8 +597,14 @@ func (ep *endpoint) rename(name string) error { c := n.getController() + sb, ok := ep.getSandbox() + if !ok { + logrus.Warnf("rename for %s aborted, sandbox %s is not anymore present", ep.ID(), ep.sandboxID) + return nil + } + if c.isAgent() { - if err = ep.deleteServiceInfoFromCluster(); err != nil { + if err = ep.deleteServiceInfoFromCluster(sb, "rename"); err != nil { return types.InternalErrorf("Could not delete service state for endpoint %s from cluster on rename: %v", ep.Name(), err) } } else { @@ -617,15 +623,15 @@ func (ep *endpoint) rename(name string) error { ep.anonymous = false if c.isAgent() { - if err = ep.addServiceInfoToCluster(); err != nil { + if err = ep.addServiceInfoToCluster(sb); err != nil { return types.InternalErrorf("Could not add service state for endpoint %s to cluster on rename: %v", ep.Name(), err) } defer func() { if err != nil { - ep.deleteServiceInfoFromCluster() + ep.deleteServiceInfoFromCluster(sb, "rename") ep.name = oldName ep.anonymous = oldAnonymous - ep.addServiceInfoToCluster() + ep.addServiceInfoToCluster(sb) } }() } else { @@ -746,7 +752,7 @@ func (ep *endpoint) sbLeave(sb *sandbox, force bool, options ...EndpointOption) return err } - if e := ep.deleteServiceInfoFromCluster(); e != nil { + if e := ep.deleteServiceInfoFromCluster(sb, "sbLeave"); e != nil { logrus.Errorf("Could not delete service state for endpoint %s from cluster: %v", ep.Name(), e) } diff --git a/libnetwork/libnetwork_internal_test.go b/libnetwork/libnetwork_internal_test.go index 927d623daf..a832d5f7c9 100644 --- a/libnetwork/libnetwork_internal_test.go +++ b/libnetwork/libnetwork_internal_test.go @@ -7,6 +7,7 @@ import ( 
"testing" "time" + "github.com/docker/libnetwork/common" "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/discoverapi" "github.com/docker/libnetwork/driverapi" @@ -383,7 +384,7 @@ func TestSRVServiceQuery(t *testing.T) { sr := svcInfo{ svcMap: make(map[string][]net.IP), svcIPv6Map: make(map[string][]net.IP), - ipMap: make(map[string]*ipInfo), + ipMap: common.NewSetMatrix(), service: make(map[string][]servicePorts), } // backing container for the service diff --git a/libnetwork/network.go b/libnetwork/network.go index fa2ab800ae..ed9a61b456 100644 --- a/libnetwork/network.go +++ b/libnetwork/network.go @@ -10,6 +10,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/docker/pkg/stringid" + "github.com/docker/libnetwork/common" "github.com/docker/libnetwork/config" "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/driverapi" @@ -97,7 +98,7 @@ type ipInfo struct { type svcInfo struct { svcMap map[string][]net.IP svcIPv6Map map[string][]net.IP - ipMap map[string]*ipInfo + ipMap common.SetMatrix service map[string][]servicePorts } @@ -990,6 +991,12 @@ func (n *network) delete(force bool) error { c.cleanupServiceBindings(n.ID()) + // The network had been left, the service discovery can be cleaned up + c.Lock() + logrus.Debugf("network %s delete, clean svcRecords", n.id) + delete(c.svcRecords, n.id) + c.Unlock() + removeFromStore: // deleteFromStore performs an atomic delete operation and the // network.epCnt will help prevent any possible @@ -1227,36 +1234,34 @@ func (n *network) updateSvcRecord(ep *endpoint, localEps []*endpoint, isAdd bool // breaks some apps if ep.isAnonymous() { if len(myAliases) > 0 { - n.addSvcRecords(myAliases[0], iface.Address().IP, ipv6, true) + n.addSvcRecords(ep.ID(), myAliases[0], iface.Address().IP, ipv6, true, "updateSvcRecord") } } else { - n.addSvcRecords(epName, iface.Address().IP, ipv6, true) + n.addSvcRecords(ep.ID(), epName, iface.Address().IP, ipv6, true, "updateSvcRecord") } for _, 
alias := range myAliases { - n.addSvcRecords(alias, iface.Address().IP, ipv6, false) + n.addSvcRecords(ep.ID(), alias, iface.Address().IP, ipv6, false, "updateSvcRecord") } } else { if ep.isAnonymous() { if len(myAliases) > 0 { - n.deleteSvcRecords(myAliases[0], iface.Address().IP, ipv6, true) + n.deleteSvcRecords(ep.ID(), myAliases[0], iface.Address().IP, ipv6, true, "updateSvcRecord") } } else { - n.deleteSvcRecords(epName, iface.Address().IP, ipv6, true) + n.deleteSvcRecords(ep.ID(), epName, iface.Address().IP, ipv6, true, "updateSvcRecord") } for _, alias := range myAliases { - n.deleteSvcRecords(alias, iface.Address().IP, ipv6, false) + n.deleteSvcRecords(ep.ID(), alias, iface.Address().IP, ipv6, false, "updateSvcRecord") } } } } -func addIPToName(ipMap map[string]*ipInfo, name string, ip net.IP) { +func addIPToName(ipMap common.SetMatrix, name string, ip net.IP) { reverseIP := netutils.ReverseIP(ip.String()) - if _, ok := ipMap[reverseIP]; !ok { - ipMap[reverseIP] = &ipInfo{ - name: name, - } - } + ipMap.Insert(reverseIP, ipInfo{ + name: name, + }) } func addNameToIP(svcMap map[string][]net.IP, name string, epIP net.IP) { @@ -1284,24 +1289,25 @@ func delNameToIP(svcMap map[string][]net.IP, name string, epIP net.IP) { } } -func (n *network) addSvcRecords(name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool) { +func (n *network) addSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) { // Do not add service names for ingress network as this is a // routing only network if n.ingress { return } - logrus.Debugf("(%s).addSvcRecords(%s, %s, %s, %t)", n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate) + logrus.Debugf("%s (%s).addSvcRecords(%s, %s, %s, %t) %s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method) c := n.getController() c.Lock() defer c.Unlock() + sr, ok := c.svcRecords[n.ID()] if !ok { sr = svcInfo{ svcMap: make(map[string][]net.IP), svcIPv6Map: make(map[string][]net.IP), - ipMap: make(map[string]*ipInfo), + 
ipMap: common.NewSetMatrix(), } c.svcRecords[n.ID()] = sr } @@ -1319,28 +1325,33 @@ func (n *network) addSvcRecords(name string, epIP net.IP, epIPv6 net.IP, ipMapUp } } -func (n *network) deleteSvcRecords(name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool) { +func (n *network) deleteSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) { // Do not delete service names from ingress network as this is a // routing only network if n.ingress { return } - logrus.Debugf("(%s).deleteSvcRecords(%s, %s, %s, %t)", n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate) + logrus.Debugf("%s (%s).deleteSvcRecords(%s, %s, %s, %t) %s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method) c := n.getController() c.Lock() defer c.Unlock() + sr, ok := c.svcRecords[n.ID()] if !ok { return } if ipMapUpdate { - delete(sr.ipMap, netutils.ReverseIP(epIP.String())) + sr.ipMap.Remove(netutils.ReverseIP(epIP.String()), ipInfo{ + name: name, + }) if epIPv6 != nil { - delete(sr.ipMap, netutils.ReverseIP(epIPv6.String())) + sr.ipMap.Remove(netutils.ReverseIP(epIPv6.String()), ipInfo{ + name: name, + }) } } @@ -1868,9 +1879,11 @@ func (n *network) HandleQueryResp(name string, ip net.IP) { } ipStr := netutils.ReverseIP(ip.String()) - - if ipInfo, ok := sr.ipMap[ipStr]; ok { - ipInfo.extResolver = true + // If an object with extResolver == true is already in the set this call will fail + // but anyway it means that has already been inserted before + if ok, _ := sr.ipMap.Contains(ipStr, ipInfo{name: name}); ok { + sr.ipMap.Remove(ipStr, ipInfo{name: name}) + sr.ipMap.Insert(ipStr, ipInfo{name: name, extResolver: true}) } } @@ -1886,13 +1899,27 @@ func (n *network) ResolveIP(ip string) string { nwName := n.Name() - ipInfo, ok := sr.ipMap[ip] - - if !ok || ipInfo.extResolver { + elemSet, ok := sr.ipMap.Get(ip) + if !ok || len(elemSet) == 0 { + return "" + } + // NOTE it is possible to have more than one element in the Set, this will happen + // because of 
interleave of different events from different sources (local container create vs + // network db notifications) + // In such cases the resolution will be based on the first element of the set, and can vary + // during the system stabilization + elem, ok := elemSet[0].(ipInfo) + if !ok { + setStr, b := sr.ipMap.String(ip) + logrus.Errorf("expected set of ipInfo type for key %s set:%t %s", ip, b, setStr) return "" } - return ipInfo.name + "." + nwName + if elem.extResolver { + return "" + } + + return elem.name + "." + nwName } func (n *network) ResolveService(name string) ([]*net.SRV, []net.IP) { diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go index ecb2d714a4..b93a90d019 100644 --- a/libnetwork/networkdb/networkdb.go +++ b/libnetwork/networkdb/networkdb.go @@ -285,7 +285,6 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error { nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) nDB.Unlock() - nDB.broadcaster.Write(makeEvent(opCreate, tname, nid, key, value)) return nil } @@ -313,7 +312,6 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error { nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) nDB.Unlock() - nDB.broadcaster.Write(makeEvent(opUpdate, tname, nid, key, value)) return nil } @@ -359,7 +357,6 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error { nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) nDB.Unlock() - nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, value)) return nil } diff --git a/libnetwork/sandbox.go b/libnetwork/sandbox.go index c820cc04e9..472dbeafe7 100644 --- a/libnetwork/sandbox.go +++ b/libnetwork/sandbox.go @@ -86,6 +86,9 @@ type sandbox struct { ingress bool ndotsSet bool sync.Mutex + // This mutex is used to serialize service related operations for an endpoint + // The lock is here because the endpoint is saved into the store so is not 
unique + Service sync.Mutex } // These are the container configs used to customize container /etc/hosts file. @@ -668,26 +671,25 @@ func (sb *sandbox) SetKey(basePath string) error { } func (sb *sandbox) EnableService() error { + logrus.Debugf("EnableService %s START", sb.containerID) for _, ep := range sb.getConnectedEndpoints() { if ep.enableService(true) { - if err := ep.addServiceInfoToCluster(); err != nil { + if err := ep.addServiceInfoToCluster(sb); err != nil { ep.enableService(false) return fmt.Errorf("could not update state for endpoint %s into cluster: %v", ep.Name(), err) } } } + logrus.Debugf("EnableService %s DONE", sb.containerID) return nil } func (sb *sandbox) DisableService() error { + logrus.Debugf("DisableService %s START", sb.containerID) for _, ep := range sb.getConnectedEndpoints() { - if ep.enableService(false) { - if err := ep.deleteServiceInfoFromCluster(); err != nil { - ep.enableService(true) - return fmt.Errorf("could not delete state for endpoint %s from cluster: %v", ep.Name(), err) - } - } + ep.enableService(false) } + logrus.Debugf("DisableService %s DONE", sb.containerID) return nil } diff --git a/libnetwork/service.go b/libnetwork/service.go index a957026b2f..c890e01a39 100644 --- a/libnetwork/service.go +++ b/libnetwork/service.go @@ -4,6 +4,8 @@ import ( "fmt" "net" "sync" + + "github.com/docker/libnetwork/common" ) var ( @@ -48,17 +50,49 @@ type service struct { // Service aliases aliases []string + // This map tracks for each IP address the set of endpoint IDs + // associated with it. 
At stable state the endpoint ID expected is 1 + // but during transition and service change it is possible to have + // temporary more than 1 + ipToEndpoint common.SetMatrix + + deleted bool + sync.Mutex } +// assignIPToEndpoint inserts the mapping between the IP and the endpoint identifier +// returns true if the mapping was not present, false otherwise +// returns also the number of endpoints associated to the IP +func (s *service) assignIPToEndpoint(ip, eID string) (bool, int) { + return s.ipToEndpoint.Insert(ip, eID) +} + +// removeIPToEndpoint removes the mapping between the IP and the endpoint identifier +// returns true if the mapping was deleted, false otherwise +// returns also the number of endpoints associated to the IP +func (s *service) removeIPToEndpoint(ip, eID string) (bool, int) { + return s.ipToEndpoint.Remove(ip, eID) +} + +func (s *service) printIPToEndpoint(ip string) (string, bool) { + return s.ipToEndpoint.String(ip) +} + type loadBalancer struct { vip net.IP fwMark uint32 // Map of backend IPs backing this loadbalancer on this // network. It is keyed with endpoint ID. - backEnds map[string]net.IP + backEnds map[string]loadBalancerBackend // Back pointer to service to which the loadbalancer belongs. 
service *service } + +type loadBalancerBackend struct { + ip net.IP + containerName string + taskAliases []string +} diff --git a/libnetwork/service_common.go b/libnetwork/service_common.go index 049d308423..c8ed346384 100644 --- a/libnetwork/service_common.go +++ b/libnetwork/service_common.go @@ -6,15 +6,126 @@ import ( "net" "github.com/Sirupsen/logrus" + "github.com/docker/libnetwork/common" ) -func newService(name string, id string, ingressPorts []*PortConfig, aliases []string) *service { +func (c *controller) addEndpointNameResolution(svcName, svcID, nID, eID, containerName string, vip net.IP, serviceAliases, taskAliases []string, ip net.IP, addService bool, method string) error { + n, err := c.NetworkByID(nID) + if err != nil { + return err + } + + logrus.Debugf("addEndpointNameResolution %s %s add_service:%t", eID, svcName, addService) + + // Add container resolution mappings + c.addContainerNameResolution(nID, eID, containerName, taskAliases, ip, method) + + // Add endpoint IP to special "tasks.svc_name" so that the applications have access to DNS RR. + n.(*network).addSvcRecords(eID, "tasks."+svcName, ip, nil, false, method) + for _, alias := range serviceAliases { + n.(*network).addSvcRecords(eID, "tasks."+alias, ip, nil, false, method) + } + + // Add service name to vip in DNS, if vip is valid. 
Otherwise resort to DNS RR + if len(vip) == 0 { + n.(*network).addSvcRecords(eID, svcName, ip, nil, false, method) + for _, alias := range serviceAliases { + n.(*network).addSvcRecords(eID, alias, ip, nil, false, method) + } + } + + if addService && len(vip) != 0 { + n.(*network).addSvcRecords(eID, svcName, vip, nil, false, method) + for _, alias := range serviceAliases { + n.(*network).addSvcRecords(eID, alias, vip, nil, false, method) + } + } + + return nil +} + +func (c *controller) addContainerNameResolution(nID, eID, containerName string, taskAliases []string, ip net.IP, method string) error { + n, err := c.NetworkByID(nID) + if err != nil { + return err + } + logrus.Debugf("addContainerNameResolution %s %s", eID, containerName) + + // Add resolution for container name + n.(*network).addSvcRecords(eID, containerName, ip, nil, true, method) + + // Add resolution for taskaliases + for _, alias := range taskAliases { + n.(*network).addSvcRecords(eID, alias, ip, nil, true, method) + } + + return nil +} + +func (c *controller) deleteEndpointNameResolution(svcName, svcID, nID, eID, containerName string, vip net.IP, serviceAliases, taskAliases []string, ip net.IP, rmService, multipleEntries bool, method string) error { + n, err := c.NetworkByID(nID) + if err != nil { + return err + } + + logrus.Debugf("deleteEndpointNameResolution %s %s rm_service:%t suppress:%t", eID, svcName, rmService, multipleEntries) + + // Delete container resolution mappings + c.delContainerNameResolution(nID, eID, containerName, taskAliases, ip, method) + + // Delete the special "tasks.svc_name" backend record. + if !multipleEntries { + n.(*network).deleteSvcRecords(eID, "tasks."+svcName, ip, nil, false, method) + for _, alias := range serviceAliases { + n.(*network).deleteSvcRecords(eID, "tasks."+alias, ip, nil, false, method) + } + } + + // If we are doing DNS RR delete the endpoint IP from DNS record right away. 
+ if !multipleEntries && len(vip) == 0 { + n.(*network).deleteSvcRecords(eID, svcName, ip, nil, false, method) + for _, alias := range serviceAliases { + n.(*network).deleteSvcRecords(eID, alias, ip, nil, false, method) + } + } + + // Remove the DNS record for VIP only if we are removing the service + if rmService && len(vip) != 0 && !multipleEntries { + n.(*network).deleteSvcRecords(eID, svcName, vip, nil, false, method) + for _, alias := range serviceAliases { + n.(*network).deleteSvcRecords(eID, alias, vip, nil, false, method) + } + } + + return nil +} + +func (c *controller) delContainerNameResolution(nID, eID, containerName string, taskAliases []string, ip net.IP, method string) error { + n, err := c.NetworkByID(nID) + if err != nil { + return err + } + logrus.Debugf("delContainerNameResolution %s %s", eID, containerName) + + // Delete resolution for container name + n.(*network).deleteSvcRecords(eID, containerName, ip, nil, true, method) + + // Delete resolution for taskaliases + for _, alias := range taskAliases { + n.(*network).deleteSvcRecords(eID, alias, ip, nil, true, method) + } + + return nil +} + +func newService(name string, id string, ingressPorts []*PortConfig, serviceAliases []string) *service { return &service{ name: name, id: id, ingressPorts: ingressPorts, loadBalancers: make(map[string]*loadBalancer), - aliases: aliases, + aliases: serviceAliases, + ipToEndpoint: common.NewSetMatrix(), } } @@ -50,21 +161,26 @@ func (c *controller) cleanupServiceBindings(cleanupNID string) { for _, s := range services { s.Lock() + // Skip the serviceBindings that got deleted + if s.deleted { + s.Unlock() + continue + } for nid, lb := range s.loadBalancers { if cleanupNID != "" && nid != cleanupNID { continue } - for eid, ip := range lb.backEnds { + for eid, be := range lb.backEnds { service := s loadBalancer := lb networkID := nid epID := eid - epIP := ip + epIP := be.ip cleanupFuncs = append(cleanupFuncs, func() { - if err := c.rmServiceBinding(service.name, 
service.id, networkID, epID, loadBalancer.vip, - service.ingressPorts, service.aliases, epIP); err != nil { + if err := c.rmServiceBinding(service.name, service.id, networkID, epID, be.containerName, loadBalancer.vip, + service.ingressPorts, service.aliases, be.taskAliases, epIP, "cleanupServiceBindings"); err != nil { logrus.Errorf("Failed to remove service bindings for service %s network %s endpoint %s while cleanup: %v", service.id, networkID, epID, err) } @@ -80,48 +196,43 @@ func (c *controller) cleanupServiceBindings(cleanupNID string) { } -func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error { - n, err := c.NetworkByID(nid) +func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases, taskAliases []string, ip net.IP, method string) error { + var addService bool + + n, err := c.NetworkByID(nID) if err != nil { return err } skey := serviceKey{ - id: sid, + id: svcID, ports: portConfigs(ingressPorts).String(), } - c.Lock() - s, ok := c.serviceBindings[skey] - if !ok { - // Create a new service if we are seeing this service - // for the first time. - s = newService(name, sid, ingressPorts, aliases) - c.serviceBindings[skey] = s + var s *service + for { + c.Lock() + var ok bool + s, ok = c.serviceBindings[skey] + if !ok { + // Create a new service if we are seeing this service + // for the first time. + s = newService(svcName, svcID, ingressPorts, serviceAliases) + c.serviceBindings[skey] = s + } + c.Unlock() + s.Lock() + if !s.deleted { + // ok the object is good to be used + break + } + s.Unlock() } - c.Unlock() + logrus.Debugf("addServiceBinding from %s START for %s %s", method, svcName, eID) - // Add endpoint IP to special "tasks.svc_name" so that the - // applications have access to DNS RR. 
- n.(*network).addSvcRecords("tasks."+name, ip, nil, false) - for _, alias := range aliases { - n.(*network).addSvcRecords("tasks."+alias, ip, nil, false) - } - - // Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR - svcIP := vip - if len(svcIP) == 0 { - svcIP = ip - } - n.(*network).addSvcRecords(name, svcIP, nil, false) - for _, alias := range aliases { - n.(*network).addSvcRecords(alias, svcIP, nil, false) - } - - s.Lock() defer s.Unlock() - lb, ok := s.loadBalancers[nid] + lb, ok := s.loadBalancers[nID] if !ok { // Create a new load balancer if we are seeing this // network attachment on the service for the first @@ -129,7 +240,7 @@ func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, i lb = &loadBalancer{ vip: vip, fwMark: fwMarkCtr, - backEnds: make(map[string]net.IP), + backEnds: make(map[string]loadBalancerBackend), service: s, } @@ -137,10 +248,19 @@ func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, i fwMarkCtr++ fwMarkCtrMu.Unlock() - s.loadBalancers[nid] = lb + s.loadBalancers[nID] = lb + addService = true } - lb.backEnds[eid] = ip + lb.backEnds[eID] = loadBalancerBackend{ip: ip, + containerName: containerName, + taskAliases: taskAliases} + + ok, entries := s.assignIPToEndpoint(ip.String(), eID) + if !ok || entries > 1 { + setStr, b := s.printIPToEndpoint(ip.String()) + logrus.Warnf("addServiceBinding %s possible trainsient state ok:%t entries:%d set:%t %s", eID, ok, entries, b, setStr) + } // Add loadbalancer service and backend in all sandboxes in // the network only if vip is valid. 
@@ -148,89 +268,87 @@ func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, i n.(*network).addLBBackend(ip, vip, lb.fwMark, ingressPorts) } + // Add the appropriate name resolutions + c.addEndpointNameResolution(svcName, svcID, nID, eID, containerName, vip, serviceAliases, taskAliases, ip, addService, "addServiceBinding") + + logrus.Debugf("addServiceBinding from %s END for %s %s", method, svcName, eID) + return nil } -func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error { +func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases []string, taskAliases []string, ip net.IP, method string) error { + var rmService bool - n, err := c.NetworkByID(nid) + n, err := c.NetworkByID(nID) if err != nil { return err } skey := serviceKey{ - id: sid, + id: svcID, ports: portConfigs(ingressPorts).String(), } c.Lock() s, ok := c.serviceBindings[skey] c.Unlock() + logrus.Debugf("rmServiceBinding from %s START for %s %s", method, svcName, eID) if !ok { + logrus.Warnf("rmServiceBinding %s %s %s aborted c.serviceBindings[skey] !ok", method, svcName, eID) return nil } s.Lock() - lb, ok := s.loadBalancers[nid] + defer s.Unlock() + lb, ok := s.loadBalancers[nID] if !ok { - s.Unlock() + logrus.Warnf("rmServiceBinding %s %s %s aborted s.loadBalancers[nid] !ok", method, svcName, eID) return nil } - _, ok = lb.backEnds[eid] + _, ok = lb.backEnds[eID] if !ok { - s.Unlock() + logrus.Warnf("rmServiceBinding %s %s %s aborted lb.backEnds[eid] !ok", method, svcName, eID) return nil } - delete(lb.backEnds, eid) + delete(lb.backEnds, eID) if len(lb.backEnds) == 0 { // All the backends for this service have been // removed. Time to remove the load balancer and also // remove the service entry in IPVS. 
rmService = true - delete(s.loadBalancers, nid) + delete(s.loadBalancers, nID) } if len(s.loadBalancers) == 0 { // All loadbalancers for the service removed. Time to // remove the service itself. c.Lock() + + // Mark the object as deleted so that the add won't use it wrongly + s.deleted = true delete(c.serviceBindings, skey) c.Unlock() } + ok, entries := s.removeIPToEndpoint(ip.String(), eID) + if !ok || entries > 0 { + setStr, b := s.printIPToEndpoint(ip.String()) + logrus.Warnf("rmServiceBinding %s possible trainsient state ok:%t entries:%d set:%t %s", eID, ok, entries, b, setStr) + } + // Remove loadbalancer service(if needed) and backend in all // sandboxes in the network only if the vip is valid. if len(vip) != 0 { n.(*network).rmLBBackend(ip, vip, lb.fwMark, ingressPorts, rmService) } - s.Unlock() - // Delete the special "tasks.svc_name" backend record. - n.(*network).deleteSvcRecords("tasks."+name, ip, nil, false) - for _, alias := range aliases { - n.(*network).deleteSvcRecords("tasks."+alias, ip, nil, false) - } - - // If we are doing DNS RR add the endpoint IP to DNS record - // right away. 
- if len(vip) == 0 { - n.(*network).deleteSvcRecords(name, ip, nil, false) - for _, alias := range aliases { - n.(*network).deleteSvcRecords(alias, ip, nil, false) - } - } - - // Remove the DNS record for VIP only if we are removing the service - if rmService && len(vip) != 0 { - n.(*network).deleteSvcRecords(name, vip, nil, false) - for _, alias := range aliases { - n.(*network).deleteSvcRecords(alias, vip, nil, false) - } - } + // Delete the name resolutions + c.deleteEndpointNameResolution(svcName, svcID, nID, eID, containerName, vip, serviceAliases, taskAliases, ip, rmService, entries > 0, "rmServiceBinding") + logrus.Debugf("rmServiceBinding from %s END for %s %s", method, svcName, eID) return nil } diff --git a/libnetwork/service_linux.go b/libnetwork/service_linux.go index 2bcb6de5eb..70af7e33bc 100644 --- a/libnetwork/service_linux.go +++ b/libnetwork/service_linux.go @@ -44,6 +44,11 @@ func (n *network) connectedLoadbalancers() []*loadBalancer { var lbs []*loadBalancer for _, s := range serviceBindings { s.Lock() + // Skip the serviceBindings that got deleted + if s.deleted { + s.Unlock() + continue + } if lb, ok := s.loadBalancers[n.ID()]; ok { lbs = append(lbs, lb) } @@ -97,8 +102,8 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) { } lb.service.Lock() - for _, ip := range lb.backEnds { - sb.addLBBackend(ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress) + for _, l := range lb.backEnds { + sb.addLBBackend(l.ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress) } lb.service.Unlock() }