diff --git a/vendor.conf b/vendor.conf
index 2081a91c61..b2e5dbc485 100644
--- a/vendor.conf
+++ b/vendor.conf
@@ -100,7 +100,7 @@ github.com/docker/containerd 03e5862ec0d8d3b3f750e19fca3ee367e13c090e
 github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
 
 # cluster
-github.com/docker/swarmkit 91c6e2db9c0c91c466a83529ed16649a1de7ccc4
+github.com/docker/swarmkit 9bca23b0de42a9ebcc71622a30d646afa1e2b564
 github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
 github.com/gogo/protobuf v0.3
 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
diff --git a/vendor/github.com/docker/swarmkit/agent/agent.go b/vendor/github.com/docker/swarmkit/agent/agent.go
index 7da0034cda..1f4c36207d 100644
--- a/vendor/github.com/docker/swarmkit/agent/agent.go
+++ b/vendor/github.com/docker/swarmkit/agent/agent.go
@@ -406,37 +406,41 @@ func (a *Agent) UpdateTaskStatus(ctx context.Context, taskID string, status *api
 }
 
 // Publisher returns a LogPublisher for the given subscription
-func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogPublisher, error) {
+// as well as a cancel function that should be called when the log stream
+// is completed.
+func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogPublisher, func(), error) {
 	// TODO(stevvooe): The level of coordination here is WAY too much for logs.
 	// These should only be best effort and really just buffer until a session is
 	// ready. Ideally, they would use a separate connection completely.
 	var (
-		err    error
-		client api.LogBroker_PublishLogsClient
+		err       error
+		publisher api.LogBroker_PublishLogsClient
 	)
 
 	err = a.withSession(ctx, func(session *session) error {
-		client, err = api.NewLogBrokerClient(session.conn).PublishLogs(ctx)
+		publisher, err = api.NewLogBrokerClient(session.conn).PublishLogs(ctx)
 		return err
 	})
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	return exec.LogPublisherFunc(func(ctx context.Context, message api.LogMessage) error {
-		select {
-		case <-ctx.Done():
-			client.CloseSend()
-			return ctx.Err()
-		default:
-		}
+		select {
+		case <-ctx.Done():
+			publisher.CloseSend()
+			return ctx.Err()
+		default:
+		}
 
-		return client.Send(&api.PublishLogsMessage{
-			SubscriptionID: subscriptionID,
-			Messages:       []api.LogMessage{message},
-		})
-	}), nil
+		return publisher.Send(&api.PublishLogsMessage{
+			SubscriptionID: subscriptionID,
+			Messages:       []api.LogMessage{message},
+		})
+	}), func() {
+		publisher.CloseSend()
+	}, nil
 }
 
 // nodeDescriptionWithHostname retrieves node description, and overrides hostname if available
diff --git a/vendor/github.com/docker/swarmkit/agent/exec/controller.go b/vendor/github.com/docker/swarmkit/agent/exec/controller.go
index e61dffd6a3..f8d000a233 100644
--- a/vendor/github.com/docker/swarmkit/agent/exec/controller.go
+++ b/vendor/github.com/docker/swarmkit/agent/exec/controller.go
@@ -69,7 +69,7 @@ func (fn LogPublisherFunc) Publish(ctx context.Context, message api.LogMessage)
 
 // LogPublisherProvider defines the protocol for receiving a log publisher
 type LogPublisherProvider interface {
-	Publisher(ctx context.Context, subscriptionID string) (LogPublisher, error)
+	Publisher(ctx context.Context, subscriptionID string) (LogPublisher, func(), error)
 }
 
 // ContainerStatuser reports status of a container.
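The two hunks above change the log-publisher contract: Agent.Publisher and the exec.LogPublisherProvider interface now return a cancel function alongside the LogPublisher, and callers are expected to defer that cancel instead of publishing an empty api.LogMessage as an end-of-stream marker. Below is a minimal, self-contained Go sketch of the new calling convention; every type here (LogMessage, LogPublisher, LogPublisherProvider, the fake provider) is an illustrative stand-in for the swarmkit types, not code from this diff.

```go
package main

import (
	"context"
	"fmt"
)

// Stand-ins that mirror the shapes used in the diff. The real definitions
// live in github.com/docker/swarmkit/api and .../agent/exec.
type LogMessage struct {
	Data []byte
}

type LogPublisher interface {
	Publish(ctx context.Context, message LogMessage) error
}

// LogPublisherProvider now hands back a cancel function together with the
// publisher; in the real implementation the cancel closes the send side of
// the underlying PublishLogs stream.
type LogPublisherProvider interface {
	Publisher(ctx context.Context, subscriptionID string) (LogPublisher, func(), error)
}

// streamLogs shows the intended calling pattern: acquire the publisher,
// defer the cancel function, then publish until done. This replaces the old
// convention of publishing an empty message to signal the end of the stream.
func streamLogs(ctx context.Context, provider LogPublisherProvider, subscriptionID string, lines []string) error {
	publisher, cancel, err := provider.Publisher(ctx, subscriptionID)
	if err != nil {
		return err
	}
	defer cancel()

	for _, line := range lines {
		if err := publisher.Publish(ctx, LogMessage{Data: []byte(line)}); err != nil {
			return err
		}
	}
	return nil
}

// fakeProvider is a trivial in-memory implementation used only to make the
// sketch executable; it prints messages and reports when the stream closes.
type fakeProvider struct{}

type printPublisher struct{ subscriptionID string }

func (p printPublisher) Publish(_ context.Context, m LogMessage) error {
	_, err := fmt.Printf("[%s] %s\n", p.subscriptionID, m.Data)
	return err
}

func (fakeProvider) Publisher(_ context.Context, subscriptionID string) (LogPublisher, func(), error) {
	cancel := func() { fmt.Printf("[%s] stream closed\n", subscriptionID) }
	return printPublisher{subscriptionID: subscriptionID}, cancel, nil
}

func main() {
	if err := streamLogs(context.Background(), fakeProvider{}, "sub-1", []string{"hello", "world"}); err != nil {
		fmt.Println("error:", err)
	}
}
```

The worker.go change in the hunks that follow takes exactly this shape: `defer cancel()` replaces the old `defer publisher.Publish(ctx, api.LogMessage{})`.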
diff --git a/vendor/github.com/docker/swarmkit/agent/session.go b/vendor/github.com/docker/swarmkit/agent/session.go index 47081d081b..2779445915 100644 --- a/vendor/github.com/docker/swarmkit/agent/session.go +++ b/vendor/github.com/docker/swarmkit/agent/session.go @@ -226,6 +226,16 @@ func (s *session) logSubscriptions(ctx context.Context) error { client := api.NewLogBrokerClient(s.conn) subscriptions, err := client.ListenSubscriptions(ctx, &api.ListenSubscriptionsRequest{}) + if grpc.Code(err) == codes.Unimplemented { + log.Warning("manager does not support log subscriptions") + // Don't return, because returning would bounce the session + select { + case <-s.closed: + return errSessionClosed + case <-ctx.Done(): + return ctx.Err() + } + } if err != nil { return err } diff --git a/vendor/github.com/docker/swarmkit/agent/worker.go b/vendor/github.com/docker/swarmkit/agent/worker.go index db9f0ca409..55e7477715 100644 --- a/vendor/github.com/docker/swarmkit/agent/worker.go +++ b/vendor/github.com/docker/swarmkit/agent/worker.go @@ -406,12 +406,12 @@ func (w *worker) updateTaskStatus(ctx context.Context, tx *bolt.Tx, taskID strin func (w *worker) Subscribe(ctx context.Context, subscription *api.SubscriptionMessage) error { log.G(ctx).Debugf("Received subscription %s (selector: %v)", subscription.ID, subscription.Selector) - publisher, err := w.publisherProvider.Publisher(ctx, subscription.ID) + publisher, cancel, err := w.publisherProvider.Publisher(ctx, subscription.ID) if err != nil { return err } // Send a close once we're done - defer publisher.Publish(ctx, api.LogMessage{}) + defer cancel() match := func(t *api.Task) bool { // TODO(aluzzardi): Consider using maps to limit the iterations. @@ -436,26 +436,49 @@ func (w *worker) Subscribe(ctx context.Context, subscription *api.SubscriptionMe return false } - ch, cancel := w.taskevents.Watch() - defer cancel() - + wg := sync.WaitGroup{} w.mu.Lock() for _, tm := range w.taskManagers { if match(tm.task) { - go tm.Logs(ctx, *subscription.Options, publisher) + wg.Add(1) + go func() { + defer wg.Done() + tm.Logs(ctx, *subscription.Options, publisher) + }() } } w.mu.Unlock() + // If follow mode is disabled, wait for the current set of matched tasks + // to finish publishing logs, then close the subscription by returning. + if subscription.Options == nil || !subscription.Options.Follow { + waitCh := make(chan struct{}) + go func() { + defer close(waitCh) + wg.Wait() + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-waitCh: + return nil + } + } + + // In follow mode, watch for new tasks. Don't close the subscription + // until it's cancelled. + ch, cancel := w.taskevents.Watch() + defer cancel() for { select { case v := <-ch: - w.mu.Lock() task := v.(*api.Task) if match(task) { + w.mu.Lock() go w.taskManagers[task.ID].Logs(ctx, *subscription.Options, publisher) + w.mu.Unlock() } - w.mu.Unlock() case <-ctx.Done(): return ctx.Err() } diff --git a/vendor/github.com/docker/swarmkit/ca/certificates.go b/vendor/github.com/docker/swarmkit/ca/certificates.go index dc5dbc8f1c..314b628f4c 100644 --- a/vendor/github.com/docker/swarmkit/ca/certificates.go +++ b/vendor/github.com/docker/swarmkit/ca/certificates.go @@ -153,7 +153,7 @@ func (rca *RootCA) IssueAndSaveNewCertificates(kw KeyWriter, cn, ou, org string) // RequestAndSaveNewCertificates gets new certificates issued, either by signing them locally if a signer is // available, or by requesting them from the remote server at remoteAddr. 
-func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWriter, token string, r remotes.Remotes, transport credentials.TransportCredentials, nodeInfo chan<- api.IssueNodeCertificateResponse) (*tls.Certificate, error) { +func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWriter, config CertificateRequestConfig) (*tls.Certificate, error) { // Create a new key/pair and CSR csr, key, err := GenerateNewCSR() if err != nil { @@ -165,7 +165,7 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit // responding properly (for example, it may have just been demoted). var signedCert []byte for i := 0; i != 5; i++ { - signedCert, err = GetRemoteSignedCertificate(ctx, csr, token, rca.Pool, r, transport, nodeInfo) + signedCert, err = GetRemoteSignedCertificate(ctx, csr, rca.Pool, config) if err == nil { break } @@ -202,7 +202,7 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit var kekUpdate *KEKData for i := 0; i < 5; i++ { - kekUpdate, err = rca.getKEKUpdate(ctx, X509Cert, tlsKeyPair, r) + kekUpdate, err = rca.getKEKUpdate(ctx, X509Cert, tlsKeyPair, config.Remotes) if err == nil { break } @@ -545,18 +545,20 @@ func CreateRootCA(rootCN string, paths CertPaths) (RootCA, error) { // GetRemoteSignedCertificate submits a CSR to a remote CA server address, // and that is part of a CA identified by a specific certificate pool. -func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, rootCAPool *x509.CertPool, r remotes.Remotes, creds credentials.TransportCredentials, nodeInfo chan<- api.IssueNodeCertificateResponse) ([]byte, error) { +func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x509.CertPool, config CertificateRequestConfig) ([]byte, error) { if rootCAPool == nil { return nil, errors.New("valid root CA pool required") } + creds := config.Credentials + if creds == nil { // This is our only non-MTLS request, and it happens when we are boostraping our TLS certs // We're using CARole as server name, so an external CA doesn't also have to have ManagerRole in the cert SANs creds = credentials.NewTLS(&tls.Config{ServerName: CARole, RootCAs: rootCAPool}) } - conn, peer, err := getGRPCConnection(creds, r) + conn, peer, err := getGRPCConnection(creds, config.Remotes) if err != nil { return nil, err } @@ -566,18 +568,13 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, r caClient := api.NewNodeCAClient(conn) // Send the Request and retrieve the request token - issueRequest := &api.IssueNodeCertificateRequest{CSR: csr, Token: token} + issueRequest := &api.IssueNodeCertificateRequest{CSR: csr, Token: config.Token} issueResponse, err := caClient.IssueNodeCertificate(ctx, issueRequest) if err != nil { - r.Observe(peer, -remotes.DefaultObservationWeight) + config.Remotes.Observe(peer, -remotes.DefaultObservationWeight) return nil, err } - // Send back the NodeID on the nodeInfo, so the caller can know what ID was assigned by the CA - if nodeInfo != nil { - nodeInfo <- *issueResponse - } - statusRequest := &api.NodeCertificateStatusRequest{NodeID: issueResponse.NodeID} expBackoff := events.NewExponentialBackoff(events.ExponentialBackoffConfig{ Base: time.Second, @@ -592,7 +589,7 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, r defer cancel() statusResponse, err := caClient.NodeCertificateStatus(ctx, statusRequest) if err != nil { - r.Observe(peer, -remotes.DefaultObservationWeight) + 
config.Remotes.Observe(peer, -remotes.DefaultObservationWeight) return nil, err } @@ -608,7 +605,7 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, r // retry until the certificate gets updated per our // current request. if bytes.Equal(statusResponse.Certificate.CSR, csr) { - r.Observe(peer, remotes.DefaultObservationWeight) + config.Remotes.Observe(peer, remotes.DefaultObservationWeight) return statusResponse.Certificate.Certificate, nil } } diff --git a/vendor/github.com/docker/swarmkit/ca/config.go b/vendor/github.com/docker/swarmkit/ca/config.go index f6b5d150f0..3c03ca162a 100644 --- a/vendor/github.com/docker/swarmkit/ca/config.go +++ b/vendor/github.com/docker/swarmkit/ca/config.go @@ -21,6 +21,7 @@ import ( "github.com/docker/swarmkit/log" "github.com/docker/swarmkit/remotes" "github.com/pkg/errors" + "google.golang.org/grpc/credentials" "golang.org/x/net/context" ) @@ -52,8 +53,13 @@ const ( // SecurityConfig is used to represent a node's security configuration. It includes information about // the RootCA and ServerTLSCreds/ClientTLSCreds transport authenticators to be used for MTLS type SecurityConfig struct { + // mu protects against concurrent access to fields inside the structure. mu sync.Mutex + // renewalMu makes sure only one certificate renewal attempt happens at + // a time. It should never be locked after mu is already locked. + renewalMu sync.Mutex + rootCA *RootCA externalCA *ExternalCA keyReadWriter *KeyReadWriter @@ -234,89 +240,138 @@ func DownloadRootCA(ctx context.Context, paths CertPaths, token string, r remote return rootCA, nil } -// LoadOrCreateSecurityConfig encapsulates the security logic behind joining a cluster. -// Every node requires at least a set of TLS certificates with which to join the cluster with. -// In the case of a manager, these certificates will be used both for client and server credentials. -func LoadOrCreateSecurityConfig(ctx context.Context, rootCA RootCA, token, proposedRole string, remotes remotes.Remotes, nodeInfo chan<- api.IssueNodeCertificateResponse, krw *KeyReadWriter) (*SecurityConfig, error) { +// LoadSecurityConfig loads TLS credentials from disk, or returns an error if +// these credentials do not exist or are unusable. +func LoadSecurityConfig(ctx context.Context, rootCA RootCA, krw *KeyReadWriter) (*SecurityConfig, error) { ctx = log.WithModule(ctx, "tls") // At this point we've successfully loaded the CA details from disk, or // successfully downloaded them remotely. The next step is to try to // load our certificates. 
- clientTLSCreds, serverTLSCreds, err := LoadTLSCreds(rootCA, krw) + + // Read both the Cert and Key from disk + cert, key, err := krw.Read() if err != nil { - if _, ok := errors.Cause(err).(ErrInvalidKEK); ok { - return nil, err - } + return nil, err + } - log.G(ctx).WithError(err).Debugf("no node credentials found in: %s", krw.Target()) + // Create an x509 certificate out of the contents on disk + certBlock, _ := pem.Decode([]byte(cert)) + if certBlock == nil { + return nil, errors.New("failed to parse certificate PEM") + } - var ( - tlsKeyPair *tls.Certificate - err error - ) + // Create an X509Cert so we can .Verify() + X509Cert, err := x509.ParseCertificate(certBlock.Bytes) + if err != nil { + return nil, err + } - if rootCA.CanSign() { - // Create a new random ID for this certificate - cn := identity.NewID() - org := identity.NewID() + // Include our root pool + opts := x509.VerifyOptions{ + Roots: rootCA.Pool, + } - if nodeInfo != nil { - nodeInfo <- api.IssueNodeCertificateResponse{ - NodeID: cn, - NodeMembership: api.NodeMembershipAccepted, - } - } - tlsKeyPair, err = rootCA.IssueAndSaveNewCertificates(krw, cn, proposedRole, org) - if err != nil { - log.G(ctx).WithFields(logrus.Fields{ - "node.id": cn, - "node.role": proposedRole, - }).WithError(err).Errorf("failed to issue and save new certificate") - return nil, err - } + // Check to see if this certificate was signed by our CA, and isn't expired + if _, err := X509Cert.Verify(opts); err != nil { + return nil, err + } + // Now that we know this certificate is valid, create a TLS Certificate for our + // credentials + keyPair, err := tls.X509KeyPair(cert, key) + if err != nil { + return nil, err + } + + // Load the Certificates as server credentials + serverTLSCreds, err := rootCA.NewServerTLSCredentials(&keyPair) + if err != nil { + return nil, err + } + + // Load the Certificates also as client credentials. + // Both workers and managers always connect to remote managers, + // so ServerName is always set to ManagerRole here. + clientTLSCreds, err := rootCA.NewClientTLSCredentials(&keyPair, ManagerRole) + if err != nil { + return nil, err + } + + log.G(ctx).WithFields(logrus.Fields{ + "node.id": clientTLSCreds.NodeID(), + "node.role": clientTLSCreds.Role(), + }).Debug("loaded node credentials") + + return NewSecurityConfig(&rootCA, krw, clientTLSCreds, serverTLSCreds), nil +} + +// CertificateRequestConfig contains the information needed to request a +// certificate from a remote CA. +type CertificateRequestConfig struct { + // Token is the join token that authenticates us with the CA. + Token string + // Remotes is the set of remote CAs. + Remotes remotes.Remotes + // Credentials provides transport credentials for communicating with the + // remote server. + Credentials credentials.TransportCredentials +} + +// CreateSecurityConfig creates a new key and cert for this node, either locally +// or via a remote CA. 
+func (rootCA RootCA) CreateSecurityConfig(ctx context.Context, krw *KeyReadWriter, config CertificateRequestConfig) (*SecurityConfig, error) { + ctx = log.WithModule(ctx, "tls") + + var ( + tlsKeyPair *tls.Certificate + err error + ) + + if rootCA.CanSign() { + // Create a new random ID for this certificate + cn := identity.NewID() + org := identity.NewID() + + proposedRole := ManagerRole + tlsKeyPair, err = rootCA.IssueAndSaveNewCertificates(krw, cn, proposedRole, org) + if err != nil { log.G(ctx).WithFields(logrus.Fields{ "node.id": cn, "node.role": proposedRole, - }).Debug("issued new TLS certificate") - } else { - // There was an error loading our Credentials, let's get a new certificate issued - // Last argument is nil because at this point we don't have any valid TLS creds - tlsKeyPair, err = rootCA.RequestAndSaveNewCertificates(ctx, krw, token, remotes, nil, nodeInfo) - if err != nil { - log.G(ctx).WithError(err).Error("failed to request save new certificate") - return nil, err - } - } - // Create the Server TLS Credentials for this node. These will not be used by workers. - serverTLSCreds, err = rootCA.NewServerTLSCredentials(tlsKeyPair) - if err != nil { + }).WithError(err).Errorf("failed to issue and save new certificate") return nil, err } - // Create a TLSConfig to be used when this node connects as a client to another remote node. - // We're using ManagerRole as remote serverName for TLS host verification - clientTLSCreds, err = rootCA.NewClientTLSCredentials(tlsKeyPair, ManagerRole) + log.G(ctx).WithFields(logrus.Fields{ + "node.id": cn, + "node.role": proposedRole, + }).Debug("issued new TLS certificate") + } else { + // Request certificate issuance from a remote CA. + // Last argument is nil because at this point we don't have any valid TLS creds + tlsKeyPair, err = rootCA.RequestAndSaveNewCertificates(ctx, krw, config) if err != nil { + log.G(ctx).WithError(err).Error("failed to request save new certificate") return nil, err } - log.G(ctx).WithFields(logrus.Fields{ - "node.id": clientTLSCreds.NodeID(), - "node.role": clientTLSCreds.Role(), - }).Debugf("new node credentials generated: %s", krw.Target()) - } else { - if nodeInfo != nil { - nodeInfo <- api.IssueNodeCertificateResponse{ - NodeID: clientTLSCreds.NodeID(), - NodeMembership: api.NodeMembershipAccepted, - } - } - log.G(ctx).WithFields(logrus.Fields{ - "node.id": clientTLSCreds.NodeID(), - "node.role": clientTLSCreds.Role(), - }).Debug("loaded node credentials") } + // Create the Server TLS Credentials for this node. These will not be used by workers. + serverTLSCreds, err := rootCA.NewServerTLSCredentials(tlsKeyPair) + if err != nil { + return nil, err + } + + // Create a TLSConfig to be used when this node connects as a client to another remote node. + // We're using ManagerRole as remote serverName for TLS host verification + clientTLSCreds, err := rootCA.NewClientTLSCredentials(tlsKeyPair, ManagerRole) + if err != nil { + return nil, err + } + log.G(ctx).WithFields(logrus.Fields{ + "node.id": clientTLSCreds.NodeID(), + "node.role": clientTLSCreds.Role(), + }).Debugf("new node credentials generated: %s", krw.Target()) return NewSecurityConfig(&rootCA, krw, clientTLSCreds, serverTLSCreds), nil } @@ -324,6 +379,9 @@ func LoadOrCreateSecurityConfig(ctx context.Context, rootCA RootCA, token, propo // RenewTLSConfigNow gets a new TLS cert and key, and updates the security config if provided. 
This is similar to // RenewTLSConfig, except while that monitors for expiry, and periodically renews, this renews once and is blocking func RenewTLSConfigNow(ctx context.Context, s *SecurityConfig, r remotes.Remotes) error { + s.renewalMu.Lock() + defer s.renewalMu.Unlock() + ctx = log.WithModule(ctx, "tls") log := log.G(ctx).WithFields(logrus.Fields{ "node.id": s.ClientTLSCreds.NodeID(), @@ -334,10 +392,10 @@ func RenewTLSConfigNow(ctx context.Context, s *SecurityConfig, r remotes.Remotes rootCA := s.RootCA() tlsKeyPair, err := rootCA.RequestAndSaveNewCertificates(ctx, s.KeyWriter(), - "", - r, - s.ClientTLSCreds, - nil) + CertificateRequestConfig{ + Remotes: r, + Credentials: s.ClientTLSCreds, + }) if err != nil { log.WithError(err).Errorf("failed to renew the certificate") return err @@ -463,61 +521,6 @@ func calculateRandomExpiry(validFrom, validUntil time.Time) time.Duration { return expiry } -// LoadTLSCreds loads tls credentials from the specified path and verifies that -// thay are valid for the RootCA. -func LoadTLSCreds(rootCA RootCA, kr KeyReader) (*MutableTLSCreds, *MutableTLSCreds, error) { - // Read both the Cert and Key from disk - cert, key, err := kr.Read() - if err != nil { - return nil, nil, err - } - - // Create an x509 certificate out of the contents on disk - certBlock, _ := pem.Decode([]byte(cert)) - if certBlock == nil { - return nil, nil, errors.New("failed to parse certificate PEM") - } - - // Create an X509Cert so we can .Verify() - X509Cert, err := x509.ParseCertificate(certBlock.Bytes) - if err != nil { - return nil, nil, err - } - - // Include our root pool - opts := x509.VerifyOptions{ - Roots: rootCA.Pool, - } - - // Check to see if this certificate was signed by our CA, and isn't expired - if _, err := X509Cert.Verify(opts); err != nil { - return nil, nil, err - } - - // Now that we know this certificate is valid, create a TLS Certificate for our - // credentials - keyPair, err := tls.X509KeyPair(cert, key) - if err != nil { - return nil, nil, err - } - - // Load the Certificates as server credentials - serverTLSCreds, err := rootCA.NewServerTLSCredentials(&keyPair) - if err != nil { - return nil, nil, err - } - - // Load the Certificates also as client credentials. - // Both workers and managers always connect to remote managers, - // so ServerName is always set to ManagerRole here. - clientTLSCreds, err := rootCA.NewClientTLSCredentials(&keyPair, ManagerRole) - if err != nil { - return nil, nil, err - } - - return clientTLSCreds, serverTLSCreds, nil -} - // NewServerTLSConfig returns a tls.Config configured for a TLS Server, given a tls.Certificate // and the PEM-encoded root CA Certificate func NewServerTLSConfig(cert *tls.Certificate, rootCAPool *x509.CertPool) (*tls.Config, error) { @@ -554,8 +557,8 @@ func NewClientTLSConfig(cert *tls.Certificate, rootCAPool *x509.CertPool, server // NewClientTLSCredentials returns GRPC credentials for a TLS GRPC client, given a tls.Certificate // a PEM-Encoded root CA Certificate, and the name of the remote server the client wants to connect to. 
-func (rca *RootCA) NewClientTLSCredentials(cert *tls.Certificate, serverName string) (*MutableTLSCreds, error) { - tlsConfig, err := NewClientTLSConfig(cert, rca.Pool, serverName) +func (rootCA *RootCA) NewClientTLSCredentials(cert *tls.Certificate, serverName string) (*MutableTLSCreds, error) { + tlsConfig, err := NewClientTLSConfig(cert, rootCA.Pool, serverName) if err != nil { return nil, err } @@ -567,8 +570,8 @@ func (rca *RootCA) NewClientTLSCredentials(cert *tls.Certificate, serverName str // NewServerTLSCredentials returns GRPC credentials for a TLS GRPC client, given a tls.Certificate // a PEM-Encoded root CA Certificate, and the name of the remote server the client wants to connect to. -func (rca *RootCA) NewServerTLSCredentials(cert *tls.Certificate) (*MutableTLSCreds, error) { - tlsConfig, err := NewServerTLSConfig(cert, rca.Pool) +func (rootCA *RootCA) NewServerTLSCredentials(cert *tls.Certificate) (*MutableTLSCreds, error) { + tlsConfig, err := NewServerTLSConfig(cert, rootCA.Pool) if err != nil { return nil, err } diff --git a/vendor/github.com/docker/swarmkit/ca/external.go b/vendor/github.com/docker/swarmkit/ca/external.go index 28cdf3ac7b..c2240268ce 100644 --- a/vendor/github.com/docker/swarmkit/ca/external.go +++ b/vendor/github.com/docker/swarmkit/ca/external.go @@ -12,6 +12,8 @@ import ( "github.com/cloudflare/cfssl/api" "github.com/cloudflare/cfssl/signer" "github.com/pkg/errors" + "golang.org/x/net/context" + "golang.org/x/net/context/ctxhttp" ) // ErrNoExternalCAURLs is an error used it indicate that an ExternalCA is @@ -65,7 +67,7 @@ func (eca *ExternalCA) UpdateURLs(urls ...string) { // Sign signs a new certificate by proxying the given certificate signing // request to an external CFSSL API server. -func (eca *ExternalCA) Sign(req signer.SignRequest) (cert []byte, err error) { +func (eca *ExternalCA) Sign(ctx context.Context, req signer.SignRequest) (cert []byte, err error) { // Get the current HTTP client and list of URLs in a small critical // section. We will use these to make certificate signing requests. eca.mu.Lock() @@ -85,7 +87,7 @@ func (eca *ExternalCA) Sign(req signer.SignRequest) (cert []byte, err error) { // Try each configured proxy URL. Return after the first success. If // all fail then the last error will be returned. 
for _, url := range urls { - cert, err = makeExternalSignRequest(client, url, csrJSON) + cert, err = makeExternalSignRequest(ctx, client, url, csrJSON) if err == nil { return eca.rootCA.AppendFirstRootPEM(cert) } @@ -96,14 +98,31 @@ func (eca *ExternalCA) Sign(req signer.SignRequest) (cert []byte, err error) { return nil, err } -func makeExternalSignRequest(client *http.Client, url string, csrJSON []byte) (cert []byte, err error) { - resp, err := client.Post(url, "application/json", bytes.NewReader(csrJSON)) +func makeExternalSignRequest(ctx context.Context, client *http.Client, url string, csrJSON []byte) (cert []byte, err error) { + resp, err := ctxhttp.Post(ctx, client, url, "application/json", bytes.NewReader(csrJSON)) if err != nil { return nil, recoverableErr{err: errors.Wrap(err, "unable to perform certificate signing request")} } - defer resp.Body.Close() + + doneReading := make(chan struct{}) + bodyClosed := make(chan struct{}) + go func() { + select { + case <-ctx.Done(): + case <-doneReading: + } + resp.Body.Close() + close(bodyClosed) + }() body, err := ioutil.ReadAll(resp.Body) + close(doneReading) + <-bodyClosed + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } if err != nil { return nil, recoverableErr{err: errors.Wrap(err, "unable to read CSR response body")} } diff --git a/vendor/github.com/docker/swarmkit/ca/server.go b/vendor/github.com/docker/swarmkit/ca/server.go index 938f597058..5e074b53df 100644 --- a/vendor/github.com/docker/swarmkit/ca/server.go +++ b/vendor/github.com/docker/swarmkit/ca/server.go @@ -617,7 +617,7 @@ func (s *Server) signNodeCert(ctx context.Context, node *api.Node) error { ) // Try using the external CA first. - cert, err := externalCA.Sign(PrepareCSR(rawCSR, cn, ou, org)) + cert, err := externalCA.Sign(ctx, PrepareCSR(rawCSR, cn, ou, org)) if err == ErrNoExternalCAURLs { // No external CA servers configured. Try using the local CA. cert, err = rootCA.ParseValidateAndSignCSR(rawCSR, cn, ou, org) diff --git a/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go b/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go index 1db89307e4..3492b946fe 100644 --- a/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go +++ b/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go @@ -2,6 +2,7 @@ package logbroker import ( "errors" + "fmt" "io" "sync" @@ -24,6 +25,12 @@ var ( errNotRunning = errors.New("broker is not running") ) +type logMessage struct { + *api.PublishLogsMessage + completed bool + err error +} + // LogBroker coordinates log subscriptions to services and tasks. Clients can // publish and subscribe to logs channels. 
// @@ -35,6 +42,7 @@ type LogBroker struct { subscriptionQueue *watch.Queue registeredSubscriptions map[string]*subscription + connectedNodes map[string]struct{} pctx context.Context cancelAll context.CancelFunc @@ -62,6 +70,7 @@ func (lb *LogBroker) Run(ctx context.Context) error { lb.logQueue = watch.NewQueue() lb.subscriptionQueue = watch.NewQueue() lb.registeredSubscriptions = make(map[string]*subscription) + lb.connectedNodes = make(map[string]struct{}) lb.mu.Unlock() select { @@ -112,12 +121,30 @@ func (lb *LogBroker) newSubscription(selector *api.LogSelector, options *api.Log return subscription } +func (lb *LogBroker) getSubscription(id string) *subscription { + lb.mu.RLock() + defer lb.mu.RUnlock() + + subscription, ok := lb.registeredSubscriptions[id] + if !ok { + return nil + } + return subscription +} + func (lb *LogBroker) registerSubscription(subscription *subscription) { lb.mu.Lock() defer lb.mu.Unlock() lb.registeredSubscriptions[subscription.message.ID] = subscription lb.subscriptionQueue.Publish(subscription) + + // Mark nodes that won't receive the message as done. + for _, node := range subscription.Nodes() { + if _, ok := lb.connectedNodes[node]; !ok { + subscription.Done(node, fmt.Errorf("node %s is not available", node)) + } + } } func (lb *LogBroker) unregisterSubscription(subscription *subscription) { @@ -160,7 +187,7 @@ func (lb *LogBroker) subscribe(id string) (chan events.Event, func()) { defer lb.mu.RUnlock() return lb.logQueue.CallbackWatch(events.MatcherFunc(func(event events.Event) bool { - publish := event.(*api.PublishLogsMessage) + publish := event.(*logMessage) return publish.SubscriptionID == id })) } @@ -169,7 +196,7 @@ func (lb *LogBroker) publish(log *api.PublishLogsMessage) { lb.mu.RLock() defer lb.mu.RUnlock() - lb.logQueue.Publish(log) + lb.logQueue.Publish(&logMessage{PublishLogsMessage: log}) } // SubscribeLogs creates a log subscription and streams back logs @@ -190,7 +217,6 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api "subscription.id": subscription.message.ID, }, ) - log.Debug("subscribed") publishCh, publishCancel := lb.subscribe(subscription.message.ID) @@ -199,23 +225,50 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api lb.registerSubscription(subscription) defer lb.unregisterSubscription(subscription) + completed := subscription.Wait(ctx) for { select { + case <-ctx.Done(): + return ctx.Err() + case <-lb.pctx.Done(): + return lb.pctx.Err() case event := <-publishCh: - publish := event.(*api.PublishLogsMessage) + publish := event.(*logMessage) + if publish.completed { + return publish.err + } if err := stream.Send(&api.SubscribeLogsMessage{ Messages: publish.Messages, }); err != nil { return err } - case <-ctx.Done(): - return ctx.Err() - case <-lb.pctx.Done(): - return nil + case <-completed: + completed = nil + lb.logQueue.Publish(&logMessage{ + PublishLogsMessage: &api.PublishLogsMessage{ + SubscriptionID: subscription.message.ID, + }, + completed: true, + err: subscription.Err(), + }) } } } +func (lb *LogBroker) nodeConnected(nodeID string) { + lb.mu.Lock() + defer lb.mu.Unlock() + + lb.connectedNodes[nodeID] = struct{}{} +} + +func (lb *LogBroker) nodeDisconnected(nodeID string) { + lb.mu.Lock() + defer lb.mu.Unlock() + + delete(lb.connectedNodes, nodeID) +} + // ListenSubscriptions returns a stream of matching subscriptions for the current node func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest, stream 
api.LogBroker_ListenSubscriptionsServer) error { remote, err := ca.RemoteNode(stream.Context()) @@ -223,6 +276,9 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest return err } + lb.nodeConnected(remote.NodeID) + defer lb.nodeDisconnected(remote.NodeID) + log := log.G(stream.Context()).WithFields( logrus.Fields{ "method": "(*LogBroker).ListenSubscriptions", @@ -234,7 +290,13 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest log.Debug("node registered") - activeSubscriptions := make(map[string]struct{}) + activeSubscriptions := make(map[string]*subscription) + defer func() { + // If the worker quits, mark all active subscriptions as finished. + for _, subscription := range activeSubscriptions { + subscription.Done(remote.NodeID, fmt.Errorf("node %s disconnected unexpectedly", remote.NodeID)) + } + }() // Start by sending down all active subscriptions. for _, subscription := range subscriptions { @@ -250,7 +312,7 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest log.Error(err) return err } - activeSubscriptions[subscription.message.ID] = struct{}{} + activeSubscriptions[subscription.message.ID] = subscription } // Send down new subscriptions. @@ -261,12 +323,13 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest if subscription.message.Close { log.WithField("subscription.id", subscription.message.ID).Debug("subscription closed") + delete(activeSubscriptions, subscription.message.ID) } else { // Avoid sending down the same subscription multiple times if _, ok := activeSubscriptions[subscription.message.ID]; ok { continue } - activeSubscriptions[subscription.message.ID] = struct{}{} + activeSubscriptions[subscription.message.ID] = subscription log.WithField("subscription.id", subscription.message.ID).Debug("subscription added") } if err := stream.Send(subscription.message); err != nil { @@ -282,12 +345,19 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest } // PublishLogs publishes log messages for a given subscription -func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) error { +func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) (err error) { remote, err := ca.RemoteNode(stream.Context()) if err != nil { return err } + var currentSubscription *subscription + defer func() { + if currentSubscription != nil { + currentSubscription.Done(remote.NodeID, err) + } + }() + for { log, err := stream.Recv() if err == io.EOF { @@ -301,6 +371,17 @@ func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) error { return grpc.Errorf(codes.InvalidArgument, "missing subscription ID") } + if currentSubscription == nil { + currentSubscription = lb.getSubscription(log.SubscriptionID) + if currentSubscription == nil { + return grpc.Errorf(codes.NotFound, "unknown subscription ID") + } + } else { + if log.SubscriptionID != currentSubscription.message.ID { + return grpc.Errorf(codes.InvalidArgument, "different subscription IDs in the same session") + } + } + // Make sure logs are emitted using the right Node ID to avoid impersonation. 
for _, msg := range log.Messages { if msg.Context.NodeID != remote.NodeID { diff --git a/vendor/github.com/docker/swarmkit/manager/logbroker/subscription.go b/vendor/github.com/docker/swarmkit/manager/logbroker/subscription.go index ac945e234b..6fbaf101b8 100644 --- a/vendor/github.com/docker/swarmkit/manager/logbroker/subscription.go +++ b/vendor/github.com/docker/swarmkit/manager/logbroker/subscription.go @@ -2,6 +2,8 @@ package logbroker import ( "context" + "fmt" + "strings" "sync" events "github.com/docker/go-events" @@ -14,6 +16,7 @@ import ( type subscription struct { mu sync.RWMutex + wg sync.WaitGroup store *store.MemoryStore message *api.SubscriptionMessage @@ -22,18 +25,25 @@ type subscription struct { ctx context.Context cancel context.CancelFunc - nodes map[string]struct{} + errors []error + nodes map[string]struct{} + pendingTasks map[string]struct{} } func newSubscription(store *store.MemoryStore, message *api.SubscriptionMessage, changed *watch.Queue) *subscription { return &subscription{ - store: store, - message: message, - changed: changed, - nodes: make(map[string]struct{}), + store: store, + message: message, + changed: changed, + nodes: make(map[string]struct{}), + pendingTasks: make(map[string]struct{}), } } +func (s *subscription) follow() bool { + return s.message.Options != nil && s.message.Options.Follow +} + func (s *subscription) Contains(nodeID string) bool { s.mu.RLock() defer s.mu.RUnlock() @@ -42,15 +52,28 @@ func (s *subscription) Contains(nodeID string) bool { return ok } +func (s *subscription) Nodes() []string { + s.mu.RLock() + defer s.mu.RUnlock() + + nodes := make([]string, 0, len(s.nodes)) + for node := range s.nodes { + nodes = append(nodes, node) + } + return nodes +} + func (s *subscription) Run(ctx context.Context) { s.ctx, s.cancel = context.WithCancel(ctx) - wq := s.store.WatchQueue() - ch, cancel := state.Watch(wq, state.EventCreateTask{}, state.EventUpdateTask{}) - go func() { - defer cancel() - s.watch(ch) - }() + if s.follow() { + wq := s.store.WatchQueue() + ch, cancel := state.Watch(wq, state.EventCreateTask{}, state.EventUpdateTask{}) + go func() { + defer cancel() + s.watch(ch) + }() + } s.match() } @@ -61,10 +84,74 @@ func (s *subscription) Stop() { } } +func (s *subscription) Wait(ctx context.Context) <-chan struct{} { + // Follow subscriptions never end + if s.follow() { + return nil + } + + ch := make(chan struct{}) + go func() { + defer close(ch) + s.wg.Wait() + }() + return ch +} + +func (s *subscription) Done(nodeID string, err error) { + s.mu.Lock() + defer s.mu.Unlock() + + if err != nil { + s.errors = append(s.errors, err) + } + + if s.follow() { + return + } + + if _, ok := s.nodes[nodeID]; !ok { + return + } + + delete(s.nodes, nodeID) + s.wg.Done() +} + +func (s *subscription) Err() error { + s.mu.RLock() + defer s.mu.RUnlock() + + if len(s.errors) == 0 && len(s.pendingTasks) == 0 { + return nil + } + + messages := make([]string, 0, len(s.errors)) + for _, err := range s.errors { + messages = append(messages, err.Error()) + } + for t := range s.pendingTasks { + messages = append(messages, fmt.Sprintf("task %s has not been scheduled", t)) + } + + return fmt.Errorf("warning: incomplete log stream. 
some logs could not be retrieved for the following reasons: %s", strings.Join(messages, ", ")) +} + func (s *subscription) match() { s.mu.Lock() defer s.mu.Unlock() + add := func(t *api.Task) { + if t.NodeID == "" { + s.pendingTasks[t.ID] = struct{}{} + return + } + if _, ok := s.nodes[t.NodeID]; !ok { + s.nodes[t.NodeID] = struct{}{} + s.wg.Add(1) + } + } + s.store.View(func(tx store.ReadTx) { for _, nid := range s.message.Selector.NodeIDs { s.nodes[nid] = struct{}{} @@ -72,7 +159,7 @@ func (s *subscription) match() { for _, tid := range s.message.Selector.TaskIDs { if task := store.GetTask(tx, tid); task != nil { - s.nodes[task.NodeID] = struct{}{} + add(task) } } @@ -83,7 +170,7 @@ func (s *subscription) match() { continue } for _, task := range tasks { - s.nodes[task.NodeID] = struct{}{} + add(task) } } }) @@ -100,12 +187,19 @@ func (s *subscription) watch(ch <-chan events.Event) error { matchServices[sid] = struct{}{} } - add := func(nodeID string) { + add := func(t *api.Task) { s.mu.Lock() defer s.mu.Unlock() - if _, ok := s.nodes[nodeID]; !ok { - s.nodes[nodeID] = struct{}{} + // Un-allocated task. + if t.NodeID == "" { + s.pendingTasks[t.ID] = struct{}{} + return + } + + delete(s.pendingTasks, t.ID) + if _, ok := s.nodes[t.NodeID]; !ok { + s.nodes[t.NodeID] = struct{}{} s.changed.Publish(s) } } @@ -129,10 +223,10 @@ func (s *subscription) watch(ch <-chan events.Event) error { } if _, ok := matchTasks[t.ID]; ok { - add(t.NodeID) + add(t) } if _, ok := matchServices[t.ServiceID]; ok { - add(t.NodeID) + add(t) } } } diff --git a/vendor/github.com/docker/swarmkit/manager/manager.go b/vendor/github.com/docker/swarmkit/manager/manager.go index edf807cdd8..f5d5e87f47 100644 --- a/vendor/github.com/docker/swarmkit/manager/manager.go +++ b/vendor/github.com/docker/swarmkit/manager/manager.go @@ -385,14 +385,10 @@ func (m *Manager) Run(parent context.Context) error { close(m.started) - watchDone := make(chan struct{}) - watchCtx, watchCtxCancel := context.WithCancel(parent) go func() { err := m.raftNode.Run(ctx) - watchCtxCancel() - <-watchDone if err != nil { - log.G(ctx).Error(err) + log.G(ctx).WithError(err).Error("raft node stopped") m.Stop(ctx) } }() @@ -407,7 +403,7 @@ func (m *Manager) Run(parent context.Context) error { } raftConfig := c.Spec.Raft - if err := m.watchForKEKChanges(watchCtx, watchDone); err != nil { + if err := m.watchForKEKChanges(ctx); err != nil { return err } @@ -544,8 +540,7 @@ func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error { return nil } -func (m *Manager) watchForKEKChanges(ctx context.Context, watchDone chan struct{}) error { - defer close(watchDone) +func (m *Manager) watchForKEKChanges(ctx context.Context) error { clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization() clusterWatch, clusterWatchCancel, err := store.ViewAndWatch(m.raftNode.MemoryStore(), func(tx store.ReadTx) error { diff --git a/vendor/github.com/docker/swarmkit/manager/orchestrator/global/global.go b/vendor/github.com/docker/swarmkit/manager/orchestrator/global/global.go index 25329ff9ea..28c2a436d3 100644 --- a/vendor/github.com/docker/swarmkit/manager/orchestrator/global/global.go +++ b/vendor/github.com/docker/swarmkit/manager/orchestrator/global/global.go @@ -240,6 +240,8 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin } }) + updates := make(map[*api.Service][]orchestrator.Slot) + _, err := g.store.Batch(func(batch *store.Batch) error { var updateTasks []orchestrator.Slot for _, serviceID := range serviceIDs { 
@@ -274,8 +276,9 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin updateTasks = append(updateTasks, ntasks) } } + if len(updateTasks) > 0 { - g.updater.Update(ctx, g.cluster, service.Service, updateTasks) + updates[service.Service] = updateTasks } // Remove any tasks assigned to nodes not found in g.nodes. @@ -287,9 +290,15 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin } return nil }) + if err != nil { log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileServices transaction failed") } + + for service, updateTasks := range updates { + g.updater.Update(ctx, g.cluster, service, updateTasks) + } + } // updateNode updates g.nodes based on the current node value diff --git a/vendor/github.com/docker/swarmkit/node/node.go b/vendor/github.com/docker/swarmkit/node/node.go index 2cc3b4bb25..6c28b2a9ef 100644 --- a/vendor/github.com/docker/swarmkit/node/node.go +++ b/vendor/github.com/docker/swarmkit/node/node.go @@ -100,26 +100,24 @@ type Config struct { // cluster. Node handles workloads and may also run as a manager. type Node struct { sync.RWMutex - config *Config - remotes *persistentRemotes - role string - roleCond *sync.Cond - conn *grpc.ClientConn - connCond *sync.Cond - nodeID string - nodeMembership api.NodeSpec_Membership - started chan struct{} - startOnce sync.Once - stopped chan struct{} - stopOnce sync.Once - ready chan struct{} // closed when agent has completed registration and manager(if enabled) is ready to receive control requests - certificateRequested chan struct{} // closed when certificate issue request has been sent by node - closed chan struct{} - err error - agent *agent.Agent - manager *manager.Manager - notifyNodeChange chan *api.Node // used to send role updates from the dispatcher api on promotion/demotion - unlockKey []byte + config *Config + remotes *persistentRemotes + role string + roleCond *sync.Cond + conn *grpc.ClientConn + connCond *sync.Cond + nodeID string + started chan struct{} + startOnce sync.Once + stopped chan struct{} + stopOnce sync.Once + ready chan struct{} // closed when agent has completed registration and manager(if enabled) is ready to receive control requests + closed chan struct{} + err error + agent *agent.Agent + manager *manager.Manager + notifyNodeChange chan *api.Node // used to send role updates from the dispatcher api on promotion/demotion + unlockKey []byte } // RemoteAPIAddr returns address on which remote manager api listens. @@ -155,16 +153,15 @@ func New(c *Config) (*Node, error) { } n := &Node{ - remotes: newPersistentRemotes(stateFile, p...), - role: ca.WorkerRole, - config: c, - started: make(chan struct{}), - stopped: make(chan struct{}), - closed: make(chan struct{}), - ready: make(chan struct{}), - certificateRequested: make(chan struct{}), - notifyNodeChange: make(chan *api.Node, 1), - unlockKey: c.UnlockKey, + remotes: newPersistentRemotes(stateFile, p...), + role: ca.WorkerRole, + config: c, + started: make(chan struct{}), + stopped: make(chan struct{}), + closed: make(chan struct{}), + ready: make(chan struct{}), + notifyNodeChange: make(chan *api.Node, 1), + unlockKey: c.UnlockKey, } if n.config.JoinAddr != "" || n.config.ForceNewCluster { @@ -403,13 +400,6 @@ func (n *Node) Ready() <-chan struct{} { return n.ready } -// CertificateRequested returns a channel that is closed after node has -// requested a certificate. After this call a caller can expect calls to -// NodeID() and `NodeMembership()` to succeed. 
-func (n *Node) CertificateRequested() <-chan struct{} { - return n.certificateRequested -} - func (n *Node) setControlSocket(conn *grpc.ClientConn) { n.Lock() if n.conn != nil { @@ -461,13 +451,6 @@ func (n *Node) NodeID() string { return n.nodeID } -// NodeMembership returns current node's membership. May be empty if not set. -func (n *Node) NodeMembership() api.NodeSpec_Membership { - n.RLock() - defer n.RUnlock() - return n.nodeMembership -} - // Manager returns manager instance started by node. May be nil. func (n *Node) Manager() *manager.Manager { n.RLock() @@ -507,18 +490,14 @@ func (n *Node) loadSecurityConfig(ctx context.Context) (*ca.SecurityConfig, erro return nil, err } if err == nil { - clientTLSCreds, serverTLSCreds, err := ca.LoadTLSCreds(rootCA, krw) - _, ok := errors.Cause(err).(ca.ErrInvalidKEK) - switch { - case err == nil: - securityConfig = ca.NewSecurityConfig(&rootCA, krw, clientTLSCreds, serverTLSCreds) - log.G(ctx).Debug("loaded CA and TLS certificates") - case ok: - return nil, ErrInvalidUnlockKey - case os.IsNotExist(err): - break - default: - return nil, errors.Wrapf(err, "error while loading TLS certificate in %s", paths.Node.Cert) + securityConfig, err = ca.LoadSecurityConfig(ctx, rootCA, krw) + if err != nil { + _, isInvalidKEK := errors.Cause(err).(ca.ErrInvalidKEK) + if isInvalidKEK { + return nil, ErrInvalidUnlockKey + } else if !os.IsNotExist(err) { + return nil, errors.Wrapf(err, "error while loading TLS certificate in %s", paths.Node.Cert) + } } } @@ -544,44 +523,36 @@ func (n *Node) loadSecurityConfig(ctx context.Context) (*ca.SecurityConfig, erro } // Obtain new certs and setup TLS certificates renewal for this node: - // - We call LoadOrCreateSecurityConfig which blocks until a valid certificate has been issued - // - We retrieve the nodeID from LoadOrCreateSecurityConfig through the info channel. This allows - // us to display the ID before the certificate gets issued (for potential approval). - // - We wait for LoadOrCreateSecurityConfig to finish since we need a certificate to operate. - // - Given a valid certificate, spin a renewal go-routine that will ensure that certificates stay - // up to date. - issueResponseChan := make(chan api.IssueNodeCertificateResponse, 1) - go func() { - select { - case <-ctx.Done(): - case resp := <-issueResponseChan: - log.G(log.WithModule(ctx, "tls")).WithFields(logrus.Fields{ - "node.id": resp.NodeID, - }).Debugf("loaded TLS certificate") - n.Lock() - n.nodeID = resp.NodeID - n.nodeMembership = resp.NodeMembership - n.Unlock() - close(n.certificateRequested) - } - }() + // - If certificates weren't present on disk, we call CreateSecurityConfig, which blocks + // until a valid certificate has been issued. + // - We wait for CreateSecurityConfig to finish since we need a certificate to operate. 
- // LoadOrCreateSecurityConfig is the point at which a new node joining a cluster will retrieve TLS - // certificates and write them to disk - securityConfig, err = ca.LoadOrCreateSecurityConfig( - ctx, rootCA, n.config.JoinToken, ca.ManagerRole, n.remotes, issueResponseChan, krw) - if err != nil { + // Attempt to load certificate from disk + securityConfig, err = ca.LoadSecurityConfig(ctx, rootCA, krw) + if err == nil { + log.G(ctx).WithFields(logrus.Fields{ + "node.id": securityConfig.ClientTLSCreds.NodeID(), + }).Debugf("loaded TLS certificate") + } else { if _, ok := errors.Cause(err).(ca.ErrInvalidKEK); ok { return nil, ErrInvalidUnlockKey } - return nil, err + log.G(ctx).WithError(err).Debugf("no node credentials found in: %s", krw.Target()) + + securityConfig, err = rootCA.CreateSecurityConfig(ctx, krw, ca.CertificateRequestConfig{ + Token: n.config.JoinToken, + Remotes: n.remotes, + }) + + if err != nil { + return nil, err + } } } n.Lock() n.role = securityConfig.ClientTLSCreds.Role() n.nodeID = securityConfig.ClientTLSCreds.NodeID() - n.nodeMembership = api.NodeMembershipAccepted n.roleCond.Broadcast() n.Unlock()
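The node.go and config.go hunks above split ca.LoadOrCreateSecurityConfig into two explicit steps: ca.LoadSecurityConfig reuses credentials already on disk, and (RootCA).CreateSecurityConfig with a ca.CertificateRequestConfig issues or requests new ones. The sketch below shows that load-then-create flow in isolation, assuming the swarmkit packages from this vendor tree are importable; the helper name loadOrCreateNodeTLS is invented for illustration, and the error handling is simplified (it omits the ca.ErrInvalidKEK and os.IsNotExist distinctions that node.go makes).

```go
package example

import (
	"github.com/docker/swarmkit/ca"
	"github.com/docker/swarmkit/remotes"
	"golang.org/x/net/context"
)

// loadOrCreateNodeTLS mirrors the new two-step flow: first try to load
// existing TLS credentials from disk, and only fall back to issuing (when the
// local root CA can sign) or remotely requesting a certificate when nothing
// usable is found.
func loadOrCreateNodeTLS(ctx context.Context, rootCA ca.RootCA, krw *ca.KeyReadWriter, joinToken string, r remotes.Remotes) (*ca.SecurityConfig, error) {
	if securityConfig, err := ca.LoadSecurityConfig(ctx, rootCA, krw); err == nil {
		return securityConfig, nil
	}

	// Nothing usable on disk: ask for a certificate. Token and Remotes come
	// from the join configuration; Credentials is left nil because this is
	// the non-mTLS bootstrap path.
	return rootCA.CreateSecurityConfig(ctx, krw, ca.CertificateRequestConfig{
		Token:   joinToken,
		Remotes: r,
	})
}
```

Certificate renewal keeps the same config type: RenewTLSConfigNow now passes ca.CertificateRequestConfig{Remotes: r, Credentials: s.ClientTLSCreds} instead of the old positional token/remotes/credentials/nodeInfo arguments.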