From 96472cdaea072544854c7c3d754398002cf76f93 Mon Sep 17 00:00:00 2001 From: Dani Louca Date: Fri, 6 Apr 2018 16:07:14 -0400 Subject: [PATCH] Adding a recovery mechanism for a split gossip cluster Signed-off-by: Dani Louca --- libnetwork/networkdb/cluster.go | 62 ++++++++++++++++++++++++------- libnetwork/networkdb/delegate.go | 15 +++----- libnetwork/networkdb/networkdb.go | 6 ++- 3 files changed, 57 insertions(+), 26 deletions(-) diff --git a/libnetwork/networkdb/cluster.go b/libnetwork/networkdb/cluster.go index 198caceeb8..bd48fb9f18 100644 --- a/libnetwork/networkdb/cluster.go +++ b/libnetwork/networkdb/cluster.go @@ -2,6 +2,7 @@ package networkdb import ( "bytes" + "context" "crypto/rand" "encoding/hex" "fmt" @@ -17,10 +18,12 @@ import ( ) const ( - reapPeriod = 5 * time.Second - retryInterval = 1 * time.Second - nodeReapInterval = 24 * time.Hour - nodeReapPeriod = 2 * time.Hour + reapPeriod = 5 * time.Second + rejoinClusterDuration = 10 * time.Second + rejoinInterval = 60 * time.Second + retryInterval = 1 * time.Second + nodeReapInterval = 24 * time.Hour + nodeReapPeriod = 2 * time.Hour ) type logWriter struct{} @@ -154,7 +157,7 @@ func (nDB *NetworkDB) clusterInit() error { return fmt.Errorf("failed to create memberlist: %v", err) } - nDB.stopCh = make(chan struct{}) + nDB.ctx, nDB.cancelCtx = context.WithCancel(context.Background()) nDB.memberlist = mlist for _, trigger := range []struct { @@ -166,16 +169,17 @@ func (nDB *NetworkDB) clusterInit() error { {config.PushPullInterval, nDB.bulkSyncTables}, {retryInterval, nDB.reconnectNode}, {nodeReapPeriod, nDB.reapDeadNode}, + {rejoinInterval, nDB.rejoinClusterBootStrap}, } { t := time.NewTicker(trigger.interval) - go nDB.triggerFunc(trigger.interval, t.C, nDB.stopCh, trigger.fn) + go nDB.triggerFunc(trigger.interval, t.C, trigger.fn) nDB.tickers = append(nDB.tickers, t) } return nil } -func (nDB *NetworkDB) retryJoin(members []string, stop <-chan struct{}) { +func (nDB *NetworkDB) retryJoin(ctx context.Context, members []string) { t := time.NewTicker(retryInterval) defer t.Stop() @@ -191,7 +195,7 @@ func (nDB *NetworkDB) retryJoin(members []string, stop <-chan struct{}) { continue } return - case <-stop: + case <-ctx.Done(): return } } @@ -202,8 +206,8 @@ func (nDB *NetworkDB) clusterJoin(members []string) error { mlist := nDB.memberlist if _, err := mlist.Join(members); err != nil { - // In case of failure, keep retrying join until it succeeds or the cluster is shutdown. - go nDB.retryJoin(members, nDB.stopCh) + // In case of failure, we no longer need to explicitly call retryJoin. 
+	// rejoinClusterBootStrap, which runs every minute, will retry the join for up to 10 seconds
 		return fmt.Errorf("could not join node to memberlist: %v", err)
 	}
 
@@ -225,7 +229,8 @@ func (nDB *NetworkDB) clusterLeave() error {
 		return err
 	}
 
-	close(nDB.stopCh)
+	// cancel the context
+	nDB.cancelCtx()
 
 	for _, t := range nDB.tickers {
 		t.Stop()
@@ -234,19 +239,19 @@
 	return mlist.Shutdown()
 }
 
-func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
+func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, f func()) {
 	// Use a random stagger to avoid synchronizing
 	randStagger := time.Duration(uint64(rnd.Int63()) % uint64(stagger))
 	select {
 	case <-time.After(randStagger):
-	case <-stop:
+	case <-nDB.ctx.Done():
 		return
 	}
 	for {
 		select {
 		case <-C:
 			f()
-		case <-stop:
+		case <-nDB.ctx.Done():
 			return
 		}
 	}
@@ -270,6 +275,35 @@ func (nDB *NetworkDB) reapDeadNode() {
 	}
 }
 
+// rejoinClusterBootStrap is called periodically to check whether any of the bootStrap nodes
+// is still active in the cluster. If none is, it calls the cluster join to merge the two
+// separate clusters that form when all the managers are stopped and restarted at the same time.
+func (nDB *NetworkDB) rejoinClusterBootStrap() {
+	nDB.RLock()
+	if len(nDB.bootStrapIP) == 0 {
+		nDB.RUnlock()
+		return
+	}
+
+	bootStrapIPs := make([]string, 0, len(nDB.bootStrapIP))
+	for _, bootIP := range nDB.bootStrapIP {
+		for _, node := range nDB.nodes {
+			if node.Addr.Equal(bootIP) {
+				// One of the bootstrap nodes is part of the cluster, return
+				nDB.RUnlock()
+				return
+			}
+		}
+		bootStrapIPs = append(bootStrapIPs, bootIP.String())
+	}
+	nDB.RUnlock()
+	// None of the bootStrap nodes are in the cluster, call memberlist join
+	logrus.Debugf("rejoinClusterBootStrap, calling cluster join with bootStrap %v", bootStrapIPs)
+	ctx, cancel := context.WithTimeout(nDB.ctx, rejoinClusterDuration)
+	defer cancel()
+	nDB.retryJoin(ctx, bootStrapIPs)
+}
+
 func (nDB *NetworkDB) reconnectNode() {
 	nDB.RLock()
 	if len(nDB.failedNodes) == 0 {
diff --git a/libnetwork/networkdb/delegate.go b/libnetwork/networkdb/delegate.go
index c308fde795..95f7faf77f 100644
--- a/libnetwork/networkdb/delegate.go
+++ b/libnetwork/networkdb/delegate.go
@@ -38,16 +38,11 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
 	// If we are here it means that the event is fresher and the node is known. Update the Lamport time
 	n.ltime = nEvent.LTime
 
-	// If it is a node leave event for a manager and this is the only manager we
-	// know of we want the reconnect logic to kick in. In a single manager
-	// cluster manager's gossip can't be bootstrapped unless some other node
-	// connects to it.
-	if len(nDB.bootStrapIP) == 1 && nEvent.Type == NodeEventTypeLeave {
-		for _, ip := range nDB.bootStrapIP {
-			if ip.Equal(n.Addr) {
-				return true
-			}
-		}
+	// If the node is not known to memberlist we cannot save any state for it; if it actually
+	// dies we won't receive any notification and we will remain stuck with stale state
+	if _, ok := nDB.nodes[nEvent.NodeName]; !ok {
+		logrus.Errorf("node %s is unknown to memberlist", nEvent.NodeName)
+		return false
 	}
 
 	switch nEvent.Type {
diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go
index d1fa3b8d86..c433913a46 100644
--- a/libnetwork/networkdb/networkdb.go
+++ b/libnetwork/networkdb/networkdb.go
@@ -3,6 +3,7 @@ package networkdb
 //go:generate protoc -I.:../vendor/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/networkdb,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. networkdb.proto
 
 import (
+	"context"
 	"fmt"
 	"net"
 	"os"
@@ -77,9 +78,10 @@ type NetworkDB struct {
 	// Broadcast queue for node event gossip.
 	nodeBroadcasts *memberlist.TransmitLimitedQueue
 
-	// A central stop channel to stop all go routines running on
+	// A central context to stop all goroutines running on
 	// behalf of the NetworkDB instance.
-	stopCh chan struct{}
+	ctx       context.Context
+	cancelCtx context.CancelFunc
 
 	// A central broadcaster for all local watchers watching table
 	// events.
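
For readers who want the recovery pattern in isolation: below is a minimal, self-contained sketch of the same periodic rejoin logic, written directly against hashicorp/memberlist's public API (Create, Join, Members). The function names (rejoinLoop, anyBootstrapAlive, retryJoin), the constants, and the example addresses are illustrative assumptions that mirror this patch's intent; they are not code from libnetwork.

package main

import (
	"context"
	"log"
	"net"
	"time"

	"github.com/hashicorp/memberlist"
)

const (
	rejoinInterval = 60 * time.Second // how often to check on the bootstrap peers
	rejoinDuration = 10 * time.Second // upper bound on each recovery attempt
	retryInterval  = 1 * time.Second  // pause between individual join attempts
)

// rejoinLoop periodically checks whether any bootstrap peer is still a live
// member. If none is, this node may be on the losing side of a split-brain,
// so it retries joining the bootstrap peers for a bounded window.
func rejoinLoop(ctx context.Context, ml *memberlist.Memberlist, bootstrapIPs []net.IP) {
	t := time.NewTicker(rejoinInterval)
	defer t.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			if anyBootstrapAlive(ml, bootstrapIPs) {
				continue // still merged with at least one bootstrap node
			}
			addrs := make([]string, 0, len(bootstrapIPs))
			for _, ip := range bootstrapIPs {
				addrs = append(addrs, ip.String())
			}
			// Bound the recovery attempt, mirroring rejoinClusterDuration.
			joinCtx, cancel := context.WithTimeout(ctx, rejoinDuration)
			retryJoin(joinCtx, ml, addrs)
			cancel()
		}
	}
}

// anyBootstrapAlive reports whether any bootstrap IP appears in the member list.
func anyBootstrapAlive(ml *memberlist.Memberlist, bootstrapIPs []net.IP) bool {
	for _, m := range ml.Members() {
		for _, ip := range bootstrapIPs {
			if m.Addr.Equal(ip) {
				return true
			}
		}
	}
	return false
}

// retryJoin keeps calling Join until it succeeds or the context expires,
// the same shape as the patch's retryJoin.
func retryJoin(ctx context.Context, ml *memberlist.Memberlist, addrs []string) {
	t := time.NewTicker(retryInterval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			if _, err := ml.Join(addrs); err != nil {
				log.Printf("rejoin attempt failed: %v", err)
				continue
			}
			return
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ml, err := memberlist.Create(memberlist.DefaultLANConfig())
	if err != nil {
		log.Fatalf("create memberlist: %v", err)
	}
	// Hypothetical bootstrap addresses for illustration only.
	bootstrap := []net.IP{net.ParseIP("192.0.2.10"), net.ParseIP("192.0.2.11")}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	rejoinLoop(ctx, ml, bootstrap)
}

The design point the sketch preserves is the split of responsibilities: the outer ticker decides *whether* recovery is needed (no bootstrap node visible), while the inner context-bounded loop decides *how long* to keep trying, so a permanently unreachable bootstrap set cannot wedge a goroutine forever.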