diff --git a/vendor.conf b/vendor.conf
index 8992f6cf7d..8d94aff986 100644
--- a/vendor.conf
+++ b/vendor.conf
@@ -103,7 +103,7 @@ github.com/docker/containerd 03e5862ec0d8d3b3f750e19fca3ee367e13c090e
 github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
 
 # cluster
-github.com/docker/swarmkit c97146840a26c9ce8023284d0e9c989586cc1857
+github.com/docker/swarmkit 62d835f478b2e4fd2768deb88fb3b32e334faaee
 github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
 github.com/gogo/protobuf v0.3
 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
diff --git a/vendor/github.com/docker/swarmkit/agent/agent.go b/vendor/github.com/docker/swarmkit/agent/agent.go
index 434bdd75f9..d8cc595474 100644
--- a/vendor/github.com/docker/swarmkit/agent/agent.go
+++ b/vendor/github.com/docker/swarmkit/agent/agent.go
@@ -336,12 +336,10 @@ func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMe
 	seen := map[api.Peer]struct{}{}
 	for _, manager := range message.Managers {
 		if manager.Peer.Addr == "" {
-			log.G(ctx).WithField("manager.addr", manager.Peer.Addr).
-				Warnf("skipping bad manager address")
 			continue
 		}
 
-		a.config.Managers.Observe(*manager.Peer, int(manager.Weight))
+		a.config.ConnBroker.Remotes().Observe(*manager.Peer, int(manager.Weight))
 		seen[*manager.Peer] = struct{}{}
 	}
 
@@ -358,9 +356,9 @@ func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMe
 	}
 
 	// prune managers not in list.
-	for peer := range a.config.Managers.Weights() {
+	for peer := range a.config.ConnBroker.Remotes().Weights() {
 		if _, ok := seen[peer]; !ok {
-			a.config.Managers.Remove(peer)
+			a.config.ConnBroker.Remotes().Remove(peer)
 		}
 	}
 
@@ -468,7 +466,7 @@ func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogP
 	)
 
 	err = a.withSession(ctx, func(session *session) error {
-		publisher, err = api.NewLogBrokerClient(session.conn).PublishLogs(ctx)
+		publisher, err = api.NewLogBrokerClient(session.conn.ClientConn).PublishLogs(ctx)
 		return err
 	})
 	if err != nil {
diff --git a/vendor/github.com/docker/swarmkit/agent/config.go b/vendor/github.com/docker/swarmkit/agent/config.go
index d62e15c8d5..de9359842e 100644
--- a/vendor/github.com/docker/swarmkit/agent/config.go
+++ b/vendor/github.com/docker/swarmkit/agent/config.go
@@ -4,7 +4,7 @@ import (
 	"github.com/boltdb/bolt"
 	"github.com/docker/swarmkit/agent/exec"
 	"github.com/docker/swarmkit/api"
-	"github.com/docker/swarmkit/remotes"
+	"github.com/docker/swarmkit/connectionbroker"
 	"github.com/pkg/errors"
 	"google.golang.org/grpc/credentials"
 )
@@ -14,9 +14,9 @@ type Config struct {
 	// Hostname the name of host for agent instance.
 	Hostname string
 
-	// Managers provides the manager backend used by the agent. It will be
-	// updated with managers weights as observed by the agent.
-	Managers remotes.Remotes
+	// ConnBroker provides a connection broker for retrieving gRPC
+	// connections to managers.
+	ConnBroker *connectionbroker.Broker
 
 	// Executor specifies the executor to use for the agent.
 	Executor exec.Executor
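
For readers following the API change above: callers now hand the agent a *connectionbroker.Broker instead of a remotes.Remotes. A minimal sketch of the new wiring, assuming the constructor shapes shown in this diff; the package name, hostname, manager address, executor, DB, and credentials values are placeholders, not values taken from this change:

package example

import (
	"github.com/boltdb/bolt"
	"github.com/docker/swarmkit/agent"
	"github.com/docker/swarmkit/agent/exec"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/connectionbroker"
	"github.com/docker/swarmkit/remotes"
	"google.golang.org/grpc/credentials"
)

// newWorkerAgent shows the new wiring: the agent takes a connection
// broker built over a weighted remotes set, instead of the set itself.
func newWorkerAgent(db *bolt.DB, executor exec.Executor, creds credentials.TransportCredentials) (*agent.Agent, error) {
	// Seed the remotes set with a known manager address (placeholder),
	// then wrap it in a broker.
	remoteSet := remotes.NewRemotes(api.Peer{Addr: "10.0.0.1:2377"})
	broker := connectionbroker.New(remoteSet)

	return agent.New(&agent.Config{
		Hostname:    "worker-1", // placeholder hostname
		ConnBroker:  broker,     // replaces the old Managers field
		Executor:    executor,
		DB:          db,
		Credentials: creds,
	})
}
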
diff --git a/vendor/github.com/docker/swarmkit/agent/resource.go b/vendor/github.com/docker/swarmkit/agent/resource.go
index eca7564aa0..8e88d2cd65 100644
--- a/vendor/github.com/docker/swarmkit/agent/resource.go
+++ b/vendor/github.com/docker/swarmkit/agent/resource.go
@@ -30,7 +30,7 @@ type ResourceAllocator interface {
 func (r *resourceAllocator) AttachNetwork(ctx context.Context, id, target string, addresses []string) (string, error) {
 	var taskID string
 	if err := r.agent.withSession(ctx, func(session *session) error {
-		client := api.NewResourceAllocatorClient(session.conn)
+		client := api.NewResourceAllocatorClient(session.conn.ClientConn)
 		r, err := client.AttachNetwork(ctx, &api.AttachNetworkRequest{
 			Config: &api.NetworkAttachmentConfig{
 				Target:    target,
@@ -53,7 +53,7 @@ func (r *resourceAllocator) AttachNetwork(ctx context.Context, id, target string
 // DetachNetwork deletes a network attachment.
 func (r *resourceAllocator) DetachNetwork(ctx context.Context, aID string) error {
 	return r.agent.withSession(ctx, func(session *session) error {
-		client := api.NewResourceAllocatorClient(session.conn)
+		client := api.NewResourceAllocatorClient(session.conn.ClientConn)
 		_, err := client.DetachNetwork(ctx, &api.DetachNetworkRequest{
 			AttachmentID: aID,
 		})
diff --git a/vendor/github.com/docker/swarmkit/agent/session.go b/vendor/github.com/docker/swarmkit/agent/session.go
index a8f657ffa3..fc1dca0db6 100644
--- a/vendor/github.com/docker/swarmkit/agent/session.go
+++ b/vendor/github.com/docker/swarmkit/agent/session.go
@@ -7,9 +7,9 @@ import (
 
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/swarmkit/api"
+	"github.com/docker/swarmkit/connectionbroker"
 	"github.com/docker/swarmkit/log"
 	"github.com/docker/swarmkit/protobuf/ptypes"
-	"github.com/docker/swarmkit/remotes"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
@@ -30,8 +30,7 @@ var (
 // flow into the agent, such as task assignment, are called back into the
 // agent through errs, messages and tasks.
 type session struct {
-	conn *grpc.ClientConn
-	addr string
+	conn *connectionbroker.Conn
 
 	agent     *Agent
 	sessionID string
@@ -61,12 +60,7 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI
 
 	// TODO(stevvooe): Need to move connection management up a level or create
 	// independent connection for log broker client.
-	peer, err := agent.config.Managers.Select()
-	if err != nil {
-		s.errs <- err
-		return s
-	}
-	cc, err := grpc.Dial(peer.Addr,
+	cc, err := agent.config.ConnBroker.Select(
 		grpc.WithTransportCredentials(agent.config.Credentials),
 		grpc.WithTimeout(dispatcherRPCTimeout),
 	)
@@ -74,7 +68,6 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI
 		s.errs <- err
 		return s
 	}
-	s.addr = peer.Addr
 	s.conn = cc
 
 	go s.run(ctx, delay, description)
@@ -127,7 +120,7 @@ func (s *session) start(ctx context.Context, description *api.NodeDescription) e
 	// Need to run Session in a goroutine since there's no way to set a
 	// timeout for an individual Recv call in a stream.
 	go func() {
-		client := api.NewDispatcherClient(s.conn)
+		client := api.NewDispatcherClient(s.conn.ClientConn)
 
 		stream, err = client.Session(sessionCtx, &api.SessionRequest{
 			Description: description,
@@ -160,7 +153,7 @@ func (s *session) start(ctx context.Context, description *api.NodeDescription) e
 func (s *session) heartbeat(ctx context.Context) error {
 	log.G(ctx).Debugf("(*session).heartbeat")
 
-	client := api.NewDispatcherClient(s.conn)
+	client := api.NewDispatcherClient(s.conn.ClientConn)
 	heartbeat := time.NewTimer(1) // send out a heartbeat right away
 	defer heartbeat.Stop()
 
@@ -224,7 +217,7 @@ func (s *session) logSubscriptions(ctx context.Context) error {
 	log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).logSubscriptions"})
 	log.Debugf("")
 
-	client := api.NewLogBrokerClient(s.conn)
+	client := api.NewLogBrokerClient(s.conn.ClientConn)
 	subscriptions, err := client.ListenSubscriptions(ctx, &api.ListenSubscriptionsRequest{})
 	if err != nil {
 		return err
@@ -269,7 +262,7 @@ func (s *session) watch(ctx context.Context) error {
 		err           error
 	)
 
-	client := api.NewDispatcherClient(s.conn)
+	client := api.NewDispatcherClient(s.conn.ClientConn)
 	for {
 		// If this is the first time we're running the loop, or there was a reference mismatch
 		// attempt to get the assignmentWatch
@@ -344,7 +337,7 @@ func (s *session) watch(ctx context.Context) error {
 
 // sendTaskStatus uses the current session to send the status of a single task.
 func (s *session) sendTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
-	client := api.NewDispatcherClient(s.conn)
+	client := api.NewDispatcherClient(s.conn.ClientConn)
 	if _, err := client.UpdateTaskStatus(ctx, &api.UpdateTaskStatusRequest{
 		SessionID: s.sessionID,
 		Updates: []*api.UpdateTaskStatusRequest_TaskStatusUpdate{
@@ -385,7 +378,7 @@ func (s *session) sendTaskStatuses(ctx context.Context, updates ...*api.UpdateTa
 		return updates, ctx.Err()
 	}
 
-	client := api.NewDispatcherClient(s.conn)
+	client := api.NewDispatcherClient(s.conn.ClientConn)
 	n := batchSize
 
 	if len(updates) < n {
@@ -416,8 +409,7 @@ func (s *session) sendError(err error) {
 func (s *session) close() error {
 	s.closeOnce.Do(func() {
 		if s.conn != nil {
-			s.agent.config.Managers.ObserveIfExists(api.Peer{Addr: s.addr}, -remotes.DefaultObservationWeight)
-			s.conn.Close()
+			s.conn.Close(false)
 		}
 
 		close(s.closed)
diff --git a/vendor/github.com/docker/swarmkit/ca/certificates.go b/vendor/github.com/docker/swarmkit/ca/certificates.go
index 85b8bb9a6e..52537c89e4 100644
--- a/vendor/github.com/docker/swarmkit/ca/certificates.go
+++ b/vendor/github.com/docker/swarmkit/ca/certificates.go
@@ -22,8 +22,8 @@ import (
 	"github.com/cloudflare/cfssl/signer/local"
 	"github.com/docker/go-events"
 	"github.com/docker/swarmkit/api"
+	"github.com/docker/swarmkit/connectionbroker"
 	"github.com/docker/swarmkit/ioutils"
-	"github.com/docker/swarmkit/remotes"
 	"github.com/opencontainers/go-digest"
 	"github.com/pkg/errors"
 	"golang.org/x/net/context"
@@ -169,6 +169,15 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
 		if err == nil {
 			break
 		}
+
+		// If the first attempt fails, we should try a remote
+		// connection. The local node may be a manager that was
+		// demoted, so the local connection (which is preferred) may
+		// not work. If we are successful in renewing the certificate,
+		// the local connection will not be returned by the connection
+		// broker anymore.
+		config.ForceRemote = true
+
 	}
 	if err != nil {
 		return nil, err
 	}
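
For orientation, the ForceRemote line added above sits inside the small retry loop of RequestAndSaveNewCertificates. A condensed sketch of the surrounding pattern, reconstructed from the context lines in this hunk (treat the exact loop shape as approximate):

	// Sketch: the first attempt may use the preferred local socket; after
	// any failure, force remote TCP connections, because a just-demoted
	// manager's local connection can no longer issue certificates.
	var signedCert []byte
	for i := 0; i != 5; i++ {
		signedCert, err = GetRemoteSignedCertificate(ctx, csr, rca.Pool, config)
		if err == nil {
			break
		}
		config.ForceRemote = true // later attempts skip the local connection
	}
	if err != nil {
		return nil, err
	}
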
@@ -202,7 +211,7 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
 
 	var kekUpdate *KEKData
 	for i := 0; i < 5; i++ {
-		kekUpdate, err = rca.getKEKUpdate(ctx, X509Cert, tlsKeyPair, config.Remotes)
+		kekUpdate, err = rca.getKEKUpdate(ctx, X509Cert, tlsKeyPair, config.ConnBroker)
 		if err == nil {
 			break
 		}
@@ -218,7 +227,7 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
 	return &tlsKeyPair, nil
 }
 
-func (rca *RootCA) getKEKUpdate(ctx context.Context, cert *x509.Certificate, keypair tls.Certificate, r remotes.Remotes) (*KEKData, error) {
+func (rca *RootCA) getKEKUpdate(ctx context.Context, cert *x509.Certificate, keypair tls.Certificate, connBroker *connectionbroker.Broker) (*KEKData, error) {
 	var managerRole bool
 	for _, ou := range cert.Subject.OrganizationalUnit {
 		if ou == ManagerRole {
@@ -229,25 +238,25 @@ func (rca *RootCA) getKEKUpdate(ctx context.Context, cert *x509.Certificate, key
 
 	if managerRole {
 		mtlsCreds := credentials.NewTLS(&tls.Config{ServerName: CARole, RootCAs: rca.Pool, Certificates: []tls.Certificate{keypair}})
-		conn, peer, err := getGRPCConnection(mtlsCreds, r)
+		conn, err := getGRPCConnection(mtlsCreds, connBroker, false)
 		if err != nil {
 			return nil, err
 		}
-		defer conn.Close()
 
-		client := api.NewCAClient(conn)
+		client := api.NewCAClient(conn.ClientConn)
 		ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
 		defer cancel()
 		response, err := client.GetUnlockKey(ctx, &api.GetUnlockKeyRequest{})
 		if err != nil {
 			if grpc.Code(err) == codes.Unimplemented { // if the server does not support keks, return as if no encryption key was specified
+				conn.Close(true)
 				return &KEKData{}, nil
 			}
 
-			r.Observe(peer, -remotes.DefaultObservationWeight)
+			conn.Close(false)
 			return nil, err
 		}
 
-		r.Observe(peer, remotes.DefaultObservationWeight)
+		conn.Close(true)
 		return &KEKData{KEK: response.UnlockKey, Version: response.Version.Index}, nil
 	}
 
@@ -440,45 +449,33 @@ func GetLocalRootCA(paths CertPaths) (RootCA, error) {
 	return NewRootCA(cert, key, DefaultNodeCertExpiration)
 }
 
-func getGRPCConnection(creds credentials.TransportCredentials, r remotes.Remotes) (*grpc.ClientConn, api.Peer, error) {
-	peer, err := r.Select()
-	if err != nil {
-		return nil, api.Peer{}, err
-	}
-
-	opts := []grpc.DialOption{
+func getGRPCConnection(creds credentials.TransportCredentials, connBroker *connectionbroker.Broker, forceRemote bool) (*connectionbroker.Conn, error) {
+	dialOpts := []grpc.DialOption{
 		grpc.WithTransportCredentials(creds),
 		grpc.WithTimeout(5 * time.Second),
 		grpc.WithBackoffMaxDelay(5 * time.Second),
 	}
-
-	conn, err := grpc.Dial(peer.Addr, opts...)
-	if err != nil {
-		return nil, api.Peer{}, err
+	if forceRemote {
+		return connBroker.SelectRemote(dialOpts...)
 	}
-	return conn, peer, nil
+	return connBroker.Select(dialOpts...)
 }
 
 // GetRemoteCA returns the remote endpoint's CA certificate
-func GetRemoteCA(ctx context.Context, d digest.Digest, r remotes.Remotes) (RootCA, error) {
+func GetRemoteCA(ctx context.Context, d digest.Digest, connBroker *connectionbroker.Broker) (RootCA, error) {
 	// This TLS Config is intentionally using InsecureSkipVerify. We use the
 	// digest instead to check the integrity of the CA certificate.
 	insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
 
-	conn, peer, err := getGRPCConnection(insecureCreds, r)
+	conn, err := getGRPCConnection(insecureCreds, connBroker, false)
 	if err != nil {
 		return RootCA{}, err
 	}
-	defer conn.Close()
 
-	client := api.NewCAClient(conn)
+	client := api.NewCAClient(conn.ClientConn)
 	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
 	defer cancel()
 	defer func() {
-		if err != nil {
-			r.Observe(peer, -remotes.DefaultObservationWeight)
-			return
-		}
-		r.Observe(peer, remotes.DefaultObservationWeight)
+		conn.Close(err == nil)
 	}()
 
 	response, err := client.GetRootCACertificate(ctx, &api.GetRootCACertificateRequest{})
 	if err != nil {
@@ -558,20 +555,22 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50
 		creds = credentials.NewTLS(&tls.Config{ServerName: CARole, RootCAs: rootCAPool})
 	}
 
-	conn, peer, err := getGRPCConnection(creds, config.Remotes)
+	conn, err := getGRPCConnection(creds, config.ConnBroker, config.ForceRemote)
 	if err != nil {
 		return nil, err
 	}
-	defer conn.Close()
 
 	// Create a CAClient to retrieve a new Certificate
-	caClient := api.NewNodeCAClient(conn)
+	caClient := api.NewNodeCAClient(conn.ClientConn)
+
+	issueCtx, issueCancel := context.WithTimeout(ctx, 5*time.Second)
+	defer issueCancel()
 
 	// Send the Request and retrieve the request token
 	issueRequest := &api.IssueNodeCertificateRequest{CSR: csr, Token: config.Token, Availability: config.Availability}
-	issueResponse, err := caClient.IssueNodeCertificate(ctx, issueRequest)
+	issueResponse, err := caClient.IssueNodeCertificate(issueCtx, issueRequest)
 	if err != nil {
-		config.Remotes.Observe(peer, -remotes.DefaultObservationWeight)
+		conn.Close(false)
 		return nil, err
 	}
 
@@ -589,13 +588,14 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50
 		defer cancel()
 		statusResponse, err := caClient.NodeCertificateStatus(ctx, statusRequest)
 		if err != nil {
-			config.Remotes.Observe(peer, -remotes.DefaultObservationWeight)
+			conn.Close(false)
 			return nil, err
 		}
 
 		// If the certificate was issued, return
 		if statusResponse.Status.State == api.IssuanceStateIssued {
 			if statusResponse.Certificate == nil {
+				conn.Close(false)
 				return nil, errors.New("no certificate in CertificateStatus response")
 			}
 
@@ -605,7 +605,7 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50
 			// retry until the certificate gets updated per our
 			// current request.
 			if bytes.Equal(statusResponse.Certificate.CSR, csr) {
-				config.Remotes.Observe(peer, remotes.DefaultObservationWeight)
+				conn.Close(true)
 				return statusResponse.Certificate.Certificate, nil
 			}
 		}
diff --git a/vendor/github.com/docker/swarmkit/ca/config.go b/vendor/github.com/docker/swarmkit/ca/config.go
index 1afa536946..d2664bd635 100644
--- a/vendor/github.com/docker/swarmkit/ca/config.go
+++ b/vendor/github.com/docker/swarmkit/ca/config.go
@@ -16,9 +16,9 @@ import (
 	"github.com/Sirupsen/logrus"
 	cfconfig "github.com/cloudflare/cfssl/config"
 	"github.com/docker/swarmkit/api"
+	"github.com/docker/swarmkit/connectionbroker"
 	"github.com/docker/swarmkit/identity"
 	"github.com/docker/swarmkit/log"
-	"github.com/docker/swarmkit/remotes"
 	"github.com/opencontainers/go-digest"
 	"github.com/pkg/errors"
 	"google.golang.org/grpc/credentials"
@@ -200,7 +200,7 @@ func getCAHashFromToken(token string) (digest.Digest, error) {
 }
 
 // DownloadRootCA tries to retrieve a remote root CA and matches the digest against the provided token.
-func DownloadRootCA(ctx context.Context, paths CertPaths, token string, r remotes.Remotes) (RootCA, error) {
+func DownloadRootCA(ctx context.Context, paths CertPaths, token string, connBroker *connectionbroker.Broker) (RootCA, error) {
 	var rootCA RootCA
 	// Get a digest for the optional CA hash string that we've been provided
 	// If we were provided a non-empty string, and it is an invalid hash, return
@@ -221,7 +221,7 @@ func DownloadRootCA(ctx context.Context, paths CertPaths, token string, r remote
 	// just been demoted, for example).
 	for i := 0; i != 5; i++ {
-		rootCA, err = GetRemoteCA(ctx, d, r)
+		rootCA, err = GetRemoteCA(ctx, d, connBroker)
 		if err == nil {
 			break
 		}
@@ -313,11 +313,16 @@ type CertificateRequestConfig struct {
 	Token string
 	// Availability allows a user to control the current scheduling status of a node
 	Availability api.NodeSpec_Availability
-	// Remotes is the set of remote CAs.
-	Remotes remotes.Remotes
+	// ConnBroker provides connections to CAs.
+	ConnBroker *connectionbroker.Broker
 	// Credentials provides transport credentials for communicating with the
 	// remote server.
 	Credentials credentials.TransportCredentials
+	// ForceRemote specifies that only a remote (TCP) connection should
+	// be used to request the certificate. This may be necessary in cases
+	// where the local node is running a manager, but is in the process of
+	// being demoted.
+	ForceRemote bool
 }
 
 // CreateSecurityConfig creates a new key and cert for this node, either locally
@@ -380,7 +385,7 @@ func (rootCA RootCA) CreateSecurityConfig(ctx context.Context, krw *KeyReadWrite
 
 // RenewTLSConfigNow gets a new TLS cert and key, and updates the security config if provided. This is similar to
 // RenewTLSConfig, except while that monitors for expiry, and periodically renews, this renews once and is blocking
-func RenewTLSConfigNow(ctx context.Context, s *SecurityConfig, r remotes.Remotes) error {
+func RenewTLSConfigNow(ctx context.Context, s *SecurityConfig, connBroker *connectionbroker.Broker) error {
 	s.renewalMu.Lock()
 	defer s.renewalMu.Unlock()
 
@@ -395,7 +400,7 @@ func RenewTLSConfigNow(ctx context.Context, s *SecurityConfig, r remotes.Remotes
 
 	tlsKeyPair, err := rootCA.RequestAndSaveNewCertificates(ctx,
 		s.KeyWriter(),
 		CertificateRequestConfig{
-			Remotes:     r,
+			ConnBroker:  connBroker,
 			Credentials: s.ClientTLSCreds,
 		})
 	if err != nil {
@@ -437,7 +442,7 @@ func RenewTLSConfigNow(ctx context.Context, s *SecurityConfig, r remotes.Remotes
 
 // RenewTLSConfig will continuously monitor for the necessity of renewing the local certificates, either by
 // issuing them locally if key-material is available, or requesting them from a remote CA.
-func RenewTLSConfig(ctx context.Context, s *SecurityConfig, remotes remotes.Remotes, renew <-chan struct{}) <-chan CertificateUpdate {
+func RenewTLSConfig(ctx context.Context, s *SecurityConfig, connBroker *connectionbroker.Broker, renew <-chan struct{}) <-chan CertificateUpdate {
 	updates := make(chan CertificateUpdate)
 
 	go func() {
@@ -501,7 +506,7 @@ func RenewTLSConfig(ctx context.Context, s *SecurityConfig, remotes remotes.Remo
 
 				// ignore errors - it will just try again later
 				var certUpdate CertificateUpdate
-				if err := RenewTLSConfigNow(ctx, s, remotes); err != nil {
+				if err := RenewTLSConfigNow(ctx, s, connBroker); err != nil {
 					certUpdate.Err = err
 				} else {
 					certUpdate.Role = s.ClientTLSCreds.Role()
diff --git a/vendor/github.com/docker/swarmkit/ca/server.go b/vendor/github.com/docker/swarmkit/ca/server.go
index d72045db61..a7097f86a3 100644
--- a/vendor/github.com/docker/swarmkit/ca/server.go
+++ b/vendor/github.com/docker/swarmkit/ca/server.go
@@ -28,6 +28,7 @@ const (
 // breaking it apart doesn't seem worth it.
 type Server struct {
 	mu               sync.Mutex
+	wg               sync.WaitGroup
 	ctx              context.Context
 	cancel           func()
 	store            *store.MemoryStore
@@ -101,9 +102,10 @@ func (s *Server) NodeCertificateStatus(ctx context.Context, request *api.NodeCer
 		return nil, grpc.Errorf(codes.InvalidArgument, codes.InvalidArgument.String())
 	}
 
-	if err := s.isRunningLocked(); err != nil {
+	if err := s.addTask(); err != nil {
 		return nil, err
 	}
+	defer s.doneTask()
 
 	var node *api.Node
 
@@ -187,9 +189,10 @@ func (s *Server) IssueNodeCertificate(ctx context.Context, request *api.IssueNod
 		return nil, grpc.Errorf(codes.InvalidArgument, codes.InvalidArgument.String())
 	}
 
-	if err := s.isRunningLocked(); err != nil {
+	if err := s.addTask(); err != nil {
 		return nil, err
 	}
+	defer s.doneTask()
 
 	var (
 		blacklistedCerts map[string]*api.BlacklistedCertificate
@@ -371,8 +374,10 @@ func (s *Server) Run(ctx context.Context) error {
 		s.mu.Unlock()
 		return errors.New("CA signer is already running")
 	}
+	s.wg.Add(1)
 	s.mu.Unlock()
 
+	defer s.wg.Done()
 	ctx = log.WithModule(ctx, "ca")
 
 	// Retrieve the channels to keep track of changes in the cluster
@@ -402,8 +407,8 @@ func (s *Server) Run(ctx context.Context) error {
 	// returns true without joinTokens being set correctly.
 	s.mu.Lock()
 	s.ctx, s.cancel = context.WithCancel(ctx)
-	close(s.started)
 	s.mu.Unlock()
+	close(s.started)
 
 	if err != nil {
 		log.G(ctx).WithFields(logrus.Fields{
@@ -464,32 +469,38 @@ func (s *Server) Run(ctx context.Context) error {
 // Stop stops the CA and closes all grpc streams.
 func (s *Server) Stop() error {
 	s.mu.Lock()
-	defer s.mu.Unlock()
+
 	if !s.isRunning() {
+		s.mu.Unlock()
 		return errors.New("CA signer is already stopped")
 	}
 	s.cancel()
+	s.mu.Unlock()
+
+	// wait for all handlers to finish their CA deals,
+	s.wg.Wait()
+
 	s.started = make(chan struct{})
 	return nil
 }
 
 // Ready waits on the ready channel and returns when the server is ready to serve.
 func (s *Server) Ready() <-chan struct{} {
-	s.mu.Lock()
-	defer s.mu.Unlock()
 	return s.started
 }
 
-func (s *Server) isRunningLocked() error {
+func (s *Server) addTask() error {
 	s.mu.Lock()
 	if !s.isRunning() {
 		s.mu.Unlock()
 		return grpc.Errorf(codes.Aborted, "CA signer is stopped")
 	}
+	s.wg.Add(1)
 	s.mu.Unlock()
 	return nil
 }
 
+func (s *Server) doneTask() {
+	s.wg.Done()
+}
+
 func (s *Server) isRunning() bool {
 	if s.ctx == nil {
 		return false
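
The isRunningLocked to addTask/doneTask change above is the heart of the graceful-stop fix: each RPC handler registers with the server's WaitGroup while the running check is still held under the mutex, so Stop can drain in-flight handlers before returning. The generic shape of the pattern, as a standalone sketch (names here are illustrative, not swarmkit API):

package shutdown

import (
	"errors"
	"sync"
)

// server demonstrates the drain-on-stop pattern used by the CA server.
type server struct {
	mu      sync.Mutex
	wg      sync.WaitGroup
	running bool
}

func (s *server) addTask() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.running {
		return errors.New("stopped")
	}
	s.wg.Add(1) // must happen under the same lock as the running check
	return nil
}

func (s *server) handleRPC() error {
	if err := s.addTask(); err != nil {
		return err
	}
	defer s.wg.Done()
	// ... do the actual work ...
	return nil
}

func (s *server) stop() {
	s.mu.Lock()
	s.running = false
	s.mu.Unlock()
	s.wg.Wait() // no new tasks can start; wait for in-flight ones to finish
}
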
diff --git a/vendor/github.com/docker/swarmkit/connectionbroker/broker.go b/vendor/github.com/docker/swarmkit/connectionbroker/broker.go
new file mode 100644
index 0000000000..f22726f2b1
--- /dev/null
+++ b/vendor/github.com/docker/swarmkit/connectionbroker/broker.go
@@ -0,0 +1,105 @@
+// Package connectionbroker is a layer on top of remotes that returns
+// a gRPC connection to a manager. The connection may be a local connection
+// using a local socket such as a UNIX socket.
+package connectionbroker
+
+import (
+	"sync"
+
+	"github.com/docker/swarmkit/api"
+	"github.com/docker/swarmkit/remotes"
+	"google.golang.org/grpc"
+)
+
+// Broker is a simple connection broker. It can either return a fresh
+// connection to a remote manager selected with weighted randomization, or a
+// local gRPC connection to the local manager.
+type Broker struct {
+	mu        sync.Mutex
+	remotes   remotes.Remotes
+	localConn *grpc.ClientConn
+}
+
+// New creates a new connection broker.
+func New(remotes remotes.Remotes) *Broker {
+	return &Broker{
+		remotes: remotes,
+	}
+}
+
+// SetLocalConn changes the local gRPC connection used by the connection broker.
+func (b *Broker) SetLocalConn(localConn *grpc.ClientConn) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	b.localConn = localConn
+}
+
+// Select a manager from the set of available managers, and return a connection.
+func (b *Broker) Select(dialOpts ...grpc.DialOption) (*Conn, error) {
+	b.mu.Lock()
+	localConn := b.localConn
+	b.mu.Unlock()
+
+	if localConn != nil {
+		return &Conn{
+			ClientConn: localConn,
+			isLocal:    true,
+		}, nil
+	}
+
+	return b.SelectRemote(dialOpts...)
+}
+
+// SelectRemote chooses a manager from the remotes, and returns a TCP
+// connection.
+func (b *Broker) SelectRemote(dialOpts ...grpc.DialOption) (*Conn, error) {
+	peer, err := b.remotes.Select()
+	if err != nil {
+		return nil, err
+	}
+
+	cc, err := grpc.Dial(peer.Addr, dialOpts...)
+	if err != nil {
+		b.remotes.ObserveIfExists(peer, -remotes.DefaultObservationWeight)
+		return nil, err
+	}
+
+	return &Conn{
+		ClientConn: cc,
+		remotes:    b.remotes,
+		peer:       peer,
+	}, nil
+}
+
+// Remotes returns the remotes interface used by the broker, so the caller
+// can make observations or see weights directly.
+func (b *Broker) Remotes() remotes.Remotes {
+	return b.remotes
+}
+
+// Conn is a wrapper around a gRPC client connection.
+type Conn struct {
+	*grpc.ClientConn
+	isLocal bool
+	remotes remotes.Remotes
+	peer    api.Peer
+}
+
+// Close closes the client connection if it is a remote connection. It also
+// records a positive experience with the remote peer if success is true,
+// otherwise it records a negative experience. If a local connection is in
+// use, Close is a noop.
+func (c *Conn) Close(success bool) error {
+	if c.isLocal {
+		return nil
+	}
+
+	if success {
+		c.remotes.ObserveIfExists(c.peer, remotes.DefaultObservationWeight)
+	} else {
+		c.remotes.ObserveIfExists(c.peer, -remotes.DefaultObservationWeight)
+	}
+
+	return c.ClientConn.Close()
+}
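
Taken together, the broker's contract is: call Select (or SelectRemote), use Conn.ClientConn to build RPC clients, then call Close with a success flag so the remote peer's weight is adjusted. A hedged usage sketch, mirroring how the CA and agent code in this diff consume it (the RPC, option values, and function name are illustrative):

package example

import (
	"time"

	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/connectionbroker"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

func fetchUnlockKey(ctx context.Context, broker *connectionbroker.Broker, creds credentials.TransportCredentials) (*api.GetUnlockKeyResponse, error) {
	// Select prefers a registered local manager socket; otherwise it
	// dials a remote manager chosen by weighted randomization.
	conn, err := broker.Select(
		grpc.WithTransportCredentials(creds),
		grpc.WithTimeout(5*time.Second),
	)
	if err != nil {
		return nil, err
	}

	client := api.NewCAClient(conn.ClientConn)
	resp, err := client.GetUnlockKey(ctx, &api.GetUnlockKeyRequest{})

	// Close releases a remote connection and records the outcome against
	// the peer's weight; for a local connection it is a no-op.
	conn.Close(err == nil)
	return resp, err
}
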
diff --git a/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go b/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go
index 6fe3fdd706..3073a35fae 100644
--- a/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go
+++ b/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go
@@ -133,6 +133,7 @@ type Dispatcher struct {
 }
 
 // New returns Dispatcher with cluster interface(usually raft.Node).
+// NOTE: each handler which does something with raft must add to Dispatcher.wg
 func New(cluster Cluster, c *Config) *Dispatcher {
 	d := &Dispatcher{
 		nodes: newNodeStore(c.HeartbeatPeriod, c.HeartbeatEpsilon, c.GracePeriodMultiplier, c.RateLimitPeriod),
diff --git a/vendor/github.com/docker/swarmkit/manager/manager.go b/vendor/github.com/docker/swarmkit/manager/manager.go
index f852d4f6a6..b1c65aa14b 100644
--- a/vendor/github.com/docker/swarmkit/manager/manager.go
+++ b/vendor/github.com/docker/swarmkit/manager/manager.go
@@ -1,6 +1,7 @@
 package manager
 
 import (
+	"crypto/tls"
 	"crypto/x509"
 	"encoding/pem"
 	"fmt"
@@ -16,6 +17,7 @@ import (
 	"github.com/docker/go-events"
 	"github.com/docker/swarmkit/api"
 	"github.com/docker/swarmkit/ca"
+	"github.com/docker/swarmkit/connectionbroker"
 	"github.com/docker/swarmkit/log"
 	"github.com/docker/swarmkit/manager/allocator"
 	"github.com/docker/swarmkit/manager/controlapi"
@@ -38,6 +40,7 @@ import (
 	"github.com/pkg/errors"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials"
 )
 
 const (
@@ -557,9 +560,6 @@ func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error {
 		"node.role": ca.ManagerRole,
 	})
 
-	// we are our own peer from which we get certs - try to connect over the local socket
-	r := remotes.NewRemotes(api.Peer{Addr: m.Addr(), NodeID: nodeID})
-
 	kekData := ca.KEKData{Version: cluster.Meta.Version.Index}
 	for _, encryptionKey := range cluster.UnlockKeys {
 		if encryptionKey.Subsystem == ca.ManagerRole {
@@ -579,8 +579,27 @@ func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error {
 		// a best effort attempt to update the TLS certificate - if it fails, it'll be updated the next time it renews;
 		// don't wait because it might take a bit
 		go func() {
-			if err := ca.RenewTLSConfigNow(ctx, securityConfig, r); err != nil {
-				logger.WithError(err).Errorf("failed to download new TLS certificate after locking the cluster")
+			insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
+
+			conn, err := grpc.Dial(
+				m.config.ControlAPI,
+				grpc.WithTransportCredentials(insecureCreds),
+				grpc.WithDialer(
+					func(addr string, timeout time.Duration) (net.Conn, error) {
+						return xnet.DialTimeoutLocal(addr, timeout)
+					}),
+			)
+			if err != nil {
+				logger.WithError(err).Error("failed to connect to local manager socket after locking the cluster")
+				return
+			}
+
+			defer conn.Close()
+
+			connBroker := connectionbroker.New(remotes.NewRemotes())
+			connBroker.SetLocalConn(conn)
+			if err := ca.RenewTLSConfigNow(ctx, securityConfig, connBroker); err != nil {
+				logger.WithError(err).Error("failed to download new TLS certificate after locking the cluster")
 			}
 		}()
 	}
diff --git a/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go b/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go
index c46727d937..a19f07b6e0 100644
--- a/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go
+++ b/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go
@@ -1826,10 +1826,10 @@ func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raf
 //   - ConfChangeAddNode, in which case the contained ID will be added into the set.
 //   - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
 func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
-	ids := make(map[uint64]bool)
+	ids := make(map[uint64]struct{})
 	if snap != nil {
 		for _, id := range snap.Metadata.ConfState.Nodes {
-			ids[id] = true
+			ids[id] = struct{}{}
 		}
 	}
 	for _, e := range ents {
@@ -1845,7 +1845,7 @@ func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
 		}
 		switch cc.Type {
 		case raftpb.ConfChangeAddNode:
-			ids[cc.NodeID] = true
+			ids[cc.NodeID] = struct{}{}
 		case raftpb.ConfChangeRemoveNode:
 			delete(ids, cc.NodeID)
 		case raftpb.ConfChangeUpdateNode:
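
An incidental cleanup in getIDs above: the member-ID set switches from map[uint64]bool to map[uint64]struct{}, the usual Go set idiom, since the empty struct occupies zero bytes and makes the intent (membership only, no value) explicit. A small illustration (function names are ours, not swarmkit's):

package example

// toSet builds a set of raft member IDs using the map[uint64]struct{}
// idiom: the empty struct stores nothing, so only the keys matter.
func toSet(ids []uint64) map[uint64]struct{} {
	set := make(map[uint64]struct{}, len(ids))
	for _, id := range ids {
		set[id] = struct{}{} // add a member
	}
	return set
}

// contains reports membership with the usual comma-ok form.
func contains(set map[uint64]struct{}, id uint64) bool {
	_, ok := set[id]
	return ok
}
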
diff --git a/vendor/github.com/docker/swarmkit/manager/state/raft/storage.go b/vendor/github.com/docker/swarmkit/manager/state/raft/storage.go
index b1c74e0357..402b04a33f 100644
--- a/vendor/github.com/docker/swarmkit/manager/state/raft/storage.go
+++ b/vendor/github.com/docker/swarmkit/manager/state/raft/storage.go
@@ -52,13 +52,6 @@ func (n *Node) loadAndStart(ctx context.Context, forceNewCluster bool) error {
 		return err
 	}
 
-	if snapshot != nil {
-		// Load the snapshot data into the store
-		if err := n.restoreFromSnapshot(snapshot.Data, forceNewCluster); err != nil {
-			return err
-		}
-	}
-
 	// Read logs to fully catch up store
 	var raftNode api.RaftMember
 	if err := raftNode.Unmarshal(waldata.Metadata); err != nil {
@@ -66,6 +59,13 @@ func (n *Node) loadAndStart(ctx context.Context, forceNewCluster bool) error {
 	}
 	n.Config.ID = raftNode.RaftID
 
+	if snapshot != nil {
+		// Load the snapshot data into the store
+		if err := n.restoreFromSnapshot(snapshot.Data, forceNewCluster); err != nil {
+			return err
+		}
+	}
+
 	ents, st := waldata.Entries, waldata.HardState
 
 	// All members that are no longer part of the cluster must be added to
@@ -88,14 +88,14 @@ func (n *Node) loadAndStart(ctx context.Context, forceNewCluster bool) error {
 		// discard the previously uncommitted entries
 		for i, ent := range ents {
 			if ent.Index > st.Commit {
-				log.G(ctx).Infof("discarding %d uncommitted WAL entries ", len(ents)-i)
+				log.G(ctx).Infof("discarding %d uncommitted WAL entries", len(ents)-i)
 				ents = ents[:i]
 				break
 			}
 		}
 
 		// force append the configuration change entries
-		toAppEnts := createConfigChangeEnts(getIDs(snapshot, ents), uint64(n.Config.ID), st.Term, st.Commit)
+		toAppEnts := createConfigChangeEnts(getIDs(snapshot, ents), n.Config.ID, st.Term, st.Commit)
 
 		// All members that are being removed as part of the
 		// force-new-cluster process must be added to the
@@ -230,13 +230,15 @@ func (n *Node) restoreFromSnapshot(data []byte, forceNewCluster bool) error {
 
 	oldMembers := n.cluster.Members()
 
-	if !forceNewCluster {
-		for _, member := range snapshot.Membership.Members {
+	for _, member := range snapshot.Membership.Members {
+		if forceNewCluster && member.RaftID != n.Config.ID {
+			n.cluster.RemoveMember(member.RaftID)
+		} else {
 			if err := n.registerNode(&api.RaftMember{RaftID: member.RaftID, NodeID: member.NodeID, Addr: member.Addr}); err != nil {
 				return err
 			}
-			delete(oldMembers, member.RaftID)
 		}
+		delete(oldMembers, member.RaftID)
 	}
 
 	for _, removedMember := range snapshot.Membership.Removed {
diff --git a/vendor/github.com/docker/swarmkit/node/node.go b/vendor/github.com/docker/swarmkit/node/node.go
index cf1545c529..be776dc151 100644
--- a/vendor/github.com/docker/swarmkit/node/node.go
+++ b/vendor/github.com/docker/swarmkit/node/node.go
@@ -18,6 +18,7 @@ import (
 	"github.com/docker/swarmkit/agent/exec"
 	"github.com/docker/swarmkit/api"
 	"github.com/docker/swarmkit/ca"
+	"github.com/docker/swarmkit/connectionbroker"
 	"github.com/docker/swarmkit/ioutils"
 	"github.com/docker/swarmkit/log"
 	"github.com/docker/swarmkit/manager"
@@ -105,6 +106,7 @@ type Node struct {
 	sync.RWMutex
 	config           *Config
 	remotes          *persistentRemotes
+	connBroker       *connectionbroker.Broker
 	role             string
 	roleCond         *sync.Cond
 	conn             *grpc.ClientConn
@@ -154,7 +156,6 @@ func New(c *Config) (*Node, error) {
 			return nil, err
 		}
 	}
-
 	n := &Node{
 		remotes:          newPersistentRemotes(stateFile, p...),
 		role:             ca.WorkerRole,
@@ -174,6 +175,8 @@ func New(c *Config) (*Node, error) {
 		}
 	}
 
+	n.connBroker = connectionbroker.New(n.remotes)
+
 	n.roleCond = sync.NewCond(n.RLocker())
 	n.connCond = sync.NewCond(n.RLocker())
 	return n, nil
@@ -261,7 +264,7 @@ func (n *Node) run(ctx context.Context) (err error) {
 		}
 	}()
 
-	updates := ca.RenewTLSConfig(ctx, securityConfig, n.remotes, forceCertRenewal)
+	updates := ca.RenewTLSConfig(ctx, securityConfig, n.connBroker, forceCertRenewal)
 	go func() {
 		for {
 			select {
@@ -368,17 +371,35 @@ func (n *Node) Err(ctx context.Context) error {
 }
 
 func (n *Node) runAgent(ctx context.Context, db *bolt.DB, creds credentials.TransportCredentials, ready chan<- struct{}) error {
+	waitCtx, waitCancel := context.WithCancel(ctx)
+	remotesCh := n.remotes.WaitSelect(ctx)
+	controlCh := n.ListenControlSocket(waitCtx)
+
+waitPeer:
+	for {
+		select {
+		case <-ctx.Done():
+			break waitPeer
+		case <-remotesCh:
+			break waitPeer
+		case conn := <-controlCh:
+			if conn != nil {
+				break waitPeer
+			}
+		}
+	}
+
+	waitCancel()
+
 	select {
 	case <-ctx.Done():
-	case <-n.remotes.WaitSelect(ctx):
-	}
-	if ctx.Err() != nil {
 		return ctx.Err()
+	default:
 	}
 
 	a, err := agent.New(&agent.Config{
 		Hostname:         n.config.Hostname,
-		Managers:         n.remotes,
+		ConnBroker:       n.connBroker,
 		Executor:         n.config.Executor,
 		DB:               db,
 		NotifyNodeChange: n.notifyNodeChange,
@@ -423,6 +444,7 @@ func (n *Node) setControlSocket(conn *grpc.ClientConn) {
 		n.conn.Close()
 	}
 	n.conn = conn
+	n.connBroker.SetLocalConn(conn)
 	n.connCond.Broadcast()
 	n.Unlock()
 }
@@ -447,15 +469,21 @@ func (n *Node) ListenControlSocket(ctx context.Context) <-chan *grpc.ClientConn
 		defer close(done)
 		defer n.RUnlock()
 		for {
-			if ctx.Err() != nil {
+			select {
+			case <-ctx.Done():
 				return
+			default:
 			}
 			if conn == n.conn {
 				n.connCond.Wait()
 				continue
 			}
 			conn = n.conn
-			c <- conn
+			select {
+			case c <- conn:
+			case <-ctx.Done():
+				return
+			}
 		}
 	}()
 	return c
@@ -532,7 +560,7 @@ func (n *Node) loadSecurityConfig(ctx context.Context) (*ca.SecurityConfig, erro
 		}
 		log.G(ctx).Debug("generated CA key and certificate")
 	} else if err == ca.ErrNoLocalRootCA { // from previous error loading the root CA from disk
-		rootCA, err = ca.DownloadRootCA(ctx, paths.RootCA, n.config.JoinToken, n.remotes)
+		rootCA, err = ca.DownloadRootCA(ctx, paths.RootCA, n.config.JoinToken, n.connBroker)
 		if err != nil {
 			return nil, err
 		}
@@ -559,7 +587,7 @@ func (n *Node) loadSecurityConfig(ctx context.Context) (*ca.SecurityConfig, erro
 		securityConfig, err = rootCA.CreateSecurityConfig(ctx, krw, ca.CertificateRequestConfig{
 			Token:        n.config.JoinToken,
 			Availability: n.config.Availability,
-			Remotes:      n.remotes,
+			ConnBroker:   n.connBroker,
 		})
 		if err != nil {
@@ -687,22 +715,6 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
 
 	go n.initManagerConnection(connCtx, ready)
 
-	// this happens only on initial start
-	if ready != nil {
-		go func(ready chan struct{}) {
-			select {
-			case <-ready:
-				addr, err := n.RemoteAPIAddr()
-				if err != nil {
-					log.G(ctx).WithError(err).Errorf("get remote api addr")
-				} else {
-					n.remotes.Observe(api.Peer{NodeID: n.NodeID(), Addr: addr}, remotes.DefaultObservationWeight)
-				}
-			case <-connCtx.Done():
-			}
-		}(ready)
-	}
-
 	// wait for manager stop or for role change
 	select {
 	case <-done: