Update swarmkit to 68a376dc30d8c4001767c39456b990dbd821371b

This fix updates swarmkit to 68a376dc30d8c4001767c39456b990dbd821371b:
```
-github.com/docker/swarmkit 713d79dc8799b33465c58ed120b870c52eb5eb4f
+github.com/docker/swarmkit 68a376dc30d8c4001767c39456b990dbd821371b
```

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
Author: Yong Tang
Date:   2018-01-28 07:33:37 +00:00
Parent: 9368e9dac3
Commit: b9923d8530

8 changed files with 52 additions and 12 deletions


@@ -115,7 +115,7 @@ github.com/dmcgowan/go-tar go1.10
 github.com/stevvooe/ttrpc 76e68349ad9ab4d03d764c713826d31216715e4f

 # cluster
-github.com/docker/swarmkit 713d79dc8799b33465c58ed120b870c52eb5eb4f
+github.com/docker/swarmkit 68a376dc30d8c4001767c39456b990dbd821371b
 github.com/gogo/protobuf v0.4
 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
 github.com/google/certificate-transparency d90e65c3a07988180c5b1ece71791c0b6506826e


@@ -333,11 +333,11 @@ func (a *Agent) run(ctx context.Context) {
 					a.config.SessionTracker.SessionError(err)
 				}

-				log.G(ctx).WithError(err).Error("agent: session failed")
 				backoff = initialSessionFailureBackoff + 2*backoff
 				if backoff > maxSessionFailureBackoff {
 					backoff = maxSessionFailureBackoff
 				}
+				log.G(ctx).WithError(err).WithField("backoff", backoff).Errorf("agent: session failed")
 			}

 			if err := session.close(); err != nil {


@@ -13,7 +13,7 @@ var (
 	errAgentStarted             = errors.New("agent: already started")
 	errAgentNotStarted          = errors.New("agent: not started")
-	errTaskNoContoller          = errors.New("agent: no task controller")
+	errTaskNoController         = errors.New("agent: no task controller")
 	errTaskNotAssigned          = errors.New("agent: task not assigned")
 	errTaskStatusUpdateNoChange = errors.New("agent: no change in task status")
 	errTaskUnknown              = errors.New("agent: task unknown")


@@ -65,10 +65,14 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI
 		grpc.WithTransportCredentials(agent.config.Credentials),
 		grpc.WithTimeout(dispatcherRPCTimeout),
 	)
+
 	if err != nil {
 		s.errs <- err
 		return s
 	}
+
+	log.G(ctx).Infof("manager selected by agent for new session: %v", cc.Peer())
+
 	s.conn = cc

 	go s.run(sessionCtx, delay, description)
@@ -77,6 +81,7 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI
 func (s *session) run(ctx context.Context, delay time.Duration, description *api.NodeDescription) {
 	timer := time.NewTimer(delay) // delay before registering.
+	log.G(ctx).Infof("waiting %v before registering session", delay)
 	defer timer.Stop()

 	select {
 	case <-timer.C:
@@ -166,15 +171,23 @@ func (s *session) heartbeat(ctx context.Context) error {
 	heartbeat := time.NewTimer(1) // send out a heartbeat right away
 	defer heartbeat.Stop()

+	fields := logrus.Fields{
+		"sessionID": s.sessionID,
+		"method":    "(*session).heartbeat",
+	}
+
 	for {
 		select {
 		case <-heartbeat.C:
 			heartbeatCtx, cancel := context.WithTimeout(ctx, dispatcherRPCTimeout)
+			// TODO(anshul) log manager info in all logs in this function.
+			log.G(ctx).WithFields(fields).Debugf("sending heartbeat to manager %v with timeout %v", s.conn.Peer(), dispatcherRPCTimeout)
 			resp, err := client.Heartbeat(heartbeatCtx, &api.HeartbeatRequest{
 				SessionID: s.sessionID,
 			})
 			cancel()
 			if err != nil {
+				log.G(ctx).WithFields(fields).WithError(err).Errorf("heartbeat to manager %v failed", s.conn.Peer())
 				if grpc.Code(err) == codes.NotFound {
 					err = errNodeNotRegistered
 				}
@@ -182,6 +195,8 @@ func (s *session) heartbeat(ctx context.Context) error {
 				return err
 			}

+			log.G(ctx).WithFields(fields).Debugf("heartbeat successful to manager %v, next heartbeat period: %v", s.conn.Peer(), resp.Period)
+
 			heartbeat.Reset(resp.Period)
 		case <-s.closed:
 			return errSessionClosed
@@ -408,7 +423,7 @@ func (s *session) sendError(err error) {
 	}
 }

-// close closing session. It should be called only in <-session.errs branch
+// close the given session. It should be called only in <-session.errs branch
 // of event loop, or when cleaning up the agent.
 func (s *session) close() error {
 	s.closeOnce.Do(func() {
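
The heartbeat hunks above attach a fixed `logrus.Fields` set (session ID and method name) to every log call in the method instead of repeating `WithField` chains. As a standalone illustration of that pattern, here is a minimal sketch using plain logrus directly; swarmkit actually routes through its `log.G(ctx)` helper, and the session ID and manager address below are invented for the example:

```go
package main

import (
	"errors"
	"time"

	"github.com/sirupsen/logrus"
)

func main() {
	logrus.SetLevel(logrus.DebugLevel) // make Debugf output visible for the demo

	// Build the field set once; every message from this code path carries it,
	// mirroring the fields map added in the heartbeat hunk above.
	fields := logrus.Fields{
		"sessionID": "sess-1234",            // hypothetical session ID
		"method":    "(*session).heartbeat", // same convention as the diff
	}

	logger := logrus.WithFields(fields)
	logger.Debugf("sending heartbeat with timeout %v", 5*time.Second)

	// Errors keep the same fields and gain an "error" field via WithError.
	if err := errors.New("connection reset"); err != nil {
		logger.WithError(err).Errorf("heartbeat to manager %v failed", "10.0.0.2:2377")
	}
}
```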


@@ -58,6 +58,7 @@ func (b *Broker) Select(dialOpts ...grpc.DialOption) (*Conn, error) {
 // connection.
 func (b *Broker) SelectRemote(dialOpts ...grpc.DialOption) (*Conn, error) {
 	peer, err := b.remotes.Select()
+
 	if err != nil {
 		return nil, err
 	}
@@ -98,6 +99,11 @@ type Conn struct {
 	peer api.Peer
 }

+// Peer returns the peer for this Conn.
+func (c *Conn) Peer() api.Peer {
+	return c.peer
+}
+
 // Close closes the client connection if it is a remote connection. It also
 // records a positive experience with the remote peer if success is true,
 // otherwise it records a negative experience. If a local connection is in use,


@@ -195,6 +195,9 @@ func getWeightedPeers(cluster Cluster) []*api.WeightedPeer {
 // Run runs dispatcher tasks which should be run on leader dispatcher.
 // Dispatcher can be stopped with cancelling ctx or calling Stop().
 func (d *Dispatcher) Run(ctx context.Context) error {
+	ctx = log.WithModule(ctx, "dispatcher")
+	log.G(ctx).Info("dispatcher starting")
+
 	d.taskUpdatesLock.Lock()
 	d.taskUpdates = make(map[string]*api.TaskStatus)
 	d.taskUpdatesLock.Unlock()
@@ -208,7 +211,6 @@ func (d *Dispatcher) Run(ctx context.Context) error {
 		d.mu.Unlock()
 		return errors.New("dispatcher is already running")
 	}
-	ctx = log.WithModule(ctx, "dispatcher")
 	if err := d.markNodesUnknown(ctx); err != nil {
 		log.G(ctx).Errorf(`failed to move all nodes to "unknown" state: %v`, err)
 	}
@@ -310,8 +312,12 @@ func (d *Dispatcher) Stop() error {
 		d.mu.Unlock()
 		return errors.New("dispatcher is already stopped")
 	}
+
+	log := log.G(d.ctx).WithField("method", "(*Dispatcher).Stop")
+	log.Info("dispatcher stopping")
+
 	d.cancel()
 	d.mu.Unlock()
 	d.nodes.Clean()

 	d.processUpdatesLock.Lock()
@@ -361,13 +367,15 @@ func (d *Dispatcher) markNodesUnknown(ctx context.Context) error {
 		if node.Status.State == api.NodeStatus_DOWN {
 			nodeCopy := node
 			expireFunc := func() {
+				log.Infof("moving tasks to orphaned state for node: %s", nodeCopy.ID)
 				if err := d.moveTasksToOrphaned(nodeCopy.ID); err != nil {
-					log.WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
+					log.WithError(err).Errorf(`failed to move all tasks for node %s to "ORPHANED" state`, node.ID)
 				}

 				d.downNodes.Delete(nodeCopy.ID)
 			}

+			log.Infof(`node %s was found to be down when marking unknown on dispatcher start`, node.ID)
 			d.downNodes.Add(nodeCopy, expireFunc)
 			return nil
 		}
@@ -379,16 +387,16 @@ func (d *Dispatcher) markNodesUnknown(ctx context.Context) error {
 		expireFunc := func() {
 			log := log.WithField("node", nodeID)
-			log.Debug("heartbeat expiration for unknown node")
+			log.Info(`heartbeat expiration for node %s in state "unknown"`, nodeID)
 			if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, `heartbeat failure for node in "unknown" state`); err != nil {
 				log.WithError(err).Error(`failed deregistering node after heartbeat expiration for node in "unknown" state`)
 			}
 		}

 		if err := d.nodes.AddUnknown(node, expireFunc); err != nil {
-			return errors.Wrap(err, `adding node in "unknown" state to node store failed`)
+			return errors.Wrapf(err, `adding node %s in "unknown" state to node store failed`, nodeID)
 		}
 		if err := store.UpdateNode(tx, node); err != nil {
-			return errors.Wrap(err, "update failed")
+			return errors.Wrapf(err, "update for node %s failed", nodeID)
 		}
 		return nil
 	})
@@ -470,6 +478,7 @@ func nodeIPFromContext(ctx context.Context) (string, error) {

 // register is used for registration of node with particular dispatcher.
 func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {
+	logLocal := log.G(ctx).WithField("method", "(*Dispatcher).register")
 	// prevent register until we're ready to accept it
 	dctx, err := d.isRunningLocked()
 	if err != nil {
@@ -491,7 +500,7 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a

 	addr, err := nodeIPFromContext(ctx)
 	if err != nil {
-		log.G(ctx).WithError(err).Debug("failed to get remote node IP")
+		logLocal.WithError(err).Debug("failed to get remote node IP")
 	}

 	if err := d.markNodeReady(dctx, nodeID, description, addr); err != nil {
@@ -499,13 +508,14 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a
 	}

 	expireFunc := func() {
-		log.G(ctx).Debug("heartbeat expiration")
+		log.G(ctx).Debug("heartbeat expiration for worker %s, setting worker status to NodeStatus_DOWN ", nodeID)
 		if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, "heartbeat failure"); err != nil {
 			log.G(ctx).WithError(err).Errorf("failed deregistering node after heartbeat expiration")
 		}
 	}

 	rn := d.nodes.Add(node, expireFunc)
+	logLocal.Infof("worker %s was successfully registered", nodeID)

 	// NOTE(stevvooe): We need be a little careful with re-registration. The
 	// current implementation just matches the node id and then gives away the
@@ -1029,6 +1039,8 @@ func (d *Dispatcher) moveTasksToOrphaned(nodeID string) error {

 // markNodeNotReady sets the node state to some state other than READY
 func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, message string) error {
+	logLocal := log.G(d.ctx).WithField("method", "(*Dispatcher).markNodeNotReady")
+
 	dctx, err := d.isRunningLocked()
 	if err != nil {
 		return err
@@ -1048,6 +1060,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
 	}

 	expireFunc := func() {
+		log.G(dctx).Debugf(`worker timed-out %s in "down" state, moving all tasks to "ORPHANED" state`, id)
 		if err := d.moveTasksToOrphaned(id); err != nil {
 			log.G(dctx).WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
 		}
@@ -1056,6 +1069,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
 	}

 	d.downNodes.Add(node, expireFunc)
+	logLocal.Debugf("added node %s to down nodes list", node.ID)

 	status := &api.NodeStatus{
 		State: state,
@@ -1080,6 +1094,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
 	if rn := d.nodes.Delete(id); rn == nil {
 		return errors.Errorf("node %s is not found in local storage", id)
 	}
+	logLocal.Debugf("deleted node %s from node store", node.ID)

 	return nil
 }
@@ -1094,6 +1109,8 @@ func (d *Dispatcher) Heartbeat(ctx context.Context, r *api.HeartbeatRequest) (*a
 	}

 	period, err := d.nodes.Heartbeat(nodeInfo.NodeID, r.SessionID)
+
+	log.G(ctx).WithField("method", "(*Dispatcher).Heartbeat").Debugf("received heartbeat from worker %v, expect next heartbeat in %v", nodeInfo, period)
 	return &api.HeartbeatResponse{Period: period}, err
 }
@@ -1206,6 +1223,7 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
 		}
 	}

+	log.Infof("dispatcher session dropped, marking node %s down", nodeID)
 	if err := d.markNodeNotReady(nodeID, api.NodeStatus_DISCONNECTED, "node is currently trying to find new manager"); err != nil {
 		log.WithError(err).Error("failed to remove node")
 	}
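
Several of the dispatcher hunks above add logging inside `expireFunc` callbacks, which fire when a node fails to renew its heartbeat in time. The sketch below is not swarmkit's node store; it only illustrates the general expire-callback pattern those log lines are attached to, using `time.AfterFunc` and invented names:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// heartbeatStore tracks one expiration timer per node and runs a callback
// when a node misses its heartbeat window. (Illustrative only.)
type heartbeatStore struct {
	mu     sync.Mutex
	period time.Duration
	timers map[string]*time.Timer
}

func newHeartbeatStore(period time.Duration) *heartbeatStore {
	return &heartbeatStore{period: period, timers: make(map[string]*time.Timer)}
}

// Add registers a node with an expire callback, loosely mirroring the
// nodes.Add(node, expireFunc) calls in the diff.
func (s *heartbeatStore) Add(nodeID string, expireFunc func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.timers[nodeID] = time.AfterFunc(s.period, expireFunc)
}

// Heartbeat resets the node's timer, granting another full period.
func (s *heartbeatStore) Heartbeat(nodeID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if t, ok := s.timers[nodeID]; ok {
		t.Reset(s.period)
	}
}

func main() {
	store := newHeartbeatStore(100 * time.Millisecond)
	store.Add("node-1", func() {
		// In the dispatcher this is where tasks move to ORPHANED and the
		// expiration gets logged; here we just print.
		fmt.Println("heartbeat expiration for node-1, marking it down")
	})

	store.Heartbeat("node-1")          // keeps the node alive once
	time.Sleep(300 * time.Millisecond) // let the timer expire
}
```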


@@ -33,7 +33,7 @@ type logMessage struct {
 // LogBroker coordinates log subscriptions to services and tasks. Clients can
 // publish and subscribe to logs channels.
 //
-// Log subscriptions are pushed to the work nodes by creating log subscsription
+// Log subscriptions are pushed to the work nodes by creating log subscription
 // tasks. As such, the LogBroker also acts as an orchestrator of these tasks.
 type LogBroker struct {
 	mu sync.RWMutex


@@ -239,6 +239,7 @@ func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error {

 	// Try doing a regular rpc if the receiver doesn't support streaming.
 	if grpc.Code(err) == codes.Unimplemented {
+		log.G(ctx).Info("sending message to raft peer using ProcessRaftMessage()")
 		_, err = api.NewRaftClient(p.conn()).ProcessRaftMessage(ctx, &api.ProcessRaftMessageRequest{Message: &m})
 	}

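
The raft transport hunk logs the fallback from the streaming RPC to the unary `ProcessRaftMessage()` when the receiver reports `codes.Unimplemented`. Here is a minimal, self-contained sketch of that error-code check using the current grpc `status` package (the diff itself uses the older `grpc.Code(err)` helper); the function names are invented for the example:

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// sendWithFallback tries a preferred call first and falls back to a legacy
// call only when the peer reports that the preferred RPC is not implemented.
func sendWithFallback(preferred, legacy func() error) error {
	err := preferred()
	if status.Code(err) == codes.Unimplemented {
		fmt.Println("peer does not support streaming, falling back to unary call")
		return legacy()
	}
	return err
}

func main() {
	// Simulate a peer that has not implemented the streaming RPC.
	preferred := func() error { return status.Error(codes.Unimplemented, "unknown method") }
	legacy := func() error { return nil }

	if err := sendWithFallback(preferred, legacy); err != nil {
		fmt.Println("send failed:", err)
	} else {
		fmt.Println("message delivered")
	}
}
```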