
bump libnetwork to eb6b2a57955e5c149d47c3973573216e8f8baa09

Changes included:

- libnetwork#2147 Adding logs for ipam state
- libnetwork#2143 Fix race conditions in the overlay network driver (see the locking sketch after this list)
  - possibly addresses moby#36743 services do not start: ingress-sbox is already present
  - possibly addresses moby#30427 Flaky Test: TestSwarmPublishDuplicatePorts on s390
  - possibly addresses moby#36501 Flaky tests: Service "port" tests
- libnetwork#2142 Add wait time into xtables lock warning
- libnetwork#2135 filter xtables lock warnings when firewalld is active
- libnetwork#2140 Switch from x/net/context to context
- libnetwork#2134 Adding a recovery mechanism for a split gossip cluster
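Most of the race fixes in libnetwork#2143 follow one pattern: take the driver mutex once, defer the unlock, and keep it held across the whole check-and-insert (or check-and-delete) instead of dropping it between steps. Below is a minimal, self-contained sketch of that pattern; the driver/network types and the createNetwork name are illustrative, not libnetwork's API.

```go
// Sketch only: the "check then create under one lock" pattern applied by the
// overlay driver changes in this bump. Names are illustrative.
package main

import (
	"fmt"
	"sync"
)

type network struct{ id string }

type driver struct {
	sync.Mutex
	networks map[string]*network
}

// createNetwork holds the driver lock for the whole check-and-insert, so two
// concurrent callers cannot both pass the existence check and then both insert.
func (d *driver) createNetwork(id string) error {
	d.Lock()
	defer d.Unlock()

	if _, ok := d.networks[id]; ok {
		return fmt.Errorf("network %s already exists", id)
	}
	d.networks[id] = &network{id: id}
	return nil
}

func main() {
	d := &driver{networks: map[string]*network{}}
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Exactly one of these calls succeeds; the other reports the duplicate.
			fmt.Println(d.createNetwork("n1"))
		}()
	}
	wg.Wait()
}
```

With two racing goroutines, exactly one call wins and the other sees the duplicate, which is the kind of duplicate-creation race the linked issues describe.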

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
Sebastiaan van Stijn 2018-05-21 00:37:54 +02:00
parent 8974fd47c7
commit 158ab95c17
GPG key ID: 76698F39D527CE8C
12 changed files with 136 additions and 74 deletions

View file

@ -32,7 +32,7 @@ github.com/tonistiigi/fsutil dea3a0da73aee887fc02142d995be764106ac5e2
#get libnetwork packages
# When updating, also update LIBNETWORK_COMMIT in hack/dockerfile/install/proxy accordingly
github.com/docker/libnetwork c15b372ef22125880d378167dde44f4b134e1a77
github.com/docker/libnetwork eb6b2a57955e5c149d47c3973573216e8f8baa09
github.com/docker/go-events 9461782956ad83b30282bf90e31fa6a70c255ba9
github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec

View file

@ -326,7 +326,6 @@ func (h *Handle) set(ordinal, start, end uint64, any bool, release bool, serial
}
h.Lock() // Acquire the lock back
}
logrus.Debugf("Received set for ordinal %v, start %v, end %v, any %t, release %t, serial:%v curr:%d \n", ordinal, start, end, any, release, serial, h.curr)
if serial {
curr = h.curr
}
@ -466,8 +465,8 @@ func (h *Handle) Unselected() uint64 {
func (h *Handle) String() string {
h.Lock()
defer h.Unlock()
return fmt.Sprintf("App: %s, ID: %s, DBIndex: 0x%x, bits: %d, unselected: %d, sequence: %s",
h.app, h.id, h.dbIndex, h.bits, h.unselected, h.head.toString())
return fmt.Sprintf("App: %s, ID: %s, DBIndex: 0x%x, Bits: %d, Unselected: %d, Sequence: %s Curr:%d",
h.app, h.id, h.dbIndex, h.bits, h.unselected, h.head.toString(), h.curr)
}
// MarshalJSON encodes Handle into json message

View file

@ -1,8 +1,9 @@
package cluster
import (
"context"
"github.com/docker/docker/api/types/network"
"golang.org/x/net/context"
)
const (

View file

@ -438,7 +438,7 @@ func (d *driver) setKeys(keys []*key) error {
d.keys = keys
d.secMap = &encrMap{nodes: map[string][]*spi{}}
d.Unlock()
logrus.Debugf("Initial encryption keys: %v", d.keys)
logrus.Debugf("Initial encryption keys: %v", keys)
return nil
}
@ -458,6 +458,8 @@ func (d *driver) updateKeys(newKey, primary, pruneKey *key) error {
)
d.Lock()
defer d.Unlock()
// add new
if newKey != nil {
d.keys = append(d.keys, newKey)
@ -471,7 +473,6 @@ func (d *driver) updateKeys(newKey, primary, pruneKey *key) error {
delIdx = i
}
}
d.Unlock()
if (newKey != nil && newIdx == -1) ||
(primary != nil && priIdx == -1) ||
@ -480,17 +481,18 @@ func (d *driver) updateKeys(newKey, primary, pruneKey *key) error {
"(newIdx,priIdx,delIdx):(%d, %d, %d)", newIdx, priIdx, delIdx)
}
if priIdx != -1 && priIdx == delIdx {
return types.BadRequestErrorf("attempting to both make a key (index %d) primary and delete it", priIdx)
}
d.secMapWalk(func(rIPs string, spis []*spi) ([]*spi, bool) {
rIP := net.ParseIP(rIPs)
return updateNodeKey(lIP, aIP, rIP, spis, d.keys, newIdx, priIdx, delIdx), false
})
d.Lock()
// swap primary
if priIdx != -1 {
swp := d.keys[0]
d.keys[0] = d.keys[priIdx]
d.keys[priIdx] = swp
d.keys[0], d.keys[priIdx] = d.keys[priIdx], d.keys[0]
}
// prune
if delIdx != -1 {
@ -499,7 +501,6 @@ func (d *driver) updateKeys(newKey, primary, pruneKey *key) error {
}
d.keys = append(d.keys[:delIdx], d.keys[delIdx+1:]...)
}
d.Unlock()
logrus.Debugf("Updated: %v", d.keys)

View file

@ -203,6 +203,12 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo d
n.subnets = append(n.subnets, s)
}
d.Lock()
defer d.Unlock()
if d.networks[n.id] != nil {
return fmt.Errorf("attempt to create overlay network %v that already exists", n.id)
}
if err := n.writeToStore(); err != nil {
return fmt.Errorf("failed to update data store for network %v: %v", n.id, err)
}
@ -217,11 +223,13 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo d
if nInfo != nil {
if err := nInfo.TableEventRegister(ovPeerTable, driverapi.EndpointObject); err != nil {
// XXX Undo writeToStore? No method to so. Why?
return err
}
}
d.addNetwork(n)
d.networks[id] = n
return nil
}
@ -235,7 +243,15 @@ func (d *driver) DeleteNetwork(nid string) error {
return err
}
n := d.network(nid)
d.Lock()
defer d.Unlock()
// This is similar to d.network(), but we need to keep holding the lock
// until we are done removing this network.
n, ok := d.networks[nid]
if !ok {
n = d.restoreNetworkFromStore(nid)
}
if n == nil {
return fmt.Errorf("could not find network with id %s", nid)
}
@ -255,7 +271,7 @@ func (d *driver) DeleteNetwork(nid string) error {
}
// flush the peerDB entries
d.peerFlush(nid)
d.deleteNetwork(nid)
delete(d.networks, nid)
vnis, err := n.releaseVxlanID()
if err != nil {
@ -805,32 +821,25 @@ func (n *network) watchMiss(nlSock *nl.NetlinkSocket, nsPath string) {
}
}
func (d *driver) addNetwork(n *network) {
d.Lock()
d.networks[n.id] = n
d.Unlock()
}
func (d *driver) deleteNetwork(nid string) {
d.Lock()
delete(d.networks, nid)
d.Unlock()
// Restore a network from the store to the driver if it is present.
// Must be called with the driver locked!
func (d *driver) restoreNetworkFromStore(nid string) *network {
n := d.getNetworkFromStore(nid)
if n != nil {
n.driver = d
n.endpoints = endpointTable{}
n.once = &sync.Once{}
d.networks[nid] = n
}
return n
}
func (d *driver) network(nid string) *network {
d.Lock()
defer d.Unlock()
n, ok := d.networks[nid]
d.Unlock()
if !ok {
n = d.getNetworkFromStore(nid)
if n != nil {
n.driver = d
n.endpoints = endpointTable{}
n.once = &sync.Once{}
d.Lock()
d.networks[nid] = n
d.Unlock()
}
n = d.restoreNetworkFromStore(nid)
}
return n

View file

@ -125,8 +125,12 @@ func (d *driver) NetworkAllocate(id string, option map[string]string, ipV4Data,
opts[netlabel.OverlayVxlanIDList] = val
d.Lock()
defer d.Unlock()
if _, ok := d.networks[id]; ok {
n.releaseVxlanID()
return nil, fmt.Errorf("network %s already exists", id)
}
d.networks[id] = n
d.Unlock()
return opts, nil
}
@ -137,8 +141,8 @@ func (d *driver) NetworkFree(id string) error {
}
d.Lock()
defer d.Unlock()
n, ok := d.networks[id]
d.Unlock()
if !ok {
return fmt.Errorf("overlay network with id %s not found", id)
@ -147,9 +151,7 @@ func (d *driver) NetworkFree(id string) error {
// Release all vxlan IDs in one shot.
n.releaseVxlanID()
d.Lock()
delete(d.networks, id)
d.Unlock()
return nil
}

View file

@ -526,6 +526,7 @@ func (a *Allocator) ReleaseAddress(poolID string, address net.IP) error {
return types.InternalErrorf("could not find bitmask in datastore for %s on address %v release from pool %s: %v",
k.String(), address, poolID, err)
}
defer logrus.Debugf("Released address PoolID:%s, Address:%v Sequence:%s", poolID, address, bm.String())
return bm.Unset(ipToUint64(h))
}
@ -537,6 +538,7 @@ func (a *Allocator) getAddress(nw *net.IPNet, bitmask *bitseq.Handle, prefAddres
base *net.IPNet
)
logrus.Debugf("Request address PoolID:%v %s Serial:%v PrefAddress:%v ", nw, bitmask.String(), serial, prefAddress)
base = types.GetIPNetCopy(nw)
if bitmask.Unselected() <= 0 {

View file

@ -9,6 +9,7 @@ import (
"strconv"
"strings"
"sync"
"time"
"github.com/sirupsen/logrus"
)
@ -45,7 +46,7 @@ var (
iptablesPath string
supportsXlock = false
supportsCOpt = false
xLockWaitMsg = "Another app is currently holding the xtables lock; waiting"
xLockWaitMsg = "Another app is currently holding the xtables lock"
// used to lock iptables commands if xtables lock is not supported
bestEffortLock sync.Mutex
// ErrIptablesNotFound is returned when the rule is not found.
@ -423,12 +424,32 @@ func existsRaw(table Table, chain string, rule ...string) bool {
return strings.Contains(string(existingRules), ruleString)
}
// Maximum duration that an iptables operation can take
// before flagging a warning.
const opWarnTime = 2 * time.Second
func filterOutput(start time.Time, output []byte, args ...string) []byte {
// Flag operations that have taken a long time to complete
opTime := time.Since(start)
if opTime > opWarnTime {
logrus.Warnf("xtables contention detected while running [%s]: Waited for %.2f seconds and received %q", strings.Join(args, " "), float64(opTime)/float64(time.Second), string(output))
}
// ignore iptables' message about xtables lock:
// it is a warning, not an error.
if strings.Contains(string(output), xLockWaitMsg) {
output = []byte("")
}
// Put further filters here if desired
return output
}
// Raw calls 'iptables' system command, passing supplied arguments.
func Raw(args ...string) ([]byte, error) {
if firewalldRunning {
startTime := time.Now()
output, err := Passthrough(Iptables, args...)
if err == nil || !strings.Contains(err.Error(), "was not provided by any .service files") {
return output, err
return filterOutput(startTime, output, args...), err
}
}
return raw(args...)
@ -447,17 +468,13 @@ func raw(args ...string) ([]byte, error) {
logrus.Debugf("%s, %v", iptablesPath, args)
startTime := time.Now()
output, err := exec.Command(iptablesPath, args...).CombinedOutput()
if err != nil {
return nil, fmt.Errorf("iptables failed: iptables %v: %s (%s)", strings.Join(args, " "), output, err)
}
// ignore iptables' message about xtables lock
if strings.Contains(string(output), xLockWaitMsg) {
output = []byte("")
}
return output, err
return filterOutput(startTime, output, args...), err
}
// RawCombinedOutput inernally calls the Raw function and returns a non nil
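The iptables changes above (libnetwork#2142 and libnetwork#2135) boil down to three steps: time each invocation, warn when it waited unusually long on the xtables lock, and strip the benign lock-wait notice from the output before returning it. A standalone sketch of that shape follows, assuming a 2-second threshold; the function name and message text are illustrative, not the libnetwork implementation.

```go
// Sketch only: time a command, warn on long xtables waits, filter the
// informational lock-wait message out of the captured output.
package main

import (
	"fmt"
	"os/exec"
	"strings"
	"time"
)

const (
	warnAfter   = 2 * time.Second
	lockWaitMsg = "Another app is currently holding the xtables lock"
)

func runFiltered(name string, args ...string) ([]byte, error) {
	start := time.Now()
	out, err := exec.Command(name, args...).CombinedOutput()

	// Flag invocations that spent a long time (most likely waiting on the lock).
	if waited := time.Since(start); waited > warnAfter {
		fmt.Printf("warning: %s %s waited %.2fs: %q\n",
			name, strings.Join(args, " "), waited.Seconds(), out)
	}
	// The lock-wait notice is informational; drop it so callers do not treat
	// it as real command output.
	if strings.Contains(string(out), lockWaitMsg) {
		out = nil
	}
	return out, err
}

func main() {
	out, err := runFiltered("iptables", "-L", "-n")
	fmt.Printf("out=%q err=%v\n", out, err)
}
```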

View file

@ -2,6 +2,7 @@ package networkdb
import (
"bytes"
"context"
"crypto/rand"
"encoding/hex"
"fmt"
@ -17,10 +18,12 @@ import (
)
const (
reapPeriod = 5 * time.Second
retryInterval = 1 * time.Second
nodeReapInterval = 24 * time.Hour
nodeReapPeriod = 2 * time.Hour
reapPeriod = 5 * time.Second
rejoinClusterDuration = 10 * time.Second
rejoinInterval = 60 * time.Second
retryInterval = 1 * time.Second
nodeReapInterval = 24 * time.Hour
nodeReapPeriod = 2 * time.Hour
)
type logWriter struct{}
@ -154,7 +157,7 @@ func (nDB *NetworkDB) clusterInit() error {
return fmt.Errorf("failed to create memberlist: %v", err)
}
nDB.stopCh = make(chan struct{})
nDB.ctx, nDB.cancelCtx = context.WithCancel(context.Background())
nDB.memberlist = mlist
for _, trigger := range []struct {
@ -166,16 +169,17 @@ func (nDB *NetworkDB) clusterInit() error {
{config.PushPullInterval, nDB.bulkSyncTables},
{retryInterval, nDB.reconnectNode},
{nodeReapPeriod, nDB.reapDeadNode},
{rejoinInterval, nDB.rejoinClusterBootStrap},
} {
t := time.NewTicker(trigger.interval)
go nDB.triggerFunc(trigger.interval, t.C, nDB.stopCh, trigger.fn)
go nDB.triggerFunc(trigger.interval, t.C, trigger.fn)
nDB.tickers = append(nDB.tickers, t)
}
return nil
}
func (nDB *NetworkDB) retryJoin(members []string, stop <-chan struct{}) {
func (nDB *NetworkDB) retryJoin(ctx context.Context, members []string) {
t := time.NewTicker(retryInterval)
defer t.Stop()
@ -191,7 +195,7 @@ func (nDB *NetworkDB) retryJoin(members []string, stop <-chan struct{}) {
continue
}
return
case <-stop:
case <-ctx.Done():
return
}
}
@ -202,8 +206,8 @@ func (nDB *NetworkDB) clusterJoin(members []string) error {
mlist := nDB.memberlist
if _, err := mlist.Join(members); err != nil {
// In case of failure, keep retrying join until it succeeds or the cluster is shutdown.
go nDB.retryJoin(members, nDB.stopCh)
// In case of failure, we no longer need to explicitly call retryJoin.
// rejoinClusterBootStrap, which runs every minute, will retryJoin for 10sec
return fmt.Errorf("could not join node to memberlist: %v", err)
}
@ -225,7 +229,8 @@ func (nDB *NetworkDB) clusterLeave() error {
return err
}
close(nDB.stopCh)
// cancel the context
nDB.cancelCtx()
for _, t := range nDB.tickers {
t.Stop()
@ -234,19 +239,19 @@ func (nDB *NetworkDB) clusterLeave() error {
return mlist.Shutdown()
}
func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, f func()) {
// Use a random stagger to avoid syncronizing
randStagger := time.Duration(uint64(rnd.Int63()) % uint64(stagger))
select {
case <-time.After(randStagger):
case <-stop:
case <-nDB.ctx.Done():
return
}
for {
select {
case <-C:
f()
case <-stop:
case <-nDB.ctx.Done():
return
}
}
@ -270,6 +275,35 @@ func (nDB *NetworkDB) reapDeadNode() {
}
}
// rejoinClusterBootStrap is called periodically to check if all bootStrap nodes are active in the cluster,
// if not, call the cluster join to merge 2 separate clusters that are formed when all managers
// stopped/started at the same time
func (nDB *NetworkDB) rejoinClusterBootStrap() {
nDB.RLock()
if len(nDB.bootStrapIP) == 0 {
nDB.RUnlock()
return
}
bootStrapIPs := make([]string, 0, len(nDB.bootStrapIP))
for _, bootIP := range nDB.bootStrapIP {
for _, node := range nDB.nodes {
if node.Addr.Equal(bootIP) {
// One of the bootstrap nodes is part of the cluster, return
nDB.RUnlock()
return
}
}
bootStrapIPs = append(bootStrapIPs, bootIP.String())
}
nDB.RUnlock()
// None of the bootStrap nodes are in the cluster, call memberlist join
logrus.Debugf("rejoinClusterBootStrap, calling cluster join with bootStrap %v", bootStrapIPs)
ctx, cancel := context.WithTimeout(nDB.ctx, rejoinClusterDuration)
defer cancel()
nDB.retryJoin(ctx, bootStrapIPs)
}
func (nDB *NetworkDB) reconnectNode() {
nDB.RLock()
if len(nDB.failedNodes) == 0 {
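The networkdb changes above replace the stop channel with a single context that cancels every background ticker, and bound each periodic rejoin attempt with a child context and timeout. Here is a minimal sketch of that lifecycle; the intervals and names are chosen for illustration only, not taken from NetworkDB.

```go
// Sketch only: one parent context tears down all ticker loops, and each
// periodic attempt runs under a child context with its own deadline.
package main

import (
	"context"
	"fmt"
	"time"
)

// every runs f on each tick until the parent context is cancelled.
func every(ctx context.Context, interval time.Duration, f func()) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			f()
		case <-ctx.Done(): // one cancel stops every loop started this way
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	go every(ctx, 500*time.Millisecond, func() {
		// A bounded "rejoin" attempt: give up after 1s even if it never succeeds.
		attempt, done := context.WithTimeout(ctx, time.Second)
		defer done()
		<-attempt.Done()
		fmt.Println("rejoin attempt finished:", attempt.Err())
	})

	time.Sleep(2 * time.Second)
	cancel() // mirrors clusterLeave calling nDB.cancelCtx()
	time.Sleep(100 * time.Millisecond)
}
```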

View file

@ -38,16 +38,11 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
// If we are here means that the event is fresher and the node is known. Update the laport time
n.ltime = nEvent.LTime
// If it is a node leave event for a manager and this is the only manager we
// know of we want the reconnect logic to kick in. In a single manager
// cluster manager's gossip can't be bootstrapped unless some other node
// connects to it.
if len(nDB.bootStrapIP) == 1 && nEvent.Type == NodeEventTypeLeave {
for _, ip := range nDB.bootStrapIP {
if ip.Equal(n.Addr) {
return true
}
}
// If the node is not known from memberlist we cannot process save any state of it else if it actually
// dies we won't receive any notification and we will remain stuck with it
if _, ok := nDB.nodes[nEvent.NodeName]; !ok {
logrus.Error("node: %s is unknown to memberlist", nEvent.NodeName)
return false
}
switch nEvent.Type {

View file

@ -3,6 +3,7 @@ package networkdb
//go:generate protoc -I.:../vendor/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/networkdb,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. networkdb.proto
import (
"context"
"fmt"
"net"
"os"
@ -77,9 +78,10 @@ type NetworkDB struct {
// Broadcast queue for node event gossip.
nodeBroadcasts *memberlist.TransmitLimitedQueue
// A central stop channel to stop all go routines running on
// A central context to stop all go routines running on
// behalf of the NetworkDB instance.
stopCh chan struct{}
ctx context.Context
cancelCtx context.CancelFunc
// A central broadcaster for all local watchers watching table
// events.

View file

@ -48,7 +48,7 @@ github.com/ugorji/go f1f1a805ed361a0e078bb537e4ea78cd37dcf065
github.com/vishvananda/netlink b2de5d10e38ecce8607e6b438b6d174f389a004e
github.com/vishvananda/netns 604eaf189ee867d8c147fafc28def2394e878d25
golang.org/x/crypto 558b6879de74bc843225cde5686419267ff707ca
golang.org/x/net 7dcfb8076726a3fdd9353b6b8a1f1b6be6811bd6
golang.org/x/net b3756b4b77d7b13260a0a2ec658753cf48922eac
golang.org/x/sys 07c182904dbd53199946ba614a412c61d3c548f5
golang.org/x/sync fd80eb99c8f653c847d294a001bdf2a3a6f768f5
github.com/pkg/errors 839d9e913e063e28dfd0e6c7b7512793e0a48be9