package manager

import (
	"crypto/x509"
	"encoding/pem"
	"fmt"
	"net"
	"os"
	"path/filepath"
	"runtime"
	"sync"
	"syscall"
	"time"

	"github.com/Sirupsen/logrus"
	"github.com/docker/go-events"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/ca"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/allocator"
	"github.com/docker/swarmkit/manager/controlapi"
	"github.com/docker/swarmkit/manager/dispatcher"
	"github.com/docker/swarmkit/manager/health"
	"github.com/docker/swarmkit/manager/keymanager"
	"github.com/docker/swarmkit/manager/logbroker"
	"github.com/docker/swarmkit/manager/orchestrator/constraintenforcer"
	"github.com/docker/swarmkit/manager/orchestrator/global"
	"github.com/docker/swarmkit/manager/orchestrator/replicated"
	"github.com/docker/swarmkit/manager/orchestrator/taskreaper"
	"github.com/docker/swarmkit/manager/resourceapi"
	"github.com/docker/swarmkit/manager/scheduler"
	"github.com/docker/swarmkit/manager/state"
	"github.com/docker/swarmkit/manager/state/raft"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/docker/swarmkit/protobuf/ptypes"
	"github.com/docker/swarmkit/remotes"
	"github.com/docker/swarmkit/xnet"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
)

const (
	// defaultTaskHistoryRetentionLimit is the number of tasks to keep.
	defaultTaskHistoryRetentionLimit = 5
)

// RemoteAddrs provides a listening address and an optional advertise address
// for serving the remote API.
type RemoteAddrs struct {
	// Address to bind
	ListenAddr string

	// Address to advertise to remote nodes (optional).
	AdvertiseAddr string
}
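
// Purely for illustration (these values are hypothetical, not defaults): a
// manager might bind with ListenAddr ":4242" on all interfaces while
// advertising a routable address such as "10.0.0.1:4242" to other nodes via
// AdvertiseAddr. If AdvertiseAddr is left empty, New derives a wildcard
// advertise address from ListenAddr (see New below).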

// Config is used to tune the Manager.
type Config struct {
	SecurityConfig *ca.SecurityConfig

	// ExternalCAs is a list of initial CAs to which a manager node
	// will make certificate signing requests for node certificates.
	ExternalCAs []*api.ExternalCA

	// ControlAPI is an address for serving the control API.
	ControlAPI string

	// RemoteAPI is a listening address for serving the remote API, and
	// an optional advertise address.
	RemoteAPI RemoteAddrs

	// JoinRaft is an optional address of a node in an existing raft
	// cluster to join.
	JoinRaft string

	// StateDir is the top-level state directory.
	StateDir string

	// ForceNewCluster defines whether we have to force a new cluster
	// because we are recovering from a backup data directory.
	ForceNewCluster bool

	// ElectionTick defines the number of ticks that must elapse without
	// a leader before a new election is triggered.
	ElectionTick uint32

	// HeartbeatTick defines the number of ticks between heartbeats
	// sent to other members for health-check purposes.
	HeartbeatTick uint32

	// AutoLockManagers determines whether or not managers require an unlock key
	// when starting from a stopped state. This configuration parameter is only
	// applicable when bootstrapping a new cluster for the first time.
	AutoLockManagers bool

	// UnlockKey is the key to unlock a node - used for decrypting manager TLS keys
	// as well as the raft data encryption key (DEK). It is applicable when
	// bootstrapping a cluster for the first time (it's a cluster-wide setting),
	// and also when loading up any raft data on disk (as a KEK for the raft DEK).
	UnlockKey []byte
}

// Manager is the cluster manager for Swarm.
// This is the high-level object holding and initializing all the manager
// subsystems.
type Manager struct {
	config *Config

	listeners []net.Listener

	caserver               *ca.Server
	dispatcher             *dispatcher.Dispatcher
	logbroker              *logbroker.LogBroker
	replicatedOrchestrator *replicated.Orchestrator
	globalOrchestrator     *global.Orchestrator
	taskReaper             *taskreaper.TaskReaper
	constraintEnforcer     *constraintenforcer.ConstraintEnforcer
	scheduler              *scheduler.Scheduler
	allocator              *allocator.Allocator
	keyManager             *keymanager.KeyManager
	server                 *grpc.Server
	localserver            *grpc.Server
	raftNode               *raft.Node
	dekRotator             *RaftDEKManager

	cancelFunc context.CancelFunc

	mu      sync.Mutex
	started chan struct{}
	stopped bool
}
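
// closeOnceListener wraps a net.Listener so that Close is safe to call more
// than once: grpc calls Close both in Serve and in Stop, and closing a unix
// listener twice could remove the socket file of a newer listener (see
// serveListener below).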
type closeOnceListener struct {
	once sync.Once
	net.Listener
}

func (l *closeOnceListener) Close() error {
	var err error
	l.once.Do(func() {
		err = l.Listener.Close()
	})
	return err
}
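
// A minimal sketch of the manager lifecycle, assuming the caller already has a
// *ca.SecurityConfig; the remaining field values below are hypothetical:
//
//	m, err := New(&Config{
//		SecurityConfig: securityConfig,
//		ControlAPI:     "/var/run/example/control.sock",
//		RemoteAPI:      RemoteAddrs{ListenAddr: "0.0.0.0:4242"},
//		StateDir:       "/var/lib/example/manager",
//	})
//	if err != nil {
//		// handle error
//	}
//	go func() {
//		// Run blocks until an error occurs or Stop is called.
//		if err := m.Run(context.Background()); err != nil {
//			// handle error
//		}
//	}()
//	// ... later:
//	m.Stop(context.Background())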

// New creates a Manager which has not started to accept requests yet.
func New(config *Config) (*Manager, error) {
	dispatcherConfig := dispatcher.DefaultConfig()

	// If an AdvertiseAddr was specified, we use that as our
	// externally-reachable address.
	advertiseAddr := config.RemoteAPI.AdvertiseAddr

	var advertiseAddrPort string
	if advertiseAddr == "" {
		// Otherwise, we know we are joining an existing swarm. Use a
		// wildcard address to trigger remote autodetection of our
		// address.
		var err error
		_, advertiseAddrPort, err = net.SplitHostPort(config.RemoteAPI.ListenAddr)
		if err != nil {
			return nil, fmt.Errorf("missing or invalid listen address %s", config.RemoteAPI.ListenAddr)
		}

		// Even with an IPv6 listening address, it's okay to use
		// 0.0.0.0 here. Any "unspecified" (wildcard) IP will
		// be substituted with the actual source address.
		advertiseAddr = net.JoinHostPort("0.0.0.0", advertiseAddrPort)
	}

	err := os.MkdirAll(config.StateDir, 0700)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create state directory")
	}

	raftStateDir := filepath.Join(config.StateDir, "raft")
	err = os.MkdirAll(raftStateDir, 0700)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create raft state directory")
	}

	var listeners []net.Listener

	// Don't create a socket directory if we're on Windows; we use a named pipe there.
	if runtime.GOOS != "windows" {
		err := os.MkdirAll(filepath.Dir(config.ControlAPI), 0700)
		if err != nil {
			return nil, errors.Wrap(err, "failed to create socket directory")
		}
	}

	l, err := xnet.ListenLocal(config.ControlAPI)

	// A unix socket may fail to bind if the file already
	// exists. Try replacing the file.
	if runtime.GOOS != "windows" {
		unwrappedErr := err
		if op, ok := unwrappedErr.(*net.OpError); ok {
			unwrappedErr = op.Err
		}
		if sys, ok := unwrappedErr.(*os.SyscallError); ok {
			unwrappedErr = sys.Err
		}
		if unwrappedErr == syscall.EADDRINUSE {
			os.Remove(config.ControlAPI)
			l, err = xnet.ListenLocal(config.ControlAPI)
		}
	}
	if err != nil {
		return nil, errors.Wrap(err, "failed to listen on control API address")
	}

	listeners = append(listeners, l)

	l, err = net.Listen("tcp", config.RemoteAPI.ListenAddr)
	if err != nil {
		return nil, errors.Wrap(err, "failed to listen on remote API address")
	}
	if advertiseAddrPort == "0" {
		advertiseAddr = l.Addr().String()
		config.RemoteAPI.ListenAddr = advertiseAddr
	}
	listeners = append(listeners, l)

	raftCfg := raft.DefaultNodeConfig()

	if config.ElectionTick > 0 {
		raftCfg.ElectionTick = int(config.ElectionTick)
	}
	if config.HeartbeatTick > 0 {
		raftCfg.HeartbeatTick = int(config.HeartbeatTick)
	}

	dekRotator, err := NewRaftDEKManager(config.SecurityConfig.KeyWriter())
	if err != nil {
		return nil, err
	}

	newNodeOpts := raft.NodeOptions{
		ID:              config.SecurityConfig.ClientTLSCreds.NodeID(),
		Addr:            advertiseAddr,
		JoinAddr:        config.JoinRaft,
		Config:          raftCfg,
		StateDir:        raftStateDir,
		ForceNewCluster: config.ForceNewCluster,
		TLSCredentials:  config.SecurityConfig.ClientTLSCreds,
		KeyRotator:      dekRotator,
	}
	raftNode := raft.NewNode(newNodeOpts)

	opts := []grpc.ServerOption{
		grpc.Creds(config.SecurityConfig.ServerTLSCreds)}

	m := &Manager{
		config:      config,
		listeners:   listeners,
		caserver:    ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig),
		dispatcher:  dispatcher.New(raftNode, dispatcherConfig),
		logbroker:   logbroker.New(raftNode.MemoryStore()),
		server:      grpc.NewServer(opts...),
		localserver: grpc.NewServer(opts...),
		raftNode:    raftNode,
		started:     make(chan struct{}),
		dekRotator:  dekRotator,
	}

	return m, nil
}

// Addr returns the TCP address on which the remote API listens.
func (m *Manager) Addr() string {
	return m.config.RemoteAPI.ListenAddr
}

// Run starts all manager sub-systems and the gRPC server at the configured
// address.
// The call never returns unless an error occurs or `Stop()` is called.
func (m *Manager) Run(parent context.Context) error {
	ctx, ctxCancel := context.WithCancel(parent)
	defer ctxCancel()

	m.cancelFunc = ctxCancel

	leadershipCh, cancel := m.raftNode.SubscribeLeadership()
	defer cancel()

	go m.handleLeadershipEvents(ctx, leadershipCh)

	authorize := func(ctx context.Context, roles []string) error {
		var (
			blacklistedCerts map[string]*api.BlacklistedCertificate
			clusters         []*api.Cluster
			err              error
		)

		m.raftNode.MemoryStore().View(func(readTx store.ReadTx) {
			clusters, err = store.FindClusters(readTx, store.ByName("default"))
		})

		// Not having a cluster object yet means we can't check
		// the blacklist.
		if err == nil && len(clusters) == 1 {
			blacklistedCerts = clusters[0].BlacklistedCertificates
		}

		// Authorize the remote roles, ensure they can only be forwarded by managers
		_, err = ca.AuthorizeForwardedRoleAndOrg(ctx, roles, []string{ca.ManagerRole}, m.config.SecurityConfig.ClientTLSCreds.Organization(), blacklistedCerts)
		return err
	}

	baseControlAPI := controlapi.NewServer(m.raftNode.MemoryStore(), m.raftNode, m.config.SecurityConfig.RootCA())
	baseResourceAPI := resourceapi.New(m.raftNode.MemoryStore())
	healthServer := health.NewHealthServer()
	localHealthServer := health.NewHealthServer()

	authenticatedControlAPI := api.NewAuthenticatedWrapperControlServer(baseControlAPI, authorize)
	authenticatedResourceAPI := api.NewAuthenticatedWrapperResourceAllocatorServer(baseResourceAPI, authorize)
	authenticatedLogsServerAPI := api.NewAuthenticatedWrapperLogsServer(m.logbroker, authorize)
	authenticatedLogBrokerAPI := api.NewAuthenticatedWrapperLogBrokerServer(m.logbroker, authorize)
	authenticatedDispatcherAPI := api.NewAuthenticatedWrapperDispatcherServer(m.dispatcher, authorize)
	authenticatedCAAPI := api.NewAuthenticatedWrapperCAServer(m.caserver, authorize)
	authenticatedNodeCAAPI := api.NewAuthenticatedWrapperNodeCAServer(m.caserver, authorize)
	authenticatedRaftAPI := api.NewAuthenticatedWrapperRaftServer(m.raftNode, authorize)
	authenticatedHealthAPI := api.NewAuthenticatedWrapperHealthServer(healthServer, authorize)
	authenticatedRaftMembershipAPI := api.NewAuthenticatedWrapperRaftMembershipServer(m.raftNode, authorize)

	proxyDispatcherAPI := api.NewRaftProxyDispatcherServer(authenticatedDispatcherAPI, m.raftNode, ca.WithMetadataForwardTLSInfo)
	proxyCAAPI := api.NewRaftProxyCAServer(authenticatedCAAPI, m.raftNode, ca.WithMetadataForwardTLSInfo)
	proxyNodeCAAPI := api.NewRaftProxyNodeCAServer(authenticatedNodeCAAPI, m.raftNode, ca.WithMetadataForwardTLSInfo)
	proxyRaftMembershipAPI := api.NewRaftProxyRaftMembershipServer(authenticatedRaftMembershipAPI, m.raftNode, ca.WithMetadataForwardTLSInfo)
	proxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(authenticatedResourceAPI, m.raftNode, ca.WithMetadataForwardTLSInfo)
	proxyLogBrokerAPI := api.NewRaftProxyLogBrokerServer(authenticatedLogBrokerAPI, m.raftNode, ca.WithMetadataForwardTLSInfo)

	// localProxyControlAPI is a special kind of proxy. It is only wired up
	// to receive requests from a trusted local socket, and these requests
	// don't use TLS, therefore the requests it handles locally should
	// bypass authorization. When it proxies, it sends them as requests from
	// this manager rather than forwarded requests (it has no TLS
	// information to put in the metadata map).
	forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
	localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, m.raftNode, forwardAsOwnRequest)
	localProxyLogsAPI := api.NewRaftProxyLogsServer(m.logbroker, m.raftNode, forwardAsOwnRequest)
	localCAAPI := api.NewRaftProxyCAServer(m.caserver, m.raftNode, forwardAsOwnRequest)

	// Everything registered on m.server should be an authenticated
	// wrapper, or a proxy wrapping an authenticated wrapper!
	api.RegisterCAServer(m.server, proxyCAAPI)
	api.RegisterNodeCAServer(m.server, proxyNodeCAAPI)
	api.RegisterRaftServer(m.server, authenticatedRaftAPI)
	api.RegisterHealthServer(m.server, authenticatedHealthAPI)
	api.RegisterRaftMembershipServer(m.server, proxyRaftMembershipAPI)
	api.RegisterControlServer(m.server, authenticatedControlAPI)
	api.RegisterLogsServer(m.server, authenticatedLogsServerAPI)
	api.RegisterLogBrokerServer(m.server, proxyLogBrokerAPI)
	api.RegisterResourceAllocatorServer(m.server, proxyResourceAPI)
	api.RegisterDispatcherServer(m.server, proxyDispatcherAPI)

	api.RegisterControlServer(m.localserver, localProxyControlAPI)
	api.RegisterLogsServer(m.localserver, localProxyLogsAPI)
	api.RegisterHealthServer(m.localserver, localHealthServer)
	api.RegisterCAServer(m.localserver, localCAAPI)

	healthServer.SetServingStatus("Raft", api.HealthCheckResponse_NOT_SERVING)
	localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_NOT_SERVING)

	errServe := make(chan error, len(m.listeners))
	for _, lis := range m.listeners {
		go m.serveListener(ctx, errServe, lis)
	}

	defer func() {
		m.server.Stop()
		m.localserver.Stop()
	}()

	// Set the raft server as serving for the health server
	healthServer.SetServingStatus("Raft", api.HealthCheckResponse_SERVING)

	if err := m.raftNode.JoinAndStart(ctx); err != nil {
		return errors.Wrap(err, "can't initialize raft node")
	}

	localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_SERVING)

	close(m.started)

	go func() {
		err := m.raftNode.Run(ctx)
		if err != nil {
			log.G(ctx).WithError(err).Error("raft node stopped")
			m.Stop(ctx)
		}
	}()

	if err := raft.WaitForLeader(ctx, m.raftNode); err != nil {
		return err
	}

	c, err := raft.WaitForCluster(ctx, m.raftNode)
	if err != nil {
		return err
	}
	raftConfig := c.Spec.Raft

	if err := m.watchForKEKChanges(ctx); err != nil {
		return err
	}

	if int(raftConfig.ElectionTick) != m.raftNode.Config.ElectionTick {
		log.G(ctx).Warningf("election tick value (%ds) is different from the one defined in the cluster config (%vs), the cluster may be unstable", m.raftNode.Config.ElectionTick, raftConfig.ElectionTick)
	}
	if int(raftConfig.HeartbeatTick) != m.raftNode.Config.HeartbeatTick {
		log.G(ctx).Warningf("heartbeat tick value (%ds) is different from the one defined in the cluster config (%vs), the cluster may be unstable", m.raftNode.Config.HeartbeatTick, raftConfig.HeartbeatTick)
	}

	// wait for an error in serving.
	err = <-errServe
	m.mu.Lock()
	if m.stopped {
		m.mu.Unlock()
		return nil
	}
	m.mu.Unlock()
	m.Stop(ctx)

	return err
}

const stopTimeout = 8 * time.Second

// Stop stops the manager. It immediately closes all open connections and
// active RPCs as well as stopping the scheduler.
func (m *Manager) Stop(ctx context.Context) {
	log.G(ctx).Info("Stopping manager")

	// It's not safe to start shutting down while the manager is still
	// starting up.
	<-m.started

	// the mutex stops us from trying to stop while we're already stopping, or
	// from returning before we've finished stopping.
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.stopped {
		return
	}
	m.stopped = true

	srvDone, localSrvDone := make(chan struct{}), make(chan struct{})
	go func() {
		m.server.GracefulStop()
		close(srvDone)
	}()
	go func() {
		m.localserver.GracefulStop()
		close(localSrvDone)
	}()

	m.dispatcher.Stop()
	m.logbroker.Stop()
	m.caserver.Stop()

	if m.allocator != nil {
		m.allocator.Stop()
	}
	if m.replicatedOrchestrator != nil {
		m.replicatedOrchestrator.Stop()
	}
	if m.globalOrchestrator != nil {
		m.globalOrchestrator.Stop()
	}
	if m.taskReaper != nil {
		m.taskReaper.Stop()
	}
	if m.constraintEnforcer != nil {
		m.constraintEnforcer.Stop()
	}
	if m.scheduler != nil {
		m.scheduler.Stop()
	}
	if m.keyManager != nil {
		m.keyManager.Stop()
	}

	m.cancelFunc()
	<-m.raftNode.Done()

	timer := time.AfterFunc(stopTimeout, func() {
		m.server.Stop()
		m.localserver.Stop()
	})
	defer timer.Stop()
	// TODO: we're not waiting on ctx because it very well could be passed from Run,
	// which is already cancelled here. We need to refactor that.
	select {
	case <-srvDone:
		<-localSrvDone
	case <-localSrvDone:
		<-srvDone
	}

	log.G(ctx).Info("Manager shut down")
	// mutex is released and Run can return now
}
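
// updateKEK reads the manager unlock key (KEK) from the cluster object and
// hands it to the RaftDEKManager, which re-encrypts the local TLS key material
// (and the raft DEK it protects) if the KEK changed. When the cluster goes
// from unlocked to locked, it also kicks off a best-effort TLS certificate
// renewal.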
func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error {
	securityConfig := m.config.SecurityConfig
	nodeID := m.config.SecurityConfig.ClientTLSCreds.NodeID()
	logger := log.G(ctx).WithFields(logrus.Fields{
		"node.id":   nodeID,
		"node.role": ca.ManagerRole,
	})

	// we are our own peer from which we get certs - try to connect over the local socket
	r := remotes.NewRemotes(api.Peer{Addr: m.Addr(), NodeID: nodeID})

	kekData := ca.KEKData{Version: cluster.Meta.Version.Index}
	for _, encryptionKey := range cluster.UnlockKeys {
		if encryptionKey.Subsystem == ca.ManagerRole {
			kekData.KEK = encryptionKey.Key
			break
		}
	}
	updated, unlockedToLocked, err := m.dekRotator.MaybeUpdateKEK(kekData)
	if err != nil {
		logger.WithError(err).Errorf("failed to re-encrypt TLS key with a new KEK")
		return err
	}
	if updated {
		logger.Debug("successfully rotated KEK")
	}
	if unlockedToLocked {
		// a best effort attempt to update the TLS certificate - if it fails, it'll be updated the next time it renews;
		// don't wait because it might take a bit
		go func() {
			if err := ca.RenewTLSConfigNow(ctx, securityConfig, r); err != nil {
				logger.WithError(err).Errorf("failed to download new TLS certificate after locking the cluster")
			}
		}()
	}
	return nil
}
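
// watchForKEKChanges applies the current cluster unlock key and then watches
// the cluster object, calling updateKEK on every update. The watch is
// cancelled when ctx is done.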
func (m *Manager) watchForKEKChanges(ctx context.Context) error {
	clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization()
	clusterWatch, clusterWatchCancel, err := store.ViewAndWatch(m.raftNode.MemoryStore(),
		func(tx store.ReadTx) error {
			cluster := store.GetCluster(tx, clusterID)
			if cluster == nil {
				return fmt.Errorf("unable to get current cluster")
			}
			return m.updateKEK(ctx, cluster)
		},
		state.EventUpdateCluster{
			Cluster: &api.Cluster{ID: clusterID},
			Checks:  []state.ClusterCheckFunc{state.ClusterCheckID},
		},
	)
	if err != nil {
		return err
	}
	go func() {
		for {
			select {
			case event := <-clusterWatch:
				clusterEvent := event.(state.EventUpdateCluster)
				m.updateKEK(ctx, clusterEvent.Cluster)
			case <-ctx.Done():
				clusterWatchCancel()
				return
			}
		}
	}()
	return nil
}

// rotateRootCAKEK will attempt to rotate the key-encryption-key for root CA key-material in raft.
// If there is no passphrase set in ENV, it returns.
// If there is plain-text root key-material, and a passphrase set, it encrypts it.
// If there is encrypted root key-material and it is using the current passphrase, it returns.
// If there is encrypted root key-material, and it is using the previous passphrase, it
// re-encrypts it with the current passphrase.
func (m *Manager) rotateRootCAKEK(ctx context.Context, clusterID string) error {
	// If we don't have a KEK, we won't ever be rotating anything
	strPassphrase := os.Getenv(ca.PassphraseENVVar)
	if strPassphrase == "" {
		return nil
	}

	strPassphrasePrev := os.Getenv(ca.PassphraseENVVarPrev)
	passphrase := []byte(strPassphrase)
	passphrasePrev := []byte(strPassphrasePrev)

	s := m.raftNode.MemoryStore()
	var (
		cluster  *api.Cluster
		err      error
		finalKey []byte
	)
	// Retrieve the cluster identified by ClusterID
	s.View(func(readTx store.ReadTx) {
		cluster = store.GetCluster(readTx, clusterID)
	})
	if cluster == nil {
		return fmt.Errorf("cluster not found: %s", clusterID)
	}

	// Try to get the private key from the cluster
	privKeyPEM := cluster.RootCA.CAKey
	if len(privKeyPEM) == 0 {
		// We have no PEM root private key in this cluster.
		log.G(ctx).Warnf("cluster %s does not have private key material", clusterID)
		return nil
	}

	// Decode the PEM private key
	keyBlock, _ := pem.Decode(privKeyPEM)
	if keyBlock == nil {
		return fmt.Errorf("invalid PEM-encoded private key inside of cluster %s", clusterID)
	}

	// If this key is not encrypted, then we have to encrypt it
	if !x509.IsEncryptedPEMBlock(keyBlock) {
		finalKey, err = ca.EncryptECPrivateKey(privKeyPEM, strPassphrase)
		if err != nil {
			return err
		}
	} else {
		// This key is already encrypted, let's try to decrypt with the current main passphrase
		_, err = x509.DecryptPEMBlock(keyBlock, passphrase)
		if err == nil {
			// The main key is the correct KEK, nothing to do here
			return nil
		}
		// This key is already encrypted, but failed with current main passphrase.
		// Let's try to decrypt with the previous passphrase
		unencryptedKey, err := x509.DecryptPEMBlock(keyBlock, passphrasePrev)
		if err != nil {
			// We were not able to decrypt either with the main or backup passphrase, error
			return err
		}
		unencryptedKeyBlock := &pem.Block{
			Type:    keyBlock.Type,
			Bytes:   unencryptedKey,
			Headers: keyBlock.Headers,
		}

		// We were able to decrypt the key, but with the previous passphrase. Let's encrypt
		// with the new one and store it in raft
		finalKey, err = ca.EncryptECPrivateKey(pem.EncodeToMemory(unencryptedKeyBlock), strPassphrase)
		if err != nil {
			log.G(ctx).Debugf("failed to rotate the key-encrypting-key for the root key material of cluster %s", clusterID)
			return err
		}
	}

	log.G(ctx).Infof("Re-encrypting the root key material of cluster %s", clusterID)
	// Let's update the key in the cluster object
	return s.Update(func(tx store.Tx) error {
		cluster = store.GetCluster(tx, clusterID)
		if cluster == nil {
			return fmt.Errorf("cluster not found: %s", clusterID)
		}
		cluster.RootCA.CAKey = finalKey
		return store.UpdateCluster(tx, cluster)
	})
}

// handleLeadershipEvents reacts to leadership changes, calling becomeLeader
// when this node is elected leader and becomeFollower when it loses leadership.
func (m *Manager) handleLeadershipEvents(ctx context.Context, leadershipCh chan events.Event) {
	for {
		select {
		case leadershipEvent := <-leadershipCh:
			m.mu.Lock()
			if m.stopped {
				m.mu.Unlock()
				return
			}
			newState := leadershipEvent.(raft.LeadershipState)

			if newState == raft.IsLeader {
				m.becomeLeader(ctx)
			} else if newState == raft.IsFollower {
				m.becomeFollower()
			}
			m.mu.Unlock()
		case <-ctx.Done():
			return
		}
	}
}

// serveListener serves a listener for local and non-local connections.
func (m *Manager) serveListener(ctx context.Context, errServe chan error, l net.Listener) {
	ctx = log.WithLogger(ctx, log.G(ctx).WithFields(
		logrus.Fields{
			"proto": l.Addr().Network(),
			"addr":  l.Addr().String(),
		}))
	if _, ok := l.(*net.TCPListener); !ok {
		log.G(ctx).Info("Listening for local connections")
		// we need to disallow double closes because UnixListener.Close
		// can delete the unix-socket file of a newer listener. grpc
		// indeed calls Close twice: in Serve and in Stop.
		errServe <- m.localserver.Serve(&closeOnceListener{Listener: l})
	} else {
		log.G(ctx).Info("Listening for connections")
		errServe <- m.server.Serve(l)
	}
}

// becomeLeader starts the subsystems that are run on the leader.
func (m *Manager) becomeLeader(ctx context.Context) {
	s := m.raftNode.MemoryStore()

	rootCA := m.config.SecurityConfig.RootCA()
	nodeID := m.config.SecurityConfig.ClientTLSCreds.NodeID()

	raftCfg := raft.DefaultRaftConfig()
	raftCfg.ElectionTick = uint32(m.raftNode.Config.ElectionTick)
	raftCfg.HeartbeatTick = uint32(m.raftNode.Config.HeartbeatTick)

	clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization()

	initialCAConfig := ca.DefaultCAConfig()
	initialCAConfig.ExternalCAs = m.config.ExternalCAs

	var unlockKeys []*api.EncryptionKey
	if m.config.AutoLockManagers {
		unlockKeys = []*api.EncryptionKey{{
			Subsystem: ca.ManagerRole,
			Key:       m.config.UnlockKey,
		}}
	}

	s.Update(func(tx store.Tx) error {
		// Add a default cluster object to the
		// store. Don't check the error because
		// we expect this to fail unless this
		// is a brand new cluster.
		store.CreateCluster(tx, defaultClusterObject(
			clusterID,
			initialCAConfig,
			raftCfg,
			api.EncryptionConfig{AutoLockManagers: m.config.AutoLockManagers},
			unlockKeys,
			rootCA))
		// Add Node entry for ourself, if one
		// doesn't exist already.
		store.CreateNode(tx, managerNode(nodeID))
		return nil
	})

	// Attempt to rotate the key-encrypting-key of the root CA key-material
	err := m.rotateRootCAKEK(ctx, clusterID)
	if err != nil {
		log.G(ctx).WithError(err).Error("root key-encrypting-key rotation failed")
	}

	m.replicatedOrchestrator = replicated.NewReplicatedOrchestrator(s)
	m.constraintEnforcer = constraintenforcer.New(s)
	m.globalOrchestrator = global.NewGlobalOrchestrator(s)
	m.taskReaper = taskreaper.New(s)
	m.scheduler = scheduler.New(s)
	m.keyManager = keymanager.New(s, keymanager.DefaultConfig())

	// TODO(stevvooe): Allocate a context that can be used to
	// shutdown underlying manager processes when leadership is
	// lost.
	m.allocator, err = allocator.New(s)
	if err != nil {
		log.G(ctx).WithError(err).Error("failed to create allocator")
		// TODO(stevvooe): It doesn't seem correct here to fail
		// creating the allocator but then use it anyway.
	}

	if m.keyManager != nil {
		go func(keyManager *keymanager.KeyManager) {
			if err := keyManager.Run(ctx); err != nil {
				log.G(ctx).WithError(err).Error("keymanager failed with an error")
			}
		}(m.keyManager)
	}

	go func(d *dispatcher.Dispatcher) {
		if err := d.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("Dispatcher exited with an error")
		}
	}(m.dispatcher)

	go func(lb *logbroker.LogBroker) {
		if err := lb.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("LogBroker exited with an error")
		}
	}(m.logbroker)

	go func(server *ca.Server) {
		if err := server.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("CA signer exited with an error")
		}
	}(m.caserver)

	// Start all sub-components in separate goroutines.
	// TODO(aluzzardi): This should have some kind of error handling so that
	// any component that goes down would bring the entire manager down.
	if m.allocator != nil {
		go func(allocator *allocator.Allocator) {
			if err := allocator.Run(ctx); err != nil {
				log.G(ctx).WithError(err).Error("allocator exited with an error")
			}
		}(m.allocator)
	}

	go func(scheduler *scheduler.Scheduler) {
		if err := scheduler.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("scheduler exited with an error")
		}
	}(m.scheduler)

	go func(constraintEnforcer *constraintenforcer.ConstraintEnforcer) {
		constraintEnforcer.Run()
	}(m.constraintEnforcer)

	go func(taskReaper *taskreaper.TaskReaper) {
		taskReaper.Run()
	}(m.taskReaper)

	go func(orchestrator *replicated.Orchestrator) {
		if err := orchestrator.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("replicated orchestrator exited with an error")
		}
	}(m.replicatedOrchestrator)

	go func(globalOrchestrator *global.Orchestrator) {
		if err := globalOrchestrator.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("global orchestrator exited with an error")
		}
	}(m.globalOrchestrator)
}

// becomeFollower shuts down the subsystems that are only run by the leader.
func (m *Manager) becomeFollower() {
	m.dispatcher.Stop()
	m.logbroker.Stop()
	m.caserver.Stop()

	if m.allocator != nil {
		m.allocator.Stop()
		m.allocator = nil
	}

	m.constraintEnforcer.Stop()
	m.constraintEnforcer = nil

	m.replicatedOrchestrator.Stop()
	m.replicatedOrchestrator = nil

	m.globalOrchestrator.Stop()
	m.globalOrchestrator = nil

	m.taskReaper.Stop()
	m.taskReaper = nil

	m.scheduler.Stop()
	m.scheduler = nil

	if m.keyManager != nil {
		m.keyManager.Stop()
		m.keyManager = nil
	}
}

// defaultClusterObject creates a default cluster.
func defaultClusterObject(
	clusterID string,
	initialCAConfig api.CAConfig,
	raftCfg api.RaftConfig,
	encryptionConfig api.EncryptionConfig,
	initialUnlockKeys []*api.EncryptionKey,
	rootCA *ca.RootCA) *api.Cluster {

	return &api.Cluster{
		ID: clusterID,
		Spec: api.ClusterSpec{
			Annotations: api.Annotations{
				Name: store.DefaultClusterName,
			},
			Orchestration: api.OrchestrationConfig{
				TaskHistoryRetentionLimit: defaultTaskHistoryRetentionLimit,
			},
			Dispatcher: api.DispatcherConfig{
				HeartbeatPeriod: ptypes.DurationProto(dispatcher.DefaultHeartBeatPeriod),
			},
			Raft:             raftCfg,
			CAConfig:         initialCAConfig,
			EncryptionConfig: encryptionConfig,
		},
		RootCA: api.RootCA{
			CAKey:      rootCA.Key,
			CACert:     rootCA.Cert,
			CACertHash: rootCA.Digest.String(),
			JoinTokens: api.JoinTokens{
				Worker:  ca.GenerateJoinToken(rootCA),
				Manager: ca.GenerateJoinToken(rootCA),
			},
		},
		UnlockKeys: initialUnlockKeys,
	}
}

// managerNode creates a new node with NodeRoleManager role.
func managerNode(nodeID string) *api.Node {
	return &api.Node{
		ID: nodeID,
		Certificate: api.Certificate{
			CN:   nodeID,
			Role: api.NodeRoleManager,
			Status: api.IssuanceStatus{
				State: api.IssuanceStateIssued,
			},
		},
		Spec: api.NodeSpec{
			Role:       api.NodeRoleManager,
			Membership: api.NodeMembershipAccepted,
		},
	}
}