Update swarmkit to 4dfc88ccce14ced6f0a6ea82d46dca004c6de0e2

Signed-off-by: Vincent Demeester <vincent@sbr.pm>
This commit is contained in:
Vincent Demeester 2016-11-02 19:43:27 +01:00
parent b06f25496d
commit cff3cdd35a
No known key found for this signature in database
GPG Key ID: 083CC6FD6EB699A3
17 changed files with 1380 additions and 542 deletions

View File

@ -147,7 +147,7 @@ clone git github.com/docker/containerd 52ef1ceb4b660c42cf4ea9013180a5663968d4c7
clone git github.com/tonistiigi/fifo 8c56881ce5e63e19e2dfc495c8af0fb90916467d
# cluster
clone git github.com/docker/swarmkit 72981f443024da2c57d54b915eae0477be6dada5
clone git github.com/docker/swarmkit 4dfc88ccce14ced6f0a6ea82d46dca004c6de0e2
clone git github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
clone git github.com/gogo/protobuf v0.3
clone git github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a

View File

@ -1,3 +1,3 @@
package api
//go:generate protoc -I.:../protobuf:../vendor:../vendor/github.com/gogo/protobuf:../vendor/github.com/gogo/protobuf/protobuf --gogoswarm_out=plugins=grpc+deepcopy+raftproxy+authenticatedwrapper,import_path=github.com/docker/swarmkit/api,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto,Mtimestamp/timestamp.proto=github.com/docker/swarmkit/api/timestamp,Mduration/duration.proto=github.com/docker/swarmkit/api/duration,Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor,Mplugin/plugin.proto=github.com/docker/swarmkit/protobuf/plugin:. types.proto specs.proto objects.proto control.proto dispatcher.proto ca.proto snapshot.proto raft.proto health.proto resource.proto
//go:generate protoc -I.:../protobuf:../vendor:../vendor/github.com/gogo/protobuf --gogoswarm_out=plugins=grpc+deepcopy+raftproxy+authenticatedwrapper,import_path=github.com/docker/swarmkit/api,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto,Mtimestamp/timestamp.proto=github.com/docker/swarmkit/api/timestamp,Mduration/duration.proto=github.com/docker/swarmkit/api/duration,Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor,Mplugin/plugin.proto=github.com/docker/swarmkit/protobuf/plugin:. types.proto specs.proto objects.proto control.proto dispatcher.proto ca.proto snapshot.proto raft.proto health.proto resource.proto

File diff suppressed because it is too large Load Diff

View File

