From 79bf23845b7829bf68b9dda47af46886b274ac78 Mon Sep 17 00:00:00 2001
From: Aaron Lehmann
Date: Fri, 10 Mar 2017 11:15:44 -0800
Subject: [PATCH] Vendor swarmkit 0e2d9eb

Signed-off-by: Aaron Lehmann
---
 vendor.conf                                        |  2 +-
 .../docker/swarmkit/ca/certificates.go             | 40 ++++++++
 .../github.com/docker/swarmkit/ca/external.go      | 63 ++++++++++++
 .../swarmkit/manager/allocator/network.go          | 96 ++++++++++++-------
 .../swarmkit/manager/controlapi/network.go         |  6 +-
 .../swarmkit/manager/dispatcher/dispatcher.go      | 23 ++---
 .../swarmkit/manager/logbroker/broker.go           | 55 +++++++----
 .../docker/swarmkit/manager/manager.go             |  2 +-
 .../docker/swarmkit/manager/role_manager.go        | 25 ++---
 .../swarmkit/manager/state/raft/raft.go            |  3 +
 .../github.com/docker/swarmkit/node/node.go        | 29 ++++--
 11 files changed, 253 insertions(+), 91 deletions(-)

diff --git a/vendor.conf b/vendor.conf
index 74e313a8ad..0ba33c7674 100644
--- a/vendor.conf
+++ b/vendor.conf
@@ -104,7 +104,7 @@ github.com/docker/containerd 665e84e6c28653a9c29a6db601636a92d46896f3
 github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
 
 # cluster
-github.com/docker/swarmkit d60ccf366a6758c7857db968857b72202cb2f902
+github.com/docker/swarmkit 0e2d9ebcea9d5bbd4a06b3b964fb96356801f880
 github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
 github.com/gogo/protobuf 8d70fb3182befc465c4a1eac8ad4d38ff49778e2
 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
