mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Vendor swarmkit 0e2d9eb
Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
This commit is contained in:
parent
3fe2730ab3
commit
79bf23845b
11 changed files with 253 additions and 91 deletions
|
@ -104,7 +104,7 @@ github.com/docker/containerd 665e84e6c28653a9c29a6db601636a92d46896f3
|
|||
github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
|
||||
|
||||
# cluster
|
||||
github.com/docker/swarmkit d60ccf366a6758c7857db968857b72202cb2f902
|
||||
github.com/docker/swarmkit 0e2d9ebcea9d5bbd4a06b3b964fb96356801f880
|
||||
github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
|
||||
github.com/gogo/protobuf 8d70fb3182befc465c4a1eac8ad4d38ff49778e2
|
||||
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
|
||||
|
|
40
vendor/github.com/docker/swarmkit/ca/certificates.go
generated
vendored
40
vendor/github.com/docker/swarmkit/ca/certificates.go
generated
vendored
|
@ -9,6 +9,7 @@ import (
|
|||
"crypto/rsa"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/asn1"
|
||||
"encoding/pem"
|
||||
"fmt"
|
||||
"io"
|
||||
|
@ -72,6 +73,9 @@ const (
|
|||
MinNodeCertExpiration = 1 * time.Hour
|
||||
)
|
||||
|
||||
// BasicConstraintsOID is the ASN1 Object ID indicating a basic constraints extension
|
||||
var BasicConstraintsOID = asn1.ObjectIdentifier{2, 5, 29, 19}
|
||||
|
||||
// A recoverableErr is a non-fatal error encountered signing a certificate,
|
||||
// which means that the certificate issuance may be retried at a later time.
|
||||
type recoverableErr struct {
|
||||
|
@ -305,6 +309,42 @@ func (rca *RootCA) ParseValidateAndSignCSR(csrBytes []byte, cn, ou, org string)
|
|||
return cert, nil
|
||||
}
|
||||
|
||||
// CrossSignCACertificate takes a CA root certificate and generates an intermediate CA from it signed with the current root signer
|
||||
func (rca *RootCA) CrossSignCACertificate(otherCAPEM []byte) ([]byte, error) {
|
||||
if !rca.CanSign() {
|
||||
return nil, ErrNoValidSigner
|
||||
}
|
||||
|
||||
// create a new cert with exactly the same parameters, including the public key and exact NotBefore and NotAfter
|
||||
rootCert, err := helpers.ParseCertificatePEM(rca.Cert)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not parse old CA certificate")
|
||||
}
|
||||
rootSigner, err := helpers.ParsePrivateKeyPEM(rca.Signer.Key)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not parse old CA key")
|
||||
}
|
||||
|
||||
newCert, err := helpers.ParseCertificatePEM(otherCAPEM)
|
||||
if err != nil {
|
||||
return nil, errors.New("could not parse new CA certificate")
|
||||
}
|
||||
|
||||
if !newCert.IsCA {
|
||||
return nil, errors.New("certificate not a CA")
|
||||
}
|
||||
|
||||
derBytes, err := x509.CreateCertificate(cryptorand.Reader, newCert, rootCert, newCert.PublicKey, rootSigner)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not cross-sign new CA certificate using old CA material")
|
||||
}
|
||||
|
||||
return pem.EncodeToMemory(&pem.Block{
|
||||
Type: "CERTIFICATE",
|
||||
Bytes: derBytes,
|
||||
}), nil
|
||||
}
|
||||
|
||||
// NewRootCA creates a new RootCA object from unparsed PEM cert bundle and key byte
|
||||
// slices. key may be nil, and in this case NewRootCA will return a RootCA
|
||||
// without a signer.
|
||||
|
|
63
vendor/github.com/docker/swarmkit/ca/external.go
generated
vendored
63
vendor/github.com/docker/swarmkit/ca/external.go
generated
vendored
|
@ -2,14 +2,21 @@ package ca
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
cryptorand "crypto/rand"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"encoding/pem"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/cloudflare/cfssl/api"
|
||||
"github.com/cloudflare/cfssl/config"
|
||||
"github.com/cloudflare/cfssl/csr"
|
||||
"github.com/cloudflare/cfssl/helpers"
|
||||
"github.com/cloudflare/cfssl/signer"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/context"
|
||||
|
@ -97,6 +104,62 @@ func (eca *ExternalCA) Sign(ctx context.Context, req signer.SignRequest) (cert [
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// CrossSignRootCA takes a RootCA object, generates a CA CSR, sends a signing request with the CA CSR to the external
|
||||
// CFSSL API server in order to obtain a cross-signed root
|
||||
func (eca *ExternalCA) CrossSignRootCA(ctx context.Context, rca RootCA) ([]byte, error) {
|
||||
if !rca.CanSign() {
|
||||
return nil, errors.Wrap(ErrNoValidSigner, "cannot generate CSR for a cross-signed root")
|
||||
}
|
||||
rootCert, err := helpers.ParseCertificatePEM(rca.Cert)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not parse CA certificate")
|
||||
}
|
||||
rootSigner, err := helpers.ParsePrivateKeyPEM(rca.Signer.Key)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not parse old CA key")
|
||||
}
|
||||
// ExtractCertificateRequest generates a new key request, and we want to continue to use the old
|
||||
// key. However, ExtractCertificateRequest will also convert the pkix.Name to csr.Name, which we
|
||||
// need in order to generate a signing request
|
||||
cfCSRObj := csr.ExtractCertificateRequest(rootCert)
|
||||
|
||||
der, err := x509.CreateCertificateRequest(cryptorand.Reader, &x509.CertificateRequest{
|
||||
RawSubjectPublicKeyInfo: rootCert.RawSubjectPublicKeyInfo,
|
||||
RawSubject: rootCert.RawSubject,
|
||||
PublicKeyAlgorithm: rootCert.PublicKeyAlgorithm,
|
||||
Subject: rootCert.Subject,
|
||||
Extensions: rootCert.Extensions,
|
||||
DNSNames: rootCert.DNSNames,
|
||||
EmailAddresses: rootCert.EmailAddresses,
|
||||
IPAddresses: rootCert.IPAddresses,
|
||||
}, rootSigner)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req := signer.SignRequest{
|
||||
Request: string(pem.EncodeToMemory(&pem.Block{
|
||||
Type: "CERTIFICATE REQUEST",
|
||||
Bytes: der,
|
||||
})),
|
||||
Subject: &signer.Subject{
|
||||
CN: rootCert.Subject.CommonName,
|
||||
Names: cfCSRObj.Names,
|
||||
},
|
||||
}
|
||||
// cfssl actually ignores non subject alt name extensions in the CSR, so we have to add the CA extension in the signing
|
||||
// request as well
|
||||
for _, ext := range rootCert.Extensions {
|
||||
if ext.Id.Equal(BasicConstraintsOID) {
|
||||
req.Extensions = append(req.Extensions, signer.Extension{
|
||||
ID: config.OID(ext.Id),
|
||||
Critical: ext.Critical,
|
||||
Value: hex.EncodeToString(ext.Value),
|
||||
})
|
||||
}
|
||||
}
|
||||
return eca.Sign(ctx, req)
|
||||
}
|
||||
|
||||
func makeExternalSignRequest(ctx context.Context, client *http.Client, url string, csrJSON []byte) (cert []byte, err error) {
|
||||
resp, err := ctxhttp.Post(ctx, client, url, "application/json", bytes.NewReader(csrJSON))
|
||||
if err != nil {
|
||||
|
|
96
vendor/github.com/docker/swarmkit/manager/allocator/network.go
generated
vendored
96
vendor/github.com/docker/swarmkit/manager/allocator/network.go
generated
vendored
|
@ -26,7 +26,11 @@ const (
|
|||
allocatedStatusMessage = "pending task scheduling"
|
||||
)
|
||||
|
||||
var errNoChanges = errors.New("task unchanged")
|
||||
var (
|
||||
errNoChanges = errors.New("task unchanged")
|
||||
|
||||
retryInterval = 5 * time.Minute
|
||||
)
|
||||
|
||||
func newIngressNetwork() *api.Network {
|
||||
return &api.Network{
|
||||
|
@ -57,19 +61,28 @@ type networkContext struct {
|
|||
// the actual network allocation.
|
||||
nwkAllocator *networkallocator.NetworkAllocator
|
||||
|
||||
// A table of unallocated tasks which will be revisited if any thing
|
||||
// A set of tasks which are ready to be allocated as a batch. This is
|
||||
// distinct from "unallocatedTasks" which are tasks that failed to
|
||||
// allocate on the first try, being held for a future retry.
|
||||
pendingTasks map[string]*api.Task
|
||||
|
||||
// A set of unallocated tasks which will be revisited if any thing
|
||||
// changes in system state that might help task allocation.
|
||||
unallocatedTasks map[string]*api.Task
|
||||
|
||||
// A table of unallocated services which will be revisited if
|
||||
// A set of unallocated services which will be revisited if
|
||||
// any thing changes in system state that might help service
|
||||
// allocation.
|
||||
unallocatedServices map[string]*api.Service
|
||||
|
||||
// A table of unallocated networks which will be revisited if
|
||||
// A set of unallocated networks which will be revisited if
|
||||
// any thing changes in system state that might help network
|
||||
// allocation.
|
||||
unallocatedNetworks map[string]*api.Network
|
||||
|
||||
// lastRetry is the last timestamp when unallocated
|
||||
// tasks/services/networks were retried.
|
||||
lastRetry time.Time
|
||||
}
|
||||
|
||||
func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
|
||||
|
@ -80,10 +93,12 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
|
|||
|
||||
nc := &networkContext{
|
||||
nwkAllocator: na,
|
||||
pendingTasks: make(map[string]*api.Task),
|
||||
unallocatedTasks: make(map[string]*api.Task),
|
||||
unallocatedServices: make(map[string]*api.Service),
|
||||
unallocatedNetworks: make(map[string]*api.Network),
|
||||
ingressNetwork: newIngressNetwork(),
|
||||
lastRetry: time.Now(),
|
||||
}
|
||||
a.netCtx = nc
|
||||
defer func() {
|
||||
|
@ -266,7 +281,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
|
|||
}
|
||||
|
||||
for _, t := range tasks {
|
||||
if taskDead(t) {
|
||||
if t.Status.State > api.TaskStateRunning {
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -351,6 +366,8 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
|
|||
if err := nc.nwkAllocator.Deallocate(n); err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("Failed during network free for network %s", n.ID)
|
||||
}
|
||||
|
||||
delete(nc.unallocatedNetworks, n.ID)
|
||||
case state.EventCreateService:
|
||||
s := v.Service.Copy()
|
||||
|
||||
|
@ -387,6 +404,9 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
|
|||
return a.commitAllocatedService(ctx, batch, s)
|
||||
}); err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("Failed to commit allocation during update for service %s", s.ID)
|
||||
nc.unallocatedServices[s.ID] = s
|
||||
} else {
|
||||
delete(nc.unallocatedServices, s.ID)
|
||||
}
|
||||
case state.EventDeleteService:
|
||||
s := v.Service.Copy()
|
||||
|
@ -403,10 +423,20 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
|
|||
case state.EventCreateTask, state.EventUpdateTask, state.EventDeleteTask:
|
||||
a.doTaskAlloc(ctx, ev)
|
||||
case state.EventCommit:
|
||||
a.procUnallocatedNetworks(ctx)
|
||||
a.procUnallocatedServices(ctx)
|
||||
a.procUnallocatedTasksNetwork(ctx)
|
||||
return
|
||||
a.procTasksNetwork(ctx, false)
|
||||
|
||||
if time.Since(nc.lastRetry) > retryInterval {
|
||||
a.procUnallocatedNetworks(ctx)
|
||||
a.procUnallocatedServices(ctx)
|
||||
a.procTasksNetwork(ctx, true)
|
||||
nc.lastRetry = time.Now()
|
||||
}
|
||||
|
||||
// Any left over tasks are moved to the unallocated set
|
||||
for _, t := range nc.pendingTasks {
|
||||
nc.unallocatedTasks[t.ID] = t
|
||||
}
|
||||
nc.pendingTasks = make(map[string]*api.Task)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -456,17 +486,6 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) {
|
|||
}
|
||||
}
|
||||
|
||||
// taskRunning checks whether a task is either actively running, or in the
|
||||
// process of starting up.
|
||||
func taskRunning(t *api.Task) bool {
|
||||
return t.DesiredState <= api.TaskStateRunning && t.Status.State <= api.TaskStateRunning
|
||||
}
|
||||
|
||||
// taskDead checks whether a task is not actively running as far as allocator purposes are concerned.
|
||||
func taskDead(t *api.Task) bool {
|
||||
return t.DesiredState > api.TaskStateRunning && t.Status.State > api.TaskStateRunning
|
||||
}
|
||||
|
||||
// taskReadyForNetworkVote checks if the task is ready for a network
|
||||
// vote to move it to PENDING state.
|
||||
func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bool {
|
||||
|
@ -569,17 +588,17 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
|
|||
|
||||
nc := a.netCtx
|
||||
|
||||
// If the task has stopped running or it's being deleted then
|
||||
// we should free the network resources associated with the
|
||||
// task right away.
|
||||
if taskDead(t) || isDelete {
|
||||
// If the task has stopped running then we should free the network
|
||||
// resources associated with the task right away.
|
||||
if t.Status.State > api.TaskStateRunning || isDelete {
|
||||
if nc.nwkAllocator.IsTaskAllocated(t) {
|
||||
if err := nc.nwkAllocator.DeallocateTask(t); err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("Failed freeing network resources for task %s", t.ID)
|
||||
}
|
||||
}
|
||||
|
||||
// Cleanup any task references that might exist in unallocatedTasks
|
||||
// Cleanup any task references that might exist
|
||||
delete(nc.pendingTasks, t.ID)
|
||||
delete(nc.unallocatedTasks, t.ID)
|
||||
return
|
||||
}
|
||||
|
@ -587,6 +606,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
|
|||
// If we are already in allocated state, there is
|
||||
// absolutely nothing else to do.
|
||||
if t.Status.State >= api.TaskStatePending {
|
||||
delete(nc.pendingTasks, t.ID)
|
||||
delete(nc.unallocatedTasks, t.ID)
|
||||
return
|
||||
}
|
||||
|
@ -605,7 +625,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
|
|||
// available in store. But we still need to
|
||||
// cleanup network resources associated with
|
||||
// the task.
|
||||
if taskRunning(t) && !isDelete {
|
||||
if t.Status.State <= api.TaskStateRunning && !isDelete {
|
||||
log.G(ctx).Errorf("Event %T: Failed to get service %s for task %s state %s: could not find service %s", ev, t.ServiceID, t.ID, t.Status.State, t.ServiceID)
|
||||
return
|
||||
}
|
||||
|
@ -616,7 +636,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
|
|||
// based on service spec.
|
||||
a.taskCreateNetworkAttachments(t, s)
|
||||
|
||||
nc.unallocatedTasks[t.ID] = t
|
||||
nc.pendingTasks[t.ID] = t
|
||||
}
|
||||
|
||||
func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error {
|
||||
|
@ -948,15 +968,25 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) {
|
||||
func (a *Allocator) procTasksNetwork(ctx context.Context, onRetry bool) {
|
||||
nc := a.netCtx
|
||||
allocatedTasks := make([]*api.Task, 0, len(nc.unallocatedTasks))
|
||||
quiet := false
|
||||
toAllocate := nc.pendingTasks
|
||||
if onRetry {
|
||||
toAllocate = nc.unallocatedTasks
|
||||
quiet = true
|
||||
}
|
||||
allocatedTasks := make([]*api.Task, 0, len(toAllocate))
|
||||
|
||||
for _, t := range nc.unallocatedTasks {
|
||||
for _, t := range toAllocate {
|
||||
if err := a.allocateTask(ctx, t); err == nil {
|
||||
allocatedTasks = append(allocatedTasks, t)
|
||||
} else if err != errNoChanges {
|
||||
log.G(ctx).WithError(err).Error("task allocation failure")
|
||||
if quiet {
|
||||
log.G(ctx).WithError(err).Debug("task allocation failure")
|
||||
} else {
|
||||
log.G(ctx).WithError(err).Error("task allocation failure")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -978,11 +1008,11 @@ func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) {
|
|||
})
|
||||
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Error("failed a store batch operation while processing unallocated tasks")
|
||||
log.G(ctx).WithError(err).Error("failed a store batch operation while processing tasks")
|
||||
}
|
||||
|
||||
for _, t := range allocatedTasks[:committed] {
|
||||
delete(nc.unallocatedTasks, t.ID)
|
||||
delete(toAllocate, t.ID)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
6
vendor/github.com/docker/swarmkit/manager/controlapi/network.go
generated
vendored
6
vendor/github.com/docker/swarmkit/manager/controlapi/network.go
generated
vendored
|
@ -167,8 +167,10 @@ func (s *Server) RemoveNetwork(ctx context.Context, request *api.RemoveNetworkRe
|
|||
return grpc.Errorf(codes.Internal, "could not find tasks using network %s: %v", request.NetworkID, err)
|
||||
}
|
||||
|
||||
if len(tasks) != 0 {
|
||||
return grpc.Errorf(codes.FailedPrecondition, "network %s is in use by task %s", request.NetworkID, tasks[0].ID)
|
||||
for _, t := range tasks {
|
||||
if t.DesiredState <= api.TaskStateRunning && t.Status.State <= api.TaskStateRunning {
|
||||
return grpc.Errorf(codes.FailedPrecondition, "network %s is in use by task %s", request.NetworkID, t.ID)
|
||||
}
|
||||
}
|
||||
|
||||
nw := store.GetNetwork(tx, request.NetworkID)
|
||||
|
|
23
vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go
generated
vendored
23
vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go
generated
vendored
|
@ -841,11 +841,6 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
|
|||
}
|
||||
var newSecrets []*api.Secret
|
||||
for _, secretRef := range container.Secrets {
|
||||
// Empty ID prefix will return all secrets. Bail if there is no SecretID
|
||||
if secretRef.SecretID == "" {
|
||||
log.Debugf("invalid secret reference")
|
||||
continue
|
||||
}
|
||||
secretID := secretRef.SecretID
|
||||
log := log.WithFields(logrus.Fields{
|
||||
"secret.id": secretID,
|
||||
|
@ -855,21 +850,15 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
|
|||
if len(tasksUsingSecret[secretID]) == 0 {
|
||||
tasksUsingSecret[secretID] = make(map[string]struct{})
|
||||
|
||||
secrets, err := store.FindSecrets(readTx, store.ByIDPrefix(secretID))
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("error retrieving secret")
|
||||
continue
|
||||
}
|
||||
if len(secrets) != 1 {
|
||||
log.Debugf("secret not found")
|
||||
secret := store.GetSecret(readTx, secretID)
|
||||
if secret == nil {
|
||||
log.Debug("secret not found")
|
||||
continue
|
||||
}
|
||||
|
||||
// If the secret was found and there was one result
|
||||
// (there should never be more than one because of the
|
||||
// uniqueness constraint), add this secret to our
|
||||
// initial set that we send down.
|
||||
newSecrets = append(newSecrets, secrets[0])
|
||||
// If the secret was found, add this secret to
|
||||
// our set that we send down.
|
||||
newSecrets = append(newSecrets, secret)
|
||||
}
|
||||
tasksUsingSecret[secretID][t.ID] = struct{}{}
|
||||
}
|
||||
|
|
55
vendor/github.com/docker/swarmkit/manager/logbroker/broker.go
generated
vendored
55
vendor/github.com/docker/swarmkit/manager/logbroker/broker.go
generated
vendored
|
@ -42,7 +42,7 @@ type LogBroker struct {
|
|||
subscriptionQueue *watch.Queue
|
||||
|
||||
registeredSubscriptions map[string]*subscription
|
||||
connectedNodes map[string]struct{}
|
||||
subscriptionsByNode map[string]map[*subscription]struct{}
|
||||
|
||||
pctx context.Context
|
||||
cancelAll context.CancelFunc
|
||||
|
@ -70,7 +70,7 @@ func (lb *LogBroker) Run(ctx context.Context) error {
|
|||
lb.logQueue = watch.NewQueue()
|
||||
lb.subscriptionQueue = watch.NewQueue()
|
||||
lb.registeredSubscriptions = make(map[string]*subscription)
|
||||
lb.connectedNodes = make(map[string]struct{})
|
||||
lb.subscriptionsByNode = make(map[string]map[*subscription]struct{})
|
||||
lb.mu.Unlock()
|
||||
|
||||
select {
|
||||
|
@ -139,10 +139,13 @@ func (lb *LogBroker) registerSubscription(subscription *subscription) {
|
|||
lb.registeredSubscriptions[subscription.message.ID] = subscription
|
||||
lb.subscriptionQueue.Publish(subscription)
|
||||
|
||||
// Mark nodes that won't receive the message as done.
|
||||
for _, node := range subscription.Nodes() {
|
||||
if _, ok := lb.connectedNodes[node]; !ok {
|
||||
if _, ok := lb.subscriptionsByNode[node]; !ok {
|
||||
// Mark nodes that won't receive the message as done.
|
||||
subscription.Done(node, fmt.Errorf("node %s is not available", node))
|
||||
} else {
|
||||
// otherwise, add the subscription to the node's subscriptions list
|
||||
lb.subscriptionsByNode[node][subscription] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -153,6 +156,14 @@ func (lb *LogBroker) unregisterSubscription(subscription *subscription) {
|
|||
|
||||
delete(lb.registeredSubscriptions, subscription.message.ID)
|
||||
|
||||
// remove the subscription from all of the nodes
|
||||
for _, node := range subscription.Nodes() {
|
||||
// but only if a node exists
|
||||
if _, ok := lb.subscriptionsByNode[node]; ok {
|
||||
delete(lb.subscriptionsByNode[node], subscription)
|
||||
}
|
||||
}
|
||||
|
||||
subscription.Close()
|
||||
lb.subscriptionQueue.Publish(subscription)
|
||||
}
|
||||
|
@ -200,6 +211,21 @@ func (lb *LogBroker) publish(log *api.PublishLogsMessage) {
|
|||
lb.logQueue.Publish(&logMessage{PublishLogsMessage: log})
|
||||
}
|
||||
|
||||
// markDone wraps (*Subscription).Done() so that the removal of the sub from
|
||||
// the node's subscription list is possible
|
||||
func (lb *LogBroker) markDone(sub *subscription, nodeID string, err error) {
|
||||
lb.mu.Lock()
|
||||
defer lb.mu.Unlock()
|
||||
|
||||
// remove the subscription from the node's subscription list, if it exists
|
||||
if _, ok := lb.subscriptionsByNode[nodeID]; ok {
|
||||
delete(lb.subscriptionsByNode[nodeID], sub)
|
||||
}
|
||||
|
||||
// mark the sub as done
|
||||
sub.Done(nodeID, err)
|
||||
}
|
||||
|
||||
// SubscribeLogs creates a log subscription and streams back logs
|
||||
func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api.Logs_SubscribeLogsServer) error {
|
||||
ctx := stream.Context()
|
||||
|
@ -260,14 +286,19 @@ func (lb *LogBroker) nodeConnected(nodeID string) {
|
|||
lb.mu.Lock()
|
||||
defer lb.mu.Unlock()
|
||||
|
||||
lb.connectedNodes[nodeID] = struct{}{}
|
||||
if _, ok := lb.subscriptionsByNode[nodeID]; !ok {
|
||||
lb.subscriptionsByNode[nodeID] = make(map[*subscription]struct{})
|
||||
}
|
||||
}
|
||||
|
||||
func (lb *LogBroker) nodeDisconnected(nodeID string) {
|
||||
lb.mu.Lock()
|
||||
defer lb.mu.Unlock()
|
||||
|
||||
delete(lb.connectedNodes, nodeID)
|
||||
for sub := range lb.subscriptionsByNode[nodeID] {
|
||||
sub.Done(nodeID, fmt.Errorf("node %s disconnected unexpectedly", nodeID))
|
||||
}
|
||||
delete(lb.subscriptionsByNode, nodeID)
|
||||
}
|
||||
|
||||
// ListenSubscriptions returns a stream of matching subscriptions for the current node
|
||||
|
@ -292,12 +323,6 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
|
|||
log.Debug("node registered")
|
||||
|
||||
activeSubscriptions := make(map[string]*subscription)
|
||||
defer func() {
|
||||
// If the worker quits, mark all active subscriptions as finished.
|
||||
for _, subscription := range activeSubscriptions {
|
||||
subscription.Done(remote.NodeID, fmt.Errorf("node %s disconnected unexpectedly", remote.NodeID))
|
||||
}
|
||||
}()
|
||||
|
||||
// Start by sending down all active subscriptions.
|
||||
for _, subscription := range subscriptions {
|
||||
|
@ -323,7 +348,6 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
|
|||
subscription := v.(*subscription)
|
||||
|
||||
if subscription.Closed() {
|
||||
log.WithField("subscription.id", subscription.message.ID).Debug("subscription closed")
|
||||
delete(activeSubscriptions, subscription.message.ID)
|
||||
} else {
|
||||
// Avoid sending down the same subscription multiple times
|
||||
|
@ -331,7 +355,6 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
|
|||
continue
|
||||
}
|
||||
activeSubscriptions[subscription.message.ID] = subscription
|
||||
log.WithField("subscription.id", subscription.message.ID).Debug("subscription added")
|
||||
}
|
||||
if err := stream.Send(subscription.message); err != nil {
|
||||
log.Error(err)
|
||||
|
@ -355,7 +378,7 @@ func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) (err er
|
|||
var currentSubscription *subscription
|
||||
defer func() {
|
||||
if currentSubscription != nil {
|
||||
currentSubscription.Done(remote.NodeID, err)
|
||||
lb.markDone(currentSubscription, remote.NodeID, err)
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -387,7 +410,7 @@ func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) (err er
|
|||
if logMsg.Close {
|
||||
// Mark done and then set to nil so if we error after this point,
|
||||
// we don't try to close again in the defer
|
||||
currentSubscription.Done(remote.NodeID, err)
|
||||
lb.markDone(currentSubscription, remote.NodeID, err)
|
||||
currentSubscription = nil
|
||||
return nil
|
||||
}
|
||||
|
|
2
vendor/github.com/docker/swarmkit/manager/manager.go
generated
vendored
2
vendor/github.com/docker/swarmkit/manager/manager.go
generated
vendored
|
@ -985,7 +985,7 @@ func (m *Manager) becomeLeader(ctx context.Context) {
|
|||
}(m.globalOrchestrator)
|
||||
|
||||
go func(roleManager *roleManager) {
|
||||
roleManager.Run()
|
||||
roleManager.Run(ctx)
|
||||
}(m.roleManager)
|
||||
}
|
||||
|
||||
|
|
25
vendor/github.com/docker/swarmkit/manager/role_manager.go
generated
vendored
25
vendor/github.com/docker/swarmkit/manager/role_manager.go
generated
vendored
|
@ -41,7 +41,8 @@ func newRoleManager(store *store.MemoryStore, raftNode *raft.Node) *roleManager
|
|||
}
|
||||
|
||||
// Run is roleManager's main loop.
|
||||
func (rm *roleManager) Run() {
|
||||
// ctx is only used for logging.
|
||||
func (rm *roleManager) Run(ctx context.Context) {
|
||||
defer close(rm.doneChan)
|
||||
|
||||
var (
|
||||
|
@ -60,11 +61,11 @@ func (rm *roleManager) Run() {
|
|||
defer cancelWatch()
|
||||
|
||||
if err != nil {
|
||||
log.L.WithError(err).Error("failed to check nodes for role changes")
|
||||
log.G(ctx).WithError(err).Error("failed to check nodes for role changes")
|
||||
} else {
|
||||
for _, node := range nodes {
|
||||
rm.pending[node.ID] = node
|
||||
rm.reconcileRole(node)
|
||||
rm.reconcileRole(ctx, node)
|
||||
}
|
||||
if len(rm.pending) != 0 {
|
||||
ticker = time.NewTicker(roleReconcileInterval)
|
||||
|
@ -77,14 +78,14 @@ func (rm *roleManager) Run() {
|
|||
case event := <-watcher:
|
||||
node := event.(state.EventUpdateNode).Node
|
||||
rm.pending[node.ID] = node
|
||||
rm.reconcileRole(node)
|
||||
rm.reconcileRole(ctx, node)
|
||||
if len(rm.pending) != 0 && ticker == nil {
|
||||
ticker = time.NewTicker(roleReconcileInterval)
|
||||
tickerCh = ticker.C
|
||||
}
|
||||
case <-tickerCh:
|
||||
for _, node := range rm.pending {
|
||||
rm.reconcileRole(node)
|
||||
rm.reconcileRole(ctx, node)
|
||||
}
|
||||
if len(rm.pending) == 0 {
|
||||
ticker.Stop()
|
||||
|
@ -100,7 +101,7 @@ func (rm *roleManager) Run() {
|
|||
}
|
||||
}
|
||||
|
||||
func (rm *roleManager) reconcileRole(node *api.Node) {
|
||||
func (rm *roleManager) reconcileRole(ctx context.Context, node *api.Node) {
|
||||
if node.Role == node.Spec.DesiredRole {
|
||||
// Nothing to do.
|
||||
delete(rm.pending, node.ID)
|
||||
|
@ -118,7 +119,7 @@ func (rm *roleManager) reconcileRole(node *api.Node) {
|
|||
return store.UpdateNode(tx, updatedNode)
|
||||
})
|
||||
if err != nil {
|
||||
log.L.WithError(err).Errorf("failed to promote node %s", node.ID)
|
||||
log.G(ctx).WithError(err).Errorf("failed to promote node %s", node.ID)
|
||||
} else {
|
||||
delete(rm.pending, node.ID)
|
||||
}
|
||||
|
@ -129,7 +130,7 @@ func (rm *roleManager) reconcileRole(node *api.Node) {
|
|||
// Quorum safeguard
|
||||
if !rm.raft.CanRemoveMember(member.RaftID) {
|
||||
// TODO(aaronl): Retry later
|
||||
log.L.Debugf("can't demote node %s at this time: removing member from raft would result in a loss of quorum", node.ID)
|
||||
log.G(ctx).Debugf("can't demote node %s at this time: removing member from raft would result in a loss of quorum", node.ID)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -139,16 +140,16 @@ func (rm *roleManager) reconcileRole(node *api.Node) {
|
|||
if member.RaftID == rm.raft.Config.ID {
|
||||
// Don't use rmCtx, because we expect to lose
|
||||
// leadership, which will cancel this context.
|
||||
log.L.Info("demoted; transferring leadership")
|
||||
log.G(ctx).Info("demoted; transferring leadership")
|
||||
err := rm.raft.TransferLeadership(context.Background())
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
log.L.WithError(err).Info("failed to transfer leadership")
|
||||
log.G(ctx).WithError(err).Info("failed to transfer leadership")
|
||||
}
|
||||
if err := rm.raft.RemoveMember(rmCtx, member.RaftID); err != nil {
|
||||
// TODO(aaronl): Retry later
|
||||
log.L.WithError(err).Debugf("can't demote node %s at this time", node.ID)
|
||||
log.G(ctx).WithError(err).Debugf("can't demote node %s at this time", node.ID)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@ -163,7 +164,7 @@ func (rm *roleManager) reconcileRole(node *api.Node) {
|
|||
return store.UpdateNode(tx, updatedNode)
|
||||
})
|
||||
if err != nil {
|
||||
log.L.WithError(err).Errorf("failed to demote node %s", node.ID)
|
||||
log.G(ctx).WithError(err).Errorf("failed to demote node %s", node.ID)
|
||||
} else {
|
||||
delete(rm.pending, node.ID)
|
||||
}
|
||||
|
|
3
vendor/github.com/docker/swarmkit/manager/state/raft/raft.go
generated
vendored
3
vendor/github.com/docker/swarmkit/manager/state/raft/raft.go
generated
vendored
|
@ -375,6 +375,9 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
|
|||
n.addrLock.Lock()
|
||||
defer n.addrLock.Unlock()
|
||||
|
||||
// override the module field entirely, since etcd/raft is not exactly a submodule
|
||||
n.Config.Logger = log.G(ctx).WithField("module", "raft")
|
||||
|
||||
// restore from snapshot
|
||||
if loadAndStartErr == nil {
|
||||
if n.opts.JoinAddr != "" {
|
||||
|
|
29
vendor/github.com/docker/swarmkit/node/node.go
generated
vendored
29
vendor/github.com/docker/swarmkit/node/node.go
generated
vendored
|
@ -227,19 +227,21 @@ func (n *Node) run(ctx context.Context) (err error) {
|
|||
defer cancel()
|
||||
ctx = log.WithModule(ctx, "node")
|
||||
|
||||
go func() {
|
||||
go func(ctx context.Context) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-n.stopped:
|
||||
cancel()
|
||||
}
|
||||
}()
|
||||
}(ctx)
|
||||
|
||||
securityConfig, err := n.loadSecurityConfig(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx = log.WithLogger(ctx, log.G(ctx).WithField("node.id", n.NodeID()))
|
||||
|
||||
taskDBPath := filepath.Join(n.config.StateDir, "worker/tasks.db")
|
||||
if err := os.MkdirAll(filepath.Dir(taskDBPath), 0777); err != nil {
|
||||
return err
|
||||
|
@ -251,18 +253,26 @@ func (n *Node) run(ctx context.Context) (err error) {
|
|||
}
|
||||
defer db.Close()
|
||||
|
||||
agentDone := make(chan struct{})
|
||||
|
||||
forceCertRenewal := make(chan struct{})
|
||||
renewCert := func() {
|
||||
select {
|
||||
case forceCertRenewal <- struct{}{}:
|
||||
case <-ctx.Done():
|
||||
for {
|
||||
select {
|
||||
case forceCertRenewal <- struct{}{}:
|
||||
return
|
||||
case <-agentDone:
|
||||
return
|
||||
case <-n.notifyNodeChange:
|
||||
// consume from the channel to avoid blocking the writer
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-agentDone:
|
||||
return
|
||||
case node := <-n.notifyNodeChange:
|
||||
// If the server is sending us a ForceRenewal State, renew
|
||||
|
@ -320,6 +330,7 @@ func (n *Node) run(ctx context.Context) (err error) {
|
|||
agentErr = n.runAgent(ctx, db, securityConfig.ClientTLSCreds, agentReady)
|
||||
wg.Done()
|
||||
cancel()
|
||||
close(agentDone)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
|
@ -733,12 +744,12 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
|
|||
}
|
||||
done := make(chan struct{})
|
||||
var runErr error
|
||||
go func() {
|
||||
if err := m.Run(context.Background()); err != nil {
|
||||
go func(logger *logrus.Entry) {
|
||||
if err := m.Run(log.WithLogger(context.Background(), logger)); err != nil {
|
||||
runErr = err
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
}(log.G(ctx))
|
||||
|
||||
var clearData bool
|
||||
defer func() {
|
||||
|
|
Loading…
Reference in a new issue