From b7ea1bdb0227a789a15e23821b8db4d5ddceb26e Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Wed, 16 Nov 2016 14:17:18 -0800 Subject: [PATCH] Switch cluster locking strategy Signed-off-by: Tonis Tiigi --- daemon/cluster/cluster.go | 986 +++++++++-------------- daemon/cluster/noderunner.go | 296 +++++++ daemon/cluster/secrets.go | 55 +- daemon/cluster/utils.go | 56 ++ integration-cli/docker_api_swarm_test.go | 4 +- 5 files changed, 784 insertions(+), 613 deletions(-) create mode 100644 daemon/cluster/noderunner.go create mode 100644 daemon/cluster/utils.go diff --git a/daemon/cluster/cluster.go b/daemon/cluster/cluster.go index eb4dc78c14..dccc8e9554 100644 --- a/daemon/cluster/cluster.go +++ b/daemon/cluster/cluster.go @@ -1,16 +1,52 @@ package cluster +// +// ## Swarmkit integration +// +// Cluster - static configurable object for accessing everything swarm related. +// Contains methods for connecting and controlling the cluster. Exists always, +// even if swarm mode is not enabled. +// +// NodeRunner - Manager for starting the swarmkit node. Is present only and +// always if swarm mode is enabled. Implements backoff restart loop in case of +// errors. +// +// NodeState - Information about the current node status including access to +// gRPC clients if a manager is active. +// +// ### Locking +// +// `cluster.controlMutex` - taken for the whole lifecycle of the processes that +// can reconfigure cluster(init/join/leave etc). Protects that one +// reconfiguration action has fully completed before another can start. +// +// `cluster.mu` - taken when the actual changes in cluster configurations +// happen. Different from `controlMutex` because in some cases we need to +// access current cluster state even if the long-running reconfiguration is +// going on. For example network stack may ask for the current cluster state in +// the middle of the shutdown. Any time current cluster state is asked you +// should take the read lock of `cluster.mu`. If you are writing an API +// responder that returns synchronously, hold `cluster.mu.RLock()` for the +// duration of the whole handler function. That ensures that node will not be +// shut down until the handler has finished. +// +// NodeRunner implements its internal locks that should not be used outside of +// the struct. Instead, you should just call `nodeRunner.State()` method to get +// the current state of the cluster(still need `cluster.mu.RLock()` to access +// `cluster.nr` reference itself). Most of the changes in NodeRunner happen +// because of an external event(network problem, unexpected swarmkit error) and +// Docker shouldn't take any locks that delay these changes from happening. +// + import ( "crypto/x509" "encoding/base64" "encoding/json" "fmt" "io" - "io/ioutil" "net" "os" "path/filepath" - "runtime" "strings" "sync" "time" @@ -25,7 +61,6 @@ import ( types "github.com/docker/docker/api/types/swarm" "github.com/docker/docker/daemon/cluster/convert" executorpkg "github.com/docker/docker/daemon/cluster/executor" - "github.com/docker/docker/daemon/cluster/executor/container" "github.com/docker/docker/daemon/logger" "github.com/docker/docker/opts" "github.com/docker/docker/pkg/ioutils" @@ -39,7 +74,6 @@ import ( "github.com/docker/swarmkit/protobuf/ptypes" "github.com/pkg/errors" "golang.org/x/net/context" - "google.golang.org/grpc" ) const swarmDirName = "swarm" @@ -95,19 +129,14 @@ type Config struct { // Cluster provides capabilities to participate in a cluster as a worker or a // manager. type Cluster struct { - sync.RWMutex - *node - root string - runtimeRoot string - config Config - configEvent chan struct{} // todo: make this array and goroutine safe - actualLocalAddr string // after resolution, not persisted - stop bool - err error - cancelDelay func() - attachers map[string]*attacher - locked bool - lastNodeConfig *nodeStartConfig + mu sync.RWMutex + controlMutex sync.RWMutex // protect init/join/leave user operations + nr *nodeRunner + root string + runtimeRoot string + config Config + configEvent chan struct{} // todo: make this array and goroutine safe + attachers map[string]*attacher } // attacher manages the in-memory attachment state of a container @@ -122,38 +151,6 @@ type attacher struct { detachWaitCh chan struct{} } -type node struct { - *swarmnode.Node - done chan struct{} - ready bool - conn *grpc.ClientConn - client swarmapi.ControlClient - logs swarmapi.LogsClient - reconnectDelay time.Duration - config nodeStartConfig -} - -// nodeStartConfig holds configuration needed to start a new node. Exported -// fields of this structure are saved to disk in json. Unexported fields -// contain data that shouldn't be persisted between daemon reloads. -type nodeStartConfig struct { - // LocalAddr is this machine's local IP or hostname, if specified. - LocalAddr string - // RemoteAddr is the address that was given to "swarm join". It is used - // to find LocalAddr if necessary. - RemoteAddr string - // ListenAddr is the address we bind to, including a port. - ListenAddr string - // AdvertiseAddr is the address other nodes should connect to, - // including a port. - AdvertiseAddr string - joinAddr string - forceNewCluster bool - joinToken string - lockKey []byte - autolock bool -} - // New creates a new Cluster instance using provided config. func New(config Config) (*Cluster, error) { root := filepath.Join(config.Root, swarmDirName) @@ -174,7 +171,7 @@ func New(config Config) (*Cluster, error) { attachers: make(map[string]*attacher), } - nodeConfig, err := c.loadState() + nodeConfig, err := loadPersistentState(root) if err != nil { if os.IsNotExist(err) { return c, nil @@ -182,95 +179,30 @@ func New(config Config) (*Cluster, error) { return nil, err } - n, err := c.startNewNode(*nodeConfig) + nr, err := c.newNodeRunner(*nodeConfig) if err != nil { return nil, err } + c.nr = nr select { case <-time.After(swarmConnectTimeout): logrus.Error("swarm component could not be started before timeout was reached") - case <-n.Ready(): - case <-n.done: - if errors.Cause(c.err) == ErrSwarmLocked { - return c, nil + case err := <-nr.Ready(): + if err != nil { + if errors.Cause(err) == ErrSwarmLocked { + return c, nil + } + if err, ok := errors.Cause(c.nr.err).(x509.CertificateInvalidError); ok && err.Reason == x509.Expired { + return c, nil + } + return nil, errors.Wrap(err, "swarm component could not be started") } - if err, ok := errors.Cause(c.err).(x509.CertificateInvalidError); ok && err.Reason == x509.Expired { - c.err = ErrSwarmCertificatesExpired - return c, nil - } - return nil, fmt.Errorf("swarm component could not be started: %v", c.err) } - go c.reconnectOnFailure(n) return c, nil } -func (c *Cluster) loadState() (*nodeStartConfig, error) { - dt, err := ioutil.ReadFile(filepath.Join(c.root, stateFile)) - if err != nil { - return nil, err - } - // missing certificate means no actual state to restore from - if _, err := os.Stat(filepath.Join(c.root, "certificates/swarm-node.crt")); err != nil { - if os.IsNotExist(err) { - c.clearState() - } - return nil, err - } - var st nodeStartConfig - if err := json.Unmarshal(dt, &st); err != nil { - return nil, err - } - return &st, nil -} - -func (c *Cluster) saveState(config nodeStartConfig) error { - dt, err := json.Marshal(config) - if err != nil { - return err - } - return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600) -} - -func (c *Cluster) reconnectOnFailure(n *node) { - for { - <-n.done - c.Lock() - if c.stop || c.node != nil { - c.Unlock() - return - } - n.reconnectDelay *= 2 - if n.reconnectDelay > maxReconnectDelay { - n.reconnectDelay = maxReconnectDelay - } - logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds()) - delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay) - c.cancelDelay = cancel - c.Unlock() - <-delayCtx.Done() - if delayCtx.Err() != context.DeadlineExceeded { - return - } - c.Lock() - if c.node != nil { - c.Unlock() - return - } - var err error - config := n.config - config.RemoteAddr = c.getRemoteAddress() - config.joinAddr = config.RemoteAddr - n, err = c.startNewNode(config) - if err != nil { - c.err = err - close(n.done) - } - c.Unlock() - } -} - -func (c *Cluster) startNewNode(conf nodeStartConfig) (*node, error) { +func (c *Cluster) newNodeRunner(conf nodeStartConfig) (*nodeRunner, error) { if err := c.config.Backend.IsSwarmCompatible(); err != nil { return nil, err } @@ -304,128 +236,47 @@ func (c *Cluster) startNewNode(conf nodeStartConfig) (*node, error) { } } - var control string - if runtime.GOOS == "windows" { - control = `\\.\pipe\` + controlSocket - } else { - control = filepath.Join(c.runtimeRoot, controlSocket) - } + nr := &nodeRunner{cluster: c} + nr.actualLocalAddr = actualLocalAddr - c.node = nil - c.cancelDelay = nil - c.stop = false - n, err := swarmnode.New(&swarmnode.Config{ - Hostname: c.config.Name, - ForceNewCluster: conf.forceNewCluster, - ListenControlAPI: control, - ListenRemoteAPI: conf.ListenAddr, - AdvertiseRemoteAPI: conf.AdvertiseAddr, - JoinAddr: conf.joinAddr, - StateDir: c.root, - JoinToken: conf.joinToken, - Executor: container.NewExecutor(c.config.Backend), - HeartbeatTick: 1, - ElectionTick: 3, - UnlockKey: conf.lockKey, - AutoLockManagers: conf.autolock, - }) - - if err != nil { + if err := nr.Start(conf); err != nil { return nil, err } - ctx := context.Background() - if err := n.Start(ctx); err != nil { - return nil, err - } - node := &node{ - Node: n, - done: make(chan struct{}), - reconnectDelay: initialReconnectDelay, - config: conf, - } - c.node = node - c.actualLocalAddr = actualLocalAddr // not saved - c.saveState(conf) c.config.Backend.SetClusterProvider(c) - go func() { - err := detectLockedError(n.Err(ctx)) - if err != nil { - logrus.Errorf("cluster exited with error: %v", err) - } - c.Lock() - c.node = nil - c.err = err - if errors.Cause(err) == ErrSwarmLocked { - c.locked = true - confClone := conf - c.lastNodeConfig = &confClone - } - c.Unlock() - close(node.done) - }() - go func() { - select { - case <-n.Ready(): - c.Lock() - node.ready = true - c.err = nil - c.Unlock() - case <-ctx.Done(): - } - c.configEvent <- struct{}{} - }() - - go func() { - for conn := range n.ListenControlSocket(ctx) { - c.Lock() - if node.conn != conn { - if conn == nil { - node.client = nil - node.logs = nil - } else { - node.client = swarmapi.NewControlClient(conn) - node.logs = swarmapi.NewLogsClient(conn) - } - } - node.conn = conn - c.Unlock() - c.configEvent <- struct{}{} - } - }() - - return node, nil + return nr, nil } // Init initializes new cluster from user provided request. func (c *Cluster) Init(req types.InitRequest) (string, error) { - c.Lock() - if c.swarmExists() { - if !req.ForceNewCluster { - c.Unlock() + c.controlMutex.Lock() + defer c.controlMutex.Unlock() + c.mu.Lock() + if c.nr != nil { + if req.ForceNewCluster { + if err := c.nr.Stop(); err != nil { + c.mu.Unlock() + return "", err + } + } else { + c.mu.Unlock() return "", ErrSwarmExists } - if err := c.stopNode(); err != nil { - c.Unlock() - return "", err - } } + c.mu.Unlock() if err := validateAndSanitizeInitRequest(&req); err != nil { - c.Unlock() return "", err } listenHost, listenPort, err := resolveListenAddr(req.ListenAddr) if err != nil { - c.Unlock() return "", err } advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort) if err != nil { - c.Unlock() return "", err } @@ -451,7 +302,6 @@ func (c *Cluster) Init(req types.InitRequest) (string, error) { if !found { ip, err := c.resolveSystemAddr() if err != nil { - c.Unlock() logrus.Warnf("Could not find a local address: %v", err) return "", errMustSpecifyListenAddr } @@ -459,8 +309,11 @@ func (c *Cluster) Init(req types.InitRequest) (string, error) { } } - // todo: check current state existing - n, err := c.startNewNode(nodeStartConfig{ + if !req.ForceNewCluster { + clearPersistentState(c.root) + } + + nr, err := c.newNodeRunner(nodeStartConfig{ forceNewCluster: req.ForceNewCluster, autolock: req.AutoLockManagers, LocalAddr: localAddr, @@ -468,45 +321,52 @@ func (c *Cluster) Init(req types.InitRequest) (string, error) { AdvertiseAddr: net.JoinHostPort(advertiseHost, advertisePort), }) if err != nil { - c.Unlock() return "", err } - c.Unlock() + c.mu.Lock() + c.nr = nr + c.mu.Unlock() - select { - case <-n.Ready(): - if err := initClusterSpec(n, req.Spec); err != nil { - return "", err - } - go c.reconnectOnFailure(n) - return n.NodeID(), nil - case <-n.done: - c.RLock() - defer c.RUnlock() + if err := <-nr.Ready(); err != nil { if !req.ForceNewCluster { // if failure on first attempt don't keep state - if err := c.clearState(); err != nil { + if err := clearPersistentState(c.root); err != nil { return "", err } } - return "", c.err + if err != nil { + c.mu.Lock() + c.nr = nil + c.mu.Unlock() + } + return "", err } + state := nr.State() + if state.swarmNode == nil { // should never happen but protect from panic + return "", errors.New("invalid cluster state for spec initialization") + } + if err := initClusterSpec(state.swarmNode, req.Spec); err != nil { + return "", err + } + return state.NodeID(), nil } // Join makes current Cluster part of an existing swarm cluster. func (c *Cluster) Join(req types.JoinRequest) error { - c.Lock() - if c.swarmExists() { - c.Unlock() + c.controlMutex.Lock() + defer c.controlMutex.Unlock() + c.mu.Lock() + if c.nr != nil { + c.mu.Unlock() return ErrSwarmExists } + c.mu.Unlock() + if err := validateAndSanitizeJoinRequest(&req); err != nil { - c.Unlock() return err } listenHost, listenPort, err := resolveListenAddr(req.ListenAddr) if err != nil { - c.Unlock() return err } @@ -520,8 +380,9 @@ func (c *Cluster) Join(req types.JoinRequest) error { } } - // todo: check current state existing - n, err := c.startNewNode(nodeStartConfig{ + clearPersistentState(c.root) + + nr, err := c.newNodeRunner(nodeStartConfig{ RemoteAddr: req.RemoteAddrs[0], ListenAddr: net.JoinHostPort(listenHost, listenPort), AdvertiseAddr: advertiseAddr, @@ -529,46 +390,40 @@ func (c *Cluster) Join(req types.JoinRequest) error { joinToken: req.JoinToken, }) if err != nil { - c.Unlock() return err } - c.Unlock() + + c.mu.Lock() + c.nr = nr + c.mu.Unlock() select { case <-time.After(swarmConnectTimeout): - // attempt to connect will continue in background, but reconnect only if it didn't fail - go func() { - select { - case <-n.Ready(): - c.reconnectOnFailure(n) - case <-n.done: - logrus.Errorf("failed to join the cluster: %+v", c.err) - } - }() return ErrSwarmJoinTimeoutReached - case <-n.Ready(): - go c.reconnectOnFailure(n) - return nil - case <-n.done: - c.RLock() - defer c.RUnlock() - return c.err + case err := <-nr.Ready(): + if err != nil { + c.mu.Lock() + c.nr = nil + c.mu.Unlock() + } + return err } } // GetUnlockKey returns the unlock key for the swarm. func (c *Cluster) GetUnlockKey() (string, error) { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return "", c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return "", c.errNoManager(state) } ctx, cancel := c.getRequestContext() defer cancel() - client := swarmapi.NewCAClient(c.conn) + client := swarmapi.NewCAClient(state.grpcConn) r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{}) if err != nil { @@ -585,141 +440,104 @@ func (c *Cluster) GetUnlockKey() (string, error) { // UnlockSwarm provides a key to decrypt data that is encrypted at rest. func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error { - c.RLock() - if !c.isActiveManager() { - if err := c.errNoManager(); err != ErrSwarmLocked { - c.RUnlock() - return err - } - } + c.controlMutex.Lock() + defer c.controlMutex.Unlock() - if c.node != nil || c.locked != true { - c.RUnlock() + c.mu.RLock() + state := c.currentNodeState() + nr := c.nr + c.mu.RUnlock() + if nr == nil || errors.Cause(state.err) != ErrSwarmLocked { return errors.New("swarm is not locked") } - c.RUnlock() - key, err := encryption.ParseHumanReadableKey(req.UnlockKey) if err != nil { return err } - c.Lock() - config := *c.lastNodeConfig + config := nr.config config.lockKey = key - n, err := c.startNewNode(config) - if err != nil { - c.Unlock() + if err := nr.Stop(); err != nil { return err } - c.Unlock() - select { - case <-n.Ready(): - case <-n.done: - if errors.Cause(c.err) == ErrSwarmLocked { + nr, err = c.newNodeRunner(config) + if err != nil { + return err + } + + c.mu.Lock() + c.nr = nr + c.mu.Unlock() + + if err := <-nr.Ready(); err != nil { + if errors.Cause(err) == ErrSwarmLocked { return errors.New("swarm could not be unlocked: invalid key provided") } - return fmt.Errorf("swarm component could not be started: %v", c.err) + return fmt.Errorf("swarm component could not be started: %v", err) } - go c.reconnectOnFailure(n) return nil } -// stopNode is a helper that stops the active c.node and waits until it has -// shut down. Call while keeping the cluster lock. -func (c *Cluster) stopNode() error { - if c.node == nil { - return nil - } - c.stop = true - if c.cancelDelay != nil { - c.cancelDelay() - c.cancelDelay = nil - } - node := c.node - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - // TODO: can't hold lock on stop because it calls back to network - c.Unlock() - defer c.Lock() - if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") { - return err - } - <-node.done - return nil -} - -func removingManagerCausesLossOfQuorum(reachable, unreachable int) bool { - return reachable-2 <= unreachable -} - -func isLastManager(reachable, unreachable int) bool { - return reachable == 1 && unreachable == 0 -} - // Leave shuts down Cluster and removes current state. func (c *Cluster) Leave(force bool) error { - c.Lock() - node := c.node - if node == nil { - if c.locked { - c.locked = false - c.lastNodeConfig = nil - c.Unlock() - } else if c.err == ErrSwarmCertificatesExpired { - c.err = nil - c.Unlock() - } else { - c.Unlock() - return ErrNoSwarm - } - } else { - if node.Manager() != nil && !force { - msg := "You are attempting to leave the swarm on a node that is participating as a manager. " - if c.isActiveManager() { - active, reachable, unreachable, err := c.managerStats() - if err == nil { - if active && removingManagerCausesLossOfQuorum(reachable, unreachable) { - if isLastManager(reachable, unreachable) { - msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. " - c.Unlock() - return fmt.Errorf(msg) - } - msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable) - } - } - } else { - msg += "Doing so may lose the consensus of your cluster. " - } + c.controlMutex.Lock() + defer c.controlMutex.Unlock() - msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message." - c.Unlock() - return fmt.Errorf(msg) + c.mu.Lock() + nr := c.nr + if nr == nil { + c.mu.Unlock() + return ErrNoSwarm + } + state := c.currentNodeState() + if state.IsManager() && !force { + msg := "You are attempting to leave the swarm on a node that is participating as a manager. " + if state.IsActiveManager() { + active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID()) + if err == nil { + if active && removingManagerCausesLossOfQuorum(reachable, unreachable) { + if isLastManager(reachable, unreachable) { + msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. " + c.mu.Unlock() + return fmt.Errorf(msg) + } + msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable) + } + } + } else { + msg += "Doing so may lose the consensus of your cluster. " } - if err := c.stopNode(); err != nil { - logrus.Errorf("failed to shut down cluster node: %v", err) - signal.DumpStacks("") - c.Unlock() + + msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message." + c.mu.Unlock() + return fmt.Errorf(msg) + } + // release readers in here + if err := nr.Stop(); err != nil { + logrus.Errorf("failed to shut down cluster node: %v", err) + signal.DumpStacks("") + c.mu.Unlock() + return err + } + c.nr = nil + c.mu.Unlock() + if nodeID := state.NodeID(); nodeID != "" { + nodeContainers, err := c.listContainerForNode(nodeID) + if err != nil { return err } - c.Unlock() - if nodeID := node.NodeID(); nodeID != "" { - nodeContainers, err := c.listContainerForNode(nodeID) - if err != nil { - return err - } - for _, id := range nodeContainers { - if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil { - logrus.Errorf("error removing %v: %v", id, err) - } + for _, id := range nodeContainers { + if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil { + logrus.Errorf("error removing %v: %v", id, err) } } } c.configEvent <- struct{}{} // todo: cleanup optional? - if err := c.clearState(); err != nil { + if err := clearPersistentState(c.root); err != nil { return err } + c.config.Backend.SetClusterProvider(nil) return nil } @@ -739,35 +557,24 @@ func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) { return ids, nil } -func (c *Cluster) clearState() error { - // todo: backup this data instead of removing? - if err := os.RemoveAll(c.root); err != nil { - return err - } - if err := os.MkdirAll(c.root, 0700); err != nil { - return err - } - c.config.Backend.SetClusterProvider(nil) - return nil -} - func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on qourum lost return context.WithTimeout(context.Background(), swarmRequestTimeout) } // Inspect retrieves the configuration properties of a managed swarm cluster. func (c *Cluster) Inspect() (types.Swarm, error) { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return types.Swarm{}, c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return types.Swarm{}, c.errNoManager(state) } ctx, cancel := c.getRequestContext() defer cancel() - swarm, err := getSwarm(ctx, c.client) + swarm, err := getSwarm(ctx, state.controlClient) if err != nil { return types.Swarm{}, err } @@ -777,17 +584,18 @@ func (c *Cluster) Inspect() (types.Swarm, error) { // Update updates configuration of a managed swarm cluster. func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return c.errNoManager(state) } ctx, cancel := c.getRequestContext() defer cancel() - swarm, err := getSwarm(ctx, c.client) + swarm, err := getSwarm(ctx, state.controlClient) if err != nil { return err } @@ -800,7 +608,7 @@ func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlag return err } - _, err = c.client.UpdateCluster( + _, err = state.controlClient.UpdateCluster( ctx, &swarmapi.UpdateClusterRequest{ ClusterID: swarm.ID, @@ -820,61 +628,62 @@ func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlag // IsManager returns true if Cluster is participating as a manager. func (c *Cluster) IsManager() bool { - c.RLock() - defer c.RUnlock() - return c.isActiveManager() + c.mu.RLock() + defer c.mu.RUnlock() + return c.currentNodeState().IsActiveManager() } // IsAgent returns true if Cluster is participating as a worker/agent. func (c *Cluster) IsAgent() bool { - c.RLock() - defer c.RUnlock() - return c.node != nil && c.ready + c.mu.RLock() + defer c.mu.RUnlock() + return c.currentNodeState().status == types.LocalNodeStateActive } // GetLocalAddress returns the local address. func (c *Cluster) GetLocalAddress() string { - c.RLock() - defer c.RUnlock() - return c.actualLocalAddr + c.mu.RLock() + defer c.mu.RUnlock() + return c.currentNodeState().actualLocalAddr } // GetListenAddress returns the listen address. func (c *Cluster) GetListenAddress() string { - c.RLock() - defer c.RUnlock() - if c.node != nil { - return c.node.config.ListenAddr + c.mu.RLock() + defer c.mu.RUnlock() + if c.nr != nil { + return c.nr.config.ListenAddr } return "" } // GetAdvertiseAddress returns the remotely reachable address of this node. func (c *Cluster) GetAdvertiseAddress() string { - c.RLock() - defer c.RUnlock() - if c.node != nil && c.node.config.AdvertiseAddr != "" { - advertiseHost, _, _ := net.SplitHostPort(c.node.config.AdvertiseAddr) + c.mu.RLock() + defer c.mu.RUnlock() + if c.nr != nil && c.nr.config.AdvertiseAddr != "" { + advertiseHost, _, _ := net.SplitHostPort(c.nr.config.AdvertiseAddr) return advertiseHost } - return c.actualLocalAddr + return c.currentNodeState().actualLocalAddr } // GetRemoteAddress returns a known advertise address of a remote manager if // available. // todo: change to array/connect with info func (c *Cluster) GetRemoteAddress() string { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() return c.getRemoteAddress() } func (c *Cluster) getRemoteAddress() string { - if c.node == nil { + state := c.currentNodeState() + if state.swarmNode == nil { return "" } - nodeID := c.node.NodeID() - for _, r := range c.node.Remotes() { + nodeID := state.swarmNode.NodeID() + for _, r := range state.swarmNode.Remotes() { if r.NodeID != nodeID { return r.Addr } @@ -894,36 +703,19 @@ func (c *Cluster) Info() types.Info { info := types.Info{ NodeAddr: c.GetAdvertiseAddress(), } + c.mu.RLock() + defer c.mu.RUnlock() - c.RLock() - defer c.RUnlock() - - if c.node == nil { - info.LocalNodeState = types.LocalNodeStateInactive - if c.cancelDelay != nil { - info.LocalNodeState = types.LocalNodeStateError - } - if c.locked { - info.LocalNodeState = types.LocalNodeStateLocked - } else if c.err == ErrSwarmCertificatesExpired { - info.LocalNodeState = types.LocalNodeStateError - } - } else { - info.LocalNodeState = types.LocalNodeStatePending - if c.ready == true { - info.LocalNodeState = types.LocalNodeStateActive - } else if c.locked { - info.LocalNodeState = types.LocalNodeStateLocked - } - } - if c.err != nil { - info.Error = c.err.Error() + state := c.currentNodeState() + info.LocalNodeState = state.status + if state.err != nil { + info.Error = state.err.Error() } ctx, cancel := c.getRequestContext() defer cancel() - if c.isActiveManager() { + if state.IsActiveManager() { info.ControlAvailable = true swarm, err := c.Inspect() if err != nil { @@ -933,7 +725,7 @@ func (c *Cluster) Info() types.Info { // Strip JoinTokens info.Cluster = swarm.ClusterInfo - if r, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err == nil { + if r, err := state.controlClient.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err == nil { info.Nodes = len(r.Nodes) for _, n := range r.Nodes { if n.ManagerStatus != nil { @@ -943,39 +735,34 @@ func (c *Cluster) Info() types.Info { } } - if c.node != nil { - for _, r := range c.node.Remotes() { + if state.swarmNode != nil { + for _, r := range state.swarmNode.Remotes() { info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr}) } - info.NodeID = c.node.NodeID() + info.NodeID = state.swarmNode.NodeID() } return info } -// isActiveManager should not be called without a read lock -func (c *Cluster) isActiveManager() bool { - return c.node != nil && c.conn != nil -} - -// swarmExists should not be called without a read lock -func (c *Cluster) swarmExists() bool { - return c.node != nil || c.locked || c.err == ErrSwarmCertificatesExpired +// currentNodeState should not be called without a read lock +func (c *Cluster) currentNodeState() nodeState { + return c.nr.State() } // errNoManager returns error describing why manager commands can't be used. // Call with read lock. -func (c *Cluster) errNoManager() error { - if c.node == nil { - if c.locked { +func (c *Cluster) errNoManager(st nodeState) error { + if st.swarmNode == nil { + if errors.Cause(st.err) == ErrSwarmLocked { return ErrSwarmLocked } - if c.err == ErrSwarmCertificatesExpired { + if st.err == ErrSwarmCertificatesExpired { return ErrSwarmCertificatesExpired } return fmt.Errorf("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.") } - if c.node.Manager() != nil { + if st.swarmNode.Manager() != nil { return fmt.Errorf("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster.") } return fmt.Errorf("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.") @@ -983,11 +770,12 @@ func (c *Cluster) errNoManager() error { // GetServices returns all services of a managed swarm cluster. func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return nil, c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return nil, c.errNoManager(state) } filters, err := newListServicesFilters(options.Filters) @@ -997,7 +785,7 @@ func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Serv ctx, cancel := c.getRequestContext() defer cancel() - r, err := c.client.ListServices( + r, err := state.controlClient.ListServices( ctx, &swarmapi.ListServicesRequest{Filters: filters}) if err != nil { @@ -1059,17 +847,18 @@ func (c *Cluster) imageWithDigestString(ctx context.Context, image string, authC // CreateService creates a new service in a managed swarm cluster. func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (*apitypes.ServiceCreateResponse, error) { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return nil, c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return nil, c.errNoManager(state) } ctx, cancel := c.getRequestContext() defer cancel() - err := c.populateNetworkID(ctx, c.client, &s) + err := c.populateNetworkID(ctx, state.controlClient, &s) if err != nil { return nil, err } @@ -1110,7 +899,7 @@ func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (*apity } } - r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec}) + r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec}) if err != nil { return nil, err } @@ -1121,17 +910,18 @@ func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (*apity // GetService returns a service based on an ID or name. func (c *Cluster) GetService(input string) (types.Service, error) { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return types.Service{}, c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return types.Service{}, c.errNoManager(state) } ctx, cancel := c.getRequestContext() defer cancel() - service, err := getService(ctx, c.client, input) + service, err := getService(ctx, state.controlClient, input) if err != nil { return types.Service{}, err } @@ -1140,17 +930,18 @@ func (c *Cluster) GetService(input string) (types.Service, error) { // UpdateService updates existing service to match new properties. func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec types.ServiceSpec, encodedAuth string, registryAuthFrom string) (*apitypes.ServiceUpdateResponse, error) { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return nil, c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return nil, c.errNoManager(state) } ctx, cancel := c.getRequestContext() defer cancel() - err := c.populateNetworkID(ctx, c.client, &spec) + err := c.populateNetworkID(ctx, state.controlClient, &spec) if err != nil { return nil, err } @@ -1160,7 +951,7 @@ func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec typ return nil, err } - currentService, err := getService(ctx, c.client, serviceIDOrName) + currentService, err := getService(ctx, state.controlClient, serviceIDOrName) if err != nil { return nil, err } @@ -1219,7 +1010,7 @@ func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec typ } } - _, err = c.client.UpdateService( + _, err = state.controlClient.UpdateService( ctx, &swarmapi.UpdateServiceRequest{ ServiceID: currentService.ID, @@ -1235,22 +1026,23 @@ func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec typ // RemoveService removes a service from a managed swarm cluster. func (c *Cluster) RemoveService(input string) error { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return c.errNoManager(state) } ctx, cancel := c.getRequestContext() defer cancel() - service, err := getService(ctx, c.client, input) + service, err := getService(ctx, state.controlClient, input) if err != nil { return err } - if _, err := c.client.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil { + if _, err := state.controlClient.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil { return err } return nil @@ -1258,19 +1050,20 @@ func (c *Cluster) RemoveService(input string) error { // ServiceLogs collects service logs and writes them back to `config.OutStream` func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend.ContainerLogsConfig, started chan struct{}) error { - c.RLock() - if !c.isActiveManager() { - c.RUnlock() - return c.errNoManager() + c.mu.RLock() + state := c.currentNodeState() + if !state.IsActiveManager() { + c.mu.RUnlock() + return c.errNoManager(state) } - service, err := getService(ctx, c.client, input) + service, err := getService(ctx, state.controlClient, input) if err != nil { - c.RUnlock() + c.mu.RUnlock() return err } - stream, err := c.logs.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{ + stream, err := state.logsClient.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{ Selector: &swarmapi.LogSelector{ ServiceIDs: []string{service.ID}, }, @@ -1279,7 +1072,7 @@ func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend }, }) if err != nil { - c.RUnlock() + c.mu.RUnlock() return err } @@ -1292,7 +1085,7 @@ func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr) // Release the lock before starting the stream. - c.RUnlock() + c.mu.RUnlock() for { // Check the context before doing anything. select { @@ -1340,11 +1133,12 @@ func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend // GetNodes returns a list of all nodes known to a cluster. func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return nil, c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return nil, c.errNoManager(state) } filters, err := newListNodesFilters(options.Filters) @@ -1355,7 +1149,7 @@ func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, erro ctx, cancel := c.getRequestContext() defer cancel() - r, err := c.client.ListNodes( + r, err := state.controlClient.ListNodes( ctx, &swarmapi.ListNodesRequest{Filters: filters}) if err != nil { @@ -1372,17 +1166,18 @@ func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, erro // GetNode returns a node based on an ID or name. func (c *Cluster) GetNode(input string) (types.Node, error) { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return types.Node{}, c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return types.Node{}, c.errNoManager(state) } ctx, cancel := c.getRequestContext() defer cancel() - node, err := getNode(ctx, c.client, input) + node, err := getNode(ctx, state.controlClient, input) if err != nil { return types.Node{}, err } @@ -1391,11 +1186,12 @@ func (c *Cluster) GetNode(input string) (types.Node, error) { // UpdateNode updates existing nodes properties. func (c *Cluster) UpdateNode(input string, version uint64, spec types.NodeSpec) error { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return c.errNoManager(state) } nodeSpec, err := convert.NodeSpecToGRPC(spec) @@ -1406,12 +1202,12 @@ func (c *Cluster) UpdateNode(input string, version uint64, spec types.NodeSpec) ctx, cancel := c.getRequestContext() defer cancel() - currentNode, err := getNode(ctx, c.client, input) + currentNode, err := getNode(ctx, state.controlClient, input) if err != nil { return err } - _, err = c.client.UpdateNode( + _, err = state.controlClient.UpdateNode( ctx, &swarmapi.UpdateNodeRequest{ NodeID: currentNode.ID, @@ -1426,22 +1222,23 @@ func (c *Cluster) UpdateNode(input string, version uint64, spec types.NodeSpec) // RemoveNode removes a node from a cluster func (c *Cluster) RemoveNode(input string, force bool) error { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return c.errNoManager(state) } ctx, cancel := c.getRequestContext() defer cancel() - node, err := getNode(ctx, c.client, input) + node, err := getNode(ctx, state.controlClient, input) if err != nil { return err } - if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force}); err != nil { + if _, err := state.controlClient.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force}); err != nil { return err } return nil @@ -1449,11 +1246,12 @@ func (c *Cluster) RemoveNode(input string, force bool) error { // GetTasks returns a list of tasks matching the filter options. func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return nil, c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return nil, c.errNoManager(state) } byName := func(filter filters.Args) error { @@ -1490,7 +1288,7 @@ func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, erro ctx, cancel := c.getRequestContext() defer cancel() - r, err := c.client.ListTasks( + r, err := state.controlClient.ListTasks( ctx, &swarmapi.ListTasksRequest{Filters: filters}) if err != nil { @@ -1509,17 +1307,18 @@ func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, erro // GetTask returns a task by an ID. func (c *Cluster) GetTask(input string) (types.Task, error) { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return types.Task{}, c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return types.Task{}, c.errNoManager(state) } ctx, cancel := c.getRequestContext() defer cancel() - task, err := getTask(ctx, c.client, input) + task, err := getTask(ctx, state.controlClient, input) if err != nil { return types.Task{}, err } @@ -1528,17 +1327,18 @@ func (c *Cluster) GetTask(input string) (types.Task, error) { // GetNetwork returns a cluster network by an ID. func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return apitypes.NetworkResource{}, c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return apitypes.NetworkResource{}, c.errNoManager(state) } ctx, cancel := c.getRequestContext() defer cancel() - network, err := getNetwork(ctx, c.client, input) + network, err := getNetwork(ctx, state.controlClient, input) if err != nil { return apitypes.NetworkResource{}, err } @@ -1547,17 +1347,18 @@ func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) { // GetNetworks returns all current cluster managed networks. func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return nil, c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return nil, c.errNoManager(state) } ctx, cancel := c.getRequestContext() defer cancel() - r, err := c.client.ListNetworks(ctx, &swarmapi.ListNetworksRequest{}) + r, err := state.controlClient.ListNetworks(ctx, &swarmapi.ListNetworksRequest{}) if err != nil { return nil, err } @@ -1579,9 +1380,9 @@ func attacherKey(target, containerID string) string { // waiter who is trying to start or attach the container to the // network. func (c *Cluster) UpdateAttachment(target, containerID string, config *network.NetworkingConfig) error { - c.RLock() + c.mu.RLock() attacher, ok := c.attachers[attacherKey(target, containerID)] - c.RUnlock() + c.mu.RUnlock() if !ok || attacher == nil { return fmt.Errorf("could not find attacher for container %s to network %s", containerID, target) } @@ -1594,19 +1395,19 @@ func (c *Cluster) UpdateAttachment(target, containerID string, config *network.N // WaitForDetachment waits for the container to stop or detach from // the network. func (c *Cluster) WaitForDetachment(ctx context.Context, networkName, networkID, taskID, containerID string) error { - c.RLock() + c.mu.RLock() attacher, ok := c.attachers[attacherKey(networkName, containerID)] if !ok { attacher, ok = c.attachers[attacherKey(networkID, containerID)] } - if c.node == nil || c.node.Agent() == nil { - c.RUnlock() + state := c.currentNodeState() + if state.swarmNode == nil || state.swarmNode.Agent() == nil { + c.mu.RUnlock() return fmt.Errorf("invalid cluster node while waiting for detachment") } - agent := c.node.Agent() - c.RUnlock() - + c.mu.RUnlock() + agent := state.swarmNode.Agent() if ok && attacher != nil && attacher.detachWaitCh != nil && attacher.attachCompleteCh != nil { @@ -1633,17 +1434,18 @@ func (c *Cluster) WaitForDetachment(ctx context.Context, networkName, networkID, // AttachNetwork generates an attachment request towards the manager. func (c *Cluster) AttachNetwork(target string, containerID string, addresses []string) (*network.NetworkingConfig, error) { aKey := attacherKey(target, containerID) - c.Lock() - if c.node == nil || c.node.Agent() == nil { - c.Unlock() + c.mu.Lock() + state := c.currentNodeState() + if state.swarmNode == nil || state.swarmNode.Agent() == nil { + c.mu.Unlock() return nil, fmt.Errorf("invalid cluster node while attaching to network") } if attacher, ok := c.attachers[aKey]; ok { - c.Unlock() + c.mu.Unlock() return attacher.config, nil } - agent := c.node.Agent() + agent := state.swarmNode.Agent() attachWaitCh := make(chan *network.NetworkingConfig) detachWaitCh := make(chan struct{}) attachCompleteCh := make(chan struct{}) @@ -1652,23 +1454,23 @@ func (c *Cluster) AttachNetwork(target string, containerID string, addresses []s attachCompleteCh: attachCompleteCh, detachWaitCh: detachWaitCh, } - c.Unlock() + c.mu.Unlock() ctx, cancel := c.getRequestContext() defer cancel() taskID, err := agent.ResourceAllocator().AttachNetwork(ctx, containerID, target, addresses) if err != nil { - c.Lock() + c.mu.Lock() delete(c.attachers, aKey) - c.Unlock() + c.mu.Unlock() return nil, fmt.Errorf("Could not attach to network %s: %v", target, err) } - c.Lock() + c.mu.Lock() c.attachers[aKey].taskID = taskID close(attachCompleteCh) - c.Unlock() + c.mu.Unlock() logrus.Debugf("Successfully attached to network %s with tid %s", target, taskID) @@ -1679,9 +1481,9 @@ func (c *Cluster) AttachNetwork(target string, containerID string, addresses []s return nil, fmt.Errorf("attaching to network failed, make sure your network options are correct and check manager logs: %v", ctx.Err()) } - c.Lock() + c.mu.Lock() c.attachers[aKey].config = config - c.Unlock() + c.mu.Unlock() return config, nil } @@ -1690,10 +1492,10 @@ func (c *Cluster) AttachNetwork(target string, containerID string, addresses []s func (c *Cluster) DetachNetwork(target string, containerID string) error { aKey := attacherKey(target, containerID) - c.Lock() + c.mu.Lock() attacher, ok := c.attachers[aKey] delete(c.attachers, aKey) - c.Unlock() + c.mu.Unlock() if !ok { return fmt.Errorf("could not find network attachment for container %s to network %s", containerID, target) @@ -1705,11 +1507,12 @@ func (c *Cluster) DetachNetwork(target string, containerID string) error { // CreateNetwork creates a new cluster managed network. func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return "", c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return "", c.errNoManager(state) } if runconfig.IsPreDefinedNetwork(s.Name) { @@ -1721,7 +1524,7 @@ func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) defer cancel() networkSpec := convert.BasicNetworkCreateToGRPC(s) - r, err := c.client.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec}) + r, err := state.controlClient.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec}) if err != nil { return "", err } @@ -1731,22 +1534,23 @@ func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) // RemoveNetwork removes a cluster network. func (c *Cluster) RemoveNetwork(input string) error { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return c.errNoManager(state) } ctx, cancel := c.getRequestContext() defer cancel() - network, err := getNetwork(ctx, c.client, input) + network, err := getNetwork(ctx, state.controlClient, input) if err != nil { return err } - if _, err := c.client.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil { + if _, err := state.controlClient.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil { return err } return nil @@ -1776,15 +1580,19 @@ func (c *Cluster) populateNetworkID(ctx context.Context, client swarmapi.Control // Cleanup stops active swarm node. This is run before daemon shutdown. func (c *Cluster) Cleanup() { - c.Lock() - node := c.node + c.controlMutex.Lock() + defer c.controlMutex.Unlock() + + c.mu.Lock() + node := c.nr if node == nil { - c.Unlock() + c.mu.Unlock() return } - defer c.Unlock() - if c.isActiveManager() { - active, reachable, unreachable, err := c.managerStats() + defer c.mu.Unlock() + state := c.currentNodeState() + if state.IsActiveManager() { + active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID()) if err == nil { singlenode := active && isLastManager(reachable, unreachable) if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) { @@ -1792,13 +1600,17 @@ func (c *Cluster) Cleanup() { } } } - c.stopNode() + if err := node.Stop(); err != nil { + logrus.Errorf("failed to shut down cluster node: %v", err) + signal.DumpStacks("") + } + c.nr = nil } -func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) { +func managerStats(client swarmapi.ControlClient, currentNodeID string) (current bool, reachable int, unreachable int, err error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{}) + nodes, err := client.ListNodes(ctx, &swarmapi.ListNodesRequest{}) if err != nil { return false, 0, 0, err } @@ -1806,7 +1618,7 @@ func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, if n.ManagerStatus != nil { if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE { reachable++ - if n.ID == c.node.NodeID() { + if n.ID == currentNodeID { current = true } } @@ -1857,7 +1669,7 @@ func validateAddr(addr string) (string, error) { return strings.TrimPrefix(newaddr, "tcp://"), nil } -func initClusterSpec(node *node, spec types.Spec) error { +func initClusterSpec(node *swarmnode.Node, spec types.Spec) error { ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) for conn := range node.ListenControlSocket(ctx) { if ctx.Err() != nil { diff --git a/daemon/cluster/noderunner.go b/daemon/cluster/noderunner.go new file mode 100644 index 0000000000..0152966eca --- /dev/null +++ b/daemon/cluster/noderunner.go @@ -0,0 +1,296 @@ +package cluster + +import ( + "path/filepath" + "runtime" + "strings" + "sync" + "time" + + "github.com/Sirupsen/logrus" + types "github.com/docker/docker/api/types/swarm" + "github.com/docker/docker/daemon/cluster/executor/container" + swarmapi "github.com/docker/swarmkit/api" + swarmnode "github.com/docker/swarmkit/node" + "github.com/pkg/errors" + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +// nodeRunner implements a manager for continuously running swarmkit node, restarting them with backoff delays if needed. +type nodeRunner struct { + nodeState + mu sync.RWMutex + done chan struct{} // closed when swarmNode exits + ready chan struct{} // closed when swarmNode becomes active + reconnectDelay time.Duration + config nodeStartConfig + + repeatedRun bool + cancelReconnect func() + stopping bool + cluster *Cluster // only for accessing config helpers, never call any methods. TODO: change to config struct +} + +// nodeStartConfig holds configuration needed to start a new node. Exported +// fields of this structure are saved to disk in json. Unexported fields +// contain data that shouldn't be persisted between daemon reloads. +type nodeStartConfig struct { + // LocalAddr is this machine's local IP or hostname, if specified. + LocalAddr string + // RemoteAddr is the address that was given to "swarm join". It is used + // to find LocalAddr if necessary. + RemoteAddr string + // ListenAddr is the address we bind to, including a port. + ListenAddr string + // AdvertiseAddr is the address other nodes should connect to, + // including a port. + AdvertiseAddr string + joinAddr string + forceNewCluster bool + joinToken string + lockKey []byte + autolock bool +} + +func (n *nodeRunner) Ready() chan error { + c := make(chan error, 1) + n.mu.RLock() + ready, done := n.ready, n.done + n.mu.RUnlock() + go func() { + select { + case <-ready: + case <-done: + } + select { + case <-ready: + default: + n.mu.RLock() + c <- n.err + n.mu.RUnlock() + } + close(c) + }() + return c +} + +func (n *nodeRunner) Start(conf nodeStartConfig) error { + n.mu.Lock() + defer n.mu.Unlock() + + n.reconnectDelay = initialReconnectDelay + + return n.start(conf) +} + +func (n *nodeRunner) start(conf nodeStartConfig) error { + var control string + if runtime.GOOS == "windows" { + control = `\\.\pipe\` + controlSocket + } else { + control = filepath.Join(n.cluster.runtimeRoot, controlSocket) + } + + node, err := swarmnode.New(&swarmnode.Config{ + Hostname: n.cluster.config.Name, + ForceNewCluster: conf.forceNewCluster, + ListenControlAPI: control, + ListenRemoteAPI: conf.ListenAddr, + AdvertiseRemoteAPI: conf.AdvertiseAddr, + JoinAddr: conf.joinAddr, + StateDir: n.cluster.root, + JoinToken: conf.joinToken, + Executor: container.NewExecutor(n.cluster.config.Backend), + HeartbeatTick: 1, + ElectionTick: 3, + UnlockKey: conf.lockKey, + AutoLockManagers: conf.autolock, + }) + if err != nil { + return err + } + if err := node.Start(context.Background()); err != nil { + return err + } + + n.done = make(chan struct{}) + n.ready = make(chan struct{}) + n.swarmNode = node + n.config = conf + savePersistentState(n.cluster.root, conf) + + ctx, cancel := context.WithCancel(context.Background()) + + go func() { + n.handleNodeExit(node) + cancel() + }() + + go n.handleReadyEvent(ctx, node, n.ready) + go n.handleControlSocketChange(ctx, node) + + return nil +} + +func (n *nodeRunner) handleControlSocketChange(ctx context.Context, node *swarmnode.Node) { + for conn := range node.ListenControlSocket(ctx) { + n.mu.Lock() + if n.grpcConn != conn { + if conn == nil { + n.controlClient = nil + n.logsClient = nil + } else { + n.controlClient = swarmapi.NewControlClient(conn) + n.logsClient = swarmapi.NewLogsClient(conn) + } + } + n.grpcConn = conn + n.mu.Unlock() + n.cluster.configEvent <- struct{}{} + } +} + +func (n *nodeRunner) handleReadyEvent(ctx context.Context, node *swarmnode.Node, ready chan struct{}) { + select { + case <-node.Ready(): + n.mu.Lock() + n.err = nil + n.mu.Unlock() + close(ready) + case <-ctx.Done(): + } + n.cluster.configEvent <- struct{}{} +} + +func (n *nodeRunner) handleNodeExit(node *swarmnode.Node) { + err := detectLockedError(node.Err(context.Background())) + if err != nil { + logrus.Errorf("cluster exited with error: %v", err) + } + n.mu.Lock() + n.swarmNode = nil + n.err = err + close(n.done) + select { + case <-n.ready: + n.enableReconnectWatcher() + default: + if n.repeatedRun { + n.enableReconnectWatcher() + } + } + n.repeatedRun = true + n.mu.Unlock() +} + +// Stop stops the current swarm node if it is running. +func (n *nodeRunner) Stop() error { + n.mu.Lock() + if n.cancelReconnect != nil { // between restarts + n.cancelReconnect() + n.cancelReconnect = nil + } + if n.swarmNode == nil { + n.mu.Unlock() + return nil + } + n.stopping = true + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + if err := n.swarmNode.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") { + n.mu.Unlock() + return err + } + n.mu.Unlock() + <-n.done + return nil +} + +func (n *nodeRunner) State() nodeState { + if n == nil { + return nodeState{status: types.LocalNodeStateInactive} + } + n.mu.RLock() + defer n.mu.RUnlock() + + ns := n.nodeState + + if ns.err != nil || n.cancelReconnect != nil { + if errors.Cause(ns.err) == ErrSwarmLocked { + ns.status = types.LocalNodeStateLocked + } else { + ns.status = types.LocalNodeStateError + } + } else { + select { + case <-n.ready: + ns.status = types.LocalNodeStateActive + default: + ns.status = types.LocalNodeStatePending + } + } + + return ns +} + +func (n *nodeRunner) enableReconnectWatcher() { + if n.stopping { + return + } + n.reconnectDelay *= 2 + if n.reconnectDelay > maxReconnectDelay { + n.reconnectDelay = maxReconnectDelay + } + logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds()) + delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay) + n.cancelReconnect = cancel + + config := n.config + go func() { + <-delayCtx.Done() + if delayCtx.Err() != context.DeadlineExceeded { + return + } + n.mu.Lock() + defer n.mu.Unlock() + if n.stopping { + return + } + config.RemoteAddr = n.cluster.getRemoteAddress() + config.joinAddr = config.RemoteAddr + if err := n.start(config); err != nil { + n.err = err + } + }() +} + +// nodeState represents information about the current state of the cluster and +// provides access to the grpc clients. +type nodeState struct { + swarmNode *swarmnode.Node + grpcConn *grpc.ClientConn + controlClient swarmapi.ControlClient + logsClient swarmapi.LogsClient + status types.LocalNodeState + actualLocalAddr string + err error +} + +// IsActiveManager returns true if node is a manager ready to accept control requests. It is safe to access the client properties if this returns true. +func (ns nodeState) IsActiveManager() bool { + return ns.controlClient != nil +} + +// IsManager returns true if node is a manager. +func (ns nodeState) IsManager() bool { + return ns.swarmNode != nil && ns.swarmNode.Manager() != nil +} + +// NodeID returns node's ID or empty string if node is inactive. +func (ns nodeState) NodeID() string { + if ns.swarmNode != nil { + return ns.swarmNode.NodeID() + } + return "" +} diff --git a/daemon/cluster/secrets.go b/daemon/cluster/secrets.go index 2b9eb5da1d..959ae29376 100644 --- a/daemon/cluster/secrets.go +++ b/daemon/cluster/secrets.go @@ -9,17 +9,18 @@ import ( // GetSecret returns a secret from a managed swarm cluster func (c *Cluster) GetSecret(id string) (types.Secret, error) { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return types.Secret{}, c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return types.Secret{}, c.errNoManager(state) } ctx, cancel := c.getRequestContext() defer cancel() - r, err := c.node.client.GetSecret(ctx, &swarmapi.GetSecretRequest{SecretID: id}) + r, err := state.controlClient.GetSecret(ctx, &swarmapi.GetSecretRequest{SecretID: id}) if err != nil { return types.Secret{}, err } @@ -29,11 +30,12 @@ func (c *Cluster) GetSecret(id string) (types.Secret, error) { // GetSecrets returns all secrets of a managed swarm cluster. func (c *Cluster) GetSecrets(options apitypes.SecretListOptions) ([]types.Secret, error) { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return nil, c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return nil, c.errNoManager(state) } filters, err := newListSecretsFilters(options.Filters) @@ -43,7 +45,7 @@ func (c *Cluster) GetSecrets(options apitypes.SecretListOptions) ([]types.Secret ctx, cancel := c.getRequestContext() defer cancel() - r, err := c.node.client.ListSecrets(ctx, + r, err := state.controlClient.ListSecrets(ctx, &swarmapi.ListSecretsRequest{Filters: filters}) if err != nil { return nil, err @@ -60,11 +62,12 @@ func (c *Cluster) GetSecrets(options apitypes.SecretListOptions) ([]types.Secret // CreateSecret creates a new secret in a managed swarm cluster. func (c *Cluster) CreateSecret(s types.SecretSpec) (string, error) { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return "", c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return "", c.errNoManager(state) } ctx, cancel := c.getRequestContext() @@ -72,7 +75,7 @@ func (c *Cluster) CreateSecret(s types.SecretSpec) (string, error) { secretSpec := convert.SecretSpecToGRPC(s) - r, err := c.node.client.CreateSecret(ctx, + r, err := state.controlClient.CreateSecret(ctx, &swarmapi.CreateSecretRequest{Spec: &secretSpec}) if err != nil { return "", err @@ -83,11 +86,12 @@ func (c *Cluster) CreateSecret(s types.SecretSpec) (string, error) { // RemoveSecret removes a secret from a managed swarm cluster. func (c *Cluster) RemoveSecret(id string) error { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return c.errNoManager(state) } ctx, cancel := c.getRequestContext() @@ -97,7 +101,7 @@ func (c *Cluster) RemoveSecret(id string) error { SecretID: id, } - if _, err := c.node.client.RemoveSecret(ctx, req); err != nil { + if _, err := state.controlClient.RemoveSecret(ctx, req); err != nil { return err } return nil @@ -106,11 +110,12 @@ func (c *Cluster) RemoveSecret(id string) error { // UpdateSecret updates a secret in a managed swarm cluster. // Note: this is not exposed to the CLI but is available from the API only func (c *Cluster) UpdateSecret(id string, version uint64, spec types.SecretSpec) error { - c.RLock() - defer c.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() - if !c.isActiveManager() { - return c.errNoManager() + state := c.currentNodeState() + if !state.IsActiveManager() { + return c.errNoManager(state) } ctx, cancel := c.getRequestContext() @@ -118,7 +123,7 @@ func (c *Cluster) UpdateSecret(id string, version uint64, spec types.SecretSpec) secretSpec := convert.SecretSpecToGRPC(spec) - if _, err := c.client.UpdateSecret(ctx, + if _, err := state.controlClient.UpdateSecret(ctx, &swarmapi.UpdateSecretRequest{ SecretID: id, SecretVersion: &swarmapi.Version{ diff --git a/daemon/cluster/utils.go b/daemon/cluster/utils.go new file mode 100644 index 0000000000..e20d19f518 --- /dev/null +++ b/daemon/cluster/utils.go @@ -0,0 +1,56 @@ +package cluster + +import ( + "encoding/json" + "io/ioutil" + "os" + "path/filepath" + + "github.com/docker/docker/pkg/ioutils" +) + +func loadPersistentState(root string) (*nodeStartConfig, error) { + dt, err := ioutil.ReadFile(filepath.Join(root, stateFile)) + if err != nil { + return nil, err + } + // missing certificate means no actual state to restore from + if _, err := os.Stat(filepath.Join(root, "certificates/swarm-node.crt")); err != nil { + if os.IsNotExist(err) { + clearPersistentState(root) + } + return nil, err + } + var st nodeStartConfig + if err := json.Unmarshal(dt, &st); err != nil { + return nil, err + } + return &st, nil +} + +func savePersistentState(root string, config nodeStartConfig) error { + dt, err := json.Marshal(config) + if err != nil { + return err + } + return ioutils.AtomicWriteFile(filepath.Join(root, stateFile), dt, 0600) +} + +func clearPersistentState(root string) error { + // todo: backup this data instead of removing? + if err := os.RemoveAll(root); err != nil { + return err + } + if err := os.MkdirAll(root, 0700); err != nil { + return err + } + return nil +} + +func removingManagerCausesLossOfQuorum(reachable, unreachable int) bool { + return reachable-2 <= unreachable +} + +func isLastManager(reachable, unreachable int) bool { + return reachable == 1 && unreachable == 0 +} diff --git a/integration-cli/docker_api_swarm_test.go b/integration-cli/docker_api_swarm_test.go index 39bf721211..b95327d21d 100644 --- a/integration-cli/docker_api_swarm_test.go +++ b/integration-cli/docker_api_swarm_test.go @@ -75,6 +75,8 @@ func (s *DockerSwarmSuite) TestAPISwarmJoinToken(c *check.C) { d1 := s.AddDaemon(c, false, false) c.Assert(d1.Init(swarm.InitRequest{}), checker.IsNil) + // todo: error message differs depending if some components of token are valid + d2 := s.AddDaemon(c, false, false) err := d2.Join(swarm.JoinRequest{RemoteAddrs: []string{d1.listenAddr}}) c.Assert(err, checker.NotNil) @@ -85,7 +87,7 @@ func (s *DockerSwarmSuite) TestAPISwarmJoinToken(c *check.C) { err = d2.Join(swarm.JoinRequest{JoinToken: "foobaz", RemoteAddrs: []string{d1.listenAddr}}) c.Assert(err, checker.NotNil) - c.Assert(err.Error(), checker.Contains, "join token is necessary") + c.Assert(err.Error(), checker.Contains, "invalid join token") info, err = d2.info() c.Assert(err, checker.IsNil) c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateInactive)