mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Do not add loadbalancer to unpopulated sandboxes
When adding a loadbalancer to a sandbox, the sandbox may have a valid namespace but it might not have populated all the dependent network resources yet. In that case do not populate that endpoint's loadbalancer into that sandbox yet. The loadbalancer will be populated into the sandbox when it is done populating all the dependent network resources. Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
This commit is contained in:
parent
0551112769
commit
d5ce149555
4 changed files with 75 additions and 41 deletions
|
@ -810,12 +810,13 @@ func (c *controller) NewSandbox(containerID string, options ...SandboxOption) (s
|
|||
// Create sandbox and process options first. Key generation depends on an option
|
||||
if sb == nil {
|
||||
sb = &sandbox{
|
||||
id: stringid.GenerateRandomID(),
|
||||
containerID: containerID,
|
||||
endpoints: epHeap{},
|
||||
epPriority: map[string]int{},
|
||||
config: containerConfig{},
|
||||
controller: c,
|
||||
id: stringid.GenerateRandomID(),
|
||||
containerID: containerID,
|
||||
endpoints: epHeap{},
|
||||
epPriority: map[string]int{},
|
||||
populatedEndpoints: map[string]struct{}{},
|
||||
config: containerConfig{},
|
||||
controller: c,
|
||||
}
|
||||
}
|
||||
sBox = sb
|
||||
|
|
|
@ -68,23 +68,24 @@ func (sb *sandbox) processOptions(options ...SandboxOption) {
|
|||
type epHeap []*endpoint
|
||||
|
||||
type sandbox struct {
|
||||
id string
|
||||
containerID string
|
||||
config containerConfig
|
||||
extDNS []string
|
||||
osSbox osl.Sandbox
|
||||
controller *controller
|
||||
resolver Resolver
|
||||
resolverOnce sync.Once
|
||||
refCnt int
|
||||
endpoints epHeap
|
||||
epPriority map[string]int
|
||||
joinLeaveDone chan struct{}
|
||||
dbIndex uint64
|
||||
dbExists bool
|
||||
isStub bool
|
||||
inDelete bool
|
||||
ingress bool
|
||||
id string
|
||||
containerID string
|
||||
config containerConfig
|
||||
extDNS []string
|
||||
osSbox osl.Sandbox
|
||||
controller *controller
|
||||
resolver Resolver
|
||||
resolverOnce sync.Once
|
||||
refCnt int
|
||||
endpoints epHeap
|
||||
epPriority map[string]int
|
||||
populatedEndpoints map[string]struct{}
|
||||
joinLeaveDone chan struct{}
|
||||
dbIndex uint64
|
||||
dbExists bool
|
||||
isStub bool
|
||||
inDelete bool
|
||||
ingress bool
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
|
@ -798,6 +799,12 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error {
|
|||
}
|
||||
}
|
||||
|
||||
// Make sure to add the endpoint to the populated endpoint set
|
||||
// before populating loadbalancers.
|
||||
sb.Lock()
|
||||
sb.populatedEndpoints[ep.ID()] = struct{}{}
|
||||
sb.Unlock()
|
||||
|
||||
// Populate load balancer only after updating all the other
|
||||
// information including gateway and other routes so that
|
||||
// loadbalancers are populated all the network state is in
|
||||
|
@ -830,6 +837,7 @@ func (sb *sandbox) clearNetworkResources(origEp *endpoint) error {
|
|||
releaseOSSboxResources(osSbox, ep)
|
||||
}
|
||||
|
||||
delete(sb.populatedEndpoints, ep.ID())
|
||||
sb.Lock()
|
||||
if len(sb.endpoints) == 0 {
|
||||
// sb.endpoints should never be empty and this is unexpected error condition
|
||||
|
@ -879,6 +887,13 @@ func (sb *sandbox) clearNetworkResources(origEp *endpoint) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (sb *sandbox) isEndpointPopulated(ep *endpoint) bool {
|
||||
sb.Lock()
|
||||
_, ok := sb.populatedEndpoints[ep.ID()]
|
||||
sb.Unlock()
|
||||
return ok
|
||||
}
|
||||
|
||||
// joinLeaveStart waits to ensure there are no joins or leaves in progress and
|
||||
// marks this join/leave in progress without race
|
||||
func (sb *sandbox) joinLeaveStart() {
|
||||
|
|
|
@ -197,14 +197,15 @@ func (c *controller) sandboxCleanup(activeSandboxes map[string]interface{}) {
|
|||
sbs := kvo.(*sbState)
|
||||
|
||||
sb := &sandbox{
|
||||
id: sbs.ID,
|
||||
controller: sbs.c,
|
||||
containerID: sbs.Cid,
|
||||
endpoints: epHeap{},
|
||||
dbIndex: sbs.dbIndex,
|
||||
isStub: true,
|
||||
dbExists: true,
|
||||
extDNS: sbs.ExtDNS,
|
||||
id: sbs.ID,
|
||||
controller: sbs.c,
|
||||
containerID: sbs.Cid,
|
||||
endpoints: epHeap{},
|
||||
populatedEndpoints: map[string]struct{}{},
|
||||
dbIndex: sbs.dbIndex,
|
||||
isStub: true,
|
||||
dbExists: true,
|
||||
extDNS: sbs.ExtDNS,
|
||||
}
|
||||
|
||||
msg := " for cleanup"
|
||||
|
|
|
@ -184,14 +184,20 @@ func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, in
|
|||
func (n *network) connectedLoadbalancers() []*loadBalancer {
|
||||
c := n.getController()
|
||||
|
||||
serviceBindings := make([]*service, 0, len(c.serviceBindings))
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
for _, s := range c.serviceBindings {
|
||||
serviceBindings = append(serviceBindings, s)
|
||||
}
|
||||
c.Unlock()
|
||||
|
||||
var lbs []*loadBalancer
|
||||
for _, s := range c.serviceBindings {
|
||||
for _, s := range serviceBindings {
|
||||
s.Lock()
|
||||
if lb, ok := s.loadBalancers[n.ID()]; ok {
|
||||
lbs = append(lbs, lb)
|
||||
}
|
||||
s.Unlock()
|
||||
}
|
||||
|
||||
return lbs
|
||||
|
@ -229,12 +235,14 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
|
|||
continue
|
||||
}
|
||||
|
||||
lb.service.Lock()
|
||||
addService := true
|
||||
for _, ip := range lb.backEnds {
|
||||
sb.addLBBackend(ip, lb.vip, lb.fwMark, lb.service.ingressPorts,
|
||||
eIP, gwIP, addService)
|
||||
addService = false
|
||||
}
|
||||
lb.service.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -245,6 +253,10 @@ func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
|
|||
n.WalkEndpoints(func(e Endpoint) bool {
|
||||
ep := e.(*endpoint)
|
||||
if sb, ok := ep.getSandbox(); ok {
|
||||
if !sb.isEndpointPopulated(ep) {
|
||||
return false
|
||||
}
|
||||
|
||||
var gwIP net.IP
|
||||
if ep := sb.getGatewayEndpoint(); ep != nil {
|
||||
gwIP = ep.Iface().Address().IP
|
||||
|
@ -264,6 +276,10 @@ func (n *network) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Por
|
|||
n.WalkEndpoints(func(e Endpoint) bool {
|
||||
ep := e.(*endpoint)
|
||||
if sb, ok := ep.getSandbox(); ok {
|
||||
if !sb.isEndpointPopulated(ep) {
|
||||
return false
|
||||
}
|
||||
|
||||
var gwIP net.IP
|
||||
if ep := sb.getGatewayEndpoint(); ep != nil {
|
||||
gwIP = ep.Iface().Address().IP
|
||||
|
@ -356,15 +372,13 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
|
|||
}
|
||||
|
||||
if err := i.DelDestination(s, d); err != nil {
|
||||
logrus.Errorf("Failed to delete real server %s for vip %s fwmark %d: %v", ip, vip, fwMark, err)
|
||||
return
|
||||
logrus.Infof("Failed to delete real server %s for vip %s fwmark %d: %v", ip, vip, fwMark, err)
|
||||
}
|
||||
|
||||
if rmService {
|
||||
s.SchedName = ipvs.RoundRobin
|
||||
if err := i.DelService(s); err != nil {
|
||||
logrus.Errorf("Failed to create a new service for vip %s fwmark %d: %v", vip, fwMark, err)
|
||||
return
|
||||
logrus.Errorf("Failed to delete a new service for vip %s fwmark %d: %v", vip, fwMark, err)
|
||||
}
|
||||
|
||||
var iPorts []*PortConfig
|
||||
|
@ -372,13 +386,11 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
|
|||
iPorts = ingressPorts
|
||||
if err := programIngress(gwIP, iPorts, true); err != nil {
|
||||
logrus.Errorf("Failed to delete ingress: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, true); err != nil {
|
||||
logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -454,12 +466,17 @@ func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) erro
|
|||
rule := strings.Fields(fmt.Sprintf("-t nat %s %s -p %s --dport %d -j DNAT --to-destination %s:%d",
|
||||
addDelOpt, ingressChain, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, gwIP, iPort.PublishedPort))
|
||||
if err := iptables.RawCombinedOutput(rule...); err != nil {
|
||||
return fmt.Errorf("setting up rule failed, %v: %v", rule, err)
|
||||
errStr := fmt.Sprintf("setting up rule failed, %v: %v", rule, err)
|
||||
if !isDelete {
|
||||
return fmt.Errorf("%s", errStr)
|
||||
}
|
||||
|
||||
logrus.Infof("%s", errStr)
|
||||
}
|
||||
}
|
||||
|
||||
if err := plumbProxy(iPort, isDelete); err != nil {
|
||||
return fmt.Errorf("failed to create proxy for port %d: %v", iPort.PublishedPort, err)
|
||||
logrus.Warnf("failed to create proxy for port %d: %v", iPort.PublishedPort, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue