package agent

import (
	"fmt"
	"math/rand"
	"reflect"
	"sync"
	"time"

	"github.com/docker/swarmkit/agent/exec"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/log"
	"golang.org/x/net/context"
)

const (
	initialSessionFailureBackoff = 100 * time.Millisecond
	maxSessionFailureBackoff     = 8 * time.Second
	nodeUpdatePeriod             = 20 * time.Second
)

// Agent implements the primary node functionality for a member of a swarm
// cluster. The primary functionality is to run and report on the status of
// tasks assigned to the node.
type Agent struct {
	config *Config

	// The latest node object state from the manager
	// for this node known to the agent.
	node *api.Node

	keys []*api.EncryptionKey

	sessionq chan sessionOperation
	worker   Worker

	started   chan struct{}
	startOnce sync.Once // start only once
	ready     chan struct{}
	leaving   chan struct{}
	leaveOnce sync.Once
	stopped   chan struct{} // requests shutdown
	stopOnce  sync.Once     // only allow stop to be called once
	closed    chan struct{} // only closed in run
	err       error         // read only after closed is closed
}

// New returns a new agent, ready for task dispatch.
func New(config *Config) (*Agent, error) {
	if err := config.validate(); err != nil {
		return nil, err
	}

	a := &Agent{
		config:   config,
		sessionq: make(chan sessionOperation),
		started:  make(chan struct{}),
		leaving:  make(chan struct{}),
		stopped:  make(chan struct{}),
		closed:   make(chan struct{}),
		ready:    make(chan struct{}),
	}

	a.worker = newWorker(config.DB, config.Executor, a)
	return a, nil
}

// Start begins execution of the agent in the provided context, if not already
// started.
//
// Start returns an error if the agent has already started.
func (a *Agent) Start(ctx context.Context) error {
	err := errAgentStarted

	a.startOnce.Do(func() {
		close(a.started)
		go a.run(ctx)
		err = nil // clear error above, only once.
	})

	return err
}

// Leave instructs the agent to leave the cluster. This method will shutdown
// assignment processing and remove all assignments from the node.
// Leave blocks until the worker has finished closing all task managers or the
// agent is closed.
func (a *Agent) Leave(ctx context.Context) error {
	select {
	case <-a.started:
	default:
		return errAgentNotStarted
	}

	a.leaveOnce.Do(func() {
		close(a.leaving)
	})

	// agent could be closed while Leave is in progress
	var err error
	ch := make(chan struct{})
	go func() {
		err = a.worker.Wait(ctx)
		close(ch)
	}()

	select {
	case <-ch:
		return err
	case <-a.closed:
		return ErrClosed
	}
}

// Stop shuts down the agent, blocking until full shutdown. If the agent is not
// started, Stop returns errAgentNotStarted immediately.
func (a *Agent) Stop(ctx context.Context) error {
	select {
	case <-a.started:
	default:
		return errAgentNotStarted
	}

	a.stop()

	// wait till closed or context cancelled
	select {
	case <-a.closed:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// stop signals the agent shutdown process, returning true if this call was the
// first to actually shut down the agent.
func (a *Agent) stop() bool {
	var stopped bool
	a.stopOnce.Do(func() {
		close(a.stopped)
		stopped = true
	})

	return stopped
}

// Err returns the error that caused the agent to shut down, or nil. Err blocks
// until the agent is fully shut down.
func (a *Agent) Err(ctx context.Context) error {
	select {
	case <-a.closed:
		return a.err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Ready returns a channel that will be closed when agent first becomes ready.
func (a *Agent) Ready() <-chan struct{} {
	return a.ready
}

func (a *Agent) run(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	defer close(a.closed) // full shutdown.

	ctx = log.WithModule(ctx, "agent")

	log.G(ctx).Debugf("(*Agent).run")
	defer log.G(ctx).Debugf("(*Agent).run exited")

	// get the node description
	nodeDescription, err := a.nodeDescriptionWithHostname(ctx)
	if err != nil {
		log.G(ctx).WithError(err).WithField("agent", a.config.Executor).Errorf("agent: node description unavailable")
	}

	// nodeUpdateTicker is used to periodically check for updates to node description
	nodeUpdateTicker := time.NewTicker(nodeUpdatePeriod)
	defer nodeUpdateTicker.Stop()

	var (
		backoff       time.Duration
		session       = newSession(ctx, a, backoff, "", nodeDescription) // start the initial session
		registered    = session.registered
		ready         = a.ready // first session ready
		sessionq      chan sessionOperation
		leaving       = a.leaving
		subscriptions = map[string]context.CancelFunc{}
	)

	if err := a.worker.Init(ctx); err != nil {
		log.G(ctx).WithError(err).Error("worker initialization failed")
		a.err = err
		return // fatal?
	}
	defer a.worker.Close()

	// setup a reliable reporter to call back to us.
	reporter := newStatusReporter(ctx, a)
	defer reporter.Close()

	a.worker.Listen(ctx, reporter)

	for {
		select {
		case operation := <-sessionq:
			operation.response <- operation.fn(session)
		case <-leaving:
			leaving = nil

			// TODO(stevvooe): Signal to the manager that the node is leaving.

			// when leaving we remove all assignments.
			if err := a.worker.Assign(ctx, nil); err != nil {
				log.G(ctx).WithError(err).Error("failed removing all assignments")
			}
		case msg := <-session.assignments:
			// if we have left, accept no more assignments
			if leaving == nil {
				continue
			}

			switch msg.Type {
			case api.AssignmentsMessage_COMPLETE:
				// Need to assign secrets before tasks, because tasks might depend on new secrets
				if err := a.worker.Assign(ctx, msg.Changes); err != nil {
					log.G(ctx).WithError(err).Error("failed to synchronize worker assignments")
				}
			case api.AssignmentsMessage_INCREMENTAL:
				if err := a.worker.Update(ctx, msg.Changes); err != nil {
					log.G(ctx).WithError(err).Error("failed to update worker assignments")
				}
			}
		case msg := <-session.messages:
			if err := a.handleSessionMessage(ctx, msg); err != nil {
				log.G(ctx).WithError(err).Error("session message handler failed")
			}
		case sub := <-session.subscriptions:
			if sub.Close {
				if cancel, ok := subscriptions[sub.ID]; ok {
					cancel()
				}
				delete(subscriptions, sub.ID)
				continue
			}

			if _, ok := subscriptions[sub.ID]; ok {
				// Duplicate subscription
				continue
			}

			subCtx, subCancel := context.WithCancel(ctx)
			subscriptions[sub.ID] = subCancel
			go a.worker.Subscribe(subCtx, sub)
		case <-registered:
			log.G(ctx).Debugln("agent: registered")
			if ready != nil {
				close(ready)
			}
			ready = nil
			registered = nil // we only care about this once per session
			backoff = 0      // reset backoff
			sessionq = a.sessionq
		case err := <-session.errs:
			// TODO(stevvooe): This may actually block if a session is closed
			// but no error was sent. Session.close must only be called here
			// for this to work.
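			// Grow the backoff exponentially on each failure and cap it at
			// maxSessionFailureBackoff; when the session is rebuilt below, a
			// random delay in [0, backoff) spreads out reconnect attempts.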
			if err != nil {
				log.G(ctx).WithError(err).Error("agent: session failed")
				backoff = initialSessionFailureBackoff + 2*backoff
				if backoff > maxSessionFailureBackoff {
					backoff = maxSessionFailureBackoff
				}
			}

			if err := session.close(); err != nil {
				log.G(ctx).WithError(err).Error("agent: closing session failed")
			}
			sessionq = nil
			// if we're here before <-registered, do nothing for that event
			registered = nil
		case <-session.closed:
			log.G(ctx).Debugf("agent: rebuild session")

			// select a session registration delay from backoff range.
			delay := time.Duration(0)
			if backoff > 0 {
				delay = time.Duration(rand.Int63n(int64(backoff)))
			}
			session = newSession(ctx, a, delay, session.sessionID, nodeDescription)
			registered = session.registered
		case <-nodeUpdateTicker.C:
			// skip this case if the registration isn't finished
			if registered != nil {
				continue
			}

			// get the current node description
			newNodeDescription, err := a.nodeDescriptionWithHostname(ctx)
			if err != nil {
				log.G(ctx).WithError(err).WithField("agent", a.config.Executor).Errorf("agent: updated node description unavailable")
			}

			// if newNodeDescription is nil, it will cause a panic when
			// trying to create a session. Typically this can happen
			// if the engine goes down
			if newNodeDescription == nil {
				continue
			}

			// if the node description has changed, update it to the new one
			// and close the session. The old session will be stopped and a
			// new one will be created with the updated description
			if !reflect.DeepEqual(nodeDescription, newNodeDescription) {
				nodeDescription = newNodeDescription
				// close the session
				log.G(ctx).Info("agent: found node update")
				session.sendError(nil)
			}
		case <-a.stopped:
			// TODO(stevvooe): Wait on shutdown and cleanup. May need to pump
			// this loop a few times.
			return
		case <-ctx.Done():
			if a.err == nil {
				a.err = ctx.Err()
			}
			session.close()

			return
		}
	}
}

func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMessage) error {
	seen := map[api.Peer]struct{}{}
	for _, manager := range message.Managers {
		if manager.Peer.Addr == "" {
			continue
		}

		a.config.ConnBroker.Remotes().Observe(*manager.Peer, int(manager.Weight))
		seen[*manager.Peer] = struct{}{}
	}

	if message.Node != nil {
		if a.node == nil || !nodesEqual(a.node, message.Node) {
			if a.config.NotifyNodeChange != nil {
				a.config.NotifyNodeChange <- message.Node.Copy()
			}
			a.node = message.Node.Copy()
			if err := a.config.Executor.Configure(ctx, a.node); err != nil {
				log.G(ctx).WithError(err).Error("node configure failed")
			}
		}
	}

	// prune managers not in list.
	for peer := range a.config.ConnBroker.Remotes().Weights() {
		if _, ok := seen[peer]; !ok {
			a.config.ConnBroker.Remotes().Remove(peer)
		}
	}

	if message.NetworkBootstrapKeys == nil {
		return nil
	}

	for _, key := range message.NetworkBootstrapKeys {
		same := false
		for _, agentKey := range a.keys {
			if agentKey.LamportTime == key.LamportTime {
				same = true
			}
		}
		if !same {
			a.keys = message.NetworkBootstrapKeys
			if err := a.config.Executor.SetNetworkBootstrapKeys(a.keys); err != nil {
				panic(fmt.Errorf("configuring network key failed"))
			}
		}
	}

	return nil
}

type sessionOperation struct {
	fn       func(session *session) error
	response chan error
}

// withSession runs fn with the current session.
func (a *Agent) withSession(ctx context.Context, fn func(session *session) error) error {
	response := make(chan error, 1)
	select {
	case a.sessionq <- sessionOperation{
		fn:       fn,
		response: response,
	}:
		select {
		case err := <-response:
			return err
		case <-a.closed:
			return ErrClosed
		case <-ctx.Done():
			return ctx.Err()
		}
	case <-a.closed:
		return ErrClosed
	case <-ctx.Done():
		return ctx.Err()
	}
}

// UpdateTaskStatus attempts to send a task status update over the current session,
// blocking until the operation is completed.
//
// If an error is returned, the operation should be retried.
func (a *Agent) UpdateTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
	log.G(ctx).WithField("task.id", taskID).Debugf("(*Agent).UpdateTaskStatus")
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	errs := make(chan error, 1)
	if err := a.withSession(ctx, func(session *session) error {
		go func() {
			err := session.sendTaskStatus(ctx, taskID, status)
			if err != nil {
				if err == errTaskUnknown {
					err = nil // dispatcher no longer cares about this task.
				} else {
					log.G(ctx).WithError(err).Error("closing session after fatal error")
					session.sendError(err)
				}
			} else {
				log.G(ctx).Debug("task status reported")
			}

			errs <- err
		}()

		return nil
	}); err != nil {
		return err
	}

	select {
	case err := <-errs:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Publisher returns a LogPublisher for the given subscription
// as well as a cancel function that should be called when the log stream
// is completed.
func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogPublisher, func(), error) {
	// TODO(stevvooe): The level of coordination here is WAY too much for logs.
	// These should only be best effort and really just buffer until a session is
	// ready. Ideally, they would use a separate connection completely.

	var (
		err       error
		publisher api.LogBroker_PublishLogsClient
	)

	err = a.withSession(ctx, func(session *session) error {
		publisher, err = api.NewLogBrokerClient(session.conn.ClientConn).PublishLogs(ctx)
		return err
	})
	if err != nil {
		return nil, nil, err
	}

	return exec.LogPublisherFunc(func(ctx context.Context, message api.LogMessage) error {
			select {
			case <-ctx.Done():
				publisher.CloseSend()
				return ctx.Err()
			default:
			}

			return publisher.Send(&api.PublishLogsMessage{
				SubscriptionID: subscriptionID,
				Messages:       []api.LogMessage{message},
			})
		}), func() {
			publisher.CloseSend()
		}, nil
}

// nodeDescriptionWithHostname retrieves the node description, and overrides the
// hostname if one is configured.
func (a *Agent) nodeDescriptionWithHostname(ctx context.Context) (*api.NodeDescription, error) {
	desc, err := a.config.Executor.Describe(ctx)

	// Override hostname
	if a.config.Hostname != "" && desc != nil {
		desc.Hostname = a.config.Hostname
	}
	return desc, err
}

// nodesEqual returns true if the node states are functionally equal, ignoring status,
// version and other superfluous fields.
//
// This is used to decide whether or not to propagate a node update to the executor.
func nodesEqual(a, b *api.Node) bool {
	a, b = a.Copy(), b.Copy()

	a.Status, b.Status = api.NodeStatus{}, api.NodeStatus{}
	a.Meta, b.Meta = api.Meta{}, api.Meta{}

	return reflect.DeepEqual(a, b)
}
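
// runAgentLifecycle is a hypothetical helper, kept only as a usage sketch:
// assuming the caller already holds a validated Config (executor, DB,
// connection broker, credentials), it shows the intended lifecycle of an
// Agent: create it with New, Start it, wait on Ready until the first session
// registers with a manager, then Stop it when done.
func runAgentLifecycle(ctx context.Context, config *Config) error {
	a, err := New(config)
	if err != nil {
		return err
	}
	if err := a.Start(ctx); err != nil {
		return err
	}

	// Ready is closed once the first session successfully registers.
	select {
	case <-a.Ready():
	case <-ctx.Done():
		return ctx.Err()
	}

	// ... the node runs its assigned tasks here until asked to stop ...

	return a.Stop(ctx)
}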