Merge pull request #36638 from ctelfer/rolling-update-libnetwork-import

Import libnetwork fix for rolling updates
This commit is contained in:
Anusha Ragunathan 2018-03-28 21:27:14 -07:00 committed by GitHub
commit f12574891c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 232 additions and 110 deletions

View File

@ -979,9 +979,6 @@ func (daemon *Daemon) releaseNetwork(container *container.Container) {
logrus.Warnf("error locating sandbox id %s: %v", sid, err) logrus.Warnf("error locating sandbox id %s: %v", sid, err)
return return
} }
if err := sb.DisableService(); err != nil {
logrus.WithFields(logrus.Fields{"container": container.ID, "sandbox": sid}).WithError(err).Error("Error removing service from sandbox")
}
if err := sb.Delete(); err != nil { if err := sb.Delete(); err != nil {
logrus.Errorf("Error deleting sandbox id %s for container %s: %v", sid, container.ID, err) logrus.Errorf("Error deleting sandbox id %s for container %s: %v", sid, container.ID, err)

View File

@ -32,7 +32,7 @@ github.com/tonistiigi/fsutil dea3a0da73aee887fc02142d995be764106ac5e2
#get libnetwork packages #get libnetwork packages
# When updating, also update LIBNETWORK_COMMIT in hack/dockerfile/install/proxy accordingly # When updating, also update LIBNETWORK_COMMIT in hack/dockerfile/install/proxy accordingly
github.com/docker/libnetwork 1b91bc94094ecfdae41daa465cc0c8df37dfb3dd github.com/docker/libnetwork 2bf63300c52f5ea61989f85c732f00097d746530
github.com/docker/go-events 9461782956ad83b30282bf90e31fa6a70c255ba9 github.com/docker/go-events 9461782956ad83b30282bf90e31fa6a70c255ba9
github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec

View File

@ -646,6 +646,7 @@ func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
Aliases: ep.svcAliases, Aliases: ep.svcAliases,
TaskAliases: ep.myAliases, TaskAliases: ep.myAliases,
EndpointIP: ep.Iface().Address().IP.String(), EndpointIP: ep.Iface().Address().IP.String(),
ServiceDisabled: false,
}) })
if err != nil { if err != nil {
return err return err
@ -663,7 +664,7 @@ func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
return nil return nil
} }
func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) error { func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, fullRemove bool, method string) error {
if ep.isAnonymous() && len(ep.myAliases) == 0 { if ep.isAnonymous() && len(ep.myAliases) == 0 {
return nil return nil
} }
@ -677,6 +678,15 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err
defer sb.Service.Unlock() defer sb.Service.Unlock()
logrus.Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID()) logrus.Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID())
// Avoid a race w/ with a container that aborts preemptively. This would
// get caught in disableServceInNetworkDB, but we check here to make the
// nature of the condition more clear.
// See comment in addServiceInfoToCluster()
if e := sb.getEndpoint(ep.ID()); e == nil {
logrus.Warnf("deleteServiceInfoFromCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
return nil
}
c := n.getController() c := n.getController()
agent := c.getAgent() agent := c.getAgent()
@ -686,10 +696,14 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err
} }
if agent != nil { if agent != nil {
// First delete from networkDB then locally // First update the networkDB then locally
if fullRemove {
if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil { if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
logrus.Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err) logrus.Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err)
} }
} else {
disableServiceInNetworkDB(agent, n, ep)
}
} }
if ep.Iface().Address() != nil { if ep.Iface().Address() != nil {
@ -699,7 +713,7 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err
if n.ingress { if n.ingress {
ingressPorts = ep.ingressPorts ingressPorts = ep.ingressPorts
} }
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true); err != nil { if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true, fullRemove); err != nil {
return err return err
} }
} else { } else {
@ -715,6 +729,35 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err
return nil return nil
} }
func disableServiceInNetworkDB(a *agent, n *network, ep *endpoint) {
var epRec EndpointRecord
logrus.Debugf("disableServiceInNetworkDB for %s %s", ep.svcName, ep.ID())
// Update existing record to indicate that the service is disabled
inBuf, err := a.networkDB.GetEntry(libnetworkEPTable, n.ID(), ep.ID())
if err != nil {
logrus.Warnf("disableServiceInNetworkDB GetEntry failed for %s %s err:%s", ep.id, n.id, err)
return
}
// Should never fail
if err := proto.Unmarshal(inBuf, &epRec); err != nil {
logrus.Errorf("disableServiceInNetworkDB unmarshal failed for %s %s err:%s", ep.id, n.id, err)
return
}
epRec.ServiceDisabled = true
// Should never fail
outBuf, err := proto.Marshal(&epRec)
if err != nil {
logrus.Errorf("disableServiceInNetworkDB marshalling failed for %s %s err:%s", ep.id, n.id, err)
return
}
// Send update to the whole cluster
if err := a.networkDB.UpdateEntry(libnetworkEPTable, n.ID(), ep.ID(), outBuf); err != nil {
logrus.Warnf("disableServiceInNetworkDB UpdateEntry failed for %s %s err:%s", ep.id, n.id, err)
}
}
func (n *network) addDriverWatches() { func (n *network) addDriverWatches() {
if !n.isClusterEligible() { if !n.isClusterEligible() {
return return
@ -844,7 +887,6 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
nid string nid string
eid string eid string
value []byte value []byte
isAdd bool
epRec EndpointRecord epRec EndpointRecord
) )
@ -853,12 +895,15 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
nid = event.NetworkID nid = event.NetworkID
eid = event.Key eid = event.Key
value = event.Value value = event.Value
isAdd = true
case networkdb.DeleteEvent: case networkdb.DeleteEvent:
nid = event.NetworkID nid = event.NetworkID
eid = event.Key eid = event.Key
value = event.Value value = event.Value
case networkdb.UpdateEvent: case networkdb.UpdateEvent:
nid = event.NetworkID
eid = event.Key
value = event.Value
default:
logrus.Errorf("Unexpected update service table event = %#v", event) logrus.Errorf("Unexpected update service table event = %#v", event)
return return
} }
@ -883,7 +928,8 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
return return
} }
if isAdd { switch ev.(type) {
case networkdb.CreateEvent:
logrus.Debugf("handleEpTableEvent ADD %s R:%v", eid, epRec) logrus.Debugf("handleEpTableEvent ADD %s R:%v", eid, epRec)
if svcID != "" { if svcID != "" {
// This is a remote task part of a service // This is a remote task part of a service
@ -897,11 +943,12 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
logrus.Errorf("failed adding container name resolution for %s epRec:%v err:%v", eid, epRec, err) logrus.Errorf("failed adding container name resolution for %s epRec:%v err:%v", eid, epRec, err)
} }
} }
} else {
case networkdb.DeleteEvent:
logrus.Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec) logrus.Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec)
if svcID != "" { if svcID != "" {
// This is a remote task part of a service // This is a remote task part of a service
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true); err != nil { if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, true); err != nil {
logrus.Errorf("failed removing service binding for %s epRec:%v err:%v", eid, epRec, err) logrus.Errorf("failed removing service binding for %s epRec:%v err:%v", eid, epRec, err)
return return
} }
@ -911,5 +958,18 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
logrus.Errorf("failed removing container name resolution for %s epRec:%v err:%v", eid, epRec, err) logrus.Errorf("failed removing container name resolution for %s epRec:%v err:%v", eid, epRec, err)
} }
} }
case networkdb.UpdateEvent:
logrus.Debugf("handleEpTableEvent UPD %s R:%v", eid, epRec)
// We currently should only get these to inform us that an endpoint
// is disabled. Report if otherwise.
if svcID == "" || !epRec.ServiceDisabled {
logrus.Errorf("Unexpected update table event for %s epRec:%v", eid, epRec)
return
}
// This is a remote task that is part of a service that is now disabled
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil {
logrus.Errorf("failed disabling service binding for %s epRec:%v err:%v", eid, epRec, err)
return
}
} }
} }

