diff --git a/vendor.conf b/vendor.conf index b699dee7ce..e865fcad2c 100644 --- a/vendor.conf +++ b/vendor.conf @@ -102,7 +102,7 @@ github.com/docker/containerd 03e5862ec0d8d3b3f750e19fca3ee367e13c090e github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4 # cluster -github.com/docker/swarmkit 9e4bd71a1690cd27400714fcd98c329b752b5c4c +github.com/docker/swarmkit 2e956c40c02ad527c90ec85bdae25a0acac1bd87 github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9 github.com/gogo/protobuf v0.3 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a diff --git a/vendor/github.com/docker/swarmkit/agent/agent.go b/vendor/github.com/docker/swarmkit/agent/agent.go index 915a2e5cee..434bdd75f9 100644 --- a/vendor/github.com/docker/swarmkit/agent/agent.go +++ b/vendor/github.com/docker/swarmkit/agent/agent.go @@ -37,6 +37,8 @@ type Agent struct { started chan struct{} startOnce sync.Once // start only once ready chan struct{} + leaving chan struct{} + leaveOnce sync.Once stopped chan struct{} // requests shutdown stopOnce sync.Once // only allow stop to be called once closed chan struct{} // only closed in run @@ -53,6 +55,7 @@ func New(config *Config) (*Agent, error) { config: config, sessionq: make(chan sessionOperation), started: make(chan struct{}), + leaving: make(chan struct{}), stopped: make(chan struct{}), closed: make(chan struct{}), ready: make(chan struct{}), @@ -78,6 +81,37 @@ func (a *Agent) Start(ctx context.Context) error { return err } +// Leave instructs the agent to leave the cluster. This method will shutdown +// assignment processing and remove all assignments from the node. +// Leave blocks until worker has finished closing all task managers or agent +// is closed. +func (a *Agent) Leave(ctx context.Context) error { + select { + case <-a.started: + default: + return errAgentNotStarted + } + + a.leaveOnce.Do(func() { + close(a.leaving) + }) + + // agent could be closed while Leave is in progress + var err error + ch := make(chan struct{}) + go func() { + err = a.worker.Wait(ctx) + close(ch) + }() + + select { + case <-ch: + return err + case <-a.closed: + return ErrClosed + } +} + // Stop shuts down the agent, blocking until full shutdown. If the agent is not // started, Stop will block until the agent has fully shutdown. func (a *Agent) Stop(ctx context.Context) error { @@ -151,6 +185,7 @@ func (a *Agent) run(ctx context.Context) { registered = session.registered ready = a.ready // first session ready sessionq chan sessionOperation + leaving = a.leaving subscriptions = map[string]context.CancelFunc{} ) @@ -171,7 +206,21 @@ func (a *Agent) run(ctx context.Context) { select { case operation := <-sessionq: operation.response <- operation.fn(session) + case <-leaving: + leaving = nil + + // TODO(stevvooe): Signal to the manager that the node is leaving. + + // when leaving we remove all assignments. 
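+ // passing a nil assignment set tells the worker to shut down and remove
+ // every task manager (and task) it is currently running.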
+ if err := a.worker.Assign(ctx, nil); err != nil { + log.G(ctx).WithError(err).Error("failed removing all assignments") + } case msg := <-session.assignments: + // if we have left, accept no more assignments + if leaving == nil { + continue + } + switch msg.Type { case api.AssignmentsMessage_COMPLETE: // Need to assign secrets before tasks, because tasks might depend on new secrets diff --git a/vendor/github.com/docker/swarmkit/agent/reporter.go b/vendor/github.com/docker/swarmkit/agent/reporter.go index eac4b3267a..73e6ab3fd9 100644 --- a/vendor/github.com/docker/swarmkit/agent/reporter.go +++ b/vendor/github.com/docker/swarmkit/agent/reporter.go @@ -115,7 +115,7 @@ func (sr *statusReporter) run(ctx context.Context) { } if err != nil { - log.G(ctx).WithError(err).Error("failed reporting status to agent") + log.G(ctx).WithError(err).Error("status reporter failed to report status to agent") // place it back in the map, if not there, allowing us to pick // the value if a new one came in when we were sending the last diff --git a/vendor/github.com/docker/swarmkit/agent/task.go b/vendor/github.com/docker/swarmkit/agent/task.go index 91f282b654..95fe93179b 100644 --- a/vendor/github.com/docker/swarmkit/agent/task.go +++ b/vendor/github.com/docker/swarmkit/agent/task.go @@ -1,6 +1,7 @@ package agent import ( + "sync" "time" "github.com/docker/swarmkit/agent/exec" @@ -19,8 +20,10 @@ type taskManager struct { updateq chan *api.Task - shutdown chan struct{} - closed chan struct{} + shutdown chan struct{} + shutdownOnce sync.Once + closed chan struct{} + closeOnce sync.Once } func newTaskManager(ctx context.Context, task *api.Task, ctlr exec.Controller, reporter StatusReporter) *taskManager { @@ -48,20 +51,15 @@ func (tm *taskManager) Update(ctx context.Context, task *api.Task) error { } } -// Close shuts down the task manager, blocking until it is stopped. +// Close shuts down the task manager, blocking until it is closed. func (tm *taskManager) Close() error { - select { - case <-tm.closed: - return nil - case <-tm.shutdown: - default: + tm.shutdownOnce.Do(func() { close(tm.shutdown) - } + }) - select { - case <-tm.closed: - return nil - } + <-tm.closed + + return nil } func (tm *taskManager) Logs(ctx context.Context, options api.LogSubscriptionOptions, publisher exec.LogPublisher) { @@ -106,7 +104,8 @@ func (tm *taskManager) run(ctx context.Context) { // always check for shutdown before running. select { case <-tm.shutdown: - continue // ignore run request and handle shutdown + shutdown = tm.shutdown // a little questionable + continue // ignore run request and handle shutdown case <-tm.closed: continue default: @@ -143,7 +142,7 @@ func (tm *taskManager) run(ctx context.Context) { } if err := tm.reporter.UpdateTaskStatus(ctx, running.ID, status); err != nil { - log.G(ctx).WithError(err).Error("failed reporting status to agent") + log.G(ctx).WithError(err).Error("task manager failed to report status to agent") } } @@ -230,25 +229,19 @@ func (tm *taskManager) run(ctx context.Context) { continue // wait until operation actually exits. } - // TODO(stevvooe): This should be left for the repear. - - // make an attempt at removing. this is best effort. any errors will be - // retried by the reaper later. - if err := tm.ctlr.Remove(ctx); err != nil { - log.G(ctx).WithError(err).WithField("task.id", tm.task.ID).Error("remove task failed") - } - - if err := tm.ctlr.Close(); err != nil { - log.G(ctx).WithError(err).Error("error closing controller") - } // disable everything, and prepare for closing. 
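+ // tm.closed is closed through closeOnce because both this exit path and
+ // the ctx.Done() case below may attempt to close it.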
statusq = nil errs = nil shutdown = nil - close(tm.closed) + tm.closeOnce.Do(func() { + close(tm.closed) + }) case <-tm.closed: return case <-ctx.Done(): + tm.closeOnce.Do(func() { + close(tm.closed) + }) return } } diff --git a/vendor/github.com/docker/swarmkit/agent/worker.go b/vendor/github.com/docker/swarmkit/agent/worker.go index c3684f4961..43630fc96e 100644 --- a/vendor/github.com/docker/swarmkit/agent/worker.go +++ b/vendor/github.com/docker/swarmkit/agent/worker.go @@ -40,6 +40,9 @@ type Worker interface { // Subscribe to log messages matching the subscription. Subscribe(ctx context.Context, subscription *api.SubscriptionMessage) error + + // Wait blocks until all task managers have closed + Wait(ctx context.Context) error } // statusReporterKey protects removal map from panic. @@ -57,6 +60,9 @@ type worker struct { taskManagers map[string]*taskManager mu sync.RWMutex + + closed bool + closers sync.WaitGroup // keeps track of active closers } func newWorker(db *bolt.DB, executor exec.Executor, publisherProvider exec.LogPublisherProvider) *worker { @@ -106,6 +112,10 @@ func (w *worker) Init(ctx context.Context) error { // Close performs worker cleanup when no longer needed. func (w *worker) Close() { + w.mu.Lock() + w.closed = true + w.mu.Unlock() + w.taskevents.Close() } @@ -118,6 +128,10 @@ func (w *worker) Assign(ctx context.Context, assignments []*api.AssignmentChange w.mu.Lock() defer w.mu.Unlock() + if w.closed { + return ErrClosed + } + log.G(ctx).WithFields(logrus.Fields{ "len(assignments)": len(assignments), }).Debug("(*worker).Assign") @@ -140,6 +154,10 @@ func (w *worker) Update(ctx context.Context, assignments []*api.AssignmentChange w.mu.Lock() defer w.mu.Unlock() + if w.closed { + return ErrClosed + } + log.G(ctx).WithFields(logrus.Fields{ "len(assignments)": len(assignments), }).Debug("(*worker).Update") @@ -222,10 +240,22 @@ func reconcileTaskState(ctx context.Context, w *worker, assignments []*api.Assig } closeManager := func(tm *taskManager) { - // when a task is no longer assigned, we shutdown the task manager for - // it and leave cleanup to the sweeper. - if err := tm.Close(); err != nil { - log.G(ctx).WithError(err).Error("error closing task manager") + go func(tm *taskManager) { + defer w.closers.Done() + // when a task is no longer assigned, we shutdown the task manager + if err := tm.Close(); err != nil { + log.G(ctx).WithError(err).Error("error closing task manager") + } + }(tm) + + // make an attempt at removing. this is best effort. any errors will be + // retried by the reaper later. 
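+ // the task manager itself closes asynchronously in the goroutine above
+ // (tracked by w.closers); the controller is removed and closed synchronously here.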
+ if err := tm.ctlr.Remove(ctx); err != nil { + log.G(ctx).WithError(err).WithField("task.id", tm.task.ID).Error("remove task failed") + } + + if err := tm.ctlr.Close(); err != nil { + log.G(ctx).WithError(err).Error("error closing controller") } } @@ -359,6 +389,8 @@ func (w *worker) taskManager(ctx context.Context, tx *bolt.Tx, task *api.Task) ( return nil, err } w.taskManagers[task.ID] = tm + // keep track of active tasks + w.closers.Add(1) return tm, nil } @@ -484,3 +516,18 @@ func (w *worker) Subscribe(ctx context.Context, subscription *api.SubscriptionMe } } } + +func (w *worker) Wait(ctx context.Context) error { + ch := make(chan struct{}) + go func() { + w.closers.Wait() + close(ch) + }() + + select { + case <-ch: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/vendor/github.com/docker/swarmkit/ca/certificates.go b/vendor/github.com/docker/swarmkit/ca/certificates.go index 6eb6e4dd74..b95f6fb748 100644 --- a/vendor/github.com/docker/swarmkit/ca/certificates.go +++ b/vendor/github.com/docker/swarmkit/ca/certificates.go @@ -69,7 +69,7 @@ const ( MinNodeCertExpiration = 1 * time.Hour ) -// A recoverableErr is an non-fatal error encountered signing a certificate, +// A recoverableErr is a non-fatal error encountered signing a certificate, // which means that the certificate issuance may be retried at a later time. type recoverableErr struct { err error diff --git a/vendor/github.com/docker/swarmkit/ca/config.go b/vendor/github.com/docker/swarmkit/ca/config.go index e15454b5b4..82280a00b6 100644 --- a/vendor/github.com/docker/swarmkit/ca/config.go +++ b/vendor/github.com/docker/swarmkit/ca/config.go @@ -459,13 +459,26 @@ func RenewTLSConfig(ctx context.Context, s *SecurityConfig, remotes remotes.Remo if err != nil { // We failed to read the expiration, let's stick with the starting default log.Errorf("failed to read the expiration of the TLS certificate in: %s", s.KeyReader().Target()) - updates <- CertificateUpdate{Err: errors.New("failed to read certificate expiration")} + + select { + case updates <- CertificateUpdate{Err: errors.New("failed to read certificate expiration")}: + case <-ctx.Done(): + log.Info("shutting down certificate renewal routine") + return + } } else { // If we have an expired certificate, we let's stick with the starting default in // the hope that this is a temporary clock skew. 
if validUntil.Before(time.Now()) { log.WithError(err).Errorf("failed to create a new client TLS config") - updates <- CertificateUpdate{Err: errors.New("TLS certificate is expired")} + + select { + case updates <- CertificateUpdate{Err: errors.New("TLS certificate is expired")}: + case <-ctx.Done(): + log.Info("shutting down certificate renewal routine") + return + } + } else { // Random retry time between 50% and 80% of the total time to expiration retry = calculateRandomExpiry(validFrom, validUntil) @@ -478,19 +491,27 @@ func RenewTLSConfig(ctx context.Context, s *SecurityConfig, remotes remotes.Remo select { case <-time.After(retry): - log.Infof("renewing certificate") + log.Info("renewing certificate") case <-renew: - log.Infof("forced certificate renewal") + log.Info("forced certificate renewal") case <-ctx.Done(): - log.Infof("shuting down certificate renewal routine") + log.Info("shutting down certificate renewal routine") return } - // ignore errors - it will just try again laster + // ignore errors - it will just try again later + var certUpdate CertificateUpdate if err := RenewTLSConfigNow(ctx, s, remotes); err != nil { - updates <- CertificateUpdate{Err: err} + certUpdate.Err = err } else { - updates <- CertificateUpdate{Role: s.ClientTLSCreds.Role()} + certUpdate.Role = s.ClientTLSCreds.Role() + } + + select { + case updates <- certUpdate: + case <-ctx.Done(): + log.Info("shutting down certificate renewal routine") + return } } }() diff --git a/vendor/github.com/docker/swarmkit/log/context.go b/vendor/github.com/docker/swarmkit/log/context.go index 4539e47eb9..3da380f112 100644 --- a/vendor/github.com/docker/swarmkit/log/context.go +++ b/vendor/github.com/docker/swarmkit/log/context.go @@ -42,7 +42,7 @@ func GetLogger(ctx context.Context) *logrus.Entry { } // WithModule adds the module to the context, appending it with a slash if a -// module already exists. A module is just an roughly correlated defined by the +// module already exists. A module is just a roughly correlated defined by the // call tree for a given context. // // As an example, we might have a "node" module already part of a context. If diff --git a/vendor/github.com/docker/swarmkit/manager/allocator/networkallocator/portallocator.go b/vendor/github.com/docker/swarmkit/manager/allocator/networkallocator/portallocator.go index a7391eba6c..1275b08b07 100644 --- a/vendor/github.com/docker/swarmkit/manager/allocator/networkallocator/portallocator.go +++ b/vendor/github.com/docker/swarmkit/manager/allocator/networkallocator/portallocator.go @@ -73,7 +73,7 @@ func newPortSpace(protocol api.PortConfig_Protocol) (*portSpace, error) { }, nil } -// getPortConfigkey returns a map key for doing set operations with +// getPortConfigKey returns a map key for doing set operations with // ports. The key consists of name, protocol and target port which // uniquely identifies a port within a single Endpoint. func getPortConfigKey(p *api.PortConfig) api.PortConfig { diff --git a/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go b/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go index bec6fe434e..fa07a04455 100644 --- a/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go +++ b/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go @@ -173,7 +173,7 @@ func (lb *LogBroker) watchSubscriptions(nodeID string) ([]*subscription, chan ev })) // Grab current subscriptions. 
- subscriptions := make([]*subscription, 0, len(lb.registeredSubscriptions)) + var subscriptions []*subscription for _, s := range lb.registeredSubscriptions { if s.Contains(nodeID) { subscriptions = append(subscriptions, s) diff --git a/vendor/github.com/docker/swarmkit/manager/manager.go b/vendor/github.com/docker/swarmkit/manager/manager.go index 649f229532..c81bfbafea 100644 --- a/vendor/github.com/docker/swarmkit/manager/manager.go +++ b/vendor/github.com/docker/swarmkit/manager/manager.go @@ -45,7 +45,7 @@ const ( defaultTaskHistoryRetentionLimit = 5 ) -// RemoteAddrs provides an listening address and an optional advertise address +// RemoteAddrs provides a listening address and an optional advertise address // for serving the remote API. type RemoteAddrs struct { // Address to bind diff --git a/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/replicated.go b/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/replicated.go index b7ecd3c371..c3b90676e2 100644 --- a/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/replicated.go +++ b/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/replicated.go @@ -102,7 +102,7 @@ func (r *Orchestrator) Stop() { func (r *Orchestrator) tick(ctx context.Context) { // tickTasks must be called first, so we respond to task-level changes - // before performing service reconcillation. + // before performing service reconciliation. r.tickTasks(ctx) r.tickServices(ctx) } diff --git a/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/services.go b/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/services.go index 2084ba160d..eee840c814 100644 --- a/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/services.go +++ b/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/services.go @@ -181,7 +181,7 @@ func (r *Orchestrator) reconcile(ctx context.Context, service *api.Service) { func (r *Orchestrator) addTasks(ctx context.Context, batch *store.Batch, service *api.Service, runningSlots map[uint64]orchestrator.Slot, deadSlots map[uint64]orchestrator.Slot, count int) { slot := uint64(0) for i := 0; i < count; i++ { - // Find an slot number that is missing a running task + // Find a slot number that is missing a running task for { slot++ if _, ok := runningSlots[slot]; !ok { diff --git a/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/tasks.go b/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/tasks.go index 7b8b442615..892f4e494a 100644 --- a/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/tasks.go +++ b/vendor/github.com/docker/swarmkit/manager/orchestrator/replicated/tasks.go @@ -15,7 +15,7 @@ import ( // This file provides task-level orchestration. It observes changes to task // and node state and kills/recreates tasks if necessary. This is distinct from -// service-level reconcillation, which observes changes to services and creates +// service-level reconciliation, which observes changes to services and creates // and/or kills tasks to match the service definition. 
func invalidNode(n *api.Node) bool { diff --git a/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go b/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go index 98b3f451cd..4228d7eced 100644 --- a/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go +++ b/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go @@ -380,8 +380,8 @@ func (n *Node) Run(ctx context.Context) error { cancel() n.stop(ctx) if nodeRemoved { - // Move WAL and snapshot out of the way, since - // they are no longer usable. + // Delete WAL and snapshots, since they are no longer + // usable. if err := n.raftLogger.Clear(ctx); err != nil { log.G(ctx).WithError(err).Error("failed to move wal after node removal") } @@ -405,7 +405,7 @@ func (n *Node) Run(ctx context.Context) error { // Save entries to storage if err := n.saveToStorage(ctx, &raftConfig, rd.HardState, rd.Entries, rd.Snapshot); err != nil { - log.G(ctx).WithError(err).Error("failed to save entries to storage") + return errors.Wrap(err, "failed to save entries to storage") } if len(rd.Messages) != 0 { @@ -710,11 +710,20 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons defer n.membershipLock.Unlock() if !n.IsMember() { - return nil, ErrNoRaftMember + return nil, grpc.Errorf(codes.FailedPrecondition, "%s", ErrNoRaftMember.Error()) } if !n.isLeader() { - return nil, ErrLostLeadership + return nil, grpc.Errorf(codes.FailedPrecondition, "%s", ErrLostLeadership.Error()) + } + + // A single manager must not be able to join the raft cluster twice. If + // it did, that would cause the quorum to be computed incorrectly. This + // could happen if the WAL was deleted from an active manager. + for _, m := range n.cluster.Members() { + if m.NodeID == nodeInfo.NodeID { + return nil, grpc.Errorf(codes.AlreadyExists, "%s", "a raft member with this node ID already exists") + } } // Find a unique ID for the joining member. 
@@ -734,7 +743,7 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons requestHost, requestPort, err := net.SplitHostPort(remoteAddr) if err != nil { - return nil, fmt.Errorf("invalid address %s in raft join request", remoteAddr) + return nil, grpc.Errorf(codes.InvalidArgument, "invalid address %s in raft join request", remoteAddr) } requestIP := net.ParseIP(requestHost) @@ -990,6 +999,11 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa defer n.stopMu.RUnlock() if n.IsMember() { + if msg.Message.To != n.Config.ID { + n.processRaftMessageLogger(ctx, msg).Errorf("received message intended for raft_id %x", msg.Message.To) + return &api.ProcessRaftMessageResponse{}, nil + } + if err := n.raftNode.Step(ctx, *msg.Message); err != nil { n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("raft Step failed") } diff --git a/vendor/github.com/docker/swarmkit/node/node.go b/vendor/github.com/docker/swarmkit/node/node.go index cd92acd9d3..57b88a9f93 100644 --- a/vendor/github.com/docker/swarmkit/node/node.go +++ b/vendor/github.com/docker/swarmkit/node/node.go @@ -295,7 +295,7 @@ func (n *Node) run(ctx context.Context) (err error) { var wg sync.WaitGroup wg.Add(2) go func() { - managerErr = n.runManager(ctx, securityConfig, managerReady) // store err and loop + managerErr = n.superviseManager(ctx, securityConfig, managerReady) // store err and loop wg.Done() cancel() }() @@ -330,6 +330,14 @@ func (n *Node) Stop(ctx context.Context) error { default: return errNodeNotStarted } + // ask agent to clean up assignments + n.Lock() + if n.agent != nil { + if err := n.agent.Leave(ctx); err != nil { + log.G(ctx).WithError(err).Error("agent failed to clean up assignments") + } + } + n.Unlock() n.stopOnce.Do(func() { close(n.stopped) @@ -616,9 +624,7 @@ func (n *Node) waitRole(ctx context.Context, role string) error { n.roleCond.Wait() select { case <-ctx.Done(): - if ctx.Err() != nil { - return ctx.Err() - } + return ctx.Err() default: } } @@ -627,100 +633,117 @@ func (n *Node) waitRole(ctx context.Context, role string) error { } func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error { + remoteAddr, _ := n.remotes.Select(n.NodeID()) + m, err := manager.New(&manager.Config{ + ForceNewCluster: n.config.ForceNewCluster, + RemoteAPI: manager.RemoteAddrs{ + ListenAddr: n.config.ListenRemoteAPI, + AdvertiseAddr: n.config.AdvertiseRemoteAPI, + }, + ControlAPI: n.config.ListenControlAPI, + SecurityConfig: securityConfig, + ExternalCAs: n.config.ExternalCAs, + JoinRaft: remoteAddr.Addr, + StateDir: n.config.StateDir, + HeartbeatTick: n.config.HeartbeatTick, + ElectionTick: n.config.ElectionTick, + AutoLockManagers: n.config.AutoLockManagers, + UnlockKey: n.unlockKey, + Availability: n.config.Availability, + }) + if err != nil { + return err + } + done := make(chan struct{}) + var runErr error + go func() { + if err := m.Run(context.Background()); err != nil && err != raft.ErrMemberRemoved { + runErr = err + } + close(done) + }() + + workerRole := make(chan struct{}) + waitRoleCtx, waitRoleCancel := context.WithCancel(ctx) + defer waitRoleCancel() + go func() { + n.waitRole(waitRoleCtx, ca.WorkerRole) + close(workerRole) + }() + + defer func() { + n.Lock() + n.manager = nil + n.Unlock() + m.Stop(ctx) + <-done + n.setControlSocket(nil) + }() + + n.Lock() + n.manager = m + n.Unlock() + + connCtx, connCancel := context.WithCancel(ctx) + defer connCancel() + + go n.initManagerConnection(connCtx, ready) 
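+ // connCtx is cancelled by the deferred connCancel when runManager returns;
+ // the ready observer below exits on connCtx.Done().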
+ + // this happens only on initial start + if ready != nil { + go func(ready chan struct{}) { + select { + case <-ready: + addr, err := n.RemoteAPIAddr() + if err != nil { + log.G(ctx).WithError(err).Errorf("get remote api addr") + } else { + n.remotes.Observe(api.Peer{NodeID: n.NodeID(), Addr: addr}, remotes.DefaultObservationWeight) + } + case <-connCtx.Done(): + } + }(ready) + } + + // wait for manager stop or for role change + // if manager stopped before role change, wait for new role for 16 seconds, + // then just restart manager, we might just miss that event. + // we need to wait for role to prevent manager to start again with wrong + // certificate + select { + case <-done: + timer := time.NewTimer(16 * time.Second) + defer timer.Stop() + select { + case <-timer.C: + log.G(ctx).Warn("failed to get worker role after manager stop, restart manager") + case <-workerRole: + case <-ctx.Done(): + return ctx.Err() + } + return runErr + case <-workerRole: + log.G(ctx).Info("role changed to worker, wait for manager to stop") + select { + case <-done: + return runErr + case <-ctx.Done(): + return ctx.Err() + } + case <-ctx.Done(): + return ctx.Err() + } +} + +func (n *Node) superviseManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error { for { if err := n.waitRole(ctx, ca.ManagerRole); err != nil { return err } - - remoteAddr, _ := n.remotes.Select(n.NodeID()) - m, err := manager.New(&manager.Config{ - ForceNewCluster: n.config.ForceNewCluster, - RemoteAPI: manager.RemoteAddrs{ - ListenAddr: n.config.ListenRemoteAPI, - AdvertiseAddr: n.config.AdvertiseRemoteAPI, - }, - ControlAPI: n.config.ListenControlAPI, - SecurityConfig: securityConfig, - ExternalCAs: n.config.ExternalCAs, - JoinRaft: remoteAddr.Addr, - StateDir: n.config.StateDir, - HeartbeatTick: n.config.HeartbeatTick, - ElectionTick: n.config.ElectionTick, - AutoLockManagers: n.config.AutoLockManagers, - UnlockKey: n.unlockKey, - Availability: n.config.Availability, - }) - if err != nil { - return err - } - done := make(chan struct{}) - var runErr error - go func() { - runErr = m.Run(context.Background()) - close(done) - }() - - n.Lock() - n.manager = m - n.Unlock() - - connCtx, connCancel := context.WithCancel(ctx) - go n.initManagerConnection(connCtx, ready) - - // this happens only on initial start - if ready != nil { - go func(ready chan struct{}) { - select { - case <-ready: - addr, err := n.RemoteAPIAddr() - if err != nil { - log.G(ctx).WithError(err).Errorf("get remote api addr") - } else { - n.remotes.Observe(api.Peer{NodeID: n.NodeID(), Addr: addr}, remotes.DefaultObservationWeight) - } - case <-connCtx.Done(): - } - }(ready) - ready = nil - } - - roleChanged := make(chan error) - waitCtx, waitCancel := context.WithCancel(ctx) - go func() { - err := n.waitRole(waitCtx, ca.WorkerRole) - roleChanged <- err - }() - - select { - case <-done: - // Fail out if m.Run() returns error, otherwise wait for - // role change. - if runErr != nil && runErr != raft.ErrMemberRemoved { - err = runErr - } else { - err = <-roleChanged - } - case err = <-roleChanged: - } - - n.Lock() - n.manager = nil - n.Unlock() - - select { - case <-done: - case <-ctx.Done(): - err = ctx.Err() - m.Stop(context.Background()) - <-done - } - connCancel() - n.setControlSocket(nil) - waitCancel() - - if err != nil { - return err + if err := n.runManager(ctx, securityConfig, ready); err != nil { + return errors.Wrap(err, "manager stopped") } + ready = nil } }
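Not part of the patch, for review context only: a minimal caller-side sketch of how the new leave-and-wait behaviour might be bounded with a timeout. It assumes an already-started *node.Node (here n); the nodeutil package name, the StopNode helper, and the 15-second window are illustrative and not taken from this diff.

package nodeutil

import (
	"time"

	"github.com/docker/swarmkit/node"
	"golang.org/x/net/context"
)

// StopNode bounds node shutdown: Node.Stop now asks the agent to Leave, and
// Leave blocks until the worker has closed all task managers, the agent is
// closed, or ctx expires (worker.Wait returns ctx.Err() in that case).
func StopNode(n *node.Node) error {
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	return n.Stop(ctx)
}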