package manager

import (
	"context"
	"crypto/tls"
	"fmt"
	"net"
	"os"
	"path/filepath"
	"runtime"
	"sync"
	"syscall"
	"time"

	"github.com/docker/docker/pkg/plugingetter"
	"github.com/docker/go-events"
	gmetrics "github.com/docker/go-metrics"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/ca"
	"github.com/docker/swarmkit/connectionbroker"
	"github.com/docker/swarmkit/identity"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/allocator"
	"github.com/docker/swarmkit/manager/allocator/cnmallocator"
	"github.com/docker/swarmkit/manager/allocator/networkallocator"
	"github.com/docker/swarmkit/manager/controlapi"
	"github.com/docker/swarmkit/manager/dispatcher"
	"github.com/docker/swarmkit/manager/drivers"
	"github.com/docker/swarmkit/manager/health"
	"github.com/docker/swarmkit/manager/keymanager"
	"github.com/docker/swarmkit/manager/logbroker"
	"github.com/docker/swarmkit/manager/metrics"
	"github.com/docker/swarmkit/manager/orchestrator/constraintenforcer"
	"github.com/docker/swarmkit/manager/orchestrator/global"
	"github.com/docker/swarmkit/manager/orchestrator/replicated"
	"github.com/docker/swarmkit/manager/orchestrator/taskreaper"
	"github.com/docker/swarmkit/manager/resourceapi"
	"github.com/docker/swarmkit/manager/scheduler"
	"github.com/docker/swarmkit/manager/state/raft"
	"github.com/docker/swarmkit/manager/state/raft/transport"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/docker/swarmkit/manager/watchapi"
	"github.com/docker/swarmkit/remotes"
	"github.com/docker/swarmkit/xnet"
	gogotypes "github.com/gogo/protobuf/types"
	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

const (
	// defaultTaskHistoryRetentionLimit is the number of tasks to keep.
	defaultTaskHistoryRetentionLimit = 5
)

// RemoteAddrs provides a listening address and an optional advertise address
// for serving the remote API.
type RemoteAddrs struct {
	// Address to bind
	ListenAddr string

	// Address to advertise to remote nodes (optional).
	AdvertiseAddr string
}

// Config is used to tune the Manager.
type Config struct {
	SecurityConfig *ca.SecurityConfig

	// RootCAPaths is the path to which new root certs should be saved.
	RootCAPaths ca.CertPaths

	// ExternalCAs is a list of initial CAs to which a manager node
	// will make certificate signing requests for node certificates.
	ExternalCAs []*api.ExternalCA

	// ControlAPI is an address for serving the control API.
	ControlAPI string

	// RemoteAPI is a listening address for serving the remote API, and
	// an optional advertise address.
	RemoteAPI *RemoteAddrs

	// JoinRaft is an optional address of a node in an existing raft
	// cluster to join.
	JoinRaft string

	// ForceJoin causes us to invoke raft's Join RPC even if already part
	// of a cluster.
	ForceJoin bool

	// StateDir is the top-level state directory.
	StateDir string

	// ForceNewCluster defines if we have to force a new cluster
	// because we are recovering from a backup data directory.
	ForceNewCluster bool

	// ElectionTick defines the number of ticks needed without
	// a leader to trigger a new election.
	ElectionTick uint32

	// HeartbeatTick defines the number of ticks between each
	// heartbeat sent to other members for health-check purposes.
	HeartbeatTick uint32

	// AutoLockManagers determines whether or not managers require an unlock key
	// when starting from a stopped state. This configuration parameter is only
	// applicable when bootstrapping a new cluster for the first time.
	AutoLockManagers bool

	// UnlockKey is the key to unlock a node - used for decrypting manager TLS keys
	// as well as the raft data encryption key (DEK). It is applicable when
	// bootstrapping a cluster for the first time (it's a cluster-wide setting),
	// and also when loading up any raft data on disk (as a KEK for the raft DEK).
	UnlockKey []byte

	// Availability allows a user to control the current scheduling status of a node.
	Availability api.NodeSpec_Availability

	// PluginGetter provides access to docker's plugin inventory.
	PluginGetter plugingetter.PluginGetter

	// FIPS is a boolean stating whether the node is FIPS enabled - if this is the
	// first node in the cluster, this setting is used to set the cluster-wide mandatory
	// FIPS setting.
	FIPS bool

	// NetworkConfig stores network related config for the cluster.
	NetworkConfig *cnmallocator.NetworkConfig
}

// Manager is the cluster manager for Swarm.
// This is the high-level object holding and initializing all the manager
// subsystems.
type Manager struct {
	config Config

	collector              *metrics.Collector
	caserver               *ca.Server
	dispatcher             *dispatcher.Dispatcher
	logbroker              *logbroker.LogBroker
	watchServer            *watchapi.Server
	replicatedOrchestrator *replicated.Orchestrator
	globalOrchestrator     *global.Orchestrator
	taskReaper             *taskreaper.TaskReaper
	constraintEnforcer     *constraintenforcer.ConstraintEnforcer
	scheduler              *scheduler.Scheduler
	allocator              *allocator.Allocator
	keyManager             *keymanager.KeyManager
	server                 *grpc.Server
	localserver            *grpc.Server
	raftNode               *raft.Node
	dekRotator             *RaftDEKManager
	roleManager            *roleManager

	cancelFunc context.CancelFunc

	// mu is a general mutex used to coordinate starting/stopping and
	// leadership events.
	mu sync.Mutex
	// addrMu is a mutex that protects config.ControlAPI and config.RemoteAPI
	addrMu sync.Mutex

	started chan struct{}
	stopped bool

	remoteListener  chan net.Listener
	controlListener chan net.Listener
	errServe        chan error
}

var (
	leaderMetric gmetrics.Gauge
)
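
// init registers the "swarm/manager" metrics namespace and its leadership
// gauge with go-metrics.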
func init() {
	ns := gmetrics.NewNamespace("swarm", "manager", nil)
	leaderMetric = ns.NewGauge("leader", "Indicates if this manager node is a leader", "")
	gmetrics.Register(ns)
}
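
// closeOnceListener wraps a net.Listener so that Close only takes effect
// once, even if it is called multiple times (as grpc does in Serve and Stop).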
type closeOnceListener struct {
	once sync.Once
	net.Listener
}

func (l *closeOnceListener) Close() error {
	var err error
	l.once.Do(func() {
		err = l.Listener.Close()
	})
	return err
}

// New creates a Manager which has not started to accept requests yet.
func New(config *Config) (*Manager, error) {
	err := os.MkdirAll(config.StateDir, 0700)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create state directory")
	}

	raftStateDir := filepath.Join(config.StateDir, "raft")
	err = os.MkdirAll(raftStateDir, 0700)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create raft state directory")
	}

	raftCfg := raft.DefaultNodeConfig()

	if config.ElectionTick > 0 {
		raftCfg.ElectionTick = int(config.ElectionTick)
	}
	if config.HeartbeatTick > 0 {
		raftCfg.HeartbeatTick = int(config.HeartbeatTick)
	}

	dekRotator, err := NewRaftDEKManager(config.SecurityConfig.KeyWriter(), config.FIPS)
	if err != nil {
		return nil, err
	}

	newNodeOpts := raft.NodeOptions{
		ID:              config.SecurityConfig.ClientTLSCreds.NodeID(),
		JoinAddr:        config.JoinRaft,
		ForceJoin:       config.ForceJoin,
		Config:          raftCfg,
		StateDir:        raftStateDir,
		ForceNewCluster: config.ForceNewCluster,
		TLSCredentials:  config.SecurityConfig.ClientTLSCreds,
		KeyRotator:      dekRotator,
		FIPS:            config.FIPS,
	}
	raftNode := raft.NewNode(newNodeOpts)

	// the interceptorWrappers are functions that wrap the prometheus grpc
	// interceptor, and add some code to log errors locally. one for stream
	// and one for unary. this is needed because the grpc unary interceptor
	// doesn't natively do chaining, you have to implement it in the caller.
	// note that even though these are logging errors, we're still using
	// debug level. returning errors from GRPC methods is common and expected,
	// and logging an ERROR every time a user mistypes a service name would
	// pollute the logs really fast.
	//
	// NOTE(dperny): Because of the fact that these functions are very simple
	// in their operation and have no side effects other than the log output,
	// they are not automatically tested. If you modify them later, make _sure_
	// that they are correct. If you add substantial side effects, abstract
	// these out and test them!
	unaryInterceptorWrapper := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		// pass the call down into the grpc_prometheus interceptor
		resp, err := grpc_prometheus.UnaryServerInterceptor(ctx, req, info, handler)
		if err != nil {
			log.G(ctx).WithField("rpc", info.FullMethod).WithError(err).Debug("error handling rpc")
		}
		return resp, err
	}

	streamInterceptorWrapper := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		// we can't re-write a stream context, so don't bother creating a
		// sub-context like in unary methods
		// pass the call down into the grpc_prometheus interceptor
		err := grpc_prometheus.StreamServerInterceptor(srv, ss, info, handler)
		if err != nil {
			log.G(ss.Context()).WithField("rpc", info.FullMethod).WithError(err).Debug("error handling streaming rpc")
		}
		return err
	}

	opts := []grpc.ServerOption{
		grpc.Creds(config.SecurityConfig.ServerTLSCreds),
		grpc.StreamInterceptor(streamInterceptorWrapper),
		grpc.UnaryInterceptor(unaryInterceptorWrapper),
		grpc.MaxRecvMsgSize(transport.GRPCMaxMsgSize),
	}

	m := &Manager{
		config:          *config,
		caserver:        ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig),
		dispatcher:      dispatcher.New(),
		logbroker:       logbroker.New(raftNode.MemoryStore()),
		watchServer:     watchapi.NewServer(raftNode.MemoryStore()),
		server:          grpc.NewServer(opts...),
		localserver:     grpc.NewServer(opts...),
		raftNode:        raftNode,
		started:         make(chan struct{}),
		dekRotator:      dekRotator,
		remoteListener:  make(chan net.Listener, 1),
		controlListener: make(chan net.Listener, 1),
		errServe:        make(chan error, 2),
	}

	if config.ControlAPI != "" {
		m.config.ControlAPI = ""
		if err := m.BindControl(config.ControlAPI); err != nil {
			return nil, err
		}
	}

	if config.RemoteAPI != nil {
		m.config.RemoteAPI = nil
		// The context isn't used in this case (before (*Manager).Run).
		if err := m.BindRemote(context.Background(), *config.RemoteAPI); err != nil {
			if config.ControlAPI != "" {
				l := <-m.controlListener
				l.Close()
			}
			return nil, err
		}
	}

	return m, nil
}

// BindControl binds a local socket for the control API.
func (m *Manager) BindControl(addr string) error {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.ControlAPI != "" {
		return errors.New("manager already has a control API address")
	}

	// don't create a socket directory if we're on windows. we use a named pipe
	if runtime.GOOS != "windows" {
		err := os.MkdirAll(filepath.Dir(addr), 0700)
		if err != nil {
			return errors.Wrap(err, "failed to create socket directory")
		}
	}

	l, err := xnet.ListenLocal(addr)

	// A unix socket may fail to bind if the file already
	// exists. Try replacing the file.
	if runtime.GOOS != "windows" {
		unwrappedErr := err
		if op, ok := unwrappedErr.(*net.OpError); ok {
			unwrappedErr = op.Err
		}
		if sys, ok := unwrappedErr.(*os.SyscallError); ok {
			unwrappedErr = sys.Err
		}
		if unwrappedErr == syscall.EADDRINUSE {
			os.Remove(addr)
			l, err = xnet.ListenLocal(addr)
		}
	}
	if err != nil {
		return errors.Wrap(err, "failed to listen on control API address")
	}

	m.config.ControlAPI = addr
	m.controlListener <- l
	return nil
}

// BindRemote binds a port for the remote API.
func (m *Manager) BindRemote(ctx context.Context, addrs RemoteAddrs) error {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.RemoteAPI != nil {
		return errors.New("manager already has remote API address")
	}

	// If an AdvertiseAddr was specified, we use that as our
	// externally-reachable address.
	advertiseAddr := addrs.AdvertiseAddr

	var advertiseAddrPort string
	if advertiseAddr == "" {
		// Otherwise, we know we are joining an existing swarm. Use a
		// wildcard address to trigger remote autodetection of our
		// address.
		var err error
		_, advertiseAddrPort, err = net.SplitHostPort(addrs.ListenAddr)
		if err != nil {
			return fmt.Errorf("missing or invalid listen address %s", addrs.ListenAddr)
		}

		// Even with an IPv6 listening address, it's okay to use
		// 0.0.0.0 here. Any "unspecified" (wildcard) IP will
		// be substituted with the actual source address.
		advertiseAddr = net.JoinHostPort("0.0.0.0", advertiseAddrPort)
	}

	l, err := net.Listen("tcp", addrs.ListenAddr)
	if err != nil {
		return errors.Wrap(err, "failed to listen on remote API address")
	}
	if advertiseAddrPort == "0" {
		advertiseAddr = l.Addr().String()
		addrs.ListenAddr = advertiseAddr
	}

	m.config.RemoteAPI = &addrs

	m.raftNode.SetAddr(ctx, advertiseAddr)
	m.remoteListener <- l

	return nil
}

// RemovedFromRaft returns a channel that's closed if the manager is removed
// from the raft cluster. This should be used to trigger a manager shutdown.
func (m *Manager) RemovedFromRaft() <-chan struct{} {
	return m.raftNode.RemovedFromRaft
}

// Addr returns the TCP address on which the remote API listens.
func (m *Manager) Addr() string {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.RemoteAPI == nil {
		return ""
	}
	return m.config.RemoteAPI.ListenAddr
}

// Run starts all manager sub-systems and the gRPC server at the configured
// address.
// The call never returns unless an error occurs or `Stop()` is called.
func (m *Manager) Run(parent context.Context) error {
	ctx, ctxCancel := context.WithCancel(parent)
	defer ctxCancel()

	m.cancelFunc = ctxCancel

	leadershipCh, cancel := m.raftNode.SubscribeLeadership()
	defer cancel()

	go m.handleLeadershipEvents(ctx, leadershipCh)

	authorize := func(ctx context.Context, roles []string) error {
		var (
			blacklistedCerts map[string]*api.BlacklistedCertificate
			clusters         []*api.Cluster
			err              error
		)

		m.raftNode.MemoryStore().View(func(readTx store.ReadTx) {
			clusters, err = store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
		})

		// Not having a cluster object yet means we can't check
		// the blacklist.
		if err == nil && len(clusters) == 1 {
			blacklistedCerts = clusters[0].BlacklistedCertificates
		}

		// Authorize the remote roles, ensure they can only be forwarded by managers
		_, err = ca.AuthorizeForwardedRoleAndOrg(ctx, roles, []string{ca.ManagerRole}, m.config.SecurityConfig.ClientTLSCreds.Organization(), blacklistedCerts)
		return err
	}

	baseControlAPI := controlapi.NewServer(m.raftNode.MemoryStore(), m.raftNode, m.config.SecurityConfig, m.config.PluginGetter, drivers.New(m.config.PluginGetter))
	baseResourceAPI := resourceapi.New(m.raftNode.MemoryStore())
	healthServer := health.NewHealthServer()
	localHealthServer := health.NewHealthServer()

	authenticatedControlAPI := api.NewAuthenticatedWrapperControlServer(baseControlAPI, authorize)
	authenticatedWatchAPI := api.NewAuthenticatedWrapperWatchServer(m.watchServer, authorize)
	authenticatedResourceAPI := api.NewAuthenticatedWrapperResourceAllocatorServer(baseResourceAPI, authorize)
	authenticatedLogsServerAPI := api.NewAuthenticatedWrapperLogsServer(m.logbroker, authorize)
	authenticatedLogBrokerAPI := api.NewAuthenticatedWrapperLogBrokerServer(m.logbroker, authorize)
	authenticatedDispatcherAPI := api.NewAuthenticatedWrapperDispatcherServer(m.dispatcher, authorize)
	authenticatedCAAPI := api.NewAuthenticatedWrapperCAServer(m.caserver, authorize)
	authenticatedNodeCAAPI := api.NewAuthenticatedWrapperNodeCAServer(m.caserver, authorize)
	authenticatedRaftAPI := api.NewAuthenticatedWrapperRaftServer(m.raftNode, authorize)
	authenticatedHealthAPI := api.NewAuthenticatedWrapperHealthServer(healthServer, authorize)
	authenticatedRaftMembershipAPI := api.NewAuthenticatedWrapperRaftMembershipServer(m.raftNode, authorize)

	proxyDispatcherAPI := api.NewRaftProxyDispatcherServer(authenticatedDispatcherAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyCAAPI := api.NewRaftProxyCAServer(authenticatedCAAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyNodeCAAPI := api.NewRaftProxyNodeCAServer(authenticatedNodeCAAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyRaftMembershipAPI := api.NewRaftProxyRaftMembershipServer(authenticatedRaftMembershipAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(authenticatedResourceAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyLogBrokerAPI := api.NewRaftProxyLogBrokerServer(authenticatedLogBrokerAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)

	// The following local proxies are only wired up to receive requests
	// from a trusted local socket, and these requests don't use TLS,
	// therefore the requests they handle locally should bypass
	// authorization. When requests are proxied from these servers, they
	// are sent as requests from this manager rather than forwarded
	// requests (it has no TLS information to put in the metadata map).
	forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
	handleRequestLocally := func(ctx context.Context) (context.Context, error) {
		remoteAddr := "127.0.0.1:0"

		m.addrMu.Lock()
		if m.config.RemoteAPI != nil {
			if m.config.RemoteAPI.AdvertiseAddr != "" {
				remoteAddr = m.config.RemoteAPI.AdvertiseAddr
			} else {
				remoteAddr = m.config.RemoteAPI.ListenAddr
			}
		}
		m.addrMu.Unlock()

		creds := m.config.SecurityConfig.ClientTLSCreds

		nodeInfo := ca.RemoteNodeInfo{
			Roles:        []string{creds.Role()},
			Organization: creds.Organization(),
			NodeID:       creds.NodeID(),
			RemoteAddr:   remoteAddr,
		}

		return context.WithValue(ctx, ca.LocalRequestKey, nodeInfo), nil
	}
	localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyLogsAPI := api.NewRaftProxyLogsServer(m.logbroker, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyDispatcherAPI := api.NewRaftProxyDispatcherServer(m.dispatcher, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyCAAPI := api.NewRaftProxyCAServer(m.caserver, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyNodeCAAPI := api.NewRaftProxyNodeCAServer(m.caserver, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(baseResourceAPI, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyLogBrokerAPI := api.NewRaftProxyLogBrokerServer(m.logbroker, m.raftNode, handleRequestLocally, forwardAsOwnRequest)

	// Everything registered on m.server should be an authenticated
	// wrapper, or a proxy wrapping an authenticated wrapper!
	api.RegisterCAServer(m.server, proxyCAAPI)
	api.RegisterNodeCAServer(m.server, proxyNodeCAAPI)
	api.RegisterRaftServer(m.server, authenticatedRaftAPI)
	api.RegisterHealthServer(m.server, authenticatedHealthAPI)
	api.RegisterRaftMembershipServer(m.server, proxyRaftMembershipAPI)
	api.RegisterControlServer(m.server, authenticatedControlAPI)
	api.RegisterWatchServer(m.server, authenticatedWatchAPI)
	api.RegisterLogsServer(m.server, authenticatedLogsServerAPI)
	api.RegisterLogBrokerServer(m.server, proxyLogBrokerAPI)
	api.RegisterResourceAllocatorServer(m.server, proxyResourceAPI)
	api.RegisterDispatcherServer(m.server, proxyDispatcherAPI)
	grpc_prometheus.Register(m.server)

	api.RegisterControlServer(m.localserver, localProxyControlAPI)
	api.RegisterWatchServer(m.localserver, m.watchServer)
	api.RegisterLogsServer(m.localserver, localProxyLogsAPI)
	api.RegisterHealthServer(m.localserver, localHealthServer)
	api.RegisterDispatcherServer(m.localserver, localProxyDispatcherAPI)
	api.RegisterCAServer(m.localserver, localProxyCAAPI)
	api.RegisterNodeCAServer(m.localserver, localProxyNodeCAAPI)
	api.RegisterResourceAllocatorServer(m.localserver, localProxyResourceAPI)
	api.RegisterLogBrokerServer(m.localserver, localProxyLogBrokerAPI)
	grpc_prometheus.Register(m.localserver)

	healthServer.SetServingStatus("Raft", api.HealthCheckResponse_NOT_SERVING)
	localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_NOT_SERVING)

	if err := m.watchServer.Start(ctx); err != nil {
		log.G(ctx).WithError(err).Error("watch server failed to start")
	}

	go m.serveListener(ctx, m.remoteListener)
	go m.serveListener(ctx, m.controlListener)

	defer func() {
		m.server.Stop()
		m.localserver.Stop()
	}()

	// Set the raft server as serving for the health server
	healthServer.SetServingStatus("Raft", api.HealthCheckResponse_SERVING)

	if err := m.raftNode.JoinAndStart(ctx); err != nil {
		// Don't block future calls to Stop.
		close(m.started)
		return errors.Wrap(err, "can't initialize raft node")
	}

	localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_SERVING)

	// Start metrics collection.
	m.collector = metrics.NewCollector(m.raftNode.MemoryStore())
	go func(collector *metrics.Collector) {
		if err := collector.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("collector failed with an error")
		}
	}(m.collector)

	close(m.started)

	go func() {
		err := m.raftNode.Run(ctx)
		if err != nil {
			log.G(ctx).WithError(err).Error("raft node stopped")
			m.Stop(ctx, false)
		}
	}()

	if err := raft.WaitForLeader(ctx, m.raftNode); err != nil {
		return err
	}

	c, err := raft.WaitForCluster(ctx, m.raftNode)
	if err != nil {
		return err
	}
	raftConfig := c.Spec.Raft

	if err := m.watchForClusterChanges(ctx); err != nil {
		return err
	}

	if int(raftConfig.ElectionTick) != m.raftNode.Config.ElectionTick {
		log.G(ctx).Warningf("election tick value (%ds) is different from the one defined in the cluster config (%vs), the cluster may be unstable", m.raftNode.Config.ElectionTick, raftConfig.ElectionTick)
	}
	if int(raftConfig.HeartbeatTick) != m.raftNode.Config.HeartbeatTick {
		log.G(ctx).Warningf("heartbeat tick value (%ds) is different from the one defined in the cluster config (%vs), the cluster may be unstable", m.raftNode.Config.HeartbeatTick, raftConfig.HeartbeatTick)
	}

	// wait for an error in serving.
	err = <-m.errServe
	m.mu.Lock()
	if m.stopped {
		m.mu.Unlock()
		return nil
	}
	m.mu.Unlock()
	m.Stop(ctx, false)

	return err
}
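
// stopTimeout is how long Stop waits for the gRPC servers to shut down
// gracefully before forcing them to stop.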
const stopTimeout = 8 * time.Second

// Stop stops the manager. It immediately closes all open connections and
// active RPCs as well as stopping the manager's subsystems. If clearData is
// set, the raft logs, snapshots, and keys will be erased.
func (m *Manager) Stop(ctx context.Context, clearData bool) {
	log.G(ctx).Info("Stopping manager")

	// It's not safe to start shutting down while the manager is still
	// starting up.
	<-m.started

	// the mutex stops us from trying to stop while we're already stopping, or
	// from returning before we've finished stopping.
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.stopped {
		return
	}
	m.stopped = true

	srvDone, localSrvDone := make(chan struct{}), make(chan struct{})
	go func() {
		m.server.GracefulStop()
		close(srvDone)
	}()
	go func() {
		m.localserver.GracefulStop()
		close(localSrvDone)
	}()

	m.raftNode.Cancel()

	if m.collector != nil {
		m.collector.Stop()
	}

	// The following components are gRPC services that are
	// registered when creating the manager and will need
	// to be re-registered if they are recreated.
	// For simplicity, they are not nilled out.
	m.dispatcher.Stop()
	m.logbroker.Stop()
	m.watchServer.Stop()
	m.caserver.Stop()

	if m.allocator != nil {
		m.allocator.Stop()
	}
	if m.replicatedOrchestrator != nil {
		m.replicatedOrchestrator.Stop()
	}
	if m.globalOrchestrator != nil {
		m.globalOrchestrator.Stop()
	}
	if m.taskReaper != nil {
		m.taskReaper.Stop()
	}
	if m.constraintEnforcer != nil {
		m.constraintEnforcer.Stop()
	}
	if m.scheduler != nil {
		m.scheduler.Stop()
	}
	if m.roleManager != nil {
		m.roleManager.Stop()
	}
	if m.keyManager != nil {
		m.keyManager.Stop()
	}

	if clearData {
		m.raftNode.ClearData()
	}
	m.cancelFunc()
	<-m.raftNode.Done()

	timer := time.AfterFunc(stopTimeout, func() {
		m.server.Stop()
		m.localserver.Stop()
	})
	defer timer.Stop()
	// TODO: we're not waiting on ctx because it very well could be passed from Run,
	// which is already cancelled here. We need to refactor that.
	select {
	case <-srvDone:
		<-localSrvDone
	case <-localSrvDone:
		<-srvDone
	}

	log.G(ctx).Info("Manager shut down")
	// mutex is released and Run can return now
}
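
// updateKEK reads the manager unlock key (KEK) from the cluster object and,
// if it has changed, re-encrypts the local TLS key and raft DEK with the new
// KEK. When the cluster transitions from unlocked to locked, it also kicks
// off a best-effort TLS certificate renewal over the local control socket.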
func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error {
	securityConfig := m.config.SecurityConfig
	nodeID := m.config.SecurityConfig.ClientTLSCreds.NodeID()
	logger := log.G(ctx).WithFields(logrus.Fields{
		"node.id":   nodeID,
		"node.role": ca.ManagerRole,
	})

	kekData := ca.KEKData{Version: cluster.Meta.Version.Index}
	for _, encryptionKey := range cluster.UnlockKeys {
		if encryptionKey.Subsystem == ca.ManagerRole {
			kekData.KEK = encryptionKey.Key
			break
		}
	}

	updated, unlockedToLocked, err := m.dekRotator.MaybeUpdateKEK(kekData)
	if err != nil {
		logger.WithError(err).Errorf("failed to re-encrypt TLS key with a new KEK")
		return err
	}
	if updated {
		logger.Debug("successfully rotated KEK")
	}
	if unlockedToLocked {
		// a best effort attempt to update the TLS certificate - if it fails, it'll be updated the next time it renews;
		// don't wait because it might take a bit
		go func() {
			insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})

			conn, err := grpc.Dial(
				m.config.ControlAPI,
				grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
				grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
				grpc.WithTransportCredentials(insecureCreds),
				grpc.WithDialer(
					func(addr string, timeout time.Duration) (net.Conn, error) {
						return xnet.DialTimeoutLocal(addr, timeout)
					}),
			)
			if err != nil {
				logger.WithError(err).Error("failed to connect to local manager socket after locking the cluster")
				return
			}

			defer conn.Close()

			connBroker := connectionbroker.New(remotes.NewRemotes())
			connBroker.SetLocalConn(conn)
			if err := ca.RenewTLSConfigNow(ctx, securityConfig, connBroker, m.config.RootCAPaths); err != nil {
				logger.WithError(err).Error("failed to download new TLS certificate after locking the cluster")
			}
		}()
	}
	return nil
}
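
// watchForClusterChanges applies the KEK from the current cluster object and
// then watches for updates to that object, re-applying the KEK on every
// change until the context is cancelled.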
func (m *Manager) watchForClusterChanges(ctx context.Context) error {
	clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization()
	var cluster *api.Cluster
	clusterWatch, clusterWatchCancel, err := store.ViewAndWatch(m.raftNode.MemoryStore(),
		func(tx store.ReadTx) error {
			cluster = store.GetCluster(tx, clusterID)
			if cluster == nil {
				return fmt.Errorf("unable to get current cluster")
			}
			return nil
		},
		api.EventUpdateCluster{
			Cluster: &api.Cluster{ID: clusterID},
			Checks:  []api.ClusterCheckFunc{api.ClusterCheckID},
		},
	)
	if err != nil {
		return err
	}
	if err := m.updateKEK(ctx, cluster); err != nil {
		return err
	}

	go func() {
		for {
			select {
			case event := <-clusterWatch:
				clusterEvent := event.(api.EventUpdateCluster)
				m.updateKEK(ctx, clusterEvent.Cluster)
			case <-ctx.Done():
				clusterWatchCancel()
				return
			}
		}
	}()

	return nil
}

// getLeaderNodeID is a small helper function returning a string with the
// leader's node ID. it is only used for logging, and should not be relied on
// to give a node ID for actual operational purposes (because it returns errors
// as nicely decorated strings)
func (m *Manager) getLeaderNodeID() string {
	// get the current leader ID. this variable tracks the leader *only* for
	// the purposes of logging leadership changes, and should not be relied on
	// for other purposes
	leader, leaderErr := m.raftNode.Leader()
	switch leaderErr {
	case raft.ErrNoRaftMember:
		// this is an unlikely case, but we have to handle it. this means this
		// node is not a member of the raft quorum. this won't look very pretty
		// in logs ("leadership changed from aslkdjfa to ErrNoRaftMember") but
		// it also won't be very common
		return "not yet part of a raft cluster"
	case raft.ErrNoClusterLeader:
		return "no cluster leader"
	default:
		id, err := m.raftNode.GetNodeIDByRaftID(leader)
		// the only possible error here is "ErrMemberUnknown"
		if err != nil {
			return "an unknown node"
		}
		return id
	}
}

// handleLeadershipEvents handles the is leader event or is follower event.
func (m *Manager) handleLeadershipEvents(ctx context.Context, leadershipCh chan events.Event) {
	// get the current leader and save it for logging leadership changes in
	// this loop
	oldLeader := m.getLeaderNodeID()
	for {
		select {
		case leadershipEvent := <-leadershipCh:
			m.mu.Lock()
			if m.stopped {
				m.mu.Unlock()
				return
			}
			newState := leadershipEvent.(raft.LeadershipState)

			if newState == raft.IsLeader {
				m.becomeLeader(ctx)
				leaderMetric.Set(1)
			} else if newState == raft.IsFollower {
				m.becomeFollower()
				leaderMetric.Set(0)
			}
			m.mu.Unlock()

			newLeader := m.getLeaderNodeID()
			// maybe we should use logrus fields for old and new leader, so
			// that users are better able to ingest leadership changes into log
			// aggregators?
			log.G(ctx).Infof("leadership changed from %v to %v", oldLeader, newLeader)
		case <-ctx.Done():
			return
		}
	}
}

// serveListener serves a listener for local and non local connections.
func (m *Manager) serveListener(ctx context.Context, lCh <-chan net.Listener) {
	var l net.Listener
	select {
	case l = <-lCh:
	case <-ctx.Done():
		return
	}
	ctx = log.WithLogger(ctx, log.G(ctx).WithFields(
		logrus.Fields{
			"proto": l.Addr().Network(),
			"addr":  l.Addr().String(),
		}))
	if _, ok := l.(*net.TCPListener); !ok {
		log.G(ctx).Info("Listening for local connections")
		// we need to disallow double closes because UnixListener.Close
		// can delete unix-socket file of newer listener. grpc calls
		// Close twice indeed: in Serve and in Stop.
		m.errServe <- m.localserver.Serve(&closeOnceListener{Listener: l})
	} else {
		log.G(ctx).Info("Listening for connections")
		m.errServe <- m.server.Serve(l)
	}
}

// becomeLeader starts the subsystems that are run on the leader.
func (m *Manager) becomeLeader(ctx context.Context) {
	s := m.raftNode.MemoryStore()

	rootCA := m.config.SecurityConfig.RootCA()
	nodeID := m.config.SecurityConfig.ClientTLSCreds.NodeID()

	raftCfg := raft.DefaultRaftConfig()
	raftCfg.ElectionTick = uint32(m.raftNode.Config.ElectionTick)
	raftCfg.HeartbeatTick = uint32(m.raftNode.Config.HeartbeatTick)

	clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization()

	initialCAConfig := ca.DefaultCAConfig()
	initialCAConfig.ExternalCAs = m.config.ExternalCAs

	var (
		unlockKeys []*api.EncryptionKey
		err        error
	)
	if m.config.AutoLockManagers {
		unlockKeys = []*api.EncryptionKey{{
			Subsystem: ca.ManagerRole,
			Key:       m.config.UnlockKey,
		}}
	}

	s.Update(func(tx store.Tx) error {
		// Add a default cluster object to the
		// store. Don't check the error because
		// we expect this to fail unless this
		// is a brand new cluster.
		clusterObj := defaultClusterObject(
			clusterID,
			initialCAConfig,
			raftCfg,
			api.EncryptionConfig{AutoLockManagers: m.config.AutoLockManagers},
			unlockKeys,
			rootCA,
			m.config.FIPS,
			nil,
			0,
			0)

		// If a valid DefaultAddrPool was configured, update the cluster
		// object with it. Likewise, if VXLANUDPPort is not 0, update the
		// cluster object with that value.
		if m.config.NetworkConfig != nil {
			if m.config.NetworkConfig.DefaultAddrPool != nil {
				clusterObj.DefaultAddressPool = m.config.NetworkConfig.DefaultAddrPool
				clusterObj.SubnetSize = m.config.NetworkConfig.SubnetSize
			}

			if m.config.NetworkConfig.VXLANUDPPort != 0 {
				clusterObj.VXLANUDPPort = m.config.NetworkConfig.VXLANUDPPort
			}
		}

		err := store.CreateCluster(tx, clusterObj)

		if err != nil && err != store.ErrExist {
			log.G(ctx).WithError(err).Errorf("error creating cluster object")
		}

		// Add Node entry for ourself, if one
		// doesn't exist already.
		freshCluster := nil == store.CreateNode(tx, managerNode(nodeID, m.config.Availability, clusterObj.VXLANUDPPort))

		if freshCluster {
			// This is a fresh swarm cluster. Add to store now any initial
			// cluster resource, like the default ingress network which
			// provides the routing mesh for this cluster.
			log.G(ctx).Info("Creating default ingress network")
			if err := store.CreateNetwork(tx, newIngressNetwork()); err != nil {
				log.G(ctx).WithError(err).Error("failed to create default ingress network")
			}
		}
		// Create now the static predefined networks, if the store does not
		// already contain them. These are node-local networks like
		// bridge/host, which are known to be present on each cluster node.
		// This is needed in order to allow running services on the
		// predefined docker networks like `bridge` and `host`.
		for _, p := range allocator.PredefinedNetworks() {
			if err := store.CreateNetwork(tx, newPredefinedNetwork(p.Name, p.Driver)); err != nil && err != store.ErrNameConflict {
				log.G(ctx).WithError(err).Error("failed to create predefined network " + p.Name)
			}
		}
		return nil
	})

	m.replicatedOrchestrator = replicated.NewReplicatedOrchestrator(s)
	m.constraintEnforcer = constraintenforcer.New(s)
	m.globalOrchestrator = global.NewGlobalOrchestrator(s)
	m.taskReaper = taskreaper.New(s)
	m.scheduler = scheduler.New(s)
	m.keyManager = keymanager.New(s, keymanager.DefaultConfig())
	m.roleManager = newRoleManager(s, m.raftNode)

	// TODO(stevvooe): Allocate a context that can be used to
	// shutdown underlying manager processes when leadership is
	// lost.

	// If DefaultAddrPool is nil, or VXLANUDPPort is 0, read the values
	// stored in the cluster object, if any.
	if m.config.NetworkConfig == nil || m.config.NetworkConfig.DefaultAddrPool == nil || m.config.NetworkConfig.VXLANUDPPort == 0 {
		var cluster *api.Cluster
		s.View(func(tx store.ReadTx) {
			cluster = store.GetCluster(tx, clusterID)
		})
		if cluster.DefaultAddressPool != nil {
			if m.config.NetworkConfig == nil {
				m.config.NetworkConfig = &cnmallocator.NetworkConfig{}
			}
			m.config.NetworkConfig.DefaultAddrPool = append(m.config.NetworkConfig.DefaultAddrPool, cluster.DefaultAddressPool...)
			m.config.NetworkConfig.SubnetSize = cluster.SubnetSize
		}

		if cluster.VXLANUDPPort != 0 {
			if m.config.NetworkConfig == nil {
				m.config.NetworkConfig = &cnmallocator.NetworkConfig{}
			}
			m.config.NetworkConfig.VXLANUDPPort = cluster.VXLANUDPPort
		}
	}

	m.allocator, err = allocator.New(s, m.config.PluginGetter, m.config.NetworkConfig)
	if err != nil {
		log.G(ctx).WithError(err).Error("failed to create allocator")
		// TODO(stevvooe): It doesn't seem correct here to fail
		// creating the allocator but then use it anyway.
	}

	if m.keyManager != nil {
		go func(keyManager *keymanager.KeyManager) {
			if err := keyManager.Run(ctx); err != nil {
				log.G(ctx).WithError(err).Error("keymanager failed with an error")
			}
		}(m.keyManager)
	}

	go func(d *dispatcher.Dispatcher) {
		// Initialize the dispatcher.
		d.Init(m.raftNode, dispatcher.DefaultConfig(), drivers.New(m.config.PluginGetter), m.config.SecurityConfig)
		if err := d.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("Dispatcher exited with an error")
		}
	}(m.dispatcher)

	if err := m.logbroker.Start(ctx); err != nil {
		log.G(ctx).WithError(err).Error("LogBroker failed to start")
	}

	go func(server *ca.Server) {
		if err := server.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("CA signer exited with an error")
		}
	}(m.caserver)

	// Start all sub-components in separate goroutines.
	// TODO(aluzzardi): This should have some kind of error handling so that
	// any component that goes down would bring the entire manager down.
	if m.allocator != nil {
		go func(allocator *allocator.Allocator) {
			if err := allocator.Run(ctx); err != nil {
				log.G(ctx).WithError(err).Error("allocator exited with an error")
			}
		}(m.allocator)
	}

	go func(scheduler *scheduler.Scheduler) {
		if err := scheduler.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("scheduler exited with an error")
		}
	}(m.scheduler)

	go func(constraintEnforcer *constraintenforcer.ConstraintEnforcer) {
		constraintEnforcer.Run()
	}(m.constraintEnforcer)

	go func(taskReaper *taskreaper.TaskReaper) {
		taskReaper.Run(ctx)
	}(m.taskReaper)

	go func(orchestrator *replicated.Orchestrator) {
		if err := orchestrator.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("replicated orchestrator exited with an error")
		}
	}(m.replicatedOrchestrator)

	go func(globalOrchestrator *global.Orchestrator) {
		if err := globalOrchestrator.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("global orchestrator exited with an error")
		}
	}(m.globalOrchestrator)

	go func(roleManager *roleManager) {
		roleManager.Run(ctx)
	}(m.roleManager)
}

// becomeFollower shuts down the subsystems that are only run by the leader.
func (m *Manager) becomeFollower() {
	// The following components are gRPC services that are
	// registered when creating the manager and will need
	// to be re-registered if they are recreated.
	// For simplicity, they are not nilled out.
	m.dispatcher.Stop()
	m.logbroker.Stop()
	m.caserver.Stop()

	if m.allocator != nil {
		m.allocator.Stop()
		m.allocator = nil
	}

	m.constraintEnforcer.Stop()
	m.constraintEnforcer = nil

	m.replicatedOrchestrator.Stop()
	m.replicatedOrchestrator = nil

	m.globalOrchestrator.Stop()
	m.globalOrchestrator = nil

	m.taskReaper.Stop()
	m.taskReaper = nil

	m.scheduler.Stop()
	m.scheduler = nil

	m.roleManager.Stop()
	m.roleManager = nil

	if m.keyManager != nil {
		m.keyManager.Stop()
		m.keyManager = nil
	}
}

// defaultClusterObject creates a default cluster.
func defaultClusterObject(
	clusterID string,
	initialCAConfig api.CAConfig,
	raftCfg api.RaftConfig,
	encryptionConfig api.EncryptionConfig,
	initialUnlockKeys []*api.EncryptionKey,
	rootCA *ca.RootCA,
	fips bool,
	defaultAddressPool []string,
	subnetSize uint32,
	vxlanUDPPort uint32) *api.Cluster {
	var caKey []byte
	if rcaSigner, err := rootCA.Signer(); err == nil {
		caKey = rcaSigner.Key
	}

	return &api.Cluster{
		ID: clusterID,
		Spec: api.ClusterSpec{
			Annotations: api.Annotations{
				Name: store.DefaultClusterName,
			},
			Orchestration: api.OrchestrationConfig{
				TaskHistoryRetentionLimit: defaultTaskHistoryRetentionLimit,
			},
			Dispatcher: api.DispatcherConfig{
				HeartbeatPeriod: gogotypes.DurationProto(dispatcher.DefaultHeartBeatPeriod),
			},
			Raft:             raftCfg,
			CAConfig:         initialCAConfig,
			EncryptionConfig: encryptionConfig,
		},
		RootCA: api.RootCA{
			CAKey:      caKey,
			CACert:     rootCA.Certs,
			CACertHash: rootCA.Digest.String(),
			JoinTokens: api.JoinTokens{
				Worker:  ca.GenerateJoinToken(rootCA, fips),
				Manager: ca.GenerateJoinToken(rootCA, fips),
			},
		},
		UnlockKeys:         initialUnlockKeys,
		FIPS:               fips,
		DefaultAddressPool: defaultAddressPool,
		SubnetSize:         subnetSize,
		VXLANUDPPort:       vxlanUDPPort,
	}
}

// managerNode creates a new node with NodeRoleManager role.
func managerNode(nodeID string, availability api.NodeSpec_Availability, vxlanPort uint32) *api.Node {
	return &api.Node{
		ID: nodeID,
		Certificate: api.Certificate{
			CN:   nodeID,
			Role: api.NodeRoleManager,
			Status: api.IssuanceStatus{
				State: api.IssuanceStateIssued,
			},
		},
		Spec: api.NodeSpec{
			DesiredRole:  api.NodeRoleManager,
			Membership:   api.NodeMembershipAccepted,
			Availability: availability,
		},
		VXLANUDPPort: vxlanPort,
	}
}

// newIngressNetwork returns the network object for the default ingress
// network, the network which provides the routing mesh. Caller will save to
// store this object once, at fresh cluster creation. It is expected to
// call this function inside a store update transaction.
func newIngressNetwork() *api.Network {
	return &api.Network{
		ID: identity.NewID(),
		Spec: api.NetworkSpec{
			Ingress: true,
			Annotations: api.Annotations{
				Name: "ingress",
			},
			DriverConfig: &api.Driver{},
			IPAM: &api.IPAMOptions{
				Driver: &api.Driver{},
				Configs: []*api.IPAMConfig{
					{
						Subnet: "10.255.0.0/16",
					},
				},
			},
		},
	}
}

// Creates a network object representing one of the predefined networks
// known to be statically created on the cluster nodes. These objects
// are populated in the store at cluster creation solely in order to
// support running services on the nodes' predefined networks.
// External clients can filter these predefined networks by looking
// at the predefined label.
func newPredefinedNetwork(name, driver string) *api.Network {
	return &api.Network{
		ID: identity.NewID(),
		Spec: api.NetworkSpec{
			Annotations: api.Annotations{
				Name: name,
				Labels: map[string]string{
					networkallocator.PredefinedLabel: "true",
				},
			},
			DriverConfig: &api.Driver{Name: driver},
		},
	}
}