From 1a8a473017299c5e999d55d14634874826062fce Mon Sep 17 00:00:00 2001
From: Tonis Tiigi
Date: Mon, 20 Jun 2016 16:35:33 -0700
Subject: [PATCH] Fix race on clearing swarm nodes on stop

On stop, multiple places raced to set `cluster.node` to nil. Now stop
waits for the node to set itself to nil.

Signed-off-by: Tonis Tiigi
---
 daemon/cluster/cluster.go | 214 ++++++++++++++++++--------------------
 1 file changed, 99 insertions(+), 115 deletions(-)

diff --git a/daemon/cluster/cluster.go b/daemon/cluster/cluster.go
index f8b7a85e1a..8f5cad7b99 100644
--- a/daemon/cluster/cluster.go
+++ b/daemon/cluster/cluster.go
@@ -89,18 +89,23 @@ type Config struct {
 // manager and a worker.
 type Cluster struct {
 	sync.RWMutex
-	root           string
-	config         Config
-	configEvent    chan struct{} // todo: make this array and goroutine safe
-	node           *swarmagent.Node
+	*node
+	root        string
+	config      Config
+	configEvent chan struct{} // todo: make this array and goroutine safe
+	listenAddr  string
+	stop        bool
+	err         error
+	cancelDelay func()
+}
+
+type node struct {
+	*swarmagent.Node
+	done           chan struct{}
+	ready          bool
 	conn           *grpc.ClientConn
 	client         swarmapi.ControlClient
-	ready          bool
-	listenAddr     string
-	err            error
 	reconnectDelay time.Duration
-	stop           bool
-	cancelDelay    func()
 }
 
 // New creates a new Cluster instance using provided config.
@@ -110,10 +115,9 @@ func New(config Config) (*Cluster, error) {
 		return nil, err
 	}
 	c := &Cluster{
-		root:           root,
-		config:         config,
-		configEvent:    make(chan struct{}, 10),
-		reconnectDelay: initialReconnectDelay,
+		root:        root,
+		config:      config,
+		configEvent: make(chan struct{}, 10),
 	}
 
 	st, err := c.loadState()
@@ -124,7 +128,7 @@
 		return nil, err
 	}
 
-	n, ctx, err := c.startNewNode(false, st.ListenAddr, "", "", "", false)
+	n, err := c.startNewNode(false, st.ListenAddr, "", "", "", false)
 	if err != nil {
 		return nil, err
 	}
@@ -133,12 +137,10 @@
 	case <-time.After(swarmConnectTimeout):
 		logrus.Errorf("swarm component could not be started before timeout was reached")
 	case <-n.Ready():
-	case <-ctx.Done():
+	case <-n.done:
+		return nil, fmt.Errorf("swarm component could not be started: %v", c.err)
 	}
-	if ctx.Err() != nil {
-		return nil, fmt.Errorf("swarm component could not be started")
-	}
-	go c.reconnectOnFailure(ctx)
+	go c.reconnectOnFailure(n)
 	return c, nil
 }
 
@@ -169,20 +171,20 @@
 	return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
 }
 
-func (c *Cluster) reconnectOnFailure(ctx context.Context) {
+func (c *Cluster) reconnectOnFailure(n *node) {
 	for {
-		<-ctx.Done()
+		<-n.done
 		c.Lock()
 		if c.stop || c.node != nil {
 			c.Unlock()
 			return
 		}
-		c.reconnectDelay *= 2
-		if c.reconnectDelay > maxReconnectDelay {
-			c.reconnectDelay = maxReconnectDelay
+		n.reconnectDelay *= 2
+		if n.reconnectDelay > maxReconnectDelay {
+			n.reconnectDelay = maxReconnectDelay
 		}
-		logrus.Warnf("Restarting swarm in %.2f seconds", c.reconnectDelay.Seconds())
-		delayCtx, cancel := context.WithTimeout(context.Background(), c.reconnectDelay)
+		logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
+		delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
 		c.cancelDelay = cancel
 		c.Unlock()
 		<-delayCtx.Done()
@@ -195,22 +197,23 @@
 			return
 		}
 		var err error
-		_, ctx, err = c.startNewNode(false, c.listenAddr, c.getRemoteAddress(), "", "", false)
+		n, err = c.startNewNode(false, c.listenAddr, c.getRemoteAddress(), "", "", false)
 		if err != nil {
 			c.err = err
-			ctx = delayCtx
+			close(n.done)
 		}
 		c.Unlock()
 	}
 }
 
-func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secret, cahash string, ismanager bool) (*swarmagent.Node, context.Context, error) {
+func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secret, cahash string, ismanager bool) (*node, error) {
 	if err := c.config.Backend.IsSwarmCompatible(); err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 	c.node = nil
 	c.cancelDelay = nil
-	node, err := swarmagent.NewNode(&swarmagent.NodeConfig{
+	c.stop = false
+	n, err := swarmagent.NewNode(&swarmagent.NodeConfig{
 		Hostname:         c.config.Name,
 		ForceNewCluster:  forceNewCluster,
 		ListenControlAPI: filepath.Join(c.root, controlSocket),
@@ -225,85 +228,76 @@ func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secre
 		IsManager:        ismanager,
 	})
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
-	ctx, cancel := context.WithCancel(context.Background())
-	if err := node.Start(ctx); err != nil {
-		return nil, nil, err
+	ctx := context.Background()
+	if err := n.Start(ctx); err != nil {
+		return nil, err
+	}
+	node := &node{
+		Node:           n,
+		done:           make(chan struct{}),
+		reconnectDelay: initialReconnectDelay,
 	}
-
 	c.node = node
 	c.listenAddr = listenAddr
 	c.saveState()
 	c.config.Backend.SetClusterProvider(c)
 	go func() {
-		err := node.Err(ctx)
+		err := n.Err(ctx)
 		if err != nil {
 			logrus.Errorf("cluster exited with error: %v", err)
 		}
 		c.Lock()
-		c.conn = nil
-		c.client = nil
 		c.node = nil
-		c.ready = false
 		c.err = err
 		c.Unlock()
-		cancel()
+		close(node.done)
 	}()
 
 	go func() {
 		select {
-		case <-node.Ready():
+		case <-n.Ready():
 			c.Lock()
-			c.reconnectDelay = initialReconnectDelay
-			c.Unlock()
-		case <-ctx.Done():
-		}
-		if ctx.Err() == nil {
-			c.Lock()
-			c.ready = true
+			node.ready = true
 			c.err = nil
 			c.Unlock()
+		case <-ctx.Done():
 		}
 		c.configEvent <- struct{}{}
 	}()
 
 	go func() {
-		for conn := range node.ListenControlSocket(ctx) {
+		for conn := range n.ListenControlSocket(ctx) {
 			c.Lock()
-			if c.conn != conn {
-				c.client = swarmapi.NewControlClient(conn)
+			if node.conn != conn {
+				if conn == nil {
+					node.client = nil
+				} else {
+					node.client = swarmapi.NewControlClient(conn)
+				}
 			}
-			if c.conn != nil {
-				c.client = nil
-			}
-			c.conn = conn
+			node.conn = conn
 			c.Unlock()
 			c.configEvent <- struct{}{}
 		}
 	}()
 
-	return node, ctx, nil
+	return node, nil
 }
 
 // Init initializes new cluster from user provided request.
 func (c *Cluster) Init(req types.InitRequest) (string, error) {
 	c.Lock()
 	if node := c.node; node != nil {
-		c.Unlock()
 		if !req.ForceNewCluster {
+			c.Unlock()
 			return "", errSwarmExists(node)
 		}
-		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
-		defer cancel()
-		c.cancelReconnect()
-		if err := c.node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
+		if err := c.stopNode(); err != nil {
+			c.Unlock()
 			return "", err
 		}
-		c.Lock()
-		c.node = nil
-		c.conn = nil
-		c.ready = false
 	}
 
 	if err := validateAndSanitizeInitRequest(&req); err != nil {
@@ -312,7 +306,7 @@ func (c *Cluster) Init(req types.InitRequest) (string, error) {
 	}
 
 	// todo: check current state existing
-	n, ctx, err := c.startNewNode(req.ForceNewCluster, req.ListenAddr, "", "", "", false)
+	n, err := c.startNewNode(req.ForceNewCluster, req.ListenAddr, "", "", "", false)
 	if err != nil {
 		c.Unlock()
 		return "", err
@@ -324,20 +318,17 @@
 		if err := initClusterSpec(n, req.Spec); err != nil {
 			return "", err
 		}
-		go c.reconnectOnFailure(ctx)
+		go c.reconnectOnFailure(n)
 		return n.NodeID(), nil
-	case <-ctx.Done():
+	case <-n.done:
 		c.RLock()
 		defer c.RUnlock()
-		if c.err != nil {
-			if !req.ForceNewCluster { // if failure on first attempt don't keep state
-				if err := c.clearState(); err != nil {
-					return "", err
-				}
+		if !req.ForceNewCluster { // if failure on first attempt don't keep state
+			if err := c.clearState(); err != nil {
+				return "", err
 			}
-			return "", c.err
 		}
-		return "", ctx.Err()
+		return "", c.err
 	}
 }
 
@@ -353,7 +344,7 @@
 		return err
 	}
 	// todo: check current state existing
-	n, ctx, err := c.startNewNode(false, req.ListenAddr, req.RemoteAddrs[0], req.Secret, req.CACertHash, req.Manager)
+	n, err := c.startNewNode(false, req.ListenAddr, req.RemoteAddrs[0], req.Secret, req.CACertHash, req.Manager)
 	if err != nil {
 		c.Unlock()
 		return err
@@ -370,28 +361,41 @@
 		certificateRequested = nil
 	case <-time.After(swarmConnectTimeout):
 		// attempt to connect will continue in background, also reconnecting
-		go c.reconnectOnFailure(ctx)
+		go c.reconnectOnFailure(n)
 		return ErrSwarmJoinTimeoutReached
 	case <-n.Ready():
-		go c.reconnectOnFailure(ctx)
+		go c.reconnectOnFailure(n)
 		return nil
-	case <-ctx.Done():
+	case <-n.done:
 		c.RLock()
 		defer c.RUnlock()
-		if c.err != nil {
-			return c.err
-		}
-		return ctx.Err()
+		return c.err
 	}
 }
 
-func (c *Cluster) cancelReconnect() {
+// stopNode is a helper that stops the active c.node and waits until it has
+// shut down. Call while keeping the cluster lock.
+func (c *Cluster) stopNode() error {
+	if c.node == nil {
+		return nil
+	}
 	c.stop = true
 	if c.cancelDelay != nil {
 		c.cancelDelay()
 		c.cancelDelay = nil
 	}
+	node := c.node
+	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+	defer cancel()
+	// TODO: can't hold lock on stop because it calls back to network
+	c.Unlock()
+	defer c.Lock()
+	if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
+		return err
+	}
+	<-node.done
+	return nil
 }
 
 // Leave shuts down Cluster and removes current state.
@@ -425,14 +429,11 @@ func (c *Cluster) Leave(force bool) error {
 		c.Unlock()
 		return fmt.Errorf(msg)
 	}
-	c.cancelReconnect()
-	c.Unlock()
-
-	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
-	defer cancel()
-	if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
+	if err := c.stopNode(); err != nil {
+		c.Unlock()
 		return err
 	}
+	c.Unlock()
 	if nodeID := node.NodeID(); nodeID != "" {
 		for _, id := range c.config.Backend.ListContainersForNode(nodeID) {
 			if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
@@ -440,11 +441,6 @@
 			}
 		}
 	}
-	c.Lock()
-	defer c.Unlock()
-	c.node = nil
-	c.conn = nil
-	c.ready = false
 	c.configEvent <- struct{}{}
 	// todo: cleanup optional?
 	if err := c.clearState(); err != nil {
@@ -534,7 +530,7 @@ func (c *Cluster) IsManager() bool {
 func (c *Cluster) IsAgent() bool {
 	c.RLock()
 	defer c.RUnlock()
-	return c.ready
+	return c.node != nil && c.ready
 }
 
 // GetListenAddress returns the listening address for current maanger's
@@ -542,7 +538,7 @@ func (c *Cluster) IsAgent() bool {
 func (c *Cluster) GetListenAddress() string {
 	c.RLock()
 	defer c.RUnlock()
-	if c.conn != nil {
+	if c.isActiveManager() {
 		return c.listenAddr
 	}
 	return ""
@@ -597,7 +593,6 @@ func (c *Cluster) Info() types.Info {
 	if c.err != nil {
 		info.Error = c.err.Error()
 	}
-
 	if c.isActiveManager() {
 		info.ControlAvailable = true
 		if r, err := c.client.ListNodes(c.getRequestContext(), &swarmapi.ListNodesRequest{}); err == nil {
@@ -626,7 +621,7 @@
 
 // isActiveManager should not be called without a read lock
 func (c *Cluster) isActiveManager() bool {
-	return c.conn != nil
+	return c.node != nil && c.conn != nil
 }
 
 // GetServices returns all services of a managed swarm cluster.
@@ -1014,7 +1009,7 @@ func (c *Cluster) Cleanup() {
 		c.Unlock()
 		return
 	}
-
+	defer c.Unlock()
 	if c.isActiveManager() {
 		active, reachable, unreachable, err := c.managerStats()
 		if err == nil {
@@ -1024,18 +1019,7 @@
 			}
 		}
 	}
-	c.cancelReconnect()
-	c.Unlock()
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	defer cancel()
-	if err := node.Stop(ctx); err != nil {
-		logrus.Errorf("error cleaning up cluster: %v", err)
-	}
-	c.Lock()
-	c.node = nil
-	c.ready = false
-	c.conn = nil
-	c.Unlock()
+	c.stopNode()
 }
 
 func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
@@ -1130,14 +1114,14 @@ func validateAddr(addr string) (string, error) {
 	return strings.TrimPrefix(newaddr, "tcp://"), nil
 }
 
-func errSwarmExists(node *swarmagent.Node) error {
+func errSwarmExists(node *node) error {
 	if node.NodeMembership() != swarmapi.NodeMembershipAccepted {
 		return ErrPendingSwarmExists
 	}
 	return ErrSwarmExists
 }
 
-func initClusterSpec(node *swarmagent.Node, spec types.Spec) error {
+func initClusterSpec(node *node, spec types.Spec) error {
 	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
 	for conn := range node.ListenControlSocket(ctx) {
 		if ctx.Err() != nil {
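
Note: the heart of this change is an ownership rule. Only the goroutine
that runs the node may clear `cluster.node` and close the `done`
channel; every stop path just waits on `done` instead of racing to nil
the field itself. Below is a minimal, self-contained sketch of that
pattern with hypothetical names (`cluster`, `node`, `quit`); it
illustrates the idea only and is not code from the daemon:

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// node mimics the patched cluster.node: done is closed exactly once,
// by the same goroutine that clears the shared pointer.
type node struct {
	done chan struct{} // closed after c.node has been set to nil
	quit chan struct{} // hypothetical shutdown signal
}

type cluster struct {
	sync.Mutex
	node *node
	err  error
}

// start launches a node. The spawned goroutine is the only place that
// sets c.node = nil and closes done, so the field has a single writer.
func (c *cluster) start() {
	n := &node{done: make(chan struct{}), quit: make(chan struct{})}
	c.Lock()
	c.node = n
	c.Unlock()
	go func() {
		<-n.quit // run until asked to stop (or until a failure)
		c.Lock()
		c.node = nil
		c.err = errors.New("node exited")
		c.Unlock()
		close(n.done)
	}()
}

// stopNode never nils the field itself; it signals shutdown and waits
// for the node goroutine to clear c.node, like the patch's stopNode
// does by blocking on <-node.done.
func (c *cluster) stopNode() {
	c.Lock()
	n := c.node
	c.Unlock()
	if n == nil {
		return
	}
	close(n.quit) // ask the node to stop
	<-n.done      // once this returns, c.node is guaranteed to be nil
}

func main() {
	c := &cluster{}
	c.start()
	time.Sleep(10 * time.Millisecond)
	c.stopNode()
	c.Lock()
	fmt.Println("node cleared:", c.node == nil, "err:", c.err)
	c.Unlock()
}

In the patch itself, node.Stop(ctx) plays the role of quit, and
stopNode additionally drops the cluster lock around the blocking wait
because, per its TODO comment, stopping calls back into the network
layer.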