From 077f08bf54dc89d382cbcbd797b2bd7c4867151d Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Wed, 26 Apr 2017 11:01:01 -0700 Subject: [PATCH] Vendor swarmkit 8f053c2 Signed-off-by: Aaron Lehmann --- vendor.conf | 4 +- vendor/github.com/coreos/etcd/raft/raft.go | 4 + vendor/github.com/docker/swarmkit/README.md | 4 +- .../docker/swarmkit/agent/worker.go | 22 ++- .../github.com/docker/swarmkit/ca/config.go | 98 ---------- .../docker/swarmkit/ca/reconciler.go | 2 +- .../github.com/docker/swarmkit/ca/renewer.go | 166 +++++++++++++++++ .../github.com/docker/swarmkit/ca/server.go | 13 +- .../docker/swarmkit/identity/doc.go | 13 +- .../docker/swarmkit/manager/allocator/doc.go | 20 +- .../swarmkit/manager/allocator/network.go | 92 +++++----- .../networkallocator/networkallocator.go | 171 +++++++++++++----- .../networkallocator/portallocator.go | 8 +- .../swarmkit/manager/controlapi/service.go | 79 +++++++- .../swarmkit/manager/dispatcher/dispatcher.go | 12 +- .../constraintenforcer/constraint_enforcer.go | 2 +- .../manager/orchestrator/global/global.go | 8 +- .../orchestrator/replicated/services.go | 6 +- .../manager/orchestrator/replicated/tasks.go | 2 +- .../swarmkit/manager/orchestrator/service.go | 2 +- .../manager/orchestrator/taskinit/init.go | 2 +- .../manager/orchestrator/update/updater.go | 6 +- .../swarmkit/manager/scheduler/scheduler.go | 6 +- .../swarmkit/manager/state/raft/raft.go | 24 ++- .../swarmkit/manager/state/{ => store}/doc.go | 16 +- .../swarmkit/manager/state/store/memory.go | 15 +- .../github.com/docker/swarmkit/node/node.go | 107 +++++------ vendor/github.com/docker/swarmkit/vendor.conf | 2 +- 28 files changed, 567 insertions(+), 339 deletions(-) create mode 100644 vendor/github.com/docker/swarmkit/ca/renewer.go rename vendor/github.com/docker/swarmkit/manager/state/{ => store}/doc.go (68%) diff --git a/vendor.conf b/vendor.conf index 021c45b77d..23512ce23d 100644 --- a/vendor.conf +++ b/vendor.conf @@ -41,7 +41,7 @@ github.com/vishvananda/netlink 1e86b2bee5b6a7d377e4c02bb7f98209d6a7297c github.com/BurntSushi/toml f706d00e3de6abe700c994cdd545a1a4915af060 github.com/samuel/go-zookeeper d0e0d8e11f318e000a8cc434616d69e329edc374 github.com/deckarep/golang-set ef32fa3046d9f249d399f98ebaf9be944430fd1d -github.com/coreos/etcd 824277cb3a577a0e8c829ca9ec557b973fe06d20 +github.com/coreos/etcd ea5389a79f40206170582c1ea076191b8622cb8e https://github.com/aaronlehmann/etcd # for https://github.com/coreos/etcd/pull/7830 github.com/ugorji/go f1f1a805ed361a0e078bb537e4ea78cd37dcf065 github.com/hashicorp/consul v0.5.2 github.com/boltdb/bolt fff57c100f4dea1905678da7e90d92429dff2904 @@ -108,7 +108,7 @@ github.com/docker/containerd 9048e5e50717ea4497b757314bad98ea3763c145 github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4 # cluster -github.com/docker/swarmkit 61a92e8ec074df5769decda985df4a3ab43c77eb +github.com/docker/swarmkit 8f053c2030ebfc90f19f241fb7880e95b9761b7a github.com/gogo/protobuf 8d70fb3182befc465c4a1eac8ad4d38ff49778e2 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a github.com/google/certificate-transparency d90e65c3a07988180c5b1ece71791c0b6506826e diff --git a/vendor/github.com/coreos/etcd/raft/raft.go b/vendor/github.com/coreos/etcd/raft/raft.go index 70a260dbe6..633cc147f5 100644 --- a/vendor/github.com/coreos/etcd/raft/raft.go +++ b/vendor/github.com/coreos/etcd/raft/raft.go @@ -1154,6 +1154,10 @@ func (r *raft) addNode(id uint64) { } r.setProgress(id, 0, r.raftLog.lastIndex()+1) + // When a node is first added, we should mark it as recently active. + // Otherwise, CheckQuorum may cause us to step down if it is invoked + // before the added node has a chance to communicate with us. + r.prs[id].RecentActive = true } func (r *raft) removeNode(id uint64) { diff --git a/vendor/github.com/docker/swarmkit/README.md b/vendor/github.com/docker/swarmkit/README.md index 33098c91fc..4900fe3625 100644 --- a/vendor/github.com/docker/swarmkit/README.md +++ b/vendor/github.com/docker/swarmkit/README.md @@ -1,6 +1,6 @@ # [SwarmKit](https://github.com/docker/swarmkit) -[![GoDoc](https://godoc.org/github.com/docker/swarmkit?status.png)](https://godoc.org/github.com/docker/swarmkit) +[![GoDoc](https://godoc.org/github.com/docker/swarmkit?status.svg)](https://godoc.org/github.com/docker/swarmkit) [![Circle CI](https://circleci.com/gh/docker/swarmkit.svg?style=shield&circle-token=a7bf494e28963703a59de71cf19b73ad546058a7)](https://circleci.com/gh/docker/swarmkit) [![codecov.io](https://codecov.io/github/docker/swarmkit/coverage.svg?branch=master&token=LqD1dzTjsN)](https://codecov.io/github/docker/swarmkit?branch=master) [![Badge Badge](http://doyouevenbadge.com/github.com/docker/swarmkit)](http://doyouevenbadge.com/report/github.com/docker/swarmkit) @@ -83,7 +83,7 @@ Requirements: - Go 1.6 or higher - A [working golang](https://golang.org/doc/code.html) environment -- [Protobuf 3.x or higher] (https://developers.google.com/protocol-buffers/docs/downloads) to regenerate protocol buffer files (e.g. using `make generate`) +- [Protobuf 3.x or higher](https://developers.google.com/protocol-buffers/docs/downloads) to regenerate protocol buffer files (e.g. using `make generate`) *SwarmKit* is built in Go and leverages a standard project structure to work well with Go tooling. If you are new to Go, please see [BUILDING.md](BUILDING.md) for a more detailed guide. diff --git a/vendor/github.com/docker/swarmkit/agent/worker.go b/vendor/github.com/docker/swarmkit/agent/worker.go index a27cd96ee4..ff138c2b74 100644 --- a/vendor/github.com/docker/swarmkit/agent/worker.go +++ b/vendor/github.com/docker/swarmkit/agent/worker.go @@ -426,14 +426,19 @@ func (w *worker) Listen(ctx context.Context, reporter StatusReporter) { } func (w *worker) startTask(ctx context.Context, tx *bolt.Tx, task *api.Task) error { - w.taskevents.Publish(task.Copy()) _, err := w.taskManager(ctx, tx, task) // side-effect taskManager creation. if err != nil { log.G(ctx).WithError(err).Error("failed to start taskManager") + // we ignore this error: it gets reported in the taskStatus within + // `newTaskManager`. We log it here and move on. If their is an + // attempted restart, the lack of taskManager will have this retry + // again. + return nil } - // TODO(stevvooe): Add start method for taskmanager + // only publish if controller resolution was successful. + w.taskevents.Publish(task.Copy()) return nil } @@ -464,7 +469,7 @@ func (w *worker) newTaskManager(ctx context.Context, tx *bolt.Tx, task *api.Task } if err != nil { - log.G(ctx).Error("controller resolution failed") + log.G(ctx).WithError(err).Error("controller resolution failed") return nil, err } @@ -568,9 +573,14 @@ func (w *worker) Subscribe(ctx context.Context, subscription *api.SubscriptionMe case v := <-ch: task := v.(*api.Task) if match(task) { - w.mu.Lock() - go w.taskManagers[task.ID].Logs(ctx, *subscription.Options, publisher) - w.mu.Unlock() + w.mu.RLock() + tm, ok := w.taskManagers[task.ID] + w.mu.RUnlock() + if !ok { + continue + } + + go tm.Logs(ctx, *subscription.Options, publisher) } case <-ctx.Done(): return ctx.Err() diff --git a/vendor/github.com/docker/swarmkit/ca/config.go b/vendor/github.com/docker/swarmkit/ca/config.go index 90db943082..758d15799d 100644 --- a/vendor/github.com/docker/swarmkit/ca/config.go +++ b/vendor/github.com/docker/swarmkit/ca/config.go @@ -14,7 +14,6 @@ import ( "github.com/Sirupsen/logrus" cfconfig "github.com/cloudflare/cfssl/config" - events "github.com/docker/go-events" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/connectionbroker" "github.com/docker/swarmkit/identity" @@ -51,13 +50,6 @@ const ( base36DigestLen = 50 ) -// RenewTLSExponentialBackoff sets the exponential backoff when trying to renew TLS certificates that have expired -var RenewTLSExponentialBackoff = events.ExponentialBackoffConfig{ - Base: time.Second * 5, - Factor: time.Second * 5, - Max: 1 * time.Hour, -} - // SecurityConfig is used to represent a node's security configuration. It includes information about // the RootCA and ServerTLSCreds/ClientTLSCreds transport authenticators to be used for MTLS type SecurityConfig struct { @@ -468,96 +460,6 @@ func RenewTLSConfigNow(ctx context.Context, s *SecurityConfig, connBroker *conne return s.updateTLSCredentials(tlsKeyPair, issuerInfo) } -// RenewTLSConfig will continuously monitor for the necessity of renewing the local certificates, either by -// issuing them locally if key-material is available, or requesting them from a remote CA. -func RenewTLSConfig(ctx context.Context, s *SecurityConfig, connBroker *connectionbroker.Broker, renew <-chan struct{}) <-chan CertificateUpdate { - updates := make(chan CertificateUpdate) - - go func() { - var ( - retry time.Duration - forceRetry bool - ) - expBackoff := events.NewExponentialBackoff(RenewTLSExponentialBackoff) - defer close(updates) - for { - ctx = log.WithModule(ctx, "tls") - log := log.G(ctx).WithFields(logrus.Fields{ - "node.id": s.ClientTLSCreds.NodeID(), - "node.role": s.ClientTLSCreds.Role(), - }) - // Our starting default will be 5 minutes - retry = 5 * time.Minute - - // Since the expiration of the certificate is managed remotely we should update our - // retry timer on every iteration of this loop. - // Retrieve the current certificate expiration information. - validFrom, validUntil, err := readCertValidity(s.KeyReader()) - if err != nil { - // We failed to read the expiration, let's stick with the starting default - log.Errorf("failed to read the expiration of the TLS certificate in: %s", s.KeyReader().Target()) - - select { - case updates <- CertificateUpdate{Err: errors.New("failed to read certificate expiration")}: - case <-ctx.Done(): - log.Info("shutting down certificate renewal routine") - return - } - } else { - // If we have an expired certificate, try to renew immediately: the hope that this is a temporary clock skew, or - // we can issue our own TLS certs. - if validUntil.Before(time.Now()) { - log.Warn("the current TLS certificate is expired, so an attempt to renew it will be made immediately") - // retry immediately(ish) with exponential backoff - retry = expBackoff.Proceed(nil) - } else if forceRetry { - // A forced renewal was requested, but did not succeed yet. - // retry immediately(ish) with exponential backoff - retry = expBackoff.Proceed(nil) - } else { - // Random retry time between 50% and 80% of the total time to expiration - retry = calculateRandomExpiry(validFrom, validUntil) - } - } - - log.WithFields(logrus.Fields{ - "time": time.Now().Add(retry), - }).Debugf("next certificate renewal scheduled for %v from now", retry) - - select { - case <-time.After(retry): - log.Info("renewing certificate") - case <-renew: - forceRetry = true - log.Info("forced certificate renewal") - case <-ctx.Done(): - log.Info("shutting down certificate renewal routine") - return - } - - // ignore errors - it will just try again later - var certUpdate CertificateUpdate - if err := RenewTLSConfigNow(ctx, s, connBroker); err != nil { - certUpdate.Err = err - expBackoff.Failure(nil, nil) - } else { - certUpdate.Role = s.ClientTLSCreds.Role() - expBackoff = events.NewExponentialBackoff(RenewTLSExponentialBackoff) - forceRetry = false - } - - select { - case updates <- certUpdate: - case <-ctx.Done(): - log.Info("shutting down certificate renewal routine") - return - } - } - }() - - return updates -} - // calculateRandomExpiry returns a random duration between 50% and 80% of the // original validity period func calculateRandomExpiry(validFrom, validUntil time.Time) time.Duration { diff --git a/vendor/github.com/docker/swarmkit/ca/reconciler.go b/vendor/github.com/docker/swarmkit/ca/reconciler.go index 6e326f3f5b..6651393b50 100644 --- a/vendor/github.com/docker/swarmkit/ca/reconciler.go +++ b/vendor/github.com/docker/swarmkit/ca/reconciler.go @@ -241,7 +241,7 @@ func (r *rootRotationReconciler) batchUpdateNodes(toUpdate []*api.Node) error { if len(toUpdate) == 0 { return nil } - _, err := r.store.Batch(func(batch *store.Batch) error { + err := r.store.Batch(func(batch *store.Batch) error { // Directly update the nodes rather than get + update, and ignore version errors. Since // `rootRotationReconciler` should be hooked up to all node update/delete/create events, we should have // close to the latest versions of all the nodes. If not, the node will updated later and the diff --git a/vendor/github.com/docker/swarmkit/ca/renewer.go b/vendor/github.com/docker/swarmkit/ca/renewer.go new file mode 100644 index 0000000000..213a66a25b --- /dev/null +++ b/vendor/github.com/docker/swarmkit/ca/renewer.go @@ -0,0 +1,166 @@ +package ca + +import ( + "sync" + "time" + + "github.com/Sirupsen/logrus" + "github.com/docker/go-events" + "github.com/docker/swarmkit/connectionbroker" + "github.com/docker/swarmkit/log" + "github.com/pkg/errors" + "golang.org/x/net/context" +) + +// RenewTLSExponentialBackoff sets the exponential backoff when trying to renew TLS certificates that have expired +var RenewTLSExponentialBackoff = events.ExponentialBackoffConfig{ + Base: time.Second * 5, + Factor: time.Second * 5, + Max: 1 * time.Hour, +} + +// TLSRenewer handles renewing TLS certificates, either automatically or upon +// request. +type TLSRenewer struct { + mu sync.Mutex + s *SecurityConfig + connBroker *connectionbroker.Broker + renew chan struct{} + expectedRole string +} + +// NewTLSRenewer creates a new TLS renewer. It must be started with Start. +func NewTLSRenewer(s *SecurityConfig, connBroker *connectionbroker.Broker) *TLSRenewer { + return &TLSRenewer{ + s: s, + connBroker: connBroker, + renew: make(chan struct{}, 1), + } +} + +// SetExpectedRole sets the expected role. If a renewal is forced, and the role +// doesn't match this expectation, renewal will be retried with exponential +// backoff until it does match. +func (t *TLSRenewer) SetExpectedRole(role string) { + t.mu.Lock() + t.expectedRole = role + t.mu.Unlock() +} + +// Renew causes the TLSRenewer to renew the certificate (nearly) right away, +// instead of waiting for the next automatic renewal. +func (t *TLSRenewer) Renew() { + select { + case t.renew <- struct{}{}: + default: + } +} + +// Start will continuously monitor for the necessity of renewing the local certificates, either by +// issuing them locally if key-material is available, or requesting them from a remote CA. +func (t *TLSRenewer) Start(ctx context.Context) <-chan CertificateUpdate { + updates := make(chan CertificateUpdate) + + go func() { + var ( + retry time.Duration + forceRetry bool + ) + expBackoff := events.NewExponentialBackoff(RenewTLSExponentialBackoff) + defer close(updates) + for { + ctx = log.WithModule(ctx, "tls") + log := log.G(ctx).WithFields(logrus.Fields{ + "node.id": t.s.ClientTLSCreds.NodeID(), + "node.role": t.s.ClientTLSCreds.Role(), + }) + // Our starting default will be 5 minutes + retry = 5 * time.Minute + + // Since the expiration of the certificate is managed remotely we should update our + // retry timer on every iteration of this loop. + // Retrieve the current certificate expiration information. + validFrom, validUntil, err := readCertValidity(t.s.KeyReader()) + if err != nil { + // We failed to read the expiration, let's stick with the starting default + log.Errorf("failed to read the expiration of the TLS certificate in: %s", t.s.KeyReader().Target()) + + select { + case updates <- CertificateUpdate{Err: errors.New("failed to read certificate expiration")}: + case <-ctx.Done(): + log.Info("shutting down certificate renewal routine") + return + } + } else { + // If we have an expired certificate, try to renew immediately: the hope that this is a temporary clock skew, or + // we can issue our own TLS certs. + if validUntil.Before(time.Now()) { + log.Warn("the current TLS certificate is expired, so an attempt to renew it will be made immediately") + // retry immediately(ish) with exponential backoff + retry = expBackoff.Proceed(nil) + } else if forceRetry { + // A forced renewal was requested, but did not succeed yet. + // retry immediately(ish) with exponential backoff + retry = expBackoff.Proceed(nil) + } else { + // Random retry time between 50% and 80% of the total time to expiration + retry = calculateRandomExpiry(validFrom, validUntil) + } + } + + log.WithFields(logrus.Fields{ + "time": time.Now().Add(retry), + }).Debugf("next certificate renewal scheduled for %v from now", retry) + + select { + case <-time.After(retry): + log.Info("renewing certificate") + case <-t.renew: + forceRetry = true + log.Info("forced certificate renewal") + + // Pause briefly before attempting the renewal, + // to give the CA a chance to reconcile the + // desired role. + select { + case <-time.After(500 * time.Millisecond): + case <-ctx.Done(): + log.Info("shutting down certificate renewal routine") + return + } + case <-ctx.Done(): + log.Info("shutting down certificate renewal routine") + return + } + + // ignore errors - it will just try again later + var certUpdate CertificateUpdate + if err := RenewTLSConfigNow(ctx, t.s, t.connBroker); err != nil { + certUpdate.Err = err + expBackoff.Failure(nil, nil) + } else { + newRole := t.s.ClientTLSCreds.Role() + t.mu.Lock() + expectedRole := t.expectedRole + t.mu.Unlock() + if expectedRole != "" && expectedRole != newRole { + expBackoff.Failure(nil, nil) + continue + } + + certUpdate.Role = newRole + expBackoff.Success(nil) + forceRetry = false + } + + select { + case updates <- certUpdate: + case <-ctx.Done(): + log.Info("shutting down certificate renewal routine") + return + } + } + }() + + return updates +} diff --git a/vendor/github.com/docker/swarmkit/ca/server.go b/vendor/github.com/docker/swarmkit/ca/server.go index 982b0e2e83..b4b0668049 100644 --- a/vendor/github.com/docker/swarmkit/ca/server.go +++ b/vendor/github.com/docker/swarmkit/ca/server.go @@ -580,6 +580,7 @@ func (s *Server) UpdateRootCA(ctx context.Context, cluster *api.Cluster) error { s.secConfigMu.Lock() defer s.secConfigMu.Unlock() + firstSeenCluster := s.lastSeenClusterRootCA == nil && s.lastSeenExternalCAs == nil rootCAChanged := len(rCA.CACert) != 0 && !equality.RootCAEqualStable(s.lastSeenClusterRootCA, rCA) externalCAChanged := !equality.ExternalCAsEqualStable(s.lastSeenExternalCAs, cluster.Spec.CAConfig.ExternalCAs) logger := log.G(ctx).WithFields(logrus.Fields{ @@ -588,7 +589,11 @@ func (s *Server) UpdateRootCA(ctx context.Context, cluster *api.Cluster) error { }) if rootCAChanged { - logger.Debug("Updating security config due to change in cluster Root CA") + setOrUpdate := "set" + if !firstSeenCluster { + logger.Debug("Updating security config due to change in cluster Root CA") + setOrUpdate = "updated" + } expiry := DefaultNodeCertExpiration if cluster.Spec.CAConfig.NodeCertExpiry != nil { // NodeCertExpiry exists, let's try to parse the duration out of it @@ -636,14 +641,16 @@ func (s *Server) UpdateRootCA(ctx context.Context, cluster *api.Cluster) error { return errors.Wrap(err, "updating Root CA failed") } // only update the server cache if we've successfully updated the root CA - logger.Debug("Root CA updated successfully") + logger.Debugf("Root CA %s successfully", setOrUpdate) s.lastSeenClusterRootCA = rCA } // we want to update if the external CA changed, or if the root CA changed because the root CA could affect what // certificate for external CAs we want to filter by if rootCAChanged || externalCAChanged { - logger.Debug("Updating security config due to change in cluster Root CA or cluster spec") + if !firstSeenCluster { + logger.Debug("Updating security config external CA URLs due to change in cluster Root CA or cluster spec") + } wantedExternalCACert := rCA.CACert // we want to only add external CA URLs that use this cert if rCA.RootRotation != nil { // we're rotating to a new root, so we only want external CAs with the new root cert diff --git a/vendor/github.com/docker/swarmkit/identity/doc.go b/vendor/github.com/docker/swarmkit/identity/doc.go index 7ebb8104c7..b91aca7ecf 100644 --- a/vendor/github.com/docker/swarmkit/identity/doc.go +++ b/vendor/github.com/docker/swarmkit/identity/doc.go @@ -1,6 +1,6 @@ -// Package identity provides functionality for generating and manager -// identifiers within swarm. This includes entity identification, such as that -// of Service, Task and Network but also cryptographically-secure Node identity. +// Package identity provides functionality for generating and managing +// identifiers within a swarm. This includes entity identification, such as for +// Services, Tasks and Networks but also cryptographically-secure Node identities. // // Random Identifiers // @@ -8,10 +8,9 @@ // 128 bit numbers encoded in Base36. This method is preferred over UUID4 since // it requires less storage and leverages the full 128 bits of entropy. // -// Generating an identifier is simple. Simply call the `NewID` function, check -// the error and proceed: +// Generating an identifier is simple. Simply call the `NewID` function: // -// id, err := NewID() -// if err != nil { /* ... handle it, please ... */ } +// id := NewID() // +// If an error occurs while generating the ID, it will panic. package identity diff --git a/vendor/github.com/docker/swarmkit/manager/allocator/doc.go b/vendor/github.com/docker/swarmkit/manager/allocator/doc.go index 7177334196..0579c669ae 100644 --- a/vendor/github.com/docker/swarmkit/manager/allocator/doc.go +++ b/vendor/github.com/docker/swarmkit/manager/allocator/doc.go @@ -3,16 +3,16 @@ // manages a set of independent allocator processes which can mostly // execute concurrently with only a minimal need for coordination. // -// One of the instances where it needs coordination is when to move a -// task to ALLOCATED state. Since a task can move to ALLOCATED state -// only when all task allocators have completed their service of -// allocation, they all have to agree on that. The way this achieved -// in `allocator` is by creating a `taskBallot` to which all task -// allocators register themselves as mandatory voters. For each task -// that needs allocation, each allocator independently votes to indicate -// the completion of their allocation. Once all registered voters have -// voted then the task is moved to ALLOCATED state. +// One of the instances where it needs coordination is when deciding to +// move a task to the PENDING state. Since a task can move to the +// PENDING state only when all the task allocators have completed, +// they must cooperate. The way `allocator` achieves this is by creating +// a `taskBallot` to which all task allocators register themselves as +// mandatory voters. For each task that needs allocation, each allocator +// independently votes to indicate the completion of their allocation. +// Once all registered voters have voted then the task is moved to the +// PENDING state. // -// Other than the coordination needed for task ALLOCATED state, all +// Other than the coordination needed for task PENDING state, all // the allocators function fairly independently. package allocator diff --git a/vendor/github.com/docker/swarmkit/manager/allocator/network.go b/vendor/github.com/docker/swarmkit/manager/allocator/network.go index 7231760982..4e9e5c55a7 100644 --- a/vendor/github.com/docker/swarmkit/manager/allocator/network.go +++ b/vendor/github.com/docker/swarmkit/manager/allocator/network.go @@ -95,7 +95,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) { if !na.IsAllocated(nc.ingressNetwork) { if err := a.allocateNetwork(ctx, nc.ingressNetwork); err != nil { log.G(ctx).WithError(err).Error("failed allocating ingress network during init") - } else if _, err := a.store.Batch(func(batch *store.Batch) error { + } else if err := a.store.Batch(func(batch *store.Batch) error { if err := a.commitAllocatedNetwork(ctx, batch, nc.ingressNetwork); err != nil { log.G(ctx).WithError(err).Error("failed committing allocation of ingress network during init") } @@ -134,7 +134,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) { allocatedNetworks = append(allocatedNetworks, n) } - if _, err := a.store.Batch(func(batch *store.Batch) error { + if err := a.store.Batch(func(batch *store.Batch) error { for _, n := range allocatedNetworks { if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil { log.G(ctx).WithError(err).Errorf("failed committing allocation of network %s during init", n.ID) @@ -164,7 +164,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) { var allocatedServices []*api.Service for _, s := range services { - if nc.nwkAllocator.IsServiceAllocated(s, networkallocator.OnInit) { + if !nc.nwkAllocator.ServiceNeedsAllocation(s, networkallocator.OnInit) { continue } @@ -175,7 +175,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) { allocatedServices = append(allocatedServices, s) } - if _, err := a.store.Batch(func(batch *store.Batch) error { + if err := a.store.Batch(func(batch *store.Batch) error { for _, s := range allocatedServices { if err := a.commitAllocatedService(ctx, batch, s); err != nil { log.G(ctx).WithError(err).Errorf("failed committing allocation of service %s during init", s.ID) @@ -239,7 +239,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) { } } - if _, err := a.store.Batch(func(batch *store.Batch) error { + if err := a.store.Batch(func(batch *store.Batch) error { for _, t := range allocatedTasks { if err := a.commitAllocatedTask(ctx, batch, t); err != nil { log.G(ctx).WithError(err).Errorf("failed committing allocation of task %s during init", t.ID) @@ -275,7 +275,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { break } - if _, err := a.store.Batch(func(batch *store.Batch) error { + if err := a.store.Batch(func(batch *store.Batch) error { return a.commitAllocatedNetwork(ctx, batch, n) }); err != nil { log.G(ctx).WithError(err).Errorf("Failed to commit allocation for network %s", n.ID) @@ -317,7 +317,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { break } - if nc.nwkAllocator.IsServiceAllocated(s) { + if !nc.nwkAllocator.ServiceNeedsAllocation(s) { break } @@ -326,7 +326,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { break } - if _, err := a.store.Batch(func(batch *store.Batch) error { + if err := a.store.Batch(func(batch *store.Batch) error { return a.commitAllocatedService(ctx, batch, s) }); err != nil { log.G(ctx).WithError(err).Errorf("Failed to commit allocation for service %s", s.ID) @@ -345,8 +345,8 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { break } - if nc.nwkAllocator.IsServiceAllocated(s) { - if nc.nwkAllocator.PortsAllocatedInHostPublishMode(s) { + if !nc.nwkAllocator.ServiceNeedsAllocation(s) { + if !nc.nwkAllocator.HostPublishPortsNeedUpdate(s) { break } updatePortsInHostPublishMode(s) @@ -357,7 +357,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { } } - if _, err := a.store.Batch(func(batch *store.Batch) error { + if err := a.store.Batch(func(batch *store.Batch) error { return a.commitAllocatedService(ctx, batch, s) }); err != nil { log.G(ctx).WithError(err).Errorf("Failed to commit allocation during update for service %s", s.ID) @@ -447,7 +447,7 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) { return } - if _, err := a.store.Batch(func(batch *store.Batch) error { + if err := a.store.Batch(func(batch *store.Batch) error { return a.commitAllocatedNode(ctx, batch, node) }); err != nil { log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s", node.ID) @@ -489,7 +489,7 @@ func (a *Allocator) allocateNodes(ctx context.Context) error { allocatedNodes = append(allocatedNodes, node) } - if _, err := a.store.Batch(func(batch *store.Batch) error { + if err := a.store.Batch(func(batch *store.Batch) error { for _, node := range allocatedNodes { if err := a.commitAllocatedNode(ctx, batch, node); err != nil { log.G(ctx).WithError(err).Errorf("Failed to commit allocation of network resources for node %s", node.ID) @@ -523,7 +523,7 @@ func (a *Allocator) deallocateNodes(ctx context.Context) error { log.G(ctx).WithError(err).Errorf("Failed freeing network resources for node %s", node.ID) } node.Attachment = nil - if _, err := a.store.Batch(func(batch *store.Batch) error { + if err := a.store.Batch(func(batch *store.Batch) error { return a.commitAllocatedNode(ctx, batch, node) }); err != nil { log.G(ctx).WithError(err).Errorf("Failed to commit deallocation of network resources for node %s", node.ID) @@ -544,7 +544,7 @@ func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bo // network configured or service endpoints have been // allocated. return (len(t.Networks) == 0 || nc.nwkAllocator.IsTaskAllocated(t)) && - (s == nil || nc.nwkAllocator.IsServiceAllocated(s)) + (s == nil || !nc.nwkAllocator.ServiceNeedsAllocation(s)) } func taskUpdateNetworks(t *api.Task, networks []*api.NetworkAttachment) { @@ -732,28 +732,29 @@ func (a *Allocator) commitAllocatedNode(ctx context.Context, batch *store.Batch, // so that the service allocation invoked on this new service object will trigger the deallocation // of any old publish mode port and allocation of any new one. func updatePortsInHostPublishMode(s *api.Service) { + // First, remove all host-mode ports from s.Endpoint.Ports if s.Endpoint != nil { var portConfigs []*api.PortConfig for _, portConfig := range s.Endpoint.Ports { - if portConfig.PublishMode == api.PublishModeIngress { + if portConfig.PublishMode != api.PublishModeHost { portConfigs = append(portConfigs, portConfig) } } s.Endpoint.Ports = portConfigs } + // Add back all host-mode ports if s.Spec.Endpoint != nil { if s.Endpoint == nil { s.Endpoint = &api.Endpoint{} } for _, portConfig := range s.Spec.Endpoint.Ports { - if portConfig.PublishMode == api.PublishModeIngress { - continue + if portConfig.PublishMode == api.PublishModeHost { + s.Endpoint.Ports = append(s.Endpoint.Ports, portConfig.Copy()) } - s.Endpoint.Ports = append(s.Endpoint.Ports, portConfig.Copy()) } - s.Endpoint.Spec = s.Spec.Endpoint.Copy() } + s.Endpoint.Spec = s.Spec.Endpoint.Copy() } func (a *Allocator) allocateService(ctx context.Context, s *api.Service) error { @@ -886,7 +887,7 @@ func (a *Allocator) allocateTask(ctx context.Context, t *api.Task) (err error) { return } - if !nc.nwkAllocator.IsServiceAllocated(s) { + if nc.nwkAllocator.ServiceNeedsAllocation(s) { err = fmt.Errorf("service %s to which this task %s belongs has pending allocations", s.ID, t.ID) return } @@ -977,22 +978,25 @@ func (a *Allocator) procUnallocatedNetworks(ctx context.Context) { return } - committed, err := a.store.Batch(func(batch *store.Batch) error { + err := a.store.Batch(func(batch *store.Batch) error { for _, n := range allocatedNetworks { if err := a.commitAllocatedNetwork(ctx, batch, n); err != nil { log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated network %s", n.ID) continue } + delete(nc.unallocatedNetworks, n.ID) } return nil }) if err != nil { log.G(ctx).WithError(err).Error("Failed to commit allocation of unallocated networks") - } - - for _, n := range allocatedNetworks[:committed] { - delete(nc.unallocatedNetworks, n.ID) + // We optimistically removed these from nc.unallocatedNetworks + // above in anticipation of successfully committing the batch, + // but since the transaction has failed, we requeue them here. + for _, n := range allocatedNetworks { + nc.unallocatedNetworks[n.ID] = n + } } } @@ -1000,7 +1004,7 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context) { nc := a.netCtx var allocatedServices []*api.Service for _, s := range nc.unallocatedServices { - if !nc.nwkAllocator.IsServiceAllocated(s) { + if nc.nwkAllocator.ServiceNeedsAllocation(s) { if err := a.allocateService(ctx, s); err != nil { log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated service %s", s.ID) continue @@ -1013,22 +1017,25 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context) { return } - committed, err := a.store.Batch(func(batch *store.Batch) error { + err := a.store.Batch(func(batch *store.Batch) error { for _, s := range allocatedServices { if err := a.commitAllocatedService(ctx, batch, s); err != nil { log.G(ctx).WithError(err).Debugf("Failed to commit allocation of unallocated service %s", s.ID) continue } + delete(nc.unallocatedServices, s.ID) } return nil }) if err != nil { log.G(ctx).WithError(err).Error("Failed to commit allocation of unallocated services") - } - - for _, s := range allocatedServices[:committed] { - delete(nc.unallocatedServices, s.ID) + // We optimistically removed these from nc.unallocatedServices + // above in anticipation of successfully committing the batch, + // but since the transaction has failed, we requeue them here. + for _, s := range allocatedServices { + nc.unallocatedServices[s.ID] = s + } } } @@ -1058,14 +1065,14 @@ func (a *Allocator) procTasksNetwork(ctx context.Context, onRetry bool) { return } - committed, err := a.store.Batch(func(batch *store.Batch) error { + err := a.store.Batch(func(batch *store.Batch) error { for _, t := range allocatedTasks { err := a.commitAllocatedTask(ctx, batch, t) - if err != nil { log.G(ctx).WithError(err).Error("task allocation commit failure") continue } + delete(toAllocate, t.ID) } return nil @@ -1073,10 +1080,12 @@ func (a *Allocator) procTasksNetwork(ctx context.Context, onRetry bool) { if err != nil { log.G(ctx).WithError(err).Error("failed a store batch operation while processing tasks") - } - - for _, t := range allocatedTasks[:committed] { - delete(toAllocate, t.ID) + // We optimistically removed these from toAllocate above in + // anticipation of successfully committing the batch, but since + // the transaction has failed, we requeue them here. + for _, t := range allocatedTasks { + toAllocate[t.ID] = t + } } } @@ -1089,12 +1098,7 @@ func updateTaskStatus(t *api.Task, newStatus api.TaskState, message string) { // IsIngressNetwork returns whether the passed network is an ingress network. func IsIngressNetwork(nw *api.Network) bool { - if nw.Spec.Ingress { - return true - } - // Check if legacy defined ingress network - _, ok := nw.Spec.Annotations.Labels["com.docker.swarm.internal"] - return ok && nw.Spec.Annotations.Name == "ingress" + return networkallocator.IsIngressNetwork(nw) } // GetIngressNetwork fetches the ingress network from store. diff --git a/vendor/github.com/docker/swarmkit/manager/allocator/networkallocator/networkallocator.go b/vendor/github.com/docker/swarmkit/manager/allocator/networkallocator/networkallocator.go index d956c547b5..b7e83cbec7 100644 --- a/vendor/github.com/docker/swarmkit/manager/allocator/networkallocator/networkallocator.go +++ b/vendor/github.com/docker/swarmkit/manager/allocator/networkallocator/networkallocator.go @@ -153,7 +153,7 @@ func (na *NetworkAllocator) Deallocate(n *api.Network) error { // IP and ports needed by the service. func (na *NetworkAllocator) ServiceAllocate(s *api.Service) (err error) { if err = na.portAllocator.serviceAllocatePorts(s); err != nil { - return + return err } defer func() { if err != nil { @@ -169,54 +169,74 @@ func (na *NetworkAllocator) ServiceAllocate(s *api.Service) (err error) { // If ResolutionMode is DNSRR do not try allocating VIPs, but // free any VIP from previous state. if s.Spec.Endpoint != nil && s.Spec.Endpoint.Mode == api.ResolutionModeDNSRoundRobin { - if s.Endpoint != nil { - for _, vip := range s.Endpoint.VirtualIPs { - if err := na.deallocateVIP(vip); err != nil { - // don't bail here, deallocate as many as possible. - log.L.WithError(err). - WithField("vip.network", vip.NetworkID). - WithField("vip.addr", vip.Addr).Error("error deallocating vip") - } + for _, vip := range s.Endpoint.VirtualIPs { + if err := na.deallocateVIP(vip); err != nil { + // don't bail here, deallocate as many as possible. + log.L.WithError(err). + WithField("vip.network", vip.NetworkID). + WithField("vip.addr", vip.Addr).Error("error deallocating vip") } - - s.Endpoint.VirtualIPs = nil } + s.Endpoint.VirtualIPs = nil + delete(na.services, s.ID) - return + return nil } - // First allocate VIPs for all the pre-populated endpoint attachments + specNetworks := serviceNetworks(s) + + // Allocate VIPs for all the pre-populated endpoint attachments + eVIPs := s.Endpoint.VirtualIPs[:0] + +vipLoop: for _, eAttach := range s.Endpoint.VirtualIPs { - if err = na.allocateVIP(eAttach); err != nil { - return + if na.IsVIPOnIngressNetwork(eAttach) { + if err = na.allocateVIP(eAttach); err != nil { + return err + } + eVIPs = append(eVIPs, eAttach) + continue vipLoop + } + for _, nAttach := range specNetworks { + if nAttach.Target == eAttach.NetworkID { + if err = na.allocateVIP(eAttach); err != nil { + return err + } + eVIPs = append(eVIPs, eAttach) + continue vipLoop + } + } + // If the network of the VIP is not part of the service spec, + // deallocate the vip + na.deallocateVIP(eAttach) } - // Always prefer NetworkAttachmentConfig in the TaskSpec - specNetworks := s.Spec.Task.Networks - if len(specNetworks) == 0 && s != nil && len(s.Spec.Networks) != 0 { - specNetworks = s.Spec.Networks - } - -outer: +networkLoop: for _, nAttach := range specNetworks { for _, vip := range s.Endpoint.VirtualIPs { if vip.NetworkID == nAttach.Target { - continue outer + continue networkLoop } } vip := &api.Endpoint_VirtualIP{NetworkID: nAttach.Target} if err = na.allocateVIP(vip); err != nil { - return + return err } - s.Endpoint.VirtualIPs = append(s.Endpoint.VirtualIPs, vip) + eVIPs = append(eVIPs, vip) } - na.services[s.ID] = struct{}{} - return + if len(eVIPs) > 0 { + na.services[s.ID] = struct{}{} + } else { + delete(na.services, s.ID) + } + + s.Endpoint.VirtualIPs = eVIPs + return nil } // ServiceDeallocate de-allocates all the network resources such as @@ -234,6 +254,7 @@ func (na *NetworkAllocator) ServiceDeallocate(s *api.Service) error { WithField("vip.addr", vip.Addr).Error("error deallocating vip") } } + s.Endpoint.VirtualIPs = nil na.portAllocator.serviceDeallocatePorts(s) delete(na.services, s.ID) @@ -284,10 +305,10 @@ func (na *NetworkAllocator) IsTaskAllocated(t *api.Task) bool { return true } -// PortsAllocatedInHostPublishMode returns if the passed service has its published ports in -// host (non ingress) mode allocated -func (na *NetworkAllocator) PortsAllocatedInHostPublishMode(s *api.Service) bool { - return na.portAllocator.portsAllocatedInHostPublishMode(s) +// HostPublishPortsNeedUpdate returns true if the passed service needs +// allocations for its published ports in host (non ingress) mode +func (na *NetworkAllocator) HostPublishPortsNeedUpdate(s *api.Service) bool { + return na.portAllocator.hostPublishPortsNeedUpdate(s) } // ServiceAllocationOpts is struct used for functional options in IsServiceAllocated @@ -300,41 +321,74 @@ func OnInit(options *ServiceAllocationOpts) { options.OnInit = true } -// IsServiceAllocated returns if the passed service has its network resources allocated or not. -// init bool indicates if the func is called during allocator initialization stage. -func (na *NetworkAllocator) IsServiceAllocated(s *api.Service, flags ...func(*ServiceAllocationOpts)) bool { +// ServiceNeedsAllocation returns true if the passed service needs to have network resources allocated/updated. +func (na *NetworkAllocator) ServiceNeedsAllocation(s *api.Service, flags ...func(*ServiceAllocationOpts)) bool { var options ServiceAllocationOpts - for _, flag := range flags { flag(&options) } + specNetworks := serviceNetworks(s) + // If endpoint mode is VIP and allocator does not have the - // service in VIP allocated set then it is not allocated. - if (len(s.Spec.Task.Networks) != 0 || len(s.Spec.Networks) != 0) && + // service in VIP allocated set then it needs to be allocated. + if len(specNetworks) != 0 && (s.Spec.Endpoint == nil || s.Spec.Endpoint.Mode == api.ResolutionModeVirtualIP) { + if _, ok := na.services[s.ID]; !ok { - return false + return true + } + + if s.Endpoint == nil || len(s.Endpoint.VirtualIPs) == 0 { + return true + } + + // If the spec has networks which don't have a corresponding VIP, + // the service needs to be allocated. + networkLoop: + for _, net := range specNetworks { + for _, vip := range s.Endpoint.VirtualIPs { + if vip.NetworkID == net.Target { + continue networkLoop + } + } + return true + } + } + + // If the spec no longer has networks attached and has a vip allocated + // from previous spec the service needs to allocated. + if s.Endpoint != nil { + vipLoop: + for _, vip := range s.Endpoint.VirtualIPs { + if na.IsVIPOnIngressNetwork(vip) { + continue vipLoop + } + for _, net := range specNetworks { + if vip.NetworkID == net.Target { + continue vipLoop + } + } + return true } } // If the endpoint mode is DNSRR and allocator has the service - // in VIP allocated set then we return not allocated to make + // in VIP allocated set then we return to be allocated to make // sure the allocator triggers networkallocator to free up the // resources if any. if s.Spec.Endpoint != nil && s.Spec.Endpoint.Mode == api.ResolutionModeDNSRoundRobin { if _, ok := na.services[s.ID]; ok { - return false + return true } } if (s.Spec.Endpoint != nil && len(s.Spec.Endpoint.Ports) != 0) || (s.Endpoint != nil && len(s.Endpoint.Ports) != 0) { - return na.portAllocator.isPortsAllocatedOnInit(s, options.OnInit) + return !na.portAllocator.isPortsAllocatedOnInit(s, options.OnInit) } - - return true + return false } // IsNodeAllocated returns if the passed node has its network resources allocated or not. @@ -828,3 +882,34 @@ func initializeDrivers(reg *drvregistry.DrvRegistry) error { } return nil } + +func serviceNetworks(s *api.Service) []*api.NetworkAttachmentConfig { + // Always prefer NetworkAttachmentConfig in the TaskSpec + if len(s.Spec.Task.Networks) == 0 && len(s.Spec.Networks) != 0 { + return s.Spec.Networks + } + return s.Spec.Task.Networks +} + +// IsVIPOnIngressNetwork check if the vip is in ingress network +func (na *NetworkAllocator) IsVIPOnIngressNetwork(vip *api.Endpoint_VirtualIP) bool { + if vip == nil { + return false + } + + localNet := na.getNetwork(vip.NetworkID) + if localNet != nil && localNet.nw != nil { + return IsIngressNetwork(localNet.nw) + } + return false +} + +// IsIngressNetwork check if the network is an ingress network +func IsIngressNetwork(nw *api.Network) bool { + if nw.Spec.Ingress { + return true + } + // Check if legacy defined ingress network + _, ok := nw.Spec.Annotations.Labels["com.docker.swarm.internal"] + return ok && nw.Spec.Annotations.Name == "ingress" +} diff --git a/vendor/github.com/docker/swarmkit/manager/allocator/networkallocator/portallocator.go b/vendor/github.com/docker/swarmkit/manager/allocator/networkallocator/portallocator.go index 8df3cf3b0b..af8126ea25 100644 --- a/vendor/github.com/docker/swarmkit/manager/allocator/networkallocator/portallocator.go +++ b/vendor/github.com/docker/swarmkit/manager/allocator/networkallocator/portallocator.go @@ -269,9 +269,9 @@ func (pa *portAllocator) serviceDeallocatePorts(s *api.Service) { s.Endpoint.Ports = nil } -func (pa *portAllocator) portsAllocatedInHostPublishMode(s *api.Service) bool { +func (pa *portAllocator) hostPublishPortsNeedUpdate(s *api.Service) bool { if s.Endpoint == nil && s.Spec.Endpoint == nil { - return true + return false } portStates := allocatedPorts{} @@ -288,13 +288,13 @@ func (pa *portAllocator) portsAllocatedInHostPublishMode(s *api.Service) bool { if portConfig.PublishMode == api.PublishModeHost && portConfig.PublishedPort != 0 { if portStates.delState(portConfig) == nil { - return false + return true } } } } - return true + return false } func (pa *portAllocator) isPortsAllocated(s *api.Service) bool { diff --git a/vendor/github.com/docker/swarmkit/manager/controlapi/service.go b/vendor/github.com/docker/swarmkit/manager/controlapi/service.go index d1cf825206..66ea58aedd 100644 --- a/vendor/github.com/docker/swarmkit/manager/controlapi/service.go +++ b/vendor/github.com/docker/swarmkit/manager/controlapi/service.go @@ -2,7 +2,6 @@ package controlapi import ( "errors" - "path/filepath" "reflect" "strconv" "strings" @@ -30,6 +29,8 @@ var ( errModeChangeNotAllowed = errors.New("service mode change is not allowed") ) +const minimumDuration = 1 * time.Millisecond + func validateResources(r *api.Resources) error { if r == nil { return nil @@ -143,16 +144,37 @@ func validateContainerSpec(taskSpec api.TaskSpec) error { return grpc.Errorf(codes.InvalidArgument, err.Error()) } - if container.Image == "" { + if err := validateImage(container.Image); err != nil { + return err + } + + if err := validateMounts(container.Mounts); err != nil { + return err + } + + if err := validateHealthCheck(container.Healthcheck); err != nil { + return err + } + + return nil +} + +// validateImage validates image name in containerSpec +func validateImage(image string) error { + if image == "" { return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: image reference must be provided") } - if _, err := reference.ParseNormalizedNamed(container.Image); err != nil { - return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: %q is not a valid repository/tag", container.Image) + if _, err := reference.ParseNormalizedNamed(image); err != nil { + return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: %q is not a valid repository/tag", image) } + return nil +} +// validateMounts validates if there are duplicate mounts in containerSpec +func validateMounts(mounts []api.Mount) error { mountMap := make(map[string]bool) - for _, mount := range container.Mounts { + for _, mount := range mounts { if _, exists := mountMap[mount.Target]; exists { return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: duplicate mount point: %s", mount.Target) } @@ -162,6 +184,49 @@ func validateContainerSpec(taskSpec api.TaskSpec) error { return nil } +// validateHealthCheck validates configs about container's health check +func validateHealthCheck(hc *api.HealthConfig) error { + if hc == nil { + return nil + } + + if hc.Interval != nil { + interval, err := gogotypes.DurationFromProto(hc.Interval) + if err != nil { + return err + } + if interval != 0 && interval < time.Duration(minimumDuration) { + return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: Interval in HealthConfig cannot be less than %s", minimumDuration) + } + } + + if hc.Timeout != nil { + timeout, err := gogotypes.DurationFromProto(hc.Timeout) + if err != nil { + return err + } + if timeout != 0 && timeout < time.Duration(minimumDuration) { + return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: Timeout in HealthConfig cannot be less than %s", minimumDuration) + } + } + + if hc.StartPeriod != nil { + sp, err := gogotypes.DurationFromProto(hc.StartPeriod) + if err != nil { + return err + } + if sp != 0 && sp < time.Duration(minimumDuration) { + return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: StartPeriod in HealthConfig cannot be less than %s", minimumDuration) + } + } + + if hc.Retries < 0 { + return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: Retries in HealthConfig cannot be negative") + } + + return nil +} + func validateGenericRuntimeSpec(taskSpec api.TaskSpec) error { generic := taskSpec.GetGeneric() @@ -302,11 +367,9 @@ func validateSecretRefsSpec(spec api.TaskSpec) error { // If this is a file target, we will ensure filename uniqueness if secretRef.GetFile() != nil { fileName := secretRef.GetFile().Name - // Validate the file name - if fileName == "" || fileName != filepath.Base(filepath.Clean(fileName)) { + if fileName == "" { return grpc.Errorf(codes.InvalidArgument, "malformed file secret reference, invalid target file name provided") } - // If this target is already in use, we have conflicting targets if prevSecretName, ok := existingTargets[fileName]; ok { return grpc.Errorf(codes.InvalidArgument, "secret references '%s' and '%s' have a conflicting target: '%s'", prevSecretName, secretRef.SecretName, fileName) diff --git a/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go b/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go index f6d71de53c..0e81ab3265 100644 --- a/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go +++ b/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go @@ -333,7 +333,7 @@ func (d *Dispatcher) markNodesUnknown(ctx context.Context) error { if err != nil { return errors.Wrap(err, "failed to get list of nodes") } - _, err = d.store.Batch(func(batch *store.Batch) error { + err = d.store.Batch(func(batch *store.Batch) error { for _, n := range nodes { err := batch.Update(func(tx store.Tx) error { // check if node is still here @@ -600,7 +600,7 @@ func (d *Dispatcher) processUpdates(ctx context.Context) { "method": "(*Dispatcher).processUpdates", }) - _, err := d.store.Batch(func(batch *store.Batch) error { + err := d.store.Batch(func(batch *store.Batch) error { for taskID, status := range taskUpdates { err := batch.Update(func(tx store.Tx) error { logger := log.WithField("task.id", taskID) @@ -951,7 +951,7 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche } func (d *Dispatcher) moveTasksToOrphaned(nodeID string) error { - _, err := d.store.Batch(func(batch *store.Batch) error { + err := d.store.Batch(func(batch *store.Batch) error { var ( tasks []*api.Task err error @@ -1151,6 +1151,9 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio return err } + clusterUpdatesCh, clusterCancel := d.clusterUpdateQueue.Watch() + defer clusterCancel() + if err := stream.Send(&api.SessionMessage{ SessionID: sessionID, Node: nodeObj, @@ -1161,9 +1164,6 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio return err } - clusterUpdatesCh, clusterCancel := d.clusterUpdateQueue.Watch() - defer clusterCancel() - // disconnectNode is a helper forcibly shutdown connection disconnectNode := func() error { // force disconnect by shutting down the stream. diff --git a/vendor/github.com/docker/swarmkit/manager/orchestrator/constraintenforcer/constraint_enforcer.go b/vendor/github.com/docker/swarmkit/manager/orchestrator/constraintenforcer/constraint_enforcer.go index 9ab9ddc7a3..06c34d9a21 100644 --- a/vendor/github.com/docker/swarmkit/manager/orchestrator/constraintenforcer/constraint_enforcer.go +++ b/vendor/github.com/docker/swarmkit/manager/orchestrator/constraintenforcer/constraint_enforcer.go @@ -129,7 +129,7 @@ func (ce *ConstraintEnforcer) rejectNoncompliantTasks(node *api.Node) { } if len(removeTasks) != 0 { - _, err := ce.store.Batch(func(batch *store.Batch) error { + err := ce.store.Batch(func(batch *store.Batch) error { for _, t := range removeTasks { err := batch.Update(func(tx store.Tx) error { t = store.GetTask(tx, t.ID) diff --git a/vendor/github.com/docker/swarmkit/manager/orchestrator/global/global.go b/vendor/github.com/docker/swarmkit/manager/orchestrator/global/global.go index 17b6c04318..f5e6b3afc0 100644 --- a/vendor/github.com/docker/swarmkit/manager/orchestrator/global/global.go +++ b/vendor/github.com/docker/swarmkit/manager/orchestrator/global/global.go @@ -249,7 +249,7 @@ func (g *Orchestrator) removeTasksFromNode(ctx context.Context, node *api.Node) return } - _, err = g.store.Batch(func(batch *store.Batch) error { + err = g.store.Batch(func(batch *store.Batch) error { for _, t := range tasks { // Global orchestrator only removes tasks from globalServices if _, exists := g.globalServices[t.ServiceID]; exists { @@ -296,7 +296,7 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin updates := make(map[*api.Service][]orchestrator.Slot) - _, err := g.store.Batch(func(batch *store.Batch) error { + err := g.store.Batch(func(batch *store.Batch) error { for _, serviceID := range serviceIDs { var updateTasks []orchestrator.Slot @@ -433,7 +433,7 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs } } - _, err = g.store.Batch(func(batch *store.Batch) error { + err = g.store.Batch(func(batch *store.Batch) error { for _, serviceID := range serviceIDs { service, exists := g.globalServices[serviceID] if !exists { @@ -505,7 +505,7 @@ func (g *Orchestrator) tickTasks(ctx context.Context) { if len(g.restartTasks) == 0 { return } - _, err := g.store.Batch(func(batch *store.Batch) error { + err := g.store.Batch(func(batch *store.Batch) error { for taskID := range g.restartTasks { err := batch.Update(func(tx store.Tx) error { t := store.GetTask(tx, taskID) diff --git a/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/services.go b/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/services.go index c40a76984d..101976d96e 100644 --- a/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/services.go +++ b/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/services.go @@ -108,7 +108,7 @@ func (r *Orchestrator) reconcile(ctx context.Context, service *api.Service) { log.G(ctx).Debugf("Service %s was scaled up from %d to %d instances", service.ID, numSlots, specifiedSlots) // Update all current tasks then add missing tasks r.updater.Update(ctx, r.cluster, service, slotsSlice) - _, err = r.store.Batch(func(batch *store.Batch) error { + err = r.store.Batch(func(batch *store.Batch) error { r.addTasks(ctx, batch, service, runningSlots, deadSlots, specifiedSlots-uint64(numSlots)) r.deleteTasksMap(ctx, batch, deadSlots) return nil @@ -155,7 +155,7 @@ func (r *Orchestrator) reconcile(ctx context.Context, service *api.Service) { } r.updater.Update(ctx, r.cluster, service, sortedSlots[:specifiedSlots]) - _, err = r.store.Batch(func(batch *store.Batch) error { + err = r.store.Batch(func(batch *store.Batch) error { r.deleteTasksMap(ctx, batch, deadSlots) r.deleteTasks(ctx, batch, sortedSlots[specifiedSlots:]) return nil @@ -165,7 +165,7 @@ func (r *Orchestrator) reconcile(ctx context.Context, service *api.Service) { } case specifiedSlots == uint64(numSlots): - _, err = r.store.Batch(func(batch *store.Batch) error { + err = r.store.Batch(func(batch *store.Batch) error { r.deleteTasksMap(ctx, batch, deadSlots) return nil }) diff --git a/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/tasks.go b/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/tasks.go index c350d7a138..66000e5d86 100644 --- a/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/tasks.go +++ b/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/tasks.go @@ -45,7 +45,7 @@ func (r *Orchestrator) handleTaskEvent(ctx context.Context, event events.Event) func (r *Orchestrator) tickTasks(ctx context.Context) { if len(r.restartTasks) > 0 { - _, err := r.store.Batch(func(batch *store.Batch) error { + err := r.store.Batch(func(batch *store.Batch) error { for taskID := range r.restartTasks { err := batch.Update(func(tx store.Tx) error { // TODO(aaronl): optimistic update? diff --git a/vendor/github.com/docker/swarmkit/manager/orchestrator/service.go b/vendor/github.com/docker/swarmkit/manager/orchestrator/service.go index a5e3f5c819..4e52c83abf 100644 --- a/vendor/github.com/docker/swarmkit/manager/orchestrator/service.go +++ b/vendor/github.com/docker/swarmkit/manager/orchestrator/service.go @@ -41,7 +41,7 @@ func DeleteServiceTasks(ctx context.Context, s *store.MemoryStore, service *api. return } - _, err = s.Batch(func(batch *store.Batch) error { + err = s.Batch(func(batch *store.Batch) error { for _, t := range tasks { err := batch.Update(func(tx store.Tx) error { if err := store.DeleteTask(tx, t.ID); err != nil { diff --git a/vendor/github.com/docker/swarmkit/manager/orchestrator/taskinit/init.go b/vendor/github.com/docker/swarmkit/manager/orchestrator/taskinit/init.go index 09ff9d75d0..33558a43c9 100644 --- a/vendor/github.com/docker/swarmkit/manager/orchestrator/taskinit/init.go +++ b/vendor/github.com/docker/swarmkit/manager/orchestrator/taskinit/init.go @@ -21,7 +21,7 @@ type InitHandler interface { // CheckTasks fixes tasks in the store before orchestrator runs. The previous leader might // not have finished processing their updates and left them in an inconsistent state. func CheckTasks(ctx context.Context, s *store.MemoryStore, readTx store.ReadTx, initHandler InitHandler, startSupervisor *restart.Supervisor) error { - _, err := s.Batch(func(batch *store.Batch) error { + err := s.Batch(func(batch *store.Batch) error { tasks, err := store.FindTasks(readTx, store.All) if err != nil { return err diff --git a/vendor/github.com/docker/swarmkit/manager/orchestrator/update/updater.go b/vendor/github.com/docker/swarmkit/manager/orchestrator/update/updater.go index 9d0c773693..2b1f55d3f2 100644 --- a/vendor/github.com/docker/swarmkit/manager/orchestrator/update/updater.go +++ b/vendor/github.com/docker/swarmkit/manager/orchestrator/update/updater.go @@ -378,7 +378,7 @@ func (u *Updater) updateTask(ctx context.Context, slot orchestrator.Slot, update startThenStop := false var delayStartCh <-chan struct{} // Atomically create the updated task and bring down the old one. - _, err := u.store.Batch(func(batch *store.Batch) error { + err := u.store.Batch(func(batch *store.Batch) error { err := batch.Update(func(tx store.Tx) error { if store.GetService(tx, updated.ServiceID) == nil { return errors.New("service was deleted") @@ -431,7 +431,7 @@ func (u *Updater) updateTask(ctx context.Context, slot orchestrator.Slot, update u.updatedTasksMu.Unlock() if startThenStop { - _, err := u.store.Batch(func(batch *store.Batch) error { + err := u.store.Batch(func(batch *store.Batch) error { _, err := u.removeOldTasks(ctx, batch, slot) if err != nil { log.G(ctx).WithError(err).WithField("task.id", updated.ID).Warning("failed to remove old task after starting replacement") @@ -457,7 +457,7 @@ func (u *Updater) useExistingTask(ctx context.Context, slot orchestrator.Slot, e } if len(removeTasks) != 0 || existing.DesiredState != api.TaskStateRunning { var delayStartCh <-chan struct{} - _, err := u.store.Batch(func(batch *store.Batch) error { + err := u.store.Batch(func(batch *store.Batch) error { var oldTask *api.Task if len(removeTasks) != 0 { var err error diff --git a/vendor/github.com/docker/swarmkit/manager/scheduler/scheduler.go b/vendor/github.com/docker/swarmkit/manager/scheduler/scheduler.go index ccb165dcc7..b06217e171 100644 --- a/vendor/github.com/docker/swarmkit/manager/scheduler/scheduler.go +++ b/vendor/github.com/docker/swarmkit/manager/scheduler/scheduler.go @@ -394,7 +394,7 @@ func (s *Scheduler) applySchedulingDecisions(ctx context.Context, schedulingDeci successful = make([]schedulingDecision, 0, len(schedulingDecisions)) // Apply changes to master store - applied, err := s.store.Batch(func(batch *store.Batch) error { + err := s.store.Batch(func(batch *store.Batch) error { for len(schedulingDecisions) > 0 { err := batch.Update(func(tx store.Tx) error { // Update exactly one task inside this Update @@ -452,8 +452,8 @@ func (s *Scheduler) applySchedulingDecisions(ctx context.Context, schedulingDeci if err != nil { log.G(ctx).WithError(err).Error("scheduler tick transaction failed") - failed = append(failed, successful[applied:]...) - successful = successful[:applied] + failed = append(failed, successful...) + successful = nil } return } diff --git a/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go b/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go index fe9e5102b6..e5e2e014b3 100644 --- a/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go +++ b/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go @@ -58,6 +58,10 @@ var ( // ErrMemberUnknown is sent in response to a message from an // unrecognized peer. ErrMemberUnknown = errors.New("raft: member unknown") + + // work around lint + lostQuorumMessage = "The swarm does not have a leader. It's possible that too few managers are online. Make sure more than half of the managers are online." + errLostQuorum = errors.New(lostQuorumMessage) ) // LeadershipState indicates whether the node is a leader or follower. @@ -68,6 +72,10 @@ const ( IsLeader LeadershipState = iota // IsFollower indicates that the node is a raft follower. IsFollower + + // lostQuorumTimeout is the number of ticks that can elapse with no + // leader before LeaderConn starts returning an error right away. + lostQuorumTimeout = 10 ) // EncryptionKeys are the current and, if necessary, pending DEKs with which to @@ -143,6 +151,7 @@ type Node struct { rotationQueued bool clearData bool waitForAppliedIndex uint64 + ticksWithNoLeader uint32 } // NodeOptions provides node-level options. @@ -207,6 +216,7 @@ func NewNode(opts NodeOptions) *Node { MaxSizePerMsg: cfg.MaxSizePerMsg, MaxInflightMsgs: cfg.MaxInflightMsgs, Logger: cfg.Logger, + CheckQuorum: cfg.CheckQuorum, }, doneCh: make(chan struct{}), RemovedFromRaft: make(chan struct{}), @@ -528,6 +538,12 @@ func (n *Node) Run(ctx context.Context) error { select { case <-n.ticker.C(): n.raftNode.Tick() + + if n.leader() == raft.None { + atomic.AddUint32(&n.ticksWithNoLeader, 1) + } else { + atomic.StoreUint32(&n.ticksWithNoLeader, 0) + } case rd := <-n.raftNode.Ready(): raftConfig := n.getCurrentRaftConfig() @@ -698,9 +714,7 @@ func (n *Node) restoreFromSnapshot(ctx context.Context, data []byte) error { for _, removedMember := range snapCluster.Removed { n.cluster.RemoveMember(removedMember) - if err := n.transport.RemovePeer(removedMember); err != nil { - log.G(ctx).WithError(err).Errorf("failed to remove peer %x from transport", removedMember) - } + n.transport.RemovePeer(removedMember) delete(oldMembers, removedMember) } @@ -1356,6 +1370,10 @@ func (n *Node) getLeaderConn() (*grpc.ClientConn, error) { // LeaderConn returns current connection to cluster leader or raftselector.ErrIsLeader // if current machine is leader. func (n *Node) LeaderConn(ctx context.Context) (*grpc.ClientConn, error) { + if atomic.LoadUint32(&n.ticksWithNoLeader) > lostQuorumTimeout { + return nil, errLostQuorum + } + cc, err := n.getLeaderConn() if err == nil { return cc, nil diff --git a/vendor/github.com/docker/swarmkit/manager/state/doc.go b/vendor/github.com/docker/swarmkit/manager/state/store/doc.go similarity index 68% rename from vendor/github.com/docker/swarmkit/manager/state/doc.go rename to vendor/github.com/docker/swarmkit/manager/state/store/doc.go index 759088b51a..660c7c69d4 100644 --- a/vendor/github.com/docker/swarmkit/manager/state/doc.go +++ b/vendor/github.com/docker/swarmkit/manager/state/store/doc.go @@ -1,16 +1,16 @@ -// Package state provides interfaces to work with swarm cluster state. +// Package store provides interfaces to work with swarm cluster state. // -// The primary interface is Store, which abstracts storage of this cluster -// state. Store exposes a transactional interface for both reads and writes. +// The primary interface is MemoryStore, which abstracts storage of this cluster +// state. MemoryStore exposes a transactional interface for both reads and writes. // To perform a read transaction, View accepts a callback function that it // will invoke with a ReadTx object that gives it a consistent view of the // state. Similarly, Update accepts a callback function that it will invoke with // a Tx object that allows reads and writes to happen without interference from // other transactions. // -// This is an example of making an update to a Store: +// This is an example of making an update to a MemoryStore: // -// err := store.Update(func(tx state.Tx) { +// err := store.Update(func(tx store.Tx) { // if err := tx.Nodes().Update(newNode); err != nil { // return err // } @@ -20,8 +20,8 @@ // return fmt.Errorf("transaction failed: %v", err) // } // -// WatchableStore is a version of Store that exposes watch functionality. -// These expose a publish/subscribe queue where code can subscribe to +// MemoryStore exposes watch functionality. +// It exposes a publish/subscribe queue where code can subscribe to // changes of interest. This can be combined with the ViewAndWatch function to // "fork" a store, by making a snapshot and then applying future changes // to keep the copy in sync. This approach lets consumers of the data @@ -29,4 +29,4 @@ // strategies. It can lead to more efficient code because data consumers // don't necessarily have to lock the main data store if they are // maintaining their own copies of the state. -package state +package store diff --git a/vendor/github.com/docker/swarmkit/manager/state/store/memory.go b/vendor/github.com/docker/swarmkit/manager/state/store/memory.go index 4b82d48f9f..62ab9279d4 100644 --- a/vendor/github.com/docker/swarmkit/manager/state/store/memory.go +++ b/vendor/github.com/docker/swarmkit/manager/state/store/memory.go @@ -348,9 +348,6 @@ type Batch struct { store *MemoryStore // applied counts the times Update has run successfully applied int - // committed is the number of times Update had run successfully as of - // the time pending changes were committed. - committed int // transactionSizeEstimate is the running count of the size of the // current transaction. transactionSizeEstimate int @@ -434,8 +431,6 @@ func (batch *Batch) commit() error { return batch.err } - batch.committed = batch.applied - for _, c := range batch.tx.changelist { batch.store.queue.Publish(c) } @@ -461,9 +456,9 @@ func (batch *Batch) commit() error { // excessive time, or producing a transaction that exceeds the maximum // size. // -// Batch returns the number of calls to batch.Update whose changes were -// successfully committed to the store. -func (s *MemoryStore) Batch(cb func(*Batch) error) (int, error) { +// If Batch returns an error, no guarantees are made about how many updates +// were committed successfully. +func (s *MemoryStore) Batch(cb func(*Batch) error) error { s.updateLock.Lock() batch := Batch{ @@ -474,12 +469,12 @@ func (s *MemoryStore) Batch(cb func(*Batch) error) (int, error) { if err := cb(&batch); err != nil { batch.tx.memDBTx.Abort() s.updateLock.Unlock() - return batch.committed, err + return err } err := batch.commit() s.updateLock.Unlock() - return batch.committed, err + return err } func (tx *tx) init(memDBTx *memdb.Txn, curVersion *api.Version) { diff --git a/vendor/github.com/docker/swarmkit/node/node.go b/vendor/github.com/docker/swarmkit/node/node.go index b6ff9b8c11..bbf0570815 100644 --- a/vendor/github.com/docker/swarmkit/node/node.go +++ b/vendor/github.com/docker/swarmkit/node/node.go @@ -133,29 +133,17 @@ type Node struct { manager *manager.Manager notifyNodeChange chan *agent.NodeChanges // used by the agent to relay node updates from the dispatcher Session stream to (*Node).run unlockKey []byte - - // lastNodeRole is the last-seen value of Node.Role, used to make role - // changes "edge triggered" and avoid renewal loops. - lastNodeRole lastSeenRole - // lastNodeDesiredRole is the last-seen value of Node.Spec.DesiredRole, - // used to make role changes "edge triggered" and avoid renewal loops. - // This exists in addition to lastNodeRole to support older CAs that - // only fill in the DesiredRole field. - lastNodeDesiredRole lastSeenRole } type lastSeenRole struct { - role *api.NodeRole + role api.NodeRole } // observe notes the latest value of this node role, and returns true if it // is the first seen value, or is different from the most recently seen value. func (l *lastSeenRole) observe(newRole api.NodeRole) bool { - changed := l.role == nil || *l.role != newRole - if l.role == nil { - l.role = new(api.NodeRole) - } - *l.role = newRole + changed := l.role != newRole + l.role = newRole return changed } @@ -244,6 +232,16 @@ func (n *Node) Start(ctx context.Context) error { return err } +func (n *Node) currentRole() api.NodeRole { + n.Lock() + currentRole := api.NodeRoleWorker + if n.role == ca.ManagerRole { + currentRole = api.NodeRoleManager + } + n.Unlock() + return currentRole +} + func (n *Node) run(ctx context.Context) (err error) { defer func() { n.err = err @@ -267,9 +265,11 @@ func (n *Node) run(ctx context.Context) (err error) { return err } + renewer := ca.NewTLSRenewer(securityConfig, n.connBroker) + ctx = log.WithLogger(ctx, log.G(ctx).WithField("node.id", n.NodeID())) - taskDBPath := filepath.Join(n.config.StateDir, "worker/tasks.db") + taskDBPath := filepath.Join(n.config.StateDir, "worker", "tasks.db") if err := os.MkdirAll(filepath.Dir(taskDBPath), 0777); err != nil { return err } @@ -282,57 +282,39 @@ func (n *Node) run(ctx context.Context) (err error) { agentDone := make(chan struct{}) - forceCertRenewal := make(chan struct{}) - renewCert := func() { - for { - select { - case forceCertRenewal <- struct{}{}: - return - case <-agentDone: - return - case <-n.notifyNodeChange: - // consume from the channel to avoid blocking the writer - } - } - } - go func() { + // lastNodeDesiredRole is the last-seen value of Node.Spec.DesiredRole, + // used to make role changes "edge triggered" and avoid renewal loops. + lastNodeDesiredRole := lastSeenRole{role: n.currentRole()} + for { select { case <-agentDone: return case nodeChanges := <-n.notifyNodeChange: - n.Lock() - currentRole := api.NodeRoleWorker - if n.role == ca.ManagerRole { - currentRole = api.NodeRoleManager - } - n.Unlock() + currentRole := n.currentRole() if nodeChanges.Node != nil { // This is a bit complex to be backward compatible with older CAs that // don't support the Node.Role field. They only use what's presently // called DesiredRole. - // 1) If we haven't seen the node object before, and the desired role - // is different from our current role, renew the cert. This covers - // the case of starting up after a role change. - // 2) If we have seen the node before, the desired role is - // different from our current role, and either the actual role or - // desired role has changed relative to the last values we saw in - // those fields, renew the cert. This covers the case of the role - // changing while this node is running, but prevents getting into a - // rotation loop if Node.Role isn't what we expect (because it's - // unset). We may renew the certificate an extra time (first when - // DesiredRole changes, and then again when Role changes). - // 3) If the server is sending us IssuanceStateRotate, renew the cert as + // 1) If DesiredRole changes, kick off a certificate renewal. The renewal + // is delayed slightly to give Role time to change as well if this is + // a newer CA. If the certificate we get back doesn't have the expected + // role, we continue renewing with exponential backoff. + // 2) If the server is sending us IssuanceStateRotate, renew the cert as // requested by the CA. - roleChanged := n.lastNodeRole.observe(nodeChanges.Node.Role) - desiredRoleChanged := n.lastNodeDesiredRole.observe(nodeChanges.Node.Spec.DesiredRole) - if (currentRole != nodeChanges.Node.Spec.DesiredRole && - ((roleChanged && currentRole != nodeChanges.Node.Role) || - desiredRoleChanged)) || - nodeChanges.Node.Certificate.Status.State == api.IssuanceStateRotate { - renewCert() + desiredRoleChanged := lastNodeDesiredRole.observe(nodeChanges.Node.Spec.DesiredRole) + if desiredRoleChanged { + switch nodeChanges.Node.Spec.DesiredRole { + case api.NodeRoleManager: + renewer.SetExpectedRole(ca.ManagerRole) + case api.NodeRoleWorker: + renewer.SetExpectedRole(ca.WorkerRole) + } + } + if desiredRoleChanged || nodeChanges.Node.Certificate.Status.State == api.IssuanceStateRotate { + renewer.Renew() } } @@ -364,7 +346,7 @@ func (n *Node) run(ctx context.Context) (err error) { var wg sync.WaitGroup wg.Add(3) - updates := ca.RenewTLSConfig(ctx, securityConfig, n.connBroker, forceCertRenewal) + updates := renewer.Start(ctx) go func() { for certUpdate := range updates { if certUpdate.Err != nil { @@ -387,7 +369,7 @@ func (n *Node) run(ctx context.Context) (err error) { var managerErr error var agentErr error go func() { - managerErr = n.superviseManager(ctx, securityConfig, paths.RootCA, managerReady, forceCertRenewal) // store err and loop + managerErr = n.superviseManager(ctx, securityConfig, paths.RootCA, managerReady, renewer) // store err and loop wg.Done() cancel() }() @@ -869,7 +851,7 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig return clearData, nil } -func (n *Node) superviseManager(ctx context.Context, securityConfig *ca.SecurityConfig, rootPaths ca.CertPaths, ready chan struct{}, forceCertRenewal chan struct{}) error { +func (n *Node) superviseManager(ctx context.Context, securityConfig *ca.SecurityConfig, rootPaths ca.CertPaths, ready chan struct{}, renewer *ca.TLSRenewer) error { for { if err := n.waitRole(ctx, ca.ManagerRole); err != nil { return err @@ -924,14 +906,7 @@ func (n *Node) superviseManager(ctx context.Context, securityConfig *ca.Security log.G(ctx).Warn("failed to get worker role after manager stop, forcing certificate renewal") timer.Reset(roleChangeTimeout) - select { - case forceCertRenewal <- struct{}{}: - case <-timer.C: - log.G(ctx).Warn("failed to trigger certificate renewal after manager stop, restarting manager") - return nil - case <-ctx.Done(): - return ctx.Err() - } + renewer.Renew() // Now that the renewal request has been sent to the // renewal goroutine, wait for a change in role. diff --git a/vendor/github.com/docker/swarmkit/vendor.conf b/vendor/github.com/docker/swarmkit/vendor.conf index 0a651c41e7..742d158e6c 100644 --- a/vendor/github.com/docker/swarmkit/vendor.conf +++ b/vendor/github.com/docker/swarmkit/vendor.conf @@ -8,7 +8,7 @@ github.com/matttproud/golang_protobuf_extensions v1.0.0 github.com/grpc-ecosystem/go-grpc-prometheus 6b7015e65d366bf3f19b2b2a000a831940f0f7e0 # etcd/raft -github.com/coreos/etcd 824277cb3a577a0e8c829ca9ec557b973fe06d20 +github.com/coreos/etcd ea5389a79f40206170582c1ea076191b8622cb8e https://github.com/aaronlehmann/etcd # for https://github.com/coreos/etcd/pull/7830 github.com/coreos/go-systemd v12 github.com/coreos/pkg v3 github.com/prometheus/client_golang 52437c81da6b127a9925d17eb3a382a2e5fd395e