mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
32180ac0c7
This commit contains changes to configure DataPathPort option. By default we use 4789 port number. But this commit will allow user to configure port number during swarm init. DataPathPort can't be modified after swarm init. Signed-off-by: selansen <elango.siva@docker.com>
400 lines
11 KiB
Go
400 lines
11 KiB
Go
package cluster // import "github.com/docker/docker/daemon/cluster"
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
types "github.com/docker/docker/api/types/swarm"
|
|
"github.com/docker/docker/daemon/cluster/executor/container"
|
|
lncluster "github.com/docker/libnetwork/cluster"
|
|
swarmapi "github.com/docker/swarmkit/api"
|
|
swarmallocator "github.com/docker/swarmkit/manager/allocator/cnmallocator"
|
|
swarmnode "github.com/docker/swarmkit/node"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
// nodeRunner implements a manager for continuously running swarmkit node, restarting them with backoff delays if needed.
|
|
type nodeRunner struct {
|
|
nodeState
|
|
mu sync.RWMutex
|
|
done chan struct{} // closed when swarmNode exits
|
|
ready chan struct{} // closed when swarmNode becomes active
|
|
reconnectDelay time.Duration
|
|
config nodeStartConfig
|
|
|
|
repeatedRun bool
|
|
cancelReconnect func()
|
|
stopping bool
|
|
cluster *Cluster // only for accessing config helpers, never call any methods. TODO: change to config struct
|
|
}
|
|
|
|
// nodeStartConfig holds configuration needed to start a new node. Exported
|
|
// fields of this structure are saved to disk in json. Unexported fields
|
|
// contain data that shouldn't be persisted between daemon reloads.
|
|
type nodeStartConfig struct {
|
|
// LocalAddr is this machine's local IP or hostname, if specified.
|
|
LocalAddr string
|
|
// RemoteAddr is the address that was given to "swarm join". It is used
|
|
// to find LocalAddr if necessary.
|
|
RemoteAddr string
|
|
// ListenAddr is the address we bind to, including a port.
|
|
ListenAddr string
|
|
// AdvertiseAddr is the address other nodes should connect to,
|
|
// including a port.
|
|
AdvertiseAddr string
|
|
// DataPathAddr is the address that has to be used for the data path
|
|
DataPathAddr string
|
|
// DefaultAddressPool contains list of subnets
|
|
DefaultAddressPool []string
|
|
// SubnetSize contains subnet size of DefaultAddressPool
|
|
SubnetSize uint32
|
|
// DataPathPort contains Data path port (VXLAN UDP port) number that is used for data traffic.
|
|
DataPathPort uint32
|
|
// JoinInProgress is set to true if a join operation has started, but
|
|
// not completed yet.
|
|
JoinInProgress bool
|
|
|
|
joinAddr string
|
|
forceNewCluster bool
|
|
joinToken string
|
|
lockKey []byte
|
|
autolock bool
|
|
availability types.NodeAvailability
|
|
}
|
|
|
|
func (n *nodeRunner) Ready() chan error {
|
|
c := make(chan error, 1)
|
|
n.mu.RLock()
|
|
ready, done := n.ready, n.done
|
|
n.mu.RUnlock()
|
|
go func() {
|
|
select {
|
|
case <-ready:
|
|
case <-done:
|
|
}
|
|
select {
|
|
case <-ready:
|
|
default:
|
|
n.mu.RLock()
|
|
c <- n.err
|
|
n.mu.RUnlock()
|
|
}
|
|
close(c)
|
|
}()
|
|
return c
|
|
}
|
|
|
|
func (n *nodeRunner) Start(conf nodeStartConfig) error {
|
|
n.mu.Lock()
|
|
defer n.mu.Unlock()
|
|
|
|
n.reconnectDelay = initialReconnectDelay
|
|
|
|
return n.start(conf)
|
|
}
|
|
|
|
func (n *nodeRunner) start(conf nodeStartConfig) error {
|
|
var control string
|
|
if runtime.GOOS == "windows" {
|
|
control = `\\.\pipe\` + controlSocket
|
|
} else {
|
|
control = filepath.Join(n.cluster.runtimeRoot, controlSocket)
|
|
}
|
|
|
|
joinAddr := conf.joinAddr
|
|
if joinAddr == "" && conf.JoinInProgress {
|
|
// We must have been restarted while trying to join a cluster.
|
|
// Continue trying to join instead of forming our own cluster.
|
|
joinAddr = conf.RemoteAddr
|
|
}
|
|
|
|
// Hostname is not set here. Instead, it is obtained from
|
|
// the node description that is reported periodically
|
|
swarmnodeConfig := swarmnode.Config{
|
|
ForceNewCluster: conf.forceNewCluster,
|
|
ListenControlAPI: control,
|
|
ListenRemoteAPI: conf.ListenAddr,
|
|
AdvertiseRemoteAPI: conf.AdvertiseAddr,
|
|
NetworkConfig: &swarmallocator.NetworkConfig{
|
|
DefaultAddrPool: conf.DefaultAddressPool,
|
|
SubnetSize: conf.SubnetSize,
|
|
VXLANUDPPort: conf.DataPathPort,
|
|
},
|
|
JoinAddr: joinAddr,
|
|
StateDir: n.cluster.root,
|
|
JoinToken: conf.joinToken,
|
|
Executor: container.NewExecutor(
|
|
n.cluster.config.Backend,
|
|
n.cluster.config.PluginBackend,
|
|
n.cluster.config.ImageBackend,
|
|
n.cluster.config.VolumeBackend,
|
|
),
|
|
HeartbeatTick: n.cluster.config.RaftHeartbeatTick,
|
|
// Recommended value in etcd/raft is 10 x (HeartbeatTick).
|
|
// Lower values were seen to have caused instability because of
|
|
// frequent leader elections when running on flakey networks.
|
|
ElectionTick: n.cluster.config.RaftElectionTick,
|
|
UnlockKey: conf.lockKey,
|
|
AutoLockManagers: conf.autolock,
|
|
PluginGetter: n.cluster.config.Backend.PluginGetter(),
|
|
}
|
|
if conf.availability != "" {
|
|
avail, ok := swarmapi.NodeSpec_Availability_value[strings.ToUpper(string(conf.availability))]
|
|
if !ok {
|
|
return fmt.Errorf("invalid Availability: %q", conf.availability)
|
|
}
|
|
swarmnodeConfig.Availability = swarmapi.NodeSpec_Availability(avail)
|
|
}
|
|
node, err := swarmnode.New(&swarmnodeConfig)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := node.Start(context.Background()); err != nil {
|
|
return err
|
|
}
|
|
|
|
n.done = make(chan struct{})
|
|
n.ready = make(chan struct{})
|
|
n.swarmNode = node
|
|
if conf.joinAddr != "" {
|
|
conf.JoinInProgress = true
|
|
}
|
|
n.config = conf
|
|
savePersistentState(n.cluster.root, conf)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
go func() {
|
|
n.handleNodeExit(node)
|
|
cancel()
|
|
}()
|
|
|
|
go n.handleReadyEvent(ctx, node, n.ready)
|
|
go n.handleControlSocketChange(ctx, node)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (n *nodeRunner) handleControlSocketChange(ctx context.Context, node *swarmnode.Node) {
|
|
for conn := range node.ListenControlSocket(ctx) {
|
|
n.mu.Lock()
|
|
if n.grpcConn != conn {
|
|
if conn == nil {
|
|
n.controlClient = nil
|
|
n.logsClient = nil
|
|
} else {
|
|
n.controlClient = swarmapi.NewControlClient(conn)
|
|
n.logsClient = swarmapi.NewLogsClient(conn)
|
|
// push store changes to daemon
|
|
go n.watchClusterEvents(ctx, conn)
|
|
}
|
|
}
|
|
n.grpcConn = conn
|
|
n.mu.Unlock()
|
|
n.cluster.SendClusterEvent(lncluster.EventSocketChange)
|
|
}
|
|
}
|
|
|
|
func (n *nodeRunner) watchClusterEvents(ctx context.Context, conn *grpc.ClientConn) {
|
|
client := swarmapi.NewWatchClient(conn)
|
|
watch, err := client.Watch(ctx, &swarmapi.WatchRequest{
|
|
Entries: []*swarmapi.WatchRequest_WatchEntry{
|
|
{
|
|
Kind: "node",
|
|
Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
|
|
},
|
|
{
|
|
Kind: "service",
|
|
Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
|
|
},
|
|
{
|
|
Kind: "network",
|
|
Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
|
|
},
|
|
{
|
|
Kind: "secret",
|
|
Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
|
|
},
|
|
{
|
|
Kind: "config",
|
|
Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
|
|
},
|
|
},
|
|
IncludeOldObject: true,
|
|
})
|
|
if err != nil {
|
|
logrus.WithError(err).Error("failed to watch cluster store")
|
|
return
|
|
}
|
|
for {
|
|
msg, err := watch.Recv()
|
|
if err != nil {
|
|
// store watch is broken
|
|
errStatus, ok := status.FromError(err)
|
|
if !ok || errStatus.Code() != codes.Canceled {
|
|
logrus.WithError(err).Error("failed to receive changes from store watch API")
|
|
}
|
|
return
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case n.cluster.watchStream <- msg:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (n *nodeRunner) handleReadyEvent(ctx context.Context, node *swarmnode.Node, ready chan struct{}) {
|
|
select {
|
|
case <-node.Ready():
|
|
n.mu.Lock()
|
|
n.err = nil
|
|
if n.config.JoinInProgress {
|
|
n.config.JoinInProgress = false
|
|
savePersistentState(n.cluster.root, n.config)
|
|
}
|
|
n.mu.Unlock()
|
|
close(ready)
|
|
case <-ctx.Done():
|
|
}
|
|
n.cluster.SendClusterEvent(lncluster.EventNodeReady)
|
|
}
|
|
|
|
func (n *nodeRunner) handleNodeExit(node *swarmnode.Node) {
|
|
err := detectLockedError(node.Err(context.Background()))
|
|
if err != nil {
|
|
logrus.Errorf("cluster exited with error: %v", err)
|
|
}
|
|
n.mu.Lock()
|
|
n.swarmNode = nil
|
|
n.err = err
|
|
close(n.done)
|
|
select {
|
|
case <-n.ready:
|
|
n.enableReconnectWatcher()
|
|
default:
|
|
if n.repeatedRun {
|
|
n.enableReconnectWatcher()
|
|
}
|
|
}
|
|
n.repeatedRun = true
|
|
n.mu.Unlock()
|
|
}
|
|
|
|
// Stop stops the current swarm node if it is running.
|
|
func (n *nodeRunner) Stop() error {
|
|
n.mu.Lock()
|
|
if n.cancelReconnect != nil { // between restarts
|
|
n.cancelReconnect()
|
|
n.cancelReconnect = nil
|
|
}
|
|
if n.swarmNode == nil {
|
|
n.mu.Unlock()
|
|
return nil
|
|
}
|
|
n.stopping = true
|
|
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
|
defer cancel()
|
|
n.mu.Unlock()
|
|
if err := n.swarmNode.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
|
|
return err
|
|
}
|
|
n.cluster.SendClusterEvent(lncluster.EventNodeLeave)
|
|
<-n.done
|
|
return nil
|
|
}
|
|
|
|
func (n *nodeRunner) State() nodeState {
|
|
if n == nil {
|
|
return nodeState{status: types.LocalNodeStateInactive}
|
|
}
|
|
n.mu.RLock()
|
|
defer n.mu.RUnlock()
|
|
|
|
ns := n.nodeState
|
|
|
|
if ns.err != nil || n.cancelReconnect != nil {
|
|
if errors.Cause(ns.err) == errSwarmLocked {
|
|
ns.status = types.LocalNodeStateLocked
|
|
} else {
|
|
ns.status = types.LocalNodeStateError
|
|
}
|
|
} else {
|
|
select {
|
|
case <-n.ready:
|
|
ns.status = types.LocalNodeStateActive
|
|
default:
|
|
ns.status = types.LocalNodeStatePending
|
|
}
|
|
}
|
|
|
|
return ns
|
|
}
|
|
|
|
func (n *nodeRunner) enableReconnectWatcher() {
|
|
if n.stopping {
|
|
return
|
|
}
|
|
n.reconnectDelay *= 2
|
|
if n.reconnectDelay > maxReconnectDelay {
|
|
n.reconnectDelay = maxReconnectDelay
|
|
}
|
|
logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
|
|
delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
|
|
n.cancelReconnect = cancel
|
|
|
|
go func() {
|
|
<-delayCtx.Done()
|
|
if delayCtx.Err() != context.DeadlineExceeded {
|
|
return
|
|
}
|
|
n.mu.Lock()
|
|
defer n.mu.Unlock()
|
|
if n.stopping {
|
|
return
|
|
}
|
|
|
|
if err := n.start(n.config); err != nil {
|
|
n.err = err
|
|
}
|
|
}()
|
|
}
|
|
|
|
// nodeState represents information about the current state of the cluster and
|
|
// provides access to the grpc clients.
|
|
type nodeState struct {
|
|
swarmNode *swarmnode.Node
|
|
grpcConn *grpc.ClientConn
|
|
controlClient swarmapi.ControlClient
|
|
logsClient swarmapi.LogsClient
|
|
status types.LocalNodeState
|
|
actualLocalAddr string
|
|
err error
|
|
}
|
|
|
|
// IsActiveManager returns true if node is a manager ready to accept control requests. It is safe to access the client properties if this returns true.
|
|
func (ns nodeState) IsActiveManager() bool {
|
|
return ns.controlClient != nil
|
|
}
|
|
|
|
// IsManager returns true if node is a manager.
|
|
func (ns nodeState) IsManager() bool {
|
|
return ns.swarmNode != nil && ns.swarmNode.Manager() != nil
|
|
}
|
|
|
|
// NodeID returns node's ID or empty string if node is inactive.
|
|
func (ns nodeState) NodeID() string {
|
|
if ns.swarmNode != nil {
|
|
return ns.swarmNode.NodeID()
|
|
}
|
|
return ""
|
|
}
|