package dispatcher

import (
	"fmt"
	"net"
	"strconv"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/transport"

	"github.com/Sirupsen/logrus"
	"github.com/docker/go-events"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/api/equality"
	"github.com/docker/swarmkit/ca"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/state"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/docker/swarmkit/protobuf/ptypes"
	"github.com/docker/swarmkit/remotes"
	"github.com/docker/swarmkit/watch"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
)

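// Heartbeat timing sketch (illustrative; assumes the nodeStore multiplies the
// period by GracePeriodMultiplier when computing expiration): with the
// defaults below, an agent heartbeats roughly every 5s and is only considered
// failed after about 5s * 3 = 15s of silence, with up to 500ms of epsilon
// jitter added to spread out expirations.
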
const (
	// DefaultHeartBeatPeriod is the default heartbeat period stored in the
	// cluster config, and the fallback used when the cluster config is missing.
	DefaultHeartBeatPeriod       = 5 * time.Second
	defaultHeartBeatEpsilon      = 500 * time.Millisecond
	defaultGracePeriodMultiplier = 3
	defaultRateLimitPeriod       = 8 * time.Second

	// maxBatchItems is the threshold of queued writes that should
	// trigger an actual transaction to commit them to the shared store.
	maxBatchItems = 10000

	// maxBatchInterval needs to strike a balance between keeping
	// latency low, and realizing opportunities to combine many writes
	// into a single transaction. A fraction of a second feels about
	// right.
	maxBatchInterval = 100 * time.Millisecond

	modificationBatchLimit = 100
	batchingWaitTime       = 100 * time.Millisecond

	// defaultNodeDownPeriod specifies the default time period we
	// wait before moving tasks assigned to down nodes to ORPHANED
	// state.
	defaultNodeDownPeriod = 24 * time.Hour
)

var (
	// ErrNodeAlreadyRegistered is returned if a node with the same ID was
	// already registered with this dispatcher.
	ErrNodeAlreadyRegistered = errors.New("node already registered")
	// ErrNodeNotRegistered is returned if a node with such an ID wasn't
	// registered with this dispatcher.
	ErrNodeNotRegistered = errors.New("node not registered")
	// ErrSessionInvalid is returned when the session in use is no longer valid.
	// The node should re-register and start a new session.
	ErrSessionInvalid = errors.New("session invalid")
	// ErrNodeNotFound is returned when the Node doesn't exist in raft.
	ErrNodeNotFound = errors.New("node not found")
)

// Config is the configuration for the Dispatcher. For defaults, use
// DefaultConfig.
type Config struct {
	HeartbeatPeriod  time.Duration
	HeartbeatEpsilon time.Duration
	// RateLimitPeriod specifies how often a node with the same ID can try
	// to register a new session.
	RateLimitPeriod       time.Duration
	GracePeriodMultiplier int
}

// DefaultConfig returns the default config for the Dispatcher.
func DefaultConfig() *Config {
	return &Config{
		HeartbeatPeriod:       DefaultHeartBeatPeriod,
		HeartbeatEpsilon:      defaultHeartBeatEpsilon,
		RateLimitPeriod:       defaultRateLimitPeriod,
		GracePeriodMultiplier: defaultGracePeriodMultiplier,
	}
}

// Cluster is an interface which represents a raft cluster.
// manager/state/raft.Node implements it. This interface is needed only to
// make unit testing easier.
type Cluster interface {
	GetMemberlist() map[uint64]*api.RaftMember
	SubscribePeers() (chan events.Event, func())
	MemoryStore() *store.MemoryStore
}

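// A test-double sketch (hypothetical; not part of this package's API): any
// type with these three methods satisfies Cluster, which is what makes the
// dispatcher testable without a real raft node.
//
//	type fakeCluster struct{ memStore *store.MemoryStore }
//
//	func (c *fakeCluster) GetMemberlist() map[uint64]*api.RaftMember { return nil }
//	func (c *fakeCluster) SubscribePeers() (chan events.Event, func()) {
//	    ch := make(chan events.Event)
//	    return ch, func() { close(ch) }
//	}
//	func (c *fakeCluster) MemoryStore() *store.MemoryStore { return c.memStore }
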
// nodeUpdate provides a new status and/or description to apply to a node
// object.
type nodeUpdate struct {
	status      *api.NodeStatus
	description *api.NodeDescription
}

// Dispatcher is responsible for dispatching tasks and tracking agent health.
type Dispatcher struct {
	mu                   sync.Mutex
	nodes                *nodeStore
	store                *store.MemoryStore
	mgrQueue             *watch.Queue
	lastSeenManagers     []*api.WeightedPeer
	networkBootstrapKeys []*api.EncryptionKey
	keyMgrQueue          *watch.Queue
	config               *Config
	cluster              Cluster
	ctx                  context.Context
	cancel               context.CancelFunc

	taskUpdates     map[string]*api.TaskStatus // indexed by task ID
	taskUpdatesLock sync.Mutex

	nodeUpdates     map[string]nodeUpdate // indexed by node ID
	nodeUpdatesLock sync.Mutex

	downNodes *nodeStore

	processUpdatesTrigger chan struct{}

	// for waiting for the next task/node batch update
	processUpdatesLock sync.Mutex
	processUpdatesCond *sync.Cond
}

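// A minimal wiring sketch (hypothetical caller; assumes a running
// manager/state/raft.Node, which satisfies Cluster):
//
//	d := New(raftNode, DefaultConfig())
//	go func() {
//	    if err := d.Run(ctx); err != nil {
//	        log.G(ctx).WithError(err).Error("dispatcher exited")
//	    }
//	}()
//	defer d.Stop()
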
// New returns a Dispatcher with a cluster interface (usually raft.Node).
// NOTE: each handler which does something with raft must add to Dispatcher.wg
func New(cluster Cluster, c *Config) *Dispatcher {
	d := &Dispatcher{
		nodes:                 newNodeStore(c.HeartbeatPeriod, c.HeartbeatEpsilon, c.GracePeriodMultiplier, c.RateLimitPeriod),
		downNodes:             newNodeStore(defaultNodeDownPeriod, 0, 1, 0),
		store:                 cluster.MemoryStore(),
		cluster:               cluster,
		taskUpdates:           make(map[string]*api.TaskStatus),
		nodeUpdates:           make(map[string]nodeUpdate),
		processUpdatesTrigger: make(chan struct{}, 1),
		config:                c,
	}

	d.processUpdatesCond = sync.NewCond(&d.processUpdatesLock)

	return d
}

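// getWeightedPeers returns the current raft membership as weighted peers,
// in the form agents use to pick which manager to connect to.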
func getWeightedPeers(cluster Cluster) []*api.WeightedPeer {
	members := cluster.GetMemberlist()
	var mgrs []*api.WeightedPeer
	for _, m := range members {
		mgrs = append(mgrs, &api.WeightedPeer{
			Peer: &api.Peer{
				NodeID: m.NodeID,
				Addr:   m.Addr,
			},

			// TODO(stevvooe): Calculate weight of manager selection based on
			// cluster-level observations, such as number of connections and
			// load.
			Weight: remotes.DefaultObservationWeight,
		})
	}
	return mgrs
}

// Run runs dispatcher tasks which should be run on the leader dispatcher.
// The dispatcher can be stopped by cancelling ctx or by calling Stop().
func (d *Dispatcher) Run(ctx context.Context) error {
	d.mu.Lock()
	if d.isRunning() {
		d.mu.Unlock()
		return errors.New("dispatcher is already running")
	}
	ctx = log.WithModule(ctx, "dispatcher")
	if err := d.markNodesUnknown(ctx); err != nil {
		log.G(ctx).Errorf(`failed to move all nodes to "unknown" state: %v`, err)
	}
	configWatcher, cancel, err := store.ViewAndWatch(
		d.store,
		func(readTx store.ReadTx) error {
			clusters, err := store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
			if err != nil {
				return err
			}
			if err == nil && len(clusters) == 1 {
				heartbeatPeriod, err := ptypes.Duration(clusters[0].Spec.Dispatcher.HeartbeatPeriod)
				if err == nil && heartbeatPeriod > 0 {
					d.config.HeartbeatPeriod = heartbeatPeriod
				}
				if clusters[0].NetworkBootstrapKeys != nil {
					d.networkBootstrapKeys = clusters[0].NetworkBootstrapKeys
				}
			}
			return nil
		},
		state.EventUpdateCluster{},
	)
	if err != nil {
		d.mu.Unlock()
		return err
	}
	// set queues here to guarantee that Close will close them
	d.mgrQueue = watch.NewQueue()
	d.keyMgrQueue = watch.NewQueue()

	peerWatcher, peerCancel := d.cluster.SubscribePeers()
	defer peerCancel()
	d.lastSeenManagers = getWeightedPeers(d.cluster)

	defer cancel()
	d.ctx, d.cancel = context.WithCancel(ctx)
	d.mu.Unlock()

	publishManagers := func(peers []*api.Peer) {
		var mgrs []*api.WeightedPeer
		for _, p := range peers {
			mgrs = append(mgrs, &api.WeightedPeer{
				Peer:   p,
				Weight: remotes.DefaultObservationWeight,
			})
		}
		d.mu.Lock()
		d.lastSeenManagers = mgrs
		d.mu.Unlock()
		d.mgrQueue.Publish(mgrs)
	}

	batchTimer := time.NewTimer(maxBatchInterval)
	defer batchTimer.Stop()

	for {
		select {
		case ev := <-peerWatcher:
			publishManagers(ev.([]*api.Peer))
		case <-d.processUpdatesTrigger:
			d.processUpdates()
			batchTimer.Reset(maxBatchInterval)
		case <-batchTimer.C:
			d.processUpdates()
			batchTimer.Reset(maxBatchInterval)
		case v := <-configWatcher:
			cluster := v.(state.EventUpdateCluster)
			d.mu.Lock()
			if cluster.Cluster.Spec.Dispatcher.HeartbeatPeriod != nil {
				// ignore error, since Spec has passed validation before
				heartbeatPeriod, _ := ptypes.Duration(cluster.Cluster.Spec.Dispatcher.HeartbeatPeriod)
				if heartbeatPeriod != d.config.HeartbeatPeriod {
					// only call d.nodes.updatePeriod when heartbeatPeriod changes
					d.config.HeartbeatPeriod = heartbeatPeriod
					d.nodes.updatePeriod(d.config.HeartbeatPeriod, d.config.HeartbeatEpsilon, d.config.GracePeriodMultiplier)
				}
			}
			d.networkBootstrapKeys = cluster.Cluster.NetworkBootstrapKeys
			d.mu.Unlock()
			d.keyMgrQueue.Publish(cluster.Cluster.NetworkBootstrapKeys)
		case <-d.ctx.Done():
			return nil
		}
	}
}

// Stop stops the dispatcher and closes all grpc streams.
func (d *Dispatcher) Stop() error {
	d.mu.Lock()
	if !d.isRunning() {
		d.mu.Unlock()
		return errors.New("dispatcher is already stopped")
	}
	d.cancel()
	d.mu.Unlock()
	d.nodes.Clean()

	d.processUpdatesLock.Lock()
	// In case there are any waiters. There is no chance of any starting
	// after this point, because they check if the context is canceled
	// before waiting.
	d.processUpdatesCond.Broadcast()
	d.processUpdatesLock.Unlock()

	d.mgrQueue.Close()
	d.keyMgrQueue.Close()

	return nil
}

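// isRunningLocked takes the dispatcher lock to check whether the dispatcher
// is running, returning a gRPC Aborted error if it is not.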
func (d *Dispatcher) isRunningLocked() error {
	d.mu.Lock()
	if !d.isRunning() {
		d.mu.Unlock()
		return grpc.Errorf(codes.Aborted, "dispatcher is stopped")
	}
	d.mu.Unlock()
	return nil
}

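// markNodesUnknown moves every node that is not already DOWN to the UNKNOWN
// state and arms a heartbeat timer for each one, so nodes that never
// re-register with this dispatcher after a leadership change are eventually
// marked DOWN.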
func (d *Dispatcher) markNodesUnknown(ctx context.Context) error {
	log := log.G(ctx).WithField("method", "(*Dispatcher).markNodesUnknown")
	var nodes []*api.Node
	var err error
	d.store.View(func(tx store.ReadTx) {
		nodes, err = store.FindNodes(tx, store.All)
	})
	if err != nil {
		return errors.Wrap(err, "failed to get list of nodes")
	}
	_, err = d.store.Batch(func(batch *store.Batch) error {
		for _, n := range nodes {
			err := batch.Update(func(tx store.Tx) error {
				// check if node is still here
				node := store.GetNode(tx, n.ID)
				if node == nil {
					return nil
				}
				// do not try to resurrect down nodes
				if node.Status.State == api.NodeStatus_DOWN {
					nodeCopy := node
					expireFunc := func() {
						if err := d.moveTasksToOrphaned(nodeCopy.ID); err != nil {
							log.WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
						}

						d.downNodes.Delete(nodeCopy.ID)
					}

					d.downNodes.Add(nodeCopy, expireFunc)
					return nil
				}

				node.Status.State = api.NodeStatus_UNKNOWN
				node.Status.Message = `Node moved to "unknown" state due to leadership change in cluster`

				nodeID := node.ID

				expireFunc := func() {
					log := log.WithField("node", nodeID)
					log.Debugf("heartbeat expiration for unknown node")
					if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, `heartbeat failure for node in "unknown" state`); err != nil {
						log.WithError(err).Errorf(`failed deregistering node after heartbeat expiration for node in "unknown" state`)
					}
				}
				if err := d.nodes.AddUnknown(node, expireFunc); err != nil {
					return errors.Wrap(err, `adding node in "unknown" state to node store failed`)
				}
				if err := store.UpdateNode(tx, node); err != nil {
					return errors.Wrap(err, "update failed")
				}
				return nil
			})
			if err != nil {
				log.WithField("node", n.ID).WithError(err).Errorf(`failed to move node to "unknown" state`)
			}
		}
		return nil
	})
	return err
}

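// isRunning reports whether the dispatcher context has been set up and not
// yet canceled. Callers are expected to hold d.mu.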
func (d *Dispatcher) isRunning() bool {
	if d.ctx == nil {
		return false
	}
	select {
	case <-d.ctx.Done():
		return false
	default:
	}
	return true
}

// markNodeReady updates the description of a node, updates its address, and
// sets its status to READY. This is used during registration when a new node
// description is provided, and during node updates when the node description
// changes.
func (d *Dispatcher) markNodeReady(nodeID string, description *api.NodeDescription, addr string) error {
	d.nodeUpdatesLock.Lock()
	d.nodeUpdates[nodeID] = nodeUpdate{
		status: &api.NodeStatus{
			State: api.NodeStatus_READY,
			Addr:  addr,
		},
		description: description,
	}
	numUpdates := len(d.nodeUpdates)
	d.nodeUpdatesLock.Unlock()

	// Node is marked ready. Remove the node from down nodes if it
	// is there.
	d.downNodes.Delete(nodeID)

	if numUpdates >= maxBatchItems {
		select {
		case d.processUpdatesTrigger <- struct{}{}:
		case <-d.ctx.Done():
			return d.ctx.Err()
		}
	}

	// Wait until the node update batch happens before unblocking register.
	d.processUpdatesLock.Lock()
	select {
	case <-d.ctx.Done():
		return d.ctx.Err()
	default:
	}
	d.processUpdatesCond.Wait()
	d.processUpdatesLock.Unlock()

	return nil
}

// nodeIPFromContext gets the node IP from the context of a gRPC call.
func nodeIPFromContext(ctx context.Context) (string, error) {
	nodeInfo, err := ca.RemoteNode(ctx)
	if err != nil {
		return "", err
	}
	addr, _, err := net.SplitHostPort(nodeInfo.RemoteAddr)
	if err != nil {
		return "", errors.Wrap(err, "unable to get ip from addr:port")
	}
	return addr, nil
}

// register is used for registration of a node with a particular dispatcher.
func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {
	// prevent register until we're ready to accept it
	if err := d.isRunningLocked(); err != nil {
		return "", err
	}

	if err := d.nodes.CheckRateLimit(nodeID); err != nil {
		return "", err
	}

	// TODO(stevvooe): Validate node specification.
	var node *api.Node
	d.store.View(func(tx store.ReadTx) {
		node = store.GetNode(tx, nodeID)
	})
	if node == nil {
		return "", ErrNodeNotFound
	}

	addr, err := nodeIPFromContext(ctx)
	if err != nil {
		log.G(ctx).Debugf(err.Error())
	}

	if err := d.markNodeReady(nodeID, description, addr); err != nil {
		return "", err
	}

	expireFunc := func() {
		log.G(ctx).Debugf("heartbeat expiration")
		if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, "heartbeat failure"); err != nil {
			log.G(ctx).WithError(err).Errorf("failed deregistering node after heartbeat expiration")
		}
	}

	rn := d.nodes.Add(node, expireFunc)

	// NOTE(stevvooe): We need be a little careful with re-registration. The
	// current implementation just matches the node id and then gives away the
	// sessionID. If we ever want to use sessionID as a secret, which we may
	// want to, this is giving away the keys to the kitchen.
	//
	// The right behavior is going to be informed by identity. Basically, each
	// time a node registers, we invalidate the session and issue a new
	// session, once identity is proven. This will cause misbehaved agents to
	// be kicked when multiple connections are made.
	return rn.SessionID, nil
}

// UpdateTaskStatus updates the status of a task. A node should send such
// updates on every status change of its tasks.
func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStatusRequest) (*api.UpdateTaskStatusResponse, error) {
	nodeInfo, err := ca.RemoteNode(ctx)
	if err != nil {
		return nil, err
	}
	nodeID := nodeInfo.NodeID
	fields := logrus.Fields{
		"node.id":      nodeID,
		"node.session": r.SessionID,
		"method":       "(*Dispatcher).UpdateTaskStatus",
	}
	if nodeInfo.ForwardedBy != nil {
		fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
	}
	log := log.G(ctx).WithFields(fields)

	if err := d.isRunningLocked(); err != nil {
		return nil, err
	}

	if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
		return nil, err
	}

	// Validate task updates
	for _, u := range r.Updates {
		if u.Status == nil {
			log.WithField("task.id", u.TaskID).Warn("task report has nil status")
			continue
		}

		var t *api.Task
		d.store.View(func(tx store.ReadTx) {
			t = store.GetTask(tx, u.TaskID)
		})
		if t == nil {
			log.WithField("task.id", u.TaskID).Warn("cannot find target task in store")
			continue
		}

		if t.NodeID != nodeID {
			err := grpc.Errorf(codes.PermissionDenied, "cannot update a task not assigned this node")
			log.WithField("task.id", u.TaskID).Error(err)
			return nil, err
		}
	}

	d.taskUpdatesLock.Lock()
	// Enqueue task updates
	for _, u := range r.Updates {
		if u.Status == nil {
			continue
		}
		d.taskUpdates[u.TaskID] = u.Status
	}

	numUpdates := len(d.taskUpdates)
	d.taskUpdatesLock.Unlock()

	if numUpdates >= maxBatchItems {
		select {
		case d.processUpdatesTrigger <- struct{}{}:
		case <-d.ctx.Done():
		}
	}
	return nil, nil
}

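// processUpdates drains the queued task and node status updates and commits
// them to the store in a single batched transaction, then wakes any callers
// blocked in markNodeReady waiting for their update to be committed.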
func (d *Dispatcher) processUpdates() {
	var (
		taskUpdates map[string]*api.TaskStatus
		nodeUpdates map[string]nodeUpdate
	)
	d.taskUpdatesLock.Lock()
	if len(d.taskUpdates) != 0 {
		taskUpdates = d.taskUpdates
		d.taskUpdates = make(map[string]*api.TaskStatus)
	}
	d.taskUpdatesLock.Unlock()

	d.nodeUpdatesLock.Lock()
	if len(d.nodeUpdates) != 0 {
		nodeUpdates = d.nodeUpdates
		d.nodeUpdates = make(map[string]nodeUpdate)
	}
	d.nodeUpdatesLock.Unlock()

	if len(taskUpdates) == 0 && len(nodeUpdates) == 0 {
		return
	}

	log := log.G(d.ctx).WithFields(logrus.Fields{
		"method": "(*Dispatcher).processUpdates",
	})

	_, err := d.store.Batch(func(batch *store.Batch) error {
		for taskID, status := range taskUpdates {
			err := batch.Update(func(tx store.Tx) error {
				logger := log.WithField("task.id", taskID)
				task := store.GetTask(tx, taskID)
				if task == nil {
					logger.Errorf("task unavailable")
					return nil
				}

				logger = logger.WithField("state.transition", fmt.Sprintf("%v->%v", task.Status.State, status.State))

				if task.Status == *status {
					logger.Debug("task status identical, ignoring")
					return nil
				}

				if task.Status.State > status.State {
					logger.Debug("task status invalid transition")
					return nil
				}

				task.Status = *status
				if err := store.UpdateTask(tx, task); err != nil {
					logger.WithError(err).Error("failed to update task status")
					return nil
				}
				logger.Debug("task status updated")
				return nil
			})
			if err != nil {
				log.WithError(err).Error("dispatcher task update transaction failed")
			}
		}

		for nodeID, nodeUpdate := range nodeUpdates {
			err := batch.Update(func(tx store.Tx) error {
				logger := log.WithField("node.id", nodeID)
				node := store.GetNode(tx, nodeID)
				if node == nil {
					logger.Errorf("node unavailable")
					return nil
				}

				if nodeUpdate.status != nil {
					node.Status.State = nodeUpdate.status.State
					node.Status.Message = nodeUpdate.status.Message
					if nodeUpdate.status.Addr != "" {
						node.Status.Addr = nodeUpdate.status.Addr
					}
				}
				if nodeUpdate.description != nil {
					node.Description = nodeUpdate.description
				}

				if err := store.UpdateNode(tx, node); err != nil {
					logger.WithError(err).Error("failed to update node status")
					return nil
				}
				logger.Debug("node status updated")
				return nil
			})
			if err != nil {
				log.WithError(err).Error("dispatcher node update transaction failed")
			}
		}

		return nil
	})
	if err != nil {
		log.WithError(err).Error("dispatcher batch failed")
	}

	d.processUpdatesCond.Broadcast()
}

// Tasks is a stream of tasks state for a node. Each message contains a full
// list of tasks which should be run on the node; if a task is not present in
// that list, it should be terminated.
func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServer) error {
	nodeInfo, err := ca.RemoteNode(stream.Context())
	if err != nil {
		return err
	}
	nodeID := nodeInfo.NodeID

	if err := d.isRunningLocked(); err != nil {
		return err
	}

	fields := logrus.Fields{
		"node.id":      nodeID,
		"node.session": r.SessionID,
		"method":       "(*Dispatcher).Tasks",
	}
	if nodeInfo.ForwardedBy != nil {
		fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
	}
	log.G(stream.Context()).WithFields(fields).Debugf("")

	if _, err = d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
		return err
	}

	tasksMap := make(map[string]*api.Task)
	nodeTasks, cancel, err := store.ViewAndWatch(
		d.store,
		func(readTx store.ReadTx) error {
			tasks, err := store.FindTasks(readTx, store.ByNodeID(nodeID))
			if err != nil {
				return err
			}
			for _, t := range tasks {
				tasksMap[t.ID] = t
			}
			return nil
		},
		state.EventCreateTask{Task: &api.Task{NodeID: nodeID},
			Checks: []state.TaskCheckFunc{state.TaskCheckNodeID}},
		state.EventUpdateTask{Task: &api.Task{NodeID: nodeID},
			Checks: []state.TaskCheckFunc{state.TaskCheckNodeID}},
		state.EventDeleteTask{Task: &api.Task{NodeID: nodeID},
			Checks: []state.TaskCheckFunc{state.TaskCheckNodeID}},
	)
	if err != nil {
		return err
	}
	defer cancel()

	for {
		if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
			return err
		}

		var tasks []*api.Task
		for _, t := range tasksMap {
			// dispatcher only sends tasks that have been assigned to a node
			if t != nil && t.Status.State >= api.TaskStateAssigned {
				tasks = append(tasks, t)
			}
		}

		if err := stream.Send(&api.TasksMessage{Tasks: tasks}); err != nil {
			return err
		}

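		// Batching sketch: once the first event arrives, keep collecting
		// for batchingWaitTime (reset on each event) or until
		// modificationBatchLimit changes accumulate, then fall through
		// and send one combined snapshot on the next loop iteration.
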
		// bursty events should be processed in batches and sent out as a snapshot
		var (
			modificationCnt int
			batchingTimer   *time.Timer
			batchingTimeout <-chan time.Time
		)

	batchingLoop:
		for modificationCnt < modificationBatchLimit {
			select {
			case event := <-nodeTasks:
				switch v := event.(type) {
				case state.EventCreateTask:
					tasksMap[v.Task.ID] = v.Task
					modificationCnt++
				case state.EventUpdateTask:
					if oldTask, exists := tasksMap[v.Task.ID]; exists {
						// States ASSIGNED and below are set by the orchestrator/scheduler,
						// not the agent, so tasks in these states need to be sent to the
						// agent even if nothing else has changed.
						if equality.TasksEqualStable(oldTask, v.Task) && v.Task.Status.State > api.TaskStateAssigned {
							// this update should not trigger action at agent
							tasksMap[v.Task.ID] = v.Task
							continue
						}
					}
					tasksMap[v.Task.ID] = v.Task
					modificationCnt++
				case state.EventDeleteTask:
					delete(tasksMap, v.Task.ID)
					modificationCnt++
				}
				if batchingTimer != nil {
					batchingTimer.Reset(batchingWaitTime)
				} else {
					batchingTimer = time.NewTimer(batchingWaitTime)
					batchingTimeout = batchingTimer.C
				}
			case <-batchingTimeout:
				break batchingLoop
			case <-stream.Context().Done():
				return stream.Context().Err()
			case <-d.ctx.Done():
				return d.ctx.Err()
			}
		}

		if batchingTimer != nil {
			batchingTimer.Stop()
		}
	}
}

// Assignments is a stream of assignments for a node. Each message contains
// either a full list of tasks and secrets for the node, or an incremental update.
func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatcher_AssignmentsServer) error {
	nodeInfo, err := ca.RemoteNode(stream.Context())
	if err != nil {
		return err
	}
	nodeID := nodeInfo.NodeID

	if err := d.isRunningLocked(); err != nil {
		return err
	}

	fields := logrus.Fields{
		"node.id":      nodeID,
		"node.session": r.SessionID,
		"method":       "(*Dispatcher).Assignments",
	}
	if nodeInfo.ForwardedBy != nil {
		fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
	}
	log := log.G(stream.Context()).WithFields(fields)
	log.Debugf("")

	if _, err = d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
		return err
	}

	var (
		sequence  int64
		appliesTo string
		initial   api.AssignmentsMessage
	)
	tasksMap := make(map[string]*api.Task)
	tasksUsingSecret := make(map[string]map[string]struct{})

	sendMessage := func(msg api.AssignmentsMessage, assignmentType api.AssignmentsMessage_Type) error {
		sequence++
		msg.AppliesTo = appliesTo
		msg.ResultsIn = strconv.FormatInt(sequence, 10)
		appliesTo = msg.ResultsIn
		msg.Type = assignmentType

		if err := stream.Send(&msg); err != nil {
			return err
		}
		return nil
	}

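	// Sequencing sketch (illustrative): the first message sent is a COMPLETE
	// snapshot with AppliesTo == "" and ResultsIn == "1"; each INCREMENTAL
	// update then applies to the previous ResultsIn, so an agent can detect
	// a missed or reordered message and re-establish the stream.
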
	// returns a slice of new secrets to send down
	addSecretsForTask := func(readTx store.ReadTx, t *api.Task) []*api.Secret {
		container := t.Spec.GetContainer()
		if container == nil {
			return nil
		}
		var newSecrets []*api.Secret
		for _, secretRef := range container.Secrets {
			// Empty ID prefix will return all secrets. Bail if there is no SecretID
			if secretRef.SecretID == "" {
				log.Debugf("invalid secret reference")
				continue
			}
			secretID := secretRef.SecretID
			log := log.WithFields(logrus.Fields{
				"secret.id":   secretID,
				"secret.name": secretRef.SecretName,
			})

			if len(tasksUsingSecret[secretID]) == 0 {
				tasksUsingSecret[secretID] = make(map[string]struct{})

				secrets, err := store.FindSecrets(readTx, store.ByIDPrefix(secretID))
				if err != nil {
					log.WithError(err).Errorf("error retrieving secret")
					continue
				}
				if len(secrets) != 1 {
					log.Debugf("secret not found")
					continue
				}

				// If the secret was found and there was one result
				// (there should never be more than one because of the
				// uniqueness constraint), add this secret to our
				// initial set that we send down.
				newSecrets = append(newSecrets, secrets[0])
			}
			tasksUsingSecret[secretID][t.ID] = struct{}{}
		}

		return newSecrets
	}

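	// Reference-counting note: tasksUsingSecret maps secret ID -> set of task
	// IDs. A secret is sent down when its count goes from zero to nonzero and
	// queued for removal when it drops back to zero (see releaseSecretsForTask
	// below). This bookkeeping is local to this stream, not stored state.
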
	// TODO(aaronl): Also send node secrets that should be exposed to
	// this node.
	nodeTasks, cancel, err := store.ViewAndWatch(
		d.store,
		func(readTx store.ReadTx) error {
			tasks, err := store.FindTasks(readTx, store.ByNodeID(nodeID))
			if err != nil {
				return err
			}

			for _, t := range tasks {
				// We only care about tasks that are ASSIGNED or
				// higher. If the state is below ASSIGNED, the
				// task may not meet the constraints for this
				// node, so we have to be careful about sending
				// secrets associated with it.
				if t.Status.State < api.TaskStateAssigned {
					continue
				}

				tasksMap[t.ID] = t
				taskChange := &api.AssignmentChange{
					Assignment: &api.Assignment{
						Item: &api.Assignment_Task{
							Task: t,
						},
					},
					Action: api.AssignmentChange_AssignmentActionUpdate,
				}
				initial.Changes = append(initial.Changes, taskChange)
				// Only send secrets down if the task is <= RUNNING
				if t.Status.State <= api.TaskStateRunning {
					newSecrets := addSecretsForTask(readTx, t)
					for _, secret := range newSecrets {
						secretChange := &api.AssignmentChange{
							Assignment: &api.Assignment{
								Item: &api.Assignment_Secret{
									Secret: secret,
								},
							},
							Action: api.AssignmentChange_AssignmentActionUpdate,
						}

						initial.Changes = append(initial.Changes, secretChange)
					}
				}
			}
			return nil
		},
		state.EventUpdateTask{Task: &api.Task{NodeID: nodeID},
			Checks: []state.TaskCheckFunc{state.TaskCheckNodeID}},
		state.EventDeleteTask{Task: &api.Task{NodeID: nodeID},
			Checks: []state.TaskCheckFunc{state.TaskCheckNodeID}},
		state.EventUpdateSecret{},
		state.EventDeleteSecret{},
	)
	if err != nil {
		return err
	}
	defer cancel()

	if err := sendMessage(initial, api.AssignmentsMessage_COMPLETE); err != nil {
		return err
	}

	for {
		// Check for session expiration
		if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
			return err
		}

		// bursty events should be processed in batches and sent out together
		var (
			update          api.AssignmentsMessage
			modificationCnt int
			batchingTimer   *time.Timer
			batchingTimeout <-chan time.Time
			updateTasks     = make(map[string]*api.Task)
			updateSecrets   = make(map[string]*api.Secret)
			removeTasks     = make(map[string]struct{})
			removeSecrets   = make(map[string]struct{})
		)

		oneModification := func() {
			modificationCnt++

			if batchingTimer != nil {
				batchingTimer.Reset(batchingWaitTime)
			} else {
				batchingTimer = time.NewTimer(batchingWaitTime)
				batchingTimeout = batchingTimer.C
			}
		}

		// Release the secret references held by this task
		releaseSecretsForTask := func(t *api.Task) bool {
			var modified bool
			container := t.Spec.GetContainer()
			if container == nil {
				return modified
			}

			for _, secretRef := range container.Secrets {
				secretID := secretRef.SecretID
				delete(tasksUsingSecret[secretID], t.ID)
				if len(tasksUsingSecret[secretID]) == 0 {
					// No tasks are using the secret anymore
					delete(tasksUsingSecret, secretID)
					removeSecrets[secretID] = struct{}{}
					modified = true
				}
			}

			return modified
		}

		// The batching loop waits for batchingWaitTime (100 ms) after the
		// most recent change, or until modificationBatchLimit is reached.
		// The worst case latency is modificationBatchLimit * batchingWaitTime,
		// which is 10 seconds.
	batchingLoop:
		for modificationCnt < modificationBatchLimit {
			select {
			case event := <-nodeTasks:
				switch v := event.(type) {
				// We don't monitor EventCreateTask because tasks are
				// never created in the ASSIGNED state. First tasks are
				// created by the orchestrator, then the scheduler moves
				// them to ASSIGNED. If this ever changes, we will need
				// to monitor task creations as well.
				case state.EventUpdateTask:
					// We only care about tasks that are ASSIGNED or
					// higher.
					if v.Task.Status.State < api.TaskStateAssigned {
						continue
					}

					if oldTask, exists := tasksMap[v.Task.ID]; exists {
						// States ASSIGNED and below are set by the orchestrator/scheduler,
						// not the agent, so tasks in these states need to be sent to the
						// agent even if nothing else has changed.
						if equality.TasksEqualStable(oldTask, v.Task) && v.Task.Status.State > api.TaskStateAssigned {
							// this update should not trigger a task change for the agent
							tasksMap[v.Task.ID] = v.Task
							// If this task got updated to a final state, let's release
							// the secrets that are being used by the task
							if v.Task.Status.State > api.TaskStateRunning {
								// If releasing the secrets caused a secret to be
								// removed from an agent, mark one modification
								if releaseSecretsForTask(v.Task) {
									oneModification()
								}
							}
							continue
						}
					} else if v.Task.Status.State <= api.TaskStateRunning {
						// If this task wasn't part of the assignment set before, and it's
						// <= RUNNING, add the secrets it references to the secrets
						// assignment. Task states > RUNNING are only reported by the
						// worker; tasks are never created in a > RUNNING state.
						var newSecrets []*api.Secret
						d.store.View(func(readTx store.ReadTx) {
							newSecrets = addSecretsForTask(readTx, v.Task)
						})
						for _, secret := range newSecrets {
							updateSecrets[secret.ID] = secret
						}
					}
					tasksMap[v.Task.ID] = v.Task
					updateTasks[v.Task.ID] = v.Task

					oneModification()
				case state.EventDeleteTask:
					if _, exists := tasksMap[v.Task.ID]; !exists {
						continue
					}

					removeTasks[v.Task.ID] = struct{}{}

					delete(tasksMap, v.Task.ID)

					// Release the secrets being used by this task.
					// Ignoring the return here. We will always mark
					// this as a modification, since a task is being
					// removed.
					releaseSecretsForTask(v.Task)

					oneModification()
				// TODO(aaronl): For node secrets, we'll need to handle
				// EventCreateSecret.
				case state.EventUpdateSecret:
					if _, exists := tasksUsingSecret[v.Secret.ID]; !exists {
						continue
					}
					log.Debugf("Secret %s (ID: %s) was updated though it was still referenced by one or more tasks",
						v.Secret.Spec.Annotations.Name, v.Secret.ID)

				case state.EventDeleteSecret:
					if _, exists := tasksUsingSecret[v.Secret.ID]; !exists {
						continue
					}
					log.Debugf("Secret %s (ID: %s) was deleted though it was still referenced by one or more tasks",
						v.Secret.Spec.Annotations.Name, v.Secret.ID)
				}
			case <-batchingTimeout:
				break batchingLoop
			case <-stream.Context().Done():
				return stream.Context().Err()
			case <-d.ctx.Done():
				return d.ctx.Err()
			}
		}

		if batchingTimer != nil {
			batchingTimer.Stop()
		}

		if modificationCnt > 0 {
			for id, task := range updateTasks {
				if _, ok := removeTasks[id]; !ok {
					taskChange := &api.AssignmentChange{
						Assignment: &api.Assignment{
							Item: &api.Assignment_Task{
								Task: task,
							},
						},
						Action: api.AssignmentChange_AssignmentActionUpdate,
					}

					update.Changes = append(update.Changes, taskChange)
				}
			}
			for id, secret := range updateSecrets {
				// If, due to multiple updates, this secret is no longer in use,
				// don't send it down.
				if len(tasksUsingSecret[id]) == 0 {
					// delete this secret from the updated set
					// so that removeSecrets knows the current list
					delete(updateSecrets, id)
					continue
				}
				secretChange := &api.AssignmentChange{
					Assignment: &api.Assignment{
						Item: &api.Assignment_Secret{
							Secret: secret,
						},
					},
					Action: api.AssignmentChange_AssignmentActionUpdate,
				}

				update.Changes = append(update.Changes, secretChange)
			}
			for id := range removeTasks {
				taskChange := &api.AssignmentChange{
					Assignment: &api.Assignment{
						Item: &api.Assignment_Task{
							Task: &api.Task{ID: id},
						},
					},
					Action: api.AssignmentChange_AssignmentActionRemove,
				}

				update.Changes = append(update.Changes, taskChange)
			}
			for id := range removeSecrets {
				// If this secret is also being sent on the updated set,
				// don't also add it to the removed set
				if _, ok := updateSecrets[id]; ok {
					continue
				}

				secretChange := &api.AssignmentChange{
					Assignment: &api.Assignment{
						Item: &api.Assignment_Secret{
							Secret: &api.Secret{ID: id},
						},
					},
					Action: api.AssignmentChange_AssignmentActionRemove,
				}

				update.Changes = append(update.Changes, secretChange)
			}

			if err := sendMessage(update, api.AssignmentsMessage_INCREMENTAL); err != nil {
				return err
			}
		}
	}
}

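// moveTasksToOrphaned marks all tasks assigned to the given node as ORPHANED
// so they can be rescheduled; tasks that already reached ORPHANED or a later
// state keep their current state.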
func (d *Dispatcher) moveTasksToOrphaned(nodeID string) error {
	_, err := d.store.Batch(func(batch *store.Batch) error {
		var (
			tasks []*api.Task
			err   error
		)

		d.store.View(func(tx store.ReadTx) {
			tasks, err = store.FindTasks(tx, store.ByNodeID(nodeID))
		})
		if err != nil {
			return err
		}

		for _, task := range tasks {
			if task.Status.State < api.TaskStateOrphaned {
				task.Status.State = api.TaskStateOrphaned
			}

			if err := batch.Update(func(tx store.Tx) error {
				err := store.UpdateTask(tx, task)
				if err != nil {
					return err
				}

				return nil
			}); err != nil {
				return err
			}
		}

		return nil
	})

	return err
}

// markNodeNotReady sets the node state to some state other than READY.
func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, message string) error {
	if err := d.isRunningLocked(); err != nil {
		return err
	}

	// Node is down. Add it to down nodes so that we can keep
	// track of tasks assigned to the node.
	var (
		node *api.Node
		err  error
	)
	d.store.View(func(readTx store.ReadTx) {
		node = store.GetNode(readTx, id)
		if node == nil {
			err = fmt.Errorf("could not find node %s while trying to add to down nodes store", id)
		}
	})
	if err != nil {
		return err
	}

	expireFunc := func() {
		if err := d.moveTasksToOrphaned(id); err != nil {
			log.G(context.TODO()).WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
		}

		d.downNodes.Delete(id)
	}

	d.downNodes.Add(node, expireFunc)

	status := &api.NodeStatus{
		State:   state,
		Message: message,
	}

	d.nodeUpdatesLock.Lock()
	// pluck the description out of nodeUpdates. this protects against a case
	// where a node is marked ready and a description is added, but then the
	// node is immediately marked not ready. this preserves that description
	d.nodeUpdates[id] = nodeUpdate{status: status, description: d.nodeUpdates[id].description}
	numUpdates := len(d.nodeUpdates)
	d.nodeUpdatesLock.Unlock()

	if numUpdates >= maxBatchItems {
		select {
		case d.processUpdatesTrigger <- struct{}{}:
		case <-d.ctx.Done():
		}
	}

	if rn := d.nodes.Delete(id); rn == nil {
		return errors.Errorf("node %s is not found in local storage", id)
	}

	return nil
}

// Heartbeat is the heartbeat method for nodes. It returns the new TTL in its
// response. A node should send a new heartbeat earlier than now + TTL,
// otherwise it will be deregistered from the dispatcher and its status will
// be updated to NodeStatus_DOWN.
func (d *Dispatcher) Heartbeat(ctx context.Context, r *api.HeartbeatRequest) (*api.HeartbeatResponse, error) {
	nodeInfo, err := ca.RemoteNode(ctx)
	if err != nil {
		return nil, err
	}

	period, err := d.nodes.Heartbeat(nodeInfo.NodeID, r.SessionID)
	return &api.HeartbeatResponse{Period: *ptypes.DurationProto(period)}, err
}

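// getManagers returns the most recently observed manager set under the
// dispatcher lock.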
func (d *Dispatcher) getManagers() []*api.WeightedPeer {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.lastSeenManagers
}

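// getNetworkBootstrapKeys returns the current network bootstrap keys under
// the dispatcher lock.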
func (d *Dispatcher) getNetworkBootstrapKeys() []*api.EncryptionKey {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.networkBootstrapKeys
}

// Session is a stream which controls the agent's connection.
// Each message contains a list of backup Managers with weights. There is also
// a special boolean field Disconnect which, if true, indicates that the node
// should reconnect to another Manager immediately.
func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error {
	ctx := stream.Context()
	nodeInfo, err := ca.RemoteNode(ctx)
	if err != nil {
		return err
	}
	nodeID := nodeInfo.NodeID

	if err := d.isRunningLocked(); err != nil {
		return err
	}

	var sessionID string
	if _, err := d.nodes.GetWithSession(nodeID, r.SessionID); err != nil {
		// register the node.
		sessionID, err = d.register(ctx, nodeID, r.Description)
		if err != nil {
			return err
		}
	} else {
		sessionID = r.SessionID
		// get the node IP addr
		addr, err := nodeIPFromContext(stream.Context())
		if err != nil {
			log.G(ctx).Debugf(err.Error())
		}
		// update the node description
		if err := d.markNodeReady(nodeID, r.Description, addr); err != nil {
			return err
		}
	}

	fields := logrus.Fields{
		"node.id":      nodeID,
		"node.session": sessionID,
		"method":       "(*Dispatcher).Session",
	}
	if nodeInfo.ForwardedBy != nil {
		fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
	}
	log := log.G(ctx).WithFields(fields)

	var nodeObj *api.Node
	nodeUpdates, cancel, err := store.ViewAndWatch(d.store, func(readTx store.ReadTx) error {
		nodeObj = store.GetNode(readTx, nodeID)
		return nil
	}, state.EventUpdateNode{Node: &api.Node{ID: nodeID},
		Checks: []state.NodeCheckFunc{state.NodeCheckID}},
	)
	if cancel != nil {
		defer cancel()
	}

	if err != nil {
		log.WithError(err).Error("ViewAndWatch Node failed")
	}

	if _, err = d.nodes.GetWithSession(nodeID, sessionID); err != nil {
		return err
	}

	if err := stream.Send(&api.SessionMessage{
		SessionID:            sessionID,
		Node:                 nodeObj,
		Managers:             d.getManagers(),
		NetworkBootstrapKeys: d.getNetworkBootstrapKeys(),
	}); err != nil {
		return err
	}

	managerUpdates, mgrCancel := d.mgrQueue.Watch()
	defer mgrCancel()
	keyMgrUpdates, keyMgrCancel := d.keyMgrQueue.Watch()
	defer keyMgrCancel()

	// disconnectNode is a helper that forcibly shuts down the connection
	disconnectNode := func() error {
		// force disconnect by shutting down the stream.
		transportStream, ok := transport.StreamFromContext(stream.Context())
		if ok {
			// if we have the transport stream, we can signal a disconnect
			// in the client.
			if err := transportStream.ServerTransport().Close(); err != nil {
				log.WithError(err).Error("session end")
			}
		}

		if err := d.markNodeNotReady(nodeID, api.NodeStatus_DISCONNECTED, "node is currently trying to find new manager"); err != nil {
			log.WithError(err).Error("failed to remove node")
		}
		// still return an abort if the transport closure was ineffective.
		return grpc.Errorf(codes.Aborted, "node must disconnect")
	}

	for {
		// After each message send, we need to check that the node's sessionID
		// hasn't changed. If it has, we will shut down the stream and make the
		// node re-register.
		node, err := d.nodes.GetWithSession(nodeID, sessionID)
		if err != nil {
			return err
		}

		var (
			disconnect bool
			mgrs       []*api.WeightedPeer
			netKeys    []*api.EncryptionKey
		)

		select {
		case ev := <-managerUpdates:
			mgrs = ev.([]*api.WeightedPeer)
		case ev := <-nodeUpdates:
			nodeObj = ev.(state.EventUpdateNode).Node
		case <-stream.Context().Done():
			return stream.Context().Err()
		case <-node.Disconnect:
			disconnect = true
		case <-d.ctx.Done():
			disconnect = true
		case ev := <-keyMgrUpdates:
			netKeys = ev.([]*api.EncryptionKey)
		}
		if mgrs == nil {
			mgrs = d.getManagers()
		}
		if netKeys == nil {
			netKeys = d.getNetworkBootstrapKeys()
		}

		if err := stream.Send(&api.SessionMessage{
			SessionID:            sessionID,
			Node:                 nodeObj,
			Managers:             mgrs,
			NetworkBootstrapKeys: netKeys,
		}); err != nil {
			return err
		}
		if disconnect {
			return disconnectNode()
		}
	}
}