
Merge pull request #30035 from aaronlehmann/vendor-swarmkit-62d835f

Update vendored swarmkit to 62d835f
Brian Goff 2017-01-10 20:13:12 -05:00 committed by GitHub
commit a02b1215c2
14 changed files with 274 additions and 129 deletions

View file

@ -103,7 +103,7 @@ github.com/docker/containerd 03e5862ec0d8d3b3f750e19fca3ee367e13c090e
github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
# cluster
github.com/docker/swarmkit c97146840a26c9ce8023284d0e9c989586cc1857
github.com/docker/swarmkit 62d835f478b2e4fd2768deb88fb3b32e334faaee
github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
github.com/gogo/protobuf v0.3
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a

View file

@ -336,12 +336,10 @@ func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMe
seen := map[api.Peer]struct{}{}
for _, manager := range message.Managers {
if manager.Peer.Addr == "" {
log.G(ctx).WithField("manager.addr", manager.Peer.Addr).
Warnf("skipping bad manager address")
continue
}
a.config.Managers.Observe(*manager.Peer, int(manager.Weight))
a.config.ConnBroker.Remotes().Observe(*manager.Peer, int(manager.Weight))
seen[*manager.Peer] = struct{}{}
}
@ -358,9 +356,9 @@ func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMe
}
// prune managers not in list.
for peer := range a.config.Managers.Weights() {
for peer := range a.config.ConnBroker.Remotes().Weights() {
if _, ok := seen[peer]; !ok {
a.config.Managers.Remove(peer)
a.config.ConnBroker.Remotes().Remove(peer)
}
}
@ -468,7 +466,7 @@ func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogP
)
err = a.withSession(ctx, func(session *session) error {
publisher, err = api.NewLogBrokerClient(session.conn).PublishLogs(ctx)
publisher, err = api.NewLogBrokerClient(session.conn.ClientConn).PublishLogs(ctx)
return err
})
if err != nil {

View file

@ -4,7 +4,7 @@ import (
"github.com/boltdb/bolt"
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/remotes"
"github.com/docker/swarmkit/connectionbroker"
"github.com/pkg/errors"
"google.golang.org/grpc/credentials"
)
@ -14,9 +14,9 @@ type Config struct {
// Hostname the name of host for agent instance.
Hostname string
// Managers provides the manager backend used by the agent. It will be
// updated with managers weights as observed by the agent.
Managers remotes.Remotes
// ConnBroker provides a connection broker for retrieving gRPC
// connections to managers.
ConnBroker *connectionbroker.Broker
// Executor specifies the executor to use for the agent.
Executor exec.Executor
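For orientation, the sketch below shows how a caller might populate the reworked Config: build a Broker over the known manager peers and pass it in place of the old Managers field. It is a minimal, hypothetical wiring example, not code from this commit; the field set (Executor, DB, Credentials), the peer address, and the hostname are assumptions taken from the other hunks in this diff.

package example // illustrative sketch only

import (
	"github.com/boltdb/bolt"
	"github.com/docker/swarmkit/agent"
	"github.com/docker/swarmkit/agent/exec"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/connectionbroker"
	"github.com/docker/swarmkit/remotes"
	"google.golang.org/grpc/credentials"
)

// newAgentConfig wires a connection broker into the agent configuration.
func newAgentConfig(executor exec.Executor, db *bolt.DB, creds credentials.TransportCredentials) *agent.Config {
	peers := remotes.NewRemotes(api.Peer{Addr: "10.0.0.1:2377"}) // placeholder manager address
	broker := connectionbroker.New(peers)
	return &agent.Config{
		Hostname:    "worker-1", // placeholder
		ConnBroker:  broker,     // replaces the old Managers remotes.Remotes field
		Executor:    executor,
		DB:          db,
		Credentials: creds,
	}
}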

View file

@ -30,7 +30,7 @@ type ResourceAllocator interface {
func (r *resourceAllocator) AttachNetwork(ctx context.Context, id, target string, addresses []string) (string, error) {
var taskID string
if err := r.agent.withSession(ctx, func(session *session) error {
client := api.NewResourceAllocatorClient(session.conn)
client := api.NewResourceAllocatorClient(session.conn.ClientConn)
r, err := client.AttachNetwork(ctx, &api.AttachNetworkRequest{
Config: &api.NetworkAttachmentConfig{
Target: target,
@ -53,7 +53,7 @@ func (r *resourceAllocator) AttachNetwork(ctx context.Context, id, target string
// DetachNetwork deletes a network attachment.
func (r *resourceAllocator) DetachNetwork(ctx context.Context, aID string) error {
return r.agent.withSession(ctx, func(session *session) error {
client := api.NewResourceAllocatorClient(session.conn)
client := api.NewResourceAllocatorClient(session.conn.ClientConn)
_, err := client.DetachNetwork(ctx, &api.DetachNetworkRequest{
AttachmentID: aID,
})

View file

@ -7,9 +7,9 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/connectionbroker"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/protobuf/ptypes"
"github.com/docker/swarmkit/remotes"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@ -30,8 +30,7 @@ var (
// flow into the agent, such as task assignment, are called back into the
// agent through errs, messages and tasks.
type session struct {
conn *grpc.ClientConn
addr string
conn *connectionbroker.Conn
agent *Agent
sessionID string
@ -61,12 +60,7 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI
// TODO(stevvooe): Need to move connection management up a level or create
// independent connection for log broker client.
peer, err := agent.config.Managers.Select()
if err != nil {
s.errs <- err
return s
}
cc, err := grpc.Dial(peer.Addr,
cc, err := agent.config.ConnBroker.Select(
grpc.WithTransportCredentials(agent.config.Credentials),
grpc.WithTimeout(dispatcherRPCTimeout),
)
@ -74,7 +68,6 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI
s.errs <- err
return s
}
s.addr = peer.Addr
s.conn = cc
go s.run(ctx, delay, description)
@ -127,7 +120,7 @@ func (s *session) start(ctx context.Context, description *api.NodeDescription) e
// Need to run Session in a goroutine since there's no way to set a
// timeout for an individual Recv call in a stream.
go func() {
client := api.NewDispatcherClient(s.conn)
client := api.NewDispatcherClient(s.conn.ClientConn)
stream, err = client.Session(sessionCtx, &api.SessionRequest{
Description: description,
@ -160,7 +153,7 @@ func (s *session) start(ctx context.Context, description *api.NodeDescription) e
func (s *session) heartbeat(ctx context.Context) error {
log.G(ctx).Debugf("(*session).heartbeat")
client := api.NewDispatcherClient(s.conn)
client := api.NewDispatcherClient(s.conn.ClientConn)
heartbeat := time.NewTimer(1) // send out a heartbeat right away
defer heartbeat.Stop()
@ -224,7 +217,7 @@ func (s *session) logSubscriptions(ctx context.Context) error {
log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).logSubscriptions"})
log.Debugf("")
client := api.NewLogBrokerClient(s.conn)
client := api.NewLogBrokerClient(s.conn.ClientConn)
subscriptions, err := client.ListenSubscriptions(ctx, &api.ListenSubscriptionsRequest{})
if err != nil {
return err
@ -269,7 +262,7 @@ func (s *session) watch(ctx context.Context) error {
err error
)
client := api.NewDispatcherClient(s.conn)
client := api.NewDispatcherClient(s.conn.ClientConn)
for {
// If this is the first time we're running the loop, or there was a reference mismatch
// attempt to get the assignmentWatch
@ -344,7 +337,7 @@ func (s *session) watch(ctx context.Context) error {
// sendTaskStatus uses the current session to send the status of a single task.
func (s *session) sendTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
client := api.NewDispatcherClient(s.conn)
client := api.NewDispatcherClient(s.conn.ClientConn)
if _, err := client.UpdateTaskStatus(ctx, &api.UpdateTaskStatusRequest{
SessionID: s.sessionID,
Updates: []*api.UpdateTaskStatusRequest_TaskStatusUpdate{
@ -385,7 +378,7 @@ func (s *session) sendTaskStatuses(ctx context.Context, updates ...*api.UpdateTa
return updates, ctx.Err()
}
client := api.NewDispatcherClient(s.conn)
client := api.NewDispatcherClient(s.conn.ClientConn)
n := batchSize
if len(updates) < n {
@ -416,8 +409,7 @@ func (s *session) sendError(err error) {
func (s *session) close() error {
s.closeOnce.Do(func() {
if s.conn != nil {
s.agent.config.Managers.ObserveIfExists(api.Peer{Addr: s.addr}, -remotes.DefaultObservationWeight)
s.conn.Close()
s.conn.Close(false)
}
close(s.closed)

View file

@ -22,8 +22,8 @@ import (
"github.com/cloudflare/cfssl/signer/local"
"github.com/docker/go-events"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/connectionbroker"
"github.com/docker/swarmkit/ioutils"
"github.com/docker/swarmkit/remotes"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/net/context"
@ -169,6 +169,15 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
if err == nil {
break
}
// If the first attempt fails, we should try a remote
// connection. The local node may be a manager that was
// demoted, so the local connection (which is preferred) may
// not work. If we are successful in renewing the certificate,
// the local connection will not be returned by the connection
// broker anymore.
config.ForceRemote = true
}
if err != nil {
return nil, err
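Taken together with the hunks below, the retry loop this comment lives in looks roughly like the following. This is a hedged reconstruction, not the verbatim function body: the wrapper is hypothetical, and the exact GetRemoteSignedCertificate signature (in particular the config parameter) is inferred from the hunk header further down in this file.

package example // illustrative sketch only

import (
	"github.com/docker/swarmkit/ca"
	"golang.org/x/net/context"
)

// requestWithFallback retries the signing request up to five times, forcing a
// remote (TCP) connection after the first failure in case the preferred local
// connection belongs to a manager that was just demoted.
func requestWithFallback(ctx context.Context, rootCA *ca.RootCA, csr []byte, config ca.CertificateRequestConfig) ([]byte, error) {
	var (
		signedCert []byte
		err        error
	)
	for i := 0; i != 5; i++ {
		signedCert, err = ca.GetRemoteSignedCertificate(ctx, csr, rootCA.Pool, config)
		if err == nil {
			return signedCert, nil
		}
		config.ForceRemote = true // fall back to a remote connection from the broker
	}
	return nil, err
}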
@ -202,7 +211,7 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
var kekUpdate *KEKData
for i := 0; i < 5; i++ {
kekUpdate, err = rca.getKEKUpdate(ctx, X509Cert, tlsKeyPair, config.Remotes)
kekUpdate, err = rca.getKEKUpdate(ctx, X509Cert, tlsKeyPair, config.ConnBroker)
if err == nil {
break
}
@ -218,7 +227,7 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
return &tlsKeyPair, nil
}
func (rca *RootCA) getKEKUpdate(ctx context.Context, cert *x509.Certificate, keypair tls.Certificate, r remotes.Remotes) (*KEKData, error) {
func (rca *RootCA) getKEKUpdate(ctx context.Context, cert *x509.Certificate, keypair tls.Certificate, connBroker *connectionbroker.Broker) (*KEKData, error) {
var managerRole bool
for _, ou := range cert.Subject.OrganizationalUnit {
if ou == ManagerRole {
@ -229,25 +238,25 @@ func (rca *RootCA) getKEKUpdate(ctx context.Context, cert *x509.Certificate, key
if managerRole {
mtlsCreds := credentials.NewTLS(&tls.Config{ServerName: CARole, RootCAs: rca.Pool, Certificates: []tls.Certificate{keypair}})
conn, peer, err := getGRPCConnection(mtlsCreds, r)
conn, err := getGRPCConnection(mtlsCreds, connBroker, false)
if err != nil {
return nil, err
}
defer conn.Close()
client := api.NewCAClient(conn)
client := api.NewCAClient(conn.ClientConn)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
response, err := client.GetUnlockKey(ctx, &api.GetUnlockKeyRequest{})
if err != nil {
if grpc.Code(err) == codes.Unimplemented { // if the server does not support keks, return as if no encryption key was specified
conn.Close(true)
return &KEKData{}, nil
}
r.Observe(peer, -remotes.DefaultObservationWeight)
conn.Close(false)
return nil, err
}
r.Observe(peer, remotes.DefaultObservationWeight)
conn.Close(true)
return &KEKData{KEK: response.UnlockKey, Version: response.Version.Index}, nil
}
@ -440,45 +449,33 @@ func GetLocalRootCA(paths CertPaths) (RootCA, error) {
return NewRootCA(cert, key, DefaultNodeCertExpiration)
}
func getGRPCConnection(creds credentials.TransportCredentials, r remotes.Remotes) (*grpc.ClientConn, api.Peer, error) {
peer, err := r.Select()
if err != nil {
return nil, api.Peer{}, err
}
opts := []grpc.DialOption{
func getGRPCConnection(creds credentials.TransportCredentials, connBroker *connectionbroker.Broker, forceRemote bool) (*connectionbroker.Conn, error) {
dialOpts := []grpc.DialOption{
grpc.WithTransportCredentials(creds),
grpc.WithTimeout(5 * time.Second),
grpc.WithBackoffMaxDelay(5 * time.Second),
}
conn, err := grpc.Dial(peer.Addr, opts...)
if err != nil {
return nil, api.Peer{}, err
if forceRemote {
return connBroker.SelectRemote(dialOpts...)
}
return conn, peer, nil
return connBroker.Select(dialOpts...)
}
// GetRemoteCA returns the remote endpoint's CA certificate
func GetRemoteCA(ctx context.Context, d digest.Digest, r remotes.Remotes) (RootCA, error) {
func GetRemoteCA(ctx context.Context, d digest.Digest, connBroker *connectionbroker.Broker) (RootCA, error) {
// This TLS Config is intentionally using InsecureSkipVerify. We use the
// digest instead to check the integrity of the CA certificate.
insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
conn, peer, err := getGRPCConnection(insecureCreds, r)
conn, err := getGRPCConnection(insecureCreds, connBroker, false)
if err != nil {
return RootCA{}, err
}
defer conn.Close()
client := api.NewCAClient(conn)
client := api.NewCAClient(conn.ClientConn)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
defer func() {
if err != nil {
r.Observe(peer, -remotes.DefaultObservationWeight)
return
}
r.Observe(peer, remotes.DefaultObservationWeight)
conn.Close(err == nil)
}()
response, err := client.GetRootCACertificate(ctx, &api.GetRootCACertificateRequest{})
if err != nil {
@ -558,20 +555,22 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50
creds = credentials.NewTLS(&tls.Config{ServerName: CARole, RootCAs: rootCAPool})
}
conn, peer, err := getGRPCConnection(creds, config.Remotes)
conn, err := getGRPCConnection(creds, config.ConnBroker, config.ForceRemote)
if err != nil {
return nil, err
}
defer conn.Close()
// Create a CAClient to retrieve a new Certificate
caClient := api.NewNodeCAClient(conn)
caClient := api.NewNodeCAClient(conn.ClientConn)
issueCtx, issueCancel := context.WithTimeout(ctx, 5*time.Second)
defer issueCancel()
// Send the Request and retrieve the request token
issueRequest := &api.IssueNodeCertificateRequest{CSR: csr, Token: config.Token, Availability: config.Availability}
issueResponse, err := caClient.IssueNodeCertificate(ctx, issueRequest)
issueResponse, err := caClient.IssueNodeCertificate(issueCtx, issueRequest)
if err != nil {
config.Remotes.Observe(peer, -remotes.DefaultObservationWeight)
conn.Close(false)
return nil, err
}
@ -589,13 +588,14 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50
defer cancel()
statusResponse, err := caClient.NodeCertificateStatus(ctx, statusRequest)
if err != nil {
config.Remotes.Observe(peer, -remotes.DefaultObservationWeight)
conn.Close(false)
return nil, err
}
// If the certificate was issued, return
if statusResponse.Status.State == api.IssuanceStateIssued {
if statusResponse.Certificate == nil {
conn.Close(false)
return nil, errors.New("no certificate in CertificateStatus response")
}
@ -605,7 +605,7 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50
// retry until the certificate gets updated per our
// current request.
if bytes.Equal(statusResponse.Certificate.CSR, csr) {
config.Remotes.Observe(peer, remotes.DefaultObservationWeight)
conn.Close(true)
return statusResponse.Certificate.Certificate, nil
}
}

View file

@ -16,9 +16,9 @@ import (
"github.com/Sirupsen/logrus"
cfconfig "github.com/cloudflare/cfssl/config"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/connectionbroker"
"github.com/docker/swarmkit/identity"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/remotes"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"google.golang.org/grpc/credentials"
@ -200,7 +200,7 @@ func getCAHashFromToken(token string) (digest.Digest, error) {
}
// DownloadRootCA tries to retrieve a remote root CA and matches the digest against the provided token.
func DownloadRootCA(ctx context.Context, paths CertPaths, token string, r remotes.Remotes) (RootCA, error) {
func DownloadRootCA(ctx context.Context, paths CertPaths, token string, connBroker *connectionbroker.Broker) (RootCA, error) {
var rootCA RootCA
// Get a digest for the optional CA hash string that we've been provided
// If we were provided a non-empty string, and it is an invalid hash, return
@ -221,7 +221,7 @@ func DownloadRootCA(ctx context.Context, paths CertPaths, token string, r remote
// just been demoted, for example).
for i := 0; i != 5; i++ {
rootCA, err = GetRemoteCA(ctx, d, r)
rootCA, err = GetRemoteCA(ctx, d, connBroker)
if err == nil {
break
}
@ -313,11 +313,16 @@ type CertificateRequestConfig struct {
Token string
// Availability allows a user to control the current scheduling status of a node
Availability api.NodeSpec_Availability
// Remotes is the set of remote CAs.
Remotes remotes.Remotes
// ConnBroker provides connections to CAs.
ConnBroker *connectionbroker.Broker
// Credentials provides transport credentials for communicating with the
// remote server.
Credentials credentials.TransportCredentials
// ForceRemote specifies that only a remote (TCP) connection should
// be used to request the certificate. This may be necessary in cases
// where the local node is running a manager, but is in the process of
// being demoted.
ForceRemote bool
}
// CreateSecurityConfig creates a new key and cert for this node, either locally
@ -380,7 +385,7 @@ func (rootCA RootCA) CreateSecurityConfig(ctx context.Context, krw *KeyReadWrite
// RenewTLSConfigNow gets a new TLS cert and key, and updates the security config if provided. This is similar to
// RenewTLSConfig, except while that monitors for expiry, and periodically renews, this renews once and is blocking
func RenewTLSConfigNow(ctx context.Context, s *SecurityConfig, r remotes.Remotes) error {
func RenewTLSConfigNow(ctx context.Context, s *SecurityConfig, connBroker *connectionbroker.Broker) error {
s.renewalMu.Lock()
defer s.renewalMu.Unlock()
@ -395,7 +400,7 @@ func RenewTLSConfigNow(ctx context.Context, s *SecurityConfig, r remotes.Remotes
tlsKeyPair, err := rootCA.RequestAndSaveNewCertificates(ctx,
s.KeyWriter(),
CertificateRequestConfig{
Remotes: r,
ConnBroker: connBroker,
Credentials: s.ClientTLSCreds,
})
if err != nil {
@ -437,7 +442,7 @@ func RenewTLSConfigNow(ctx context.Context, s *SecurityConfig, r remotes.Remotes
// RenewTLSConfig will continuously monitor for the necessity of renewing the local certificates, either by
// issuing them locally if key-material is available, or requesting them from a remote CA.
func RenewTLSConfig(ctx context.Context, s *SecurityConfig, remotes remotes.Remotes, renew <-chan struct{}) <-chan CertificateUpdate {
func RenewTLSConfig(ctx context.Context, s *SecurityConfig, connBroker *connectionbroker.Broker, renew <-chan struct{}) <-chan CertificateUpdate {
updates := make(chan CertificateUpdate)
go func() {
@ -501,7 +506,7 @@ func RenewTLSConfig(ctx context.Context, s *SecurityConfig, remotes remotes.Remo
// ignore errors - it will just try again later
var certUpdate CertificateUpdate
if err := RenewTLSConfigNow(ctx, s, remotes); err != nil {
if err := RenewTLSConfigNow(ctx, s, connBroker); err != nil {
certUpdate.Err = err
} else {
certUpdate.Role = s.ClientTLSCreds.Role()

View file

@ -28,6 +28,7 @@ const (
// breaking it apart doesn't seem worth it.
type Server struct {
mu sync.Mutex
wg sync.WaitGroup
ctx context.Context
cancel func()
store *store.MemoryStore
@ -101,9 +102,10 @@ func (s *Server) NodeCertificateStatus(ctx context.Context, request *api.NodeCer
return nil, grpc.Errorf(codes.InvalidArgument, codes.InvalidArgument.String())
}
if err := s.isRunningLocked(); err != nil {
if err := s.addTask(); err != nil {
return nil, err
}
defer s.doneTask()
var node *api.Node
@ -187,9 +189,10 @@ func (s *Server) IssueNodeCertificate(ctx context.Context, request *api.IssueNod
return nil, grpc.Errorf(codes.InvalidArgument, codes.InvalidArgument.String())
}
if err := s.isRunningLocked(); err != nil {
if err := s.addTask(); err != nil {
return nil, err
}
defer s.doneTask()
var (
blacklistedCerts map[string]*api.BlacklistedCertificate
@ -371,8 +374,10 @@ func (s *Server) Run(ctx context.Context) error {
s.mu.Unlock()
return errors.New("CA signer is already running")
}
s.wg.Add(1)
s.mu.Unlock()
defer s.wg.Done()
ctx = log.WithModule(ctx, "ca")
// Retrieve the channels to keep track of changes in the cluster
@ -402,8 +407,8 @@ func (s *Server) Run(ctx context.Context) error {
// returns true without joinTokens being set correctly.
s.mu.Lock()
s.ctx, s.cancel = context.WithCancel(ctx)
close(s.started)
s.mu.Unlock()
close(s.started)
if err != nil {
log.G(ctx).WithFields(logrus.Fields{
@ -464,32 +469,38 @@ func (s *Server) Run(ctx context.Context) error {
// Stop stops the CA and closes all grpc streams.
func (s *Server) Stop() error {
s.mu.Lock()
defer s.mu.Unlock()
if !s.isRunning() {
s.mu.Unlock()
return errors.New("CA signer is already stopped")
}
s.cancel()
s.mu.Unlock()
// wait for all handlers to finish their CA deals,
s.wg.Wait()
s.started = make(chan struct{})
return nil
}
// Ready waits on the ready channel and returns when the server is ready to serve.
func (s *Server) Ready() <-chan struct{} {
s.mu.Lock()
defer s.mu.Unlock()
return s.started
}
func (s *Server) isRunningLocked() error {
func (s *Server) addTask() error {
s.mu.Lock()
if !s.isRunning() {
s.mu.Unlock()
return grpc.Errorf(codes.Aborted, "CA signer is stopped")
}
s.wg.Add(1)
s.mu.Unlock()
return nil
}
func (s *Server) doneTask() {
s.wg.Done()
}
func (s *Server) isRunning() bool {
if s.ctx == nil {
return false
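Reduced to its essentials, the new start/stop discipline is: Run registers itself on the WaitGroup, every RPC handler registers via addTask only while the signer is confirmed running, and Stop cancels and then waits for in-flight handlers to drain before resetting the started channel. A minimal, self-contained reduction of that pattern follows; the signer type and names are hypothetical, not the actual ca.Server.

package example // illustrative reduction of the WaitGroup discipline above

import (
	"context"
	"errors"
	"sync"
)

type signer struct {
	mu      sync.Mutex
	wg      sync.WaitGroup
	running bool
	cancel  context.CancelFunc
}

// addTask registers a unit of work while holding the lock, so a concurrent
// stop cannot slip between the running check and the Add.
func (s *signer) addTask() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.running {
		return errors.New("signer is stopped")
	}
	s.wg.Add(1)
	return nil
}

func (s *signer) doneTask() { s.wg.Done() }

// handleRPC shows the shape of a handler: register, work, deregister.
func (s *signer) handleRPC() error {
	if err := s.addTask(); err != nil {
		return err
	}
	defer s.doneTask()
	// ... work that must not outlive stop ...
	return nil
}

// stop cancels outstanding work and waits for in-flight handlers to finish.
func (s *signer) stop() {
	s.mu.Lock()
	s.running = false
	if s.cancel != nil {
		s.cancel()
	}
	s.mu.Unlock()
	s.wg.Wait()
}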

View file

@ -0,0 +1,105 @@
// Package connectionbroker is a layer on top of remotes that returns
// a gRPC connection to a manager. The connection may be a local connection
// using a local socket such as a UNIX socket.
package connectionbroker
import (
"sync"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/remotes"
"google.golang.org/grpc"
)
// Broker is a simple connection broker. It can either return a fresh
// connection to a remote manager selected with weighted randomization, or a
// local gRPC connection to the local manager.
type Broker struct {
mu sync.Mutex
remotes remotes.Remotes
localConn *grpc.ClientConn
}
// New creates a new connection broker.
func New(remotes remotes.Remotes) *Broker {
return &Broker{
remotes: remotes,
}
}
// SetLocalConn changes the local gRPC connection used by the connection broker.
func (b *Broker) SetLocalConn(localConn *grpc.ClientConn) {
b.mu.Lock()
defer b.mu.Unlock()
b.localConn = localConn
}
// Select a manager from the set of available managers, and return a connection.
func (b *Broker) Select(dialOpts ...grpc.DialOption) (*Conn, error) {
b.mu.Lock()
localConn := b.localConn
b.mu.Unlock()
if localConn != nil {
return &Conn{
ClientConn: localConn,
isLocal: true,
}, nil
}
return b.SelectRemote(dialOpts...)
}
// SelectRemote chooses a manager from the remotes, and returns a TCP
// connection.
func (b *Broker) SelectRemote(dialOpts ...grpc.DialOption) (*Conn, error) {
peer, err := b.remotes.Select()
if err != nil {
return nil, err
}
cc, err := grpc.Dial(peer.Addr, dialOpts...)
if err != nil {
b.remotes.ObserveIfExists(peer, -remotes.DefaultObservationWeight)
return nil, err
}
return &Conn{
ClientConn: cc,
remotes: b.remotes,
peer: peer,
}, nil
}
// Remotes returns the remotes interface used by the broker, so the caller
// can make observations or see weights directly.
func (b *Broker) Remotes() remotes.Remotes {
return b.remotes
}
// Conn is a wrapper around a gRPC client connection.
type Conn struct {
*grpc.ClientConn
isLocal bool
remotes remotes.Remotes
peer api.Peer
}
// Close closes the client connection if it is a remote connection. It also
// records a positive experience with the remote peer if success is true,
// otherwise it records a negative experience. If a local connection is in use,
// Close is a noop.
func (c *Conn) Close(success bool) error {
if c.isLocal {
return nil
}
if success {
c.remotes.ObserveIfExists(c.peer, remotes.DefaultObservationWeight)
} else {
c.remotes.ObserveIfExists(c.peer, -remotes.DefaultObservationWeight)
}
return c.ClientConn.Close()
}
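A usage sketch of the broker as the other files in this change consume it: Select returns the local connection when one has been registered, otherwise a fresh connection to a weighted-random remote; the RPC goes through Conn.ClientConn; and Close(err == nil) feeds the outcome back into the remote weights (a no-op for local connections). The helper below is hypothetical; the Dispatcher client and Heartbeat call mirror what session.go does elsewhere in this diff.

package example // hypothetical caller of the broker API introduced above

import (
	"time"

	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/connectionbroker"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

// pingManager selects a manager connection, issues one RPC, and reports the
// outcome so the broker can adjust the peer's observation weight.
func pingManager(ctx context.Context, broker *connectionbroker.Broker, creds credentials.TransportCredentials) error {
	conn, err := broker.Select(
		grpc.WithTransportCredentials(creds),
		grpc.WithTimeout(5*time.Second),
	)
	if err != nil {
		return err
	}
	client := api.NewDispatcherClient(conn.ClientConn)
	_, err = client.Heartbeat(ctx, &api.HeartbeatRequest{})
	conn.Close(err == nil) // positive observation on success, negative on failure
	return err
}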

View file

@ -133,6 +133,7 @@ type Dispatcher struct {
}
// New returns Dispatcher with cluster interface(usually raft.Node).
// NOTE: each handler which does something with raft must add to Dispatcher.wg
func New(cluster Cluster, c *Config) *Dispatcher {
d := &Dispatcher{
nodes: newNodeStore(c.HeartbeatPeriod, c.HeartbeatEpsilon, c.GracePeriodMultiplier, c.RateLimitPeriod),

View file

@ -1,6 +1,7 @@
package manager
import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"
@ -16,6 +17,7 @@ import (
"github.com/docker/go-events"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/ca"
"github.com/docker/swarmkit/connectionbroker"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/allocator"
"github.com/docker/swarmkit/manager/controlapi"
@ -38,6 +40,7 @@ import (
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
const (
@ -557,9 +560,6 @@ func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error {
"node.role": ca.ManagerRole,
})
// we are our own peer from which we get certs - try to connect over the local socket
r := remotes.NewRemotes(api.Peer{Addr: m.Addr(), NodeID: nodeID})
kekData := ca.KEKData{Version: cluster.Meta.Version.Index}
for _, encryptionKey := range cluster.UnlockKeys {
if encryptionKey.Subsystem == ca.ManagerRole {
@ -579,8 +579,27 @@ func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error {
// a best effort attempt to update the TLS certificate - if it fails, it'll be updated the next time it renews;
// don't wait because it might take a bit
go func() {
if err := ca.RenewTLSConfigNow(ctx, securityConfig, r); err != nil {
logger.WithError(err).Errorf("failed to download new TLS certificate after locking the cluster")
insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
conn, err := grpc.Dial(
m.config.ControlAPI,
grpc.WithTransportCredentials(insecureCreds),
grpc.WithDialer(
func(addr string, timeout time.Duration) (net.Conn, error) {
return xnet.DialTimeoutLocal(addr, timeout)
}),
)
if err != nil {
logger.WithError(err).Error("failed to connect to local manager socket after locking the cluster")
return
}
defer conn.Close()
connBroker := connectionbroker.New(remotes.NewRemotes())
connBroker.SetLocalConn(conn)
if err := ca.RenewTLSConfigNow(ctx, securityConfig, connBroker); err != nil {
logger.WithError(err).Error("failed to download new TLS certificate after locking the cluster")
}
}()
}

View file

@ -1826,10 +1826,10 @@ func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raf
// - ConfChangeAddNode, in which case the contained ID will be added into the set.
// - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
ids := make(map[uint64]bool)
ids := make(map[uint64]struct{})
if snap != nil {
for _, id := range snap.Metadata.ConfState.Nodes {
ids[id] = true
ids[id] = struct{}{}
}
}
for _, e := range ents {
@ -1845,7 +1845,7 @@ func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
}
switch cc.Type {
case raftpb.ConfChangeAddNode:
ids[cc.NodeID] = true
ids[cc.NodeID] = struct{}{}
case raftpb.ConfChangeRemoveNode:
delete(ids, cc.NodeID)
case raftpb.ConfChangeUpdateNode:

View file

@ -52,13 +52,6 @@ func (n *Node) loadAndStart(ctx context.Context, forceNewCluster bool) error {
return err
}
if snapshot != nil {
// Load the snapshot data into the store
if err := n.restoreFromSnapshot(snapshot.Data, forceNewCluster); err != nil {
return err
}
}
// Read logs to fully catch up store
var raftNode api.RaftMember
if err := raftNode.Unmarshal(waldata.Metadata); err != nil {
@ -66,6 +59,13 @@ func (n *Node) loadAndStart(ctx context.Context, forceNewCluster bool) error {
}
n.Config.ID = raftNode.RaftID
if snapshot != nil {
// Load the snapshot data into the store
if err := n.restoreFromSnapshot(snapshot.Data, forceNewCluster); err != nil {
return err
}
}
ents, st := waldata.Entries, waldata.HardState
// All members that are no longer part of the cluster must be added to
@ -88,14 +88,14 @@ func (n *Node) loadAndStart(ctx context.Context, forceNewCluster bool) error {
// discard the previously uncommitted entries
for i, ent := range ents {
if ent.Index > st.Commit {
log.G(ctx).Infof("discarding %d uncommitted WAL entries ", len(ents)-i)
log.G(ctx).Infof("discarding %d uncommitted WAL entries", len(ents)-i)
ents = ents[:i]
break
}
}
// force append the configuration change entries
toAppEnts := createConfigChangeEnts(getIDs(snapshot, ents), uint64(n.Config.ID), st.Term, st.Commit)
toAppEnts := createConfigChangeEnts(getIDs(snapshot, ents), n.Config.ID, st.Term, st.Commit)
// All members that are being removed as part of the
// force-new-cluster process must be added to the
@ -230,13 +230,15 @@ func (n *Node) restoreFromSnapshot(data []byte, forceNewCluster bool) error {
oldMembers := n.cluster.Members()
if !forceNewCluster {
for _, member := range snapshot.Membership.Members {
for _, member := range snapshot.Membership.Members {
if forceNewCluster && member.RaftID != n.Config.ID {
n.cluster.RemoveMember(member.RaftID)
} else {
if err := n.registerNode(&api.RaftMember{RaftID: member.RaftID, NodeID: member.NodeID, Addr: member.Addr}); err != nil {
return err
}
delete(oldMembers, member.RaftID)
}
delete(oldMembers, member.RaftID)
}
for _, removedMember := range snapshot.Membership.Removed {

View file

@ -18,6 +18,7 @@ import (
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/ca"
"github.com/docker/swarmkit/connectionbroker"
"github.com/docker/swarmkit/ioutils"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager"
@ -105,6 +106,7 @@ type Node struct {
sync.RWMutex
config *Config
remotes *persistentRemotes
connBroker *connectionbroker.Broker
role string
roleCond *sync.Cond
conn *grpc.ClientConn
@ -154,7 +156,6 @@ func New(c *Config) (*Node, error) {
return nil, err
}
}
n := &Node{
remotes: newPersistentRemotes(stateFile, p...),
role: ca.WorkerRole,
@ -174,6 +175,8 @@ func New(c *Config) (*Node, error) {
}
}
n.connBroker = connectionbroker.New(n.remotes)
n.roleCond = sync.NewCond(n.RLocker())
n.connCond = sync.NewCond(n.RLocker())
return n, nil
@ -261,7 +264,7 @@ func (n *Node) run(ctx context.Context) (err error) {
}
}()
updates := ca.RenewTLSConfig(ctx, securityConfig, n.remotes, forceCertRenewal)
updates := ca.RenewTLSConfig(ctx, securityConfig, n.connBroker, forceCertRenewal)
go func() {
for {
select {
@ -368,17 +371,35 @@ func (n *Node) Err(ctx context.Context) error {
}
func (n *Node) runAgent(ctx context.Context, db *bolt.DB, creds credentials.TransportCredentials, ready chan<- struct{}) error {
waitCtx, waitCancel := context.WithCancel(ctx)
remotesCh := n.remotes.WaitSelect(ctx)
controlCh := n.ListenControlSocket(waitCtx)
waitPeer:
for {
select {
case <-ctx.Done():
break waitPeer
case <-remotesCh:
break waitPeer
case conn := <-controlCh:
if conn != nil {
break waitPeer
}
}
}
waitCancel()
select {
case <-ctx.Done():
case <-n.remotes.WaitSelect(ctx):
}
if ctx.Err() != nil {
return ctx.Err()
default:
}
a, err := agent.New(&agent.Config{
Hostname: n.config.Hostname,
Managers: n.remotes,
ConnBroker: n.connBroker,
Executor: n.config.Executor,
DB: db,
NotifyNodeChange: n.notifyNodeChange,
@ -423,6 +444,7 @@ func (n *Node) setControlSocket(conn *grpc.ClientConn) {
n.conn.Close()
}
n.conn = conn
n.connBroker.SetLocalConn(conn)
n.connCond.Broadcast()
n.Unlock()
}
@ -447,15 +469,21 @@ func (n *Node) ListenControlSocket(ctx context.Context) <-chan *grpc.ClientConn
defer close(done)
defer n.RUnlock()
for {
if ctx.Err() != nil {
select {
case <-ctx.Done():
return
default:
}
if conn == n.conn {
n.connCond.Wait()
continue
}
conn = n.conn
c <- conn
select {
case c <- conn:
case <-ctx.Done():
return
}
}
}()
return c
@ -532,7 +560,7 @@ func (n *Node) loadSecurityConfig(ctx context.Context) (*ca.SecurityConfig, erro
}
log.G(ctx).Debug("generated CA key and certificate")
} else if err == ca.ErrNoLocalRootCA { // from previous error loading the root CA from disk
rootCA, err = ca.DownloadRootCA(ctx, paths.RootCA, n.config.JoinToken, n.remotes)
rootCA, err = ca.DownloadRootCA(ctx, paths.RootCA, n.config.JoinToken, n.connBroker)
if err != nil {
return nil, err
}
@ -559,7 +587,7 @@ func (n *Node) loadSecurityConfig(ctx context.Context) (*ca.SecurityConfig, erro
securityConfig, err = rootCA.CreateSecurityConfig(ctx, krw, ca.CertificateRequestConfig{
Token: n.config.JoinToken,
Availability: n.config.Availability,
Remotes: n.remotes,
ConnBroker: n.connBroker,
})
if err != nil {
@ -687,22 +715,6 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
go n.initManagerConnection(connCtx, ready)
// this happens only on initial start
if ready != nil {
go func(ready chan struct{}) {
select {
case <-ready:
addr, err := n.RemoteAPIAddr()
if err != nil {
log.G(ctx).WithError(err).Errorf("get remote api addr")
} else {
n.remotes.Observe(api.Peer{NodeID: n.NodeID(), Addr: addr}, remotes.DefaultObservationWeight)
}
case <-connCtx.Done():
}
}(ready)
}
// wait for manager stop or for role change
select {
case <-done: