[1.13.x] Vendoring libnetwork @bba65e5

Signed-off-by: Alessandro Boch <aboch@docker.com>
This commit is contained in:
Alessandro Boch 2017-02-14 09:42:44 -08:00
parent e5a90d46e3
commit 2e544b1513
11 changed files with 99 additions and 33 deletions

View File

@ -23,7 +23,7 @@ github.com/RackSec/srslog 456df3a81436d29ba874f3590eeeee25d666f8a5
github.com/imdario/mergo 0.2.1
#get libnetwork packages
github.com/docker/libnetwork 45b40861e677e37cf27bc184eca5af92f8cdd32d
github.com/docker/libnetwork bba65e5e191eccfbc8e2f6455c527b407c2be5ff
github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894
github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec

View File

@ -40,6 +40,8 @@ type DataStore interface {
// key. The caller must pass a KVObject of the same type as
// the objects that need to be listed
List(string, KVObject) ([]KVObject, error)
// Map returns a Map of KVObjects
Map(key string, kvObject KVObject) (map[string]KVObject, error)
// Scope returns the scope of the store
Scope() string
// KVStore returns access to the KV Store
@ -512,23 +514,34 @@ func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) {
return ds.cache.list(kvObject)
}
var kvol []KVObject
cb := func(key string, val KVObject) {
kvol = append(kvol, val)
}
err := ds.iterateKVPairsFromStore(key, kvObject, cb)
if err != nil {
return nil, err
}
return kvol, nil
}
func (ds *datastore) iterateKVPairsFromStore(key string, kvObject KVObject, callback func(string, KVObject)) error {
// Bail out right away if the kvObject does not implement KVConstructor
ctor, ok := kvObject.(KVConstructor)
if !ok {
return nil, fmt.Errorf("error listing objects, object does not implement KVConstructor interface")
return fmt.Errorf("error listing objects, object does not implement KVConstructor interface")
}
// Make sure the parent key exists
if err := ds.ensureParent(key); err != nil {
return nil, err
return err
}
kvList, err := ds.store.List(key)
if err != nil {
return nil, err
return err
}
var kvol []KVObject
for _, kvPair := range kvList {
if len(kvPair.Value) == 0 {
continue
@ -536,16 +549,33 @@ func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) {
dstO := ctor.New()
if err := dstO.SetValue(kvPair.Value); err != nil {
return nil, err
return err
}
// Make sure the object has a correct view of the DB index in
// case we need to modify it and update the DB.
dstO.SetIndex(kvPair.LastIndex)
kvol = append(kvol, dstO)
callback(kvPair.Key, dstO)
}
return nil
}
func (ds *datastore) Map(key string, kvObject KVObject) (map[string]KVObject, error) {
if ds.sequential {
ds.Lock()
defer ds.Unlock()
}
kvol := make(map[string]KVObject)
cb := func(key string, val KVObject) {
// Trim the leading & trailing "/" to make it consistent across all stores
kvol[strings.Trim(key, "/")] = val
}
err := ds.iterateKVPairsFromStore(key, kvObject, cb)
if err != nil {
return nil, err
}
return kvol, nil
}

View File

@ -413,6 +413,7 @@ func (d *driver) updateKeys(newKey, primary, pruneKey *key) error {
priIdx = -1
delIdx = -1
lIP = net.ParseIP(d.bindAddress)
aIP = net.ParseIP(d.advertiseAddress)
)
d.Lock()
@ -440,7 +441,7 @@ func (d *driver) updateKeys(newKey, primary, pruneKey *key) error {
d.secMapWalk(func(rIPs string, spis []*spi) ([]*spi, bool) {
rIP := net.ParseIP(rIPs)
return updateNodeKey(lIP, rIP, spis, d.keys, newIdx, priIdx, delIdx), false
return updateNodeKey(lIP, aIP, rIP, spis, d.keys, newIdx, priIdx, delIdx), false
})
d.Lock()
@ -471,7 +472,7 @@ func (d *driver) updateKeys(newKey, primary, pruneKey *key) error {
*********************************************************/
// Spis and keys are sorted in such away the one in position 0 is the primary
func updateNodeKey(lIP, rIP net.IP, idxs []*spi, curKeys []*key, newIdx, priIdx, delIdx int) []*spi {
func updateNodeKey(lIP, aIP, rIP net.IP, idxs []*spi, curKeys []*key, newIdx, priIdx, delIdx int) []*spi {
logrus.Debugf("Updating keys for node: %s (%d,%d,%d)", rIP, newIdx, priIdx, delIdx)
spis := idxs
@ -480,8 +481,8 @@ func updateNodeKey(lIP, rIP net.IP, idxs []*spi, curKeys []*key, newIdx, priIdx,
// add new
if newIdx != -1 {
spis = append(spis, &spi{
forward: buildSPI(lIP, rIP, curKeys[newIdx].tag),
reverse: buildSPI(rIP, lIP, curKeys[newIdx].tag),
forward: buildSPI(aIP, rIP, curKeys[newIdx].tag),
reverse: buildSPI(rIP, aIP, curKeys[newIdx].tag),
})
}

View File

@ -612,13 +612,13 @@ func (n *network) initSandbox(restore bool) error {
var nlSock *nl.NetlinkSocket
sbox.InvokeFunc(func() {
nlSock, err = nl.Subscribe(syscall.NETLINK_ROUTE, syscall.RTNLGRP_NEIGH)
if err != nil {
err = fmt.Errorf("failed to subscribe to neighbor group netlink messages")
}
})
if nlSock != nil {
if err == nil {
go n.watchMiss(nlSock)
} else {
logrus.Errorf("failed to subscribe to neighbor group netlink messages for overlay network %s in sbox %s: %v",
n.id, sbox.Key(), err)
}
return nil
@ -644,6 +644,9 @@ func (n *network) watchMiss(nlSock *nl.NetlinkSocket) {
}
if neigh.IP.To4() == nil {
if neigh.HardwareAddr != nil {
logrus.Debugf("Miss notification, l2 mac %v", neigh.HardwareAddr)
}
continue
}

View File

@ -73,7 +73,7 @@ func (d *driver) serfJoin(neighIP string) error {
if neighIP == "" {
return fmt.Errorf("no neighbor to join")
}
if _, err := d.serfInstance.Join([]string{neighIP}, false); err != nil {
if _, err := d.serfInstance.Join([]string{neighIP}, true); err != nil {
return fmt.Errorf("Failed to join the cluster at neigh IP %s: %v",
neighIP, err)
}
@ -94,8 +94,8 @@ func (d *driver) notifyEvent(event ovNotify) {
}
func (d *driver) processEvent(u serf.UserEvent) {
logrus.Debugf("Received user event name:%s, payload:%s\n", u.Name,
string(u.Payload))
logrus.Debugf("Received user event name:%s, payload:%s LTime:%d \n", u.Name,
string(u.Payload), uint64(u.LTime))
var dummy, action, vtepStr, nid, eid, ipStr, maskStr, macStr string
if _, err := fmt.Sscan(u.Name, &dummy, &vtepStr, &nid, &eid); err != nil {
@ -146,6 +146,7 @@ func (d *driver) processQuery(q *serf.Query) {
return
}
logrus.Debugf("Sending peer query resp mac %s, mask %s, vtep %s", peerMac, net.IP(peerIPMask), vtep)
q.Respond([]byte(fmt.Sprintf("%s %s %s", peerMac.String(), net.IP(peerIPMask).String(), vtep.String())))
}
@ -173,6 +174,7 @@ func (d *driver) resolvePeer(nid string, peerIP net.IP) (net.HardwareAddr, net.I
return nil, nil, nil, fmt.Errorf("failed to parse mac: %v", err)
}
logrus.Debugf("Received peer query response, mac %s, vtep %s, mask %s", macStr, vtepStr, maskStr)
return mac, net.IPMask(net.ParseIP(maskStr).To4()), net.ParseIP(vtepStr), nil
case <-time.After(time.Second):

View File

@ -48,6 +48,7 @@ type driver struct {
vxlanIdm *idm.Idm
once sync.Once
joinOnce sync.Once
localJoinOnce sync.Once
keys []*key
sync.Mutex
}
@ -241,6 +242,12 @@ func (d *driver) nodeJoin(advertiseAddress, bindAddress string, self bool) {
d.bindAddress = bindAddress
d.Unlock()
// If containers are already running on this network update the
// advertiseaddress in the peerDB
d.localJoinOnce.Do(func() {
d.peerDBUpdateSelf()
})
// If there is no cluster store there is no need to start serf.
if d.store != nil {
if err := validateSelf(advertiseAddress); err != nil {

View File

@ -80,25 +80,29 @@ func (d *driver) peerDbWalk(f func(string, *peerKey, *peerEntry) bool) error {
func (d *driver) peerDbNetworkWalk(nid string, f func(*peerKey, *peerEntry) bool) error {
d.peerDb.Lock()
pMap, ok := d.peerDb.mp[nid]
d.peerDb.Unlock()
if !ok {
d.peerDb.Unlock()
return nil
}
d.peerDb.Unlock()
mp := map[string]peerEntry{}
pMap.Lock()
for pKeyStr, pEntry := range pMap.mp {
mp[pKeyStr] = pEntry
}
pMap.Unlock()
for pKeyStr, pEntry := range mp {
var pKey peerKey
if _, err := fmt.Sscan(pKeyStr, &pKey); err != nil {
logrus.Warnf("Peer key scan on network %s failed: %v", nid, err)
}
if f(&pKey, &pEntry) {
pMap.Unlock()
return nil
}
}
pMap.Unlock()
return nil
}
@ -363,3 +367,12 @@ func (d *driver) pushLocalDb() {
return false
})
}
func (d *driver) peerDBUpdateSelf() {
d.peerDbWalk(func(nid string, pkey *peerKey, pEntry *peerEntry) bool {
if pEntry.isLocal {
pEntry.vtep = net.ParseIP(d.advertiseAddress)
}
return false
})
}

View File

@ -181,6 +181,9 @@ type tableEntry struct {
}
func (ep *endpoint) Info() EndpointInfo {
if ep.sandboxID != "" {
return ep
}
n, err := ep.getNetworkFromStore()
if err != nil {
return nil

View File

@ -138,6 +138,7 @@ func getIPVSFamily() (int, error) {
if err != nil {
return 0, err
}
defer sock.Close()
req := newGenlRequest(genlCtrlID, genlCtrlCmdGetFamily)
req.AddData(nl.NewRtAttr(genlCtrlAttrFamilyName, nl.ZeroTerminated("IPVS")))

View File

@ -5,6 +5,7 @@ import (
"fmt"
"net"
"github.com/Sirupsen/logrus"
"github.com/vishvananda/netlink"
)
@ -96,6 +97,7 @@ func (n *networkNamespace) AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, op
nh := n.findNeighbor(dstIP, dstMac)
if nh != nil {
logrus.Debugf("Neighbor entry already present for IP %v, mac %v", dstIP, dstMac)
// If it exists silently return
return nil
}

View File

@ -2,6 +2,7 @@ package libnetwork
import (
"fmt"
"strings"
"github.com/Sirupsen/logrus"
"github.com/docker/libkv/store/boltdb"
@ -152,21 +153,24 @@ func (c *controller) getNetworksFromStore() ([]*network, error) {
continue
}
kvep, err := store.Map(datastore.Key(epCntKeyPrefix), &endpointCnt{})
if err != nil {
if err != datastore.ErrKeyNotFound {
logrus.Warnf("failed to get endpoint_count map for scope %s: %v", store.Scope(), err)
}
}
for _, kvo := range kvol {
n := kvo.(*network)
n.Lock()
n.ctrlr = c
n.Unlock()
ec := &endpointCnt{n: n}
err = store.GetObject(datastore.Key(ec.Key()...), ec)
if err != nil && !n.inDelete {
logrus.Warnf("could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err)
continue
// Trim the leading & trailing "/" to make it consistent across all stores
if val, ok := kvep[strings.Trim(datastore.Key(ec.Key()...), "/")]; ok {
ec = val.(*endpointCnt)
ec.n = n
n.epCnt = ec
}
n.Lock()
n.epCnt = ec
n.scope = store.Scope()
n.Unlock()
nl = append(nl, n)