Revendor swarmkit to 2e956c4
Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
parent 1b2786c2c2
commit b1f5320dd6
16 changed files with 295 additions and 148 deletions

@@ -102,7 +102,7 @@ github.com/docker/containerd 03e5862ec0d8d3b3f750e19fca3ee367e13c090e
 github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4

 # cluster
-github.com/docker/swarmkit 9e4bd71a1690cd27400714fcd98c329b752b5c4c
+github.com/docker/swarmkit 2e956c40c02ad527c90ec85bdae25a0acac1bd87
 github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
 github.com/gogo/protobuf v0.3
 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
vendor/github.com/docker/swarmkit/agent/agent.go (generated, vendored): 49 changes

@@ -37,6 +37,8 @@ type Agent struct {
 	started   chan struct{}
 	startOnce sync.Once // start only once
 	ready     chan struct{}
+	leaving   chan struct{}
+	leaveOnce sync.Once
 	stopped   chan struct{} // requests shutdown
 	stopOnce  sync.Once     // only allow stop to be called once
 	closed    chan struct{} // only closed in run

@@ -53,6 +55,7 @@ func New(config *Config) (*Agent, error) {
 		config:   config,
 		sessionq: make(chan sessionOperation),
 		started:  make(chan struct{}),
+		leaving:  make(chan struct{}),
 		stopped:  make(chan struct{}),
 		closed:   make(chan struct{}),
 		ready:    make(chan struct{}),

@@ -78,6 +81,37 @@ func (a *Agent) Start(ctx context.Context) error {
 	return err
 }

+// Leave instructs the agent to leave the cluster. This method will shutdown
+// assignment processing and remove all assignments from the node.
+// Leave blocks until worker has finished closing all task managers or agent
+// is closed.
+func (a *Agent) Leave(ctx context.Context) error {
+	select {
+	case <-a.started:
+	default:
+		return errAgentNotStarted
+	}
+
+	a.leaveOnce.Do(func() {
+		close(a.leaving)
+	})
+
+	// agent could be closed while Leave is in progress
+	var err error
+	ch := make(chan struct{})
+	go func() {
+		err = a.worker.Wait(ctx)
+		close(ch)
+	}()
+
+	select {
+	case <-ch:
+		return err
+	case <-a.closed:
+		return ErrClosed
+	}
+}
+
 // Stop shuts down the agent, blocking until full shutdown. If the agent is not
 // started, Stop will block until the agent has fully shutdown.
 func (a *Agent) Stop(ctx context.Context) error {

@@ -151,6 +185,7 @@ func (a *Agent) run(ctx context.Context) {
 		registered    = session.registered
 		ready         = a.ready // first session ready
 		sessionq      chan sessionOperation
+		leaving       = a.leaving
 		subscriptions = map[string]context.CancelFunc{}
 	)

@@ -171,7 +206,21 @@ func (a *Agent) run(ctx context.Context) {
 		select {
 		case operation := <-sessionq:
 			operation.response <- operation.fn(session)
+		case <-leaving:
+			leaving = nil
+
+			// TODO(stevvooe): Signal to the manager that the node is leaving.
+
+			// when leaving we remove all assignments.
+			if err := a.worker.Assign(ctx, nil); err != nil {
+				log.G(ctx).WithError(err).Error("failed removing all assignments")
+			}
 		case msg := <-session.assignments:
+			// if we have left, accept no more assignments
+			if leaving == nil {
+				continue
+			}
+
 			switch msg.Type {
 			case api.AssignmentsMessage_COMPLETE:
 				// Need to assign secrets before tasks, because tasks might depend on new secrets
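
For context, a minimal caller-side sketch of how the new Leave method pairs with Stop during shutdown. The helper name and the 30-second budget are illustrative assumptions, not part of this diff; only Agent.Leave, Agent.Stop, and log.G come from swarmkit.

package example

import (
	"context"
	"time"

	"github.com/docker/swarmkit/agent"
	"github.com/docker/swarmkit/log"
)

// shutdownAgent is a hypothetical helper: it first asks the agent to drop its
// assignments via Leave, then stops it. The whole sequence is bounded by a
// timeout so a wedged worker cannot block shutdown forever.
func shutdownAgent(ctx context.Context, a *agent.Agent) error {
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	// Leave blocks until the worker has closed all task managers
	// (worker.Wait) or until the agent itself is closed.
	if err := a.Leave(ctx); err != nil {
		log.G(ctx).WithError(err).Error("agent failed to clean up assignments")
	}

	// Stop blocks until the agent has fully shut down.
	return a.Stop(ctx)
}

This is essentially the sequence the node.go change below performs inside Node.Stop.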
vendor/github.com/docker/swarmkit/agent/reporter.go (generated, vendored): 2 changes

@@ -115,7 +115,7 @@ func (sr *statusReporter) run(ctx context.Context) {
 		}

 		if err != nil {
-			log.G(ctx).WithError(err).Error("failed reporting status to agent")
+			log.G(ctx).WithError(err).Error("status reporter failed to report status to agent")

 			// place it back in the map, if not there, allowing us to pick
 			// the value if a new one came in when we were sending the last
vendor/github.com/docker/swarmkit/agent/task.go (generated, vendored): 47 changes

@@ -1,6 +1,7 @@
 package agent

 import (
+	"sync"
 	"time"

 	"github.com/docker/swarmkit/agent/exec"

@@ -19,8 +20,10 @@ type taskManager struct {

 	updateq chan *api.Task

-	shutdown chan struct{}
-	closed   chan struct{}
+	shutdown     chan struct{}
+	shutdownOnce sync.Once
+	closed       chan struct{}
+	closeOnce    sync.Once
 }

 func newTaskManager(ctx context.Context, task *api.Task, ctlr exec.Controller, reporter StatusReporter) *taskManager {

@@ -48,20 +51,15 @@ func (tm *taskManager) Update(ctx context.Context, task *api.Task) error {
 	}
 }

-// Close shuts down the task manager, blocking until it is stopped.
+// Close shuts down the task manager, blocking until it is closed.
 func (tm *taskManager) Close() error {
-	select {
-	case <-tm.closed:
-		return nil
-	case <-tm.shutdown:
-	default:
-		close(tm.shutdown)
-	}
+	tm.shutdownOnce.Do(func() {
+		close(tm.shutdown)
+	})

-	select {
-	case <-tm.closed:
-		return nil
-	}
+	<-tm.closed
+
+	return nil
 }

 func (tm *taskManager) Logs(ctx context.Context, options api.LogSubscriptionOptions, publisher exec.LogPublisher) {

@@ -106,7 +104,8 @@ func (tm *taskManager) run(ctx context.Context) {
 			// always check for shutdown before running.
 			select {
 			case <-tm.shutdown:
-				continue // ignore run request and handle shutdown
+				shutdown = tm.shutdown // a little questionable
+				continue               // ignore run request and handle shutdown
 			case <-tm.closed:
 				continue
 			default:

@@ -143,7 +142,7 @@ func (tm *taskManager) run(ctx context.Context) {
 			}

 			if err := tm.reporter.UpdateTaskStatus(ctx, running.ID, status); err != nil {
-				log.G(ctx).WithError(err).Error("failed reporting status to agent")
+				log.G(ctx).WithError(err).Error("task manager failed to report status to agent")
 			}
 		}

@@ -230,25 +229,19 @@ func (tm *taskManager) run(ctx context.Context) {
 				continue // wait until operation actually exits.
 			}

-			// TODO(stevvooe): This should be left for the repear.
-
-			// make an attempt at removing. this is best effort. any errors will be
-			// retried by the reaper later.
-			if err := tm.ctlr.Remove(ctx); err != nil {
-				log.G(ctx).WithError(err).WithField("task.id", tm.task.ID).Error("remove task failed")
-			}
-
-			if err := tm.ctlr.Close(); err != nil {
-				log.G(ctx).WithError(err).Error("error closing controller")
-			}
 			// disable everything, and prepare for closing.
 			statusq = nil
 			errs = nil
 			shutdown = nil
-			close(tm.closed)
+			tm.closeOnce.Do(func() {
+				close(tm.closed)
+			})
 		case <-tm.closed:
 			return
 		case <-ctx.Done():
+			tm.closeOnce.Do(func() {
+				close(tm.closed)
+			})
 			return
 		}
 	}
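
The new shutdownOnce and closeOnce fields apply the usual sync.Once guard for closing a channel: closing an already-closed channel panics, so wrapping the close in Do makes Close safe to call from several goroutines. A standalone sketch of the idiom, with illustrative names that are not taken from the diff:

package example

import "sync"

// closer signals shutdown by closing a channel. The close is guarded with
// sync.Once so that repeated or concurrent Close calls cannot trigger the
// "close of closed channel" panic, mirroring shutdownOnce/closeOnce in
// taskManager above.
type closer struct {
	done     chan struct{}
	doneOnce sync.Once
}

func newCloser() *closer {
	return &closer{done: make(chan struct{})}
}

// Close is idempotent: only the first call closes the channel.
func (c *closer) Close() {
	c.doneOnce.Do(func() {
		close(c.done)
	})
}

// Done exposes the channel so callers can select on it.
func (c *closer) Done() <-chan struct{} {
	return c.done
}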
vendor/github.com/docker/swarmkit/agent/worker.go (generated, vendored): 55 changes

@@ -40,6 +40,9 @@ type Worker interface {

 	// Subscribe to log messages matching the subscription.
 	Subscribe(ctx context.Context, subscription *api.SubscriptionMessage) error
+
+	// Wait blocks until all task managers have closed
+	Wait(ctx context.Context) error
 }

 // statusReporterKey protects removal map from panic.

@@ -57,6 +60,9 @@ type worker struct {

 	taskManagers map[string]*taskManager
 	mu           sync.RWMutex
+
+	closed  bool
+	closers sync.WaitGroup // keeps track of active closers
 }

 func newWorker(db *bolt.DB, executor exec.Executor, publisherProvider exec.LogPublisherProvider) *worker {

@@ -106,6 +112,10 @@ func (w *worker) Init(ctx context.Context) error {

 // Close performs worker cleanup when no longer needed.
 func (w *worker) Close() {
+	w.mu.Lock()
+	w.closed = true
+	w.mu.Unlock()
+
 	w.taskevents.Close()
 }

@@ -118,6 +128,10 @@ func (w *worker) Assign(ctx context.Context, assignments []*api.AssignmentChange
 	w.mu.Lock()
 	defer w.mu.Unlock()

+	if w.closed {
+		return ErrClosed
+	}
+
 	log.G(ctx).WithFields(logrus.Fields{
 		"len(assignments)": len(assignments),
 	}).Debug("(*worker).Assign")

@@ -140,6 +154,10 @@ func (w *worker) Update(ctx context.Context, assignments []*api.AssignmentChange
 	w.mu.Lock()
 	defer w.mu.Unlock()

+	if w.closed {
+		return ErrClosed
+	}
+
 	log.G(ctx).WithFields(logrus.Fields{
 		"len(assignments)": len(assignments),
 	}).Debug("(*worker).Update")

@@ -222,10 +240,22 @@ func reconcileTaskState(ctx context.Context, w *worker, assignments []*api.Assig
 	}

 	closeManager := func(tm *taskManager) {
-		// when a task is no longer assigned, we shutdown the task manager for
-		// it and leave cleanup to the sweeper.
-		if err := tm.Close(); err != nil {
-			log.G(ctx).WithError(err).Error("error closing task manager")
-		}
+		go func(tm *taskManager) {
+			defer w.closers.Done()
+			// when a task is no longer assigned, we shutdown the task manager
+			if err := tm.Close(); err != nil {
+				log.G(ctx).WithError(err).Error("error closing task manager")
+			}
+		}(tm)
+
+		// make an attempt at removing. this is best effort. any errors will be
+		// retried by the reaper later.
+		if err := tm.ctlr.Remove(ctx); err != nil {
+			log.G(ctx).WithError(err).WithField("task.id", tm.task.ID).Error("remove task failed")
+		}
+
+		if err := tm.ctlr.Close(); err != nil {
+			log.G(ctx).WithError(err).Error("error closing controller")
+		}
 	}

@@ -359,6 +389,8 @@ func (w *worker) taskManager(ctx context.Context, tx *bolt.Tx, task *api.Task) (
 		return nil, err
 	}
 	w.taskManagers[task.ID] = tm
+	// keep track of active tasks
+	w.closers.Add(1)
 	return tm, nil
 }

@@ -484,3 +516,18 @@ func (w *worker) Subscribe(ctx context.Context, subscription *api.SubscriptionMe
 		}
 	}
 }
+
+func (w *worker) Wait(ctx context.Context) error {
+	ch := make(chan struct{})
+	go func() {
+		w.closers.Wait()
+		close(ch)
+	}()
+
+	select {
+	case <-ch:
+		return nil
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
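
worker.Wait layers context cancellation on top of a sync.WaitGroup: closers.Add(1) when a task manager is created, closers.Done() when its closer goroutine finishes, and a select that gives up when the context does. A generic sketch of that wait pattern, with an assumed helper name:

package example

import (
	"context"
	"sync"
)

// waitWithContext waits for wg but gives up when ctx is cancelled. A
// WaitGroup has no context support of its own, so the wait runs in a helper
// goroutine that closes a channel when it finishes, and the caller selects
// on that channel and ctx.Done(), the same shape as worker.Wait above.
func waitWithContext(ctx context.Context, wg *sync.WaitGroup) error {
	ch := make(chan struct{})
	go func() {
		wg.Wait()
		close(ch)
	}()

	select {
	case <-ch:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

Note that if the context is cancelled first, the helper goroutine still runs until the WaitGroup drains; the same is true of the Wait implementation in the diff.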
vendor/github.com/docker/swarmkit/ca/certificates.go (generated, vendored): 2 changes

@@ -69,7 +69,7 @@ const (
 	MinNodeCertExpiration = 1 * time.Hour
 )

-// A recoverableErr is an non-fatal error encountered signing a certificate,
+// A recoverableErr is a non-fatal error encountered signing a certificate,
 // which means that the certificate issuance may be retried at a later time.
 type recoverableErr struct {
 	err error
vendor/github.com/docker/swarmkit/ca/config.go (generated, vendored): 37 changes

@@ -459,13 +459,26 @@ func RenewTLSConfig(ctx context.Context, s *SecurityConfig, remotes remotes.Remo
 		if err != nil {
 			// We failed to read the expiration, let's stick with the starting default
 			log.Errorf("failed to read the expiration of the TLS certificate in: %s", s.KeyReader().Target())
-			updates <- CertificateUpdate{Err: errors.New("failed to read certificate expiration")}
+
+			select {
+			case updates <- CertificateUpdate{Err: errors.New("failed to read certificate expiration")}:
+			case <-ctx.Done():
+				log.Info("shutting down certificate renewal routine")
+				return
+			}
 		} else {
 			// If we have an expired certificate, we let's stick with the starting default in
 			// the hope that this is a temporary clock skew.
 			if validUntil.Before(time.Now()) {
 				log.WithError(err).Errorf("failed to create a new client TLS config")
-				updates <- CertificateUpdate{Err: errors.New("TLS certificate is expired")}
+
+				select {
+				case updates <- CertificateUpdate{Err: errors.New("TLS certificate is expired")}:
+				case <-ctx.Done():
+					log.Info("shutting down certificate renewal routine")
+					return
+				}
+
 			} else {
 				// Random retry time between 50% and 80% of the total time to expiration
 				retry = calculateRandomExpiry(validFrom, validUntil)

@@ -478,19 +491,27 @@ func RenewTLSConfig(ctx context.Context, s *SecurityConfig, remotes remotes.Remo

 			select {
 			case <-time.After(retry):
-				log.Infof("renewing certificate")
+				log.Info("renewing certificate")
 			case <-renew:
-				log.Infof("forced certificate renewal")
+				log.Info("forced certificate renewal")
 			case <-ctx.Done():
-				log.Infof("shuting down certificate renewal routine")
+				log.Info("shutting down certificate renewal routine")
 				return
 			}

-			// ignore errors - it will just try again laster
+			// ignore errors - it will just try again later
+			var certUpdate CertificateUpdate
 			if err := RenewTLSConfigNow(ctx, s, remotes); err != nil {
-				updates <- CertificateUpdate{Err: err}
+				certUpdate.Err = err
 			} else {
-				updates <- CertificateUpdate{Role: s.ClientTLSCreds.Role()}
+				certUpdate.Role = s.ClientTLSCreds.Role()
 			}
+
+			select {
+			case updates <- certUpdate:
+			case <-ctx.Done():
+				log.Info("shutting down certificate renewal routine")
+				return
+			}
 		}
 	}()
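
The RenewTLSConfig changes replace bare sends on the updates channel with selects against ctx.Done(), so the renewal goroutine cannot block forever once the consumer is gone. A distilled sketch of that send pattern; the function name and element type are illustrative:

package example

import "context"

// trySend delivers v on out unless the context is cancelled first. A bare
// `out <- v` blocks indefinitely if the receiver has gone away, leaking the
// sending goroutine; selecting on ctx.Done() gives the loop a clean exit,
// which is what the RenewTLSConfig changes do for CertificateUpdate values.
func trySend(ctx context.Context, out chan<- string, v string) error {
	select {
	case out <- v:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}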
vendor/github.com/docker/swarmkit/log/context.go (generated, vendored): 2 changes

@@ -42,7 +42,7 @@ func GetLogger(ctx context.Context) *logrus.Entry {
 }

 // WithModule adds the module to the context, appending it with a slash if a
-// module already exists. A module is just an roughly correlated defined by the
+// module already exists. A module is just a roughly correlated defined by the
 // call tree for a given context.
 //
 // As an example, we might have a "node" module already part of a context. If

@@ -73,7 +73,7 @@ func newPortSpace(protocol api.PortConfig_Protocol) (*portSpace, error) {
 	}, nil
 }

-// getPortConfigkey returns a map key for doing set operations with
+// getPortConfigKey returns a map key for doing set operations with
 // ports. The key consists of name, protocol and target port which
 // uniquely identifies a port within a single Endpoint.
 func getPortConfigKey(p *api.PortConfig) api.PortConfig {
vendor/github.com/docker/swarmkit/manager/logbroker/broker.go (generated, vendored): 2 changes

@@ -173,7 +173,7 @@ func (lb *LogBroker) watchSubscriptions(nodeID string) ([]*subscription, chan ev
 	}))

 	// Grab current subscriptions.
-	subscriptions := make([]*subscription, 0, len(lb.registeredSubscriptions))
+	var subscriptions []*subscription
 	for _, s := range lb.registeredSubscriptions {
 		if s.Contains(nodeID) {
 			subscriptions = append(subscriptions, s)
vendor/github.com/docker/swarmkit/manager/manager.go (generated, vendored): 2 changes

@@ -45,7 +45,7 @@ const (
 	defaultTaskHistoryRetentionLimit = 5
 )

-// RemoteAddrs provides an listening address and an optional advertise address
+// RemoteAddrs provides a listening address and an optional advertise address
 // for serving the remote API.
 type RemoteAddrs struct {
 	// Address to bind
vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/replicated.go (generated, vendored): 2 changes

@@ -102,7 +102,7 @@ func (r *Orchestrator) Stop() {

 func (r *Orchestrator) tick(ctx context.Context) {
 	// tickTasks must be called first, so we respond to task-level changes
-	// before performing service reconcillation.
+	// before performing service reconciliation.
 	r.tickTasks(ctx)
 	r.tickServices(ctx)
 }
vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/services.go (generated, vendored): 2 changes

@@ -181,7 +181,7 @@ func (r *Orchestrator) reconcile(ctx context.Context, service *api.Service) {
 func (r *Orchestrator) addTasks(ctx context.Context, batch *store.Batch, service *api.Service, runningSlots map[uint64]orchestrator.Slot, deadSlots map[uint64]orchestrator.Slot, count int) {
 	slot := uint64(0)
 	for i := 0; i < count; i++ {
-		// Find an slot number that is missing a running task
+		// Find a slot number that is missing a running task
 		for {
 			slot++
 			if _, ok := runningSlots[slot]; !ok {
vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/tasks.go (generated, vendored): 2 changes

@@ -15,7 +15,7 @@ import (

 // This file provides task-level orchestration. It observes changes to task
 // and node state and kills/recreates tasks if necessary. This is distinct from
-// service-level reconcillation, which observes changes to services and creates
+// service-level reconciliation, which observes changes to services and creates
 // and/or kills tasks to match the service definition.

 func invalidNode(n *api.Node) bool {
vendor/github.com/docker/swarmkit/manager/state/raft/raft.go (generated, vendored): 26 changes

@@ -380,8 +380,8 @@ func (n *Node) Run(ctx context.Context) error {
 		cancel()
 		n.stop(ctx)
 		if nodeRemoved {
-			// Move WAL and snapshot out of the way, since
-			// they are no longer usable.
+			// Delete WAL and snapshots, since they are no longer
+			// usable.
 			if err := n.raftLogger.Clear(ctx); err != nil {
 				log.G(ctx).WithError(err).Error("failed to move wal after node removal")
 			}

@@ -405,7 +405,7 @@ func (n *Node) Run(ctx context.Context) error {

 			// Save entries to storage
 			if err := n.saveToStorage(ctx, &raftConfig, rd.HardState, rd.Entries, rd.Snapshot); err != nil {
-				log.G(ctx).WithError(err).Error("failed to save entries to storage")
+				return errors.Wrap(err, "failed to save entries to storage")
 			}

 			if len(rd.Messages) != 0 {

@@ -710,11 +710,20 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons
 	defer n.membershipLock.Unlock()

 	if !n.IsMember() {
-		return nil, ErrNoRaftMember
+		return nil, grpc.Errorf(codes.FailedPrecondition, "%s", ErrNoRaftMember.Error())
 	}

 	if !n.isLeader() {
-		return nil, ErrLostLeadership
+		return nil, grpc.Errorf(codes.FailedPrecondition, "%s", ErrLostLeadership.Error())
 	}

+	// A single manager must not be able to join the raft cluster twice. If
+	// it did, that would cause the quorum to be computed incorrectly. This
+	// could happen if the WAL was deleted from an active manager.
+	for _, m := range n.cluster.Members() {
+		if m.NodeID == nodeInfo.NodeID {
+			return nil, grpc.Errorf(codes.AlreadyExists, "%s", "a raft member with this node ID already exists")
+		}
+	}
+
 	// Find a unique ID for the joining member.

@@ -734,7 +743,7 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons

 	requestHost, requestPort, err := net.SplitHostPort(remoteAddr)
 	if err != nil {
-		return nil, fmt.Errorf("invalid address %s in raft join request", remoteAddr)
+		return nil, grpc.Errorf(codes.InvalidArgument, "invalid address %s in raft join request", remoteAddr)
 	}

 	requestIP := net.ParseIP(requestHost)

@@ -990,6 +999,11 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa
 	defer n.stopMu.RUnlock()

 	if n.IsMember() {
+		if msg.Message.To != n.Config.ID {
+			n.processRaftMessageLogger(ctx, msg).Errorf("received message intended for raft_id %x", msg.Message.To)
+			return &api.ProcessRaftMessageResponse{}, nil
+		}
+
 		if err := n.raftNode.Step(ctx, *msg.Message); err != nil {
 			n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("raft Step failed")
 		}
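
The Join changes wrap sentinel errors in gRPC status codes so remote callers can distinguish failure modes instead of matching error strings. A hedged sketch of the same technique; the handler below is made up, but grpc.Errorf and the codes package are the calls the diff itself uses (newer code usually calls status.Errorf directly):

package example

import (
	"errors"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)

var errNotLeader = errors.New("this node is not the leader")

// join is a hypothetical RPC handler body showing the pattern from the diff:
// instead of returning a bare sentinel error, attach a gRPC code so clients
// can branch on codes.FailedPrecondition, codes.AlreadyExists, and so on.
func join(isLeader, alreadyMember bool) error {
	if !isLeader {
		return grpc.Errorf(codes.FailedPrecondition, "%s", errNotLeader.Error())
	}
	if alreadyMember {
		return grpc.Errorf(codes.AlreadyExists, "a raft member with this node ID already exists")
	}
	return nil
}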
vendor/github.com/docker/swarmkit/node/node.go (generated, vendored): 209 changes

@@ -295,7 +295,7 @@ func (n *Node) run(ctx context.Context) (err error) {
 	var wg sync.WaitGroup
 	wg.Add(2)
 	go func() {
-		managerErr = n.runManager(ctx, securityConfig, managerReady) // store err and loop
+		managerErr = n.superviseManager(ctx, securityConfig, managerReady) // store err and loop
 		wg.Done()
 		cancel()
 	}()

@@ -330,6 +330,14 @@ func (n *Node) Stop(ctx context.Context) error {
 	default:
 		return errNodeNotStarted
 	}
+	// ask agent to clean up assignments
+	n.Lock()
+	if n.agent != nil {
+		if err := n.agent.Leave(ctx); err != nil {
+			log.G(ctx).WithError(err).Error("agent failed to clean up assignments")
+		}
+	}
+	n.Unlock()

 	n.stopOnce.Do(func() {
 		close(n.stopped)

@@ -616,9 +624,7 @@ func (n *Node) waitRole(ctx context.Context, role string) error {
 		n.roleCond.Wait()
 		select {
 		case <-ctx.Done():
-			if ctx.Err() != nil {
-				return ctx.Err()
-			}
+			return ctx.Err()
 		default:
 		}
 	}

@@ -627,100 +633,117 @@ func (n *Node) waitRole(ctx context.Context, role string) error {
 }

 func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
+	remoteAddr, _ := n.remotes.Select(n.NodeID())
+	m, err := manager.New(&manager.Config{
+		ForceNewCluster: n.config.ForceNewCluster,
+		RemoteAPI: manager.RemoteAddrs{
+			ListenAddr:    n.config.ListenRemoteAPI,
+			AdvertiseAddr: n.config.AdvertiseRemoteAPI,
+		},
+		ControlAPI:       n.config.ListenControlAPI,
+		SecurityConfig:   securityConfig,
+		ExternalCAs:      n.config.ExternalCAs,
+		JoinRaft:         remoteAddr.Addr,
+		StateDir:         n.config.StateDir,
+		HeartbeatTick:    n.config.HeartbeatTick,
+		ElectionTick:     n.config.ElectionTick,
+		AutoLockManagers: n.config.AutoLockManagers,
+		UnlockKey:        n.unlockKey,
+		Availability:     n.config.Availability,
+	})
+	if err != nil {
+		return err
+	}
+	done := make(chan struct{})
+	var runErr error
+	go func() {
+		if err := m.Run(context.Background()); err != nil && err != raft.ErrMemberRemoved {
+			runErr = err
+		}
+		close(done)
+	}()
+
+	workerRole := make(chan struct{})
+	waitRoleCtx, waitRoleCancel := context.WithCancel(ctx)
+	defer waitRoleCancel()
+	go func() {
+		n.waitRole(waitRoleCtx, ca.WorkerRole)
+		close(workerRole)
+	}()
+
+	defer func() {
+		n.Lock()
+		n.manager = nil
+		n.Unlock()
+		m.Stop(ctx)
+		<-done
+		n.setControlSocket(nil)
+	}()
+
+	n.Lock()
+	n.manager = m
+	n.Unlock()
+
+	connCtx, connCancel := context.WithCancel(ctx)
+	defer connCancel()
+
+	go n.initManagerConnection(connCtx, ready)
+
+	// this happens only on initial start
+	if ready != nil {
+		go func(ready chan struct{}) {
+			select {
+			case <-ready:
+				addr, err := n.RemoteAPIAddr()
+				if err != nil {
+					log.G(ctx).WithError(err).Errorf("get remote api addr")
+				} else {
+					n.remotes.Observe(api.Peer{NodeID: n.NodeID(), Addr: addr}, remotes.DefaultObservationWeight)
+				}
+			case <-connCtx.Done():
+			}
+		}(ready)
+	}
+
+	// wait for manager stop or for role change
+	// if manager stopped before role change, wait for new role for 16 seconds,
+	// then just restart manager, we might just miss that event.
+	// we need to wait for role to prevent manager to start again with wrong
+	// certificate
+	select {
+	case <-done:
+		timer := time.NewTimer(16 * time.Second)
+		defer timer.Stop()
+		select {
+		case <-timer.C:
+			log.G(ctx).Warn("failed to get worker role after manager stop, restart manager")
+		case <-workerRole:
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+		return runErr
+	case <-workerRole:
+		log.G(ctx).Info("role changed to worker, wait for manager to stop")
+		select {
+		case <-done:
+			return runErr
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
+
+func (n *Node) superviseManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
 	for {
 		if err := n.waitRole(ctx, ca.ManagerRole); err != nil {
 			return err
 		}

-		remoteAddr, _ := n.remotes.Select(n.NodeID())
-		m, err := manager.New(&manager.Config{
-			ForceNewCluster: n.config.ForceNewCluster,
-			RemoteAPI: manager.RemoteAddrs{
-				ListenAddr:    n.config.ListenRemoteAPI,
-				AdvertiseAddr: n.config.AdvertiseRemoteAPI,
-			},
-			ControlAPI:       n.config.ListenControlAPI,
-			SecurityConfig:   securityConfig,
-			ExternalCAs:      n.config.ExternalCAs,
-			JoinRaft:         remoteAddr.Addr,
-			StateDir:         n.config.StateDir,
-			HeartbeatTick:    n.config.HeartbeatTick,
-			ElectionTick:     n.config.ElectionTick,
-			AutoLockManagers: n.config.AutoLockManagers,
-			UnlockKey:        n.unlockKey,
-			Availability:     n.config.Availability,
-		})
-		if err != nil {
-			return err
-		}
-		done := make(chan struct{})
-		var runErr error
-		go func() {
-			runErr = m.Run(context.Background())
-			close(done)
-		}()
-
-		n.Lock()
-		n.manager = m
-		n.Unlock()
-
-		connCtx, connCancel := context.WithCancel(ctx)
-
-		go n.initManagerConnection(connCtx, ready)
-
-		// this happens only on initial start
-		if ready != nil {
-			go func(ready chan struct{}) {
-				select {
-				case <-ready:
-					addr, err := n.RemoteAPIAddr()
-					if err != nil {
-						log.G(ctx).WithError(err).Errorf("get remote api addr")
-					} else {
-						n.remotes.Observe(api.Peer{NodeID: n.NodeID(), Addr: addr}, remotes.DefaultObservationWeight)
-					}
-				case <-connCtx.Done():
-				}
-			}(ready)
-			ready = nil
-		}
-
-		roleChanged := make(chan error)
-		waitCtx, waitCancel := context.WithCancel(ctx)
-		go func() {
-			err := n.waitRole(waitCtx, ca.WorkerRole)
-			roleChanged <- err
-		}()
-
-		select {
-		case <-done:
-			// Fail out if m.Run() returns error, otherwise wait for
-			// role change.
-			if runErr != nil && runErr != raft.ErrMemberRemoved {
-				err = runErr
-			} else {
-				err = <-roleChanged
-			}
-		case err = <-roleChanged:
-		}
-
-		n.Lock()
-		n.manager = nil
-		n.Unlock()
-
-		select {
-		case <-done:
-		case <-ctx.Done():
-			err = ctx.Err()
-			m.Stop(context.Background())
-			<-done
-		}
-		connCancel()
-		n.setControlSocket(nil)
-		waitCancel()
-
-		if err != nil {
-			return err
+		if err := n.runManager(ctx, securityConfig, ready); err != nil {
+			return errors.Wrap(err, "manager stopped")
 		}
+		ready = nil
 	}
 }
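
runManager's post-exit wait bounds the wait for a role change with a 16-second timer alongside the role and context channels. A minimal sketch of that select shape; the helper name is an assumption, while the timer idiom mirrors the diff:

package example

import (
	"context"
	"errors"
	"time"
)

// waitForSignal waits up to 16 seconds for signal to fire, the same shape as
// runManager waiting for the worker-role notification after the manager
// stops. The timer is stopped via defer so it does not linger once another
// case wins.
func waitForSignal(ctx context.Context, signal <-chan struct{}) error {
	timer := time.NewTimer(16 * time.Second)
	defer timer.Stop()

	select {
	case <-timer.C:
		return errors.New("timed out waiting for signal")
	case <-signal:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}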