mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
3d30155735
In 24823, `swarm join` has been updated to take a `--token` flag and flag `--manager` has been removed. Though in errNoManager() the error message still use the old description. This fix update the error message in errNoManager() and conforms to the current available flags. This fix is related to 24823. Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
1265 lines
31 KiB
Go
1265 lines
31 KiB
Go
package cluster
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/docker/docker/daemon/cluster/convert"
|
|
executorpkg "github.com/docker/docker/daemon/cluster/executor"
|
|
"github.com/docker/docker/daemon/cluster/executor/container"
|
|
"github.com/docker/docker/errors"
|
|
"github.com/docker/docker/opts"
|
|
"github.com/docker/docker/pkg/ioutils"
|
|
"github.com/docker/docker/runconfig"
|
|
apitypes "github.com/docker/engine-api/types"
|
|
"github.com/docker/engine-api/types/filters"
|
|
types "github.com/docker/engine-api/types/swarm"
|
|
swarmagent "github.com/docker/swarmkit/agent"
|
|
swarmapi "github.com/docker/swarmkit/api"
|
|
"golang.org/x/net/context"
|
|
)
|
|
|
|
const swarmDirName = "swarm"
|
|
const controlSocket = "control.sock"
|
|
const swarmConnectTimeout = 20 * time.Second
|
|
const swarmRequestTimeout = 20 * time.Second
|
|
const stateFile = "docker-state.json"
|
|
const defaultAddr = "0.0.0.0:2377"
|
|
|
|
const (
|
|
initialReconnectDelay = 100 * time.Millisecond
|
|
maxReconnectDelay = 30 * time.Second
|
|
)
|
|
|
|
// ErrNoSwarm is returned on leaving a cluster that was never initialized
|
|
var ErrNoSwarm = fmt.Errorf("This node is not part of swarm")
|
|
|
|
// ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated
|
|
var ErrSwarmExists = fmt.Errorf("This node is already part of a swarm cluster. Use \"docker swarm leave\" to leave this cluster and join another one.")
|
|
|
|
// ErrPendingSwarmExists is returned on initialize or join request for a cluster that is already processing a similar request but has not succeeded yet.
|
|
var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join request that has not succeeded yet. Use \"docker swarm leave\" to cancel the current request.")
|
|
|
|
// ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
|
|
var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. Attempt to join the cluster will continue in the background. Use \"docker info\" command to see the current swarm status of your node.")
|
|
|
|
// defaultSpec contains some sane defaults if cluster options are missing on init
|
|
var defaultSpec = types.Spec{
|
|
Raft: types.RaftConfig{
|
|
SnapshotInterval: 10000,
|
|
KeepOldSnapshots: 0,
|
|
LogEntriesForSlowFollowers: 500,
|
|
HeartbeatTick: 1,
|
|
ElectionTick: 3,
|
|
},
|
|
CAConfig: types.CAConfig{
|
|
NodeCertExpiry: 90 * 24 * time.Hour,
|
|
},
|
|
Dispatcher: types.DispatcherConfig{
|
|
HeartbeatPeriod: uint64((5 * time.Second).Nanoseconds()),
|
|
},
|
|
Orchestration: types.OrchestrationConfig{
|
|
TaskHistoryRetentionLimit: 10,
|
|
},
|
|
}
|
|
|
|
type state struct {
|
|
ListenAddr string
|
|
}
|
|
|
|
// Config provides values for Cluster.
|
|
type Config struct {
|
|
Root string
|
|
Name string
|
|
Backend executorpkg.Backend
|
|
}
|
|
|
|
// Cluster provides capabilities to participate in a cluster as a worker or a
|
|
// manager.
|
|
type Cluster struct {
|
|
sync.RWMutex
|
|
*node
|
|
root string
|
|
config Config
|
|
configEvent chan struct{} // todo: make this array and goroutine safe
|
|
listenAddr string
|
|
stop bool
|
|
err error
|
|
cancelDelay func()
|
|
}
|
|
|
|
type node struct {
|
|
*swarmagent.Node
|
|
done chan struct{}
|
|
ready bool
|
|
conn *grpc.ClientConn
|
|
client swarmapi.ControlClient
|
|
reconnectDelay time.Duration
|
|
}
|
|
|
|
// New creates a new Cluster instance using provided config.
|
|
func New(config Config) (*Cluster, error) {
|
|
root := filepath.Join(config.Root, swarmDirName)
|
|
if err := os.MkdirAll(root, 0700); err != nil {
|
|
return nil, err
|
|
}
|
|
c := &Cluster{
|
|
root: root,
|
|
config: config,
|
|
configEvent: make(chan struct{}, 10),
|
|
}
|
|
|
|
st, err := c.loadState()
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
return c, nil
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
n, err := c.startNewNode(false, st.ListenAddr, "", "")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
select {
|
|
case <-time.After(swarmConnectTimeout):
|
|
logrus.Errorf("swarm component could not be started before timeout was reached")
|
|
case <-n.Ready():
|
|
case <-n.done:
|
|
return nil, fmt.Errorf("swarm component could not be started: %v", c.err)
|
|
}
|
|
go c.reconnectOnFailure(n)
|
|
return c, nil
|
|
}
|
|
|
|
func (c *Cluster) loadState() (*state, error) {
|
|
dt, err := ioutil.ReadFile(filepath.Join(c.root, stateFile))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// missing certificate means no actual state to restore from
|
|
if _, err := os.Stat(filepath.Join(c.root, "certificates/swarm-node.crt")); err != nil {
|
|
if os.IsNotExist(err) {
|
|
c.clearState()
|
|
}
|
|
return nil, err
|
|
}
|
|
var st state
|
|
if err := json.Unmarshal(dt, &st); err != nil {
|
|
return nil, err
|
|
}
|
|
return &st, nil
|
|
}
|
|
|
|
func (c *Cluster) saveState() error {
|
|
dt, err := json.Marshal(state{ListenAddr: c.listenAddr})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
|
|
}
|
|
|
|
func (c *Cluster) reconnectOnFailure(n *node) {
|
|
for {
|
|
<-n.done
|
|
c.Lock()
|
|
if c.stop || c.node != nil {
|
|
c.Unlock()
|
|
return
|
|
}
|
|
n.reconnectDelay *= 2
|
|
if n.reconnectDelay > maxReconnectDelay {
|
|
n.reconnectDelay = maxReconnectDelay
|
|
}
|
|
logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
|
|
delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
|
|
c.cancelDelay = cancel
|
|
c.Unlock()
|
|
<-delayCtx.Done()
|
|
if delayCtx.Err() != context.DeadlineExceeded {
|
|
return
|
|
}
|
|
c.Lock()
|
|
if c.node != nil {
|
|
c.Unlock()
|
|
return
|
|
}
|
|
var err error
|
|
n, err = c.startNewNode(false, c.listenAddr, c.getRemoteAddress(), "")
|
|
if err != nil {
|
|
c.err = err
|
|
close(n.done)
|
|
}
|
|
c.Unlock()
|
|
}
|
|
}
|
|
|
|
func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, joinToken string) (*node, error) {
|
|
if err := c.config.Backend.IsSwarmCompatible(); err != nil {
|
|
return nil, err
|
|
}
|
|
c.node = nil
|
|
c.cancelDelay = nil
|
|
c.stop = false
|
|
n, err := swarmagent.NewNode(&swarmagent.NodeConfig{
|
|
Hostname: c.config.Name,
|
|
ForceNewCluster: forceNewCluster,
|
|
ListenControlAPI: filepath.Join(c.root, controlSocket),
|
|
ListenRemoteAPI: listenAddr,
|
|
JoinAddr: joinAddr,
|
|
StateDir: c.root,
|
|
JoinToken: joinToken,
|
|
Executor: container.NewExecutor(c.config.Backend),
|
|
HeartbeatTick: 1,
|
|
ElectionTick: 3,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ctx := context.Background()
|
|
if err := n.Start(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
node := &node{
|
|
Node: n,
|
|
done: make(chan struct{}),
|
|
reconnectDelay: initialReconnectDelay,
|
|
}
|
|
c.node = node
|
|
c.listenAddr = listenAddr
|
|
c.saveState()
|
|
c.config.Backend.SetClusterProvider(c)
|
|
go func() {
|
|
err := n.Err(ctx)
|
|
if err != nil {
|
|
logrus.Errorf("cluster exited with error: %v", err)
|
|
}
|
|
c.Lock()
|
|
c.node = nil
|
|
c.err = err
|
|
c.Unlock()
|
|
close(node.done)
|
|
}()
|
|
|
|
go func() {
|
|
select {
|
|
case <-n.Ready():
|
|
c.Lock()
|
|
node.ready = true
|
|
c.err = nil
|
|
c.Unlock()
|
|
case <-ctx.Done():
|
|
}
|
|
c.configEvent <- struct{}{}
|
|
}()
|
|
|
|
go func() {
|
|
for conn := range n.ListenControlSocket(ctx) {
|
|
c.Lock()
|
|
if node.conn != conn {
|
|
if conn == nil {
|
|
node.client = nil
|
|
} else {
|
|
node.client = swarmapi.NewControlClient(conn)
|
|
}
|
|
}
|
|
node.conn = conn
|
|
c.Unlock()
|
|
c.configEvent <- struct{}{}
|
|
}
|
|
}()
|
|
|
|
return node, nil
|
|
}
|
|
|
|
// Init initializes new cluster from user provided request.
|
|
func (c *Cluster) Init(req types.InitRequest) (string, error) {
|
|
c.Lock()
|
|
if node := c.node; node != nil {
|
|
if !req.ForceNewCluster {
|
|
c.Unlock()
|
|
return "", ErrSwarmExists
|
|
}
|
|
if err := c.stopNode(); err != nil {
|
|
c.Unlock()
|
|
return "", err
|
|
}
|
|
}
|
|
|
|
if err := validateAndSanitizeInitRequest(&req); err != nil {
|
|
c.Unlock()
|
|
return "", err
|
|
}
|
|
|
|
// todo: check current state existing
|
|
n, err := c.startNewNode(req.ForceNewCluster, req.ListenAddr, "", "")
|
|
if err != nil {
|
|
c.Unlock()
|
|
return "", err
|
|
}
|
|
c.Unlock()
|
|
|
|
select {
|
|
case <-n.Ready():
|
|
if err := initClusterSpec(n, req.Spec); err != nil {
|
|
return "", err
|
|
}
|
|
go c.reconnectOnFailure(n)
|
|
return n.NodeID(), nil
|
|
case <-n.done:
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
if !req.ForceNewCluster { // if failure on first attempt don't keep state
|
|
if err := c.clearState(); err != nil {
|
|
return "", err
|
|
}
|
|
}
|
|
return "", c.err
|
|
}
|
|
}
|
|
|
|
// Join makes current Cluster part of an existing swarm cluster.
|
|
func (c *Cluster) Join(req types.JoinRequest) error {
|
|
c.Lock()
|
|
if node := c.node; node != nil {
|
|
c.Unlock()
|
|
return ErrSwarmExists
|
|
}
|
|
if err := validateAndSanitizeJoinRequest(&req); err != nil {
|
|
c.Unlock()
|
|
return err
|
|
}
|
|
// todo: check current state existing
|
|
n, err := c.startNewNode(false, req.ListenAddr, req.RemoteAddrs[0], req.JoinToken)
|
|
if err != nil {
|
|
c.Unlock()
|
|
return err
|
|
}
|
|
c.Unlock()
|
|
|
|
select {
|
|
case <-time.After(swarmConnectTimeout):
|
|
// attempt to connect will continue in background, also reconnecting
|
|
go c.reconnectOnFailure(n)
|
|
return ErrSwarmJoinTimeoutReached
|
|
case <-n.Ready():
|
|
go c.reconnectOnFailure(n)
|
|
return nil
|
|
case <-n.done:
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
return c.err
|
|
}
|
|
}
|
|
|
|
// stopNode is a helper that stops the active c.node and waits until it has
|
|
// shut down. Call while keeping the cluster lock.
|
|
func (c *Cluster) stopNode() error {
|
|
if c.node == nil {
|
|
return nil
|
|
}
|
|
c.stop = true
|
|
if c.cancelDelay != nil {
|
|
c.cancelDelay()
|
|
c.cancelDelay = nil
|
|
}
|
|
node := c.node
|
|
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
|
defer cancel()
|
|
// TODO: can't hold lock on stop because it calls back to network
|
|
c.Unlock()
|
|
defer c.Lock()
|
|
if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
|
|
return err
|
|
}
|
|
<-node.done
|
|
return nil
|
|
}
|
|
|
|
// Leave shuts down Cluster and removes current state.
|
|
func (c *Cluster) Leave(force bool) error {
|
|
c.Lock()
|
|
node := c.node
|
|
if node == nil {
|
|
c.Unlock()
|
|
return ErrNoSwarm
|
|
}
|
|
|
|
if node.Manager() != nil && !force {
|
|
msg := "You are attempting to leave cluster on a node that is participating as a manager. "
|
|
if c.isActiveManager() {
|
|
active, reachable, unreachable, err := c.managerStats()
|
|
if err == nil {
|
|
if active && reachable-2 <= unreachable {
|
|
if reachable == 1 && unreachable == 0 {
|
|
msg += "Removing the last manager will erase all current state of the cluster. Use `--force` to ignore this message. "
|
|
c.Unlock()
|
|
return fmt.Errorf(msg)
|
|
}
|
|
msg += fmt.Sprintf("Leaving the cluster will leave you with %v managers out of %v. This means Raft quorum will be lost and your cluster will become inaccessible. ", reachable-1, reachable+unreachable)
|
|
}
|
|
}
|
|
} else {
|
|
msg += "Doing so may lose the consensus of your cluster. "
|
|
}
|
|
|
|
msg += "The only way to restore a cluster that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to ignore this message."
|
|
c.Unlock()
|
|
return fmt.Errorf(msg)
|
|
}
|
|
if err := c.stopNode(); err != nil {
|
|
c.Unlock()
|
|
return err
|
|
}
|
|
c.Unlock()
|
|
if nodeID := node.NodeID(); nodeID != "" {
|
|
for _, id := range c.config.Backend.ListContainersForNode(nodeID) {
|
|
if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
|
|
logrus.Errorf("error removing %v: %v", id, err)
|
|
}
|
|
}
|
|
}
|
|
c.configEvent <- struct{}{}
|
|
// todo: cleanup optional?
|
|
if err := c.clearState(); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *Cluster) clearState() error {
|
|
// todo: backup this data instead of removing?
|
|
if err := os.RemoveAll(c.root); err != nil {
|
|
return err
|
|
}
|
|
if err := os.MkdirAll(c.root, 0700); err != nil {
|
|
return err
|
|
}
|
|
c.config.Backend.SetClusterProvider(nil)
|
|
return nil
|
|
}
|
|
|
|
func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on qourum lost
|
|
return context.WithTimeout(context.Background(), swarmRequestTimeout)
|
|
}
|
|
|
|
// Inspect retrieves the configuration properties of a managed swarm cluster.
|
|
func (c *Cluster) Inspect() (types.Swarm, error) {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
if !c.isActiveManager() {
|
|
return types.Swarm{}, c.errNoManager()
|
|
}
|
|
|
|
ctx, cancel := c.getRequestContext()
|
|
defer cancel()
|
|
|
|
swarm, err := getSwarm(ctx, c.client)
|
|
if err != nil {
|
|
return types.Swarm{}, err
|
|
}
|
|
|
|
if err != nil {
|
|
return types.Swarm{}, err
|
|
}
|
|
|
|
return convert.SwarmFromGRPC(*swarm), nil
|
|
}
|
|
|
|
// Update updates configuration of a managed swarm cluster.
|
|
func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
if !c.isActiveManager() {
|
|
return c.errNoManager()
|
|
}
|
|
|
|
ctx, cancel := c.getRequestContext()
|
|
defer cancel()
|
|
|
|
swarm, err := getSwarm(ctx, c.client)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
swarmSpec, err := convert.SwarmSpecToGRPC(spec)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = c.client.UpdateCluster(
|
|
ctx,
|
|
&swarmapi.UpdateClusterRequest{
|
|
ClusterID: swarm.ID,
|
|
Spec: &swarmSpec,
|
|
ClusterVersion: &swarmapi.Version{
|
|
Index: version,
|
|
},
|
|
Rotation: swarmapi.JoinTokenRotation{
|
|
RotateWorkerToken: flags.RotateWorkerToken,
|
|
RotateManagerToken: flags.RotateManagerToken,
|
|
},
|
|
},
|
|
)
|
|
return err
|
|
}
|
|
|
|
// IsManager returns true if Cluster is participating as a manager.
|
|
func (c *Cluster) IsManager() bool {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
return c.isActiveManager()
|
|
}
|
|
|
|
// IsAgent returns true if Cluster is participating as a worker/agent.
|
|
func (c *Cluster) IsAgent() bool {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
return c.node != nil && c.ready
|
|
}
|
|
|
|
// GetListenAddress returns the listening address for current manager's
|
|
// consensus and dispatcher APIs.
|
|
func (c *Cluster) GetListenAddress() string {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
if c.isActiveManager() {
|
|
return c.listenAddr
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// GetRemoteAddress returns a known advertise address of a remote manager if
|
|
// available.
|
|
// todo: change to array/connect with info
|
|
func (c *Cluster) GetRemoteAddress() string {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
return c.getRemoteAddress()
|
|
}
|
|
|
|
func (c *Cluster) getRemoteAddress() string {
|
|
if c.node == nil {
|
|
return ""
|
|
}
|
|
nodeID := c.node.NodeID()
|
|
for _, r := range c.node.Remotes() {
|
|
if r.NodeID != nodeID {
|
|
return r.Addr
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// ListenClusterEvents returns a channel that receives messages on cluster
|
|
// participation changes.
|
|
// todo: make cancelable and accessible to multiple callers
|
|
func (c *Cluster) ListenClusterEvents() <-chan struct{} {
|
|
return c.configEvent
|
|
}
|
|
|
|
// Info returns information about the current cluster state.
|
|
func (c *Cluster) Info() types.Info {
|
|
var info types.Info
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
if c.node == nil {
|
|
info.LocalNodeState = types.LocalNodeStateInactive
|
|
if c.cancelDelay != nil {
|
|
info.LocalNodeState = types.LocalNodeStateError
|
|
}
|
|
} else {
|
|
info.LocalNodeState = types.LocalNodeStatePending
|
|
if c.ready == true {
|
|
info.LocalNodeState = types.LocalNodeStateActive
|
|
}
|
|
}
|
|
if c.err != nil {
|
|
info.Error = c.err.Error()
|
|
}
|
|
|
|
ctx, cancel := c.getRequestContext()
|
|
defer cancel()
|
|
|
|
if c.isActiveManager() {
|
|
info.ControlAvailable = true
|
|
if r, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err == nil {
|
|
info.Nodes = len(r.Nodes)
|
|
for _, n := range r.Nodes {
|
|
if n.ManagerStatus != nil {
|
|
info.Managers = info.Managers + 1
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if c.node != nil {
|
|
for _, r := range c.node.Remotes() {
|
|
info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
|
|
}
|
|
info.NodeID = c.node.NodeID()
|
|
}
|
|
|
|
return info
|
|
}
|
|
|
|
// isActiveManager should not be called without a read lock
|
|
func (c *Cluster) isActiveManager() bool {
|
|
return c.node != nil && c.conn != nil
|
|
}
|
|
|
|
// errNoManager returns error describing why manager commands can't be used.
|
|
// Call with read lock.
|
|
func (c *Cluster) errNoManager() error {
|
|
if c.node == nil {
|
|
return fmt.Errorf("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
|
|
}
|
|
if c.node.Manager() != nil {
|
|
return fmt.Errorf("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
|
|
}
|
|
return fmt.Errorf("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
|
|
}
|
|
|
|
// GetServices returns all services of a managed swarm cluster.
|
|
func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
if !c.isActiveManager() {
|
|
return nil, c.errNoManager()
|
|
}
|
|
|
|
filters, err := newListServicesFilters(options.Filter)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ctx, cancel := c.getRequestContext()
|
|
defer cancel()
|
|
|
|
r, err := c.client.ListServices(
|
|
ctx,
|
|
&swarmapi.ListServicesRequest{Filters: filters})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
services := []types.Service{}
|
|
|
|
for _, service := range r.Services {
|
|
services = append(services, convert.ServiceFromGRPC(*service))
|
|
}
|
|
|
|
return services, nil
|
|
}
|
|
|
|
// CreateService creates a new service in a managed swarm cluster.
|
|
func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (string, error) {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
if !c.isActiveManager() {
|
|
return "", c.errNoManager()
|
|
}
|
|
|
|
ctx, cancel := c.getRequestContext()
|
|
defer cancel()
|
|
|
|
err := populateNetworkID(ctx, c.client, &s)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
serviceSpec, err := convert.ServiceSpecToGRPC(s)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
if encodedAuth != "" {
|
|
ctnr := serviceSpec.Task.GetContainer()
|
|
if ctnr == nil {
|
|
return "", fmt.Errorf("service does not use container tasks")
|
|
}
|
|
ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
|
|
}
|
|
|
|
r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return r.Service.ID, nil
|
|
}
|
|
|
|
// GetService returns a service based on an ID or name.
|
|
func (c *Cluster) GetService(input string) (types.Service, error) {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
if !c.isActiveManager() {
|
|
return types.Service{}, c.errNoManager()
|
|
}
|
|
|
|
ctx, cancel := c.getRequestContext()
|
|
defer cancel()
|
|
|
|
service, err := getService(ctx, c.client, input)
|
|
if err != nil {
|
|
return types.Service{}, err
|
|
}
|
|
return convert.ServiceFromGRPC(*service), nil
|
|
}
|
|
|
|
// UpdateService updates existing service to match new properties.
|
|
func (c *Cluster) UpdateService(serviceID string, version uint64, spec types.ServiceSpec, encodedAuth string) error {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
if !c.isActiveManager() {
|
|
return c.errNoManager()
|
|
}
|
|
|
|
ctx, cancel := c.getRequestContext()
|
|
defer cancel()
|
|
|
|
err := populateNetworkID(ctx, c.client, &spec)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
serviceSpec, err := convert.ServiceSpecToGRPC(spec)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if encodedAuth != "" {
|
|
ctnr := serviceSpec.Task.GetContainer()
|
|
if ctnr == nil {
|
|
return fmt.Errorf("service does not use container tasks")
|
|
}
|
|
ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
|
|
} else {
|
|
// this is needed because if the encodedAuth isn't being updated then we
|
|
// shouldn't lose it, and continue to use the one that was already present
|
|
currentService, err := getService(ctx, c.client, serviceID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ctnr := currentService.Spec.Task.GetContainer()
|
|
if ctnr == nil {
|
|
return fmt.Errorf("service does not use container tasks")
|
|
}
|
|
serviceSpec.Task.GetContainer().PullOptions = ctnr.PullOptions
|
|
}
|
|
|
|
_, err = c.client.UpdateService(
|
|
ctx,
|
|
&swarmapi.UpdateServiceRequest{
|
|
ServiceID: serviceID,
|
|
Spec: &serviceSpec,
|
|
ServiceVersion: &swarmapi.Version{
|
|
Index: version,
|
|
},
|
|
},
|
|
)
|
|
return err
|
|
}
|
|
|
|
// RemoveService removes a service from a managed swarm cluster.
|
|
func (c *Cluster) RemoveService(input string) error {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
if !c.isActiveManager() {
|
|
return c.errNoManager()
|
|
}
|
|
|
|
ctx, cancel := c.getRequestContext()
|
|
defer cancel()
|
|
|
|
service, err := getService(ctx, c.client, input)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := c.client.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetNodes returns a list of all nodes known to a cluster.
|
|
func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
if !c.isActiveManager() {
|
|
return nil, c.errNoManager()
|
|
}
|
|
|
|
filters, err := newListNodesFilters(options.Filter)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ctx, cancel := c.getRequestContext()
|
|
defer cancel()
|
|
|
|
r, err := c.client.ListNodes(
|
|
ctx,
|
|
&swarmapi.ListNodesRequest{Filters: filters})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
nodes := []types.Node{}
|
|
|
|
for _, node := range r.Nodes {
|
|
nodes = append(nodes, convert.NodeFromGRPC(*node))
|
|
}
|
|
return nodes, nil
|
|
}
|
|
|
|
// GetNode returns a node based on an ID or name.
|
|
func (c *Cluster) GetNode(input string) (types.Node, error) {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
if !c.isActiveManager() {
|
|
return types.Node{}, c.errNoManager()
|
|
}
|
|
|
|
ctx, cancel := c.getRequestContext()
|
|
defer cancel()
|
|
|
|
node, err := getNode(ctx, c.client, input)
|
|
if err != nil {
|
|
return types.Node{}, err
|
|
}
|
|
return convert.NodeFromGRPC(*node), nil
|
|
}
|
|
|
|
// UpdateNode updates existing nodes properties.
|
|
func (c *Cluster) UpdateNode(nodeID string, version uint64, spec types.NodeSpec) error {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
if !c.isActiveManager() {
|
|
return c.errNoManager()
|
|
}
|
|
|
|
nodeSpec, err := convert.NodeSpecToGRPC(spec)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ctx, cancel := c.getRequestContext()
|
|
defer cancel()
|
|
|
|
_, err = c.client.UpdateNode(
|
|
ctx,
|
|
&swarmapi.UpdateNodeRequest{
|
|
NodeID: nodeID,
|
|
Spec: &nodeSpec,
|
|
NodeVersion: &swarmapi.Version{
|
|
Index: version,
|
|
},
|
|
},
|
|
)
|
|
return err
|
|
}
|
|
|
|
// RemoveNode removes a node from a cluster
|
|
func (c *Cluster) RemoveNode(input string) error {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
if !c.isActiveManager() {
|
|
return c.errNoManager()
|
|
}
|
|
|
|
ctx, cancel := c.getRequestContext()
|
|
defer cancel()
|
|
|
|
node, err := getNode(ctx, c.client, input)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID}); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetTasks returns a list of tasks matching the filter options.
|
|
func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
if !c.isActiveManager() {
|
|
return nil, c.errNoManager()
|
|
}
|
|
|
|
byName := func(filter filters.Args) error {
|
|
if filter.Include("service") {
|
|
serviceFilters := filter.Get("service")
|
|
for _, serviceFilter := range serviceFilters {
|
|
service, err := c.GetService(serviceFilter)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
filter.Del("service", serviceFilter)
|
|
filter.Add("service", service.ID)
|
|
}
|
|
}
|
|
if filter.Include("node") {
|
|
nodeFilters := filter.Get("node")
|
|
for _, nodeFilter := range nodeFilters {
|
|
node, err := c.GetNode(nodeFilter)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
filter.Del("node", nodeFilter)
|
|
filter.Add("node", node.ID)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
filters, err := newListTasksFilters(options.Filter, byName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ctx, cancel := c.getRequestContext()
|
|
defer cancel()
|
|
|
|
r, err := c.client.ListTasks(
|
|
ctx,
|
|
&swarmapi.ListTasksRequest{Filters: filters})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tasks := []types.Task{}
|
|
|
|
for _, task := range r.Tasks {
|
|
tasks = append(tasks, convert.TaskFromGRPC(*task))
|
|
}
|
|
return tasks, nil
|
|
}
|
|
|
|
// GetTask returns a task by an ID.
|
|
func (c *Cluster) GetTask(input string) (types.Task, error) {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
if !c.isActiveManager() {
|
|
return types.Task{}, c.errNoManager()
|
|
}
|
|
|
|
ctx, cancel := c.getRequestContext()
|
|
defer cancel()
|
|
|
|
task, err := getTask(ctx, c.client, input)
|
|
if err != nil {
|
|
return types.Task{}, err
|
|
}
|
|
return convert.TaskFromGRPC(*task), nil
|
|
}
|
|
|
|
// GetNetwork returns a cluster network by an ID.
|
|
func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
if !c.isActiveManager() {
|
|
return apitypes.NetworkResource{}, c.errNoManager()
|
|
}
|
|
|
|
ctx, cancel := c.getRequestContext()
|
|
defer cancel()
|
|
|
|
network, err := getNetwork(ctx, c.client, input)
|
|
if err != nil {
|
|
return apitypes.NetworkResource{}, err
|
|
}
|
|
return convert.BasicNetworkFromGRPC(*network), nil
|
|
}
|
|
|
|
// GetNetworks returns all current cluster managed networks.
|
|
func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
if !c.isActiveManager() {
|
|
return nil, c.errNoManager()
|
|
}
|
|
|
|
ctx, cancel := c.getRequestContext()
|
|
defer cancel()
|
|
|
|
r, err := c.client.ListNetworks(ctx, &swarmapi.ListNetworksRequest{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var networks []apitypes.NetworkResource
|
|
|
|
for _, network := range r.Networks {
|
|
networks = append(networks, convert.BasicNetworkFromGRPC(*network))
|
|
}
|
|
|
|
return networks, nil
|
|
}
|
|
|
|
// CreateNetwork creates a new cluster managed network.
|
|
func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
if !c.isActiveManager() {
|
|
return "", c.errNoManager()
|
|
}
|
|
|
|
if runconfig.IsPreDefinedNetwork(s.Name) {
|
|
err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
|
|
return "", errors.NewRequestForbiddenError(err)
|
|
}
|
|
|
|
ctx, cancel := c.getRequestContext()
|
|
defer cancel()
|
|
|
|
networkSpec := convert.BasicNetworkCreateToGRPC(s)
|
|
r, err := c.client.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return r.Network.ID, nil
|
|
}
|
|
|
|
// RemoveNetwork removes a cluster network.
|
|
func (c *Cluster) RemoveNetwork(input string) error {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
if !c.isActiveManager() {
|
|
return c.errNoManager()
|
|
}
|
|
|
|
ctx, cancel := c.getRequestContext()
|
|
defer cancel()
|
|
|
|
network, err := getNetwork(ctx, c.client, input)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := c.client.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func populateNetworkID(ctx context.Context, c swarmapi.ControlClient, s *types.ServiceSpec) error {
|
|
for i, n := range s.Networks {
|
|
apiNetwork, err := getNetwork(ctx, c, n.Target)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.Networks[i].Target = apiNetwork.ID
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Network, error) {
|
|
// GetNetwork to match via full ID.
|
|
rg, err := c.GetNetwork(ctx, &swarmapi.GetNetworkRequest{NetworkID: input})
|
|
if err != nil {
|
|
// If any error (including NotFound), ListNetworks to match via ID prefix and full name.
|
|
rl, err := c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{Names: []string{input}}})
|
|
if err != nil || len(rl.Networks) == 0 {
|
|
rl, err = c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{IDPrefixes: []string{input}}})
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(rl.Networks) == 0 {
|
|
return nil, fmt.Errorf("network %s not found", input)
|
|
}
|
|
|
|
if l := len(rl.Networks); l > 1 {
|
|
return nil, fmt.Errorf("network %s is ambiguous (%d matches found)", input, l)
|
|
}
|
|
|
|
return rl.Networks[0], nil
|
|
}
|
|
return rg.Network, nil
|
|
}
|
|
|
|
// Cleanup stops active swarm node. This is run before daemon shutdown.
|
|
func (c *Cluster) Cleanup() {
|
|
c.Lock()
|
|
node := c.node
|
|
if node == nil {
|
|
c.Unlock()
|
|
return
|
|
}
|
|
defer c.Unlock()
|
|
if c.isActiveManager() {
|
|
active, reachable, unreachable, err := c.managerStats()
|
|
if err == nil {
|
|
singlenode := active && reachable == 1 && unreachable == 0
|
|
if active && !singlenode && reachable-2 <= unreachable {
|
|
logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
|
|
}
|
|
}
|
|
}
|
|
c.stopNode()
|
|
}
|
|
|
|
func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
|
|
if err != nil {
|
|
return false, 0, 0, err
|
|
}
|
|
for _, n := range nodes.Nodes {
|
|
if n.ManagerStatus != nil {
|
|
if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
|
|
reachable++
|
|
if n.ID == c.node.NodeID() {
|
|
current = true
|
|
}
|
|
}
|
|
if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
|
|
unreachable++
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func validateAndSanitizeInitRequest(req *types.InitRequest) error {
|
|
var err error
|
|
req.ListenAddr, err = validateAddr(req.ListenAddr)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
|
|
}
|
|
|
|
spec := &req.Spec
|
|
// provide sane defaults instead of erroring
|
|
if spec.Name == "" {
|
|
spec.Name = "default"
|
|
}
|
|
if spec.Raft.SnapshotInterval == 0 {
|
|
spec.Raft.SnapshotInterval = defaultSpec.Raft.SnapshotInterval
|
|
}
|
|
if spec.Raft.LogEntriesForSlowFollowers == 0 {
|
|
spec.Raft.LogEntriesForSlowFollowers = defaultSpec.Raft.LogEntriesForSlowFollowers
|
|
}
|
|
if spec.Raft.ElectionTick == 0 {
|
|
spec.Raft.ElectionTick = defaultSpec.Raft.ElectionTick
|
|
}
|
|
if spec.Raft.HeartbeatTick == 0 {
|
|
spec.Raft.HeartbeatTick = defaultSpec.Raft.HeartbeatTick
|
|
}
|
|
if spec.Dispatcher.HeartbeatPeriod == 0 {
|
|
spec.Dispatcher.HeartbeatPeriod = defaultSpec.Dispatcher.HeartbeatPeriod
|
|
}
|
|
if spec.CAConfig.NodeCertExpiry == 0 {
|
|
spec.CAConfig.NodeCertExpiry = defaultSpec.CAConfig.NodeCertExpiry
|
|
}
|
|
if spec.Orchestration.TaskHistoryRetentionLimit == 0 {
|
|
spec.Orchestration.TaskHistoryRetentionLimit = defaultSpec.Orchestration.TaskHistoryRetentionLimit
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
|
|
var err error
|
|
req.ListenAddr, err = validateAddr(req.ListenAddr)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
|
|
}
|
|
if len(req.RemoteAddrs) == 0 {
|
|
return fmt.Errorf("at least 1 RemoteAddr is required to join")
|
|
}
|
|
for i := range req.RemoteAddrs {
|
|
req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
|
|
if err != nil {
|
|
return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func validateAddr(addr string) (string, error) {
|
|
if addr == "" {
|
|
return addr, fmt.Errorf("invalid empty address")
|
|
}
|
|
newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
|
|
if err != nil {
|
|
return addr, nil
|
|
}
|
|
return strings.TrimPrefix(newaddr, "tcp://"), nil
|
|
}
|
|
|
|
func initClusterSpec(node *node, spec types.Spec) error {
|
|
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
|
|
for conn := range node.ListenControlSocket(ctx) {
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
if conn != nil {
|
|
client := swarmapi.NewControlClient(conn)
|
|
var cluster *swarmapi.Cluster
|
|
for i := 0; ; i++ {
|
|
lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
|
|
if err != nil {
|
|
return fmt.Errorf("error on listing clusters: %v", err)
|
|
}
|
|
if len(lcr.Clusters) == 0 {
|
|
if i < 10 {
|
|
time.Sleep(200 * time.Millisecond)
|
|
continue
|
|
}
|
|
return fmt.Errorf("empty list of clusters was returned")
|
|
}
|
|
cluster = lcr.Clusters[0]
|
|
break
|
|
}
|
|
newspec, err := convert.SwarmSpecToGRPC(spec)
|
|
if err != nil {
|
|
return fmt.Errorf("error updating cluster settings: %v", err)
|
|
}
|
|
_, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
|
|
ClusterID: cluster.ID,
|
|
ClusterVersion: &cluster.Meta.Version,
|
|
Spec: &newspec,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("error updating cluster settings: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
return ctx.Err()
|
|
}
|