
Fix race on clearing swarm nodes on stop

On stop, multiple code paths raced to mark
`cluster.node` nil. Now stop waits for the node to
set itself nil.

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
Tonis Tiigi 2016-06-20 16:35:33 -07:00
parent 22b34d6449
commit 1a8a473017
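
The pattern at the heart of the fix: only the node's own exit goroutine clears `cluster.node`, and every stopper blocks on a `done` channel that is closed only after the pointer has been cleared. Below is a minimal, self-contained Go sketch of that synchronization pattern; the `cluster`/`node` types and the simulated exit are illustrative stand-ins, not the daemon's real swarmkit-backed types:

package main

import (
	"fmt"
	"sync"
	"time"
)

// node wraps a long-running component and signals its shutdown by
// closing done, mirroring the wrapper type this commit introduces.
type node struct {
	done chan struct{}
}

type cluster struct {
	sync.Mutex
	node *node
}

// run plays the role of the node's error goroutine: it is the only
// place that clears c.node, and it closes done only afterwards.
func (c *cluster) run(n *node) {
	time.Sleep(50 * time.Millisecond) // simulated work until exit
	c.Lock()
	c.node = nil
	c.Unlock()
	close(n.done)
}

// stopNode waits for the node to clear itself instead of racing other
// stoppers to set c.node = nil. Called with the cluster lock held.
func (c *cluster) stopNode() {
	if c.node == nil {
		return
	}
	n := c.node
	c.Unlock() // cannot hold the lock while waiting for shutdown
	defer c.Lock()
	<-n.done // once done is closed, c.node is guaranteed to be nil
}

func main() {
	c := &cluster{node: &node{done: make(chan struct{})}}
	go c.run(c.node)
	c.Lock()
	c.stopNode()
	fmt.Println("node cleared:", c.node == nil) // always true
	c.Unlock()
}

The real `stopNode` helper in the diff below follows the same shape, but additionally calls `node.Stop` with a 15-second timeout before blocking on `node.done`.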


@@ -89,18 +89,23 @@ type Config struct {
 // manager and a worker.
 type Cluster struct {
 	sync.RWMutex
-	root           string
-	config         Config
-	configEvent    chan struct{} // todo: make this array and goroutine safe
-	node           *swarmagent.Node
+	*node
+	root        string
+	config      Config
+	configEvent chan struct{} // todo: make this array and goroutine safe
+	listenAddr  string
+	stop        bool
+	err         error
+	cancelDelay func()
+}
+
+type node struct {
+	*swarmagent.Node
+	done           chan struct{}
+	ready          bool
 	conn           *grpc.ClientConn
 	client         swarmapi.ControlClient
-	ready          bool
-	listenAddr     string
-	err            error
 	reconnectDelay time.Duration
-	stop           bool
-	cancelDelay    func()
 }
 
 // New creates a new Cluster instance using provided config.
@@ -110,10 +115,9 @@ func New(config Config) (*Cluster, error) {
 		return nil, err
 	}
 	c := &Cluster{
-		root:           root,
-		config:         config,
-		configEvent:    make(chan struct{}, 10),
-		reconnectDelay: initialReconnectDelay,
+		root:        root,
+		config:      config,
+		configEvent: make(chan struct{}, 10),
 	}
 
 	st, err := c.loadState()
@@ -124,7 +128,7 @@ func New(config Config) (*Cluster, error) {
 		return nil, err
 	}
 
-	n, ctx, err := c.startNewNode(false, st.ListenAddr, "", "", "", false)
+	n, err := c.startNewNode(false, st.ListenAddr, "", "", "", false)
 	if err != nil {
 		return nil, err
 	}
@@ -133,12 +137,10 @@ func New(config Config) (*Cluster, error) {
 	select {
 	case <-time.After(swarmConnectTimeout):
 		logrus.Errorf("swarm component could not be started before timeout was reached")
 	case <-n.Ready():
-	case <-ctx.Done():
+	case <-n.done:
+		return nil, fmt.Errorf("swarm component could not be started: %v", c.err)
 	}
-	if ctx.Err() != nil {
-		return nil, fmt.Errorf("swarm component could not be started")
-	}
-	go c.reconnectOnFailure(ctx)
+	go c.reconnectOnFailure(n)
 	return c, nil
 }
@@ -169,20 +171,20 @@ func (c *Cluster) saveState() error {
 	return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
 }
 
-func (c *Cluster) reconnectOnFailure(ctx context.Context) {
+func (c *Cluster) reconnectOnFailure(n *node) {
 	for {
-		<-ctx.Done()
+		<-n.done
 		c.Lock()
 		if c.stop || c.node != nil {
 			c.Unlock()
 			return
 		}
-		c.reconnectDelay *= 2
-		if c.reconnectDelay > maxReconnectDelay {
-			c.reconnectDelay = maxReconnectDelay
+		n.reconnectDelay *= 2
+		if n.reconnectDelay > maxReconnectDelay {
+			n.reconnectDelay = maxReconnectDelay
 		}
-		logrus.Warnf("Restarting swarm in %.2f seconds", c.reconnectDelay.Seconds())
-		delayCtx, cancel := context.WithTimeout(context.Background(), c.reconnectDelay)
+		logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
+		delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
 		c.cancelDelay = cancel
 		c.Unlock()
 		<-delayCtx.Done()
@@ -195,22 +197,23 @@ func (c *Cluster) reconnectOnFailure(ctx context.Context) {
 			return
 		}
 		var err error
-		_, ctx, err = c.startNewNode(false, c.listenAddr, c.getRemoteAddress(), "", "", false)
+		n, err = c.startNewNode(false, c.listenAddr, c.getRemoteAddress(), "", "", false)
 		if err != nil {
 			c.err = err
-			ctx = delayCtx
+			close(n.done)
 		}
 		c.Unlock()
 	}
 }
 
-func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secret, cahash string, ismanager bool) (*swarmagent.Node, context.Context, error) {
+func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secret, cahash string, ismanager bool) (*node, error) {
 	if err := c.config.Backend.IsSwarmCompatible(); err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 	c.node = nil
 	c.cancelDelay = nil
-	node, err := swarmagent.NewNode(&swarmagent.NodeConfig{
+	c.stop = false
+	n, err := swarmagent.NewNode(&swarmagent.NodeConfig{
 		Hostname:         c.config.Name,
 		ForceNewCluster:  forceNewCluster,
 		ListenControlAPI: filepath.Join(c.root, controlSocket),
@@ -225,85 +228,76 @@ func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secret, cahash string, ismanager bool) (*swarmagent.Node, context.Context, error) {
 		IsManager:        ismanager,
 	})
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
-	ctx, cancel := context.WithCancel(context.Background())
-	if err := node.Start(ctx); err != nil {
-		return nil, nil, err
+	ctx := context.Background()
+	if err := n.Start(ctx); err != nil {
+		return nil, err
 	}
+	node := &node{
+		Node:           n,
+		done:           make(chan struct{}),
+		reconnectDelay: initialReconnectDelay,
+	}
 	c.node = node
 	c.listenAddr = listenAddr
 	c.saveState()
 	c.config.Backend.SetClusterProvider(c)
 	go func() {
-		err := node.Err(ctx)
+		err := n.Err(ctx)
 		if err != nil {
 			logrus.Errorf("cluster exited with error: %v", err)
 		}
 		c.Lock()
-		c.conn = nil
-		c.client = nil
 		c.node = nil
-		c.ready = false
 		c.err = err
 		c.Unlock()
-		cancel()
+		close(node.done)
 	}()
 	go func() {
 		select {
-		case <-node.Ready():
+		case <-n.Ready():
 			c.Lock()
-			c.reconnectDelay = initialReconnectDelay
-			c.Unlock()
-		case <-ctx.Done():
-		}
-		if ctx.Err() == nil {
-			c.Lock()
-			c.ready = true
+			node.ready = true
 			c.err = nil
 			c.Unlock()
+		case <-ctx.Done():
 		}
 		c.configEvent <- struct{}{}
 	}()
 	go func() {
-		for conn := range node.ListenControlSocket(ctx) {
+		for conn := range n.ListenControlSocket(ctx) {
 			c.Lock()
-			if c.conn != conn {
-				c.client = swarmapi.NewControlClient(conn)
+			if node.conn != conn {
+				if conn == nil {
+					node.client = nil
+				} else {
+					node.client = swarmapi.NewControlClient(conn)
+				}
 			}
-			if c.conn != nil {
-				c.client = nil
-			}
-			c.conn = conn
+			node.conn = conn
 			c.Unlock()
 			c.configEvent <- struct{}{}
 		}
 	}()
-	return node, ctx, nil
+	return node, nil
 }
 
 // Init initializes new cluster from user provided request.
 func (c *Cluster) Init(req types.InitRequest) (string, error) {
 	c.Lock()
 	if node := c.node; node != nil {
-		c.Unlock()
 		if !req.ForceNewCluster {
+			c.Unlock()
 			return "", errSwarmExists(node)
 		}
-		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
-		defer cancel()
-		c.cancelReconnect()
-		if err := c.node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
+		if err := c.stopNode(); err != nil {
+			c.Unlock()
 			return "", err
 		}
-		c.Lock()
-		c.node = nil
-		c.conn = nil
-		c.ready = false
 	}
 
 	if err := validateAndSanitizeInitRequest(&req); err != nil {
@@ -312,7 +306,7 @@ func (c *Cluster) Init(req types.InitRequest) (string, error) {
 	}
 
 	// todo: check current state existing
-	n, ctx, err := c.startNewNode(req.ForceNewCluster, req.ListenAddr, "", "", "", false)
+	n, err := c.startNewNode(req.ForceNewCluster, req.ListenAddr, "", "", "", false)
 	if err != nil {
 		c.Unlock()
 		return "", err
@@ -324,20 +318,17 @@ func (c *Cluster) Init(req types.InitRequest) (string, error) {
 		if err := initClusterSpec(n, req.Spec); err != nil {
 			return "", err
 		}
-		go c.reconnectOnFailure(ctx)
+		go c.reconnectOnFailure(n)
 		return n.NodeID(), nil
-	case <-ctx.Done():
+	case <-n.done:
 		c.RLock()
 		defer c.RUnlock()
-		if c.err != nil {
-			if !req.ForceNewCluster { // if failure on first attempt don't keep state
-				if err := c.clearState(); err != nil {
-					return "", err
-				}
+		if !req.ForceNewCluster { // if failure on first attempt don't keep state
+			if err := c.clearState(); err != nil {
+				return "", err
 			}
-			return "", c.err
 		}
-		return "", ctx.Err()
+		return "", c.err
 	}
 }
@@ -353,7 +344,7 @@ func (c *Cluster) Join(req types.JoinRequest) error {
 		return err
 	}
 	// todo: check current state existing
-	n, ctx, err := c.startNewNode(false, req.ListenAddr, req.RemoteAddrs[0], req.Secret, req.CACertHash, req.Manager)
+	n, err := c.startNewNode(false, req.ListenAddr, req.RemoteAddrs[0], req.Secret, req.CACertHash, req.Manager)
 	if err != nil {
 		c.Unlock()
 		return err
@@ -370,28 +361,41 @@ func (c *Cluster) Join(req types.JoinRequest) error {
 			certificateRequested = nil
 		case <-time.After(swarmConnectTimeout):
 			// attempt to connect will continue in background, also reconnecting
-			go c.reconnectOnFailure(ctx)
+			go c.reconnectOnFailure(n)
 			return ErrSwarmJoinTimeoutReached
 		case <-n.Ready():
-			go c.reconnectOnFailure(ctx)
+			go c.reconnectOnFailure(n)
 			return nil
-		case <-ctx.Done():
+		case <-n.done:
 			c.RLock()
 			defer c.RUnlock()
-			if c.err != nil {
-				return c.err
-			}
-			return ctx.Err()
+			return c.err
 		}
 	}
 }
 
-func (c *Cluster) cancelReconnect() {
+// stopNode is a helper that stops the active c.node and waits until it has
+// shut down. Call while keeping the cluster lock.
+func (c *Cluster) stopNode() error {
+	if c.node == nil {
+		return nil
+	}
 	c.stop = true
 	if c.cancelDelay != nil {
 		c.cancelDelay()
 		c.cancelDelay = nil
 	}
+	node := c.node
+	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+	defer cancel()
+	// TODO: can't hold lock on stop because it calls back to network
+	c.Unlock()
+	defer c.Lock()
+	if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
+		return err
+	}
+	<-node.done
+	return nil
 }
 
 // Leave shuts down Cluster and removes current state.
@@ -425,14 +429,11 @@ func (c *Cluster) Leave(force bool) error {
 		c.Unlock()
 		return fmt.Errorf(msg)
 	}
-	c.cancelReconnect()
-	c.Unlock()
-
-	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
-	defer cancel()
-	if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
+	if err := c.stopNode(); err != nil {
+		c.Unlock()
 		return err
 	}
+	c.Unlock()
 	if nodeID := node.NodeID(); nodeID != "" {
 		for _, id := range c.config.Backend.ListContainersForNode(nodeID) {
 			if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
@@ -440,11 +441,6 @@ func (c *Cluster) Leave(force bool) error {
 			}
 		}
 	}
-	c.Lock()
-	defer c.Unlock()
-	c.node = nil
-	c.conn = nil
-	c.ready = false
 	c.configEvent <- struct{}{}
 	// todo: cleanup optional?
 	if err := c.clearState(); err != nil {
@@ -534,7 +530,7 @@ func (c *Cluster) IsManager() bool {
 func (c *Cluster) IsAgent() bool {
 	c.RLock()
 	defer c.RUnlock()
-	return c.ready
+	return c.node != nil && c.ready
 }
 
 // GetListenAddress returns the listening address for current maanger's
@@ -542,7 +538,7 @@ func (c *Cluster) IsAgent() bool {
 func (c *Cluster) GetListenAddress() string {
 	c.RLock()
 	defer c.RUnlock()
-	if c.conn != nil {
+	if c.isActiveManager() {
 		return c.listenAddr
 	}
 	return ""
@@ -597,7 +593,6 @@ func (c *Cluster) Info() types.Info {
 	if c.err != nil {
 		info.Error = c.err.Error()
 	}
-
 	if c.isActiveManager() {
 		info.ControlAvailable = true
 		if r, err := c.client.ListNodes(c.getRequestContext(), &swarmapi.ListNodesRequest{}); err == nil {
@@ -626,7 +621,7 @@ func (c *Cluster) Info() types.Info {
 
 // isActiveManager should not be called without a read lock
 func (c *Cluster) isActiveManager() bool {
-	return c.conn != nil
+	return c.node != nil && c.conn != nil
 }
 
 // GetServices returns all services of a managed swarm cluster.
@@ -1014,7 +1009,7 @@ func (c *Cluster) Cleanup() {
 		c.Unlock()
 		return
 	}
-
+	defer c.Unlock()
 	if c.isActiveManager() {
 		active, reachable, unreachable, err := c.managerStats()
 		if err == nil {
@@ -1024,18 +1019,7 @@ func (c *Cluster) Cleanup() {
 			}
 		}
 	}
-	c.cancelReconnect()
-	c.Unlock()
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	defer cancel()
-	if err := node.Stop(ctx); err != nil {
-		logrus.Errorf("error cleaning up cluster: %v", err)
-	}
-	c.Lock()
-	c.node = nil
-	c.ready = false
-	c.conn = nil
-	c.Unlock()
+	c.stopNode()
 }
 
 func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
@@ -1130,14 +1114,14 @@ func validateAddr(addr string) (string, error) {
 	return strings.TrimPrefix(newaddr, "tcp://"), nil
 }
 
-func errSwarmExists(node *swarmagent.Node) error {
+func errSwarmExists(node *node) error {
 	if node.NodeMembership() != swarmapi.NodeMembershipAccepted {
 		return ErrPendingSwarmExists
 	}
 	return ErrSwarmExists
 }
 
-func initClusterSpec(node *swarmagent.Node, spec types.Spec) error {
+func initClusterSpec(node *node, spec types.Spec) error {
 	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
 	for conn := range node.ListenControlSocket(ctx) {
 		if ctx.Err() != nil {