View File

@ -77,6 +77,8 @@ type EndpointRecord struct {
Aliases []string `protobuf:"bytes,7,rep,name=aliases" json:"aliases,omitempty"` Aliases []string `protobuf:"bytes,7,rep,name=aliases" json:"aliases,omitempty"`
// List of aliases task specific aliases // List of aliases task specific aliases
TaskAliases []string `protobuf:"bytes,8,rep,name=task_aliases,json=taskAliases" json:"task_aliases,omitempty"` TaskAliases []string `protobuf:"bytes,8,rep,name=task_aliases,json=taskAliases" json:"task_aliases,omitempty"`
// Whether this enpoint's service has been disabled
ServiceDisabled bool `protobuf:"varint,9,opt,name=service_disabled,json=serviceDisabled,proto3" json:"service_disabled,omitempty"`
} }
func (m *EndpointRecord) Reset() { *m = EndpointRecord{} } func (m *EndpointRecord) Reset() { *m = EndpointRecord{} }
@ -139,6 +141,13 @@ func (m *EndpointRecord) GetTaskAliases() []string {
return nil return nil
} }
func (m *EndpointRecord) GetServiceDisabled() bool {
if m != nil {
return m.ServiceDisabled
}
return false
}
// PortConfig specifies an exposed port which can be // PortConfig specifies an exposed port which can be
// addressed using the given name. This can be later queried // addressed using the given name. This can be later queried
// using a service discovery api or a DNS SRV query. The node // using a service discovery api or a DNS SRV query. The node
@ -202,7 +211,7 @@ func (this *EndpointRecord) GoString() string {
if this == nil { if this == nil {
return "nil" return "nil"
} }
s := make([]string, 0, 12) s := make([]string, 0, 13)
s = append(s, "&libnetwork.EndpointRecord{") s = append(s, "&libnetwork.EndpointRecord{")
s = append(s, "Name: "+fmt.Sprintf("%#v", this.Name)+",\n") s = append(s, "Name: "+fmt.Sprintf("%#v", this.Name)+",\n")
s = append(s, "ServiceName: "+fmt.Sprintf("%#v", this.ServiceName)+",\n") s = append(s, "ServiceName: "+fmt.Sprintf("%#v", this.ServiceName)+",\n")
@ -214,6 +223,7 @@ func (this *EndpointRecord) GoString() string {
} }
s = append(s, "Aliases: "+fmt.Sprintf("%#v", this.Aliases)+",\n") s = append(s, "Aliases: "+fmt.Sprintf("%#v", this.Aliases)+",\n")
s = append(s, "TaskAliases: "+fmt.Sprintf("%#v", this.TaskAliases)+",\n") s = append(s, "TaskAliases: "+fmt.Sprintf("%#v", this.TaskAliases)+",\n")
s = append(s, "ServiceDisabled: "+fmt.Sprintf("%#v", this.ServiceDisabled)+",\n")
s = append(s, "}") s = append(s, "}")
return strings.Join(s, "") return strings.Join(s, "")
} }
@ -325,6 +335,16 @@ func (m *EndpointRecord) MarshalTo(dAtA []byte) (int, error) {
i += copy(dAtA[i:], s) i += copy(dAtA[i:], s)
} }
} }
if m.ServiceDisabled {
dAtA[i] = 0x48
i++
if m.ServiceDisabled {
dAtA[i] = 1
} else {
dAtA[i] = 0
}
i++
}
return i, nil return i, nil
} }
@ -367,24 +387,6 @@ func (m *PortConfig) MarshalTo(dAtA []byte) (int, error) {
return i, nil return i, nil
} }
func encodeFixed64Agent(dAtA []byte, offset int, v uint64) int {
dAtA[offset] = uint8(v)
dAtA[offset+1] = uint8(v >> 8)
dAtA[offset+2] = uint8(v >> 16)
dAtA[offset+3] = uint8(v >> 24)
dAtA[offset+4] = uint8(v >> 32)
dAtA[offset+5] = uint8(v >> 40)
dAtA[offset+6] = uint8(v >> 48)
dAtA[offset+7] = uint8(v >> 56)
return offset + 8
}
func encodeFixed32Agent(dAtA []byte, offset int, v uint32) int {
dAtA[offset] = uint8(v)
dAtA[offset+1] = uint8(v >> 8)
dAtA[offset+2] = uint8(v >> 16)
dAtA[offset+3] = uint8(v >> 24)
return offset + 4
}
func encodeVarintAgent(dAtA []byte, offset int, v uint64) int { func encodeVarintAgent(dAtA []byte, offset int, v uint64) int {
for v >= 1<<7 { for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80) dAtA[offset] = uint8(v&0x7f | 0x80)
@ -435,6 +437,9 @@ func (m *EndpointRecord) Size() (n int) {
n += 1 + l + sovAgent(uint64(l)) n += 1 + l + sovAgent(uint64(l))
} }
} }
if m.ServiceDisabled {
n += 2
}
return n return n
} }
@ -483,6 +488,7 @@ func (this *EndpointRecord) String() string {
`IngressPorts:` + strings.Replace(fmt.Sprintf("%v", this.IngressPorts), "PortConfig", "PortConfig", 1) + `,`, `IngressPorts:` + strings.Replace(fmt.Sprintf("%v", this.IngressPorts), "PortConfig", "PortConfig", 1) + `,`,
`Aliases:` + fmt.Sprintf("%v", this.Aliases) + `,`, `Aliases:` + fmt.Sprintf("%v", this.Aliases) + `,`,
`TaskAliases:` + fmt.Sprintf("%v", this.TaskAliases) + `,`, `TaskAliases:` + fmt.Sprintf("%v", this.TaskAliases) + `,`,
`ServiceDisabled:` + fmt.Sprintf("%v", this.ServiceDisabled) + `,`,
`}`, `}`,
}, "") }, "")
return s return s
@ -771,6 +777,26 @@ func (m *EndpointRecord) Unmarshal(dAtA []byte) error {
} }
m.TaskAliases = append(m.TaskAliases, string(dAtA[iNdEx:postIndex])) m.TaskAliases = append(m.TaskAliases, string(dAtA[iNdEx:postIndex]))
iNdEx = postIndex iNdEx = postIndex
case 9:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field ServiceDisabled", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowAgent
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
m.ServiceDisabled = bool(v != 0)
default: default:
iNdEx = preIndex iNdEx = preIndex
skippy, err := skipAgent(dAtA[iNdEx:]) skippy, err := skipAgent(dAtA[iNdEx:])
@ -1036,33 +1062,34 @@ var (
func init() { proto.RegisterFile("agent.proto", fileDescriptorAgent) } func init() { proto.RegisterFile("agent.proto", fileDescriptorAgent) }
var fileDescriptorAgent = []byte{ var fileDescriptorAgent = []byte{
// 437 bytes of a gzipped FileDescriptorProto // 459 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x90, 0xc1, 0x6e, 0xd3, 0x30, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x91, 0x31, 0x6f, 0xd3, 0x4c,
0x18, 0xc7, 0x9b, 0x36, 0x6c, 0xcd, 0x97, 0xb6, 0x54, 0x16, 0x42, 0x51, 0x0e, 0x69, 0xa8, 0x84, 0x18, 0xc7, 0xe3, 0xc4, 0x6f, 0x1b, 0x3f, 0x4e, 0x52, 0xeb, 0xf4, 0x0a, 0x59, 0x1e, 0x1c, 0x13,
0xd4, 0x03, 0xea, 0xa4, 0x71, 0xdc, 0x89, 0xb5, 0x1c, 0x72, 0x41, 0x96, 0xd7, 0x71, 0x0d, 0x69, 0x09, 0x29, 0x48, 0x28, 0x95, 0xca, 0xd8, 0x89, 0x26, 0x0c, 0x5e, 0x90, 0x75, 0x4d, 0x59, 0x83,
0x63, 0x82, 0xb5, 0x10, 0x47, 0xb6, 0x37, 0xae, 0xdc, 0x40, 0x7b, 0x87, 0x9d, 0x78, 0x19, 0x4e, 0x13, 0x1f, 0xe6, 0x54, 0xe3, 0xb3, 0xee, 0xae, 0x65, 0x65, 0x03, 0xf5, 0x3b, 0x74, 0xe2, 0xcb,
0x88, 0x23, 0xa7, 0x89, 0xe5, 0x09, 0x78, 0x04, 0x64, 0x27, 0x5e, 0x35, 0x69, 0x37, 0xfb, 0xf7, 0x30, 0x32, 0x32, 0x55, 0xd4, 0x9f, 0x80, 0x95, 0x0d, 0xdd, 0xf9, 0xae, 0x11, 0x52, 0xb7, 0xf3,
0xff, 0xd9, 0xfa, 0xbe, 0x3f, 0xf8, 0x59, 0x41, 0x2b, 0xb5, 0xac, 0x05, 0x57, 0x1c, 0x41, 0xc9, 0xef, 0xff, 0x3b, 0xeb, 0xb9, 0xff, 0x03, 0x7e, 0x5e, 0x92, 0x5a, 0x2e, 0x1a, 0xce, 0x24, 0x43,
0xb6, 0x15, 0x55, 0x5f, 0xb8, 0xb8, 0x08, 0x9f, 0x15, 0xbc, 0xe0, 0x06, 0x1f, 0xe9, 0x53, 0x6b, 0x50, 0xd1, 0x6d, 0x4d, 0xe4, 0x27, 0xc6, 0x2f, 0xa3, 0xff, 0x4b, 0x56, 0x32, 0x8d, 0x8f, 0xd5,
0xcc, 0x7f, 0xf5, 0x61, 0xf2, 0xb6, 0xca, 0x6b, 0xce, 0x2a, 0x45, 0xe8, 0x8e, 0x8b, 0x1c, 0x21, 0xa9, 0x33, 0x66, 0x7f, 0xfa, 0x30, 0x79, 0x5d, 0x17, 0x0d, 0xa3, 0xb5, 0xc4, 0x64, 0xc7, 0x78,
0x70, 0xab, 0xec, 0x33, 0x0d, 0x9c, 0xd8, 0x59, 0x78, 0xc4, 0x9c, 0xd1, 0x0b, 0x18, 0x49, 0x2a, 0x81, 0x10, 0xb8, 0x75, 0xfe, 0x91, 0x84, 0x4e, 0xe2, 0xcc, 0x3d, 0xac, 0xcf, 0xe8, 0x29, 0x8c,
0xae, 0xd8, 0x8e, 0xa6, 0x26, 0xeb, 0x9b, 0xcc, 0xef, 0xd8, 0x3b, 0xad, 0xbc, 0x02, 0xb0, 0x0a, 0x04, 0xe1, 0xd7, 0x74, 0x47, 0x36, 0x3a, 0xeb, 0xeb, 0xcc, 0x37, 0xec, 0x8d, 0x52, 0x5e, 0x00,
0xcb, 0x83, 0x81, 0x16, 0x4e, 0xc7, 0xcd, 0xed, 0xcc, 0x3b, 0x6b, 0x69, 0xb2, 0x26, 0x5e, 0x27, 0x58, 0x85, 0x16, 0xe1, 0x40, 0x09, 0x67, 0xe3, 0xf6, 0x6e, 0xea, 0x9d, 0x77, 0x34, 0x5d, 0x61,
0x24, 0xb9, 0xb6, 0xaf, 0x98, 0x50, 0x97, 0x59, 0x99, 0xb2, 0x3a, 0x70, 0xf7, 0xf6, 0xfb, 0x96, 0xcf, 0x08, 0x69, 0xa1, 0xec, 0x6b, 0xca, 0xe5, 0x55, 0x5e, 0x6d, 0x68, 0x13, 0xba, 0x7b, 0xfb,
0x26, 0x98, 0x78, 0x9d, 0x90, 0xd4, 0xe8, 0x08, 0x7c, 0xda, 0x0d, 0xa9, 0xf5, 0x27, 0x46, 0x9f, 0x6d, 0x47, 0xd3, 0x0c, 0x7b, 0x46, 0x48, 0x1b, 0x74, 0x0c, 0x3e, 0x31, 0x43, 0x2a, 0xfd, 0x3f,
0x34, 0xb7, 0x33, 0xb0, 0xb3, 0x27, 0x98, 0x80, 0x55, 0x92, 0x1a, 0x9d, 0xc0, 0x98, 0x55, 0x85, 0xad, 0x4f, 0xda, 0xbb, 0x29, 0xd8, 0xd9, 0xd3, 0x0c, 0x83, 0x55, 0xd2, 0x06, 0x9d, 0xc2, 0x98,
0xa0, 0x52, 0xa6, 0x35, 0x17, 0x4a, 0x06, 0x07, 0xf1, 0x60, 0xe1, 0x1f, 0x3f, 0x5f, 0xee, 0x0b, 0xd6, 0x25, 0x27, 0x42, 0x6c, 0x1a, 0xc6, 0xa5, 0x08, 0x0f, 0x92, 0xc1, 0xdc, 0x3f, 0x79, 0xb2,
0x59, 0x62, 0x2e, 0xd4, 0x8a, 0x57, 0x1f, 0x59, 0x41, 0x46, 0x9d, 0xac, 0x91, 0x44, 0x01, 0x1c, 0xd8, 0x17, 0xb2, 0xc8, 0x18, 0x97, 0x4b, 0x56, 0xbf, 0xa7, 0x25, 0x1e, 0x19, 0x59, 0x21, 0x81,
0x66, 0x25, 0xcb, 0x24, 0x95, 0xc1, 0x61, 0x3c, 0x58, 0x78, 0xc4, 0x5e, 0x75, 0x0d, 0x2a, 0x93, 0x42, 0x38, 0xcc, 0x2b, 0x9a, 0x0b, 0x22, 0xc2, 0xc3, 0x64, 0x30, 0xf7, 0xb0, 0xfd, 0x54, 0x35,
0x17, 0xa9, 0x8d, 0x87, 0x26, 0xf6, 0x35, 0x7b, 0xd3, 0xa2, 0xf9, 0xb7, 0x3e, 0xc0, 0xfe, 0xe7, 0xc8, 0x5c, 0x5c, 0x6e, 0x6c, 0x3c, 0xd4, 0xb1, 0xaf, 0xd8, 0x2b, 0xa3, 0x3c, 0x87, 0xc0, 0xd6,
0x47, 0xcb, 0x3c, 0x81, 0xa1, 0x29, 0x7f, 0xc7, 0x4b, 0x53, 0xe4, 0xe4, 0x78, 0xf6, 0xf8, 0x5c, 0x50, 0x50, 0x91, 0x6f, 0x2b, 0x52, 0x84, 0x5e, 0xe2, 0xcc, 0x87, 0xf8, 0xc8, 0xf0, 0x95, 0xc1,
0x4b, 0xdc, 0x69, 0xe4, 0xfe, 0x01, 0x9a, 0x81, 0xaf, 0x32, 0x51, 0x50, 0x65, 0x16, 0x33, 0x3d, 0xb3, 0x2f, 0x7d, 0x80, 0xfd, 0x10, 0x8f, 0xf6, 0x7e, 0x0a, 0x43, 0xbd, 0xa7, 0x1d, 0xab, 0x74,
0x8f, 0x09, 0xb4, 0x48, 0xbf, 0x44, 0x2f, 0x61, 0x52, 0x5f, 0x6e, 0x4b, 0x26, 0x3f, 0xd1, 0xbc, 0xe7, 0x93, 0x93, 0xe9, 0xe3, 0x4f, 0x58, 0x64, 0x46, 0xc3, 0x0f, 0x17, 0xd0, 0x14, 0x7c, 0x99,
0x75, 0x5c, 0xe3, 0x8c, 0xef, 0xa9, 0xd6, 0xe6, 0x1f, 0x60, 0x68, 0x7f, 0x47, 0x01, 0x0c, 0x36, 0xf3, 0x92, 0x48, 0xdd, 0x81, 0x5e, 0xc9, 0x18, 0x43, 0x87, 0xd4, 0x4d, 0xf4, 0x0c, 0x26, 0xcd,
0x2b, 0x3c, 0xed, 0x85, 0x4f, 0xaf, 0x6f, 0x62, 0xdf, 0xe2, 0xcd, 0x0a, 0xeb, 0xe4, 0x7c, 0x8d, 0xd5, 0xb6, 0xa2, 0xe2, 0x03, 0x29, 0x3a, 0xc7, 0xd5, 0xce, 0xf8, 0x81, 0x2a, 0x6d, 0xf6, 0x0e,
0xa7, 0xce, 0xc3, 0xe4, 0x7c, 0x8d, 0x51, 0x08, 0xee, 0xd9, 0x6a, 0x83, 0xa7, 0xfd, 0x70, 0x7a, 0x86, 0xf6, 0xef, 0x28, 0x84, 0xc1, 0x7a, 0x99, 0x05, 0xbd, 0xe8, 0xe8, 0xe6, 0x36, 0xf1, 0x2d,
0x7d, 0x13, 0x8f, 0x6c, 0xa4, 0x59, 0xe8, 0x7e, 0xff, 0x11, 0xf5, 0x4e, 0x83, 0x3f, 0x77, 0x51, 0x5e, 0x2f, 0x33, 0x95, 0x5c, 0xac, 0xb2, 0xc0, 0xf9, 0x37, 0xb9, 0x58, 0x65, 0x28, 0x02, 0xf7,
0xef, 0xdf, 0x5d, 0xe4, 0x7c, 0x6d, 0x22, 0xe7, 0x67, 0x13, 0x39, 0xbf, 0x9b, 0xc8, 0xf9, 0xdb, 0x7c, 0xb9, 0xce, 0x82, 0x7e, 0x14, 0xdc, 0xdc, 0x26, 0x23, 0x1b, 0x29, 0x16, 0xb9, 0x5f, 0xbf,
0x44, 0xce, 0xf6, 0xc0, 0x6c, 0xf3, 0xfa, 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xce, 0x12, 0x15, 0xc5, 0xbd, 0xb3, 0xf0, 0xe7, 0x7d, 0xdc, 0xfb, 0x7d, 0x1f, 0x3b, 0x9f, 0xdb, 0xd8, 0xf9, 0xde,
0x67, 0xac, 0x02, 0x00, 0x00, 0xc6, 0xce, 0x8f, 0x36, 0x76, 0x7e, 0xb5, 0xb1, 0xb3, 0x3d, 0xd0, 0xaf, 0x79, 0xf9, 0x37, 0x00,
0x00, 0xff, 0xff, 0x55, 0x29, 0x75, 0x5c, 0xd7, 0x02, 0x00, 0x00,
} }

View File

@ -37,6 +37,9 @@ message EndpointRecord {
// List of aliases task specific aliases // List of aliases task specific aliases
repeated string task_aliases = 8; repeated string task_aliases = 8;
// Whether this enpoint's service has been disabled
bool service_disabled = 9;
} }
// PortConfig specifies an exposed port which can be // PortConfig specifies an exposed port which can be

