Mirror of https://github.com/moby/moby.git, synced 2022-11-09 12:21:53 -05:00
Commit ebcb7d6b40

Use strongly typed errors to set HTTP status codes. Error interfaces are defined in the api/errors package, and errors returned from controllers are checked against these interfaces. Errors can be wrapped in a pkg/errors.Causer, as long as one of the interfaces is implemented somewhere in the chain of causes. The special error interfaces take precedence over Causer: if both Causer and one of the new error interfaces are implemented, the Causer is not traversed.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
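The pattern the commit message describes looks roughly like the minimal, self-contained sketch below. The names invalidParameter and getHTTPStatus are illustrative assumptions, not the actual api/errors definitions; validationError here is only analogous to the type used by the cluster controller in this file.

// Sketch of the typed-error pattern: marker interfaces map errors to HTTP
// status codes, and pkg/errors causes are only traversed when no interface
// matches on the error itself.
package main

import (
	"fmt"
	"net/http"

	"github.com/pkg/errors"
)

// invalidParameter is an example marker interface: any error implementing it
// maps to 400 Bad Request.
type invalidParameter interface {
	InvalidParameter()
}

// validationError is a strongly typed error wrapping a validation failure.
type validationError struct{ cause error }

func (e validationError) Error() string     { return e.cause.Error() }
func (e validationError) InvalidParameter() {}

// getHTTPStatus checks the typed interfaces on the error itself first; only if
// none match does it unwrap via errors.Cause and try again, so a typed error
// that also implements Causer is never traversed further.
func getHTTPStatus(err error) int {
	if _, ok := err.(invalidParameter); ok {
		return http.StatusBadRequest
	}
	if cause := errors.Cause(err); cause != err {
		return getHTTPStatus(cause)
	}
	return http.StatusInternalServerError
}

func main() {
	err := errors.Wrap(validationError{errors.New("bad listen address")}, "init failed")
	fmt.Println(getHTTPStatus(err)) // prints 400
}

In this sketch the typed check on the outer error wins; causes are only consulted when no interface matches, mirroring the precedence rule stated above.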
548 lines · 14 KiB · Go
package cluster

import (
	"fmt"
	"net"
	"strings"
	"time"

	apitypes "github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/filters"
	types "github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/daemon/cluster/convert"
	"github.com/docker/docker/opts"
	"github.com/docker/docker/pkg/signal"
	swarmapi "github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/encryption"
	swarmnode "github.com/docker/swarmkit/node"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"golang.org/x/net/context"
)

// Init initializes new cluster from user provided request.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()
	if c.nr != nil {
		if req.ForceNewCluster {
			// Take c.mu temporarily to wait for presently running
			// API handlers to finish before shutting down the node.
			c.mu.Lock()
			c.mu.Unlock()

			if err := c.nr.Stop(); err != nil {
				return "", err
			}
		} else {
			return "", errSwarmExists
		}
	}

	if err := validateAndSanitizeInitRequest(&req); err != nil {
		return "", validationError{err}
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		return "", err
	}

	advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
	if err != nil {
		return "", err
	}

	dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
	if err != nil {
		return "", err
	}

	localAddr := listenHost

	// If the local address is undetermined, the advertise address
	// will be used as local address, if it belongs to this system.
	// If the advertise address is not local, then we try to find
	// a system address to use as local address. If this fails,
	// we give up and ask the user to pass the listen address.
	if net.ParseIP(localAddr).IsUnspecified() {
		advertiseIP := net.ParseIP(advertiseHost)

		found := false
		for _, systemIP := range listSystemIPs() {
			if systemIP.Equal(advertiseIP) {
				localAddr = advertiseIP.String()
				found = true
				break
			}
		}

		if !found {
			ip, err := c.resolveSystemAddr()
			if err != nil {
				logrus.Warnf("Could not find a local address: %v", err)
				return "", errMustSpecifyListenAddr
			}
			localAddr = ip.String()
		}
	}

	nr, err := c.newNodeRunner(nodeStartConfig{
		forceNewCluster: req.ForceNewCluster,
		autolock:        req.AutoLockManagers,
		LocalAddr:       localAddr,
		ListenAddr:      net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr:   net.JoinHostPort(advertiseHost, advertisePort),
		DataPathAddr:    dataPathAddr,
		availability:    req.Availability,
	})
	if err != nil {
		return "", err
	}
	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	if err := <-nr.Ready(); err != nil {
		c.mu.Lock()
		c.nr = nil
		c.mu.Unlock()
		if !req.ForceNewCluster { // if failure on first attempt don't keep state
			if err := clearPersistentState(c.root); err != nil {
				return "", err
			}
		}
		return "", err
	}
	state := nr.State()
	if state.swarmNode == nil { // should never happen but protect from panic
		return "", errors.New("invalid cluster state for spec initialization")
	}
	if err := initClusterSpec(state.swarmNode, req.Spec); err != nil {
		return "", err
	}
	return state.NodeID(), nil
}

// Join makes current Cluster part of an existing swarm cluster.
func (c *Cluster) Join(req types.JoinRequest) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()
	c.mu.Lock()
	if c.nr != nil {
		c.mu.Unlock()
		return errors.WithStack(errSwarmExists)
	}
	c.mu.Unlock()

	if err := validateAndSanitizeJoinRequest(&req); err != nil {
		return validationError{err}
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		return err
	}

	var advertiseAddr string
	if req.AdvertiseAddr != "" {
		advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
		// For joining, we don't need to provide an advertise address,
		// since the remote side can detect it.
		if err == nil {
			advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
		}
	}

	dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
	if err != nil {
		return err
	}

	nr, err := c.newNodeRunner(nodeStartConfig{
		RemoteAddr:    req.RemoteAddrs[0],
		ListenAddr:    net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr: advertiseAddr,
		DataPathAddr:  dataPathAddr,
		joinAddr:      req.RemoteAddrs[0],
		joinToken:     req.JoinToken,
		availability:  req.Availability,
	})
	if err != nil {
		return err
	}

	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	select {
	case <-time.After(swarmConnectTimeout):
		return errSwarmJoinTimeoutReached
	case err := <-nr.Ready():
		if err != nil {
			c.mu.Lock()
			c.nr = nil
			c.mu.Unlock()
			if err := clearPersistentState(c.root); err != nil {
				return err
			}
		}
		return err
	}
}

// Inspect retrieves the configuration properties of a managed swarm cluster.
func (c *Cluster) Inspect() (types.Swarm, error) {
	var swarm *swarmapi.Cluster
	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		s, err := getSwarm(ctx, state.controlClient)
		if err != nil {
			return err
		}
		swarm = s
		return nil
	}); err != nil {
		return types.Swarm{}, err
	}
	return convert.SwarmFromGRPC(*swarm), nil
}

// Update updates configuration of a managed swarm cluster.
func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
	return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		swarm, err := getSwarm(ctx, state.controlClient)
		if err != nil {
			return err
		}

		// In update, client should provide the complete spec of the swarm, including
		// Name and Labels. If a field is specified with 0 or nil, then the default value
		// will be used to swarmkit.
		clusterSpec, err := convert.SwarmSpecToGRPC(spec)
		if err != nil {
			return convertError{err}
		}

		_, err = state.controlClient.UpdateCluster(
			ctx,
			&swarmapi.UpdateClusterRequest{
				ClusterID: swarm.ID,
				Spec:      &clusterSpec,
				ClusterVersion: &swarmapi.Version{
					Index: version,
				},
				Rotation: swarmapi.KeyRotation{
					WorkerJoinToken:  flags.RotateWorkerToken,
					ManagerJoinToken: flags.RotateManagerToken,
					ManagerUnlockKey: flags.RotateManagerUnlockKey,
				},
			},
		)
		return err
	})
}

// GetUnlockKey returns the unlock key for the swarm.
func (c *Cluster) GetUnlockKey() (string, error) {
	var resp *swarmapi.GetUnlockKeyResponse
	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		client := swarmapi.NewCAClient(state.grpcConn)

		r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
		if err != nil {
			return err
		}
		resp = r
		return nil
	}); err != nil {
		return "", err
	}
	if len(resp.UnlockKey) == 0 {
		// no key
		return "", nil
	}
	return encryption.HumanReadableKey(resp.UnlockKey), nil
}

// UnlockSwarm provides a key to decrypt data that is encrypted at rest.
func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.RLock()
	state := c.currentNodeState()

	if !state.IsActiveManager() {
		// when manager is not active,
		// unless it is locked, otherwise return error.
		if err := c.errNoManager(state); err != errSwarmLocked {
			c.mu.RUnlock()
			return err
		}
	} else {
		// when manager is active, return an error of "not locked"
		c.mu.RUnlock()
		return notLockedError{}
	}

	// only when swarm is locked, code running reaches here
	nr := c.nr
	c.mu.RUnlock()

	key, err := encryption.ParseHumanReadableKey(req.UnlockKey)
	if err != nil {
		return validationError{err}
	}

	config := nr.config
	config.lockKey = key
	if err := nr.Stop(); err != nil {
		return err
	}
	nr, err = c.newNodeRunner(config)
	if err != nil {
		return err
	}

	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	if err := <-nr.Ready(); err != nil {
		if errors.Cause(err) == errSwarmLocked {
			return invalidUnlockKey{}
		}
		return errors.Errorf("swarm component could not be started: %v", err)
	}
	return nil
}

// Leave shuts down Cluster and removes current state.
func (c *Cluster) Leave(force bool) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.Lock()
	nr := c.nr
	if nr == nil {
		c.mu.Unlock()
		return errors.WithStack(errNoSwarm)
	}

	state := c.currentNodeState()

	c.mu.Unlock()

	if errors.Cause(state.err) == errSwarmLocked && !force {
		// leave a locked swarm without --force is not allowed
		return errors.WithStack(notAvailableError("Swarm is encrypted and locked. Please unlock it first or use `--force` to ignore this message."))
	}

	if state.IsManager() && !force {
		msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
		if state.IsActiveManager() {
			active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
			if err == nil {
				if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
					if isLastManager(reachable, unreachable) {
						msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
						return errors.WithStack(notAvailableError(msg))
					}
					msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
				}
			}
		} else {
			msg += "Doing so may lose the consensus of your cluster. "
		}

		msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
		return errors.WithStack(notAvailableError(msg))
	}
	// release readers in here
	if err := nr.Stop(); err != nil {
		logrus.Errorf("failed to shut down cluster node: %v", err)
		signal.DumpStacks("")
		return err
	}

	c.mu.Lock()
	c.nr = nil
	c.mu.Unlock()

	if nodeID := state.NodeID(); nodeID != "" {
		nodeContainers, err := c.listContainerForNode(nodeID)
		if err != nil {
			return err
		}
		for _, id := range nodeContainers {
			if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
				logrus.Errorf("error removing %v: %v", id, err)
			}
		}
	}

	// todo: cleanup optional?
	if err := clearPersistentState(c.root); err != nil {
		return err
	}
	c.config.Backend.DaemonLeavesCluster()
	return nil
}

// Info returns information about the current cluster state.
func (c *Cluster) Info() types.Info {
	info := types.Info{
		NodeAddr: c.GetAdvertiseAddress(),
	}
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	info.LocalNodeState = state.status
	if state.err != nil {
		info.Error = state.err.Error()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	if state.IsActiveManager() {
		info.ControlAvailable = true
		swarm, err := c.Inspect()
		if err != nil {
			info.Error = err.Error()
		}

		info.Cluster = &swarm.ClusterInfo

		if r, err := state.controlClient.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err != nil {
			info.Error = err.Error()
		} else {
			info.Nodes = len(r.Nodes)
			for _, n := range r.Nodes {
				if n.ManagerStatus != nil {
					info.Managers = info.Managers + 1
				}
			}
		}
	}

	if state.swarmNode != nil {
		for _, r := range state.swarmNode.Remotes() {
			info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
		}
		info.NodeID = state.swarmNode.NodeID()
	}

	return info
}

func validateAndSanitizeInitRequest(req *types.InitRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}

	if req.Spec.Annotations.Name == "" {
		req.Spec.Annotations.Name = "default"
	} else if req.Spec.Annotations.Name != "default" {
		return errors.New(`swarm spec must be named "default"`)
	}

	return nil
}

func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}
	if len(req.RemoteAddrs) == 0 {
		return errors.New("at least 1 RemoteAddr is required to join")
	}
	for i := range req.RemoteAddrs {
		req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
		if err != nil {
			return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
		}
	}
	return nil
}

func validateAddr(addr string) (string, error) {
	if addr == "" {
		return addr, errors.New("invalid empty address")
	}
	newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
	if err != nil {
		return addr, nil
	}
	return strings.TrimPrefix(newaddr, "tcp://"), nil
}

func initClusterSpec(node *swarmnode.Node, spec types.Spec) error {
	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
	for conn := range node.ListenControlSocket(ctx) {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if conn != nil {
			client := swarmapi.NewControlClient(conn)
			var cluster *swarmapi.Cluster
			for i := 0; ; i++ {
				lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
				if err != nil {
					return fmt.Errorf("error on listing clusters: %v", err)
				}
				if len(lcr.Clusters) == 0 {
					if i < 10 {
						time.Sleep(200 * time.Millisecond)
						continue
					}
					return errors.New("empty list of clusters was returned")
				}
				cluster = lcr.Clusters[0]
				break
			}
			// In init, we take the initial default values from swarmkit, and merge
			// any non nil or 0 value from spec to GRPC spec. This will leave the
			// default value alone.
			// Note that this is different from Update(), as in Update() we expect
			// user to specify the complete spec of the cluster (as they already know
			// the existing one and knows which field to update)
			clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			_, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
				ClusterID:      cluster.ID,
				ClusterVersion: &cluster.Meta.Version,
				Spec:           &clusterSpec,
			})
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			return nil
		}
	}
	return ctx.Err()
}

func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
	var ids []string
	filters := filters.NewArgs()
	filters.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
	containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
		Filters: filters,
	})
	if err != nil {
		return []string{}, err
	}
	for _, c := range containers {
		ids = append(ids, c.ID)
	}
	return ids, nil
}