@ -117,6 +117,8 @@ message NodeStatus {
State state = 1;
string message = 2;
// Addr is the node's IP address as observed by the manager
string addr = 3;
}
message Image {
@ -200,7 +202,7 @@ message Mount {
int64 size_bytes = 1;
// Mode of the tmpfs upon creation
int32 mode = 2 [(gogoproto.customtype) = "os.FileMode", (gogoproto.nullable) = false];
uint32 mode = 2 [(gogoproto.customtype) = "os.FileMode", (gogoproto.nullable) = false];
// TODO(stevvooe): There are several more tmpfs flags, specified in the
// daemon, that are accepted. Only the most basic are added for now.
@ -781,6 +783,7 @@ message ManagerStatus {
RaftMemberStatus.Reachability reachability = 4;
}
// SecretReference is the linkage between a service and a secret that it uses.
message SecretReference {
// SecretID represents the ID of the specific Secret that we're
@ -788,26 +791,29 @@ message SecretReference {
// any information about the secret contents.
string secret_id = 1 [(gogoproto.customname) = "SecretID"];
// Mode specifies how this secret should be exposed to the task.
enum Mode {
// SYSTEM means that it is not exposed inside to a task at all, but
// only available via direct access, usually at the agent-level
SYSTEM = 0;
// FILE means that it will be exposed to the task as a file
FILE = 1;
// ENV means that it will be exposed to the task as an environment variable
ENV = 2;
}
// Mode is the way the secret should be presented.
Mode mode = 2;
// Target is the name by which the image accesses the secret.
string target = 3;
// SecretName is the name of the secret that this references, but this is just provided for
// lookup/display purposes. The secret in the reference will be identified by its ID.
string secret_name = 4;
string secret_name = 2;
// FileTarget represents a specific target that is backed by a file
message FileTarget {
// Name represents the final filename in the filesystem
string name = 1;
// UID represents the file UID
string uid = 2 [(gogoproto.customname) = "UID"];
// GID represents the file GID
string gid = 3 [(gogoproto.customname) = "GID"];
// Mode represents the FileMode of the file
uint32 mode = 4 [(gogoproto.customtype) = "os.FileMode", (gogoproto.nullable) = false];
}
// Target specifies how this secret should be exposed to the task.
oneof target {
FileTarget file = 3;
}
}
// BlacklistedCertificate is a record for a blacklisted certificate. It does not
@ -840,3 +846,14 @@ message HealthConfig {
// container as unhealthy. Zero means inherit.
int32 retries = 4;
}
message MaybeEncryptedRecord {
enum Algorithm {
NONE = 0 [(gogoproto.enumvalue_customname) = "NotEncrypted"];
SECRETBOX_SALSA20_POLY1305 = 1 [(gogoproto.enumvalue_customname) = "NACLSecretboxSalsa20Poly1305"];
}
Algorithm algorithm = 1;
bytes data = 2;
bytes nonce = 3;
}

View File

@ -629,6 +629,7 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, r
issueRequest := &api.IssueNodeCertificateRequest{CSR: csr, Token: token}
issueResponse, err := caClient.IssueNodeCertificate(ctx, issueRequest)
if err != nil {
r.Observe(peer, -remotes.DefaultObservationWeight)
return nil, err
}

View File

@ -8,7 +8,7 @@ import (
"net/http"
"sync"
log "github.com/Sirupsen/logrus"
"github.com/Sirupsen/logrus"
"github.com/cloudflare/cfssl/api"
"github.com/cloudflare/cfssl/signer"
"github.com/pkg/errors"
@ -90,7 +90,7 @@ func (eca *ExternalCA) Sign(req signer.SignRequest) (cert []byte, err error) {
return eca.rootCA.AppendFirstRootPEM(cert)
}
log.Debugf("unable to proxy certificate signing request to %s: %s", url, err)
logrus.Debugf("unable to proxy certificate signing request to %s: %s", url, err)
}
return nil, err
@ -114,7 +114,7 @@ func makeExternalSignRequest(client *http.Client, url string, csrJSON []byte) (c
var apiResponse api.Response
if err := json.Unmarshal(body, &apiResponse); err != nil {
log.Debugf("unable to JSON-parse CFSSL API response body: %s", string(body))
logrus.Debugf("unable to JSON-parse CFSSL API response body: %s", string(body))
return nil, recoverableErr{err: errors.Wrap(err, "unable to parse JSON response")}
}

View File

@ -73,6 +73,17 @@ func newPortSpace(protocol api.PortConfig_Protocol) (*portSpace, error) {
}, nil
}
// getPortConfigkey returns a map key for doing set operations with
// ports. The key consists of name, protocol and target port which
// uniquely identifies a port within a single Endpoint.
func getPortConfigKey(p *api.PortConfig) api.PortConfig {
return api.PortConfig{
Name: p.Name,
Protocol: p.Protocol,
TargetPort: p.TargetPort,
}
}
func reconcilePortConfigs(s *api.Service) []*api.PortConfig {
// If runtime state hasn't been created or if port config has
// changed from port state return the port config from Spec.
@ -80,15 +91,31 @@ func reconcilePortConfigs(s *api.Service) []*api.PortConfig {
return s.Spec.Endpoint.Ports
}
allocatedPorts := make(map[api.PortConfig]*api.PortConfig)
for _, portState := range s.Endpoint.Ports {
if portState.PublishMode != api.PublishModeIngress {
continue
}
allocatedPorts[getPortConfigKey(portState)] = portState
}
var portConfigs []*api.PortConfig
for i, portConfig := range s.Spec.Endpoint.Ports {
portState := s.Endpoint.Ports[i]
for _, portConfig := range s.Spec.Endpoint.Ports {
// If the PublishMode is not Ingress simply pick up
// the port config.
if portConfig.PublishMode != api.PublishModeIngress {
portConfigs = append(portConfigs, portConfig)
continue
}
portState, ok := allocatedPorts[getPortConfigKey(portConfig)]
// If the portConfig is exactly the same as portState
// except if SwarmPort is not user-define then prefer
// portState to ensure sticky allocation of the same
// port that was allocated before.
if portConfig.Name == portState.Name &&
if ok && portConfig.Name == portState.Name &&
portConfig.TargetPort == portState.TargetPort &&
portConfig.Protocol == portState.Protocol &&
portConfig.PublishedPort == 0 {
@ -186,21 +213,26 @@ func (pa *portAllocator) isPortsAllocated(s *api.Service) bool {
return false
}
for i, portConfig := range s.Spec.Endpoint.Ports {
allocatedPorts := make(map[api.PortConfig]*api.PortConfig)
for _, portState := range s.Endpoint.Ports {
if portState.PublishMode != api.PublishModeIngress {
continue
}
allocatedPorts[getPortConfigKey(portState)] = portState
}
for _, portConfig := range s.Spec.Endpoint.Ports {
// Ignore ports which are not PublishModeIngress
if portConfig.PublishMode != api.PublishModeIngress {
continue
}
// The port configuration slice and port state slice
// are expected to be in the same order.
portState := s.Endpoint.Ports[i]
portState, ok := allocatedPorts[getPortConfigKey(portConfig)]
// If name, port, protocol values don't match then we
// are not allocated.
if portConfig.Name != portState.Name ||
portConfig.TargetPort != portState.TargetPort ||
portConfig.Protocol != portState.Protocol {
if !ok {
return false
}

View File

@ -3,9 +3,11 @@ package controlapi
import (
"regexp"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/digest"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/identity"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/state/store"
"golang.org/x/net/context"
"google.golang.org/grpc"
@ -85,6 +87,12 @@ func (s *Server) UpdateSecret(ctx context.Context, request *api.UpdateSecretRequ
return nil, grpc.Errorf(codes.NotFound, "secret %s not found", request.SecretID)
}
log.G(ctx).WithFields(logrus.Fields{
"secret.ID": request.SecretID,
"secret.Name": request.Spec.Annotations.Name,
"method": "UpdateSecret",
}).Debugf("secret updated")
// WARN: we should never return the actual secret data here. We need to redact the private fields first.
secret.Spec.Data = nil
return &api.UpdateSecretResponse{
@ -169,6 +177,11 @@ func (s *Server) CreateSecret(ctx context.Context, request *api.CreateSecretRequ
return nil, grpc.Errorf(codes.AlreadyExists, "secret %s already exists", request.Spec.Annotations.Name)
case nil:
secret.Spec.Data = nil // clean the actual secret data so it's never returned
log.G(ctx).WithFields(logrus.Fields{
"secret.Name": request.Spec.Annotations.Name,
"method": "CreateSecret",
}).Debugf("secret created")
return &api.CreateSecretResponse{Secret: secret}, nil
default:
return nil, err
@ -191,6 +204,11 @@ func (s *Server) RemoveSecret(ctx context.Context, request *api.RemoveSecretRequ
case store.ErrNotExist:
return nil, grpc.Errorf(codes.NotFound, "secret %s not found", request.SecretID)
case nil:
log.G(ctx).WithFields(logrus.Fields{
"secret.ID": request.SecretID,
"method": "RemoveSecret",
}).Debugf("secret removed")
return &api.RemoveSecretResponse{}, nil
default:
return nil, err

View File

@ -2,6 +2,7 @@ package controlapi
import (
"errors"
"path/filepath"
"reflect"
"regexp"
"strconv"
@ -304,20 +305,42 @@ func (s *Server) checkPortConflicts(spec *api.ServiceSpec, serviceID string) err
return nil
}
// checkSecretConflicts finds if the passed in spec has secrets with conflicting targets.
func (s *Server) checkSecretConflicts(spec *api.ServiceSpec) error {
// checkSecretValidity finds if the secrets passed in spec have any conflicting targets.
func (s *Server) checkSecretValidity(spec *api.ServiceSpec) error {
container := spec.Task.GetContainer()
if container == nil {
return nil
}
// Keep a map to track all the targets that will be exposed
// The string returned is only used for logging. It could as well be struct{}{}
existingTargets := make(map[string]string)
for _, secretRef := range container.Secrets {
if prevSecretName, ok := existingTargets[secretRef.Target]; ok {
return grpc.Errorf(codes.InvalidArgument, "secret references '%s' and '%s' have a conflicting target: '%s'", prevSecretName, secretRef.SecretName, secretRef.Target)
// SecretID and SecretName are mandatory, we have invalid references without them
if secretRef.SecretID == "" || secretRef.SecretName == "" {
return grpc.Errorf(codes.InvalidArgument, "malformed secret reference")
}
existingTargets[secretRef.Target] = secretRef.SecretName
// Every secret referece requires a Target
if secretRef.GetTarget() == nil {
return grpc.Errorf(codes.InvalidArgument, "malformed secret reference, no target provided")
}
// If this is a file target, we will ensure filename uniqueness
if secretRef.GetFile() != nil {
fileName := secretRef.GetFile().Name
// Validate the file name
if fileName == "" || fileName != filepath.Base(filepath.Clean(fileName)) {
return grpc.Errorf(codes.InvalidArgument, "malformed file secret reference, invalid target file name provided")
}
// If this target is already in use, we have conflicting targets
if prevSecretName, ok := existingTargets[fileName]; ok {
return grpc.Errorf(codes.InvalidArgument, "secret references '%s' and '%s' have a conflicting target: '%s'", prevSecretName, secretRef.SecretName, fileName)
}
existingTargets[fileName] = secretRef.SecretName
}
}
return nil
@ -341,7 +364,7 @@ func (s *Server) CreateService(ctx context.Context, request *api.CreateServiceRe
return nil, err
}
if err := s.checkSecretConflicts(request.Spec); err != nil {
if err := s.checkSecretValidity(request.Spec); err != nil {
return nil, err
}
@ -412,7 +435,7 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe
}
}
if err := s.checkSecretConflicts(request.Spec); err != nil {
if err := s.checkSecretValidity(request.Spec); err != nil {
return nil, err
}

View File

@ -2,6 +2,7 @@ package dispatcher
import (
"fmt"
"net"
"strconv"
"sync"
"time"
@ -313,17 +314,16 @@ func (d *Dispatcher) markNodesUnknown(ctx context.Context) error {
if node.Status.State == api.NodeStatus_DOWN {
return nil
}
node.Status = api.NodeStatus{
State: api.NodeStatus_UNKNOWN,
Message: `Node moved to "unknown" state due to leadership change in cluster`,
}
node.Status.State = api.NodeStatus_UNKNOWN
node.Status.Message = `Node moved to "unknown" state due to leadership change in cluster`
nodeID := node.ID
expireFunc := func() {
log := log.WithField("node", nodeID)
nodeStatus := api.NodeStatus{State: api.NodeStatus_DOWN, Message: `heartbeat failure for node in "unknown" state`}
log.Debugf("heartbeat expiration for unknown node")
if err := d.nodeRemove(nodeID, nodeStatus); err != nil {
if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, `heartbeat failure for node in "unknown" state`); err != nil {
log.WithError(err).Errorf(`failed deregistering node after heartbeat expiration for node in "unknown" state`)
}
}
@ -356,12 +356,18 @@ func (d *Dispatcher) isRunning() bool {
return true
}
// updateNode updates the description of a node and sets status to READY
// markNodeReady updates the description of a node, updates its address, and sets status to READY
// this is used during registration when a new node description is provided
// and during node updates when the node description changes
func (d *Dispatcher) updateNode(nodeID string, description *api.NodeDescription) error {
func (d *Dispatcher) markNodeReady(nodeID string, description *api.NodeDescription, addr string) error {
d.nodeUpdatesLock.Lock()
d.nodeUpdates[nodeID] = nodeUpdate{status: &api.NodeStatus{State: api.NodeStatus_READY}, description: description}
d.nodeUpdates[nodeID] = nodeUpdate{
status: &api.NodeStatus{
State: api.NodeStatus_READY,
Addr: addr,
},
description: description,
}
numUpdates := len(d.nodeUpdates)
d.nodeUpdatesLock.Unlock()
@ -387,6 +393,19 @@ func (d *Dispatcher) updateNode(nodeID string, description *api.NodeDescription)
return nil
}
// gets the node IP from the context of a grpc call
func nodeIPFromContext(ctx context.Context) (string, error) {
nodeInfo, err := ca.RemoteNode(ctx)
if err != nil {
return "", err
}
addr, _, err := net.SplitHostPort(nodeInfo.RemoteAddr)
if err != nil {
return "", errors.Wrap(err, "unable to get ip from addr:port")
}
return addr, nil
}
// register is used for registration of node with particular dispatcher.
func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {
// prevent register until we're ready to accept it
@ -407,14 +426,18 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a
return "", ErrNodeNotFound
}
if err := d.updateNode(nodeID, description); err != nil {
addr, err := nodeIPFromContext(ctx)
if err != nil {
log.G(ctx).Debugf(err.Error())
}
if err := d.markNodeReady(nodeID, description, addr); err != nil {
return "", err
}
expireFunc := func() {
nodeStatus := api.NodeStatus{State: api.NodeStatus_DOWN, Message: "heartbeat failure"}
log.G(ctx).Debugf("heartbeat expiration")
if err := d.nodeRemove(nodeID, nodeStatus); err != nil {
if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, "heartbeat failure"); err != nil {
log.G(ctx).WithError(err).Errorf("failed deregistering node after heartbeat expiration")
}
}
@ -575,7 +598,11 @@ func (d *Dispatcher) processUpdates() {
}
if nodeUpdate.status != nil {
node.Status = *nodeUpdate.status
node.Status.State = nodeUpdate.status.State
node.Status.Message = nodeUpdate.status.Message
if nodeUpdate.status.Addr != "" {
node.Status.Addr = nodeUpdate.status.Addr
}
}
if nodeUpdate.description != nil {
node.Description = nodeUpdate.description
@ -782,13 +809,18 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
}
var newSecrets []*api.Secret
for _, secretRef := range container.Secrets {
// Empty ID prefix will return all secrets. Bail if there is no SecretID
if secretRef.SecretID == "" {
log.Debugf("invalid secret reference")
continue
}
secretID := secretRef.SecretID
log := log.WithFields(logrus.Fields{
"secret.id": secretID,
"secret.name": secretRef.SecretName,
})
if tasksUsingSecret[secretID] == nil {
if len(tasksUsingSecret[secretID]) == 0 {
tasksUsingSecret[secretID] = make(map[string]struct{})
secrets, err := store.FindSecrets(readTx, store.ByIDPrefix(secretID))
@ -1046,18 +1078,24 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
}
}
for id, secret := range updateSecrets {
if _, ok := removeSecrets[id]; !ok {
secretChange := &api.AssignmentChange{
Assignment: &api.Assignment{
Item: &api.Assignment_Secret{
Secret: secret,
},
},
Action: api.AssignmentChange_AssignmentActionUpdate,
}
update.Changes = append(update.Changes, secretChange)
// If, due to multiple updates, this secret is no longer in use,
// don't send it down.
if len(tasksUsingSecret[id]) == 0 {
// delete this secret for the secrets to be updated
// so that deleteSecrets knows the current list
delete(updateSecrets, id)
continue
}
secretChange := &api.AssignmentChange{
Assignment: &api.Assignment{
Item: &api.Assignment_Secret{
Secret: secret,
},
},
Action: api.AssignmentChange_AssignmentActionUpdate,
}
update.Changes = append(update.Changes, secretChange)
}
for id := range removeTasks {
taskChange := &api.AssignmentChange{
@ -1072,6 +1110,12 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
update.Changes = append(update.Changes, taskChange)
}
for id := range removeSecrets {
// If this secret is also being sent on the updated set
// don't also add it to the removed set
if _, ok := updateSecrets[id]; ok {
continue
}
secretChange := &api.AssignmentChange{
Assignment: &api.Assignment{
Item: &api.Assignment_Secret{
@ -1091,13 +1135,22 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
}
}
func (d *Dispatcher) nodeRemove(id string, status api.NodeStatus) error {
// markNodeNotReady sets the node state to some state other than READY
func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, message string) error {
if err := d.isRunningLocked(); err != nil {
return err
}
status := &api.NodeStatus{
State: state,
Message: message,
}
d.nodeUpdatesLock.Lock()
d.nodeUpdates[id] = nodeUpdate{status: status.Copy(), description: d.nodeUpdates[id].description}
// pluck the description out of nodeUpdates. this protects against a case
// where a node is marked ready and a description is added, but then the
// node is immediately marked not ready. this preserves that description
d.nodeUpdates[id] = nodeUpdate{status: status, description: d.nodeUpdates[id].description}
numUpdates := len(d.nodeUpdates)
d.nodeUpdatesLock.Unlock()
@ -1159,14 +1212,19 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
var sessionID string
if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
// register the node.
sessionID, err = d.register(stream.Context(), nodeID, r.Description)
sessionID, err = d.register(ctx, nodeID, r.Description)
if err != nil {
return err
}
} else {
sessionID = r.SessionID
// get the node IP addr
addr, err := nodeIPFromContext(stream.Context())
if err != nil {
log.G(ctx).Debugf(err.Error())
}
// update the node description
if err := d.updateNode(nodeID, r.Description); err != nil {
if err := d.markNodeReady(nodeID, r.Description, addr); err != nil {
return err
}
}
@ -1226,8 +1284,7 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
}
}
nodeStatus := api.NodeStatus{State: api.NodeStatus_DISCONNECTED, Message: "node is currently trying to find new manager"}
if err := d.nodeRemove(nodeID, nodeStatus); err != nil {
if err := d.markNodeNotReady(nodeID, api.NodeStatus_DISCONNECTED, "node is currently trying to find new manager"); err != nil {
log.WithError(err).Error("failed to remove node")
}
// still return an abort if the transport closure was ineffective.

View File

@ -7,6 +7,7 @@ import (
"net"
"os"
"path/filepath"
"runtime"
"sync"
"syscall"
"time"
@ -30,6 +31,7 @@ import (
"github.com/docker/swarmkit/manager/state/raft"
"github.com/docker/swarmkit/manager/state/store"
"github.com/docker/swarmkit/protobuf/ptypes"
"github.com/docker/swarmkit/xnet"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
@ -40,6 +42,16 @@ const (
defaultTaskHistoryRetentionLimit = 5
)
// RemoteAddrs provides an listening address and an optional advertise address
// for serving the remote API.
type RemoteAddrs struct {
// Address to bind
ListenAddr string
// Address to advertise to remote nodes (optional).
AdvertiseAddr string
}
// Config is used to tune the Manager.
type Config struct {
SecurityConfig *ca.SecurityConfig
@ -48,13 +60,12 @@ type Config struct {
// will make certificate signing requests for node certificates.
ExternalCAs []*api.ExternalCA
ProtoAddr map[string]string
// ProtoListener will be used for grpc serving if it's not nil,
// ProtoAddr fields will be used to create listeners otherwise.
ProtoListener map[string]net.Listener
// ControlAPI is an address for serving the control API.
ControlAPI string
// AdvertiseAddr is a map of addresses to advertise, by protocol.
AdvertiseAddr string
// RemoteAPI is a listening address for serving the remote API, and
// an optional advertise address.
RemoteAPI RemoteAddrs
// JoinRaft is an optional address of a node in an existing raft
// cluster to join.
@ -81,7 +92,7 @@ type Config struct {
// subsystems.
type Manager struct {
config *Config
listeners map[string]net.Listener
listeners []net.Listener
caserver *ca.Server
dispatcher *dispatcher.Dispatcher
@ -96,10 +107,11 @@ type Manager struct {
localserver *grpc.Server
raftNode *raft.Node
mu sync.Mutex
cancelFunc context.CancelFunc
mu sync.Mutex
started chan struct{}
stopped chan struct{}
stopped bool
}
type closeOnceListener struct {
@ -119,41 +131,28 @@ func (l *closeOnceListener) Close() error {
func New(config *Config) (*Manager, error) {
dispatcherConfig := dispatcher.DefaultConfig()
if config.ProtoAddr == nil {
config.ProtoAddr = make(map[string]string)
}
if config.ProtoListener != nil && config.ProtoListener["tcp"] != nil {
config.ProtoAddr["tcp"] = config.ProtoListener["tcp"].Addr().String()
}
// If an AdvertiseAddr was specified, we use that as our
// externally-reachable address.
tcpAddr := config.AdvertiseAddr
advertiseAddr := config.RemoteAPI.AdvertiseAddr
var tcpAddrPort string
if tcpAddr == "" {
var advertiseAddrPort string
if advertiseAddr == "" {
// Otherwise, we know we are joining an existing swarm. Use a
// wildcard address to trigger remote autodetection of our
// address.
var err error
_, tcpAddrPort, err = net.SplitHostPort(config.ProtoAddr["tcp"])
_, advertiseAddrPort, err = net.SplitHostPort(config.RemoteAPI.ListenAddr)
if err != nil {
return nil, fmt.Errorf("missing or invalid listen address %s", config.ProtoAddr["tcp"])
return nil, fmt.Errorf("missing or invalid listen address %s", config.RemoteAPI.ListenAddr)
}
// Even with an IPv6 listening address, it's okay to use
// 0.0.0.0 here. Any "unspecified" (wildcard) IP will
// be substituted with the actual source address.
tcpAddr = net.JoinHostPort("0.0.0.0", tcpAddrPort)
advertiseAddr = net.JoinHostPort("0.0.0.0", advertiseAddrPort)
}
err := os.MkdirAll(filepath.Dir(config.ProtoAddr["unix"]), 0700)
if err != nil {
return nil, errors.Wrap(err, "failed to create socket directory")
}
err = os.MkdirAll(config.StateDir, 0700)
err := os.MkdirAll(config.StateDir, 0700)
if err != nil {
return nil, errors.Wrap(err, "failed to create state directory")
}
@ -164,41 +163,49 @@ func New(config *Config) (*Manager, error) {
return nil, errors.Wrap(err, "failed to create raft state directory")
}
var listeners map[string]net.Listener
if len(config.ProtoListener) > 0 {
listeners = config.ProtoListener
} else {
listeners = make(map[string]net.Listener)
var listeners []net.Listener
for proto, addr := range config.ProtoAddr {
l, err := net.Listen(proto, addr)
// A unix socket may fail to bind if the file already
// exists. Try replacing the file.
unwrappedErr := err
if op, ok := unwrappedErr.(*net.OpError); ok {
unwrappedErr = op.Err
}
if sys, ok := unwrappedErr.(*os.SyscallError); ok {
unwrappedErr = sys.Err
}
if proto == "unix" && unwrappedErr == syscall.EADDRINUSE {
os.Remove(addr)
l, err = net.Listen(proto, addr)
if err != nil {
return nil, err
}
} else if err != nil {
return nil, err
}
if proto == "tcp" && tcpAddrPort == "0" {
// in case of 0 port
tcpAddr = l.Addr().String()
}
listeners[proto] = l
// don't create a socket directory if we're on windows. we used named pipe
if runtime.GOOS != "windows" {
err := os.MkdirAll(filepath.Dir(config.ControlAPI), 0700)
if err != nil {
return nil, errors.Wrap(err, "failed to create socket directory")
}
}
l, err := xnet.ListenLocal(config.ControlAPI)
// A unix socket may fail to bind if the file already
// exists. Try replacing the file.
if runtime.GOOS != "windows" {
unwrappedErr := err
if op, ok := unwrappedErr.(*net.OpError); ok {
unwrappedErr = op.Err
}
if sys, ok := unwrappedErr.(*os.SyscallError); ok {
unwrappedErr = sys.Err
}
if unwrappedErr == syscall.EADDRINUSE {
os.Remove(config.ControlAPI)
l, err = xnet.ListenLocal(config.ControlAPI)
}
}
if err != nil {
return nil, errors.Wrap(err, "failed to listen on control API address")
}
listeners = append(listeners, l)
l, err = net.Listen("tcp", config.RemoteAPI.ListenAddr)
if err != nil {
return nil, errors.Wrap(err, "failed to listen on remote API address")
}
if advertiseAddrPort == "0" {
advertiseAddr = l.Addr().String()
config.RemoteAPI.ListenAddr = advertiseAddr
}
listeners = append(listeners, l)
raftCfg := raft.DefaultNodeConfig()
if config.ElectionTick > 0 {
@ -210,7 +217,7 @@ func New(config *Config) (*Manager, error) {
newNodeOpts := raft.NodeOptions{
ID: config.SecurityConfig.ClientTLSCreds.NodeID(),
Addr: tcpAddr,
Addr: advertiseAddr,
JoinAddr: config.JoinRaft,
Config: raftCfg,
StateDir: raftStateDir,
@ -231,18 +238,14 @@ func New(config *Config) (*Manager, error) {
localserver: grpc.NewServer(opts...),
raftNode: raftNode,
started: make(chan struct{}),
stopped: make(chan struct{}),
}
return m, nil
}
// Addr returns tcp address on which remote api listens.
func (m *Manager) Addr() net.Addr {
if l, ok := m.listeners["tcp"]; ok {
return l.Addr()
}
return nil
func (m *Manager) Addr() string {
return m.config.RemoteAPI.ListenAddr
}
// Run starts all manager sub-systems and the gRPC server at the configured
@ -252,14 +255,7 @@ func (m *Manager) Run(parent context.Context) error {
ctx, ctxCancel := context.WithCancel(parent)
defer ctxCancel()
// Harakiri.
go func() {
select {
case <-ctx.Done():
case <-m.stopped:
ctxCancel()
}
}()
m.cancelFunc = ctxCancel
leadershipCh, cancel := m.raftNode.SubscribeLeadership()
defer cancel()
@ -336,8 +332,8 @@ func (m *Manager) Run(parent context.Context) error {
localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_NOT_SERVING)
errServe := make(chan error, len(m.listeners))
for proto, l := range m.listeners {
go m.serveListener(ctx, errServe, proto, l)
for _, lis := range m.listeners {
go m.serveListener(ctx, errServe, lis)
}
defer func() {
@ -383,24 +379,14 @@ func (m *Manager) Run(parent context.Context) error {
// wait for an error in serving.
err = <-errServe
select {
// check to see if stopped was posted to. if so, we're in the process of
// stopping, or done and that's why we got the error. if stopping is
// deliberate, stopped will ALWAYS be closed before the error is trigger,
// so this path will ALWAYS be taken if the stop was deliberate
case <-m.stopped:
// shutdown was requested, do not return an error
// but first, we wait to acquire a mutex to guarantee that stopping is
// finished. as long as we acquire the mutex BEFORE we return, we know
// that stopping is stopped.
m.mu.Lock()
m.mu.Lock()
if m.stopped {
m.mu.Unlock()
return nil
// otherwise, we'll get something from errServe, which indicates that an
// error in serving has actually occurred and this isn't a planned shutdown
default:
return err
}
m.mu.Unlock()
m.Stop(ctx)
return err
}
const stopTimeout = 8 * time.Second
@ -417,13 +403,10 @@ func (m *Manager) Stop(ctx context.Context) {
// from returning before we've finished stopping.
m.mu.Lock()
defer m.mu.Unlock()
select {
// check to see that we've already stopped
case <-m.stopped:
if m.stopped {
return
default:
// do nothing, we're stopping for the first time
}
m.stopped = true
srvDone, localSrvDone := make(chan struct{}), make(chan struct{})
go func() {
@ -460,11 +443,7 @@ func (m *Manager) Stop(ctx context.Context) {
m.keyManager.Stop()
}
// once we start stopping, send a signal that we're doing so. this tells
// Run that we've started stopping, when it gets the error from errServe
// it also prevents the loop from processing any more stuff.
close(m.stopped)
m.cancelFunc()
<-m.raftNode.Done()
timer := time.AfterFunc(stopTimeout, func() {
@ -582,11 +561,9 @@ func (m *Manager) handleLeadershipEvents(ctx context.Context, leadershipCh chan
select {
case leadershipEvent := <-leadershipCh:
m.mu.Lock()
select {
case <-m.stopped:
if m.stopped {
m.mu.Unlock()
return
default:
}
newState := leadershipEvent.(raft.LeadershipState)
@ -596,8 +573,6 @@ func (m *Manager) handleLeadershipEvents(ctx context.Context, leadershipCh chan
m.becomeFollower()
}
m.mu.Unlock()
case <-m.stopped:
return
case <-ctx.Done():
return
}
@ -605,20 +580,21 @@ func (m *Manager) handleLeadershipEvents(ctx context.Context, leadershipCh chan
}
// serveListener serves a listener for local and non local connections.
func (m *Manager) serveListener(ctx context.Context, errServe chan error, proto string, lis net.Listener) {
func (m *Manager) serveListener(ctx context.Context, errServe chan error, l net.Listener) {
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(
logrus.Fields{
"proto": lis.Addr().Network(),
"addr": lis.Addr().String()}))
if proto == "unix" {
"proto": l.Addr().Network(),
"addr": l.Addr().String(),
}))
if _, ok := l.(*net.TCPListener); !ok {
log.G(ctx).Info("Listening for local connections")
// we need to disallow double closes because UnixListener.Close
// can delete unix-socket file of newer listener. grpc calls
// Close twice indeed: in Serve and in Stop.
errServe <- m.localserver.Serve(&closeOnceListener{Listener: lis})
errServe <- m.localserver.Serve(&closeOnceListener{Listener: l})
} else {
log.G(ctx).Info("Listening for connections")
errServe <- m.server.Serve(lis)
errServe <- m.server.Serve(l)
}
}

View File

@ -205,7 +205,7 @@ func (g *Orchestrator) removeTasksFromNode(ctx context.Context, node *api.Node)
return nil
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: removeTasksFromNode failed")
log.G(ctx).WithError(err).Errorf("global orchestrator: removeTasksFromNode failed batching tasks")
}
}

View File

@ -1689,7 +1689,7 @@ func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
case raftpb.ConfChangeUpdateNode:
// do nothing
default:
log.L.Panic("ConfChange Type should be either ConfChangeAddNode or ConfChangeRemoveNode!")
log.L.Panic("ConfChange Type should be either ConfChangeAddNode, or ConfChangeRemoveNode, or ConfChangeUpdateNode!")
}
}
var sids []uint64

View File

@ -22,6 +22,7 @@ import (
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager"
"github.com/docker/swarmkit/remotes"
"github.com/docker/swarmkit/xnet"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
@ -116,10 +117,10 @@ func (n *Node) RemoteAPIAddr() (string, error) {
return "", errors.Errorf("node is not manager")
}
addr := n.manager.Addr()
if addr == nil {
if addr == "" {
return "", errors.Errorf("manager addr is not set")
}
return addr.String(), nil
return addr, nil
}
// New returns new Node instance.
@ -554,12 +555,10 @@ func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{})
opts := []grpc.DialOption{}
insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
opts = append(opts, grpc.WithTransportCredentials(insecureCreds))
// Using listen address instead of advertised address because this is a
// local connection.
addr := n.config.ListenControlAPI
opts = append(opts, grpc.WithDialer(
func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
return xnet.DialTimeoutLocal(addr, timeout)
}))
conn, err := grpc.Dial(addr, opts...)
if err != nil {
@ -623,11 +622,11 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
remoteAddr, _ := n.remotes.Select(n.NodeID())
m, err := manager.New(&manager.Config{
ForceNewCluster: n.config.ForceNewCluster,
ProtoAddr: map[string]string{
"tcp": n.config.ListenRemoteAPI,
"unix": n.config.ListenControlAPI,
RemoteAPI: manager.RemoteAddrs{
ListenAddr: n.config.ListenRemoteAPI,
AdvertiseAddr: n.config.AdvertiseRemoteAPI,
},
AdvertiseAddr: n.config.AdvertiseRemoteAPI,
ControlAPI: n.config.ListenControlAPI,
SecurityConfig: securityConfig,
ExternalCAs: n.config.ExternalCAs,
JoinRaft: remoteAddr.Addr,

View File

@ -1,3 +1,3 @@
package plugin
//go:generate protoc -I.:../../vendor/github.com/gogo/protobuf/protobuf --gogoswarm_out=import_path=github.com/docker/swarmkit/protobuf/plugin,Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor:. plugin.proto
//go:generate protoc -I.:/usr/local --gogoswarm_out=import_path=github.com/docker/swarmkit/protobuf/plugin,Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor:. plugin.proto

View File

@ -0,0 +1,20 @@
// +build !windows
package xnet
import (
"net"
"time"
)
// ListenLocal opens a local socket for control communication
func ListenLocal(socket string) (net.Listener, error) {
// on unix it's just a unix socket
return net.Listen("unix", socket)
}
// DialTimeoutLocal is a DialTimeout function for local sockets
func DialTimeoutLocal(socket string, timeout time.Duration) (net.Conn, error) {
// on unix, we dial a unix socket
return net.DialTimeout("unix", socket, timeout)
}

View File

@ -0,0 +1,31 @@
// +build windows
package xnet
import (
"net"
"time"
"github.com/Microsoft/go-winio"
)
// ListenLocal opens a local socket for control communication
func ListenLocal(socket string) (net.Listener, error) {
// set up ACL for the named pipe
// allow Administrators and SYSTEM
sddl := "D:P(A;;GA;;;BA)(A;;GA;;;SY)"
c := winio.PipeConfig{
SecurityDescriptor: sddl,
MessageMode: true, // Use message mode so that CloseWrite() is supported
InputBufferSize: 65536, // Use 64KB buffers to improve performance
OutputBufferSize: 65536,
}
// on windows, our socket is actually a named pipe
return winio.ListenPipe(socket, &c)
}
// DialTimeoutLocal is a DialTimeout function for local sockets
func DialTimeoutLocal(socket string, timeout time.Duration) (net.Conn, error) {
// On windows, we dial a named pipe
return winio.DialPipe(socket, &timeout)
}