View File

@ -620,7 +620,7 @@ func (ep *endpoint) rename(name string) error {
} }
if c.isAgent() { if c.isAgent() {
if err = ep.deleteServiceInfoFromCluster(sb, "rename"); err != nil { if err = ep.deleteServiceInfoFromCluster(sb, true, "rename"); err != nil {
return types.InternalErrorf("Could not delete service state for endpoint %s from cluster on rename: %v", ep.Name(), err) return types.InternalErrorf("Could not delete service state for endpoint %s from cluster on rename: %v", ep.Name(), err)
} }
} else { } else {
@ -644,7 +644,7 @@ func (ep *endpoint) rename(name string) error {
} }
defer func() { defer func() {
if err != nil { if err != nil {
ep.deleteServiceInfoFromCluster(sb, "rename") ep.deleteServiceInfoFromCluster(sb, true, "rename")
ep.name = oldName ep.name = oldName
ep.anonymous = oldAnonymous ep.anonymous = oldAnonymous
ep.addServiceInfoToCluster(sb) ep.addServiceInfoToCluster(sb)
@ -755,8 +755,14 @@ func (ep *endpoint) sbLeave(sb *sandbox, force bool, options ...EndpointOption)
} }
} }
if ep.svcID != "" {
if err := ep.deleteServiceInfoFromCluster(sb, true, "sbLeave"); err != nil {
logrus.Warnf("Failed to clean up service info on container %s disconnect: %v", ep.name, err)
}
}
if err := sb.clearNetworkResources(ep); err != nil { if err := sb.clearNetworkResources(ep); err != nil {
logrus.Warnf("Could not cleanup network resources on container %s disconnect: %v", ep.name, err) logrus.Warnf("Failed to clean up network resources on container %s disconnect: %v", ep.name, err)
} }
// Update the store about the sandbox detach only after we // Update the store about the sandbox detach only after we
@ -769,7 +775,7 @@ func (ep *endpoint) sbLeave(sb *sandbox, force bool, options ...EndpointOption)
} }
if e := ep.deleteDriverInfoFromCluster(); e != nil { if e := ep.deleteDriverInfoFromCluster(); e != nil {
logrus.Errorf("Could not delete endpoint state for endpoint %s from cluster: %v", ep.Name(), e) logrus.Errorf("Failed to delete endpoint state for endpoint %s from cluster: %v", ep.Name(), e)
} }
sb.deleteHostsEntries(n.getSvcRecords(ep)) sb.deleteHostsEntries(n.getSvcRecords(ep))

