package manager

import (
	"context"
	"time"

	"code.cloudfoundry.org/clock"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/state/raft"
	"github.com/docker/swarmkit/manager/state/raft/membership"
	"github.com/docker/swarmkit/manager/state/store"
)

const (
	// roleReconcileInterval is how often to retry reconciling or removing a node,
	// if a previous reconciliation or removal failed
	roleReconcileInterval = 5 * time.Second

	// removalTimeout is how long to wait for a raft member removal to be applied
	// to the store before giving up
	removalTimeout = 5 * time.Second
)

// roleManager reconciles the raft member list with desired role changes.
type roleManager struct {
	ctx    context.Context
	cancel func()

	store    *store.MemoryStore
	raft     *raft.Node
	doneChan chan struct{}

	// pendingReconciliation contains changed nodes that have not yet been reconciled
	// in the raft member list.
	pendingReconciliation map[string]*api.Node

	// pendingRemoval contains the IDs of nodes that have been deleted - if these
	// correspond to members in the raft cluster, those members need to be removed
	// from raft
	pendingRemoval map[string]struct{}

	// leave this nil except for tests which need to inject a fake time source
	clocksource clock.Clock
}

// newRoleManager creates a new roleManager.
func newRoleManager(store *store.MemoryStore, raftNode *raft.Node) *roleManager {
	ctx, cancel := context.WithCancel(context.Background())
	return &roleManager{
		ctx:      ctx,
		cancel:   cancel,
		store:    store,
		raft:     raftNode,
		doneChan: make(chan struct{}),

		pendingReconciliation: make(map[string]*api.Node),
		pendingRemoval:        make(map[string]struct{}),
	}
}

// getTicker returns a ticker based on the configured clock source
func (rm *roleManager) getTicker(interval time.Duration) clock.Ticker {
	if rm.clocksource == nil {
		return clock.NewClock().NewTicker(interval)
	}
	return rm.clocksource.NewTicker(interval)
}

// Run is roleManager's main loop. On startup, it looks at every node object in the
// cluster and attempts to reconcile the raft member list with all the nodes' desired
// roles. If any nodes need to be demoted or promoted, it adds them to a reconciliation
// queue, and if any raft members' nodes have been deleted, it adds them to a removal
// queue.
//
// These queues are processed immediately, and any nodes that fail to be processed are
// processed again on the next reconciliation interval, so that nodes should eventually
// be reconciled. As node updates come in, any promotions or demotions are also added
// to the reconciliation queue and reconciled. As node removals come in, they are added
// to the removal queue to be removed from the raft cluster.
//
// Removal from the raft cluster is idempotent (and it's the only raft cluster change
// that occurs during reconciliation or removal), so it's fine if a node is in both the
// removal and reconciliation queues.
//
// The ctx param is only used for logging.
func (rm *roleManager) Run(ctx context.Context) {
	defer close(rm.doneChan)

	var (
		nodes []*api.Node

		// ticker and tickerCh are used to time the reconciliation interval, which
		// periodically attempts to re-reconcile nodes that failed to reconcile the
		// first time through
		ticker   clock.Ticker
		tickerCh <-chan time.Time
	)

	watcher, cancelWatch, err := store.ViewAndWatch(rm.store,
		func(readTx store.ReadTx) error {
			var err error
			nodes, err = store.FindNodes(readTx, store.All)
			return err
		},
		api.EventUpdateNode{},
		api.EventDeleteNode{})
	defer cancelWatch()

	if err != nil {
		log.G(ctx).WithError(err).Error("failed to check nodes for role changes")
	} else {
		// Assume all raft members have been deleted from the cluster, until the node
		// list tells us otherwise. We can make this assumption because the node object
		// must exist before the raft member object.
		//
		// Background life-cycle for a manager: it joins the cluster, getting a new TLS
		// certificate. To get a TLS certificate, it makes an RPC call to the CA server,
		// which on successful join adds its information to the cluster node list and
		// eventually generates a TLS certificate for it. Once it has a TLS certificate,
		// it can contact the other nodes and make an RPC call to request to join the
		// raft cluster. The node it contacts adds it to the raft membership.
		for _, member := range rm.raft.GetMemberlist() {
			rm.pendingRemoval[member.NodeID] = struct{}{}
		}
		for _, node := range nodes {
			// if the node exists, we don't necessarily want it removed from the raft
			// membership
			delete(rm.pendingRemoval, node.ID)

			// reconcile each existing node
			rm.pendingReconciliation[node.ID] = node
			rm.reconcileRole(ctx, node)
		}
		for nodeID := range rm.pendingRemoval {
			rm.evictRemovedNode(ctx, nodeID)
		}
		// If any reconciliations or member removals failed, we want to try again, so
		// start the ticker so we can retry every roleReconcileInterval until both
		// queues are empty.
		if len(rm.pendingReconciliation) != 0 || len(rm.pendingRemoval) != 0 {
			ticker = rm.getTicker(roleReconcileInterval)
			tickerCh = ticker.C()
		}
	}

	for {
		select {
		case event := <-watcher:
			switch ev := event.(type) {
			case api.EventUpdateNode:
				rm.pendingReconciliation[ev.Node.ID] = ev.Node
				rm.reconcileRole(ctx, ev.Node)
			case api.EventDeleteNode:
				rm.pendingRemoval[ev.Node.ID] = struct{}{}
				rm.evictRemovedNode(ctx, ev.Node.ID)
			}
			// If any reconciliations or member removals failed, we want to try again,
			// so start the ticker so we can retry every roleReconcileInterval until
			// both queues are empty.
			if (len(rm.pendingReconciliation) != 0 || len(rm.pendingRemoval) != 0) && ticker == nil {
				ticker = rm.getTicker(roleReconcileInterval)
				tickerCh = ticker.C()
			}
		case <-tickerCh:
			for _, node := range rm.pendingReconciliation {
				rm.reconcileRole(ctx, node)
			}
			for nodeID := range rm.pendingRemoval {
				rm.evictRemovedNode(ctx, nodeID)
			}
			if len(rm.pendingReconciliation) == 0 && len(rm.pendingRemoval) == 0 {
				ticker.Stop()
				ticker = nil
				tickerCh = nil
			}
		case <-rm.ctx.Done():
			if ticker != nil {
				ticker.Stop()
			}
			return
		}
	}
}

// evictRemovedNode evicts a removed node from the raft cluster membership. This covers
// an edge case in which a node might have been removed, but somehow the role was not
// reconciled (possibly a demotion and a removal happened in rapid succession before the
// raft membership configuration went through).
func (rm *roleManager) evictRemovedNode(ctx context.Context, nodeID string) {
	// Check if the member still exists in the membership
	member := rm.raft.GetMemberByNodeID(nodeID)
	if member != nil {
		// We first try to remove the raft node from the raft cluster. On the next tick,
		// once the node is gone from the cluster membership, we delete it from the
		// pendingRemoval list.
		rm.removeMember(ctx, member)
		return
	}
	delete(rm.pendingRemoval, nodeID)
}

// removeMember removes a member from the raft cluster membership
func (rm *roleManager) removeMember(ctx context.Context, member *membership.Member) {
	// Quorum safeguard - quorum should have been checked before a node was allowed to
	// be demoted, but if in the intervening time some other node disconnected, removing
	// this node would result in a loss of cluster quorum. We leave the member in the
	// pending queue so that removal is retried on a later tick.
	if !rm.raft.CanRemoveMember(member.RaftID) {
		// TODO(aaronl): Retry later
		log.G(ctx).Debugf("can't demote node %s at this time: removing member from raft would result in a loss of quorum", member.NodeID)
		return
	}

	rmCtx, rmCancel := context.WithTimeout(rm.ctx, removalTimeout)
	defer rmCancel()

	if member.RaftID == rm.raft.Config.ID {
		// Don't use rmCtx, because we expect to lose
		// leadership, which will cancel this context.
		log.G(ctx).Info("demoted; transferring leadership")
		err := rm.raft.TransferLeadership(context.Background())
		if err == nil {
			return
		}
		log.G(ctx).WithError(err).Info("failed to transfer leadership")
	}
	if err := rm.raft.RemoveMember(rmCtx, member.RaftID); err != nil {
		// TODO(aaronl): Retry later
		log.G(ctx).WithError(err).Debugf("can't demote node %s at this time", member.NodeID)
	}
}

// reconcileRole looks at the desired role for a node, and if it is being demoted or
// promoted, updates the node role accordingly. If the node is being demoted, it also
// removes the node from the raft cluster membership.
func (rm *roleManager) reconcileRole(ctx context.Context, node *api.Node) {
	if node.Role == node.Spec.DesiredRole {
		// Nothing to do.
		delete(rm.pendingReconciliation, node.ID)
		return
	}

	// Promotion can proceed right away.
	if node.Spec.DesiredRole == api.NodeRoleManager && node.Role == api.NodeRoleWorker {
		err := rm.store.Update(func(tx store.Tx) error {
			updatedNode := store.GetNode(tx, node.ID)
			if updatedNode == nil || updatedNode.Spec.DesiredRole != node.Spec.DesiredRole || updatedNode.Role != node.Role {
				return nil
			}
			updatedNode.Role = api.NodeRoleManager
			return store.UpdateNode(tx, updatedNode)
		})
		if err != nil {
			log.G(ctx).WithError(err).Errorf("failed to promote node %s", node.ID)
		} else {
			delete(rm.pendingReconciliation, node.ID)
		}
	} else if node.Spec.DesiredRole == api.NodeRoleWorker && node.Role == api.NodeRoleManager {
		// Check for the node in the memberlist
		member := rm.raft.GetMemberByNodeID(node.ID)
		if member != nil {
			// We first try to remove the raft node from the raft cluster. On the next
			// tick, once the node is gone from the cluster membership, we update the
			// store to reflect that it has been successfully demoted, and if that
			// works, remove it from the pending list.
			rm.removeMember(ctx, member)
			return
		}

		err := rm.store.Update(func(tx store.Tx) error {
			updatedNode := store.GetNode(tx, node.ID)
			if updatedNode == nil || updatedNode.Spec.DesiredRole != node.Spec.DesiredRole || updatedNode.Role != node.Role {
				return nil
			}
			updatedNode.Role = api.NodeRoleWorker
			return store.UpdateNode(tx, updatedNode)
		})
		if err != nil {
			log.G(ctx).WithError(err).Errorf("failed to demote node %s", node.ID)
		} else {
			delete(rm.pendingReconciliation, node.ID)
		}
	}
}

// Stop stops the roleManager and waits for the main loop to exit.
func (rm *roleManager) Stop() {
	rm.cancel()
	<-rm.doneChan
}
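
// runRoleManagerSketch is a minimal usage sketch (illustrative only; nothing in this
// package calls it, and the names ctx, s, and raftNode are placeholders). It shows how
// the pieces above are expected to fit together: construct a roleManager, run its main
// loop in the background, and stop it on shutdown or loss of leadership.
func runRoleManagerSketch(ctx context.Context, s *store.MemoryStore, raftNode *raft.Node) {
	rm := newRoleManager(s, raftNode)
	go rm.Run(ctx) // ctx is only used for logging

	// ... later, when shutting down or stepping down as leader:
	rm.Stop() // cancels rm.ctx and blocks until Run returns
}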