Revendor SwarmKit to 9bca23b0de42a9ebcc71622a30d646afa1e2b564

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>

Commit e35c1747f5 (parent 30d0c3899e)
14 changed files with 500 additions and 294 deletions
@@ -100,7 +100,7 @@ github.com/docker/containerd 8517738ba4b82aff5662c97ca4627e7e4d03b531
 github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4

 # cluster
-github.com/docker/swarmkit 91c6e2db9c0c91c466a83529ed16649a1de7ccc4
+github.com/docker/swarmkit 9bca23b0de42a9ebcc71622a30d646afa1e2b564
 github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
 github.com/gogo/protobuf v0.3
 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
vendor/github.com/docker/swarmkit/agent/agent.go | 36 (generated, vendored)

@@ -406,37 +406,41 @@ func (a *Agent) UpdateTaskStatus(ctx context.Context, taskID string, status *api
 }

 // Publisher returns a LogPublisher for the given subscription
-func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogPublisher, error) {
+// as well as a cancel function that should be called when the log stream
+// is completed.
+func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogPublisher, func(), error) {
     // TODO(stevvooe): The level of coordination here is WAY too much for logs.
     // These should only be best effort and really just buffer until a session is
     // ready. Ideally, they would use a separate connection completely.

     var (
         err    error
-        client api.LogBroker_PublishLogsClient
+        publisher api.LogBroker_PublishLogsClient
     )

     err = a.withSession(ctx, func(session *session) error {
-        client, err = api.NewLogBrokerClient(session.conn).PublishLogs(ctx)
+        publisher, err = api.NewLogBrokerClient(session.conn).PublishLogs(ctx)
         return err
     })
     if err != nil {
-        return nil, err
+        return nil, nil, err
     }

     return exec.LogPublisherFunc(func(ctx context.Context, message api.LogMessage) error {
         select {
         case <-ctx.Done():
-            client.CloseSend()
+            publisher.CloseSend()
             return ctx.Err()
         default:
         }

-        return client.Send(&api.PublishLogsMessage{
+        return publisher.Send(&api.PublishLogsMessage{
             SubscriptionID: subscriptionID,
             Messages:       []api.LogMessage{message},
         })
-    }), nil
+    }), func() {
+        publisher.CloseSend()
+    }, nil
 }

 // nodeDescriptionWithHostname retrieves node description, and overrides hostname if available
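For readers not familiar with the pattern this change introduces, here is a minimal, self-contained sketch (not part of the diff) of returning a resource together with a cleanup closure, which is the shape the new Publisher signature adopts with its func() return value. All names below are illustrative, not swarmkit APIs.

    package main

    import "fmt"

    // openStream returns a send function plus a cancel closure that releases the
    // underlying resource, mirroring the (LogPublisher, func(), error) shape.
    func openStream() (send func(msg string), cancel func(), err error) {
        ch := make(chan string, 16)
        done := make(chan struct{})
        go func() {
            defer close(done)
            for m := range ch {
                fmt.Println("published:", m)
            }
        }()
        send = func(msg string) { ch <- msg }
        cancel = func() { close(ch); <-done } // close the stream exactly once and wait for drain
        return send, cancel, nil
    }

    func main() {
        send, cancel, err := openStream()
        if err != nil {
            panic(err)
        }
        defer cancel() // callers defer the returned cancel, as worker.Subscribe does below
        send("hello")
    }

The advantage over the old approach (publishing an empty sentinel message to signal the end) is that cleanup is explicit and cannot be confused with a real payload.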
vendor/github.com/docker/swarmkit/agent/exec/controller.go | 2 (generated, vendored)

@@ -69,7 +69,7 @@ func (fn LogPublisherFunc) Publish(ctx context.Context, message api.LogMessage)

 // LogPublisherProvider defines the protocol for receiving a log publisher
 type LogPublisherProvider interface {
-    Publisher(ctx context.Context, subscriptionID string) (LogPublisher, error)
+    Publisher(ctx context.Context, subscriptionID string) (LogPublisher, func(), error)
 }

 // ContainerStatuser reports status of a container.
vendor/github.com/docker/swarmkit/agent/session.go | 10 (generated, vendored)

@@ -226,6 +226,16 @@ func (s *session) logSubscriptions(ctx context.Context) error {

     client := api.NewLogBrokerClient(s.conn)
     subscriptions, err := client.ListenSubscriptions(ctx, &api.ListenSubscriptionsRequest{})
+    if grpc.Code(err) == codes.Unimplemented {
+        log.Warning("manager does not support log subscriptions")
+        // Don't return, because returning would bounce the session
+        select {
+        case <-s.closed:
+            return errSessionClosed
+        case <-ctx.Done():
+            return ctx.Err()
+        }
+    }
     if err != nil {
         return err
     }
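The vendored code uses grpc.Code(err) to detect a manager that predates the log broker RPCs and degrades gracefully instead of bouncing the session. A hedged, standalone sketch of the same check is below; it uses the newer status.Code helper, which is equivalent to the older grpc.Code call seen in the diff, and the error value is fabricated for illustration.

    package main

    import (
        "fmt"

        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
    )

    // isUnimplemented reports whether err came from a server that does not expose
    // the RPC at all, which callers may treat as "feature missing" rather than fatal.
    func isUnimplemented(err error) bool {
        return status.Code(err) == codes.Unimplemented
    }

    func main() {
        err := status.Error(codes.Unimplemented, "unknown service LogBroker")
        if isUnimplemented(err) {
            fmt.Println("manager does not support log subscriptions; keeping session open")
        }
    }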
vendor/github.com/docker/swarmkit/agent/worker.go | 39 (generated, vendored)

@@ -406,12 +406,12 @@ func (w *worker) updateTaskStatus(ctx context.Context, tx *bolt.Tx, taskID strin
 func (w *worker) Subscribe(ctx context.Context, subscription *api.SubscriptionMessage) error {
     log.G(ctx).Debugf("Received subscription %s (selector: %v)", subscription.ID, subscription.Selector)

-    publisher, err := w.publisherProvider.Publisher(ctx, subscription.ID)
+    publisher, cancel, err := w.publisherProvider.Publisher(ctx, subscription.ID)
     if err != nil {
         return err
     }
     // Send a close once we're done
-    defer publisher.Publish(ctx, api.LogMessage{})
+    defer cancel()

     match := func(t *api.Task) bool {
         // TODO(aluzzardi): Consider using maps to limit the iterations.
@@ -436,26 +436,49 @@ func (w *worker) Subscribe(ctx context.Context, subscription *api.SubscriptionMe
         return false
     }

-    ch, cancel := w.taskevents.Watch()
-    defer cancel()
+    wg := sync.WaitGroup{}

     w.mu.Lock()
     for _, tm := range w.taskManagers {
         if match(tm.task) {
-            go tm.Logs(ctx, *subscription.Options, publisher)
+            wg.Add(1)
+            go func() {
+                defer wg.Done()
+                tm.Logs(ctx, *subscription.Options, publisher)
+            }()
         }
     }
     w.mu.Unlock()

+    // If follow mode is disabled, wait for the current set of matched tasks
+    // to finish publishing logs, then close the subscription by returning.
+    if subscription.Options == nil || !subscription.Options.Follow {
+        waitCh := make(chan struct{})
+        go func() {
+            defer close(waitCh)
+            wg.Wait()
+        }()
+
+        select {
+        case <-ctx.Done():
+            return ctx.Err()
+        case <-waitCh:
+            return nil
+        }
+    }
+
+    // In follow mode, watch for new tasks. Don't close the subscription
+    // until it's cancelled.
+    ch, cancel := w.taskevents.Watch()
+    defer cancel()
    for {
         select {
         case v := <-ch:
-            w.mu.Lock()
             task := v.(*api.Task)
             if match(task) {
+                w.mu.Lock()
                 go w.taskManagers[task.ID].Logs(ctx, *subscription.Options, publisher)
+                w.mu.Unlock()
             }
-            w.mu.Unlock()
         case <-ctx.Done():
             return ctx.Err()
         }
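The non-follow branch above waits for a WaitGroup while still honoring context cancellation, by converting wg.Wait into a channel close. A minimal, self-contained sketch of that idiom (assumed names, not swarmkit code):

    package main

    import (
        "context"
        "fmt"
        "sync"
        "time"
    )

    // waitOrCancel blocks until every worker tracked by wg has finished or until
    // ctx is cancelled, the same shape as the non-follow branch of Subscribe.
    func waitOrCancel(ctx context.Context, wg *sync.WaitGroup) error {
        waitCh := make(chan struct{})
        go func() {
            defer close(waitCh)
            wg.Wait()
        }()

        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-waitCh:
            return nil
        }
    }

    func main() {
        var wg sync.WaitGroup
        for i := 0; i < 3; i++ {
            wg.Add(1)
            go func(n int) {
                defer wg.Done()
                time.Sleep(time.Duration(n) * 10 * time.Millisecond)
            }(i)
        }

        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()
        fmt.Println("result:", waitOrCancel(ctx, &wg))
    }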
vendor/github.com/docker/swarmkit/ca/certificates.go | 25 (generated, vendored)

@@ -153,7 +153,7 @@ func (rca *RootCA) IssueAndSaveNewCertificates(kw KeyWriter, cn, ou, org string)

 // RequestAndSaveNewCertificates gets new certificates issued, either by signing them locally if a signer is
 // available, or by requesting them from the remote server at remoteAddr.
-func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWriter, token string, r remotes.Remotes, transport credentials.TransportCredentials, nodeInfo chan<- api.IssueNodeCertificateResponse) (*tls.Certificate, error) {
+func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWriter, config CertificateRequestConfig) (*tls.Certificate, error) {
     // Create a new key/pair and CSR
     csr, key, err := GenerateNewCSR()
     if err != nil {
@@ -165,7 +165,7 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
     // responding properly (for example, it may have just been demoted).
     var signedCert []byte
     for i := 0; i != 5; i++ {
-        signedCert, err = GetRemoteSignedCertificate(ctx, csr, token, rca.Pool, r, transport, nodeInfo)
+        signedCert, err = GetRemoteSignedCertificate(ctx, csr, rca.Pool, config)
         if err == nil {
             break
         }
@@ -202,7 +202,7 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit

     var kekUpdate *KEKData
     for i := 0; i < 5; i++ {
-        kekUpdate, err = rca.getKEKUpdate(ctx, X509Cert, tlsKeyPair, r)
+        kekUpdate, err = rca.getKEKUpdate(ctx, X509Cert, tlsKeyPair, config.Remotes)
         if err == nil {
             break
         }
@@ -545,18 +545,20 @@ func CreateRootCA(rootCN string, paths CertPaths) (RootCA, error) {

 // GetRemoteSignedCertificate submits a CSR to a remote CA server address,
 // and that is part of a CA identified by a specific certificate pool.
-func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, rootCAPool *x509.CertPool, r remotes.Remotes, creds credentials.TransportCredentials, nodeInfo chan<- api.IssueNodeCertificateResponse) ([]byte, error) {
+func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x509.CertPool, config CertificateRequestConfig) ([]byte, error) {
     if rootCAPool == nil {
         return nil, errors.New("valid root CA pool required")
     }

+    creds := config.Credentials
+
     if creds == nil {
         // This is our only non-MTLS request, and it happens when we are boostraping our TLS certs
         // We're using CARole as server name, so an external CA doesn't also have to have ManagerRole in the cert SANs
         creds = credentials.NewTLS(&tls.Config{ServerName: CARole, RootCAs: rootCAPool})
     }

-    conn, peer, err := getGRPCConnection(creds, r)
+    conn, peer, err := getGRPCConnection(creds, config.Remotes)
     if err != nil {
         return nil, err
     }
@@ -566,18 +568,13 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, r
     caClient := api.NewNodeCAClient(conn)

     // Send the Request and retrieve the request token
-    issueRequest := &api.IssueNodeCertificateRequest{CSR: csr, Token: token}
+    issueRequest := &api.IssueNodeCertificateRequest{CSR: csr, Token: config.Token}
     issueResponse, err := caClient.IssueNodeCertificate(ctx, issueRequest)
     if err != nil {
-        r.Observe(peer, -remotes.DefaultObservationWeight)
+        config.Remotes.Observe(peer, -remotes.DefaultObservationWeight)
         return nil, err
     }

-    // Send back the NodeID on the nodeInfo, so the caller can know what ID was assigned by the CA
-    if nodeInfo != nil {
-        nodeInfo <- *issueResponse
-    }
-
     statusRequest := &api.NodeCertificateStatusRequest{NodeID: issueResponse.NodeID}
     expBackoff := events.NewExponentialBackoff(events.ExponentialBackoffConfig{
         Base:   time.Second,
@@ -592,7 +589,7 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, r
         defer cancel()
         statusResponse, err := caClient.NodeCertificateStatus(ctx, statusRequest)
         if err != nil {
-            r.Observe(peer, -remotes.DefaultObservationWeight)
+            config.Remotes.Observe(peer, -remotes.DefaultObservationWeight)
             return nil, err
         }

@@ -608,7 +605,7 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, r
         // retry until the certificate gets updated per our
         // current request.
         if bytes.Equal(statusResponse.Certificate.CSR, csr) {
-            r.Observe(peer, remotes.DefaultObservationWeight)
+            config.Remotes.Observe(peer, remotes.DefaultObservationWeight)
             return statusResponse.Certificate.Certificate, nil
         }
     }
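The main refactor in this file is collapsing a long positional parameter list (token, remotes, transport credentials, nodeInfo channel) into a single CertificateRequestConfig value. A generic, self-contained sketch of why that style helps call sites; everything here is illustrative and not swarmkit code:

    package main

    import "fmt"

    // Before: many positional parameters that every caller has to thread through.
    func requestOld(addr, token string, retries int, insecure bool) error {
        _ = insecure
        fmt.Printf("requesting from %s (token=%q, retries=%d)\n", addr, token, retries)
        return nil
    }

    // After: one config struct, the same idea as CertificateRequestConfig.
    // Zero values stand in for "not provided", so call sites only set what they need.
    type requestConfig struct {
        Addr     string
        Token    string
        Retries  int
        Insecure bool
    }

    func request(config requestConfig) error {
        fmt.Printf("requesting from %s (token=%q, retries=%d)\n", config.Addr, config.Token, config.Retries)
        return nil
    }

    func main() {
        _ = requestOld("ca.example.test:443", "join-token", 5, false)
        _ = request(requestConfig{Addr: "ca.example.test:443", Retries: 5})
    }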
vendor/github.com/docker/swarmkit/ca/config.go | 253 (generated, vendored)

@@ -21,6 +21,7 @@ import (
     "github.com/docker/swarmkit/log"
     "github.com/docker/swarmkit/remotes"
     "github.com/pkg/errors"
+    "google.golang.org/grpc/credentials"

     "golang.org/x/net/context"
 )
@@ -52,8 +53,13 @@ const (
 // SecurityConfig is used to represent a node's security configuration. It includes information about
 // the RootCA and ServerTLSCreds/ClientTLSCreds transport authenticators to be used for MTLS
 type SecurityConfig struct {
+    // mu protects against concurrent access to fields inside the structure.
     mu sync.Mutex

+    // renewalMu makes sure only one certificate renewal attempt happens at
+    // a time. It should never be locked after mu is already locked.
+    renewalMu sync.Mutex
+
     rootCA        *RootCA
     externalCA    *ExternalCA
     keyReadWriter *KeyReadWriter
@@ -234,89 +240,138 @@ func DownloadRootCA(ctx context.Context, paths CertPaths, token string, r remote
     return rootCA, nil
 }

-// LoadOrCreateSecurityConfig encapsulates the security logic behind joining a cluster.
-// Every node requires at least a set of TLS certificates with which to join the cluster with.
-// In the case of a manager, these certificates will be used both for client and server credentials.
-func LoadOrCreateSecurityConfig(ctx context.Context, rootCA RootCA, token, proposedRole string, remotes remotes.Remotes, nodeInfo chan<- api.IssueNodeCertificateResponse, krw *KeyReadWriter) (*SecurityConfig, error) {
-    ctx = log.WithModule(ctx, "tls")
-
-    // At this point we've successfully loaded the CA details from disk, or
-    // successfully downloaded them remotely. The next step is to try to
-    // load our certificates.
-    clientTLSCreds, serverTLSCreds, err := LoadTLSCreds(rootCA, krw)
-    if err != nil {
-        if _, ok := errors.Cause(err).(ErrInvalidKEK); ok {
-            return nil, err
-        }
-
-        log.G(ctx).WithError(err).Debugf("no node credentials found in: %s", krw.Target())
-
-        var (
-            tlsKeyPair *tls.Certificate
-            err        error
-        )
-
-        if rootCA.CanSign() {
-            // Create a new random ID for this certificate
-            cn := identity.NewID()
-            org := identity.NewID()
-
-            if nodeInfo != nil {
-                nodeInfo <- api.IssueNodeCertificateResponse{
-                    NodeID:         cn,
-                    NodeMembership: api.NodeMembershipAccepted,
-                }
-            }
-            tlsKeyPair, err = rootCA.IssueAndSaveNewCertificates(krw, cn, proposedRole, org)
-            if err != nil {
-                log.G(ctx).WithFields(logrus.Fields{
-                    "node.id":   cn,
-                    "node.role": proposedRole,
-                }).WithError(err).Errorf("failed to issue and save new certificate")
-                return nil, err
-            }
-
-            log.G(ctx).WithFields(logrus.Fields{
-                "node.id":   cn,
-                "node.role": proposedRole,
-            }).Debug("issued new TLS certificate")
-        } else {
-            // There was an error loading our Credentials, let's get a new certificate issued
-            // Last argument is nil because at this point we don't have any valid TLS creds
-            tlsKeyPair, err = rootCA.RequestAndSaveNewCertificates(ctx, krw, token, remotes, nil, nodeInfo)
-            if err != nil {
-                log.G(ctx).WithError(err).Error("failed to request save new certificate")
-                return nil, err
-            }
-        }
-        // Create the Server TLS Credentials for this node. These will not be used by workers.
-        serverTLSCreds, err = rootCA.NewServerTLSCredentials(tlsKeyPair)
-        if err != nil {
-            return nil, err
-        }
-
-        // Create a TLSConfig to be used when this node connects as a client to another remote node.
-        // We're using ManagerRole as remote serverName for TLS host verification
-        clientTLSCreds, err = rootCA.NewClientTLSCredentials(tlsKeyPair, ManagerRole)
-        if err != nil {
-            return nil, err
-        }
-        log.G(ctx).WithFields(logrus.Fields{
-            "node.id":   clientTLSCreds.NodeID(),
-            "node.role": clientTLSCreds.Role(),
-        }).Debugf("new node credentials generated: %s", krw.Target())
-    } else {
-        if nodeInfo != nil {
-            nodeInfo <- api.IssueNodeCertificateResponse{
-                NodeID:         clientTLSCreds.NodeID(),
-                NodeMembership: api.NodeMembershipAccepted,
-            }
-        }
-        log.G(ctx).WithFields(logrus.Fields{
-            "node.id":   clientTLSCreds.NodeID(),
-            "node.role": clientTLSCreds.Role(),
-        }).Debug("loaded node credentials")
-    }
-
-    return NewSecurityConfig(&rootCA, krw, clientTLSCreds, serverTLSCreds), nil
-}
+// LoadSecurityConfig loads TLS credentials from disk, or returns an error if
+// these credentials do not exist or are unusable.
+func LoadSecurityConfig(ctx context.Context, rootCA RootCA, krw *KeyReadWriter) (*SecurityConfig, error) {
+    ctx = log.WithModule(ctx, "tls")
+
+    // At this point we've successfully loaded the CA details from disk, or
+    // successfully downloaded them remotely. The next step is to try to
+    // load our certificates.
+
+    // Read both the Cert and Key from disk
+    cert, key, err := krw.Read()
+    if err != nil {
+        return nil, err
+    }
+
+    // Create an x509 certificate out of the contents on disk
+    certBlock, _ := pem.Decode([]byte(cert))
+    if certBlock == nil {
+        return nil, errors.New("failed to parse certificate PEM")
+    }
+
+    // Create an X509Cert so we can .Verify()
+    X509Cert, err := x509.ParseCertificate(certBlock.Bytes)
+    if err != nil {
+        return nil, err
+    }
+
+    // Include our root pool
+    opts := x509.VerifyOptions{
+        Roots: rootCA.Pool,
+    }
+
+    // Check to see if this certificate was signed by our CA, and isn't expired
+    if _, err := X509Cert.Verify(opts); err != nil {
+        return nil, err
+    }
+
+    // Now that we know this certificate is valid, create a TLS Certificate for our
+    // credentials
+    keyPair, err := tls.X509KeyPair(cert, key)
+    if err != nil {
+        return nil, err
+    }
+
+    // Load the Certificates as server credentials
+    serverTLSCreds, err := rootCA.NewServerTLSCredentials(&keyPair)
+    if err != nil {
+        return nil, err
+    }
+
+    // Load the Certificates also as client credentials.
+    // Both workers and managers always connect to remote managers,
+    // so ServerName is always set to ManagerRole here.
+    clientTLSCreds, err := rootCA.NewClientTLSCredentials(&keyPair, ManagerRole)
+    if err != nil {
+        return nil, err
+    }
+
+    log.G(ctx).WithFields(logrus.Fields{
+        "node.id":   clientTLSCreds.NodeID(),
+        "node.role": clientTLSCreds.Role(),
+    }).Debug("loaded node credentials")
+
+    return NewSecurityConfig(&rootCA, krw, clientTLSCreds, serverTLSCreds), nil
+}
+
+// CertificateRequestConfig contains the information needed to request a
+// certificate from a remote CA.
+type CertificateRequestConfig struct {
+    // Token is the join token that authenticates us with the CA.
+    Token string
+    // Remotes is the set of remote CAs.
+    Remotes remotes.Remotes
+    // Credentials provides transport credentials for communicating with the
+    // remote server.
+    Credentials credentials.TransportCredentials
+}
+
+// CreateSecurityConfig creates a new key and cert for this node, either locally
+// or via a remote CA.
+func (rootCA RootCA) CreateSecurityConfig(ctx context.Context, krw *KeyReadWriter, config CertificateRequestConfig) (*SecurityConfig, error) {
+    ctx = log.WithModule(ctx, "tls")
+
+    var (
+        tlsKeyPair *tls.Certificate
+        err        error
+    )
+
+    if rootCA.CanSign() {
+        // Create a new random ID for this certificate
+        cn := identity.NewID()
+        org := identity.NewID()
+
+        proposedRole := ManagerRole
+        tlsKeyPair, err = rootCA.IssueAndSaveNewCertificates(krw, cn, proposedRole, org)
+        if err != nil {
+            log.G(ctx).WithFields(logrus.Fields{
+                "node.id":   cn,
+                "node.role": proposedRole,
+            }).WithError(err).Errorf("failed to issue and save new certificate")
+            return nil, err
+        }
+
+        log.G(ctx).WithFields(logrus.Fields{
+            "node.id":   cn,
+            "node.role": proposedRole,
+        }).Debug("issued new TLS certificate")
+    } else {
+        // Request certificate issuance from a remote CA.
+        // Last argument is nil because at this point we don't have any valid TLS creds
+        tlsKeyPair, err = rootCA.RequestAndSaveNewCertificates(ctx, krw, config)
+        if err != nil {
+            log.G(ctx).WithError(err).Error("failed to request save new certificate")
+            return nil, err
+        }
+    }
+    // Create the Server TLS Credentials for this node. These will not be used by workers.
+    serverTLSCreds, err := rootCA.NewServerTLSCredentials(tlsKeyPair)
+    if err != nil {
+        return nil, err
+    }
+
+    // Create a TLSConfig to be used when this node connects as a client to another remote node.
+    // We're using ManagerRole as remote serverName for TLS host verification
+    clientTLSCreds, err := rootCA.NewClientTLSCredentials(tlsKeyPair, ManagerRole)
+    if err != nil {
+        return nil, err
+    }
+    log.G(ctx).WithFields(logrus.Fields{
+        "node.id":   clientTLSCreds.NodeID(),
+        "node.role": clientTLSCreds.Role(),
+    }).Debugf("new node credentials generated: %s", krw.Target())
+
+    return NewSecurityConfig(&rootCA, krw, clientTLSCreds, serverTLSCreds), nil
+}
@@ -324,6 +379,9 @@ func LoadOrCreateSecurityConfig(ctx context.Context, rootCA RootCA, token, propo
 // RenewTLSConfigNow gets a new TLS cert and key, and updates the security config if provided. This is similar to
 // RenewTLSConfig, except while that monitors for expiry, and periodically renews, this renews once and is blocking
 func RenewTLSConfigNow(ctx context.Context, s *SecurityConfig, r remotes.Remotes) error {
+    s.renewalMu.Lock()
+    defer s.renewalMu.Unlock()
+
     ctx = log.WithModule(ctx, "tls")
     log := log.G(ctx).WithFields(logrus.Fields{
         "node.id": s.ClientTLSCreds.NodeID(),
@@ -334,10 +392,10 @@ func RenewTLSConfigNow(ctx context.Context, s *SecurityConfig, r remotes.Remotes
     rootCA := s.RootCA()
     tlsKeyPair, err := rootCA.RequestAndSaveNewCertificates(ctx,
         s.KeyWriter(),
-        "",
-        r,
-        s.ClientTLSCreds,
-        nil)
+        CertificateRequestConfig{
+            Remotes:     r,
+            Credentials: s.ClientTLSCreds,
+        })
     if err != nil {
         log.WithError(err).Errorf("failed to renew the certificate")
         return err
@@ -463,61 +521,6 @@ func calculateRandomExpiry(validFrom, validUntil time.Time) time.Duration {
     return expiry
 }

-// LoadTLSCreds loads tls credentials from the specified path and verifies that
-// thay are valid for the RootCA.
-func LoadTLSCreds(rootCA RootCA, kr KeyReader) (*MutableTLSCreds, *MutableTLSCreds, error) {
-    // Read both the Cert and Key from disk
-    cert, key, err := kr.Read()
-    if err != nil {
-        return nil, nil, err
-    }
-
-    // Create an x509 certificate out of the contents on disk
-    certBlock, _ := pem.Decode([]byte(cert))
-    if certBlock == nil {
-        return nil, nil, errors.New("failed to parse certificate PEM")
-    }
-
-    // Create an X509Cert so we can .Verify()
-    X509Cert, err := x509.ParseCertificate(certBlock.Bytes)
-    if err != nil {
-        return nil, nil, err
-    }
-
-    // Include our root pool
-    opts := x509.VerifyOptions{
-        Roots: rootCA.Pool,
-    }
-
-    // Check to see if this certificate was signed by our CA, and isn't expired
-    if _, err := X509Cert.Verify(opts); err != nil {
-        return nil, nil, err
-    }
-
-    // Now that we know this certificate is valid, create a TLS Certificate for our
-    // credentials
-    keyPair, err := tls.X509KeyPair(cert, key)
-    if err != nil {
-        return nil, nil, err
-    }
-
-    // Load the Certificates as server credentials
-    serverTLSCreds, err := rootCA.NewServerTLSCredentials(&keyPair)
-    if err != nil {
-        return nil, nil, err
-    }
-
-    // Load the Certificates also as client credentials.
-    // Both workers and managers always connect to remote managers,
-    // so ServerName is always set to ManagerRole here.
-    clientTLSCreds, err := rootCA.NewClientTLSCredentials(&keyPair, ManagerRole)
-    if err != nil {
-        return nil, nil, err
-    }
-
-    return clientTLSCreds, serverTLSCreds, nil
-}
-
 // NewServerTLSConfig returns a tls.Config configured for a TLS Server, given a tls.Certificate
 // and the PEM-encoded root CA Certificate
 func NewServerTLSConfig(cert *tls.Certificate, rootCAPool *x509.CertPool) (*tls.Config, error) {
@@ -554,8 +557,8 @@ func NewClientTLSConfig(cert *tls.Certificate, rootCAPool *x509.CertPool, server

 // NewClientTLSCredentials returns GRPC credentials for a TLS GRPC client, given a tls.Certificate
 // a PEM-Encoded root CA Certificate, and the name of the remote server the client wants to connect to.
-func (rca *RootCA) NewClientTLSCredentials(cert *tls.Certificate, serverName string) (*MutableTLSCreds, error) {
-    tlsConfig, err := NewClientTLSConfig(cert, rca.Pool, serverName)
+func (rootCA *RootCA) NewClientTLSCredentials(cert *tls.Certificate, serverName string) (*MutableTLSCreds, error) {
+    tlsConfig, err := NewClientTLSConfig(cert, rootCA.Pool, serverName)
     if err != nil {
         return nil, err
     }
@@ -567,8 +570,8 @@ func (rca *RootCA) NewClientTLSCredentials(cert *tls.Certificate, serverName str

 // NewServerTLSCredentials returns GRPC credentials for a TLS GRPC client, given a tls.Certificate
 // a PEM-Encoded root CA Certificate, and the name of the remote server the client wants to connect to.
-func (rca *RootCA) NewServerTLSCredentials(cert *tls.Certificate) (*MutableTLSCreds, error) {
-    tlsConfig, err := NewServerTLSConfig(cert, rca.Pool)
+func (rootCA *RootCA) NewServerTLSCredentials(cert *tls.Certificate) (*MutableTLSCreds, error) {
+    tlsConfig, err := NewServerTLSConfig(cert, rootCA.Pool)
     if err != nil {
         return nil, err
     }
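The old LoadOrCreateSecurityConfig mixed "load from disk" and "request new credentials" in one function; the new code splits these into LoadSecurityConfig and CreateSecurityConfig so the caller decides the fallback. A generic, self-contained sketch of that load-then-create pattern (file path and credential contents are illustrative, not swarmkit's on-disk format):

    package main

    import (
        "errors"
        "fmt"
        "os"
    )

    // loadOrCreate first tries to load existing credentials and only falls back
    // to creating new ones, mirroring the LoadSecurityConfig / CreateSecurityConfig split.
    func loadOrCreate(path string) (string, error) {
        data, err := os.ReadFile(path)
        if err == nil {
            return string(data), nil // loaded existing credentials
        }
        if !errors.Is(err, os.ErrNotExist) {
            return "", err // unusable credentials are a hard error, not a reason to re-create
        }
        // Nothing on disk: create and persist new credentials.
        cred := "new-credential"
        if err := os.WriteFile(path, []byte(cred), 0o600); err != nil {
            return "", err
        }
        return cred, nil
    }

    func main() {
        cred, err := loadOrCreate("/tmp/example-cred")
        fmt.Println(cred, err)
    }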
vendor/github.com/docker/swarmkit/ca/external.go | 29 (generated, vendored)

@@ -12,6 +12,8 @@ import (
     "github.com/cloudflare/cfssl/api"
     "github.com/cloudflare/cfssl/signer"
     "github.com/pkg/errors"
+    "golang.org/x/net/context"
+    "golang.org/x/net/context/ctxhttp"
 )

 // ErrNoExternalCAURLs is an error used it indicate that an ExternalCA is
@@ -65,7 +67,7 @@ func (eca *ExternalCA) UpdateURLs(urls ...string) {

 // Sign signs a new certificate by proxying the given certificate signing
 // request to an external CFSSL API server.
-func (eca *ExternalCA) Sign(req signer.SignRequest) (cert []byte, err error) {
+func (eca *ExternalCA) Sign(ctx context.Context, req signer.SignRequest) (cert []byte, err error) {
     // Get the current HTTP client and list of URLs in a small critical
     // section. We will use these to make certificate signing requests.
     eca.mu.Lock()
@@ -85,7 +87,7 @@ func (eca *ExternalCA) Sign(req signer.SignRequest) (cert []byte, err error) {
     // Try each configured proxy URL. Return after the first success. If
     // all fail then the last error will be returned.
     for _, url := range urls {
-        cert, err = makeExternalSignRequest(client, url, csrJSON)
+        cert, err = makeExternalSignRequest(ctx, client, url, csrJSON)
         if err == nil {
             return eca.rootCA.AppendFirstRootPEM(cert)
         }
@@ -96,14 +98,31 @@ func (eca *ExternalCA) Sign(req signer.SignRequest) (cert []byte, err error) {
     return nil, err
 }

-func makeExternalSignRequest(client *http.Client, url string, csrJSON []byte) (cert []byte, err error) {
-    resp, err := client.Post(url, "application/json", bytes.NewReader(csrJSON))
+func makeExternalSignRequest(ctx context.Context, client *http.Client, url string, csrJSON []byte) (cert []byte, err error) {
+    resp, err := ctxhttp.Post(ctx, client, url, "application/json", bytes.NewReader(csrJSON))
     if err != nil {
         return nil, recoverableErr{err: errors.Wrap(err, "unable to perform certificate signing request")}
     }
-    defer resp.Body.Close()
+
+    doneReading := make(chan struct{})
+    bodyClosed := make(chan struct{})
+    go func() {
+        select {
+        case <-ctx.Done():
+        case <-doneReading:
+        }
+        resp.Body.Close()
+        close(bodyClosed)
+    }()

     body, err := ioutil.ReadAll(resp.Body)
+    close(doneReading)
+    <-bodyClosed
+
+    select {
+    case <-ctx.Done():
+        return nil, ctx.Err()
+    default:
+    }
     if err != nil {
         return nil, recoverableErr{err: errors.Wrap(err, "unable to read CSR response body")}
     }
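The vendored code predates net/http's built-in context support, so it uses ctxhttp.Post plus a goroutine that closes resp.Body when the context is cancelled. With a modern standard library the same behavior can be had directly; a hedged sketch under that assumption (the URL and payload are placeholders):

    package main

    import (
        "bytes"
        "context"
        "fmt"
        "io"
        "net/http"
        "time"
    )

    // postJSON issues a POST that is abandoned as soon as ctx is cancelled.
    // Modern net/http ties the request and response body lifetime to ctx for us;
    // the vendored code reproduces this manually with ctxhttp and a body-closing goroutine.
    func postJSON(ctx context.Context, url string, payload []byte) ([]byte, error) {
        req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload))
        if err != nil {
            return nil, err
        }
        req.Header.Set("Content-Type", "application/json")

        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            return nil, err
        }
        defer resp.Body.Close()
        return io.ReadAll(resp.Body)
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
        defer cancel()
        body, err := postJSON(ctx, "https://example.test/api/sign", []byte(`{"csr":"..."}`))
        fmt.Println(len(body), err)
    }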
vendor/github.com/docker/swarmkit/ca/server.go | 2 (generated, vendored)

@@ -617,7 +617,7 @@ func (s *Server) signNodeCert(ctx context.Context, node *api.Node) error {
     )

     // Try using the external CA first.
-    cert, err := externalCA.Sign(PrepareCSR(rawCSR, cn, ou, org))
+    cert, err := externalCA.Sign(ctx, PrepareCSR(rawCSR, cn, ou, org))
     if err == ErrNoExternalCAURLs {
         // No external CA servers configured. Try using the local CA.
         cert, err = rootCA.ParseValidateAndSignCSR(rawCSR, cn, ou, org)
vendor/github.com/docker/swarmkit/manager/logbroker/broker.go | 105 (generated, vendored)

@@ -2,6 +2,7 @@ package logbroker

 import (
     "errors"
+    "fmt"
     "io"
     "sync"

@@ -24,6 +25,12 @@ var (
     errNotRunning = errors.New("broker is not running")
 )

+type logMessage struct {
+    *api.PublishLogsMessage
+    completed bool
+    err       error
+}
+
 // LogBroker coordinates log subscriptions to services and tasks. Clients can
 // publish and subscribe to logs channels.
 //
@@ -35,6 +42,7 @@ type LogBroker struct {
     subscriptionQueue *watch.Queue

     registeredSubscriptions map[string]*subscription
+    connectedNodes          map[string]struct{}

     pctx      context.Context
     cancelAll context.CancelFunc
@@ -62,6 +70,7 @@ func (lb *LogBroker) Run(ctx context.Context) error {
     lb.logQueue = watch.NewQueue()
     lb.subscriptionQueue = watch.NewQueue()
     lb.registeredSubscriptions = make(map[string]*subscription)
+    lb.connectedNodes = make(map[string]struct{})
     lb.mu.Unlock()

     select {
@@ -112,12 +121,30 @@ func (lb *LogBroker) newSubscription(selector *api.LogSelector, options *api.Log
     return subscription
 }

+func (lb *LogBroker) getSubscription(id string) *subscription {
+    lb.mu.RLock()
+    defer lb.mu.RUnlock()
+
+    subscription, ok := lb.registeredSubscriptions[id]
+    if !ok {
+        return nil
+    }
+    return subscription
+}
+
 func (lb *LogBroker) registerSubscription(subscription *subscription) {
     lb.mu.Lock()
     defer lb.mu.Unlock()

     lb.registeredSubscriptions[subscription.message.ID] = subscription
     lb.subscriptionQueue.Publish(subscription)
+
+    // Mark nodes that won't receive the message as done.
+    for _, node := range subscription.Nodes() {
+        if _, ok := lb.connectedNodes[node]; !ok {
+            subscription.Done(node, fmt.Errorf("node %s is not available", node))
+        }
+    }
 }

 func (lb *LogBroker) unregisterSubscription(subscription *subscription) {
@@ -160,7 +187,7 @@ func (lb *LogBroker) subscribe(id string) (chan events.Event, func()) {
     defer lb.mu.RUnlock()

     return lb.logQueue.CallbackWatch(events.MatcherFunc(func(event events.Event) bool {
-        publish := event.(*api.PublishLogsMessage)
+        publish := event.(*logMessage)
         return publish.SubscriptionID == id
     }))
 }
@@ -169,7 +196,7 @@ func (lb *LogBroker) publish(log *api.PublishLogsMessage) {
     lb.mu.RLock()
     defer lb.mu.RUnlock()

-    lb.logQueue.Publish(log)
+    lb.logQueue.Publish(&logMessage{PublishLogsMessage: log})
 }

 // SubscribeLogs creates a log subscription and streams back logs
@@ -190,7 +217,6 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api
             "subscription.id": subscription.message.ID,
         },
     )
-
     log.Debug("subscribed")

     publishCh, publishCancel := lb.subscribe(subscription.message.ID)
@@ -199,23 +225,50 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api
     lb.registerSubscription(subscription)
     defer lb.unregisterSubscription(subscription)

+    completed := subscription.Wait(ctx)
     for {
         select {
+        case <-ctx.Done():
+            return ctx.Err()
+        case <-lb.pctx.Done():
+            return lb.pctx.Err()
         case event := <-publishCh:
-            publish := event.(*api.PublishLogsMessage)
+            publish := event.(*logMessage)
+            if publish.completed {
+                return publish.err
+            }
             if err := stream.Send(&api.SubscribeLogsMessage{
                 Messages: publish.Messages,
             }); err != nil {
                 return err
             }
-        case <-ctx.Done():
-            return ctx.Err()
-        case <-lb.pctx.Done():
-            return nil
+        case <-completed:
+            completed = nil
+            lb.logQueue.Publish(&logMessage{
+                PublishLogsMessage: &api.PublishLogsMessage{
+                    SubscriptionID: subscription.message.ID,
+                },
+                completed: true,
+                err:       subscription.Err(),
+            })
         }
     }
 }

+func (lb *LogBroker) nodeConnected(nodeID string) {
+    lb.mu.Lock()
+    defer lb.mu.Unlock()
+
+    lb.connectedNodes[nodeID] = struct{}{}
+}
+
+func (lb *LogBroker) nodeDisconnected(nodeID string) {
+    lb.mu.Lock()
+    defer lb.mu.Unlock()
+
+    delete(lb.connectedNodes, nodeID)
+}
+
 // ListenSubscriptions returns a stream of matching subscriptions for the current node
 func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest, stream api.LogBroker_ListenSubscriptionsServer) error {
     remote, err := ca.RemoteNode(stream.Context())
@@ -223,6 +276,9 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
         return err
     }

+    lb.nodeConnected(remote.NodeID)
+    defer lb.nodeDisconnected(remote.NodeID)
+
     log := log.G(stream.Context()).WithFields(
         logrus.Fields{
             "method": "(*LogBroker).ListenSubscriptions",
@@ -234,7 +290,13 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest

     log.Debug("node registered")

-    activeSubscriptions := make(map[string]struct{})
+    activeSubscriptions := make(map[string]*subscription)
+    defer func() {
+        // If the worker quits, mark all active subscriptions as finished.
+        for _, subscription := range activeSubscriptions {
+            subscription.Done(remote.NodeID, fmt.Errorf("node %s disconnected unexpectedly", remote.NodeID))
+        }
+    }()

     // Start by sending down all active subscriptions.
     for _, subscription := range subscriptions {
@@ -250,7 +312,7 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
             log.Error(err)
             return err
         }
-        activeSubscriptions[subscription.message.ID] = struct{}{}
+        activeSubscriptions[subscription.message.ID] = subscription
     }

     // Send down new subscriptions.
@@ -261,12 +323,13 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest

             if subscription.message.Close {
                 log.WithField("subscription.id", subscription.message.ID).Debug("subscription closed")
+                delete(activeSubscriptions, subscription.message.ID)
             } else {
                 // Avoid sending down the same subscription multiple times
                 if _, ok := activeSubscriptions[subscription.message.ID]; ok {
                     continue
                 }
-                activeSubscriptions[subscription.message.ID] = struct{}{}
+                activeSubscriptions[subscription.message.ID] = subscription
                 log.WithField("subscription.id", subscription.message.ID).Debug("subscription added")
             }
             if err := stream.Send(subscription.message); err != nil {
@@ -282,12 +345,19 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
 }

 // PublishLogs publishes log messages for a given subscription
-func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) error {
+func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) (err error) {
     remote, err := ca.RemoteNode(stream.Context())
     if err != nil {
         return err
     }

+    var currentSubscription *subscription
+    defer func() {
+        if currentSubscription != nil {
+            currentSubscription.Done(remote.NodeID, err)
+        }
+    }()
+
     for {
         log, err := stream.Recv()
         if err == io.EOF {
@@ -301,6 +371,17 @@ func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) error {
             return grpc.Errorf(codes.InvalidArgument, "missing subscription ID")
         }

+        if currentSubscription == nil {
+            currentSubscription = lb.getSubscription(log.SubscriptionID)
+            if currentSubscription == nil {
+                return grpc.Errorf(codes.NotFound, "unknown subscription ID")
+            }
+        } else {
+            if log.SubscriptionID != currentSubscription.message.ID {
+                return grpc.Errorf(codes.InvalidArgument, "different subscription IDs in the same session")
+            }
+        }
+
         // Make sure logs are emitted using the right Node ID to avoid impersonation.
         for _, msg := range log.Messages {
             if msg.Context.NodeID != remote.NodeID {
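The broker now wraps every published message in a logMessage that can also carry a "completed" marker (plus an optional error), so subscribers learn over the same channel when a non-follow stream is finished. A self-contained sketch of that sentinel-message pattern; all names are illustrative:

    package main

    import "fmt"

    // message mimics logMessage: either a payload or a completion marker
    // (possibly carrying an error), delivered over the same channel.
    type message struct {
        data      string
        completed bool
        err       error
    }

    func consume(ch <-chan message) error {
        for m := range ch {
            if m.completed {
                return m.err // end of stream; surface any accumulated error
            }
            fmt.Println("got:", m.data)
        }
        return nil
    }

    func main() {
        ch := make(chan message, 4)
        ch <- message{data: "line 1"}
        ch <- message{data: "line 2"}
        ch <- message{completed: true} // sentinel instead of closing the fan-in channel
        close(ch)
        fmt.Println("stream err:", consume(ch))
    }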
130
vendor/github.com/docker/swarmkit/manager/logbroker/subscription.go
generated
vendored
130
vendor/github.com/docker/swarmkit/manager/logbroker/subscription.go
generated
vendored
|
@ -2,6 +2,8 @@ package logbroker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
events "github.com/docker/go-events"
|
events "github.com/docker/go-events"
|
||||||
|
@ -14,6 +16,7 @@ import (
|
||||||
|
|
||||||
type subscription struct {
|
type subscription struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
|
wg sync.WaitGroup
|
||||||
|
|
||||||
store *store.MemoryStore
|
store *store.MemoryStore
|
||||||
message *api.SubscriptionMessage
|
message *api.SubscriptionMessage
|
||||||
|
@ -22,18 +25,25 @@ type subscription struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
|
|
||||||
nodes map[string]struct{}
|
errors []error
|
||||||
|
nodes map[string]struct{}
|
||||||
|
pendingTasks map[string]struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSubscription(store *store.MemoryStore, message *api.SubscriptionMessage, changed *watch.Queue) *subscription {
|
func newSubscription(store *store.MemoryStore, message *api.SubscriptionMessage, changed *watch.Queue) *subscription {
|
||||||
return &subscription{
|
return &subscription{
|
||||||
store: store,
|
store: store,
|
||||||
message: message,
|
message: message,
|
||||||
changed: changed,
|
changed: changed,
|
||||||
nodes: make(map[string]struct{}),
|
nodes: make(map[string]struct{}),
|
||||||
|
pendingTasks: make(map[string]struct{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *subscription) follow() bool {
|
||||||
|
return s.message.Options != nil && s.message.Options.Follow
|
||||||
|
}
|
||||||
|
|
||||||
func (s *subscription) Contains(nodeID string) bool {
|
func (s *subscription) Contains(nodeID string) bool {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
defer s.mu.RUnlock()
|
defer s.mu.RUnlock()
|
||||||
|
@ -42,15 +52,28 @@ func (s *subscription) Contains(nodeID string) bool {
|
||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *subscription) Nodes() []string {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
nodes := make([]string, 0, len(s.nodes))
|
||||||
|
for node := range s.nodes {
|
||||||
|
nodes = append(nodes, node)
|
||||||
|
}
|
+	return nodes
+}
+
 func (s *subscription) Run(ctx context.Context) {
 	s.ctx, s.cancel = context.WithCancel(ctx)
 
-	wq := s.store.WatchQueue()
-	ch, cancel := state.Watch(wq, state.EventCreateTask{}, state.EventUpdateTask{})
-	go func() {
-		defer cancel()
-		s.watch(ch)
-	}()
+	if s.follow() {
+		wq := s.store.WatchQueue()
+		ch, cancel := state.Watch(wq, state.EventCreateTask{}, state.EventUpdateTask{})
+		go func() {
+			defer cancel()
+			s.watch(ch)
+		}()
+	}
 
 	s.match()
 }
@@ -61,10 +84,74 @@ func (s *subscription) Stop() {
 	}
 }
 
+func (s *subscription) Wait(ctx context.Context) <-chan struct{} {
+	// Follow subscriptions never end
+	if s.follow() {
+		return nil
+	}
+
+	ch := make(chan struct{})
+	go func() {
+		defer close(ch)
+		s.wg.Wait()
+	}()
+	return ch
+}
+
+func (s *subscription) Done(nodeID string, err error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if err != nil {
+		s.errors = append(s.errors, err)
+	}
+
+	if s.follow() {
+		return
+	}
+
+	if _, ok := s.nodes[nodeID]; !ok {
+		return
+	}
+
+	delete(s.nodes, nodeID)
+	s.wg.Done()
+}
+
+func (s *subscription) Err() error {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	if len(s.errors) == 0 && len(s.pendingTasks) == 0 {
+		return nil
+	}
+
+	messages := make([]string, 0, len(s.errors))
+	for _, err := range s.errors {
+		messages = append(messages, err.Error())
+	}
+	for t := range s.pendingTasks {
+		messages = append(messages, fmt.Sprintf("task %s has not been scheduled", t))
+	}
+
+	return fmt.Errorf("warning: incomplete log stream. some logs could not be retrieved for the following reasons: %s", strings.Join(messages, ", "))
+}
+
 func (s *subscription) match() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
+	add := func(t *api.Task) {
+		if t.NodeID == "" {
+			s.pendingTasks[t.ID] = struct{}{}
+			return
+		}
+		if _, ok := s.nodes[t.NodeID]; !ok {
+			s.nodes[t.NodeID] = struct{}{}
+			s.wg.Add(1)
+		}
+	}
+
 	s.store.View(func(tx store.ReadTx) {
 		for _, nid := range s.message.Selector.NodeIDs {
 			s.nodes[nid] = struct{}{}
@@ -72,7 +159,7 @@ func (s *subscription) match() {
 
 		for _, tid := range s.message.Selector.TaskIDs {
 			if task := store.GetTask(tx, tid); task != nil {
-				s.nodes[task.NodeID] = struct{}{}
+				add(task)
 			}
 		}
 
@@ -83,7 +170,7 @@ func (s *subscription) match() {
 				continue
 			}
 			for _, task := range tasks {
-				s.nodes[task.NodeID] = struct{}{}
+				add(task)
 			}
 		}
 	})
@@ -100,12 +187,19 @@ func (s *subscription) watch(ch <-chan events.Event) error {
 		matchServices[sid] = struct{}{}
 	}
 
-	add := func(nodeID string) {
+	add := func(t *api.Task) {
 		s.mu.Lock()
 		defer s.mu.Unlock()
 
-		if _, ok := s.nodes[nodeID]; !ok {
-			s.nodes[nodeID] = struct{}{}
+		// Un-allocated task.
+		if t.NodeID == "" {
+			s.pendingTasks[t.ID] = struct{}{}
+			return
+		}
+
+		delete(s.pendingTasks, t.ID)
+		if _, ok := s.nodes[t.NodeID]; !ok {
+			s.nodes[t.NodeID] = struct{}{}
 			s.changed.Publish(s)
 		}
 	}
@@ -129,10 +223,10 @@ func (s *subscription) watch(ch <-chan events.Event) error {
 			}
 
 			if _, ok := matchTasks[t.ID]; ok {
-				add(t.NodeID)
+				add(t)
 			}
 			if _, ok := matchServices[t.ServiceID]; ok {
-				add(t.NodeID)
+				add(t)
 			}
 		}
 	}
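Note (not part of the vendored diff): the new Wait/Done/Err methods above give non-follow log subscriptions a completion protocol. Every node that owns a matched task reserves a slot on a sync.WaitGroup, each node releases its slot via Done when its log stream ends, and tasks that were never scheduled stay in pendingTasks so Err can report them. Below is a minimal, self-contained sketch of that counting pattern; the names (tracker, addNode, done, wait) are hypothetical and not the SwarmKit types.

package main

import (
	"fmt"
	"sync"
)

// tracker mirrors the bookkeeping the subscription above performs:
// one WaitGroup slot per participating node, plus a list of errors.
type tracker struct {
	mu     sync.Mutex
	wg     sync.WaitGroup
	nodes  map[string]struct{}
	errors []error
}

func newTracker() *tracker {
	return &tracker{nodes: make(map[string]struct{})}
}

// addNode registers a node exactly once and reserves a WaitGroup slot for it.
func (t *tracker) addNode(nodeID string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, ok := t.nodes[nodeID]; !ok {
		t.nodes[nodeID] = struct{}{}
		t.wg.Add(1)
	}
}

// done records an optional error and releases the node's WaitGroup slot.
func (t *tracker) done(nodeID string, err error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if err != nil {
		t.errors = append(t.errors, err)
	}
	if _, ok := t.nodes[nodeID]; ok {
		delete(t.nodes, nodeID)
		t.wg.Done()
	}
}

// wait returns a channel that is closed once every registered node has called done.
func (t *tracker) wait() <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		defer close(ch)
		t.wg.Wait()
	}()
	return ch
}

func main() {
	t := newTracker()
	t.addNode("node-1")
	t.addNode("node-2")

	go t.done("node-1", nil)
	go t.done("node-2", fmt.Errorf("stream reset"))

	<-t.wait()
	fmt.Println("all nodes finished, errors recorded:", len(t.errors))
}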
11
vendor/github.com/docker/swarmkit/manager/manager.go
generated
vendored

@@ -385,14 +385,10 @@ func (m *Manager) Run(parent context.Context) error {
 
 	close(m.started)
 
-	watchDone := make(chan struct{})
-	watchCtx, watchCtxCancel := context.WithCancel(parent)
 	go func() {
 		err := m.raftNode.Run(ctx)
-		watchCtxCancel()
-		<-watchDone
 		if err != nil {
-			log.G(ctx).Error(err)
+			log.G(ctx).WithError(err).Error("raft node stopped")
 			m.Stop(ctx)
 		}
 	}()
@@ -407,7 +403,7 @@ func (m *Manager) Run(parent context.Context) error {
 	}
 	raftConfig := c.Spec.Raft
 
-	if err := m.watchForKEKChanges(watchCtx, watchDone); err != nil {
+	if err := m.watchForKEKChanges(ctx); err != nil {
 		return err
 	}
 
@@ -544,8 +540,7 @@ func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error {
 	return nil
 }
 
-func (m *Manager) watchForKEKChanges(ctx context.Context, watchDone chan struct{}) error {
-	defer close(watchDone)
+func (m *Manager) watchForKEKChanges(ctx context.Context) error {
 	clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization()
 	clusterWatch, clusterWatchCancel, err := store.ViewAndWatch(m.raftNode.MemoryStore(),
 		func(tx store.ReadTx) error {
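Note (not part of the vendored diff): the manager change above removes the dedicated watchCtx/watchDone plumbing and runs watchForKEKChanges on the manager's own context, so the watcher exits when that context is cancelled rather than being shut down by hand. A rough, self-contained sketch of the context-scoped-watcher pattern this relies on; watchForChanges and the event channel are invented for illustration, not the SwarmKit code.

package main

import (
	"context"
	"fmt"
	"time"
)

// watchForChanges runs until its context is cancelled; the caller needs no
// extra done channel or separate cancel function beyond the context itself.
func watchForChanges(ctx context.Context, events <-chan string) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("watcher stopped:", ctx.Err())
			return
		case ev := <-events:
			fmt.Println("observed change:", ev)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	events := make(chan string, 1)

	go watchForChanges(ctx, events)
	events <- "cluster KEK rotated"

	time.Sleep(100 * time.Millisecond) // let the watcher drain the event
	cancel()                           // cancelling the parent context stops the watcher
	time.Sleep(100 * time.Millisecond)
}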
11
vendor/github.com/docker/swarmkit/manager/orchestrator/global/global.go
generated
vendored

@@ -240,6 +240,8 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
 		}
 	})
 
+	updates := make(map[*api.Service][]orchestrator.Slot)
+
 	_, err := g.store.Batch(func(batch *store.Batch) error {
 		var updateTasks []orchestrator.Slot
 		for _, serviceID := range serviceIDs {
@@ -274,8 +276,9 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
 				updateTasks = append(updateTasks, ntasks)
 			}
 		}
+
 		if len(updateTasks) > 0 {
-			g.updater.Update(ctx, g.cluster, service.Service, updateTasks)
+			updates[service.Service] = updateTasks
 		}
 
 		// Remove any tasks assigned to nodes not found in g.nodes.
@@ -287,9 +290,15 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
 		}
 		return nil
 	})
+
 	if err != nil {
 		log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileServices transaction failed")
 	}
+
+	for service, updateTasks := range updates {
+		g.updater.Update(ctx, g.cluster, service, updateTasks)
+	}
+
 }
 
 // updateNode updates g.nodes based on the current node value
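Note (not part of the vendored diff): instead of calling g.updater.Update from inside the store.Batch transaction, the hunks above collect the slots per service in an updates map and apply them only after the batch returns, keeping the update work out of the store transaction. A minimal sketch of that collect-then-apply shape, using placeholder types (service, slot, runBatch, applyUpdate) rather than the real orchestrator API.

package main

import "fmt"

type service struct{ name string }
type slot struct{ taskID string }

// applyUpdate stands in for the updater; in the real code this can be slow,
// so it should not run while the store transaction is still open.
func applyUpdate(svc *service, slots []slot) {
	fmt.Printf("updating %s with %d slots\n", svc.name, len(slots))
}

// runBatch stands in for store.Batch: it runs fn as one transaction.
func runBatch(fn func() error) error { return fn() }

func main() {
	services := []*service{{name: "web"}, {name: "db"}}

	// Phase 1: inside the transaction, only record what needs updating.
	updates := make(map[*service][]slot)
	err := runBatch(func() error {
		for _, svc := range services {
			updates[svc] = append(updates[svc], slot{taskID: svc.name + ".1"})
		}
		return nil
	})
	if err != nil {
		fmt.Println("batch failed:", err)
	}

	// Phase 2: after the transaction has committed, apply the updates.
	for svc, slots := range updates {
		applyUpdate(svc, slots)
	}
}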
139
vendor/github.com/docker/swarmkit/node/node.go
generated
vendored

@@ -100,26 +100,24 @@ type Config struct {
 // cluster. Node handles workloads and may also run as a manager.
 type Node struct {
 	sync.RWMutex
 	config               *Config
 	remotes              *persistentRemotes
 	role                 string
 	roleCond             *sync.Cond
 	conn                 *grpc.ClientConn
 	connCond             *sync.Cond
 	nodeID               string
-	nodeMembership       api.NodeSpec_Membership
 	started              chan struct{}
 	startOnce            sync.Once
 	stopped              chan struct{}
 	stopOnce             sync.Once
 	ready                chan struct{} // closed when agent has completed registration and manager(if enabled) is ready to receive control requests
-	certificateRequested chan struct{} // closed when certificate issue request has been sent by node
 	closed               chan struct{}
 	err                  error
 	agent                *agent.Agent
 	manager              *manager.Manager
 	notifyNodeChange     chan *api.Node // used to send role updates from the dispatcher api on promotion/demotion
 	unlockKey            []byte
 }
 
 // RemoteAPIAddr returns address on which remote manager api listens.
@@ -155,16 +153,15 @@ func New(c *Config) (*Node, error) {
 	}
 
 	n := &Node{
 		remotes:              newPersistentRemotes(stateFile, p...),
 		role:                 ca.WorkerRole,
 		config:               c,
 		started:              make(chan struct{}),
 		stopped:              make(chan struct{}),
 		closed:               make(chan struct{}),
 		ready:                make(chan struct{}),
-		certificateRequested: make(chan struct{}),
 		notifyNodeChange:     make(chan *api.Node, 1),
 		unlockKey:            c.UnlockKey,
 	}
 
 	if n.config.JoinAddr != "" || n.config.ForceNewCluster {
@@ -403,13 +400,6 @@ func (n *Node) Ready() <-chan struct{} {
 	return n.ready
 }
 
-// CertificateRequested returns a channel that is closed after node has
-// requested a certificate. After this call a caller can expect calls to
-// NodeID() and `NodeMembership()` to succeed.
-func (n *Node) CertificateRequested() <-chan struct{} {
-	return n.certificateRequested
-}
-
 func (n *Node) setControlSocket(conn *grpc.ClientConn) {
 	n.Lock()
 	if n.conn != nil {
@@ -461,13 +451,6 @@ func (n *Node) NodeID() string {
 	return n.nodeID
 }
 
-// NodeMembership returns current node's membership. May be empty if not set.
-func (n *Node) NodeMembership() api.NodeSpec_Membership {
-	n.RLock()
-	defer n.RUnlock()
-	return n.nodeMembership
-}
-
 // Manager returns manager instance started by node. May be nil.
 func (n *Node) Manager() *manager.Manager {
 	n.RLock()
@@ -507,18 +490,14 @@ func (n *Node) loadSecurityConfig(ctx context.Context) (*ca.SecurityConfig, erro
 		return nil, err
 	}
 	if err == nil {
-		clientTLSCreds, serverTLSCreds, err := ca.LoadTLSCreds(rootCA, krw)
-		_, ok := errors.Cause(err).(ca.ErrInvalidKEK)
-		switch {
-		case err == nil:
-			securityConfig = ca.NewSecurityConfig(&rootCA, krw, clientTLSCreds, serverTLSCreds)
-			log.G(ctx).Debug("loaded CA and TLS certificates")
-		case ok:
-			return nil, ErrInvalidUnlockKey
-		case os.IsNotExist(err):
-			break
-		default:
-			return nil, errors.Wrapf(err, "error while loading TLS certificate in %s", paths.Node.Cert)
+		securityConfig, err = ca.LoadSecurityConfig(ctx, rootCA, krw)
+		if err != nil {
+			_, isInvalidKEK := errors.Cause(err).(ca.ErrInvalidKEK)
+			if isInvalidKEK {
+				return nil, ErrInvalidUnlockKey
+			} else if !os.IsNotExist(err) {
+				return nil, errors.Wrapf(err, "error while loading TLS certificate in %s", paths.Node.Cert)
+			}
 		}
 	}
 
@@ -544,44 +523,36 @@ func (n *Node) loadSecurityConfig(ctx context.Context) (*ca.SecurityConfig, erro
 	}
 
 	// Obtain new certs and setup TLS certificates renewal for this node:
-	// - We call LoadOrCreateSecurityConfig which blocks until a valid certificate has been issued
-	// - We retrieve the nodeID from LoadOrCreateSecurityConfig through the info channel. This allows
-	//   us to display the ID before the certificate gets issued (for potential approval).
-	// - We wait for LoadOrCreateSecurityConfig to finish since we need a certificate to operate.
-	// - Given a valid certificate, spin a renewal go-routine that will ensure that certificates stay
-	//   up to date.
-	issueResponseChan := make(chan api.IssueNodeCertificateResponse, 1)
-	go func() {
-		select {
-		case <-ctx.Done():
-		case resp := <-issueResponseChan:
-			log.G(log.WithModule(ctx, "tls")).WithFields(logrus.Fields{
-				"node.id": resp.NodeID,
-			}).Debugf("loaded TLS certificate")
-			n.Lock()
-			n.nodeID = resp.NodeID
-			n.nodeMembership = resp.NodeMembership
-			n.Unlock()
-			close(n.certificateRequested)
-		}
-	}()
+	// - If certificates weren't present on disk, we call CreateSecurityConfig, which blocks
+	//   until a valid certificate has been issued.
+	// - We wait for CreateSecurityConfig to finish since we need a certificate to operate.
 
-	// LoadOrCreateSecurityConfig is the point at which a new node joining a cluster will retrieve TLS
-	// certificates and write them to disk
-	securityConfig, err = ca.LoadOrCreateSecurityConfig(
-		ctx, rootCA, n.config.JoinToken, ca.ManagerRole, n.remotes, issueResponseChan, krw)
-	if err != nil {
+	// Attempt to load certificate from disk
+	securityConfig, err = ca.LoadSecurityConfig(ctx, rootCA, krw)
+	if err == nil {
+		log.G(ctx).WithFields(logrus.Fields{
+			"node.id": securityConfig.ClientTLSCreds.NodeID(),
+		}).Debugf("loaded TLS certificate")
+	} else {
 		if _, ok := errors.Cause(err).(ca.ErrInvalidKEK); ok {
 			return nil, ErrInvalidUnlockKey
 		}
-		return nil, err
+		log.G(ctx).WithError(err).Debugf("no node credentials found in: %s", krw.Target())
+
+		securityConfig, err = rootCA.CreateSecurityConfig(ctx, krw, ca.CertificateRequestConfig{
+			Token:   n.config.JoinToken,
+			Remotes: n.remotes,
+		})
+
+		if err != nil {
+			return nil, err
+		}
 		}
 	}
 
 	n.Lock()
 	n.role = securityConfig.ClientTLSCreds.Role()
 	n.nodeID = securityConfig.ClientTLSCreds.NodeID()
-	n.nodeMembership = api.NodeMembershipAccepted
 	n.roleCond.Broadcast()
 	n.Unlock()
 
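Note (not part of the vendored diff): loadSecurityConfig now tries ca.LoadSecurityConfig first and only falls back to rootCA.CreateSecurityConfig (requesting a certificate with the join token) when nothing usable is on disk, instead of funnelling everything through LoadOrCreateSecurityConfig and an issueResponseChan. A hedged sketch of that load-or-create fallback follows; loadCredentials, createCredentials, and loadOrCreate are invented helpers standing in for the real CA calls.

package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// loadCredentials stands in for loading an existing certificate from disk.
func loadCredentials(path string) (string, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return "", err // typically wraps os.ErrNotExist on a fresh node
	}
	return string(data), nil
}

// createCredentials stands in for requesting and persisting new credentials.
func createCredentials(path, joinToken string) (string, error) {
	cred := "certificate-for-" + joinToken
	if err := os.WriteFile(path, []byte(cred), 0o600); err != nil {
		return "", err
	}
	return cred, nil
}

// loadOrCreate prefers whatever is already on disk and only creates
// new credentials when the load fails with "not exist".
func loadOrCreate(path, joinToken string) (string, error) {
	cred, err := loadCredentials(path)
	if err == nil {
		return cred, nil
	}
	if !errors.Is(err, os.ErrNotExist) {
		return "", fmt.Errorf("loading credentials from %s: %w", path, err)
	}
	return createCredentials(path, joinToken)
}

func main() {
	path := filepath.Join(os.TempDir(), "node-credentials")
	cred, err := loadOrCreate(path, "SWMTKN-example")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("using credentials:", cred)
}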