View File

@ -703,7 +703,7 @@ func (sb *sandbox) DisableService() (err error) {
}() }()
for _, ep := range sb.getConnectedEndpoints() { for _, ep := range sb.getConnectedEndpoints() {
if ep.isServiceEnabled() { if ep.isServiceEnabled() {
if err := ep.deleteServiceInfoFromCluster(sb, "DisableService"); err != nil { if err := ep.deleteServiceInfoFromCluster(sb, false, "DisableService"); err != nil {
failedEps = append(failedEps, ep.Name()) failedEps = append(failedEps, ep.Name())
logrus.Warnf("failed update state for endpoint %s into cluster: %v", ep.Name(), err) logrus.Warnf("failed update state for endpoint %s into cluster: %v", ep.Name(), err)
} }

View File

@ -79,13 +79,18 @@ func (s *service) printIPToEndpoint(ip string) (string, bool) {
return s.ipToEndpoint.String(ip) return s.ipToEndpoint.String(ip)
} }
type lbBackend struct {
ip net.IP
disabled bool
}
type loadBalancer struct { type loadBalancer struct {
vip net.IP vip net.IP
fwMark uint32 fwMark uint32
// Map of backend IPs backing this loadbalancer on this // Map of backend IPs backing this loadbalancer on this
// network. It is keyed with endpoint ID. // network. It is keyed with endpoint ID.
backEnds map[string]net.IP backEnds map[string]*lbBackend
// Back pointer to service to which the loadbalancer belongs. // Back pointer to service to which the loadbalancer belongs.
service *service service *service

View File

@ -198,23 +198,8 @@ func (c *controller) cleanupServiceBindings(cleanupNID string) {
if cleanupNID != "" && nid != cleanupNID { if cleanupNID != "" && nid != cleanupNID {
continue continue
} }
for eid, be := range lb.backEnds {
for eid, ip := range lb.backEnds { cleanupFuncs = append(cleanupFuncs, makeServiceCleanupFunc(c, s, nid, eid, lb.vip, be.ip))
epID := eid
epIP := ip
service := s
loadBalancer := lb
networkID := nid
cleanupFuncs = append(cleanupFuncs, func() {
// ContainerName and taskAliases are not available here, this is still fine because the Service discovery
// cleanup already happened before. The only thing that rmServiceBinding is still doing here a part from the Load
// Balancer bookeeping, is to keep consistent the mapping of endpoint to IP.
if err := c.rmServiceBinding(service.name, service.id, networkID, epID, "", loadBalancer.vip,
service.ingressPorts, service.aliases, []string{}, epIP, "cleanupServiceBindings", false); err != nil {
logrus.Errorf("Failed to remove service bindings for service %s network %s endpoint %s while cleanup: %v",
service.id, networkID, epID, err)
}
})
} }
} }
s.Unlock() s.Unlock()
@ -226,6 +211,17 @@ func (c *controller) cleanupServiceBindings(cleanupNID string) {
} }
func makeServiceCleanupFunc(c *controller, s *service, nID, eID string, vip net.IP, ip net.IP) func() {
// ContainerName and taskAliases are not available here, this is still fine because the Service discovery
// cleanup already happened before. The only thing that rmServiceBinding is still doing here a part from the Load
// Balancer bookeeping, is to keep consistent the mapping of endpoint to IP.
return func() {
if err := c.rmServiceBinding(s.name, s.id, nID, eID, "", vip, s.ingressPorts, s.aliases, []string{}, ip, "cleanupServiceBindings", false, true); err != nil {
logrus.Errorf("Failed to remove service bindings for service %s network %s endpoint %s while cleanup: %v", s.id, nID, eID, err)
}
}
}
func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases, taskAliases []string, ip net.IP, method string) error { func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases, taskAliases []string, ip net.IP, method string) error {
var addService bool var addService bool
@ -271,7 +267,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s
lb = &loadBalancer{ lb = &loadBalancer{
vip: vip, vip: vip,
fwMark: fwMarkCtr, fwMark: fwMarkCtr,
backEnds: make(map[string]net.IP), backEnds: make(map[string]*lbBackend),
service: s, service: s,
} }
@ -282,7 +278,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s
addService = true addService = true
} }
lb.backEnds[eID] = ip lb.backEnds[eID] = &lbBackend{ip, false}
ok, entries := s.assignIPToEndpoint(ip.String(), eID) ok, entries := s.assignIPToEndpoint(ip.String(), eID)
if !ok || entries > 1 { if !ok || entries > 1 {
@ -307,7 +303,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s
return nil return nil
} }
func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases []string, taskAliases []string, ip net.IP, method string, deleteSvcRecords bool) error { func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases []string, taskAliases []string, ip net.IP, method string, deleteSvcRecords bool, fullRemove bool) error {
var rmService bool var rmService bool
@ -338,13 +334,19 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st
return nil return nil
} }
_, ok = lb.backEnds[eID] be, ok := lb.backEnds[eID]
if !ok { if !ok {
logrus.Warnf("rmServiceBinding %s %s %s aborted lb.backEnds[eid] !ok", method, svcName, eID) logrus.Warnf("rmServiceBinding %s %s %s aborted lb.backEnds[eid] && lb.disabled[eid] !ok", method, svcName, eID)
return nil return nil
} }
if fullRemove {
// delete regardless
delete(lb.backEnds, eID) delete(lb.backEnds, eID)
} else {
be.disabled = true
}
if len(lb.backEnds) == 0 { if len(lb.backEnds) == 0 {
// All the backends for this service have been // All the backends for this service have been
// removed. Time to remove the load balancer and also // removed. Time to remove the load balancer and also
@ -367,7 +369,7 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st
// Remove loadbalancer service(if needed) and backend in all // Remove loadbalancer service(if needed) and backend in all
// sandboxes in the network only if the vip is valid. // sandboxes in the network only if the vip is valid.
if len(vip) != 0 && entries == 0 { if len(vip) != 0 && entries == 0 {
n.(*network).rmLBBackend(ip, vip, lb, ingressPorts, rmService) n.(*network).rmLBBackend(ip, vip, lb, ingressPorts, rmService, fullRemove)
} }
// Delete the name resolutions // Delete the name resolutions

View File

@ -103,8 +103,10 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
} }
lb.service.Lock() lb.service.Lock()
for _, ip := range lb.backEnds { for _, be := range lb.backEnds {
sb.addLBBackend(ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress) if !be.disabled {
sb.addLBBackend(be.ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress)
}
} }
lb.service.Unlock() lb.service.Unlock()
} }
@ -135,7 +137,7 @@ func (n *network) addLBBackend(ip, vip net.IP, lb *loadBalancer, ingressPorts []
// Remove loadbalancer backend from all sandboxes which has a // Remove loadbalancer backend from all sandboxes which has a
// connection to this network. If needed remove the service entry as // connection to this network. If needed remove the service entry as
// well, as specified by the rmService bool. // well, as specified by the rmService bool.
func (n *network) rmLBBackend(ip, vip net.IP, lb *loadBalancer, ingressPorts []*PortConfig, rmService bool) { func (n *network) rmLBBackend(ip, vip net.IP, lb *loadBalancer, ingressPorts []*PortConfig, rmService bool, fullRemove bool) {
n.WalkEndpoints(func(e Endpoint) bool { n.WalkEndpoints(func(e Endpoint) bool {
ep := e.(*endpoint) ep := e.(*endpoint)
if sb, ok := ep.getSandbox(); ok { if sb, ok := ep.getSandbox(); ok {
@ -148,7 +150,7 @@ func (n *network) rmLBBackend(ip, vip net.IP, lb *loadBalancer, ingressPorts []*
gwIP = ep.Iface().Address().IP gwIP = ep.Iface().Address().IP
} }
sb.rmLBBackend(ip, vip, lb.fwMark, ingressPorts, ep.Iface().Address(), gwIP, rmService, n.ingress) sb.rmLBBackend(ip, vip, lb.fwMark, ingressPorts, ep.Iface().Address(), gwIP, rmService, fullRemove, n.ingress)
} }
return false return false
@ -215,7 +217,7 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P
} }
// Remove loadbalancer backend from one connected sandbox. // Remove loadbalancer backend from one connected sandbox.
func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, gwIP net.IP, rmService bool, isIngressNetwork bool) { func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, gwIP net.IP, rmService bool, fullRemove bool, isIngressNetwork bool) {
if sb.osSbox == nil { if sb.osSbox == nil {
return return
} }
@ -242,9 +244,16 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
Weight: 1, Weight: 1,
} }
if fullRemove {
if err := i.DelDestination(s, d); err != nil && err != syscall.ENOENT { if err := i.DelDestination(s, d); err != nil && err != syscall.ENOENT {
logrus.Errorf("Failed to delete real server %s for vip %s fwmark %d in sbox %s (%s): %v", ip, vip, fwMark, sb.ID()[0:7], sb.ContainerID()[0:7], err) logrus.Errorf("Failed to delete real server %s for vip %s fwmark %d in sbox %s (%s): %v", ip, vip, fwMark, sb.ID()[0:7], sb.ContainerID()[0:7], err)
} }
} else {
d.Weight = 0
if err := i.UpdateDestination(s, d); err != nil && err != syscall.ENOENT {
logrus.Errorf("Failed to set LB weight of real server %s to 0 for vip %s fwmark %d in sbox %s (%s): %v", ip, vip, fwMark, sb.ID()[0:7], sb.ContainerID()[0:7], err)
}
}
if rmService { if rmService {
s.SchedName = ipvs.RoundRobin s.SchedName = ipvs.RoundRobin

View File

@ -44,7 +44,10 @@ func (n *network) addLBBackend(ip, vip net.IP, lb *loadBalancer, ingressPorts []
var endpoints []hcsshim.HNSEndpoint var endpoints []hcsshim.HNSEndpoint
for eid := range lb.backEnds { for eid, be := range lb.backEnds {
if be.disabled {
continue
}
//Call HNS to get back ID (GUID) corresponding to the endpoint. //Call HNS to get back ID (GUID) corresponding to the endpoint.
hnsEndpoint, err := hcsshim.GetHNSEndpointByName(eid) hnsEndpoint, err := hcsshim.GetHNSEndpointByName(eid)
if err != nil { if err != nil {
@ -114,9 +117,9 @@ func (n *network) addLBBackend(ip, vip net.IP, lb *loadBalancer, ingressPorts []
} }
} }
func (n *network) rmLBBackend(ip, vip net.IP, lb *loadBalancer, ingressPorts []*PortConfig, rmService bool) { func (n *network) rmLBBackend(ip, vip net.IP, lb *loadBalancer, ingressPorts []*PortConfig, rmService bool, fullRemove bool) {
if system.GetOSVersion().Build > 16236 { if system.GetOSVersion().Build > 16236 {
if len(lb.backEnds) > 0 { if numEnabledBackends(lb) > 0 {
//Reprogram HNS (actually VFP) with the existing backends. //Reprogram HNS (actually VFP) with the existing backends.
n.addLBBackend(ip, vip, lb, ingressPorts) n.addLBBackend(ip, vip, lb, ingressPorts)
} else { } else {
@ -143,6 +146,16 @@ func (n *network) rmLBBackend(ip, vip net.IP, lb *loadBalancer, ingressPorts []*
} }
} }
func numEnabledBackends(lb *loadBalancer) int {
nEnabled := 0
for _, be := range lb.backEnds {
if !be.disabled {
nEnabled++
}
}
return nEnabled
}
func (sb *sandbox) populateLoadbalancers(ep *endpoint) { func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
} }