package manager

import (
	"context"
	"crypto/tls"
	"fmt"
	"math"
	"net"
	"os"
	"path/filepath"
	"runtime"
	"sync"
	"syscall"
	"time"

	"github.com/docker/docker/pkg/plugingetter"
	"github.com/docker/go-events"
	gmetrics "github.com/docker/go-metrics"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/ca"
	"github.com/docker/swarmkit/connectionbroker"
	"github.com/docker/swarmkit/identity"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/allocator"
	"github.com/docker/swarmkit/manager/allocator/cnmallocator"
	"github.com/docker/swarmkit/manager/allocator/networkallocator"
	"github.com/docker/swarmkit/manager/controlapi"
	"github.com/docker/swarmkit/manager/dispatcher"
	"github.com/docker/swarmkit/manager/drivers"
	"github.com/docker/swarmkit/manager/health"
	"github.com/docker/swarmkit/manager/keymanager"
	"github.com/docker/swarmkit/manager/logbroker"
	"github.com/docker/swarmkit/manager/metrics"
	"github.com/docker/swarmkit/manager/orchestrator/constraintenforcer"
	"github.com/docker/swarmkit/manager/orchestrator/global"
	"github.com/docker/swarmkit/manager/orchestrator/replicated"
	"github.com/docker/swarmkit/manager/orchestrator/taskreaper"
	"github.com/docker/swarmkit/manager/resourceapi"
	"github.com/docker/swarmkit/manager/scheduler"
	"github.com/docker/swarmkit/manager/state/raft"
	"github.com/docker/swarmkit/manager/state/raft/transport"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/docker/swarmkit/manager/watchapi"
	"github.com/docker/swarmkit/remotes"
	"github.com/docker/swarmkit/xnet"
	gogotypes "github.com/gogo/protobuf/types"
	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

const (
	// defaultTaskHistoryRetentionLimit is the number of tasks to keep.
	defaultTaskHistoryRetentionLimit = 5
)

// RemoteAddrs provides a listening address and an optional advertise address
// for serving the remote API.
type RemoteAddrs struct {
	// Address to bind
	ListenAddr string

	// Address to advertise to remote nodes (optional).
	AdvertiseAddr string
}

// Config is used to tune the Manager.
type Config struct {
	SecurityConfig *ca.SecurityConfig

	// RootCAPaths is the path to which new root certs should be saved
	RootCAPaths ca.CertPaths

	// ExternalCAs is a list of initial CAs to which a manager node
	// will make certificate signing requests for node certificates.
	ExternalCAs []*api.ExternalCA

	// ControlAPI is an address for serving the control API.
	ControlAPI string

	// RemoteAPI is a listening address for serving the remote API, and
	// an optional advertise address.
	RemoteAPI *RemoteAddrs

	// JoinRaft is an optional address of a node in an existing raft
	// cluster to join.
	JoinRaft string

	// ForceJoin causes us to invoke raft's Join RPC even if already part
	// of a cluster.
	ForceJoin bool

	// StateDir is the top-level state directory
	StateDir string

	// ForceNewCluster defines if we have to force a new cluster
	// because we are recovering from a backup data directory.
	ForceNewCluster bool

	// ElectionTick defines the number of ticks without a leader needed
	// to trigger a new election
	ElectionTick uint32

	// HeartbeatTick defines the number of ticks between each
	// heartbeat sent to other members for health-check purposes
	HeartbeatTick uint32

	// AutoLockManagers determines whether or not managers require an unlock key
	// when starting from a stopped state. This configuration parameter is only
	// applicable when bootstrapping a new cluster for the first time.
	AutoLockManagers bool

	// UnlockKey is the key to unlock a node - used for decrypting manager TLS keys
	// as well as the raft data encryption key (DEK). It is applicable when
	// bootstrapping a cluster for the first time (it's a cluster-wide setting),
	// and also when loading up any raft data on disk (as a KEK for the raft DEK).
	UnlockKey []byte

	// Availability allows a user to control the current scheduling status of a node
	Availability api.NodeSpec_Availability

	// PluginGetter provides access to docker's plugin inventory.
	PluginGetter plugingetter.PluginGetter

	// FIPS is a boolean stating whether the node is FIPS enabled - if this is the
	// first node in the cluster, this setting is used to set the cluster-wide mandatory
	// FIPS setting.
	FIPS bool

	// NetworkConfig stores network related config for the cluster
	NetworkConfig *cnmallocator.NetworkConfig
}

// Manager is the cluster manager for Swarm.
// This is the high-level object holding and initializing all the manager
// subsystems.
type Manager struct {
	config Config

	collector              *metrics.Collector
	caserver               *ca.Server
	dispatcher             *dispatcher.Dispatcher
	logbroker              *logbroker.LogBroker
	watchServer            *watchapi.Server
	replicatedOrchestrator *replicated.Orchestrator
	globalOrchestrator     *global.Orchestrator
	taskReaper             *taskreaper.TaskReaper
	constraintEnforcer     *constraintenforcer.ConstraintEnforcer
	scheduler              *scheduler.Scheduler
	allocator              *allocator.Allocator
	keyManager             *keymanager.KeyManager
	server                 *grpc.Server
	localserver            *grpc.Server
	raftNode               *raft.Node
	dekRotator             *RaftDEKManager
	roleManager            *roleManager

	cancelFunc context.CancelFunc

	// mu is a general mutex used to coordinate starting/stopping and
	// leadership events.
	mu sync.Mutex
	// addrMu is a mutex that protects config.ControlAPI and config.RemoteAPI
	addrMu sync.Mutex

	started chan struct{}
	stopped bool

	remoteListener  chan net.Listener
	controlListener chan net.Listener
	errServe        chan error
}

var (
	leaderMetric gmetrics.Gauge
)

func init() {
	ns := gmetrics.NewNamespace("swarm", "manager", nil)
	leaderMetric = ns.NewGauge("leader", "Indicates if this manager node is a leader", "")
	gmetrics.Register(ns)
}

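// closeOnceListener wraps a net.Listener so that closing it is idempotent:
// only the first Close call reaches the underlying listener. This matters
// for unix sockets, where a second Close could delete the socket file of a
// newer listener (see the comment in serveListener below).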
type closeOnceListener struct {
	once sync.Once
	net.Listener
}

func (l *closeOnceListener) Close() error {
	var err error
	l.once.Do(func() {
		err = l.Listener.Close()
	})
	return err
}

// New creates a Manager which has not started to accept requests yet.
func New(config *Config) (*Manager, error) {
	err := os.MkdirAll(config.StateDir, 0700)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create state directory")
	}

	raftStateDir := filepath.Join(config.StateDir, "raft")
	err = os.MkdirAll(raftStateDir, 0700)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create raft state directory")
	}

	raftCfg := raft.DefaultNodeConfig()

	if config.ElectionTick > 0 {
		raftCfg.ElectionTick = int(config.ElectionTick)
	}
	if config.HeartbeatTick > 0 {
		raftCfg.HeartbeatTick = int(config.HeartbeatTick)
	}

	dekRotator, err := NewRaftDEKManager(config.SecurityConfig.KeyWriter(), config.FIPS)
	if err != nil {
		return nil, err
	}

	newNodeOpts := raft.NodeOptions{
		ID:              config.SecurityConfig.ClientTLSCreds.NodeID(),
		JoinAddr:        config.JoinRaft,
		ForceJoin:       config.ForceJoin,
		Config:          raftCfg,
		StateDir:        raftStateDir,
		ForceNewCluster: config.ForceNewCluster,
		TLSCredentials:  config.SecurityConfig.ClientTLSCreds,
		KeyRotator:      dekRotator,
		FIPS:            config.FIPS,
	}
	raftNode := raft.NewNode(newNodeOpts)

	// the interceptorWrappers are functions that wrap the prometheus grpc
	// interceptor, and add some code to log errors locally. one for stream
	// and one for unary. this is needed because the grpc unary interceptor
	// doesn't natively support chaining; you have to implement it in the
	// caller. note that even though these are logging errors, we're still
	// using debug level. returning errors from GRPC methods is common and
	// expected, and logging an ERROR every time a user mistypes a service
	// name would pollute the logs really fast.
	//
	// NOTE(dperny): Because of the fact that these functions are very simple
	// in their operation and have no side effects other than the log output,
	// they are not automatically tested. If you modify them later, make _sure_
	// that they are correct. If you add substantial side effects, abstract
	// these out and test them!
	unaryInterceptorWrapper := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		// pass the call down into the grpc_prometheus interceptor
		resp, err := grpc_prometheus.UnaryServerInterceptor(ctx, req, info, handler)
		if err != nil {
			log.G(ctx).WithField("rpc", info.FullMethod).WithError(err).Debug("error handling rpc")
		}
		return resp, err
	}

	streamInterceptorWrapper := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		// we can't re-write a stream context, so don't bother creating a
		// sub-context like in unary methods
		// pass the call down into the grpc_prometheus interceptor
		err := grpc_prometheus.StreamServerInterceptor(srv, ss, info, handler)
		if err != nil {
			log.G(ss.Context()).WithField("rpc", info.FullMethod).WithError(err).Debug("error handling streaming rpc")
		}
		return err
	}

	opts := []grpc.ServerOption{
		grpc.Creds(config.SecurityConfig.ServerTLSCreds),
		grpc.StreamInterceptor(streamInterceptorWrapper),
		grpc.UnaryInterceptor(unaryInterceptorWrapper),
		grpc.MaxRecvMsgSize(transport.GRPCMaxMsgSize),
	}

	m := &Manager{
		config:          *config,
		caserver:        ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig),
		dispatcher:      dispatcher.New(),
		logbroker:       logbroker.New(raftNode.MemoryStore()),
		watchServer:     watchapi.NewServer(raftNode.MemoryStore()),
		server:          grpc.NewServer(opts...),
		localserver:     grpc.NewServer(opts...),
		raftNode:        raftNode,
		started:         make(chan struct{}),
		dekRotator:      dekRotator,
		remoteListener:  make(chan net.Listener, 1),
		controlListener: make(chan net.Listener, 1),
		errServe:        make(chan error, 2),
	}

	if config.ControlAPI != "" {
		m.config.ControlAPI = ""
		if err := m.BindControl(config.ControlAPI); err != nil {
			return nil, err
		}
	}

	if config.RemoteAPI != nil {
		m.config.RemoteAPI = nil
		// The context isn't used in this case (before (*Manager).Run).
		if err := m.BindRemote(context.Background(), *config.RemoteAPI); err != nil {
			if config.ControlAPI != "" {
				l := <-m.controlListener
				l.Close()
			}
			return nil, err
		}
	}

	return m, nil
}

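// A minimal, hedged usage sketch of New/Run/Stop, kept entirely in a comment
// so nothing here is compiled; securityConfig and the paths below are
// hypothetical placeholders, not values this package provides:
//
//	m, err := New(&Config{
//		SecurityConfig: securityConfig, // hypothetical *ca.SecurityConfig
//		StateDir:       "/var/lib/swarm",
//		ControlAPI:     "/var/run/swarm/control.sock",
//		RemoteAPI:      &RemoteAddrs{ListenAddr: "0.0.0.0:2377"},
//	})
//	if err != nil {
//		return err
//	}
//	defer m.Stop(context.Background(), false)
//	return m.Run(context.Background()) // blocks until Stop or a serve error
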
// BindControl binds a local socket for the control API.
func (m *Manager) BindControl(addr string) error {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.ControlAPI != "" {
		return errors.New("manager already has a control API address")
	}

	// don't create a socket directory if we're on windows. we use a named pipe
	if runtime.GOOS != "windows" {
		err := os.MkdirAll(filepath.Dir(addr), 0700)
		if err != nil {
			return errors.Wrap(err, "failed to create socket directory")
		}
	}

	l, err := xnet.ListenLocal(addr)

	// A unix socket may fail to bind if the file already
	// exists. Try replacing the file.
	if runtime.GOOS != "windows" {
		unwrappedErr := err
		if op, ok := unwrappedErr.(*net.OpError); ok {
			unwrappedErr = op.Err
		}
		if sys, ok := unwrappedErr.(*os.SyscallError); ok {
			unwrappedErr = sys.Err
		}
		if unwrappedErr == syscall.EADDRINUSE {
			os.Remove(addr)
			l, err = xnet.ListenLocal(addr)
		}
	}
	if err != nil {
		return errors.Wrap(err, "failed to listen on control API address")
	}

	m.config.ControlAPI = addr
	m.controlListener <- l
	return nil
}

// BindRemote binds a port for the remote API.
func (m *Manager) BindRemote(ctx context.Context, addrs RemoteAddrs) error {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.RemoteAPI != nil {
		return errors.New("manager already has remote API address")
	}

	// If an AdvertiseAddr was specified, we use that as our
	// externally-reachable address.
	advertiseAddr := addrs.AdvertiseAddr

	var advertiseAddrPort string
	if advertiseAddr == "" {
		// Otherwise, we know we are joining an existing swarm. Use a
		// wildcard address to trigger remote autodetection of our
		// address.
		var err error
		_, advertiseAddrPort, err = net.SplitHostPort(addrs.ListenAddr)
		if err != nil {
			return fmt.Errorf("missing or invalid listen address %s", addrs.ListenAddr)
		}

		// Even with an IPv6 listening address, it's okay to use
		// 0.0.0.0 here. Any "unspecified" (wildcard) IP will
		// be substituted with the actual source address.
		advertiseAddr = net.JoinHostPort("0.0.0.0", advertiseAddrPort)
	}

	l, err := net.Listen("tcp", addrs.ListenAddr)
	if err != nil {
		return errors.Wrap(err, "failed to listen on remote API address")
	}
	if advertiseAddrPort == "0" {
		advertiseAddr = l.Addr().String()
		addrs.ListenAddr = advertiseAddr
	}

	m.config.RemoteAPI = &addrs

	m.raftNode.SetAddr(ctx, advertiseAddr)
	m.remoteListener <- l

	return nil
}

// RemovedFromRaft returns a channel that's closed if the manager is removed
// from the raft cluster. This should be used to trigger a manager shutdown.
func (m *Manager) RemovedFromRaft() <-chan struct{} {
	return m.raftNode.RemovedFromRaft
}

// Addr returns the TCP address on which the remote API listens.
func (m *Manager) Addr() string {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.RemoteAPI == nil {
		return ""
	}
	return m.config.RemoteAPI.ListenAddr
}

// Run starts all manager sub-systems and the gRPC server at the configured
// address.
// The call never returns unless an error occurs or `Stop()` is called.
func (m *Manager) Run(parent context.Context) error {
	ctx, ctxCancel := context.WithCancel(parent)
	defer ctxCancel()

	m.cancelFunc = ctxCancel

	leadershipCh, cancel := m.raftNode.SubscribeLeadership()
	defer cancel()

	go m.handleLeadershipEvents(ctx, leadershipCh)

	authorize := func(ctx context.Context, roles []string) error {
		var (
			blacklistedCerts map[string]*api.BlacklistedCertificate
			clusters         []*api.Cluster
			err              error
		)

		m.raftNode.MemoryStore().View(func(readTx store.ReadTx) {
			clusters, err = store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
		})

		// Not having a cluster object yet means we can't check
		// the blacklist.
		if err == nil && len(clusters) == 1 {
			blacklistedCerts = clusters[0].BlacklistedCertificates
		}

		// Authorize the remote roles, ensure they can only be forwarded by managers
		_, err = ca.AuthorizeForwardedRoleAndOrg(ctx, roles, []string{ca.ManagerRole}, m.config.SecurityConfig.ClientTLSCreds.Organization(), blacklistedCerts)
		return err
	}

	baseControlAPI := controlapi.NewServer(m.raftNode.MemoryStore(), m.raftNode, m.config.SecurityConfig, m.config.PluginGetter, drivers.New(m.config.PluginGetter))
	baseResourceAPI := resourceapi.New(m.raftNode.MemoryStore())
	healthServer := health.NewHealthServer()
	localHealthServer := health.NewHealthServer()

	authenticatedControlAPI := api.NewAuthenticatedWrapperControlServer(baseControlAPI, authorize)
	authenticatedWatchAPI := api.NewAuthenticatedWrapperWatchServer(m.watchServer, authorize)
	authenticatedResourceAPI := api.NewAuthenticatedWrapperResourceAllocatorServer(baseResourceAPI, authorize)
	authenticatedLogsServerAPI := api.NewAuthenticatedWrapperLogsServer(m.logbroker, authorize)
	authenticatedLogBrokerAPI := api.NewAuthenticatedWrapperLogBrokerServer(m.logbroker, authorize)
	authenticatedDispatcherAPI := api.NewAuthenticatedWrapperDispatcherServer(m.dispatcher, authorize)
	authenticatedCAAPI := api.NewAuthenticatedWrapperCAServer(m.caserver, authorize)
	authenticatedNodeCAAPI := api.NewAuthenticatedWrapperNodeCAServer(m.caserver, authorize)
	authenticatedRaftAPI := api.NewAuthenticatedWrapperRaftServer(m.raftNode, authorize)
	authenticatedHealthAPI := api.NewAuthenticatedWrapperHealthServer(healthServer, authorize)
	authenticatedRaftMembershipAPI := api.NewAuthenticatedWrapperRaftMembershipServer(m.raftNode, authorize)

	proxyDispatcherAPI := api.NewRaftProxyDispatcherServer(authenticatedDispatcherAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyCAAPI := api.NewRaftProxyCAServer(authenticatedCAAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyNodeCAAPI := api.NewRaftProxyNodeCAServer(authenticatedNodeCAAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyRaftMembershipAPI := api.NewRaftProxyRaftMembershipServer(authenticatedRaftMembershipAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(authenticatedResourceAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyLogBrokerAPI := api.NewRaftProxyLogBrokerServer(authenticatedLogBrokerAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)

	// The following local proxies are only wired up to receive requests
	// from a trusted local socket, and these requests don't use TLS,
	// therefore the requests they handle locally should bypass
	// authorization. When requests are proxied from these servers, they
	// are sent as requests from this manager rather than forwarded
	// requests (it has no TLS information to put in the metadata map).
	forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
	handleRequestLocally := func(ctx context.Context) (context.Context, error) {
		remoteAddr := "127.0.0.1:0"

		m.addrMu.Lock()
		if m.config.RemoteAPI != nil {
			if m.config.RemoteAPI.AdvertiseAddr != "" {
				remoteAddr = m.config.RemoteAPI.AdvertiseAddr
			} else {
				remoteAddr = m.config.RemoteAPI.ListenAddr
			}
		}
		m.addrMu.Unlock()

		creds := m.config.SecurityConfig.ClientTLSCreds

		nodeInfo := ca.RemoteNodeInfo{
			Roles:        []string{creds.Role()},
			Organization: creds.Organization(),
			NodeID:       creds.NodeID(),
			RemoteAddr:   remoteAddr,
		}

		return context.WithValue(ctx, ca.LocalRequestKey, nodeInfo), nil
	}
	localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyLogsAPI := api.NewRaftProxyLogsServer(m.logbroker, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyDispatcherAPI := api.NewRaftProxyDispatcherServer(m.dispatcher, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyCAAPI := api.NewRaftProxyCAServer(m.caserver, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyNodeCAAPI := api.NewRaftProxyNodeCAServer(m.caserver, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(baseResourceAPI, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyLogBrokerAPI := api.NewRaftProxyLogBrokerServer(m.logbroker, m.raftNode, handleRequestLocally, forwardAsOwnRequest)

	// Everything registered on m.server should be an authenticated
	// wrapper, or a proxy wrapping an authenticated wrapper!
	api.RegisterCAServer(m.server, proxyCAAPI)
	api.RegisterNodeCAServer(m.server, proxyNodeCAAPI)
	api.RegisterRaftServer(m.server, authenticatedRaftAPI)
	api.RegisterHealthServer(m.server, authenticatedHealthAPI)
	api.RegisterRaftMembershipServer(m.server, proxyRaftMembershipAPI)
	api.RegisterControlServer(m.server, authenticatedControlAPI)
	api.RegisterWatchServer(m.server, authenticatedWatchAPI)
	api.RegisterLogsServer(m.server, authenticatedLogsServerAPI)
	api.RegisterLogBrokerServer(m.server, proxyLogBrokerAPI)
	api.RegisterResourceAllocatorServer(m.server, proxyResourceAPI)
	api.RegisterDispatcherServer(m.server, proxyDispatcherAPI)
	grpc_prometheus.Register(m.server)

	api.RegisterControlServer(m.localserver, localProxyControlAPI)
	api.RegisterWatchServer(m.localserver, m.watchServer)
	api.RegisterLogsServer(m.localserver, localProxyLogsAPI)
	api.RegisterHealthServer(m.localserver, localHealthServer)
	api.RegisterDispatcherServer(m.localserver, localProxyDispatcherAPI)
	api.RegisterCAServer(m.localserver, localProxyCAAPI)
	api.RegisterNodeCAServer(m.localserver, localProxyNodeCAAPI)
	api.RegisterResourceAllocatorServer(m.localserver, localProxyResourceAPI)
	api.RegisterLogBrokerServer(m.localserver, localProxyLogBrokerAPI)
	grpc_prometheus.Register(m.localserver)

	healthServer.SetServingStatus("Raft", api.HealthCheckResponse_NOT_SERVING)
	localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_NOT_SERVING)

	if err := m.watchServer.Start(ctx); err != nil {
		log.G(ctx).WithError(err).Error("watch server failed to start")
	}

	go m.serveListener(ctx, m.remoteListener)
	go m.serveListener(ctx, m.controlListener)

	defer func() {
		m.server.Stop()
		m.localserver.Stop()
	}()

	// Set the raft server as serving for the health server
	healthServer.SetServingStatus("Raft", api.HealthCheckResponse_SERVING)

	if err := m.raftNode.JoinAndStart(ctx); err != nil {
		// Don't block future calls to Stop.
		close(m.started)
		return errors.Wrap(err, "can't initialize raft node")
	}

	localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_SERVING)

	// Start metrics collection.
	m.collector = metrics.NewCollector(m.raftNode.MemoryStore())
	go func(collector *metrics.Collector) {
		if err := collector.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("collector failed with an error")
		}
	}(m.collector)

	close(m.started)

	go func() {
		err := m.raftNode.Run(ctx)
		if err != nil {
			log.G(ctx).WithError(err).Error("raft node stopped")
			m.Stop(ctx, false)
		}
	}()

	if err := raft.WaitForLeader(ctx, m.raftNode); err != nil {
		return err
	}

	c, err := raft.WaitForCluster(ctx, m.raftNode)
	if err != nil {
		return err
	}
	raftConfig := c.Spec.Raft

	if err := m.watchForClusterChanges(ctx); err != nil {
		return err
	}

	if int(raftConfig.ElectionTick) != m.raftNode.Config.ElectionTick {
		log.G(ctx).Warningf("election tick value (%ds) is different from the one defined in the cluster config (%vs), the cluster may be unstable", m.raftNode.Config.ElectionTick, raftConfig.ElectionTick)
	}
	if int(raftConfig.HeartbeatTick) != m.raftNode.Config.HeartbeatTick {
		log.G(ctx).Warningf("heartbeat tick value (%ds) is different from the one defined in the cluster config (%vs), the cluster may be unstable", m.raftNode.Config.HeartbeatTick, raftConfig.HeartbeatTick)
	}

	// wait for an error in serving.
	err = <-m.errServe
	m.mu.Lock()
	if m.stopped {
		m.mu.Unlock()
		return nil
	}
	m.mu.Unlock()
	m.Stop(ctx, false)

	return err
}

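// stopTimeout bounds how long Stop waits for the gRPC servers to stop
// gracefully before forcing them down (see the time.AfterFunc in Stop).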
const stopTimeout = 8 * time.Second

// Stop stops the manager. It immediately closes all open connections and
// active RPCs as well as stopping the manager's subsystems. If clearData is
// set, the raft logs, snapshots, and keys will be erased.
func (m *Manager) Stop(ctx context.Context, clearData bool) {
	log.G(ctx).Info("Stopping manager")
	// It's not safe to start shutting down while the manager is still
	// starting up.
	<-m.started

	// the mutex stops us from trying to stop while we're already stopping, or
	// from returning before we've finished stopping.
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.stopped {
		return
	}
	m.stopped = true

	srvDone, localSrvDone := make(chan struct{}), make(chan struct{})
	go func() {
		m.server.GracefulStop()
		close(srvDone)
	}()
	go func() {
		m.localserver.GracefulStop()
		close(localSrvDone)
	}()

	m.raftNode.Cancel()

	if m.collector != nil {
		m.collector.Stop()
	}

	// The following components are gRPC services that are
	// registered when creating the manager and will need
	// to be re-registered if they are recreated.
	// For simplicity, they are not nilled out.
	m.dispatcher.Stop()
	m.logbroker.Stop()
	m.watchServer.Stop()
	m.caserver.Stop()

	if m.allocator != nil {
		m.allocator.Stop()
	}
	if m.replicatedOrchestrator != nil {
		m.replicatedOrchestrator.Stop()
	}
	if m.globalOrchestrator != nil {
		m.globalOrchestrator.Stop()
	}
	if m.taskReaper != nil {
		m.taskReaper.Stop()
	}
	if m.constraintEnforcer != nil {
		m.constraintEnforcer.Stop()
	}
	if m.scheduler != nil {
		m.scheduler.Stop()
	}
	if m.roleManager != nil {
		m.roleManager.Stop()
	}
	if m.keyManager != nil {
		m.keyManager.Stop()
	}

	if clearData {
		m.raftNode.ClearData()
	}
	m.cancelFunc()
	<-m.raftNode.Done()

	timer := time.AfterFunc(stopTimeout, func() {
		m.server.Stop()
		m.localserver.Stop()
	})
	defer timer.Stop()
	// TODO: we're not waiting on ctx because it very well could be passed from Run,
	// which is already cancelled here. We need to refactor that.
	select {
	case <-srvDone:
		<-localSrvDone
	case <-localSrvDone:
		<-srvDone
	}

	log.G(ctx).Info("Manager shut down")
	// mutex is released and Run can return now
}

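// updateKEK extracts the manager unlock key (KEK) for this cluster from the
// cluster object and hands it to the DEK rotator, which re-encrypts local
// key material if the KEK changed. If the cluster just transitioned from
// unlocked to locked, it also kicks off a best-effort TLS certificate
// renewal over the local control socket.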
func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error {
	securityConfig := m.config.SecurityConfig
	nodeID := m.config.SecurityConfig.ClientTLSCreds.NodeID()
	logger := log.G(ctx).WithFields(logrus.Fields{
		"node.id":   nodeID,
		"node.role": ca.ManagerRole,
	})

	kekData := ca.KEKData{Version: cluster.Meta.Version.Index}
	for _, encryptionKey := range cluster.UnlockKeys {
		if encryptionKey.Subsystem == ca.ManagerRole {
			kekData.KEK = encryptionKey.Key
			break
		}
	}
	updated, unlockedToLocked, err := m.dekRotator.MaybeUpdateKEK(kekData)
	if err != nil {
		logger.WithError(err).Errorf("failed to re-encrypt TLS key with a new KEK")
		return err
	}
	if updated {
		logger.Debug("successfully rotated KEK")
	}
	if unlockedToLocked {
		// a best effort attempt to update the TLS certificate - if it fails,
		// it'll be updated the next time it renews; don't wait because it
		// might take a bit
		go func() {
			insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})

			conn, err := grpc.Dial(
				m.config.ControlAPI,
				grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
				grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
				grpc.WithTransportCredentials(insecureCreds),
				grpc.WithDialer(
					func(addr string, timeout time.Duration) (net.Conn, error) {
						return xnet.DialTimeoutLocal(addr, timeout)
					}),
				grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
			)
			if err != nil {
				logger.WithError(err).Error("failed to connect to local manager socket after locking the cluster")
				return
			}

			defer conn.Close()

			connBroker := connectionbroker.New(remotes.NewRemotes())
			connBroker.SetLocalConn(conn)
			if err := ca.RenewTLSConfigNow(ctx, securityConfig, connBroker, m.config.RootCAPaths); err != nil {
				logger.WithError(err).Error("failed to download new TLS certificate after locking the cluster")
			}
		}()
	}
	return nil
}

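// watchForClusterChanges applies the KEK from the current cluster object
// once, then watches for cluster updates and re-runs updateKEK on each one
// until the context is cancelled.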
func (m *Manager) watchForClusterChanges(ctx context.Context) error {
	clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization()
	var cluster *api.Cluster
	clusterWatch, clusterWatchCancel, err := store.ViewAndWatch(m.raftNode.MemoryStore(),
		func(tx store.ReadTx) error {
			cluster = store.GetCluster(tx, clusterID)
			if cluster == nil {
				return fmt.Errorf("unable to get current cluster")
			}
			return nil
		},
		api.EventUpdateCluster{
			Cluster: &api.Cluster{ID: clusterID},
			Checks:  []api.ClusterCheckFunc{api.ClusterCheckID},
		},
	)
	if err != nil {
		return err
	}
	if err := m.updateKEK(ctx, cluster); err != nil {
		return err
	}

	go func() {
		for {
			select {
			case event := <-clusterWatch:
				clusterEvent := event.(api.EventUpdateCluster)
				m.updateKEK(ctx, clusterEvent.Cluster)
			case <-ctx.Done():
				clusterWatchCancel()
				return
			}
		}
	}()
	return nil
}

// getLeaderNodeID is a small helper function returning a string with the
// leader's node ID. it is only used for logging, and should not be relied on
// to give a node ID for actual operational purposes (because it returns errors
// as nicely decorated strings)
func (m *Manager) getLeaderNodeID() string {
	// get the current leader ID. this variable tracks the leader *only* for
	// the purposes of logging leadership changes, and should not be relied on
	// for other purposes
	leader, leaderErr := m.raftNode.Leader()
	switch leaderErr {
	case raft.ErrNoRaftMember:
		// this is an unlikely case, but we have to handle it. this means this
		// node is not a member of the raft quorum. this won't look very pretty
		// in logs ("leadership changed from aslkdjfa to ErrNoRaftMember") but
		// it also won't be very common
		return "not yet part of a raft cluster"
	case raft.ErrNoClusterLeader:
		return "no cluster leader"
	default:
		id, err := m.raftNode.GetNodeIDByRaftID(leader)
		// the only possible error here is "ErrMemberUnknown"
		if err != nil {
			return "an unknown node"
		}
		return id
	}
}

// handleLeadershipEvents handles the is leader event or is follower event.
func (m *Manager) handleLeadershipEvents(ctx context.Context, leadershipCh chan events.Event) {
	// get the current leader and save it for logging leadership changes in
	// this loop
	oldLeader := m.getLeaderNodeID()
	for {
		select {
		case leadershipEvent := <-leadershipCh:
			m.mu.Lock()
			if m.stopped {
				m.mu.Unlock()
				return
			}
			newState := leadershipEvent.(raft.LeadershipState)

			if newState == raft.IsLeader {
				m.becomeLeader(ctx)
				leaderMetric.Set(1)
			} else if newState == raft.IsFollower {
				m.becomeFollower()
				leaderMetric.Set(0)
			}
			m.mu.Unlock()

			newLeader := m.getLeaderNodeID()
			// maybe we should use logrus fields for old and new leader, so
			// that users are better able to ingest leadership changes into log
			// aggregators?
			log.G(ctx).Infof("leadership changed from %v to %v", oldLeader, newLeader)
			oldLeader = newLeader
		case <-ctx.Done():
			return
		}
	}
}

// serveListener serves a listener for local and non local connections.
func (m *Manager) serveListener(ctx context.Context, lCh <-chan net.Listener) {
	var l net.Listener
	select {
	case l = <-lCh:
	case <-ctx.Done():
		return
	}
	ctx = log.WithLogger(ctx, log.G(ctx).WithFields(
		logrus.Fields{
			"proto": l.Addr().Network(),
			"addr":  l.Addr().String(),
		}))
	if _, ok := l.(*net.TCPListener); !ok {
		log.G(ctx).Info("Listening for local connections")
		// we need to disallow double closes because UnixListener.Close
		// can delete the unix-socket file of a newer listener. grpc calls
		// Close twice indeed: in Serve and in Stop.
		m.errServe <- m.localserver.Serve(&closeOnceListener{Listener: l})
	} else {
		log.G(ctx).Info("Listening for connections")
		m.errServe <- m.server.Serve(l)
	}
}

// becomeLeader starts the subsystems that are run on the leader.
func (m *Manager) becomeLeader(ctx context.Context) {
	s := m.raftNode.MemoryStore()

	rootCA := m.config.SecurityConfig.RootCA()
	nodeID := m.config.SecurityConfig.ClientTLSCreds.NodeID()

	raftCfg := raft.DefaultRaftConfig()
	raftCfg.ElectionTick = uint32(m.raftNode.Config.ElectionTick)
	raftCfg.HeartbeatTick = uint32(m.raftNode.Config.HeartbeatTick)

	clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization()

	initialCAConfig := ca.DefaultCAConfig()
	initialCAConfig.ExternalCAs = m.config.ExternalCAs

	var (
		unlockKeys []*api.EncryptionKey
		err        error
	)
	if m.config.AutoLockManagers {
		unlockKeys = []*api.EncryptionKey{{
			Subsystem: ca.ManagerRole,
			Key:       m.config.UnlockKey,
		}}
	}
	s.Update(func(tx store.Tx) error {
		// Add a default cluster object to the
		// store. Don't check the error because
		// we expect this to fail unless this
		// is a brand new cluster.
		clusterObj := defaultClusterObject(
			clusterID,
			initialCAConfig,
			raftCfg,
			api.EncryptionConfig{AutoLockManagers: m.config.AutoLockManagers},
			unlockKeys,
			rootCA,
			m.config.FIPS,
			nil,
			0,
			0)

		// If a default address pool was configured, update the cluster
		// object with it. Likewise, if VXLANUDPPort is non-zero, update
		// the cluster object with that value.
		if m.config.NetworkConfig != nil {
			if m.config.NetworkConfig.DefaultAddrPool != nil {
				clusterObj.DefaultAddressPool = m.config.NetworkConfig.DefaultAddrPool
				clusterObj.SubnetSize = m.config.NetworkConfig.SubnetSize
			}

			if m.config.NetworkConfig.VXLANUDPPort != 0 {
				clusterObj.VXLANUDPPort = m.config.NetworkConfig.VXLANUDPPort
			}
		}
		err := store.CreateCluster(tx, clusterObj)

		if err != nil && err != store.ErrExist {
			log.G(ctx).WithError(err).Errorf("error creating cluster object")
		}

		// Add Node entry for ourself, if one
		// doesn't exist already.
		freshCluster := nil == store.CreateNode(tx, managerNode(nodeID, m.config.Availability, clusterObj.VXLANUDPPort))

		if freshCluster {
			// This is a fresh swarm cluster. Add to store now any initial
			// cluster resource, like the default ingress network which
			// provides the routing mesh for this cluster.
			log.G(ctx).Info("Creating default ingress network")
			if err := store.CreateNetwork(tx, newIngressNetwork()); err != nil {
				log.G(ctx).WithError(err).Error("failed to create default ingress network")
			}
		}
		// Create the static predefined networks if the store does not
		// already contain them. These are node-local networks like
		// bridge/host which are known to be present on each cluster node.
		// This is needed in order to allow running services on the
		// predefined docker networks like `bridge` and `host`.
		for _, p := range allocator.PredefinedNetworks() {
			if err := store.CreateNetwork(tx, newPredefinedNetwork(p.Name, p.Driver)); err != nil && err != store.ErrNameConflict {
				log.G(ctx).WithError(err).Error("failed to create predefined network " + p.Name)
			}
		}
		return nil
	})

	m.replicatedOrchestrator = replicated.NewReplicatedOrchestrator(s)
	m.constraintEnforcer = constraintenforcer.New(s)
	m.globalOrchestrator = global.NewGlobalOrchestrator(s)
	m.taskReaper = taskreaper.New(s)
	m.scheduler = scheduler.New(s)
	m.keyManager = keymanager.New(s, keymanager.DefaultConfig())
	m.roleManager = newRoleManager(s, m.raftNode)

	// TODO(stevvooe): Allocate a context that can be used to
	// shutdown underlying manager processes when leadership is
	// lost.

	// If DefaultAddrPool is nil, read the cluster object from the store
	// and check whether it carries DefaultAddrPool info. Likewise, if
	// VXLANUDPPort is 0, read it from the stored cluster object.
	if m.config.NetworkConfig == nil || m.config.NetworkConfig.DefaultAddrPool == nil || m.config.NetworkConfig.VXLANUDPPort == 0 {
		var cluster *api.Cluster
		s.View(func(tx store.ReadTx) {
			cluster = store.GetCluster(tx, clusterID)
		})
		if cluster.DefaultAddressPool != nil {
			if m.config.NetworkConfig == nil {
				m.config.NetworkConfig = &cnmallocator.NetworkConfig{}
			}
			m.config.NetworkConfig.DefaultAddrPool = append(m.config.NetworkConfig.DefaultAddrPool, cluster.DefaultAddressPool...)
			m.config.NetworkConfig.SubnetSize = cluster.SubnetSize
		}
		if cluster.VXLANUDPPort != 0 {
			if m.config.NetworkConfig == nil {
				m.config.NetworkConfig = &cnmallocator.NetworkConfig{}
			}
			m.config.NetworkConfig.VXLANUDPPort = cluster.VXLANUDPPort
		}
	}

	m.allocator, err = allocator.New(s, m.config.PluginGetter, m.config.NetworkConfig)
	if err != nil {
		log.G(ctx).WithError(err).Error("failed to create allocator")
		// TODO(stevvooe): It doesn't seem correct here to fail
		// creating the allocator but then use it anyway.
	}

	if m.keyManager != nil {
		go func(keyManager *keymanager.KeyManager) {
			if err := keyManager.Run(ctx); err != nil {
				log.G(ctx).WithError(err).Error("keymanager failed with an error")
			}
		}(m.keyManager)
	}

	go func(d *dispatcher.Dispatcher) {
		// Initialize the dispatcher.
		d.Init(m.raftNode, dispatcher.DefaultConfig(), drivers.New(m.config.PluginGetter), m.config.SecurityConfig)
		if err := d.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("Dispatcher exited with an error")
		}
	}(m.dispatcher)

	if err := m.logbroker.Start(ctx); err != nil {
		log.G(ctx).WithError(err).Error("LogBroker failed to start")
	}

	go func(server *ca.Server) {
		if err := server.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("CA signer exited with an error")
		}
	}(m.caserver)

	// Start all sub-components in separate goroutines.
	// TODO(aluzzardi): This should have some kind of error handling so that
	// any component that goes down would bring the entire manager down.
	if m.allocator != nil {
		go func(allocator *allocator.Allocator) {
			if err := allocator.Run(ctx); err != nil {
				log.G(ctx).WithError(err).Error("allocator exited with an error")
			}
		}(m.allocator)
	}

	go func(scheduler *scheduler.Scheduler) {
		if err := scheduler.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("scheduler exited with an error")
		}
	}(m.scheduler)

	go func(constraintEnforcer *constraintenforcer.ConstraintEnforcer) {
		constraintEnforcer.Run()
	}(m.constraintEnforcer)

	go func(taskReaper *taskreaper.TaskReaper) {
		taskReaper.Run(ctx)
	}(m.taskReaper)

	go func(orchestrator *replicated.Orchestrator) {
		if err := orchestrator.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("replicated orchestrator exited with an error")
		}
	}(m.replicatedOrchestrator)

	go func(globalOrchestrator *global.Orchestrator) {
		if err := globalOrchestrator.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("global orchestrator exited with an error")
		}
	}(m.globalOrchestrator)

	go func(roleManager *roleManager) {
		roleManager.Run(ctx)
	}(m.roleManager)
}

// becomeFollower shuts down the subsystems that are only run by the leader.
func (m *Manager) becomeFollower() {
	// The following components are gRPC services that are
	// registered when creating the manager and will need
	// to be re-registered if they are recreated.
	// For simplicity, they are not nilled out.
	m.dispatcher.Stop()
	m.logbroker.Stop()
	m.caserver.Stop()

	if m.allocator != nil {
		m.allocator.Stop()
		m.allocator = nil
	}

	m.constraintEnforcer.Stop()
	m.constraintEnforcer = nil

	m.replicatedOrchestrator.Stop()
	m.replicatedOrchestrator = nil

	m.globalOrchestrator.Stop()
	m.globalOrchestrator = nil

	m.taskReaper.Stop()
	m.taskReaper = nil

	m.scheduler.Stop()
	m.scheduler = nil

	m.roleManager.Stop()
	m.roleManager = nil

	if m.keyManager != nil {
		m.keyManager.Stop()
		m.keyManager = nil
	}
}

// defaultClusterObject creates a default cluster.
func defaultClusterObject(
	clusterID string,
	initialCAConfig api.CAConfig,
	raftCfg api.RaftConfig,
	encryptionConfig api.EncryptionConfig,
	initialUnlockKeys []*api.EncryptionKey,
	rootCA *ca.RootCA,
	fips bool,
	defaultAddressPool []string,
	subnetSize uint32,
	vxlanUDPPort uint32) *api.Cluster {
	var caKey []byte
	if rcaSigner, err := rootCA.Signer(); err == nil {
		caKey = rcaSigner.Key
	}

	return &api.Cluster{
		ID: clusterID,
		Spec: api.ClusterSpec{
			Annotations: api.Annotations{
				Name: store.DefaultClusterName,
			},
			Orchestration: api.OrchestrationConfig{
				TaskHistoryRetentionLimit: defaultTaskHistoryRetentionLimit,
			},
			Dispatcher: api.DispatcherConfig{
				HeartbeatPeriod: gogotypes.DurationProto(dispatcher.DefaultHeartBeatPeriod),
			},
			Raft:             raftCfg,
			CAConfig:         initialCAConfig,
			EncryptionConfig: encryptionConfig,
		},
		RootCA: api.RootCA{
			CAKey:      caKey,
			CACert:     rootCA.Certs,
			CACertHash: rootCA.Digest.String(),
			JoinTokens: api.JoinTokens{
				Worker:  ca.GenerateJoinToken(rootCA, fips),
				Manager: ca.GenerateJoinToken(rootCA, fips),
			},
		},
		UnlockKeys:         initialUnlockKeys,
		FIPS:               fips,
		DefaultAddressPool: defaultAddressPool,
		SubnetSize:         subnetSize,
		VXLANUDPPort:       vxlanUDPPort,
	}
}

// managerNode creates a new node with NodeRoleManager role.
func managerNode(nodeID string, availability api.NodeSpec_Availability, vxlanPort uint32) *api.Node {
	return &api.Node{
		ID: nodeID,
		Certificate: api.Certificate{
			CN:   nodeID,
			Role: api.NodeRoleManager,
			Status: api.IssuanceStatus{
				State: api.IssuanceStateIssued,
			},
		},
		Spec: api.NodeSpec{
			DesiredRole:  api.NodeRoleManager,
			Membership:   api.NodeMembershipAccepted,
			Availability: availability,
		},
		VXLANUDPPort: vxlanPort,
	}
}

// newIngressNetwork returns the network object for the default ingress
// network, the network which provides the routing mesh. The caller saves
// this object to the store only once, at fresh-cluster creation. This
// function is expected to be called inside a store update transaction.
func newIngressNetwork() *api.Network {
	return &api.Network{
		ID: identity.NewID(),
		Spec: api.NetworkSpec{
			Ingress: true,
			Annotations: api.Annotations{
				Name: "ingress",
			},
			DriverConfig: &api.Driver{},
			IPAM: &api.IPAMOptions{
				Driver:  &api.Driver{},
				Configs: []*api.IPAMConfig{},
			},
		},
	}
}

// newPredefinedNetwork creates a network object representing one of the
// predefined networks known to be statically created on the cluster nodes.
// These objects are populated in the store at cluster creation solely in
// order to support running services on the nodes' predefined networks.
// External clients can filter these predefined networks by looking
// at the predefined label.
func newPredefinedNetwork(name, driver string) *api.Network {
	return &api.Network{
		ID: identity.NewID(),
		Spec: api.NetworkSpec{
			Annotations: api.Annotations{
				Name: name,
				Labels: map[string]string{
					networkallocator.PredefinedLabel: "true",
				},
			},
			DriverConfig: &api.Driver{Name: driver},
		},
	}
}