diff --git a/vendor/github.com/docker/swarmkit/ca/certificates.go b/vendor/github.com/docker/swarmkit/ca/certificates.go
index 348f16ad83..7a7b9abc73 100644
--- a/vendor/github.com/docker/swarmkit/ca/certificates.go
+++ b/vendor/github.com/docker/swarmkit/ca/certificates.go
@@ -9,6 +9,7 @@ import (
 	"crypto/rsa"
 	"crypto/tls"
 	"crypto/x509"
+	"encoding/asn1"
 	"encoding/pem"
 	"fmt"
 	"io"
@@ -72,6 +73,9 @@ const (
 	MinNodeCertExpiration = 1 * time.Hour
 )
 
+// BasicConstraintsOID is the ASN1 Object ID indicating a basic constraints extension
+var BasicConstraintsOID = asn1.ObjectIdentifier{2, 5, 29, 19}
+
 // A recoverableErr is a non-fatal error encountered signing a certificate,
 // which means that the certificate issuance may be retried at a later time.
 type recoverableErr struct {
@@ -305,6 +309,42 @@ func (rca *RootCA) ParseValidateAndSignCSR(csrBytes []byte, cn, ou, org string)
 	return cert, nil
 }
 
+// CrossSignCACertificate takes a CA root certificate and generates an intermediate CA from it signed with the current root signer
+func (rca *RootCA) CrossSignCACertificate(otherCAPEM []byte) ([]byte, error) {
+	if !rca.CanSign() {
+		return nil, ErrNoValidSigner
+	}
+
+	// create a new cert with exactly the same parameters, including the public key and exact NotBefore and NotAfter
+	rootCert, err := helpers.ParseCertificatePEM(rca.Cert)
+	if err != nil {
+		return nil, errors.Wrap(err, "could not parse old CA certificate")
+	}
+	rootSigner, err := helpers.ParsePrivateKeyPEM(rca.Signer.Key)
+	if err != nil {
+		return nil, errors.Wrap(err, "could not parse old CA key")
+	}
+
+	newCert, err := helpers.ParseCertificatePEM(otherCAPEM)
+	if err != nil {
+		return nil, errors.New("could not parse new CA certificate")
+	}
+
+	if !newCert.IsCA {
+		return nil, errors.New("certificate not a CA")
+	}
+
+	derBytes, err := x509.CreateCertificate(cryptorand.Reader, newCert, rootCert, newCert.PublicKey, rootSigner)
+	if err != nil {
+		return nil, errors.Wrap(err, "could not cross-sign new CA certificate using old CA material")
+	}
+
+	return pem.EncodeToMemory(&pem.Block{
+		Type:  "CERTIFICATE",
+		Bytes: derBytes,
+	}), nil
+}
+
 // NewRootCA creates a new RootCA object from unparsed PEM cert bundle and key byte
 // slices. key may be nil, and in this case NewRootCA will return a RootCA
 // without a signer.
diff --git a/vendor/github.com/docker/swarmkit/ca/external.go b/vendor/github.com/docker/swarmkit/ca/external.go
index 492d55c300..f53078f8f4 100644
--- a/vendor/github.com/docker/swarmkit/ca/external.go
+++ b/vendor/github.com/docker/swarmkit/ca/external.go
@@ -2,14 +2,21 @@ package ca
 
 import (
 	"bytes"
+	cryptorand "crypto/rand"
 	"crypto/tls"
+	"crypto/x509"
+	"encoding/hex"
 	"encoding/json"
+	"encoding/pem"
 	"io/ioutil"
 	"net/http"
 	"sync"
 
 	"github.com/Sirupsen/logrus"
 	"github.com/cloudflare/cfssl/api"
+	"github.com/cloudflare/cfssl/config"
+	"github.com/cloudflare/cfssl/csr"
+	"github.com/cloudflare/cfssl/helpers"
 	"github.com/cloudflare/cfssl/signer"
 	"github.com/pkg/errors"
 	"golang.org/x/net/context"
@@ -97,6 +104,62 @@ func (eca *ExternalCA) Sign(ctx context.Context, req signer.SignRequest) (cert [
 	return nil, err
 }
 
+// CrossSignRootCA takes a RootCA object, generates a CA CSR, sends a signing request with the CA CSR to the external
+// CFSSL API server in order to obtain a cross-signed root
+func (eca *ExternalCA) CrossSignRootCA(ctx context.Context, rca RootCA) ([]byte, error) {
+	if !rca.CanSign() {
+		return nil, errors.Wrap(ErrNoValidSigner, "cannot generate CSR for a cross-signed root")
+	}
+	rootCert, err := helpers.ParseCertificatePEM(rca.Cert)
+	if err != nil {
+		return nil, errors.Wrap(err, "could not parse CA certificate")
+	}
+	rootSigner, err := helpers.ParsePrivateKeyPEM(rca.Signer.Key)
+	if err != nil {
+		return nil, errors.Wrap(err, "could not parse old CA key")
+	}
+	// ExtractCertificateRequest generates a new key request, and we want to continue to use the old
+	// key.  However, ExtractCertificateRequest will also convert the pkix.Name to csr.Name, which we
+	// need in order to generate a signing request
+	cfCSRObj := csr.ExtractCertificateRequest(rootCert)
+
+	der, err := x509.CreateCertificateRequest(cryptorand.Reader, &x509.CertificateRequest{
+		RawSubjectPublicKeyInfo: rootCert.RawSubjectPublicKeyInfo,
+		RawSubject:              rootCert.RawSubject,
+		PublicKeyAlgorithm:      rootCert.PublicKeyAlgorithm,
+		Subject:                 rootCert.Subject,
+		Extensions:              rootCert.Extensions,
+		DNSNames:                rootCert.DNSNames,
+		EmailAddresses:          rootCert.EmailAddresses,
+		IPAddresses:             rootCert.IPAddresses,
+	}, rootSigner)
+	if err != nil {
+		return nil, err
+	}
+	req := signer.SignRequest{
+		Request: string(pem.EncodeToMemory(&pem.Block{
+			Type:  "CERTIFICATE REQUEST",
+			Bytes: der,
+		})),
+		Subject: &signer.Subject{
+			CN:    rootCert.Subject.CommonName,
+			Names: cfCSRObj.Names,
+		},
+	}
+	// cfssl actually ignores non subject alt name extensions in the CSR, so we have to add the CA extension in the signing
+	// request as well
+	for _, ext := range rootCert.Extensions {
+		if ext.Id.Equal(BasicConstraintsOID) {
+			req.Extensions = append(req.Extensions, signer.Extension{
+				ID:       config.OID(ext.Id),
+				Critical: ext.Critical,
+				Value:    hex.EncodeToString(ext.Value),
+			})
+		}
+	}
+	return eca.Sign(ctx, req)
+}
+
 func makeExternalSignRequest(ctx context.Context, client *http.Client, url string, csrJSON []byte) (cert []byte, err error) {
 	resp, err := ctxhttp.Post(ctx, client, url, "application/json", bytes.NewReader(csrJSON))
 	if err != nil {
diff --git a/vendor/github.com/docker/swarmkit/manager/allocator/network.go b/vendor/github.com/docker/swarmkit/manager/allocator/network.go
index 38871fa0e7..13e0235aac 100644
--- a/vendor/github.com/docker/swarmkit/manager/allocator/network.go
+++ b/vendor/github.com/docker/swarmkit/manager/allocator/network.go
@@ -26,7 +26,11 @@ const (
 	allocatedStatusMessage = "pending task scheduling"
 )
 
-var errNoChanges = errors.New("task unchanged")
+var (
+	errNoChanges = errors.New("task unchanged")
+
+	retryInterval = 5 * time.Minute
+)
 
 func newIngressNetwork() *api.Network {
 	return &api.Network{
@@ -57,19 +61,28 @@ type networkContext struct {
 	// the actual network allocation.
 	nwkAllocator *networkallocator.NetworkAllocator
 
-	// A table of unallocated tasks which will be revisited if any thing
+	// A set of tasks which are ready to be allocated as a batch. This is
+	// distinct from "unallocatedTasks" which are tasks that failed to
+	// allocate on the first try, being held for a future retry.
+	pendingTasks map[string]*api.Task
+
+	// A set of unallocated tasks which will be revisited if any thing
 	// changes in system state that might help task allocation.
 	unallocatedTasks map[string]*api.Task
 
-	// A table of unallocated services which will be revisited if
+	// A set of unallocated services which will be revisited if
 	// any thing changes in system state that might help service
 	// allocation.
 	unallocatedServices map[string]*api.Service
 
-	// A table of unallocated networks which will be revisited if
+	// A set of unallocated networks which will be revisited if
 	// any thing changes in system state that might help network
 	// allocation.
 	unallocatedNetworks map[string]*api.Network
+
+	// lastRetry is the last timestamp when unallocated
+	// tasks/services/networks were retried.
+	lastRetry time.Time
 }
 
 func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
@@ -80,10 +93,12 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
 
 	nc := &networkContext{
 		nwkAllocator:        na,
+		pendingTasks:        make(map[string]*api.Task),
 		unallocatedTasks:    make(map[string]*api.Task),
 		unallocatedServices: make(map[string]*api.Service),
 		unallocatedNetworks: make(map[string]*api.Network),
 		ingressNetwork:      newIngressNetwork(),
+		lastRetry:           time.Now(),
 	}
 	a.netCtx = nc
 	defer func() {
@@ -266,7 +281,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
 	}
 
 	for _, t := range tasks {
-		if taskDead(t) {
+		if t.Status.State > api.TaskStateRunning {
 			continue
 		}
 
@@ -351,6 +366,8 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
 		if err := nc.nwkAllocator.Deallocate(n); err != nil {
 			log.G(ctx).WithError(err).Errorf("Failed during network free for network %s", n.ID)
 		}
+
+		delete(nc.unallocatedNetworks, n.ID)
 	case state.EventCreateService:
 		s := v.Service.Copy()
 
@@ -387,6 +404,9 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
 			return a.commitAllocatedService(ctx, batch, s)
 		}); err != nil {
 			log.G(ctx).WithError(err).Errorf("Failed to commit allocation during update for service %s", s.ID)
+			nc.unallocatedServices[s.ID] = s
+		} else {
+			delete(nc.unallocatedServices, s.ID)
 		}
 	case state.EventDeleteService:
 		s := v.Service.Copy()
@@ -403,10 +423,20 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
 	case state.EventCreateTask, state.EventUpdateTask, state.EventDeleteTask:
 		a.doTaskAlloc(ctx, ev)
 	case state.EventCommit:
-		a.procUnallocatedNetworks(ctx)
-		a.procUnallocatedServices(ctx)
-		a.procUnallocatedTasksNetwork(ctx)
-		return
+		a.procTasksNetwork(ctx, false)
+
+		if time.Since(nc.lastRetry) > retryInterval {
+			a.procUnallocatedNetworks(ctx)
+			a.procUnallocatedServices(ctx)
+			a.procTasksNetwork(ctx, true)
+			nc.lastRetry = time.Now()
+		}
+
+		// Any left over tasks are moved to the unallocated set
+		for _, t := range nc.pendingTasks {
+			nc.unallocatedTasks[t.ID] = t
+		}
+		nc.pendingTasks = make(map[string]*api.Task)
 	}
 }
 
@@ -456,17 +486,6 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) {
 	}
 }
 
-// taskRunning checks whether a task is either actively running, or in the
-// process of starting up.
-func taskRunning(t *api.Task) bool {
-	return t.DesiredState <= api.TaskStateRunning && t.Status.State <= api.TaskStateRunning
-}
-
-// taskDead checks whether a task is not actively running as far as allocator purposes are concerned.
-func taskDead(t *api.Task) bool {
-	return t.DesiredState > api.TaskStateRunning && t.Status.State > api.TaskStateRunning
-}
-
 // taskReadyForNetworkVote checks if the task is ready for a network
 // vote to move it to PENDING state.
 func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bool {
@@ -569,17 +588,17 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
 
 	nc := a.netCtx
 
-	// If the task has stopped running or it's being deleted then
-	// we should free the network resources associated with the
-	// task right away.
-	if taskDead(t) || isDelete {
+	// If the task has stopped running then we should free the network
+	// resources associated with the task right away.
+	if t.Status.State > api.TaskStateRunning || isDelete {
 		if nc.nwkAllocator.IsTaskAllocated(t) {
 			if err := nc.nwkAllocator.DeallocateTask(t); err != nil {
 				log.G(ctx).WithError(err).Errorf("Failed freeing network resources for task %s", t.ID)
 			}
 		}
 
-		// Cleanup any task references that might exist in unallocatedTasks
+		// Cleanup any task references that might exist
+		delete(nc.pendingTasks, t.ID)
 		delete(nc.unallocatedTasks, t.ID)
 		return
 	}
@@ -587,6 +606,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
 	// If we are already in allocated state, there is
 	// absolutely nothing else to do.
 	if t.Status.State >= api.TaskStatePending {
+		delete(nc.pendingTasks, t.ID)
 		delete(nc.unallocatedTasks, t.ID)
 		return
 	}
@@ -605,7 +625,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
 			// available in store. But we still need to
 			// cleanup network resources associated with
 			// the task.
-			if taskRunning(t) && !isDelete {
+			if t.Status.State <= api.TaskStateRunning && !isDelete {
 				log.G(ctx).Errorf("Event %T: Failed to get service %s for task %s state %s: could not find service %s", ev, t.ServiceID, t.ID, t.Status.State, t.ServiceID)
 				return
 			}
@@ -616,7 +636,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
 	// based on service spec.
 	a.taskCreateNetworkAttachments(t, s)
 
-	nc.unallocatedTasks[t.ID] = t
+	nc.pendingTasks[t.ID] = t
 }
 
 func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error {
@@ -948,15 +968,25 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context) {
 	}
 }
 
-func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) {
+func (a *Allocator) procTasksNetwork(ctx context.Context, onRetry bool) {
 	nc := a.netCtx
 
-	allocatedTasks := make([]*api.Task, 0, len(nc.unallocatedTasks))
+	quiet := false
+	toAllocate := nc.pendingTasks
+	if onRetry {
+		toAllocate = nc.unallocatedTasks
+		quiet = true
+	}
+	allocatedTasks := make([]*api.Task, 0, len(toAllocate))
 
-	for _, t := range nc.unallocatedTasks {
+	for _, t := range toAllocate {
 		if err := a.allocateTask(ctx, t); err == nil {
 			allocatedTasks = append(allocatedTasks, t)
 		} else if err != errNoChanges {
-			log.G(ctx).WithError(err).Error("task allocation failure")
+			if quiet {
+				log.G(ctx).WithError(err).Debug("task allocation failure")
+			} else {
+				log.G(ctx).WithError(err).Error("task allocation failure")
+			}
 		}
 	}
@@ -978,11 +1008,11 @@
 	})
 
 	if err != nil {
-		log.G(ctx).WithError(err).Error("failed a store batch operation while processing unallocated tasks")
+		log.G(ctx).WithError(err).Error("failed a store batch operation while processing tasks")
 	}
 
 	for _, t := range allocatedTasks[:committed] {
-		delete(nc.unallocatedTasks, t.ID)
+		delete(toAllocate, t.ID)
 	}
 }
 
diff --git a/vendor/github.com/docker/swarmkit/manager/controlapi/network.go b/vendor/github.com/docker/swarmkit/manager/controlapi/network.go
index 00634f184b..b5cd25fd07 100644
--- a/vendor/github.com/docker/swarmkit/manager/controlapi/network.go
+++ b/vendor/github.com/docker/swarmkit/manager/controlapi/network.go
@@ -167,8 +167,10 @@ func (s *Server) RemoveNetwork(ctx context.Context, request *api.RemoveNetworkRe
 			return grpc.Errorf(codes.Internal, "could not find tasks using network %s: %v", request.NetworkID, err)
 		}
 
-		if len(tasks) != 0 {
-			return grpc.Errorf(codes.FailedPrecondition, "network %s is in use by task %s", request.NetworkID, tasks[0].ID)
+		for _, t := range tasks {
+			if t.DesiredState <= api.TaskStateRunning && t.Status.State <= api.TaskStateRunning {
+				return grpc.Errorf(codes.FailedPrecondition, "network %s is in use by task %s", request.NetworkID, t.ID)
+			}
 		}
 
 		nw := store.GetNetwork(tx, request.NetworkID)
diff --git a/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go b/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go
index 2bb738b08a..3751cbd08a 100644
--- a/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go
+++ b/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go
@@ -841,11 +841,6 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
 				}
 				var newSecrets []*api.Secret
 				for _, secretRef := range container.Secrets {
-					// Empty ID prefix will return all secrets. Bail if there is no SecretID
-					if secretRef.SecretID == "" {
-						log.Debugf("invalid secret reference")
-						continue
-					}
 					secretID := secretRef.SecretID
 					log := log.WithFields(logrus.Fields{
 						"secret.id":   secretID,
@@ -855,21 +850,15 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
 
 					if len(tasksUsingSecret[secretID]) == 0 {
 						tasksUsingSecret[secretID] = make(map[string]struct{})
 
-						secrets, err := store.FindSecrets(readTx, store.ByIDPrefix(secretID))
-						if err != nil {
-							log.WithError(err).Errorf("error retrieving secret")
-							continue
-						}
-						if len(secrets) != 1 {
-							log.Debugf("secret not found")
+						secret := store.GetSecret(readTx, secretID)
+						if secret == nil {
+							log.Debug("secret not found")
 							continue
 						}
-						// If the secret was found and there was one result
-						// (there should never be more than one because of the
-						// uniqueness constraint), add this secret to our
-						// initial set that we send down.
-						newSecrets = append(newSecrets, secrets[0])
+						// If the secret was found, add this secret to
+						// our set that we send down.
+						newSecrets = append(newSecrets, secret)
 					}
 					tasksUsingSecret[secretID][t.ID] = struct{}{}
 				}
diff --git a/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go b/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go
index b8f380e51b..f5ec2b30bd 100644
--- a/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go
+++ b/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go
@@ -42,7 +42,7 @@ type LogBroker struct {
 	subscriptionQueue *watch.Queue
 
 	registeredSubscriptions map[string]*subscription
-	connectedNodes          map[string]struct{}
+	subscriptionsByNode     map[string]map[*subscription]struct{}
 
 	pctx      context.Context
 	cancelAll context.CancelFunc
@@ -70,7 +70,7 @@ func (lb *LogBroker) Run(ctx context.Context) error {
 	lb.logQueue = watch.NewQueue()
 	lb.subscriptionQueue = watch.NewQueue()
 	lb.registeredSubscriptions = make(map[string]*subscription)
-	lb.connectedNodes = make(map[string]struct{})
+	lb.subscriptionsByNode = make(map[string]map[*subscription]struct{})
 	lb.mu.Unlock()
 
 	select {
@@ -139,10 +139,13 @@ func (lb *LogBroker) registerSubscription(subscription *subscription) {
 	lb.registeredSubscriptions[subscription.message.ID] = subscription
 	lb.subscriptionQueue.Publish(subscription)
 
-	// Mark nodes that won't receive the message as done.
 	for _, node := range subscription.Nodes() {
-		if _, ok := lb.connectedNodes[node]; !ok {
+		if _, ok := lb.subscriptionsByNode[node]; !ok {
+			// Mark nodes that won't receive the message as done.
 			subscription.Done(node, fmt.Errorf("node %s is not available", node))
+		} else {
+			// otherwise, add the subscription to the node's subscriptions list
+			lb.subscriptionsByNode[node][subscription] = struct{}{}
 		}
 	}
 }
@@ -153,6 +156,14 @@
 
 	delete(lb.registeredSubscriptions, subscription.message.ID)
 
+	// remove the subscription from all of the nodes
+	for _, node := range subscription.Nodes() {
+		// but only if a node exists
+		if _, ok := lb.subscriptionsByNode[node]; ok {
+			delete(lb.subscriptionsByNode[node], subscription)
+		}
+	}
+
 	subscription.Close()
 	lb.subscriptionQueue.Publish(subscription)
 }
@@ -200,6 +211,21 @@ func (lb *LogBroker) publish(log *api.PublishLogsMessage) {
 	lb.logQueue.Publish(&logMessage{PublishLogsMessage: log})
 }
 
+// markDone wraps (*Subscription).Done() so that the removal of the sub from
+// the node's subscription list is possible
+func (lb *LogBroker) markDone(sub *subscription, nodeID string, err error) {
+	lb.mu.Lock()
+	defer lb.mu.Unlock()
+
+	// remove the subscription from the node's subscription list, if it exists
+	if _, ok := lb.subscriptionsByNode[nodeID]; ok {
+		delete(lb.subscriptionsByNode[nodeID], sub)
+	}
+
+	// mark the sub as done
+	sub.Done(nodeID, err)
+}
+
 // SubscribeLogs creates a log subscription and streams back logs
 func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api.Logs_SubscribeLogsServer) error {
 	ctx := stream.Context()
@@ -260,14 +286,19 @@ func (lb *LogBroker) nodeConnected(nodeID string) {
 	lb.mu.Lock()
 	defer lb.mu.Unlock()
 
-	lb.connectedNodes[nodeID] = struct{}{}
+	if _, ok := lb.subscriptionsByNode[nodeID]; !ok {
+		lb.subscriptionsByNode[nodeID] = make(map[*subscription]struct{})
+	}
 }
 
 func (lb *LogBroker) nodeDisconnected(nodeID string) {
 	lb.mu.Lock()
 	defer lb.mu.Unlock()
 
-	delete(lb.connectedNodes, nodeID)
+	for sub := range lb.subscriptionsByNode[nodeID] {
+		sub.Done(nodeID, fmt.Errorf("node %s disconnected unexpectedly", nodeID))
+	}
+	delete(lb.subscriptionsByNode, nodeID)
 }
 
 // ListenSubscriptions returns a stream of matching subscriptions for the current node
@@ -292,12 +323,6 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
 	log.Debug("node registered")
 
 	activeSubscriptions := make(map[string]*subscription)
-	defer func() {
-		// If the worker quits, mark all active subscriptions as finished.
-		for _, subscription := range activeSubscriptions {
-			subscription.Done(remote.NodeID, fmt.Errorf("node %s disconnected unexpectedly", remote.NodeID))
-		}
-	}()
 
 	// Start by sending down all active subscriptions.
 	for _, subscription := range subscriptions {
@@ -323,7 +348,6 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
 			subscription := v.(*subscription)
 
 			if subscription.Closed() {
-				log.WithField("subscription.id", subscription.message.ID).Debug("subscription closed")
 				delete(activeSubscriptions, subscription.message.ID)
 			} else {
 				// Avoid sending down the same subscription multiple times
@@ -331,7 +355,6 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
 					continue
 				}
 				activeSubscriptions[subscription.message.ID] = subscription
-				log.WithField("subscription.id", subscription.message.ID).Debug("subscription added")
 			}
 			if err := stream.Send(subscription.message); err != nil {
 				log.Error(err)
@@ -355,7 +378,7 @@ func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) (err er
 	var currentSubscription *subscription
 	defer func() {
 		if currentSubscription != nil {
-			currentSubscription.Done(remote.NodeID, err)
+			lb.markDone(currentSubscription, remote.NodeID, err)
 		}
 	}()
 
@@ -387,7 +410,7 @@ func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) (err er
 		if logMsg.Close {
 			// Mark done and then set to nil so if we error after this point,
 			// we don't try to close again in the defer
-			currentSubscription.Done(remote.NodeID, err)
+			lb.markDone(currentSubscription, remote.NodeID, err)
 			currentSubscription = nil
 			return nil
 		}
diff --git a/vendor/github.com/docker/swarmkit/manager/manager.go b/vendor/github.com/docker/swarmkit/manager/manager.go
index 7c8cddb357..1f2cdf93f3 100644
--- a/vendor/github.com/docker/swarmkit/manager/manager.go
+++ b/vendor/github.com/docker/swarmkit/manager/manager.go
@@ -985,7 +985,7 @@ func (m *Manager) becomeLeader(ctx context.Context) {
 	}(m.globalOrchestrator)
 
 	go func(roleManager *roleManager) {
-		roleManager.Run()
+		roleManager.Run(ctx)
 	}(m.roleManager)
 }
 
diff --git a/vendor/github.com/docker/swarmkit/manager/role_manager.go b/vendor/github.com/docker/swarmkit/manager/role_manager.go
index 4fd95c1293..5193b90159 100644
--- a/vendor/github.com/docker/swarmkit/manager/role_manager.go
+++ b/vendor/github.com/docker/swarmkit/manager/role_manager.go
@@ -41,7 +41,8 @@ func newRoleManager(store *store.MemoryStore, raftNode *raft.Node) *roleManager
 }
 
 // Run is roleManager's main loop.
-func (rm *roleManager) Run() {
+// ctx is only used for logging.
+func (rm *roleManager) Run(ctx context.Context) {
 	defer close(rm.doneChan)
 
 	var (
@@ -60,11 +61,11 @@
 	defer cancelWatch()
 
 	if err != nil {
-		log.L.WithError(err).Error("failed to check nodes for role changes")
+		log.G(ctx).WithError(err).Error("failed to check nodes for role changes")
 	} else {
 		for _, node := range nodes {
 			rm.pending[node.ID] = node
-			rm.reconcileRole(node)
+			rm.reconcileRole(ctx, node)
 		}
 		if len(rm.pending) != 0 {
 			ticker = time.NewTicker(roleReconcileInterval)
@@ -77,14 +78,14 @@
 		case event := <-watcher:
 			node := event.(state.EventUpdateNode).Node
 			rm.pending[node.ID] = node
-			rm.reconcileRole(node)
+			rm.reconcileRole(ctx, node)
 			if len(rm.pending) != 0 && ticker == nil {
 				ticker = time.NewTicker(roleReconcileInterval)
 				tickerCh = ticker.C
 			}
 		case <-tickerCh:
 			for _, node := range rm.pending {
-				rm.reconcileRole(node)
+				rm.reconcileRole(ctx, node)
 			}
 			if len(rm.pending) == 0 {
 				ticker.Stop()
@@ -100,7 +101,7 @@
 	}
 }
 
-func (rm *roleManager) reconcileRole(node *api.Node) {
+func (rm *roleManager) reconcileRole(ctx context.Context, node *api.Node) {
 	if node.Role == node.Spec.DesiredRole {
 		// Nothing to do.
 		delete(rm.pending, node.ID)
@@ -118,7 +119,7 @@ func (rm *roleManager) reconcileRole(node *api.Node) {
 			return store.UpdateNode(tx, updatedNode)
 		})
 		if err != nil {
-			log.L.WithError(err).Errorf("failed to promote node %s", node.ID)
+			log.G(ctx).WithError(err).Errorf("failed to promote node %s", node.ID)
 		} else {
 			delete(rm.pending, node.ID)
 		}
@@ -129,7 +130,7 @@ func (rm *roleManager) reconcileRole(node *api.Node) {
 		// Quorum safeguard
 		if !rm.raft.CanRemoveMember(member.RaftID) {
 			// TODO(aaronl): Retry later
-			log.L.Debugf("can't demote node %s at this time: removing member from raft would result in a loss of quorum", node.ID)
+			log.G(ctx).Debugf("can't demote node %s at this time: removing member from raft would result in a loss of quorum", node.ID)
 			return
 		}
 
@@ -139,16 +140,16 @@ func (rm *roleManager) reconcileRole(node *api.Node) {
 		if member.RaftID == rm.raft.Config.ID {
 			// Don't use rmCtx, because we expect to lose
 			// leadership, which will cancel this context.
-			log.L.Info("demoted; transferring leadership")
+			log.G(ctx).Info("demoted; transferring leadership")
 			err := rm.raft.TransferLeadership(context.Background())
 			if err == nil {
 				return
 			}
-			log.L.WithError(err).Info("failed to transfer leadership")
+			log.G(ctx).WithError(err).Info("failed to transfer leadership")
 		}
 		if err := rm.raft.RemoveMember(rmCtx, member.RaftID); err != nil {
 			// TODO(aaronl): Retry later
-			log.L.WithError(err).Debugf("can't demote node %s at this time", node.ID)
+			log.G(ctx).WithError(err).Debugf("can't demote node %s at this time", node.ID)
 		}
 		return
 	}
@@ -163,7 +164,7 @@ func (rm *roleManager) reconcileRole(node *api.Node) {
 		return store.UpdateNode(tx, updatedNode)
 	})
 	if err != nil {
-		log.L.WithError(err).Errorf("failed to demote node %s", node.ID)
+		log.G(ctx).WithError(err).Errorf("failed to demote node %s", node.ID)
 	} else {
 		delete(rm.pending, node.ID)
 	}
diff --git a/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go b/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go
index 65b74bad77..0d2eae9eab 100644
--- a/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go
+++ b/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go
@@ -375,6 +375,9 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
 	n.addrLock.Lock()
 	defer n.addrLock.Unlock()
 
+	// override the module field entirely, since etcd/raft is not exactly a submodule
+	n.Config.Logger = log.G(ctx).WithField("module", "raft")
+
 	// restore from snapshot
 	if loadAndStartErr == nil {
 		if n.opts.JoinAddr != "" {
diff --git a/vendor/github.com/docker/swarmkit/node/node.go b/vendor/github.com/docker/swarmkit/node/node.go
index f8f8ab7088..9c505b4acf 100644
--- a/vendor/github.com/docker/swarmkit/node/node.go
+++ b/vendor/github.com/docker/swarmkit/node/node.go
@@ -227,19 +227,21 @@ func (n *Node) run(ctx context.Context) (err error) {
 	defer cancel()
 	ctx = log.WithModule(ctx, "node")
 
-	go func() {
+	go func(ctx context.Context) {
 		select {
 		case <-ctx.Done():
 		case <-n.stopped:
 			cancel()
 		}
-	}()
+	}(ctx)
 
 	securityConfig, err := n.loadSecurityConfig(ctx)
 	if err != nil {
 		return err
 	}
 
+	ctx = log.WithLogger(ctx, log.G(ctx).WithField("node.id", n.NodeID()))
+
 	taskDBPath := filepath.Join(n.config.StateDir, "worker/tasks.db")
 	if err := os.MkdirAll(filepath.Dir(taskDBPath), 0777); err != nil {
 		return err
@@ -251,18 +253,26 @@ func (n *Node) run(ctx context.Context) (err error) {
 	}
 	defer db.Close()
 
+	agentDone := make(chan struct{})
+
 	forceCertRenewal := make(chan struct{})
 	renewCert := func() {
-		select {
-		case forceCertRenewal <- struct{}{}:
-		case <-ctx.Done():
+		for {
+			select {
+			case forceCertRenewal <- struct{}{}:
+				return
+			case <-agentDone:
+				return
+			case <-n.notifyNodeChange:
+				// consume from the channel to avoid blocking the writer
+			}
 		}
 	}
 
 	go func() {
 		for {
 			select {
-			case <-ctx.Done():
+			case <-agentDone:
 				return
 			case node := <-n.notifyNodeChange:
 				// If the server is sending us a ForceRenewal State, renew
@@ -320,6 +330,7 @@ func (n *Node) run(ctx context.Context) (err error) {
 		agentErr = n.runAgent(ctx, db, securityConfig.ClientTLSCreds, agentReady)
 		wg.Done()
 		cancel()
+		close(agentDone)
 	}()
 
 	go func() {
@@ -733,12 +744,12 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
 	}
 	done := make(chan struct{})
 	var runErr error
-	go func() {
-		if err := m.Run(context.Background()); err != nil {
+	go func(logger *logrus.Entry) {
+		if err := m.Run(log.WithLogger(context.Background(), logger)); err != nil {
 			runErr = err
 		}
 		close(done)
-	}()
+	}(log.G(ctx))
 
 	var clearData bool
 	defer func() {