
Re-vendor swarmkit to a version which does not require all cluster updates to include an external CA certificate when updating external CAs.

Signed-off-by: Ying Li <ying.li@docker.com>
This commit is contained in:
Ying Li 2017-04-12 14:49:55 -07:00
parent bc4560e512
commit e34bee387e
6 changed files with 331 additions and 29 deletions
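In practice, the re-vendored change means a cluster spec update no longer has to re-supply an external CA certificate: an api.ExternalCA entry whose CACert is empty is now associated with the cluster's current root CA cert (see the getNormalizedExtCAs hunk later in this diff). A hedged sketch of such a spec entry, with a placeholder URL, not code from this commit:

spec.CAConfig.ExternalCAs = []*api.ExternalCA{
	{
		Protocol: api.ExternalCA_CAProtocolCFSSL,
		URL:      "https://external-ca.example.com/api/v1/cfssl/sign",
		// CACert omitted: previously rejected with InvalidArgument,
		// now treated as associated with the current root CA cert.
	},
}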

View file

@ -105,7 +105,7 @@ github.com/docker/containerd 9048e5e50717ea4497b757314bad98ea3763c145
github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
# cluster
github.com/docker/swarmkit d5232280c510d70755ab11305d46a5704735371a
github.com/docker/swarmkit b19d028de0a6e9ca281afeb76cea2544b9edd839
github.com/gogo/protobuf 8d70fb3182befc465c4a1eac8ad4d38ff49778e2
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
github.com/google/certificate-transparency d90e65c3a07988180c5b1ece71791c0b6506826e

259 vendor/github.com/docker/swarmkit/ca/reconciler.go generated vendored Normal file
View file

@ -0,0 +1,259 @@
package ca
import (
"bytes"
"context"
"fmt"
"reflect"
"sync"
"time"
"github.com/cloudflare/cfssl/helpers"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/api/equality"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/state/store"
"github.com/pkg/errors"
)
// IssuanceStateRotateMaxBatchSize is the maximum number of nodes we'll tell to rotate their certificates in any given update
const IssuanceStateRotateMaxBatchSize = 30
func hasIssuer(n *api.Node, info *IssuerInfo) bool {
if n.Description == nil || n.Description.TLSInfo == nil {
return false
}
return bytes.Equal(info.Subject, n.Description.TLSInfo.CertIssuerSubject) && bytes.Equal(info.PublicKey, n.Description.TLSInfo.CertIssuerPublicKey)
}
var errRootRotationChanged = errors.New("target root rotation has changed")
// rootRotationReconciler keeps track of all the nodes in the store so that we can determine which ones need reconciliation when nodes are updated
// or the root CA is updated. This is meant to be used with watches on nodes and the cluster, and provides functions to be called when the
// cluster's RootCA has changed and when a node is added, updated, or removed.
type rootRotationReconciler struct {
mu sync.Mutex
clusterID string
batchUpdateInterval time.Duration
ctx context.Context
store *store.MemoryStore
currentRootCA *api.RootCA
currentIssuer IssuerInfo
unconvergedNodes map[string]*api.Node
wg sync.WaitGroup
cancel func()
}
// IssuerFromAPIRootCA returns the desired issuer given an API root CA object
func IssuerFromAPIRootCA(rootCA *api.RootCA) (*IssuerInfo, error) {
wantedIssuer := rootCA.CACert
if rootCA.RootRotation != nil {
wantedIssuer = rootCA.RootRotation.CACert
}
issuerCerts, err := helpers.ParseCertificatesPEM(wantedIssuer)
if err != nil {
return nil, errors.Wrap(err, "invalid certificate in cluster root CA object")
}
if len(issuerCerts) == 0 {
return nil, errors.New("invalid certificate in cluster root CA object")
}
return &IssuerInfo{
Subject: issuerCerts[0].RawSubject,
PublicKey: issuerCerts[0].RawSubjectPublicKeyInfo,
}, nil
}
// assumption: UpdateRootCA will never be called with a `nil` root CA because the caller will be acting in response to
// a store update event
func (r *rootRotationReconciler) UpdateRootCA(newRootCA *api.RootCA) {
issuerInfo, err := IssuerFromAPIRootCA(newRootCA)
if err != nil {
log.G(r.ctx).WithError(err).Error("unable to process the current root CA")
return
}
var (
shouldStartNewLoop, waitForPrevLoop bool
loopCtx context.Context
)
r.mu.Lock()
defer func() {
r.mu.Unlock()
if shouldStartNewLoop {
if waitForPrevLoop {
r.wg.Wait()
}
r.wg.Add(1)
go r.runReconcilerLoop(loopCtx, newRootCA)
}
}()
// check if the issuer has changed, first
if reflect.DeepEqual(&r.currentIssuer, issuerInfo) {
r.currentRootCA = newRootCA
return
}
// If the issuer has changed, iterate through all the nodes to figure out which ones need rotation
if newRootCA.RootRotation != nil {
var nodes []*api.Node
r.store.View(func(tx store.ReadTx) {
nodes, err = store.FindNodes(tx, store.ByMembership(api.NodeMembershipAccepted))
})
if err != nil {
log.G(r.ctx).WithError(err).Error("unable to list nodes, so unable to process the current root CA")
return
}
// from here on out, there will be no more errors that cause us to have to abandon updating the Root CA,
// so we can start making changes to r's fields
r.unconvergedNodes = make(map[string]*api.Node)
for _, n := range nodes {
if !hasIssuer(n, issuerInfo) {
r.unconvergedNodes[n.ID] = n
}
}
shouldStartNewLoop = true
if r.cancel != nil { // there's already a loop going, so cancel it
r.cancel()
waitForPrevLoop = true
}
loopCtx, r.cancel = context.WithCancel(r.ctx)
} else {
r.unconvergedNodes = nil
}
r.currentRootCA = newRootCA
r.currentIssuer = *issuerInfo
}
// assumption: UpdateNode will never be called with a `nil` node because the caller will be acting in response to
// a store update event
func (r *rootRotationReconciler) UpdateNode(node *api.Node) {
r.mu.Lock()
defer r.mu.Unlock()
// if we're not in the middle of a root rotation, or if this node does not have membership, ignore it
if r.currentRootCA == nil || r.currentRootCA.RootRotation == nil || node.Spec.Membership != api.NodeMembershipAccepted {
return
}
if hasIssuer(node, &r.currentIssuer) {
delete(r.unconvergedNodes, node.ID)
} else {
r.unconvergedNodes[node.ID] = node
}
}
// assumption: DeleteNode will never be called with a `nil` node because the caller will be acting in response to
// a store update event
func (r *rootRotationReconciler) DeleteNode(node *api.Node) {
r.mu.Lock()
delete(r.unconvergedNodes, node.ID)
r.mu.Unlock()
}
func (r *rootRotationReconciler) runReconcilerLoop(ctx context.Context, loopRootCA *api.RootCA) {
defer r.wg.Done()
for {
r.mu.Lock()
if len(r.unconvergedNodes) == 0 {
r.mu.Unlock()
err := r.store.Update(func(tx store.Tx) error {
return r.finishRootRotation(tx, loopRootCA)
})
if err == nil {
log.G(r.ctx).Info("completed root rotation")
return
}
log.G(r.ctx).WithError(err).Error("could not complete root rotation")
if err == errRootRotationChanged {
// if the root rotation has changed, this loop will be cancelled anyway, so may as well abort early
return
}
} else {
var toUpdate []*api.Node
for _, n := range r.unconvergedNodes {
iState := n.Certificate.Status.State
if iState != api.IssuanceStateRenew && iState != api.IssuanceStatePending && iState != api.IssuanceStateRotate {
n = n.Copy()
n.Certificate.Status.State = api.IssuanceStateRotate
toUpdate = append(toUpdate, n)
if len(toUpdate) >= IssuanceStateRotateMaxBatchSize {
break
}
}
}
r.mu.Unlock()
if err := r.batchUpdateNodes(toUpdate); err != nil {
log.G(r.ctx).WithError(err).Errorf("store error when trying to batch update %d nodes to request certificate rotation", len(toUpdate))
}
}
select {
case <-ctx.Done():
return
case <-time.After(r.batchUpdateInterval):
}
}
}
// This function assumes that the expected root CA has root rotation. This is intended to be used by
// `reconcileNodeRootsAndCerts`, which uses the root CA from the `lastSeenClusterRootCA`, and checks
// that it has a root rotation before calling this function.
func (r *rootRotationReconciler) finishRootRotation(tx store.Tx, expectedRootCA *api.RootCA) error {
cluster := store.GetCluster(tx, r.clusterID)
if cluster == nil {
return fmt.Errorf("unable to get cluster %s", r.clusterID)
}
// If the RootCA object has changed (because another root rotation was started or because some other node
// had finished the root rotation), we cannot finish the root rotation that we were working on.
if !equality.RootCAEqualStable(expectedRootCA, &cluster.RootCA) {
return errRootRotationChanged
}
var signerCert []byte
if len(cluster.RootCA.RootRotation.CAKey) > 0 {
signerCert = cluster.RootCA.RootRotation.CACert
}
// we don't actually have to parse out the default node expiration from the cluster - we are just using
// the ca.RootCA object to generate new tokens and the digest
updatedRootCA, err := NewRootCA(cluster.RootCA.RootRotation.CACert, signerCert, cluster.RootCA.RootRotation.CAKey,
DefaultNodeCertExpiration, nil)
if err != nil {
return errors.Wrap(err, "invalid cluster root rotation object")
}
cluster.RootCA = api.RootCA{
CACert: cluster.RootCA.RootRotation.CACert,
CAKey: cluster.RootCA.RootRotation.CAKey,
CACertHash: updatedRootCA.Digest.String(),
JoinTokens: api.JoinTokens{
Worker: GenerateJoinToken(&updatedRootCA),
Manager: GenerateJoinToken(&updatedRootCA),
},
LastForcedRotation: cluster.RootCA.LastForcedRotation,
}
return store.UpdateCluster(tx, cluster)
}
func (r *rootRotationReconciler) batchUpdateNodes(toUpdate []*api.Node) error {
if len(toUpdate) == 0 {
return nil
}
_, err := r.store.Batch(func(batch *store.Batch) error {
// Directly update the nodes rather than get + update, and ignore version errors. Since
// `rootRotationReconciler` should be hooked up to all node update/delete/create events, we should have
// close to the latest versions of all the nodes. If not, the node will be updated later and the
// next batch of updates should catch it.
for _, n := range toUpdate {
if err := batch.Update(func(tx store.Tx) error {
return store.UpdateNode(tx, n)
}); err != nil && err != store.ErrSequenceConflict {
log.G(r.ctx).WithError(err).Errorf("unable to update node %s to request a certificate rotation", n.ID)
}
}
return nil
})
return err
}
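For context, a minimal hedged sketch of how this reconciler is meant to be driven; it mirrors the CA server wiring in the next file, and clusterID, memoryStore, cluster, and updates are placeholders for this illustration:

reconciler := &rootRotationReconciler{
	ctx:                 ctx,
	clusterID:           clusterID,
	store:               memoryStore,
	batchUpdateInterval: 3 * time.Second, // defaultRootReconciliationInterval below
}
// The server calls this from its own UpdateRootCA whenever the cluster object changes.
reconciler.UpdateRootCA(cluster.RootCA.Copy())
// Node events keep the unconverged-node set current.
for event := range updates {
	switch v := event.(type) {
	case api.EventCreateNode:
		reconciler.UpdateNode(v.Node)
	case api.EventUpdateNode:
		reconciler.UpdateNode(v.Node)
	case api.EventDeleteNode:
		reconciler.DeleteNode(v.Node)
	}
}

Each pass of the internal loop requests rotation for at most IssuanceStateRotateMaxBatchSize (30) unconverged nodes and then waits batchUpdateInterval before the next pass, so a large cluster converges over several passes; once unconvergedNodes is empty the loop promotes the pending RootRotation to the cluster's root CA via finishRootRotation.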

View file

@ -22,6 +22,7 @@ import (
const (
defaultReconciliationRetryInterval = 10 * time.Second
defaultRootReconciliationInterval = 3 * time.Second
)
// APISecurityConfigUpdater knows how to update a SecurityConfig from an api.Cluster object
@ -63,6 +64,10 @@ type Server struct {
// before we update the security config with the new root CA, we need to be able to save the root certs
rootPaths CertPaths
// lets us monitor and finish root rotations
rootReconciler *rootRotationReconciler
rootReconciliationRetryInterval time.Duration
}
// DefaultCAConfig returns the default CA Config, with a default expiration.
@ -75,12 +80,13 @@ func DefaultCAConfig() api.CAConfig {
// NewServer creates a CA API server.
func NewServer(store *store.MemoryStore, securityConfig *SecurityConfig, rootCAPaths CertPaths) *Server {
return &Server{
store: store,
securityConfig: securityConfig,
pending: make(map[string]*api.Node),
started: make(chan struct{}),
reconciliationRetryInterval: defaultReconciliationRetryInterval,
rootPaths: rootCAPaths,
store: store,
securityConfig: securityConfig,
pending: make(map[string]*api.Node),
started: make(chan struct{}),
reconciliationRetryInterval: defaultReconciliationRetryInterval,
rootReconciliationRetryInterval: defaultRootReconciliationInterval,
rootPaths: rootCAPaths,
}
}
@ -90,6 +96,12 @@ func (s *Server) SetReconciliationRetryInterval(reconciliationRetryInterval time
s.reconciliationRetryInterval = reconciliationRetryInterval
}
// SetRootReconciliationInterval changes the time interval between root rotation
// reconciliation attempts. This function must be called before Run.
func (s *Server) SetRootReconciliationInterval(interval time.Duration) {
s.rootReconciliationRetryInterval = interval
}
// GetUnlockKey is responsible for returning the current unlock key used for encrypting TLS private keys and
// other at rest data. Access to this RPC call should only be allowed via mutual TLS from managers.
func (s *Server) GetUnlockKey(ctx context.Context, request *api.GetUnlockKeyRequest) (*api.GetUnlockKeyResponse, error) {
@ -395,14 +407,28 @@ func (s *Server) Run(ctx context.Context) error {
return errors.New("CA signer is already running")
}
s.wg.Add(1)
s.ctx, s.cancel = context.WithCancel(log.WithModule(ctx, "ca"))
ctx = s.ctx
// we need to set it on the server, because `Server.UpdateRootCA` can be called from outside the Run function
s.rootReconciler = &rootRotationReconciler{
ctx: log.WithField(ctx, "method", "(*Server).rootRotationReconciler"),
clusterID: s.securityConfig.ClientTLSCreds.Organization(),
store: s.store,
batchUpdateInterval: s.rootReconciliationRetryInterval,
}
rootReconciler := s.rootReconciler
s.mu.Unlock()
defer s.wg.Done()
ctx = log.WithModule(ctx, "ca")
defer func() {
s.mu.Lock()
s.rootReconciler = nil
s.mu.Unlock()
}()
// Retrieve the channels to keep track of changes in the cluster
// Retrieve all the currently registered nodes
var nodes []*api.Node
updates, cancel, err := store.ViewAndWatch(
s.store,
func(readTx store.ReadTx) error {
@ -419,13 +445,12 @@ func (s *Server) Run(ctx context.Context) error {
},
api.EventCreateNode{},
api.EventUpdateNode{},
api.EventDeleteNode{},
)
// Do this after updateCluster has been called, so isRunning never
// returns true without joinTokens being set correctly.
s.mu.Lock()
s.ctx, s.cancel = context.WithCancel(ctx)
ctx = s.ctx
close(s.started)
s.mu.Unlock()
@ -464,13 +489,18 @@ func (s *Server) Run(ctx context.Context) error {
switch v := event.(type) {
case api.EventCreateNode:
s.evaluateAndSignNodeCert(ctx, v.Node)
rootReconciler.UpdateNode(v.Node)
case api.EventUpdateNode:
// If this certificate is already at a final state
// no need to evaluate and sign it.
if !isFinalState(v.Node.Certificate.Status) {
s.evaluateAndSignNodeCert(ctx, v.Node)
}
rootReconciler.UpdateNode(v.Node)
case api.EventDeleteNode:
rootReconciler.DeleteNode(v.Node)
}
case <-ticker.C:
for _, node := range s.pending {
if err := s.evaluateAndSignNodeCert(ctx, node); err != nil {
@ -541,12 +571,16 @@ func (s *Server) isRunning() bool {
func (s *Server) UpdateRootCA(ctx context.Context, cluster *api.Cluster) error {
s.mu.Lock()
s.joinTokens = cluster.RootCA.JoinTokens.Copy()
reconciler := s.rootReconciler
s.mu.Unlock()
rCA := cluster.RootCA.Copy()
if reconciler != nil {
reconciler.UpdateRootCA(rCA)
}
s.secConfigMu.Lock()
defer s.secConfigMu.Unlock()
rCA := cluster.RootCA
rootCAChanged := len(rCA.CACert) != 0 && !equality.RootCAEqualStable(s.lastSeenClusterRootCA, &cluster.RootCA)
rootCAChanged := len(rCA.CACert) != 0 && !equality.RootCAEqualStable(s.lastSeenClusterRootCA, rCA)
externalCAChanged := !equality.ExternalCAsEqualStable(s.lastSeenExternalCAs, cluster.Spec.CAConfig.ExternalCAs)
logger := log.G(ctx).WithFields(logrus.Fields{
"cluster.id": cluster.ID,
@ -581,7 +615,6 @@ func (s *Server) UpdateRootCA(ctx context.Context, cluster *api.Cluster) error {
if signingKey == nil {
signingCert = nil
}
updatedRootCA, err := NewRootCA(rCA.CACert, signingCert, signingKey, expiry, intermediates)
if err != nil {
return errors.Wrap(err, "invalid Root CA object in cluster")
@ -604,7 +637,7 @@ func (s *Server) UpdateRootCA(ctx context.Context, cluster *api.Cluster) error {
}
// only update the server cache if we've successfully updated the root CA
logger.Debug("Root CA updated successfully")
s.lastSeenClusterRootCA = cluster.RootCA.Copy()
s.lastSeenClusterRootCA = rCA
}
// we want to update if the external CA changed, or if the root CA changed because the root CA could affect what

View file

@ -148,14 +148,16 @@ func validateHasAtLeastOneExternalCA(ctx context.Context, externalCAs map[string
// validates that the list of external CAs has valid certs associated with them, and produces a mapping of subject/pubkey:external CA
// for later validation of required external CAs
func getNormalizedExtCAs(caConfig *api.CAConfig) (map[string][]*api.ExternalCA, error) {
func getNormalizedExtCAs(caConfig *api.CAConfig, normalizedCurrentRootCACert []byte) (map[string][]*api.ExternalCA, error) {
extCAs := make(map[string][]*api.ExternalCA)
for _, extCA := range caConfig.ExternalCAs {
if len(extCA.CACert) == 0 {
return nil, grpc.Errorf(codes.InvalidArgument, "must specify CA certificate for each external CA")
associatedCert := normalizedCurrentRootCACert
// if no associated cert is provided, assume it's the current root cert
if len(extCA.CACert) > 0 {
associatedCert = ca.NormalizePEMs(extCA.CACert)
}
certKey := string(ca.NormalizePEMs(extCA.CACert))
certKey := string(associatedCert)
extCAs[certKey] = append(extCAs[certKey], extCA)
}
@ -191,12 +193,12 @@ func validateCAConfig(ctx context.Context, securityConfig *ca.SecurityConfig, cl
return nil, grpc.Errorf(codes.InvalidArgument, "if a signing CA key is provided, the signing CA cert must also be provided")
}
extCAs, err := getNormalizedExtCAs(newConfig) // validate that the list of external CAs is not malformed
normalizedRootCA := ca.NormalizePEMs(cluster.RootCA.CACert)
extCAs, err := getNormalizedExtCAs(newConfig, normalizedRootCA) // validate that the list of external CAs is not malformed
if err != nil {
return nil, err
}
normalizedRootCA := ca.NormalizePEMs(cluster.RootCA.CACert)
var oldCertExtCAs []*api.ExternalCA
if !hasSigningKey(&cluster.RootCA) {
oldCertExtCAs, err = validateHasAtLeastOneExternalCA(ctx, extCAs, securityConfig, normalizedRootCA, "current")
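The net effect of this hunk is a fallback rule: an external CA entry that omits CACert is now keyed under the cluster's current (normalized) root CA certificate instead of being rejected with InvalidArgument. A minimal hedged sketch of that rule in isolation; associatedCertFor is an illustrative helper, not part of the vendored code:

// Returns the cert an external CA entry should be keyed under:
// its own CACert if provided, otherwise the current root CA cert.
func associatedCertFor(extCACert, normalizedCurrentRootCACert []byte) []byte {
	if len(extCACert) > 0 {
		return ca.NormalizePEMs(extCACert)
	}
	return normalizedCurrentRootCACert
}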

View file

@ -14,6 +14,7 @@ import (
"time"
"github.com/Sirupsen/logrus"
"github.com/cloudflare/cfssl/helpers"
"github.com/docker/docker/pkg/plugingetter"
"github.com/docker/go-events"
"github.com/docker/swarmkit/api"
@ -776,22 +777,29 @@ func (m *Manager) rotateRootCAKEK(ctx context.Context, clusterID string) error {
}
if x509.IsEncryptedPEMBlock(keyBlock) {
// This key is already encrypted, let's try to decrypt with the current main passphrase
_, err = x509.DecryptPEMBlock(keyBlock, []byte(passphrase))
// PEM encryption does not have a digest, so sometimes decryption doesn't
// error even with the wrong passphrase. So actually try to parse it into a valid key.
_, err := helpers.ParsePrivateKeyPEMWithPassword(privKeyPEM, []byte(passphrase))
if err == nil {
// The main key is the correct KEK, nothing to do here
// This key is already correctly encrypted with the correct KEK, nothing to do here
return nil
}
// This key is already encrypted, but failed with current main passphrase.
// Let's try to decrypt with the previous passphrase
unencryptedKey, err := x509.DecryptPEMBlock(keyBlock, []byte(passphrasePrev))
// Let's try to decrypt with the previous passphrase, and parse into a valid key, for the
// same reason as above.
_, err = helpers.ParsePrivateKeyPEMWithPassword(privKeyPEM, []byte(passphrasePrev))
if err != nil {
// We were not able to decrypt either with the main or backup passphrase, error
return err
}
// ok the above passphrase is correct, so decrypt the PEM block so we can re-encrypt -
// since the key was successfully decrypted above, there will be no error doing PEM
// decryption
unencryptedDER, _ := x509.DecryptPEMBlock(keyBlock, []byte(passphrasePrev))
unencryptedKeyBlock := &pem.Block{
Type: keyBlock.Type,
Bytes: unencryptedKey,
Bytes: unencryptedDER,
}
// we were able to decrypt the key with the previous passphrase - if the current passphrase is empty,
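The comments above point at a real pitfall: legacy PEM encryption (RFC 1423) carries no integrity check, so x509.DecryptPEMBlock can occasionally return garbage bytes without an error when given the wrong passphrase. The vendored fix therefore validates the passphrase by parsing the decrypted key via cfssl's helpers.ParsePrivateKeyPEMWithPassword. A standalone stdlib sketch of the same idea, assuming crypto/x509 and encoding/pem imports and an ECDSA key (which is what swarmkit's CA uses):

// Treat a passphrase as correct only if the decrypted bytes parse as a key;
// a nil error from DecryptPEMBlock alone is not sufficient proof.
func passphraseIsCorrect(keyBlock *pem.Block, passphrase []byte) bool {
	der, err := x509.DecryptPEMBlock(keyBlock, passphrase)
	if err != nil {
		return false
	}
	_, err = x509.ParseECPrivateKey(der)
	return err == nil
}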

View file

@ -297,8 +297,9 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
updates := make(map[*api.Service][]orchestrator.Slot)
_, err := g.store.Batch(func(batch *store.Batch) error {
var updateTasks []orchestrator.Slot
for _, serviceID := range serviceIDs {
var updateTasks []orchestrator.Slot
if _, exists := nodeTasks[serviceID]; !exists {
continue
}
@ -352,7 +353,6 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
for service, updateTasks := range updates {
g.updater.Update(ctx, g.cluster, service, updateTasks)
}
}
// updateNode updates g.nodes based on the current node value
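The global orchestrator hunk above is a scoping fix: updateTasks was previously declared once outside the per-service loop, so slots appended while reconciling one service could carry over into the update issued for the next. Declaring it inside the loop gives each service a fresh slice. A hedged sketch of the two shapes; slotsFor is a hypothetical helper used only for illustration:

// Before (shared slice, slots could accumulate across services):
var updateTasks []orchestrator.Slot
for _, serviceID := range serviceIDs {
	updateTasks = append(updateTasks, slotsFor(serviceID)...)
}

// After (fresh slice per service, matching the diff above):
for _, serviceID := range serviceIDs {
	var updateTasks []orchestrator.Slot
	updateTasks = append(updateTasks, slotsFor(serviceID)...)
	_ = updateTasks // handed to the updater per service in the real code
}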