package manager

import (
	"crypto/tls"
	"crypto/x509"
	"encoding/pem"
	"fmt"
	"net"
	"os"
	"path/filepath"
	"runtime"
	"sync"
	"syscall"
	"time"

	"github.com/Sirupsen/logrus"
	"github.com/docker/docker/pkg/plugingetter"
	"github.com/docker/go-events"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/ca"
	"github.com/docker/swarmkit/connectionbroker"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/allocator"
	"github.com/docker/swarmkit/manager/controlapi"
	"github.com/docker/swarmkit/manager/dispatcher"
	"github.com/docker/swarmkit/manager/health"
	"github.com/docker/swarmkit/manager/keymanager"
	"github.com/docker/swarmkit/manager/logbroker"
	"github.com/docker/swarmkit/manager/orchestrator/constraintenforcer"
	"github.com/docker/swarmkit/manager/orchestrator/global"
	"github.com/docker/swarmkit/manager/orchestrator/replicated"
	"github.com/docker/swarmkit/manager/orchestrator/taskreaper"
	"github.com/docker/swarmkit/manager/resourceapi"
	"github.com/docker/swarmkit/manager/scheduler"
	"github.com/docker/swarmkit/manager/state"
	"github.com/docker/swarmkit/manager/state/raft"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/docker/swarmkit/remotes"
	"github.com/docker/swarmkit/xnet"
	gogotypes "github.com/gogo/protobuf/types"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

const (
	// defaultTaskHistoryRetentionLimit is the number of tasks to keep. It is
	// the value defaultClusterObject stores in the new cluster's
	// Orchestration.TaskHistoryRetentionLimit.
	defaultTaskHistoryRetentionLimit = 5
)

// RemoteAddrs provides a listening address and an optional advertise address
// for serving the remote API.
type RemoteAddrs struct {
	// Address to bind. BindRemote passes it to net.Listen("tcp", ...), and
	// also splits a port out of it when AdvertiseAddr is empty.
	ListenAddr string

	// Address to advertise to remote nodes (optional). When empty,
	// BindRemote advertises the wildcard address 0.0.0.0 with the listen
	// port instead.
	AdvertiseAddr string
}

// Config is used to tune the Manager.
type Config struct {
	// SecurityConfig supplies the TLS credentials, root CA and key writer
	// that New and Run hand to the raft node, the gRPC servers and the CA
	// server.
	SecurityConfig *ca.SecurityConfig

	// ExternalCAs is a list of initial CAs to which a manager node
	// will make certificate signing requests for node certificates.
	ExternalCAs []*api.ExternalCA

	// ControlAPI is an address for serving the control API.
	// When non-empty, New binds it immediately through BindControl.
	ControlAPI string

	// RemoteAPI is a listening address for serving the remote API, and
	// an optional advertise address.
	// When non-nil, New binds it immediately through BindRemote.
	RemoteAPI *RemoteAddrs

	// JoinRaft is an optional address of a node in an existing raft
	// cluster to join.
	JoinRaft string

	// Top-level state directory. New creates it (mode 0700) together with a
	// "raft" subdirectory that holds the raft state.
	StateDir string

	// ForceNewCluster defines if we have to force a new cluster
	// because we are recovering from a backup data directory.
	ForceNewCluster bool

	// ElectionTick defines the amount of ticks needed without
	// leader to trigger a new election.
	// A zero value keeps the default from raft.DefaultNodeConfig.
	ElectionTick uint32

	// HeartbeatTick defines the amount of ticks between each
	// heartbeat sent to other members for health-check purposes.
	// A zero value keeps the default from raft.DefaultNodeConfig.
	HeartbeatTick uint32

	// AutoLockManagers determines whether or not managers require an unlock key
	// when starting from a stopped state.  This configuration parameter is only
	// applicable when bootstrapping a new cluster for the first time.
	AutoLockManagers bool

	// UnlockKey is the key to unlock a node - used for decrypting manager TLS keys
	// as well as the raft data encryption key (DEK).  It is applicable when
	// bootstrapping a cluster for the first time (it's a cluster-wide setting),
	// and also when loading up any raft data on disk (as a KEK for the raft DEK).
	UnlockKey []byte

	// Availability allows a user to control the current scheduling status of a node.
	// It is copied into the node entry this manager creates for itself when
	// it becomes leader.
	Availability api.NodeSpec_Availability

	// PluginGetter provides access to docker's plugin inventory.
	// It is handed to the control API server and to the allocator.
	PluginGetter plugingetter.PluginGetter
}

// Manager is the cluster manager for Swarm.
// This is the high-level object holding and initializing all the manager
// subsystems.
type Manager struct { config Config caserver *ca.Server dispatcher *dispatcher.Dispatcher logbroker *logbroker.LogBroker replicatedOrchestrator *replicated.Orchestrator globalOrchestrator *global.Orchestrator taskReaper *taskreaper.TaskReaper constraintEnforcer *constraintenforcer.ConstraintEnforcer scheduler *scheduler.Scheduler allocator *allocator.Allocator keyManager *keymanager.KeyManager server *grpc.Server localserver *grpc.Server raftNode *raft.Node dekRotator *RaftDEKManager roleManager *roleManager cancelFunc context.CancelFunc // mu is a general mutex used to coordinate starting/stopping and // leadership events. mu sync.Mutex // addrMu is a mutex that protects config.ControlAPI and config.RemoteAPI addrMu sync.Mutex started chan struct{} stopped bool remoteListener chan net.Listener controlListener chan net.Listener errServe chan error } type closeOnceListener struct { once sync.Once net.Listener } func (l *closeOnceListener) Close() error { var err error l.once.Do(func() { err = l.Listener.Close() }) return err } // New creates a Manager which has not started to accept requests yet. 
func New(config *Config) (*Manager, error) { err := os.MkdirAll(config.StateDir, 0700) if err != nil { return nil, errors.Wrap(err, "failed to create state directory") } raftStateDir := filepath.Join(config.StateDir, "raft") err = os.MkdirAll(raftStateDir, 0700) if err != nil { return nil, errors.Wrap(err, "failed to create raft state directory") } raftCfg := raft.DefaultNodeConfig() if config.ElectionTick > 0 { raftCfg.ElectionTick = int(config.ElectionTick) } if config.HeartbeatTick > 0 { raftCfg.HeartbeatTick = int(config.HeartbeatTick) } dekRotator, err := NewRaftDEKManager(config.SecurityConfig.KeyWriter()) if err != nil { return nil, err } newNodeOpts := raft.NodeOptions{ ID: config.SecurityConfig.ClientTLSCreds.NodeID(), JoinAddr: config.JoinRaft, Config: raftCfg, StateDir: raftStateDir, ForceNewCluster: config.ForceNewCluster, TLSCredentials: config.SecurityConfig.ClientTLSCreds, KeyRotator: dekRotator, } raftNode := raft.NewNode(newNodeOpts) opts := []grpc.ServerOption{ grpc.Creds(config.SecurityConfig.ServerTLSCreds)} m := &Manager{ config: *config, caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig), dispatcher: dispatcher.New(raftNode, dispatcher.DefaultConfig()), logbroker: logbroker.New(raftNode.MemoryStore()), server: grpc.NewServer(opts...), localserver: grpc.NewServer(opts...), raftNode: raftNode, started: make(chan struct{}), dekRotator: dekRotator, remoteListener: make(chan net.Listener, 1), controlListener: make(chan net.Listener, 1), errServe: make(chan error, 2), } if config.ControlAPI != "" { m.config.ControlAPI = "" if err := m.BindControl(config.ControlAPI); err != nil { return nil, err } } if config.RemoteAPI != nil { m.config.RemoteAPI = nil // The context isn't used in this case (before (*Manager).Run). 
if err := m.BindRemote(context.Background(), *config.RemoteAPI); err != nil { if config.ControlAPI != "" { l := <-m.controlListener l.Close() } return nil, err } } return m, nil } // BindControl binds a local socket for the control API. func (m *Manager) BindControl(addr string) error { m.addrMu.Lock() defer m.addrMu.Unlock() if m.config.ControlAPI != "" { return errors.New("manager already has a control API address") } // don't create a socket directory if we're on windows. we used named pipe if runtime.GOOS != "windows" { err := os.MkdirAll(filepath.Dir(addr), 0700) if err != nil { return errors.Wrap(err, "failed to create socket directory") } } l, err := xnet.ListenLocal(addr) // A unix socket may fail to bind if the file already // exists. Try replacing the file. if runtime.GOOS != "windows" { unwrappedErr := err if op, ok := unwrappedErr.(*net.OpError); ok { unwrappedErr = op.Err } if sys, ok := unwrappedErr.(*os.SyscallError); ok { unwrappedErr = sys.Err } if unwrappedErr == syscall.EADDRINUSE { os.Remove(addr) l, err = xnet.ListenLocal(addr) } } if err != nil { return errors.Wrap(err, "failed to listen on control API address") } m.config.ControlAPI = addr m.controlListener <- l return nil } // BindRemote binds a port for the remote API. func (m *Manager) BindRemote(ctx context.Context, addrs RemoteAddrs) error { m.addrMu.Lock() defer m.addrMu.Unlock() if m.config.RemoteAPI != nil { return errors.New("manager already has remote API address") } // If an AdvertiseAddr was specified, we use that as our // externally-reachable address. advertiseAddr := addrs.AdvertiseAddr var advertiseAddrPort string if advertiseAddr == "" { // Otherwise, we know we are joining an existing swarm. Use a // wildcard address to trigger remote autodetection of our // address. 
var err error _, advertiseAddrPort, err = net.SplitHostPort(addrs.ListenAddr) if err != nil { return fmt.Errorf("missing or invalid listen address %s", addrs.ListenAddr) } // Even with an IPv6 listening address, it's okay to use // 0.0.0.0 here. Any "unspecified" (wildcard) IP will // be substituted with the actual source address. advertiseAddr = net.JoinHostPort("0.0.0.0", advertiseAddrPort) } l, err := net.Listen("tcp", addrs.ListenAddr) if err != nil { return errors.Wrap(err, "failed to listen on remote API address") } if advertiseAddrPort == "0" { advertiseAddr = l.Addr().String() addrs.ListenAddr = advertiseAddr } m.config.RemoteAPI = &addrs m.raftNode.SetAddr(ctx, advertiseAddr) m.remoteListener <- l return nil } // RemovedFromRaft returns a channel that's closed if the manager is removed // from the raft cluster. This should be used to trigger a manager shutdown. func (m *Manager) RemovedFromRaft() <-chan struct{} { return m.raftNode.RemovedFromRaft } // Addr returns tcp address on which remote api listens. func (m *Manager) Addr() string { m.addrMu.Lock() defer m.addrMu.Unlock() if m.config.RemoteAPI == nil { return "" } return m.config.RemoteAPI.ListenAddr } // Run starts all manager sub-systems and the gRPC server at the configured // address. // The call never returns unless an error occurs or `Stop()` is called. func (m *Manager) Run(parent context.Context) error { ctx, ctxCancel := context.WithCancel(parent) defer ctxCancel() m.cancelFunc = ctxCancel leadershipCh, cancel := m.raftNode.SubscribeLeadership() defer cancel() go m.handleLeadershipEvents(ctx, leadershipCh) authorize := func(ctx context.Context, roles []string) error { var ( blacklistedCerts map[string]*api.BlacklistedCertificate clusters []*api.Cluster err error ) m.raftNode.MemoryStore().View(func(readTx store.ReadTx) { clusters, err = store.FindClusters(readTx, store.ByName("default")) }) // Not having a cluster object yet means we can't check // the blacklist. 
if err == nil && len(clusters) == 1 { blacklistedCerts = clusters[0].BlacklistedCertificates } // Authorize the remote roles, ensure they can only be forwarded by managers _, err = ca.AuthorizeForwardedRoleAndOrg(ctx, roles, []string{ca.ManagerRole}, m.config.SecurityConfig.ClientTLSCreds.Organization(), blacklistedCerts) return err } baseControlAPI := controlapi.NewServer(m.raftNode.MemoryStore(), m.raftNode, m.config.SecurityConfig.RootCA(), m.config.PluginGetter) baseResourceAPI := resourceapi.New(m.raftNode.MemoryStore()) healthServer := health.NewHealthServer() localHealthServer := health.NewHealthServer() authenticatedControlAPI := api.NewAuthenticatedWrapperControlServer(baseControlAPI, authorize) authenticatedResourceAPI := api.NewAuthenticatedWrapperResourceAllocatorServer(baseResourceAPI, authorize) authenticatedLogsServerAPI := api.NewAuthenticatedWrapperLogsServer(m.logbroker, authorize) authenticatedLogBrokerAPI := api.NewAuthenticatedWrapperLogBrokerServer(m.logbroker, authorize) authenticatedDispatcherAPI := api.NewAuthenticatedWrapperDispatcherServer(m.dispatcher, authorize) authenticatedCAAPI := api.NewAuthenticatedWrapperCAServer(m.caserver, authorize) authenticatedNodeCAAPI := api.NewAuthenticatedWrapperNodeCAServer(m.caserver, authorize) authenticatedRaftAPI := api.NewAuthenticatedWrapperRaftServer(m.raftNode, authorize) authenticatedHealthAPI := api.NewAuthenticatedWrapperHealthServer(healthServer, authorize) authenticatedRaftMembershipAPI := api.NewAuthenticatedWrapperRaftMembershipServer(m.raftNode, authorize) proxyDispatcherAPI := api.NewRaftProxyDispatcherServer(authenticatedDispatcherAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo) proxyCAAPI := api.NewRaftProxyCAServer(authenticatedCAAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo) proxyNodeCAAPI := api.NewRaftProxyNodeCAServer(authenticatedNodeCAAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo) proxyRaftMembershipAPI := 
api.NewRaftProxyRaftMembershipServer(authenticatedRaftMembershipAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo) proxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(authenticatedResourceAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo) proxyLogBrokerAPI := api.NewRaftProxyLogBrokerServer(authenticatedLogBrokerAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo) // The following local proxies are only wired up to receive requests // from a trusted local socket, and these requests don't use TLS, // therefore the requests they handle locally should bypass // authorization. When requests are proxied from these servers, they // are sent as requests from this manager rather than forwarded // requests (it has no TLS information to put in the metadata map). forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil } handleRequestLocally := func(ctx context.Context) (context.Context, error) { remoteAddr := "127.0.0.1:0" m.addrMu.Lock() if m.config.RemoteAPI != nil { if m.config.RemoteAPI.AdvertiseAddr != "" { remoteAddr = m.config.RemoteAPI.AdvertiseAddr } else { remoteAddr = m.config.RemoteAPI.ListenAddr } } m.addrMu.Unlock() creds := m.config.SecurityConfig.ClientTLSCreds nodeInfo := ca.RemoteNodeInfo{ Roles: []string{creds.Role()}, Organization: creds.Organization(), NodeID: creds.NodeID(), RemoteAddr: remoteAddr, } return context.WithValue(ctx, ca.LocalRequestKey, nodeInfo), nil } localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, m.raftNode, handleRequestLocally, forwardAsOwnRequest) localProxyLogsAPI := api.NewRaftProxyLogsServer(m.logbroker, m.raftNode, handleRequestLocally, forwardAsOwnRequest) localProxyDispatcherAPI := api.NewRaftProxyDispatcherServer(m.dispatcher, m.raftNode, handleRequestLocally, forwardAsOwnRequest) localProxyCAAPI := api.NewRaftProxyCAServer(m.caserver, m.raftNode, handleRequestLocally, forwardAsOwnRequest) localProxyNodeCAAPI := api.NewRaftProxyNodeCAServer(m.caserver, 
m.raftNode, handleRequestLocally, forwardAsOwnRequest) localProxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(baseResourceAPI, m.raftNode, handleRequestLocally, forwardAsOwnRequest) localProxyLogBrokerAPI := api.NewRaftProxyLogBrokerServer(m.logbroker, m.raftNode, handleRequestLocally, forwardAsOwnRequest) // Everything registered on m.server should be an authenticated // wrapper, or a proxy wrapping an authenticated wrapper! api.RegisterCAServer(m.server, proxyCAAPI) api.RegisterNodeCAServer(m.server, proxyNodeCAAPI) api.RegisterRaftServer(m.server, authenticatedRaftAPI) api.RegisterHealthServer(m.server, authenticatedHealthAPI) api.RegisterRaftMembershipServer(m.server, proxyRaftMembershipAPI) api.RegisterControlServer(m.server, authenticatedControlAPI) api.RegisterLogsServer(m.server, authenticatedLogsServerAPI) api.RegisterLogBrokerServer(m.server, proxyLogBrokerAPI) api.RegisterResourceAllocatorServer(m.server, proxyResourceAPI) api.RegisterDispatcherServer(m.server, proxyDispatcherAPI) api.RegisterControlServer(m.localserver, localProxyControlAPI) api.RegisterLogsServer(m.localserver, localProxyLogsAPI) api.RegisterHealthServer(m.localserver, localHealthServer) api.RegisterDispatcherServer(m.localserver, localProxyDispatcherAPI) api.RegisterCAServer(m.localserver, localProxyCAAPI) api.RegisterNodeCAServer(m.localserver, localProxyNodeCAAPI) api.RegisterResourceAllocatorServer(m.localserver, localProxyResourceAPI) api.RegisterLogBrokerServer(m.localserver, localProxyLogBrokerAPI) healthServer.SetServingStatus("Raft", api.HealthCheckResponse_NOT_SERVING) localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_NOT_SERVING) go m.serveListener(ctx, m.remoteListener) go m.serveListener(ctx, m.controlListener) defer func() { m.server.Stop() m.localserver.Stop() }() // Set the raft server as serving for the health server healthServer.SetServingStatus("Raft", api.HealthCheckResponse_SERVING) if err := m.raftNode.JoinAndStart(ctx); err != nil 
{ return errors.Wrap(err, "can't initialize raft node") } localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_SERVING) close(m.started) go func() { err := m.raftNode.Run(ctx) if err != nil { log.G(ctx).WithError(err).Error("raft node stopped") m.Stop(ctx, false) } }() if err := raft.WaitForLeader(ctx, m.raftNode); err != nil { return err } c, err := raft.WaitForCluster(ctx, m.raftNode) if err != nil { return err } raftConfig := c.Spec.Raft if err := m.watchForKEKChanges(ctx); err != nil { return err } if int(raftConfig.ElectionTick) != m.raftNode.Config.ElectionTick { log.G(ctx).Warningf("election tick value (%ds) is different from the one defined in the cluster config (%vs), the cluster may be unstable", m.raftNode.Config.ElectionTick, raftConfig.ElectionTick) } if int(raftConfig.HeartbeatTick) != m.raftNode.Config.HeartbeatTick { log.G(ctx).Warningf("heartbeat tick value (%ds) is different from the one defined in the cluster config (%vs), the cluster may be unstable", m.raftNode.Config.HeartbeatTick, raftConfig.HeartbeatTick) } // wait for an error in serving. err = <-m.errServe m.mu.Lock() if m.stopped { m.mu.Unlock() return nil } m.mu.Unlock() m.Stop(ctx, false) return err } const stopTimeout = 8 * time.Second // Stop stops the manager. It immediately closes all open connections and // active RPCs as well as stopping the scheduler. If clearData is set, the // raft logs, snapshots, and keys will be erased. func (m *Manager) Stop(ctx context.Context, clearData bool) { log.G(ctx).Info("Stopping manager") // It's not safe to start shutting down while the manager is still // starting up. <-m.started // the mutex stops us from trying to stop while we're already stopping, or // from returning before we've finished stopping. 
m.mu.Lock() defer m.mu.Unlock() if m.stopped { return } m.stopped = true srvDone, localSrvDone := make(chan struct{}), make(chan struct{}) go func() { m.server.GracefulStop() close(srvDone) }() go func() { m.localserver.GracefulStop() close(localSrvDone) }() m.raftNode.Cancel() m.dispatcher.Stop() m.logbroker.Stop() m.caserver.Stop() if m.allocator != nil { m.allocator.Stop() } if m.replicatedOrchestrator != nil { m.replicatedOrchestrator.Stop() } if m.globalOrchestrator != nil { m.globalOrchestrator.Stop() } if m.taskReaper != nil { m.taskReaper.Stop() } if m.constraintEnforcer != nil { m.constraintEnforcer.Stop() } if m.scheduler != nil { m.scheduler.Stop() } if m.roleManager != nil { m.roleManager.Stop() } if m.keyManager != nil { m.keyManager.Stop() } if clearData { m.raftNode.ClearData() } m.cancelFunc() <-m.raftNode.Done() timer := time.AfterFunc(stopTimeout, func() { m.server.Stop() m.localserver.Stop() }) defer timer.Stop() // TODO: we're not waiting on ctx because it very well could be passed from Run, // which is already cancelled here. We need to refactor that. 
select { case <-srvDone: <-localSrvDone case <-localSrvDone: <-srvDone } log.G(ctx).Info("Manager shut down") // mutex is released and Run can return now } func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error { securityConfig := m.config.SecurityConfig nodeID := m.config.SecurityConfig.ClientTLSCreds.NodeID() logger := log.G(ctx).WithFields(logrus.Fields{ "node.id": nodeID, "node.role": ca.ManagerRole, }) kekData := ca.KEKData{Version: cluster.Meta.Version.Index} for _, encryptionKey := range cluster.UnlockKeys { if encryptionKey.Subsystem == ca.ManagerRole { kekData.KEK = encryptionKey.Key break } } updated, unlockedToLocked, err := m.dekRotator.MaybeUpdateKEK(kekData) if err != nil { logger.WithError(err).Errorf("failed to re-encrypt TLS key with a new KEK") return err } if updated { logger.Debug("successfully rotated KEK") } if unlockedToLocked { // a best effort attempt to update the TLS certificate - if it fails, it'll be updated the next time it renews; // don't wait because it might take a bit go func() { insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}) conn, err := grpc.Dial( m.config.ControlAPI, grpc.WithTransportCredentials(insecureCreds), grpc.WithDialer( func(addr string, timeout time.Duration) (net.Conn, error) { return xnet.DialTimeoutLocal(addr, timeout) }), ) if err != nil { logger.WithError(err).Error("failed to connect to local manager socket after locking the cluster") return } defer conn.Close() connBroker := connectionbroker.New(remotes.NewRemotes()) connBroker.SetLocalConn(conn) if err := ca.RenewTLSConfigNow(ctx, securityConfig, connBroker); err != nil { logger.WithError(err).Error("failed to download new TLS certificate after locking the cluster") } }() } return nil } func (m *Manager) watchForKEKChanges(ctx context.Context) error { clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization() clusterWatch, clusterWatchCancel, err := store.ViewAndWatch(m.raftNode.MemoryStore(), func(tx 
store.ReadTx) error { cluster := store.GetCluster(tx, clusterID) if cluster == nil { return fmt.Errorf("unable to get current cluster") } return m.updateKEK(ctx, cluster) }, state.EventUpdateCluster{ Cluster: &api.Cluster{ID: clusterID}, Checks: []state.ClusterCheckFunc{state.ClusterCheckID}, }, ) if err != nil { return err } go func() { for { select { case event := <-clusterWatch: clusterEvent := event.(state.EventUpdateCluster) m.updateKEK(ctx, clusterEvent.Cluster) case <-ctx.Done(): clusterWatchCancel() return } } }() return nil } // rotateRootCAKEK will attempt to rotate the key-encryption-key for root CA key-material in raft. // If there is no passphrase set in ENV, it returns. // If there is plain-text root key-material, and a passphrase set, it encrypts it. // If there is encrypted root key-material and it is using the current passphrase, it returns. // If there is encrypted root key-material, and it is using the previous passphrase, it // re-encrypts it with the current passphrase. func (m *Manager) rotateRootCAKEK(ctx context.Context, clusterID string) error { // If we don't have a KEK, we won't ever be rotating anything strPassphrase := os.Getenv(ca.PassphraseENVVar) if strPassphrase == "" { return nil } strPassphrasePrev := os.Getenv(ca.PassphraseENVVarPrev) passphrase := []byte(strPassphrase) passphrasePrev := []byte(strPassphrasePrev) s := m.raftNode.MemoryStore() var ( cluster *api.Cluster err error finalKey []byte ) // Retrieve the cluster identified by ClusterID s.View(func(readTx store.ReadTx) { cluster = store.GetCluster(readTx, clusterID) }) if cluster == nil { return fmt.Errorf("cluster not found: %s", clusterID) } // Try to get the private key from the cluster privKeyPEM := cluster.RootCA.CAKey if len(privKeyPEM) == 0 { // We have no PEM root private key in this cluster. 
log.G(ctx).Warnf("cluster %s does not have private key material", clusterID) return nil } // Decode the PEM private key keyBlock, _ := pem.Decode(privKeyPEM) if keyBlock == nil { return fmt.Errorf("invalid PEM-encoded private key inside of cluster %s", clusterID) } // If this key is not encrypted, then we have to encrypt it if !x509.IsEncryptedPEMBlock(keyBlock) { finalKey, err = ca.EncryptECPrivateKey(privKeyPEM, strPassphrase) if err != nil { return err } } else { // This key is already encrypted, let's try to decrypt with the current main passphrase _, err = x509.DecryptPEMBlock(keyBlock, []byte(passphrase)) if err == nil { // The main key is the correct KEK, nothing to do here return nil } // This key is already encrypted, but failed with current main passphrase. // Let's try to decrypt with the previous passphrase unencryptedKey, err := x509.DecryptPEMBlock(keyBlock, []byte(passphrasePrev)) if err != nil { // We were not able to decrypt either with the main or backup passphrase, error return err } unencryptedKeyBlock := &pem.Block{ Type: keyBlock.Type, Bytes: unencryptedKey, Headers: keyBlock.Headers, } // We were able to decrypt the key, but with the previous passphrase. Let's encrypt // with the new one and store it in raft finalKey, err = ca.EncryptECPrivateKey(pem.EncodeToMemory(unencryptedKeyBlock), strPassphrase) if err != nil { log.G(ctx).Debugf("failed to rotate the key-encrypting-key for the root key material of cluster %s", clusterID) return err } } log.G(ctx).Infof("Re-encrypting the root key material of cluster %s", clusterID) // Let's update the key in the cluster object return s.Update(func(tx store.Tx) error { cluster = store.GetCluster(tx, clusterID) if cluster == nil { return fmt.Errorf("cluster not found: %s", clusterID) } cluster.RootCA.CAKey = finalKey return store.UpdateCluster(tx, cluster) }) } // handleLeadershipEvents handles the is leader event or is follower event. 
func (m *Manager) handleLeadershipEvents(ctx context.Context, leadershipCh chan events.Event) { for { select { case leadershipEvent := <-leadershipCh: m.mu.Lock() if m.stopped { m.mu.Unlock() return } newState := leadershipEvent.(raft.LeadershipState) if newState == raft.IsLeader { m.becomeLeader(ctx) } else if newState == raft.IsFollower { m.becomeFollower() } m.mu.Unlock() case <-ctx.Done(): return } } } // serveListener serves a listener for local and non local connections. func (m *Manager) serveListener(ctx context.Context, lCh <-chan net.Listener) { var l net.Listener select { case l = <-lCh: case <-ctx.Done(): return } ctx = log.WithLogger(ctx, log.G(ctx).WithFields( logrus.Fields{ "proto": l.Addr().Network(), "addr": l.Addr().String(), })) if _, ok := l.(*net.TCPListener); !ok { log.G(ctx).Info("Listening for local connections") // we need to disallow double closes because UnixListener.Close // can delete unix-socket file of newer listener. grpc calls // Close twice indeed: in Serve and in Stop. m.errServe <- m.localserver.Serve(&closeOnceListener{Listener: l}) } else { log.G(ctx).Info("Listening for connections") m.errServe <- m.server.Serve(l) } } // becomeLeader starts the subsystems that are run on the leader. func (m *Manager) becomeLeader(ctx context.Context) { s := m.raftNode.MemoryStore() rootCA := m.config.SecurityConfig.RootCA() nodeID := m.config.SecurityConfig.ClientTLSCreds.NodeID() raftCfg := raft.DefaultRaftConfig() raftCfg.ElectionTick = uint32(m.raftNode.Config.ElectionTick) raftCfg.HeartbeatTick = uint32(m.raftNode.Config.HeartbeatTick) clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization() initialCAConfig := ca.DefaultCAConfig() initialCAConfig.ExternalCAs = m.config.ExternalCAs var unlockKeys []*api.EncryptionKey if m.config.AutoLockManagers { unlockKeys = []*api.EncryptionKey{{ Subsystem: ca.ManagerRole, Key: m.config.UnlockKey, }} } s.Update(func(tx store.Tx) error { // Add a default cluster object to the // store. 
Don't check the error because // we expect this to fail unless this // is a brand new cluster. store.CreateCluster(tx, defaultClusterObject( clusterID, initialCAConfig, raftCfg, api.EncryptionConfig{AutoLockManagers: m.config.AutoLockManagers}, unlockKeys, rootCA)) // Add Node entry for ourself, if one // doesn't exist already. store.CreateNode(tx, managerNode(nodeID, m.config.Availability)) return nil }) // Attempt to rotate the key-encrypting-key of the root CA key-material err := m.rotateRootCAKEK(ctx, clusterID) if err != nil { log.G(ctx).WithError(err).Error("root key-encrypting-key rotation failed") } m.replicatedOrchestrator = replicated.NewReplicatedOrchestrator(s) m.constraintEnforcer = constraintenforcer.New(s) m.globalOrchestrator = global.NewGlobalOrchestrator(s) m.taskReaper = taskreaper.New(s) m.scheduler = scheduler.New(s) m.keyManager = keymanager.New(s, keymanager.DefaultConfig()) m.roleManager = newRoleManager(s, m.raftNode) // TODO(stevvooe): Allocate a context that can be used to // shutdown underlying manager processes when leadership is // lost. m.allocator, err = allocator.New(s, m.config.PluginGetter) if err != nil { log.G(ctx).WithError(err).Error("failed to create allocator") // TODO(stevvooe): It doesn't seem correct here to fail // creating the allocator but then use it anyway. 
} if m.keyManager != nil { go func(keyManager *keymanager.KeyManager) { if err := keyManager.Run(ctx); err != nil { log.G(ctx).WithError(err).Error("keymanager failed with an error") } }(m.keyManager) } go func(d *dispatcher.Dispatcher) { if err := d.Run(ctx); err != nil { log.G(ctx).WithError(err).Error("Dispatcher exited with an error") } }(m.dispatcher) go func(lb *logbroker.LogBroker) { if err := lb.Run(ctx); err != nil { log.G(ctx).WithError(err).Error("LogBroker exited with an error") } }(m.logbroker) go func(server *ca.Server) { if err := server.Run(ctx); err != nil { log.G(ctx).WithError(err).Error("CA signer exited with an error") } }(m.caserver) // Start all sub-components in separate goroutines. // TODO(aluzzardi): This should have some kind of error handling so that // any component that goes down would bring the entire manager down. if m.allocator != nil { go func(allocator *allocator.Allocator) { if err := allocator.Run(ctx); err != nil { log.G(ctx).WithError(err).Error("allocator exited with an error") } }(m.allocator) } go func(scheduler *scheduler.Scheduler) { if err := scheduler.Run(ctx); err != nil { log.G(ctx).WithError(err).Error("scheduler exited with an error") } }(m.scheduler) go func(constraintEnforcer *constraintenforcer.ConstraintEnforcer) { constraintEnforcer.Run() }(m.constraintEnforcer) go func(taskReaper *taskreaper.TaskReaper) { taskReaper.Run() }(m.taskReaper) go func(orchestrator *replicated.Orchestrator) { if err := orchestrator.Run(ctx); err != nil { log.G(ctx).WithError(err).Error("replicated orchestrator exited with an error") } }(m.replicatedOrchestrator) go func(globalOrchestrator *global.Orchestrator) { if err := globalOrchestrator.Run(ctx); err != nil { log.G(ctx).WithError(err).Error("global orchestrator exited with an error") } }(m.globalOrchestrator) go func(roleManager *roleManager) { roleManager.Run() }(m.roleManager) } // becomeFollower shuts down the subsystems that are only run by the leader. 
func (m *Manager) becomeFollower() { m.dispatcher.Stop() m.logbroker.Stop() m.caserver.Stop() if m.allocator != nil { m.allocator.Stop() m.allocator = nil } m.constraintEnforcer.Stop() m.constraintEnforcer = nil m.replicatedOrchestrator.Stop() m.replicatedOrchestrator = nil m.globalOrchestrator.Stop() m.globalOrchestrator = nil m.taskReaper.Stop() m.taskReaper = nil m.scheduler.Stop() m.scheduler = nil m.roleManager.Stop() m.roleManager = nil if m.keyManager != nil { m.keyManager.Stop() m.keyManager = nil } } // defaultClusterObject creates a default cluster. func defaultClusterObject( clusterID string, initialCAConfig api.CAConfig, raftCfg api.RaftConfig, encryptionConfig api.EncryptionConfig, initialUnlockKeys []*api.EncryptionKey, rootCA *ca.RootCA) *api.Cluster { return &api.Cluster{ ID: clusterID, Spec: api.ClusterSpec{ Annotations: api.Annotations{ Name: store.DefaultClusterName, }, Orchestration: api.OrchestrationConfig{ TaskHistoryRetentionLimit: defaultTaskHistoryRetentionLimit, }, Dispatcher: api.DispatcherConfig{ HeartbeatPeriod: gogotypes.DurationProto(dispatcher.DefaultHeartBeatPeriod), }, Raft: raftCfg, CAConfig: initialCAConfig, EncryptionConfig: encryptionConfig, }, RootCA: api.RootCA{ CAKey: rootCA.Key, CACert: rootCA.Cert, CACertHash: rootCA.Digest.String(), JoinTokens: api.JoinTokens{ Worker: ca.GenerateJoinToken(rootCA), Manager: ca.GenerateJoinToken(rootCA), }, }, UnlockKeys: initialUnlockKeys, } } // managerNode creates a new node with NodeRoleManager role. func managerNode(nodeID string, availability api.NodeSpec_Availability) *api.Node { return &api.Node{ ID: nodeID, Certificate: api.Certificate{ CN: nodeID, Role: api.NodeRoleManager, Status: api.IssuanceStatus{ State: api.IssuanceStateIssued, }, }, Spec: api.NodeSpec{ DesiredRole: api.NodeRoleManager, Membership: api.NodeMembershipAccepted, Availability: availability, }, } }