1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/vendor/src/github.com/docker/swarmkit/agent/agent.go

352 lines
8.7 KiB
Go
Raw Normal View History

package agent
import (
"fmt"
"math/rand"
"reflect"
"sync"
"time"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"golang.org/x/net/context"
)
const (
initialSessionFailureBackoff = 100 * time.Millisecond
maxSessionFailureBackoff = 8 * time.Second
)
// Agent implements the primary node functionality for a member of a swarm
// cluster. The primary functionality is to run and report on the status of
// tasks assigned to the node.
type Agent struct {
config *Config
// The latest node object state from manager
// for this node known to the agent.
node *api.Node
keys []*api.EncryptionKey
sessionq chan sessionOperation
worker Worker
started chan struct{}
startOnce sync.Once // start only once
ready chan struct{}
stopped chan struct{} // requests shutdown
stopOnce sync.Once // only allow stop to be called once
closed chan struct{} // only closed in run
err error // read only after closed is closed
}
// New returns a new agent, ready for task dispatch.
func New(config *Config) (*Agent, error) {
if err := config.validate(); err != nil {
return nil, err
}
a := &Agent{
config: config,
worker: newWorker(config.DB, config.Executor),
sessionq: make(chan sessionOperation),
started: make(chan struct{}),
stopped: make(chan struct{}),
closed: make(chan struct{}),
ready: make(chan struct{}),
}
return a, nil
}
// Start begins execution of the agent in the provided context, if not already
// started.
//
// Start returns an error if the agent has already started.
func (a *Agent) Start(ctx context.Context) error {
err := errAgentStarted
a.startOnce.Do(func() {
close(a.started)
go a.run(ctx)
err = nil // clear error above, only once.
})
return err
}
// Stop shuts down the agent, blocking until full shutdown. If the agent is not
// started, Stop will block until the agent has fully shutdown.
func (a *Agent) Stop(ctx context.Context) error {
select {
case <-a.started:
default:
return errAgentNotStarted
}
a.stop()
// wait till closed or context cancelled
select {
case <-a.closed:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// stop signals the agent shutdown process, returning true if this call was the
// first to actually shutdown the agent.
func (a *Agent) stop() bool {
var stopped bool
a.stopOnce.Do(func() {
close(a.stopped)
stopped = true
})
return stopped
}
// Err returns the error that caused the agent to shutdown or nil. Err blocks
// until the agent is fully shutdown.
func (a *Agent) Err(ctx context.Context) error {
select {
case <-a.closed:
return a.err
case <-ctx.Done():
return ctx.Err()
}
}
// Ready returns a channel that will be closed when agent first becomes ready.
func (a *Agent) Ready() <-chan struct{} {
return a.ready
}
func (a *Agent) run(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer close(a.closed) // full shutdown.
ctx = log.WithModule(ctx, "agent")
log.G(ctx).Debugf("(*Agent).run")
defer log.G(ctx).Debugf("(*Agent).run exited")
var (
backoff time.Duration
session = newSession(ctx, a, backoff, "") // start the initial session
registered = session.registered
ready = a.ready // first session ready
sessionq chan sessionOperation
)
if err := a.worker.Init(ctx); err != nil {
log.G(ctx).WithError(err).Error("worker initialization failed")
a.err = err
return // fatal?
}
// setup a reliable reporter to call back to us.
reporter := newStatusReporter(ctx, a)
defer reporter.Close()
a.worker.Listen(ctx, reporter)
for {
select {
case operation := <-sessionq:
operation.response <- operation.fn(session)
case msg := <-session.tasks:
if err := a.worker.Assign(ctx, msg.Tasks); err != nil {
log.G(ctx).WithError(err).Error("task assignment failed")
}
case msg := <-session.messages:
if err := a.handleSessionMessage(ctx, msg); err != nil {
log.G(ctx).WithError(err).Error("session message handler failed")
}
case <-registered:
log.G(ctx).Debugln("agent: registered")
if ready != nil {
close(ready)
}
ready = nil
registered = nil // we only care about this once per session
backoff = 0 // reset backoff
sessionq = a.sessionq
case err := <-session.errs:
// TODO(stevvooe): This may actually block if a session is closed
// but no error was sent. Session.close must only be called here
// for this to work.
if err != nil {
log.G(ctx).WithError(err).Error("agent: session failed")
backoff = initialSessionFailureBackoff + 2*backoff
if backoff > maxSessionFailureBackoff {
backoff = maxSessionFailureBackoff
}
}
if err := session.close(); err != nil {
log.G(ctx).WithError(err).Error("agent: closing session failed")
}
sessionq = nil
// if we're here before <-registered, do nothing for that event
registered = nil
case <-session.closed:
log.G(ctx).Debugf("agent: rebuild session")
// select a session registration delay from backoff range.
delay := time.Duration(rand.Int63n(int64(backoff)))
session = newSession(ctx, a, delay, session.sessionID)
registered = session.registered
sessionq = a.sessionq
case <-a.stopped:
// TODO(stevvooe): Wait on shutdown and cleanup. May need to pump
// this loop a few times.
return
case <-ctx.Done():
if a.err == nil {
a.err = ctx.Err()
}
session.close()
return
}
}
}
func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMessage) error {
seen := map[api.Peer]struct{}{}
for _, manager := range message.Managers {
if manager.Peer.Addr == "" {
log.G(ctx).WithField("manager.addr", manager.Peer.Addr).
Warnf("skipping bad manager address")
continue
}
a.config.Managers.Observe(*manager.Peer, int(manager.Weight))
seen[*manager.Peer] = struct{}{}
}
if message.Node != nil {
if a.node == nil || !nodesEqual(a.node, message.Node) {
if a.config.NotifyRoleChange != nil {
a.config.NotifyRoleChange <- message.Node.Spec.Role
}
a.node = message.Node.Copy()
if err := a.config.Executor.Configure(ctx, a.node); err != nil {
log.G(ctx).WithError(err).Error("node configure failed")
}
}
}
// prune managers not in list.
for peer := range a.config.Managers.Weights() {
if _, ok := seen[peer]; !ok {
a.config.Managers.Remove(peer)
}
}
if message.NetworkBootstrapKeys == nil {
return nil
}
for _, key := range message.NetworkBootstrapKeys {
same := false
for _, agentKey := range a.keys {
if agentKey.LamportTime == key.LamportTime {
same = true
}
}
if !same {
a.keys = message.NetworkBootstrapKeys
if err := a.config.Executor.SetNetworkBootstrapKeys(a.keys); err != nil {
panic(fmt.Errorf("configuring network key failed"))
}
}
}
return nil
}
type sessionOperation struct {
fn func(session *session) error
response chan error
}
// withSession runs fn with the current session.
func (a *Agent) withSession(ctx context.Context, fn func(session *session) error) error {
response := make(chan error, 1)
select {
case a.sessionq <- sessionOperation{
fn: fn,
response: response,
}:
select {
case err := <-response:
return err
case <-a.closed:
return ErrClosed
case <-ctx.Done():
return ctx.Err()
}
case <-a.closed:
return ErrClosed
case <-ctx.Done():
return ctx.Err()
}
}
// UpdateTaskStatus attempts to send a task status update over the current session,
// blocking until the operation is completed.
//
// If an error is returned, the operation should be retried.
func (a *Agent) UpdateTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
log.G(ctx).WithField("task.id", taskID).Debugf("(*Agent).UpdateTaskStatus")
ctx, cancel := context.WithCancel(ctx)
defer cancel()
errs := make(chan error, 1)
if err := a.withSession(ctx, func(session *session) error {
go func() {
err := session.sendTaskStatus(ctx, taskID, status)
if err != nil {
if err == errTaskUnknown {
err = nil // dispatcher no longer cares about this task.
} else {
log.G(ctx).WithError(err).Error("sending task status update failed")
}
} else {
log.G(ctx).Debug("task status reported")
}
errs <- err
}()
return nil
}); err != nil {
return err
}
select {
case err := <-errs:
return err
case <-ctx.Done():
return ctx.Err()
}
}
// nodesEqual returns true if the node states are functionaly equal, ignoring status,
// version and other superfluous fields.
//
// This used to decide whether or not to propagate a node update to executor.
func nodesEqual(a, b *api.Node) bool {
a, b = a.Copy(), b.Copy()
a.Status, b.Status = api.NodeStatus{}, api.NodeStatus{}
a.Meta, b.Meta = api.Meta{}, api.Meta{}
return reflect.DeepEqual(a, b)
}