mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Merge pull request #1796 from fcrisciani/name-resolution-race
Service discovery hardening
This commit is contained in:
commit
79bb7f8674
12 changed files with 666 additions and 164 deletions
|
@ -583,7 +583,7 @@ func (ep *endpoint) deleteDriverInfoFromCluster() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (ep *endpoint) addServiceInfoToCluster() error {
|
||||
func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
|
||||
if ep.isAnonymous() && len(ep.myAliases) == 0 || ep.Iface().Address() == nil {
|
||||
return nil
|
||||
}
|
||||
|
@ -593,26 +593,51 @@ func (ep *endpoint) addServiceInfoToCluster() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
sb.Service.Lock()
|
||||
defer sb.Service.Unlock()
|
||||
logrus.Debugf("addServiceInfoToCluster START for %s %s", ep.svcName, ep.ID())
|
||||
|
||||
// Check that the endpoint is still present on the sandbox before adding it to the service discovery.
|
||||
// This is to handle a race between the EnableService and the sbLeave
|
||||
// It is possible that the EnableService starts, fetches the list of the endpoints and
|
||||
// by the time the addServiceInfoToCluster is called the endpoint got removed from the sandbox
|
||||
// The risk is that the deleteServiceInfoToCluster happens before the addServiceInfoToCluster.
|
||||
// This check under the Service lock of the sandbox ensure the correct behavior.
|
||||
// If the addServiceInfoToCluster arrives first may find or not the endpoint and will proceed or exit
|
||||
// but in any case the deleteServiceInfoToCluster will follow doing the cleanup if needed.
|
||||
// In case the deleteServiceInfoToCluster arrives first, this one is happening after the endpoint is
|
||||
// removed from the list, in this situation the delete will bail out not finding any data to cleanup
|
||||
// and the add will bail out not finding the endpoint on the sandbox.
|
||||
if e := sb.getEndpoint(ep.ID()); e == nil {
|
||||
logrus.Warnf("addServiceInfoToCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
|
||||
return nil
|
||||
}
|
||||
|
||||
c := n.getController()
|
||||
agent := c.getAgent()
|
||||
|
||||
var ingressPorts []*PortConfig
|
||||
if ep.svcID != "" {
|
||||
// Gossip ingress ports only in ingress network.
|
||||
if n.ingress {
|
||||
ingressPorts = ep.ingressPorts
|
||||
}
|
||||
|
||||
if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
name := ep.Name()
|
||||
if ep.isAnonymous() {
|
||||
name = ep.MyAliases()[0]
|
||||
}
|
||||
|
||||
var ingressPorts []*PortConfig
|
||||
if ep.svcID != "" {
|
||||
// This is a task part of a service
|
||||
// Gossip ingress ports only in ingress network.
|
||||
if n.ingress {
|
||||
ingressPorts = ep.ingressPorts
|
||||
}
|
||||
if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// This is a container simply attached to an attachable network
|
||||
if err := c.addContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
buf, err := proto.Marshal(&EndpointRecord{
|
||||
Name: name,
|
||||
ServiceName: ep.svcName,
|
||||
|
@ -634,10 +659,12 @@ func (ep *endpoint) addServiceInfoToCluster() error {
|
|||
}
|
||||
}
|
||||
|
||||
logrus.Debugf("addServiceInfoToCluster END for %s %s", ep.svcName, ep.ID())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ep *endpoint) deleteServiceInfoFromCluster() error {
|
||||
func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) error {
|
||||
if ep.isAnonymous() && len(ep.myAliases) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
@ -647,17 +674,33 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
sb.Service.Lock()
|
||||
defer sb.Service.Unlock()
|
||||
logrus.Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID())
|
||||
|
||||
c := n.getController()
|
||||
agent := c.getAgent()
|
||||
|
||||
if ep.svcID != "" && ep.Iface().Address() != nil {
|
||||
var ingressPorts []*PortConfig
|
||||
if n.ingress {
|
||||
ingressPorts = ep.ingressPorts
|
||||
}
|
||||
name := ep.Name()
|
||||
if ep.isAnonymous() {
|
||||
name = ep.MyAliases()[0]
|
||||
}
|
||||
|
||||
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil {
|
||||
return err
|
||||
if ep.Iface().Address() != nil {
|
||||
if ep.svcID != "" {
|
||||
// This is a task part of a service
|
||||
var ingressPorts []*PortConfig
|
||||
if n.ingress {
|
||||
ingressPorts = ep.ingressPorts
|
||||
}
|
||||
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// This is a container simply attached to an attachable network
|
||||
if err := c.delContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -667,6 +710,8 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error {
|
|||
}
|
||||
}
|
||||
|
||||
logrus.Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -814,58 +859,56 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
|
|||
value = event.Value
|
||||
case networkdb.UpdateEvent:
|
||||
logrus.Errorf("Unexpected update service table event = %#v", event)
|
||||
}
|
||||
|
||||
nw, err := c.NetworkByID(nid)
|
||||
if err != nil {
|
||||
logrus.Errorf("Could not find network %s while handling service table event: %v", nid, err)
|
||||
return
|
||||
}
|
||||
n := nw.(*network)
|
||||
|
||||
err = proto.Unmarshal(value, &epRec)
|
||||
err := proto.Unmarshal(value, &epRec)
|
||||
if err != nil {
|
||||
logrus.Errorf("Failed to unmarshal service table value: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
name := epRec.Name
|
||||
containerName := epRec.Name
|
||||
svcName := epRec.ServiceName
|
||||
svcID := epRec.ServiceID
|
||||
vip := net.ParseIP(epRec.VirtualIP)
|
||||
ip := net.ParseIP(epRec.EndpointIP)
|
||||
ingressPorts := epRec.IngressPorts
|
||||
aliases := epRec.Aliases
|
||||
taskaliases := epRec.TaskAliases
|
||||
serviceAliases := epRec.Aliases
|
||||
taskAliases := epRec.TaskAliases
|
||||
|
||||
if name == "" || ip == nil {
|
||||
if containerName == "" || ip == nil {
|
||||
logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
|
||||
return
|
||||
}
|
||||
|
||||
if isAdd {
|
||||
logrus.Debugf("handleEpTableEvent ADD %s R:%v", isAdd, eid, epRec)
|
||||
if svcID != "" {
|
||||
if err := c.addServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil {
|
||||
logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
|
||||
// This is a remote task part of a service
|
||||
if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
|
||||
logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
n.addSvcRecords(name, ip, nil, true)
|
||||
for _, alias := range taskaliases {
|
||||
n.addSvcRecords(alias, ip, nil, true)
|
||||
} else {
|
||||
// This is a remote container simply attached to an attachable network
|
||||
if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
|
||||
logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logrus.Debugf("handleEpTableEvent DEL %s R:%v", isAdd, eid, epRec)
|
||||
if svcID != "" {
|
||||
if err := c.rmServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil {
|
||||
logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
|
||||
// This is a remote task part of a service
|
||||
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
|
||||
logrus.Errorf("failed removing service binding for %s epRec:%v err:%s", eid, epRec, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
n.deleteSvcRecords(name, ip, nil, true)
|
||||
for _, alias := range taskaliases {
|
||||
n.deleteSvcRecords(alias, ip, nil, true)
|
||||
} else {
|
||||
// This is a remote container simply attached to an attachable network
|
||||
if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
|
||||
logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@ option (gogoproto.goproto_stringer_all) = false;
|
|||
// EndpointRecord specifies all the endpoint specific information that
|
||||
// needs to gossiped to nodes participating in the network.
|
||||
message EndpointRecord {
|
||||
// Name of the endpoint
|
||||
// Name of the container
|
||||
string name = 1;
|
||||
|
||||
// Service name of the service to which this endpoint belongs.
|
||||
|
|
123
libnetwork/common/setmatrix.go
Normal file
123
libnetwork/common/setmatrix.go
Normal file
|
@ -0,0 +1,123 @@
|
|||
package common
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
mapset "github.com/deckarep/golang-set"
|
||||
)
|
||||
|
||||
// SetMatrix is a map of Sets
|
||||
type SetMatrix interface {
|
||||
// Get returns the members of the set for a specific key as a slice.
|
||||
Get(key string) ([]interface{}, bool)
|
||||
// Contains is used to verify is an element is in a set for a specific key
|
||||
// returns true if the element is in the set
|
||||
// returns true if there is a set for the key
|
||||
Contains(key string, value interface{}) (bool, bool)
|
||||
// Insert inserts the mapping between the IP and the endpoint identifier
|
||||
// returns true if the mapping was not present, false otherwise
|
||||
// returns also the number of endpoints associated to the IP
|
||||
Insert(key string, value interface{}) (bool, int)
|
||||
// Remove removes the mapping between the IP and the endpoint identifier
|
||||
// returns true if the mapping was deleted, false otherwise
|
||||
// returns also the number of endpoints associated to the IP
|
||||
Remove(key string, value interface{}) (bool, int)
|
||||
// Cardinality returns the number of elements in the set of a specfic key
|
||||
// returns false if the key is not in the map
|
||||
Cardinality(key string) (int, bool)
|
||||
// String returns the string version of the set, empty otherwise
|
||||
// returns false if the key is not in the map
|
||||
String(key string) (string, bool)
|
||||
}
|
||||
|
||||
type setMatrix struct {
|
||||
matrix map[string]mapset.Set
|
||||
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
// NewSetMatrix creates a new set matrix object
|
||||
func NewSetMatrix() SetMatrix {
|
||||
s := &setMatrix{}
|
||||
s.init()
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *setMatrix) init() {
|
||||
s.matrix = make(map[string]mapset.Set)
|
||||
}
|
||||
|
||||
func (s *setMatrix) Get(key string) ([]interface{}, bool) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
set, ok := s.matrix[key]
|
||||
if !ok {
|
||||
return nil, ok
|
||||
}
|
||||
return set.ToSlice(), ok
|
||||
}
|
||||
|
||||
func (s *setMatrix) Contains(key string, value interface{}) (bool, bool) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
set, ok := s.matrix[key]
|
||||
if !ok {
|
||||
return false, ok
|
||||
}
|
||||
return set.Contains(value), ok
|
||||
}
|
||||
|
||||
func (s *setMatrix) Insert(key string, value interface{}) (bool, int) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
set, ok := s.matrix[key]
|
||||
if !ok {
|
||||
s.matrix[key] = mapset.NewSet()
|
||||
s.matrix[key].Add(value)
|
||||
return true, 1
|
||||
}
|
||||
|
||||
return set.Add(value), set.Cardinality()
|
||||
}
|
||||
|
||||
func (s *setMatrix) Remove(key string, value interface{}) (bool, int) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
set, ok := s.matrix[key]
|
||||
if !ok {
|
||||
return false, 0
|
||||
}
|
||||
|
||||
var removed bool
|
||||
if set.Contains(value) {
|
||||
set.Remove(value)
|
||||
removed = true
|
||||
// If the set is empty remove it from the matrix
|
||||
if set.Cardinality() == 0 {
|
||||
delete(s.matrix, key)
|
||||
}
|
||||
}
|
||||
|
||||
return removed, set.Cardinality()
|
||||
}
|
||||
|
||||
func (s *setMatrix) Cardinality(key string) (int, bool) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
set, ok := s.matrix[key]
|
||||
if !ok {
|
||||
return 0, ok
|
||||
}
|
||||
|
||||
return set.Cardinality(), ok
|
||||
}
|
||||
|
||||
func (s *setMatrix) String(key string) (string, bool) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
set, ok := s.matrix[key]
|
||||
if !ok {
|
||||
return "", ok
|
||||
}
|
||||
return set.String(), ok
|
||||
}
|
146
libnetwork/common/setmatrix_test.go
Normal file
146
libnetwork/common/setmatrix_test.go
Normal file
|
@ -0,0 +1,146 @@
|
|||
package common
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
_ "github.com/docker/libnetwork/testutils"
|
||||
)
|
||||
|
||||
func TestSetSerialInsertDelete(t *testing.T) {
|
||||
s := NewSetMatrix()
|
||||
|
||||
b, i := s.Insert("a", "1")
|
||||
if !b || i != 1 {
|
||||
t.Fatalf("error in insert %t %d", b, i)
|
||||
}
|
||||
b, i = s.Insert("a", "1")
|
||||
if b || i != 1 {
|
||||
t.Fatalf("error in insert %t %d", b, i)
|
||||
}
|
||||
b, i = s.Insert("a", "2")
|
||||
if !b || i != 2 {
|
||||
t.Fatalf("error in insert %t %d", b, i)
|
||||
}
|
||||
b, i = s.Insert("a", "1")
|
||||
if b || i != 2 {
|
||||
t.Fatalf("error in insert %t %d", b, i)
|
||||
}
|
||||
b, i = s.Insert("a", "3")
|
||||
if !b || i != 3 {
|
||||
t.Fatalf("error in insert %t %d", b, i)
|
||||
}
|
||||
b, i = s.Insert("a", "2")
|
||||
if b || i != 3 {
|
||||
t.Fatalf("error in insert %t %d", b, i)
|
||||
}
|
||||
b, i = s.Insert("a", "3")
|
||||
if b || i != 3 {
|
||||
t.Fatalf("error in insert %t %d", b, i)
|
||||
}
|
||||
b, i = s.Insert("a", "4")
|
||||
if !b || i != 4 {
|
||||
t.Fatalf("error in insert %t %d", b, i)
|
||||
}
|
||||
|
||||
b, p := s.Contains("a", "1")
|
||||
if !b || !p {
|
||||
t.Fatalf("error in contains %t %t", b, p)
|
||||
}
|
||||
b, p = s.Contains("a", "2")
|
||||
if !b || !p {
|
||||
t.Fatalf("error in contains %t %t", b, p)
|
||||
}
|
||||
b, p = s.Contains("a", "3")
|
||||
if !b || !p {
|
||||
t.Fatalf("error in contains %t %t", b, p)
|
||||
}
|
||||
b, p = s.Contains("a", "4")
|
||||
if !b || !p {
|
||||
t.Fatalf("error in contains %t %t", b, p)
|
||||
}
|
||||
|
||||
i, b = s.Cardinality("a")
|
||||
if !b || i != 4 {
|
||||
t.Fatalf("error in cardinality count %t %d", b, i)
|
||||
}
|
||||
|
||||
b, i = s.Remove("a", "1")
|
||||
if !b || i != 3 {
|
||||
t.Fatalf("error in remove %t %d", b, i)
|
||||
}
|
||||
b, i = s.Remove("a", "3")
|
||||
if !b || i != 2 {
|
||||
t.Fatalf("error in remove %t %d", b, i)
|
||||
}
|
||||
b, i = s.Remove("a", "1")
|
||||
if b || i != 2 {
|
||||
t.Fatalf("error in remove %t %d", b, i)
|
||||
}
|
||||
b, i = s.Remove("a", "4")
|
||||
if !b || i != 1 {
|
||||
t.Fatalf("error in remove %t %d", b, i)
|
||||
}
|
||||
b, i = s.Remove("a", "2")
|
||||
if !b || i != 0 {
|
||||
t.Fatalf("error in remove %t %d", b, i)
|
||||
}
|
||||
b, i = s.Remove("a", "2")
|
||||
if b || i != 0 {
|
||||
t.Fatalf("error in remove %t %d", b, i)
|
||||
}
|
||||
|
||||
i, b = s.Cardinality("a")
|
||||
if b || i != 0 {
|
||||
t.Fatalf("error in cardinality count %t %d", b, i)
|
||||
}
|
||||
}
|
||||
|
||||
func insertDeleteRotuine(ctx context.Context, endCh chan int, s SetMatrix, key, value string) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
endCh <- 0
|
||||
return
|
||||
default:
|
||||
b, _ := s.Insert(key, value)
|
||||
if !b {
|
||||
endCh <- 1
|
||||
return
|
||||
}
|
||||
|
||||
b, _ = s.Remove(key, value)
|
||||
if !b {
|
||||
endCh <- 2
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetParallelInsertDelete(t *testing.T) {
|
||||
s := NewSetMatrix()
|
||||
parallelRoutines := 6
|
||||
endCh := make(chan int)
|
||||
// Let the routines running and competing for 10s
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
for i := 0; i < parallelRoutines; i++ {
|
||||
go insertDeleteRotuine(ctx, endCh, s, "key-"+strconv.Itoa(i%3), strconv.Itoa(i))
|
||||
}
|
||||
for parallelRoutines > 0 {
|
||||
v := <-endCh
|
||||
if v == 1 {
|
||||
t.Fatalf("error one goroutine failed on the insert")
|
||||
}
|
||||
if v == 2 {
|
||||
t.Fatalf("error one goroutine failed on the remove")
|
||||
}
|
||||
parallelRoutines--
|
||||
}
|
||||
if i, b := s.Cardinality("key"); b || i > 0 {
|
||||
t.Fatalf("error the set should be empty %t %d", b, i)
|
||||
}
|
||||
}
|
|
@ -597,8 +597,14 @@ func (ep *endpoint) rename(name string) error {
|
|||
|
||||
c := n.getController()
|
||||
|
||||
sb, ok := ep.getSandbox()
|
||||
if !ok {
|
||||
logrus.Warnf("rename for %s aborted, sandbox %s is not anymore present", ep.ID(), ep.sandboxID)
|
||||
return nil
|
||||
}
|
||||
|
||||
if c.isAgent() {
|
||||
if err = ep.deleteServiceInfoFromCluster(); err != nil {
|
||||
if err = ep.deleteServiceInfoFromCluster(sb, "rename"); err != nil {
|
||||
return types.InternalErrorf("Could not delete service state for endpoint %s from cluster on rename: %v", ep.Name(), err)
|
||||
}
|
||||
} else {
|
||||
|
@ -617,15 +623,15 @@ func (ep *endpoint) rename(name string) error {
|
|||
ep.anonymous = false
|
||||
|
||||
if c.isAgent() {
|
||||
if err = ep.addServiceInfoToCluster(); err != nil {
|
||||
if err = ep.addServiceInfoToCluster(sb); err != nil {
|
||||
return types.InternalErrorf("Could not add service state for endpoint %s to cluster on rename: %v", ep.Name(), err)
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
ep.deleteServiceInfoFromCluster()
|
||||
ep.deleteServiceInfoFromCluster(sb, "rename")
|
||||
ep.name = oldName
|
||||
ep.anonymous = oldAnonymous
|
||||
ep.addServiceInfoToCluster()
|
||||
ep.addServiceInfoToCluster(sb)
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
|
@ -746,7 +752,7 @@ func (ep *endpoint) sbLeave(sb *sandbox, force bool, options ...EndpointOption)
|
|||
return err
|
||||
}
|
||||
|
||||
if e := ep.deleteServiceInfoFromCluster(); e != nil {
|
||||
if e := ep.deleteServiceInfoFromCluster(sb, "sbLeave"); e != nil {
|
||||
logrus.Errorf("Could not delete service state for endpoint %s from cluster: %v", ep.Name(), e)
|
||||
}
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/docker/libnetwork/common"
|
||||
"github.com/docker/libnetwork/datastore"
|
||||
"github.com/docker/libnetwork/discoverapi"
|
||||
"github.com/docker/libnetwork/driverapi"
|
||||
|
@ -383,7 +384,7 @@ func TestSRVServiceQuery(t *testing.T) {
|
|||
sr := svcInfo{
|
||||
svcMap: make(map[string][]net.IP),
|
||||
svcIPv6Map: make(map[string][]net.IP),
|
||||
ipMap: make(map[string]*ipInfo),
|
||||
ipMap: common.NewSetMatrix(),
|
||||
service: make(map[string][]servicePorts),
|
||||
}
|
||||
// backing container for the service
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/docker/docker/pkg/stringid"
|
||||
"github.com/docker/libnetwork/common"
|
||||
"github.com/docker/libnetwork/config"
|
||||
"github.com/docker/libnetwork/datastore"
|
||||
"github.com/docker/libnetwork/driverapi"
|
||||
|
@ -97,7 +98,7 @@ type ipInfo struct {
|
|||
type svcInfo struct {
|
||||
svcMap map[string][]net.IP
|
||||
svcIPv6Map map[string][]net.IP
|
||||
ipMap map[string]*ipInfo
|
||||
ipMap common.SetMatrix
|
||||
service map[string][]servicePorts
|
||||
}
|
||||
|
||||
|
@ -990,6 +991,12 @@ func (n *network) delete(force bool) error {
|
|||
|
||||
c.cleanupServiceBindings(n.ID())
|
||||
|
||||
// The network had been left, the service discovery can be cleaned up
|
||||
c.Lock()
|
||||
logrus.Debugf("network %s delete, clean svcRecords", n.id)
|
||||
delete(c.svcRecords, n.id)
|
||||
c.Unlock()
|
||||
|
||||
removeFromStore:
|
||||
// deleteFromStore performs an atomic delete operation and the
|
||||
// network.epCnt will help prevent any possible
|
||||
|
@ -1227,36 +1234,34 @@ func (n *network) updateSvcRecord(ep *endpoint, localEps []*endpoint, isAdd bool
|
|||
// breaks some apps
|
||||
if ep.isAnonymous() {
|
||||
if len(myAliases) > 0 {
|
||||
n.addSvcRecords(myAliases[0], iface.Address().IP, ipv6, true)
|
||||
n.addSvcRecords(ep.ID(), myAliases[0], iface.Address().IP, ipv6, true, "updateSvcRecord")
|
||||
}
|
||||
} else {
|
||||
n.addSvcRecords(epName, iface.Address().IP, ipv6, true)
|
||||
n.addSvcRecords(ep.ID(), epName, iface.Address().IP, ipv6, true, "updateSvcRecord")
|
||||
}
|
||||
for _, alias := range myAliases {
|
||||
n.addSvcRecords(alias, iface.Address().IP, ipv6, false)
|
||||
n.addSvcRecords(ep.ID(), alias, iface.Address().IP, ipv6, false, "updateSvcRecord")
|
||||
}
|
||||
} else {
|
||||
if ep.isAnonymous() {
|
||||
if len(myAliases) > 0 {
|
||||
n.deleteSvcRecords(myAliases[0], iface.Address().IP, ipv6, true)
|
||||
n.deleteSvcRecords(ep.ID(), myAliases[0], iface.Address().IP, ipv6, true, "updateSvcRecord")
|
||||
}
|
||||
} else {
|
||||
n.deleteSvcRecords(epName, iface.Address().IP, ipv6, true)
|
||||
n.deleteSvcRecords(ep.ID(), epName, iface.Address().IP, ipv6, true, "updateSvcRecord")
|
||||
}
|
||||
for _, alias := range myAliases {
|
||||
n.deleteSvcRecords(alias, iface.Address().IP, ipv6, false)
|
||||
n.deleteSvcRecords(ep.ID(), alias, iface.Address().IP, ipv6, false, "updateSvcRecord")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func addIPToName(ipMap map[string]*ipInfo, name string, ip net.IP) {
|
||||
func addIPToName(ipMap common.SetMatrix, name string, ip net.IP) {
|
||||
reverseIP := netutils.ReverseIP(ip.String())
|
||||
if _, ok := ipMap[reverseIP]; !ok {
|
||||
ipMap[reverseIP] = &ipInfo{
|
||||
name: name,
|
||||
}
|
||||
}
|
||||
ipMap.Insert(reverseIP, ipInfo{
|
||||
name: name,
|
||||
})
|
||||
}
|
||||
|
||||
func addNameToIP(svcMap map[string][]net.IP, name string, epIP net.IP) {
|
||||
|
@ -1284,24 +1289,25 @@ func delNameToIP(svcMap map[string][]net.IP, name string, epIP net.IP) {
|
|||
}
|
||||
}
|
||||
|
||||
func (n *network) addSvcRecords(name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool) {
|
||||
func (n *network) addSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) {
|
||||
// Do not add service names for ingress network as this is a
|
||||
// routing only network
|
||||
if n.ingress {
|
||||
return
|
||||
}
|
||||
|
||||
logrus.Debugf("(%s).addSvcRecords(%s, %s, %s, %t)", n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate)
|
||||
logrus.Debugf("%s (%s).addSvcRecords(%s, %s, %s, %t) %s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method)
|
||||
|
||||
c := n.getController()
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
sr, ok := c.svcRecords[n.ID()]
|
||||
if !ok {
|
||||
sr = svcInfo{
|
||||
svcMap: make(map[string][]net.IP),
|
||||
svcIPv6Map: make(map[string][]net.IP),
|
||||
ipMap: make(map[string]*ipInfo),
|
||||
ipMap: common.NewSetMatrix(),
|
||||
}
|
||||
c.svcRecords[n.ID()] = sr
|
||||
}
|
||||
|
@ -1319,28 +1325,33 @@ func (n *network) addSvcRecords(name string, epIP net.IP, epIPv6 net.IP, ipMapUp
|
|||
}
|
||||
}
|
||||
|
||||
func (n *network) deleteSvcRecords(name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool) {
|
||||
func (n *network) deleteSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) {
|
||||
// Do not delete service names from ingress network as this is a
|
||||
// routing only network
|
||||
if n.ingress {
|
||||
return
|
||||
}
|
||||
|
||||
logrus.Debugf("(%s).deleteSvcRecords(%s, %s, %s, %t)", n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate)
|
||||
logrus.Debugf("%s (%s).deleteSvcRecords(%s, %s, %s, %t) %s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method)
|
||||
|
||||
c := n.getController()
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
sr, ok := c.svcRecords[n.ID()]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if ipMapUpdate {
|
||||
delete(sr.ipMap, netutils.ReverseIP(epIP.String()))
|
||||
sr.ipMap.Remove(netutils.ReverseIP(epIP.String()), ipInfo{
|
||||
name: name,
|
||||
})
|
||||
|
||||
if epIPv6 != nil {
|
||||
delete(sr.ipMap, netutils.ReverseIP(epIPv6.String()))
|
||||
sr.ipMap.Remove(netutils.ReverseIP(epIPv6.String()), ipInfo{
|
||||
name: name,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1868,9 +1879,11 @@ func (n *network) HandleQueryResp(name string, ip net.IP) {
|
|||
}
|
||||
|
||||
ipStr := netutils.ReverseIP(ip.String())
|
||||
|
||||
if ipInfo, ok := sr.ipMap[ipStr]; ok {
|
||||
ipInfo.extResolver = true
|
||||
// If an object with extResolver == true is already in the set this call will fail
|
||||
// but anyway it means that has already been inserted before
|
||||
if ok, _ := sr.ipMap.Contains(ipStr, ipInfo{name: name}); ok {
|
||||
sr.ipMap.Remove(ipStr, ipInfo{name: name})
|
||||
sr.ipMap.Insert(ipStr, ipInfo{name: name, extResolver: true})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1886,13 +1899,27 @@ func (n *network) ResolveIP(ip string) string {
|
|||
|
||||
nwName := n.Name()
|
||||
|
||||
ipInfo, ok := sr.ipMap[ip]
|
||||
|
||||
if !ok || ipInfo.extResolver {
|
||||
elemSet, ok := sr.ipMap.Get(ip)
|
||||
if !ok || len(elemSet) == 0 {
|
||||
return ""
|
||||
}
|
||||
// NOTE it is possible to have more than one element in the Set, this will happen
|
||||
// because of interleave of diffent events from differnt sources (local container create vs
|
||||
// network db notifications)
|
||||
// In such cases the resolution will be based on the first element of the set, and can vary
|
||||
// during the system stabilitation
|
||||
elem, ok := elemSet[0].(ipInfo)
|
||||
if !ok {
|
||||
setStr, b := sr.ipMap.String(ip)
|
||||
logrus.Errorf("expected set of ipInfo type for key %s set:%t %s", ip, b, setStr)
|
||||
return ""
|
||||
}
|
||||
|
||||
return ipInfo.name + "." + nwName
|
||||
if elem.extResolver {
|
||||
return ""
|
||||
}
|
||||
|
||||
return elem.name + "." + nwName
|
||||
}
|
||||
|
||||
func (n *network) ResolveService(name string) ([]*net.SRV, []net.IP) {
|
||||
|
|
|
@ -285,7 +285,6 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
|
|||
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
|
||||
nDB.Unlock()
|
||||
|
||||
nDB.broadcaster.Write(makeEvent(opCreate, tname, nid, key, value))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -313,7 +312,6 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
|
|||
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
|
||||
nDB.Unlock()
|
||||
|
||||
nDB.broadcaster.Write(makeEvent(opUpdate, tname, nid, key, value))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -359,7 +357,6 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
|
|||
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
|
||||
nDB.Unlock()
|
||||
|
||||
nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, value))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -86,6 +86,9 @@ type sandbox struct {
|
|||
ingress bool
|
||||
ndotsSet bool
|
||||
sync.Mutex
|
||||
// This mutex is used to serialize service related operation for an endpoint
|
||||
// The lock is here because the endpoint is saved into the store so is not unique
|
||||
Service sync.Mutex
|
||||
}
|
||||
|
||||
// These are the container configs used to customize container /etc/hosts file.
|
||||
|
@ -668,26 +671,25 @@ func (sb *sandbox) SetKey(basePath string) error {
|
|||
}
|
||||
|
||||
func (sb *sandbox) EnableService() error {
|
||||
logrus.Debugf("EnableService %s START", sb.containerID)
|
||||
for _, ep := range sb.getConnectedEndpoints() {
|
||||
if ep.enableService(true) {
|
||||
if err := ep.addServiceInfoToCluster(); err != nil {
|
||||
if err := ep.addServiceInfoToCluster(sb); err != nil {
|
||||
ep.enableService(false)
|
||||
return fmt.Errorf("could not update state for endpoint %s into cluster: %v", ep.Name(), err)
|
||||
}
|
||||
}
|
||||
}
|
||||
logrus.Debugf("EnableService %s DONE", sb.containerID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sb *sandbox) DisableService() error {
|
||||
logrus.Debugf("DisableService %s START", sb.containerID)
|
||||
for _, ep := range sb.getConnectedEndpoints() {
|
||||
if ep.enableService(false) {
|
||||
if err := ep.deleteServiceInfoFromCluster(); err != nil {
|
||||
ep.enableService(true)
|
||||
return fmt.Errorf("could not delete state for endpoint %s from cluster: %v", ep.Name(), err)
|
||||
}
|
||||
}
|
||||
ep.enableService(false)
|
||||
}
|
||||
logrus.Debugf("DisableService %s DONE", sb.containerID)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -4,6 +4,8 @@ import (
|
|||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"github.com/docker/libnetwork/common"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -48,17 +50,49 @@ type service struct {
|
|||
// Service aliases
|
||||
aliases []string
|
||||
|
||||
// This maps tracks for each IP address the list of endpoints ID
|
||||
// associated with it. At stable state the endpoint ID expected is 1
|
||||
// but during transition and service change it is possible to have
|
||||
// temporary more than 1
|
||||
ipToEndpoint common.SetMatrix
|
||||
|
||||
deleted bool
|
||||
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
// assignIPToEndpoint inserts the mapping between the IP and the endpoint identifier
|
||||
// returns true if the mapping was not present, false otherwise
|
||||
// returns also the number of endpoints associated to the IP
|
||||
func (s *service) assignIPToEndpoint(ip, eID string) (bool, int) {
|
||||
return s.ipToEndpoint.Insert(ip, eID)
|
||||
}
|
||||
|
||||
// removeIPToEndpoint removes the mapping between the IP and the endpoint identifier
|
||||
// returns true if the mapping was deleted, false otherwise
|
||||
// returns also the number of endpoints associated to the IP
|
||||
func (s *service) removeIPToEndpoint(ip, eID string) (bool, int) {
|
||||
return s.ipToEndpoint.Remove(ip, eID)
|
||||
}
|
||||
|
||||
func (s *service) printIPToEndpoint(ip string) (string, bool) {
|
||||
return s.ipToEndpoint.String(ip)
|
||||
}
|
||||
|
||||
type loadBalancer struct {
|
||||
vip net.IP
|
||||
fwMark uint32
|
||||
|
||||
// Map of backend IPs backing this loadbalancer on this
|
||||
// network. It is keyed with endpoint ID.
|
||||
backEnds map[string]net.IP
|
||||
backEnds map[string]loadBalancerBackend
|
||||
|
||||
// Back pointer to service to which the loadbalancer belongs.
|
||||
service *service
|
||||
}
|
||||
|
||||
type loadBalancerBackend struct {
|
||||
ip net.IP
|
||||
containerName string
|
||||
taskAliases []string
|
||||
}
|
||||
|
|
|
@ -6,15 +6,126 @@ import (
|
|||
"net"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/docker/libnetwork/common"
|
||||
)
|
||||
|
||||
func newService(name string, id string, ingressPorts []*PortConfig, aliases []string) *service {
|
||||
func (c *controller) addEndpointNameResolution(svcName, svcID, nID, eID, containerName string, vip net.IP, serviceAliases, taskAliases []string, ip net.IP, addService bool, method string) error {
|
||||
n, err := c.NetworkByID(nID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logrus.Debugf("addEndpointNameResolution %s %s add_service:%t", eID, svcName, addService)
|
||||
|
||||
// Add container resolution mappings
|
||||
c.addContainerNameResolution(nID, eID, containerName, taskAliases, ip, method)
|
||||
|
||||
// Add endpoint IP to special "tasks.svc_name" so that the applications have access to DNS RR.
|
||||
n.(*network).addSvcRecords(eID, "tasks."+svcName, ip, nil, false, method)
|
||||
for _, alias := range serviceAliases {
|
||||
n.(*network).addSvcRecords(eID, "tasks."+alias, ip, nil, false, method)
|
||||
}
|
||||
|
||||
// Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR
|
||||
if len(vip) == 0 {
|
||||
n.(*network).addSvcRecords(eID, svcName, ip, nil, false, method)
|
||||
for _, alias := range serviceAliases {
|
||||
n.(*network).addSvcRecords(eID, alias, ip, nil, false, method)
|
||||
}
|
||||
}
|
||||
|
||||
if addService && len(vip) != 0 {
|
||||
n.(*network).addSvcRecords(eID, svcName, vip, nil, false, method)
|
||||
for _, alias := range serviceAliases {
|
||||
n.(*network).addSvcRecords(eID, alias, vip, nil, false, method)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *controller) addContainerNameResolution(nID, eID, containerName string, taskAliases []string, ip net.IP, method string) error {
|
||||
n, err := c.NetworkByID(nID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
logrus.Debugf("addContainerNameResolution %s %s", eID, containerName)
|
||||
|
||||
// Add resolution for container name
|
||||
n.(*network).addSvcRecords(eID, containerName, ip, nil, true, method)
|
||||
|
||||
// Add resolution for taskaliases
|
||||
for _, alias := range taskAliases {
|
||||
n.(*network).addSvcRecords(eID, alias, ip, nil, true, method)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *controller) deleteEndpointNameResolution(svcName, svcID, nID, eID, containerName string, vip net.IP, serviceAliases, taskAliases []string, ip net.IP, rmService, multipleEntries bool, method string) error {
|
||||
n, err := c.NetworkByID(nID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logrus.Debugf("deleteEndpointNameResolution %s %s rm_service:%t suppress:%t", eID, svcName, rmService, multipleEntries)
|
||||
|
||||
// Delete container resolution mappings
|
||||
c.delContainerNameResolution(nID, eID, containerName, taskAliases, ip, method)
|
||||
|
||||
// Delete the special "tasks.svc_name" backend record.
|
||||
if !multipleEntries {
|
||||
n.(*network).deleteSvcRecords(eID, "tasks."+svcName, ip, nil, false, method)
|
||||
for _, alias := range serviceAliases {
|
||||
n.(*network).deleteSvcRecords(eID, "tasks."+alias, ip, nil, false, method)
|
||||
}
|
||||
}
|
||||
|
||||
// If we are doing DNS RR delete the endpoint IP from DNS record right away.
|
||||
if !multipleEntries && len(vip) == 0 {
|
||||
n.(*network).deleteSvcRecords(eID, svcName, ip, nil, false, method)
|
||||
for _, alias := range serviceAliases {
|
||||
n.(*network).deleteSvcRecords(eID, alias, ip, nil, false, method)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the DNS record for VIP only if we are removing the service
|
||||
if rmService && len(vip) != 0 && !multipleEntries {
|
||||
n.(*network).deleteSvcRecords(eID, svcName, vip, nil, false, method)
|
||||
for _, alias := range serviceAliases {
|
||||
n.(*network).deleteSvcRecords(eID, alias, vip, nil, false, method)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *controller) delContainerNameResolution(nID, eID, containerName string, taskAliases []string, ip net.IP, method string) error {
|
||||
n, err := c.NetworkByID(nID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
logrus.Debugf("delContainerNameResolution %s %s", eID, containerName)
|
||||
|
||||
// Delete resolution for container name
|
||||
n.(*network).deleteSvcRecords(eID, containerName, ip, nil, true, method)
|
||||
|
||||
// Delete resolution for taskaliases
|
||||
for _, alias := range taskAliases {
|
||||
n.(*network).deleteSvcRecords(eID, alias, ip, nil, true, method)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func newService(name string, id string, ingressPorts []*PortConfig, serviceAliases []string) *service {
|
||||
return &service{
|
||||
name: name,
|
||||
id: id,
|
||||
ingressPorts: ingressPorts,
|
||||
loadBalancers: make(map[string]*loadBalancer),
|
||||
aliases: aliases,
|
||||
aliases: serviceAliases,
|
||||
ipToEndpoint: common.NewSetMatrix(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -50,21 +161,26 @@ func (c *controller) cleanupServiceBindings(cleanupNID string) {
|
|||
|
||||
for _, s := range services {
|
||||
s.Lock()
|
||||
// Skip the serviceBindings that got deleted
|
||||
if s.deleted {
|
||||
s.Unlock()
|
||||
continue
|
||||
}
|
||||
for nid, lb := range s.loadBalancers {
|
||||
if cleanupNID != "" && nid != cleanupNID {
|
||||
continue
|
||||
}
|
||||
|
||||
for eid, ip := range lb.backEnds {
|
||||
for eid, be := range lb.backEnds {
|
||||
service := s
|
||||
loadBalancer := lb
|
||||
networkID := nid
|
||||
epID := eid
|
||||
epIP := ip
|
||||
epIP := be.ip
|
||||
|
||||
cleanupFuncs = append(cleanupFuncs, func() {
|
||||
if err := c.rmServiceBinding(service.name, service.id, networkID, epID, loadBalancer.vip,
|
||||
service.ingressPorts, service.aliases, epIP); err != nil {
|
||||
if err := c.rmServiceBinding(service.name, service.id, networkID, epID, be.containerName, loadBalancer.vip,
|
||||
service.ingressPorts, service.aliases, be.taskAliases, epIP, "cleanupServiceBindings"); err != nil {
|
||||
logrus.Errorf("Failed to remove service bindings for service %s network %s endpoint %s while cleanup: %v",
|
||||
service.id, networkID, epID, err)
|
||||
}
|
||||
|
@ -80,48 +196,43 @@ func (c *controller) cleanupServiceBindings(cleanupNID string) {
|
|||
|
||||
}
|
||||
|
||||
func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error {
|
||||
n, err := c.NetworkByID(nid)
|
||||
func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases, taskAliases []string, ip net.IP, method string) error {
|
||||
var addService bool
|
||||
|
||||
n, err := c.NetworkByID(nID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
skey := serviceKey{
|
||||
id: sid,
|
||||
id: svcID,
|
||||
ports: portConfigs(ingressPorts).String(),
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
s, ok := c.serviceBindings[skey]
|
||||
if !ok {
|
||||
// Create a new service if we are seeing this service
|
||||
// for the first time.
|
||||
s = newService(name, sid, ingressPorts, aliases)
|
||||
c.serviceBindings[skey] = s
|
||||
var s *service
|
||||
for {
|
||||
c.Lock()
|
||||
var ok bool
|
||||
s, ok = c.serviceBindings[skey]
|
||||
if !ok {
|
||||
// Create a new service if we are seeing this service
|
||||
// for the first time.
|
||||
s = newService(svcName, svcID, ingressPorts, serviceAliases)
|
||||
c.serviceBindings[skey] = s
|
||||
}
|
||||
c.Unlock()
|
||||
s.Lock()
|
||||
if !s.deleted {
|
||||
// ok the object is good to be used
|
||||
break
|
||||
}
|
||||
s.Unlock()
|
||||
}
|
||||
c.Unlock()
|
||||
logrus.Debugf("addServiceBinding from %s START for %s %s", method, svcName, eID)
|
||||
|
||||
// Add endpoint IP to special "tasks.svc_name" so that the
|
||||
// applications have access to DNS RR.
|
||||
n.(*network).addSvcRecords("tasks."+name, ip, nil, false)
|
||||
for _, alias := range aliases {
|
||||
n.(*network).addSvcRecords("tasks."+alias, ip, nil, false)
|
||||
}
|
||||
|
||||
// Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR
|
||||
svcIP := vip
|
||||
if len(svcIP) == 0 {
|
||||
svcIP = ip
|
||||
}
|
||||
n.(*network).addSvcRecords(name, svcIP, nil, false)
|
||||
for _, alias := range aliases {
|
||||
n.(*network).addSvcRecords(alias, svcIP, nil, false)
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
lb, ok := s.loadBalancers[nid]
|
||||
lb, ok := s.loadBalancers[nID]
|
||||
if !ok {
|
||||
// Create a new load balancer if we are seeing this
|
||||
// network attachment on the service for the first
|
||||
|
@ -129,7 +240,7 @@ func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, i
|
|||
lb = &loadBalancer{
|
||||
vip: vip,
|
||||
fwMark: fwMarkCtr,
|
||||
backEnds: make(map[string]net.IP),
|
||||
backEnds: make(map[string]loadBalancerBackend),
|
||||
service: s,
|
||||
}
|
||||
|
||||
|
@ -137,10 +248,19 @@ func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, i
|
|||
fwMarkCtr++
|
||||
fwMarkCtrMu.Unlock()
|
||||
|
||||
s.loadBalancers[nid] = lb
|
||||
s.loadBalancers[nID] = lb
|
||||
addService = true
|
||||
}
|
||||
|
||||
lb.backEnds[eid] = ip
|
||||
lb.backEnds[eID] = loadBalancerBackend{ip: ip,
|
||||
containerName: containerName,
|
||||
taskAliases: taskAliases}
|
||||
|
||||
ok, entries := s.assignIPToEndpoint(ip.String(), eID)
|
||||
if !ok || entries > 1 {
|
||||
setStr, b := s.printIPToEndpoint(ip.String())
|
||||
logrus.Warnf("addServiceBinding %s possible trainsient state ok:%t entries:%d set:%t %s", eID, ok, entries, b, setStr)
|
||||
}
|
||||
|
||||
// Add loadbalancer service and backend in all sandboxes in
|
||||
// the network only if vip is valid.
|
||||
|
@ -148,89 +268,87 @@ func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, i
|
|||
n.(*network).addLBBackend(ip, vip, lb.fwMark, ingressPorts)
|
||||
}
|
||||
|
||||
// Add the appropriate name resolutions
|
||||
c.addEndpointNameResolution(svcName, svcID, nID, eID, containerName, vip, serviceAliases, taskAliases, ip, addService, "addServiceBinding")
|
||||
|
||||
logrus.Debugf("addServiceBinding from %s END for %s %s", method, svcName, eID)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error {
|
||||
func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases []string, taskAliases []string, ip net.IP, method string) error {
|
||||
|
||||
var rmService bool
|
||||
|
||||
n, err := c.NetworkByID(nid)
|
||||
n, err := c.NetworkByID(nID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
skey := serviceKey{
|
||||
id: sid,
|
||||
id: svcID,
|
||||
ports: portConfigs(ingressPorts).String(),
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
s, ok := c.serviceBindings[skey]
|
||||
c.Unlock()
|
||||
logrus.Debugf("rmServiceBinding from %s START for %s %s", method, svcName, eID)
|
||||
if !ok {
|
||||
logrus.Warnf("rmServiceBinding %s %s %s aborted c.serviceBindings[skey] !ok", method, svcName, eID)
|
||||
return nil
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
lb, ok := s.loadBalancers[nid]
|
||||
defer s.Unlock()
|
||||
lb, ok := s.loadBalancers[nID]
|
||||
if !ok {
|
||||
s.Unlock()
|
||||
logrus.Warnf("rmServiceBinding %s %s %s aborted s.loadBalancers[nid] !ok", method, svcName, eID)
|
||||
return nil
|
||||
}
|
||||
|
||||
_, ok = lb.backEnds[eid]
|
||||
_, ok = lb.backEnds[eID]
|
||||
if !ok {
|
||||
s.Unlock()
|
||||
logrus.Warnf("rmServiceBinding %s %s %s aborted lb.backEnds[eid] !ok", method, svcName, eID)
|
||||
return nil
|
||||
}
|
||||
|
||||
delete(lb.backEnds, eid)
|
||||
delete(lb.backEnds, eID)
|
||||
if len(lb.backEnds) == 0 {
|
||||
// All the backends for this service have been
|
||||
// removed. Time to remove the load balancer and also
|
||||
// remove the service entry in IPVS.
|
||||
rmService = true
|
||||
|
||||
delete(s.loadBalancers, nid)
|
||||
delete(s.loadBalancers, nID)
|
||||
}
|
||||
|
||||
if len(s.loadBalancers) == 0 {
|
||||
// All loadbalancers for the service removed. Time to
|
||||
// remove the service itself.
|
||||
c.Lock()
|
||||
|
||||
// Mark the object as deleted so that the add won't use it wrongly
|
||||
s.deleted = true
|
||||
delete(c.serviceBindings, skey)
|
||||
c.Unlock()
|
||||
}
|
||||
|
||||
ok, entries := s.removeIPToEndpoint(ip.String(), eID)
|
||||
if !ok || entries > 0 {
|
||||
setStr, b := s.printIPToEndpoint(ip.String())
|
||||
logrus.Warnf("rmServiceBinding %s possible trainsient state ok:%t entries:%d set:%t %s", eID, ok, entries, b, setStr)
|
||||
}
|
||||
|
||||
// Remove loadbalancer service(if needed) and backend in all
|
||||
// sandboxes in the network only if the vip is valid.
|
||||
if len(vip) != 0 {
|
||||
n.(*network).rmLBBackend(ip, vip, lb.fwMark, ingressPorts, rmService)
|
||||
}
|
||||
s.Unlock()
|
||||
|
||||
// Delete the special "tasks.svc_name" backend record.
|
||||
n.(*network).deleteSvcRecords("tasks."+name, ip, nil, false)
|
||||
for _, alias := range aliases {
|
||||
n.(*network).deleteSvcRecords("tasks."+alias, ip, nil, false)
|
||||
}
|
||||
|
||||
// If we are doing DNS RR add the endpoint IP to DNS record
|
||||
// right away.
|
||||
if len(vip) == 0 {
|
||||
n.(*network).deleteSvcRecords(name, ip, nil, false)
|
||||
for _, alias := range aliases {
|
||||
n.(*network).deleteSvcRecords(alias, ip, nil, false)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the DNS record for VIP only if we are removing the service
|
||||
if rmService && len(vip) != 0 {
|
||||
n.(*network).deleteSvcRecords(name, vip, nil, false)
|
||||
for _, alias := range aliases {
|
||||
n.(*network).deleteSvcRecords(alias, vip, nil, false)
|
||||
}
|
||||
}
|
||||
// Delete the name resolutions
|
||||
c.deleteEndpointNameResolution(svcName, svcID, nID, eID, containerName, vip, serviceAliases, taskAliases, ip, rmService, entries > 0, "rmServiceBinding")
|
||||
|
||||
logrus.Debugf("rmServiceBinding from %s END for %s %s", method, svcName, eID)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -44,6 +44,11 @@ func (n *network) connectedLoadbalancers() []*loadBalancer {
|
|||
var lbs []*loadBalancer
|
||||
for _, s := range serviceBindings {
|
||||
s.Lock()
|
||||
// Skip the serviceBindings that got deleted
|
||||
if s.deleted {
|
||||
s.Unlock()
|
||||
continue
|
||||
}
|
||||
if lb, ok := s.loadBalancers[n.ID()]; ok {
|
||||
lbs = append(lbs, lb)
|
||||
}
|
||||
|
@ -97,8 +102,8 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
|
|||
}
|
||||
|
||||
lb.service.Lock()
|
||||
for _, ip := range lb.backEnds {
|
||||
sb.addLBBackend(ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress)
|
||||
for _, l := range lb.backEnds {
|
||||
sb.addLBBackend(l.ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress)
|
||||
}
|
||||
lb.service.Unlock()
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue