1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/vendor/src/github.com/docker/swarmkit/agent/agent.go

411 lines
11 KiB
Go
Raw Normal View History

package agent
import (
"fmt"
"math/rand"
"reflect"
"sync"
"time"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"golang.org/x/net/context"
)
// Timing parameters for session recovery and node description refresh.
const (
// initialSessionFailureBackoff is the amount added to the backoff each time
// a session fails with a non-nil error (the previous backoff is also doubled).
initialSessionFailureBackoff = 100 * time.Millisecond
// maxSessionFailureBackoff is the upper bound the backoff is clamped to.
maxSessionFailureBackoff = 8 * time.Second
// nodeUpdatePeriod is the interval of the ticker on which the run loop
// re-reads the node description to detect changes.
nodeUpdatePeriod = 20 * time.Second
)
// Agent implements the primary node functionality for a member of a swarm
// cluster. The primary functionality is to run and report on the status of
// tasks assigned to the node.
type Agent struct {
// config is the configuration handed to New; it is validated there.
config *Config
// The latest node object state from manager
// for this node known to the agent.
node *api.Node
// keys holds the network bootstrap keys taken from the most recent session
// message that carried a key with an unseen LamportTime.
keys []*api.EncryptionKey
// sessionq carries operations submitted through withSession to the run
// loop, which executes them against the current session.
sessionq chan sessionOperation
// worker is built in New from config.DB and config.Executor and receives
// the assignment sets and updates read from the session.
worker Worker
// started is closed by the first call to Start.
started chan struct{}
startOnce sync.Once // start only once
// ready is closed by the run loop the first time a session registers; it is
// exposed to callers through Ready.
ready chan struct{}
stopped chan struct{} // requests shutdown
stopOnce sync.Once // only allow stop to be called once
closed chan struct{} // only closed in run
err error // read only after closed is closed
}
// New validates config and, if it is acceptable, builds an Agent around it.
// The returned agent is idle until Start is called. A validation failure is
// returned unchanged together with a nil agent.
func New(config *Config) (*Agent, error) {
	err := config.validate()
	if err != nil {
		return nil, err
	}

	agent := new(Agent)
	agent.config = config
	agent.worker = newWorker(config.DB, config.Executor)

	// All signalling channels are unbuffered; the lifecycle ones are only ever
	// closed, never sent on.
	agent.sessionq = make(chan sessionOperation)
	agent.started = make(chan struct{})
	agent.stopped = make(chan struct{})
	agent.closed = make(chan struct{})
	agent.ready = make(chan struct{})

	return agent, nil
}
// Start launches the agent's run loop in a new goroutine using the provided
// context. Only the first call has any effect.
//
// Every later call returns errAgentStarted; the first call returns nil.
func (a *Agent) Start(ctx context.Context) error {
	launched := false
	a.startOnce.Do(func() {
		launched = true
		close(a.started) // lets Stop observe that the agent was started
		go a.run(ctx)
	})

	if !launched {
		return errAgentStarted
	}
	return nil
}
// Stop requests shutdown of the agent and waits for the run loop to finish.
//
// If Start has never been called, Stop returns errAgentNotStarted right away
// without blocking. Otherwise it blocks until the agent is fully closed
// (returning nil) or until ctx is done (returning ctx.Err()).
func (a *Agent) Stop(ctx context.Context) error {
	// Non-blocking probe of the started channel.
	wasStarted := false
	select {
	case <-a.started:
		wasStarted = true
	default:
	}
	if !wasStarted {
		return errAgentNotStarted
	}

	a.stop()

	// Wait for either full shutdown or the caller giving up.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-a.closed:
		return nil
	}
}
// stop closes the stopped channel to ask the run loop to exit. It reports
// true only for the single call that actually performed the close; every
// subsequent call is a no-op that reports false.
func (a *Agent) stop() bool {
	first := false
	a.stopOnce.Do(func() {
		first = true
		close(a.stopped)
	})
	return first
}
// Err waits for the agent to shut down completely and then reports the error
// recorded by the run loop, which is nil if none was recorded. If ctx is done
// before shutdown completes, ctx.Err() is returned instead.
func (a *Agent) Err(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-a.closed:
		// a.err is only safe to read once closed has been closed.
		return a.err
	}
}
// Ready exposes a receive-only view of the channel that the run loop closes
// the first time a session registers, i.e. when the agent first becomes ready.
func (a *Agent) Ready() <-chan struct{} {
	var ready <-chan struct{} = a.ready
	return ready
}
// run is the agent's main loop; Start launches it in its own goroutine.
//
// It creates the initial session, initializes the worker, and then services a
// single select loop that: executes operations queued via withSession, applies
// assignment messages to the worker, handles session messages, rebuilds the
// session after failures (with randomized backoff), and periodically checks
// whether the node description has changed. The loop exits when a.stopped is
// closed or ctx is done. On return the derived context is cancelled and
// a.closed is closed.
func (a *Agent) run(ctx context.Context) {
// Derive a cancellable context so everything started from this loop is
// cancelled when run returns.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer close(a.closed) // full shutdown.
ctx = log.WithModule(ctx, "agent")
log.G(ctx).Debugf("(*Agent).run")
defer log.G(ctx).Debugf("(*Agent).run exited")
// get the node description
// NOTE(review): a failure here is only logged, so nodeDescription may be nil
// when the initial session is created below; the ticker branch further down
// states that a nil description panics in session creation — confirm whether
// this path needs the same guard.
nodeDescription, err := a.nodeDescriptionWithHostname(ctx)
if err != nil {
log.G(ctx).WithError(err).WithField("agent", a.config.Executor).Errorf("agent: node description unavailable")
}
// nodeUpdateTicker is used to periodically check for updates to node description
nodeUpdateTicker := time.NewTicker(nodeUpdatePeriod)
defer nodeUpdateTicker.Stop()
var (
// backoff is the current session-failure backoff; zero until a session fails.
backoff time.Duration
// The initial session is created with zero delay and an empty session ID.
session = newSession(ctx, a, backoff, "", nodeDescription) // start the initial session
// registered is set to nil once handled so its select case stops firing.
registered = session.registered
// ready is set to nil after the first close so a.ready is closed only once.
ready = a.ready // first session ready
// sessionq stays nil (its select case never fires) until the session has
// registered; it is then pointed at a.sessionq.
sessionq chan sessionOperation
)
// Worker initialization failure is recorded in a.err and ends the loop
// before it starts.
if err := a.worker.Init(ctx); err != nil {
log.G(ctx).WithError(err).Error("worker initialization failed")
a.err = err
return // fatal?
}
// setup a reliable reporter to call back to us.
reporter := newStatusReporter(ctx, a)
defer reporter.Close()
a.worker.Listen(ctx, reporter)
for {
select {
// Run an operation submitted through withSession against the current
// session and hand its result back to the submitter.
case operation := <-sessionq:
operation.response <- operation.fn(session)
// Apply assignments from the session: COMPLETE messages go to
// worker.Assign, INCREMENTAL messages to worker.Update. Failures are
// logged and the loop continues.
case msg := <-session.assignments:
switch msg.Type {
case api.AssignmentsMessage_COMPLETE:
// Need to assign secrets before tasks, because tasks might depend on new secrets
if err := a.worker.Assign(ctx, msg.Changes); err != nil {
log.G(ctx).WithError(err).Error("failed to synchronize worker assignments")
}
case api.AssignmentsMessage_INCREMENTAL:
if err := a.worker.Update(ctx, msg.Changes); err != nil {
log.G(ctx).WithError(err).Error("failed to update worker assignments")
}
}
// Session messages (manager list, node object, network keys) are handled
// by handleSessionMessage; errors are logged only.
case msg := <-session.messages:
if err := a.handleSessionMessage(ctx, msg); err != nil {
log.G(ctx).WithError(err).Error("session message handler failed")
}
// The session registered: signal readiness (first time only), reset the
// backoff and start accepting withSession operations.
case <-registered:
log.G(ctx).Debugln("agent: registered")
if ready != nil {
close(ready)
}
ready = nil
registered = nil // we only care about this once per session
backoff = 0 // reset backoff
sessionq = a.sessionq
// The session reported an error (or a nil value requesting a plain
// restart): grow the backoff on real errors, close the session and stop
// accepting operations until a new session registers.
case err := <-session.errs:
// TODO(stevvooe): This may actually block if a session is closed
// but no error was sent. Session.close must only be called here
// for this to work.
if err != nil {
log.G(ctx).WithError(err).Error("agent: session failed")
// New backoff is the initial step plus twice the previous backoff,
// clamped to maxSessionFailureBackoff.
backoff = initialSessionFailureBackoff + 2*backoff
if backoff > maxSessionFailureBackoff {
backoff = maxSessionFailureBackoff
}
}
if err := session.close(); err != nil {
log.G(ctx).WithError(err).Error("agent: closing session failed")
}
sessionq = nil
// if we're here before <-registered, do nothing for that event
registered = nil
// The old session has finished closing: build its replacement, passing
// along the previous session ID and the current node description.
case <-session.closed:
log.G(ctx).Debugf("agent: rebuild session")
// select a session registration delay from backoff range.
// The delay is drawn uniformly from [0, backoff); zero backoff means no delay.
delay := time.Duration(0)
if backoff > 0 {
delay = time.Duration(rand.Int63n(int64(backoff)))
}
session = newSession(ctx, a, delay, session.sessionID, nodeDescription)
registered = session.registered
// Periodic check for a changed node description.
case <-nodeUpdateTicker.C:
// skip this case if the registration isn't finished
if registered != nil {
continue
}
// get the current node description
newNodeDescription, err := a.nodeDescriptionWithHostname(ctx)
if err != nil {
log.G(ctx).WithError(err).WithField("agent", a.config.Executor).Errorf("agent: updated node description unavailable")
}
// if newNodeDescription is nil, it will cause a panic when
// trying to create a session. Typically this can happen
// if the engine goes down
if newNodeDescription == nil {
continue
}
// if the node description has changed, update it to the new one
// and close the session. The old session will be stopped and a
// new one will be created with the updated description
if !reflect.DeepEqual(nodeDescription, newNodeDescription) {
nodeDescription = newNodeDescription
// close the session
log.G(ctx).Info("agent: found node update")
// NOTE(review): presumably this surfaces as a nil value on session.errs,
// so the errs case closes the session without raising the backoff —
// confirm in session.sendError.
session.sendError(nil)
}
// Shutdown was requested through stop. This branch returns without setting
// a.err and without calling session.close; only the deferred cancel,
// reporter.Close and close(a.closed) run.
case <-a.stopped:
// TODO(stevvooe): Wait on shutdown and cleanup. May need to pump
// this loop a few times.
return
// The context ended: record its error unless one is already set, close the
// session (the error from close is discarded) and exit.
case <-ctx.Done():
if a.err == nil {
a.err = ctx.Err()
}
session.close()
return
}
}
}
// handleSessionMessage applies a session message received from a manager to
// the agent's local state. In order, it:
//
//   - records every manager with a non-empty address in config.Managers and
//     removes managers that are no longer listed;
//   - when the message carries a node object that differs from the one the
//     agent knows (per nodesEqual), publishes the node's role on
//     config.NotifyRoleChange if that channel is set, stores a copy of the
//     node and passes it to the executor's Configure; a Configure failure is
//     logged, not returned;
//   - when the message carries network bootstrap keys and at least one of them
//     has a LamportTime the agent has not seen, hands the complete key set to
//     the executor.
//
// It returns a non-nil error only when the executor rejects the network
// bootstrap keys. In that case a.keys is left untouched, so the same key set
// is offered to the executor again on the next message instead of being
// treated as already configured.
func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMessage) error {
	seen := map[api.Peer]struct{}{}
	for _, manager := range message.Managers {
		if manager.Peer.Addr == "" {
			log.G(ctx).WithField("manager.addr", manager.Peer.Addr).
				Warnf("skipping bad manager address")
			continue
		}

		a.config.Managers.Observe(*manager.Peer, int(manager.Weight))
		seen[*manager.Peer] = struct{}{}
	}

	if message.Node != nil {
		if a.node == nil || !nodesEqual(a.node, message.Node) {
			if a.config.NotifyRoleChange != nil {
				a.config.NotifyRoleChange <- message.Node.Spec.Role
			}
			a.node = message.Node.Copy()
			if err := a.config.Executor.Configure(ctx, a.node); err != nil {
				log.G(ctx).WithError(err).Error("node configure failed")
			}
		}
	}

	// prune managers not in list.
	for peer := range a.config.Managers.Weights() {
		if _, ok := seen[peer]; !ok {
			a.config.Managers.Remove(peer)
		}
	}

	if message.NetworkBootstrapKeys == nil {
		return nil
	}

	for _, key := range message.NetworkBootstrapKeys {
		known := false
		for _, agentKey := range a.keys {
			if agentKey.LamportTime == key.LamportTime {
				known = true
				break
			}
		}
		if known {
			continue
		}

		// At least one key is new: install the whole set. The error is
		// returned (with its cause) rather than panicking so the caller can
		// log it and the agent keeps running.
		if err := a.config.Executor.SetNetworkBootstrapKeys(message.NetworkBootstrapKeys); err != nil {
			return fmt.Errorf("configuring network key failed: %v", err)
		}

		// Remember the keys only once the executor has accepted them. Every
		// remaining key in the message is now known, so there is nothing left
		// to compare.
		a.keys = message.NetworkBootstrapKeys
		return nil
	}

	return nil
}
// sessionOperation is a unit of work submitted through withSession and
// executed by the agent's run loop against the current session.
type sessionOperation struct {
// fn is invoked by the run loop with the current session.
fn func(session *session) error
// response receives the value returned by fn.
response chan error
}
// withSession hands fn to the run loop, which invokes it with the current
// session, and returns whatever fn returned.
//
// Both the hand-off and the wait for the result are abandoned with ErrClosed
// if the agent has shut down, or with ctx.Err() if ctx is done.
func (a *Agent) withSession(ctx context.Context, fn func(session *session) error) error {
	// Buffered so the run loop never blocks delivering the result, even if
	// this caller has already given up.
	response := make(chan error, 1)
	op := sessionOperation{
		fn:       fn,
		response: response,
	}

	// Phase one: submit the operation.
	select {
	case a.sessionq <- op:
	case <-a.closed:
		return ErrClosed
	case <-ctx.Done():
		return ctx.Err()
	}

	// Phase two: wait for the run loop to execute it.
	select {
	case result := <-response:
		return result
	case <-a.closed:
		return ErrClosed
	case <-ctx.Done():
		return ctx.Err()
	}
}
// UpdateTaskStatus reports status for the task identified by taskID over the
// agent's current session and blocks until the report has completed or ctx is
// done.
//
// A non-nil return value means the update was not delivered and the caller
// should retry. An errTaskUnknown response from the session is treated as
// success; any other send failure is also forwarded to the session through
// sendError.
func (a *Agent) UpdateTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
	log.G(ctx).WithField("task.id", taskID).Debugf("(*Agent).UpdateTaskStatus")

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Buffered so the sending goroutine can always finish, even when this
	// call has already returned because ctx ended.
	outcome := make(chan error, 1)

	// send is executed by the run loop; the actual send happens on a separate
	// goroutine and send itself returns immediately.
	send := func(s *session) error {
		go func() {
			err := s.sendTaskStatus(ctx, taskID, status)
			switch {
			case err == nil:
				log.G(ctx).Debug("task status reported")
			case err == errTaskUnknown:
				err = nil // dispatcher no longer cares about this task.
			default:
				log.G(ctx).WithError(err).Error("closing session after fatal error")
				s.sendError(err)
			}
			outcome <- err
		}()
		return nil
	}

	if err := a.withSession(ctx, send); err != nil {
		return err
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-outcome:
		return err
	}
}
// nodeDescriptionWithHostname asks the executor to describe the node and, when
// a description is returned and config.Hostname is non-empty, replaces the
// description's hostname with the configured one. The executor's error is
// passed through unchanged alongside the description.
func (a *Agent) nodeDescriptionWithHostname(ctx context.Context) (*api.NodeDescription, error) {
	description, err := a.config.Executor.Describe(ctx)
	if description == nil {
		return description, err
	}

	if hostname := a.config.Hostname; hostname != "" {
		description.Hostname = hostname
	}
	return description, err
}
// nodesEqual reports whether two node objects are functionally equal. The
// comparison is made on copies with the Status and Meta fields blanked, so
// differences confined to those fields are ignored and neither argument is
// modified.
//
// It is used to decide whether a node update needs to be propagated to the
// executor.
func nodesEqual(a, b *api.Node) bool {
	left := a.Copy()
	right := b.Copy()

	left.Status = api.NodeStatus{}
	right.Status = api.NodeStatus{}

	left.Meta = api.Meta{}
	right.Meta = api.Meta{}

	return reflect.DeepEqual(left, right)
}