From be713ec486090db312f9a83481a2c4663d473f71 Mon Sep 17 00:00:00 2001
From: Brian Goff
Date: Wed, 21 Feb 2018 16:08:32 -0500
Subject: [PATCH] Bump swarmkit

Signed-off-by: Brian Goff
---
 vendor.conf                                        |   2 +-
 .../swarmkit/manager/dispatcher/dispatcher.go      | 111 +++++++-----
 2 files changed, 46 insertions(+), 67 deletions(-)

diff --git a/vendor.conf b/vendor.conf
index 42fbd52bd7..f00b59b3e1 100644
--- a/vendor.conf
+++ b/vendor.conf
@@ -120,7 +120,7 @@ github.com/dmcgowan/go-tar go1.10
 github.com/stevvooe/ttrpc d4528379866b0ce7e9d71f3eb96f0582fc374577
 
 # cluster
-github.com/docker/swarmkit f74983e7c015a38a81c8642803a78b8322cf7eac
+github.com/docker/swarmkit 49a9d7f6ba3c1925262641e694c18eb43575f74b
 github.com/gogo/protobuf v0.4
 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
 github.com/google/certificate-transparency d90e65c3a07988180c5b1ece71791c0b6506826e

diff --git a/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go b/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go
index bf48818ea5..7b9e175438 100644
--- a/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go
+++ b/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go
@@ -125,12 +125,17 @@ type clusterUpdate struct {
 
 // Dispatcher is responsible for dispatching tasks and tracking agent health.
 type Dispatcher struct {
-	// mu is a lock to provide mutually exclusive access to dispatcher fields
-	// e.g. lastSeenManagers, networkBootstrapKeys, lastSeenRootCert etc.
+	// Mutex to synchronize access to dispatcher shared state e.g. nodes,
+	// lastSeenManagers, networkBootstrapKeys etc.
+	// TODO(anshul): This can potentially be removed and rpcRW used in its place.
 	mu sync.Mutex
-	// shutdownWait is used by stop() to wait for existing operations to finish.
-	shutdownWait sync.WaitGroup
-
+	// WaitGroup to handle the case when Stop() gets called before Run()
+	// has finished initializing the dispatcher.
+	wg sync.WaitGroup
+	// This RWMutex synchronizes RPC handlers and the dispatcher stop().
+	// The RPC handlers use the read lock while stop() uses the write lock
+	// and acts as a barrier to shutdown.
+	rpcRW sync.RWMutex
 	nodes *nodeStore
 	store *store.MemoryStore
 	lastSeenManagers []*api.WeightedPeer
@@ -253,11 +258,8 @@ func (d *Dispatcher) Run(ctx context.Context) error {
 	defer cancel()
 	d.ctx, d.cancel = context.WithCancel(ctx)
 	ctx = d.ctx
-
-	// If Stop() is called, it should wait
-	// for Run() to complete.
-	d.shutdownWait.Add(1)
-	defer d.shutdownWait.Done()
+	d.wg.Add(1)
+	defer d.wg.Done()
 	d.mu.Unlock()
 
 	publishManagers := func(peers []*api.Peer) {
@@ -320,15 +322,18 @@ func (d *Dispatcher) Stop() error {
 		return errors.New("dispatcher is already stopped")
 	}
 
-	// Cancel dispatcher context.
-	// This should also close the the streams in Tasks(), Assignments().
+	log := log.G(d.ctx).WithField("method", "(*Dispatcher).Stop")
+	log.Info("dispatcher stopping")
 	d.cancel()
 	d.mu.Unlock()
 
-	// Wait for the RPCs that are in-progress to finish.
-	d.shutdownWait.Wait()
-
+	// The active nodes list can be cleaned out only when all
+	// existing RPCs have finished.
+	// RPCs that start after rpcRW.Unlock() should find the context
+	// cancelled and should fail organically.
+	d.rpcRW.Lock()
 	d.nodes.Clean()
+	d.rpcRW.Unlock()
 
 	d.processUpdatesLock.Lock()
 	// In case there are any waiters. There is no chance of any starting
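The hunks above swap the per-RPC shutdownWait.Add()/Done() bookkeeping for a reader-writer barrier: each RPC handler holds rpcRW.RLock() for its whole lifetime, while Stop() cancels the dispatcher context first and then takes rpcRW.Lock(), so it blocks until every in-flight handler has drained before cleaning shared state. A minimal, self-contained sketch of the pattern follows; the server, handle, and stop names are hypothetical stand-ins, not the swarmkit API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// server models the shape of the change: handlers hold the read lock for
// their whole lifetime; shutdown takes the write lock as a barrier, so it
// blocks until every in-flight handler has returned.
type server struct {
	rpcRW  sync.RWMutex
	ctx    context.Context
	cancel context.CancelFunc
}

func newServer() *server {
	ctx, cancel := context.WithCancel(context.Background())
	return &server{ctx: ctx, cancel: cancel}
}

// handle stands in for an RPC handler such as Heartbeat or Session.
func (s *server) handle() error {
	s.rpcRW.RLock()
	defer s.rpcRW.RUnlock()

	// Handlers that start after stop() has cancelled the context fail
	// organically, mirroring the comment in the Stop() hunk above.
	if s.ctx.Err() != nil {
		return errors.New("server is stopped")
	}
	time.Sleep(10 * time.Millisecond) // simulate work
	return nil
}

// stop cancels the context first, then uses the write lock as a barrier.
func (s *server) stop() {
	s.cancel()
	s.rpcRW.Lock()
	// Safe point: no handler is running. The diff does its cleanup here,
	// e.g. d.nodes.Clean().
	s.rpcRW.Unlock()
}

func main() {
	s := newServer()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = s.handle()
		}()
	}
	s.stop()
	wg.Wait()
	fmt.Println("all in-flight handlers drained before cleanup")
}
```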
@@ -338,6 +343,14 @@ func (d *Dispatcher) Stop() error {
 	d.processUpdatesLock.Unlock()
 
 	d.clusterUpdateQueue.Close()
+
+	// TODO(anshul): This use of Wait() could be unsafe.
+	// According to go's documentation on WaitGroup,
+	// Add() with a positive delta that occur when the counter is zero
+	// must happen before a Wait().
+	// As is, dispatcher Stop() can race with Run().
+	d.wg.Wait()
+
 	return nil
 }
 
@@ -485,13 +498,13 @@ func nodeIPFromContext(ctx context.Context) (string, error) {
 
 // register is used for registration of node with particular dispatcher.
 func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {
+	logLocal := log.G(ctx).WithField("method", "(*Dispatcher).register")
+
 	// prevent register until we're ready to accept it
 	dctx, err := d.isRunningLocked()
 	if err != nil {
 		return "", err
 	}
 
-	logLocal := log.G(ctx).WithField("method", "(*Dispatcher).register")
-
 	if err := d.nodes.CheckRateLimit(nodeID); err != nil {
 		return "", err
 	}
@@ -539,15 +552,8 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a
 
 // UpdateTaskStatus updates status of task. Node should send such updates
 // on every status change of its tasks.
 func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStatusRequest) (*api.UpdateTaskStatusResponse, error) {
-	// shutdownWait.Add() followed by isRunning() to ensures that
-	// if this rpc sees the dispatcher running,
-	// it will already have called Add() on the shutdownWait wait,
-	// which ensures that Stop() will wait for this rpc to complete.
-	// Note that Stop() first does Dispatcher.ctx.cancel() followed by
-	// shutdownWait.Wait() to make sure new rpc's don't start before waiting
-	// for existing ones to finish.
-	d.shutdownWait.Add(1)
-	defer d.shutdownWait.Done()
+	d.rpcRW.RLock()
+	defer d.rpcRW.RUnlock()
 
 	dctx, err := d.isRunningLocked()
 	if err != nil {
@@ -740,15 +746,8 @@ func (d *Dispatcher) processUpdates(ctx context.Context) {
 // of tasks which should be run on node, if task is not present in that list,
 // it should be terminated.
 func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServer) error {
-	// shutdownWait.Add() followed by isRunning() to ensures that
-	// if this rpc sees the dispatcher running,
-	// it will already have called Add() on the shutdownWait wait,
-	// which ensures that Stop() will wait for this rpc to complete.
-	// Note that Stop() first does Dispatcher.ctx.cancel() followed by
-	// shutdownWait.Wait() to make sure new rpc's don't start before waiting
-	// for existing ones to finish.
-	d.shutdownWait.Add(1)
-	defer d.shutdownWait.Done()
+	d.rpcRW.RLock()
+	defer d.rpcRW.RUnlock()
 
 	dctx, err := d.isRunningLocked()
 	if err != nil {
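The TODO in the Stop() hunk above cites the sync.WaitGroup contract: an Add() with a positive delta that starts from a zero counter must happen-before the Wait() it is meant to block, so Stop()'s wg.Wait() can race a Run() that has not yet reached wg.Add(1). One way to honor the contract is to order the Add() and the running check under a single mutex; the sketch below is a hypothetical model of that ordering, not the swarmkit code:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// dispatcher is a condensed model of the Run()/Stop() pairing above.
type dispatcher struct {
	mu      sync.Mutex
	wg      sync.WaitGroup
	running bool
}

// run models Run(): the Add() happens under the same mutex that stop()
// uses to observe running == true, so the Add() is ordered before any
// Wait() that can actually block on it.
func (d *dispatcher) run() {
	d.mu.Lock()
	d.running = true
	d.wg.Add(1)
	d.mu.Unlock()
	defer d.wg.Done()

	// ... event loop would run here ...
}

// stop models Stop(): it only calls Wait() after observing, under the
// mutex, that run() already performed its Add().
func (d *dispatcher) stop() error {
	d.mu.Lock()
	if !d.running {
		d.mu.Unlock()
		return errors.New("dispatcher is already stopped")
	}
	d.running = false
	d.mu.Unlock()

	d.wg.Wait()
	return nil
}

func main() {
	d := &dispatcher{}
	d.run()
	fmt.Println(d.stop()) // <nil>
}
```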
@@ -873,15 +872,8 @@ func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServe
 // Assignments is a stream of assignments for a node. Each message contains
 // either full list of tasks and secrets for the node, or an incremental update.
 func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatcher_AssignmentsServer) error {
-	// shutdownWait.Add() followed by isRunning() to ensures that
-	// if this rpc sees the dispatcher running,
-	// it will already have called Add() on the shutdownWait wait,
-	// which ensures that Stop() will wait for this rpc to complete.
-	// Note that Stop() first does Dispatcher.ctx.cancel() followed by
-	// shutdownWait.Wait() to make sure new rpc's don't start before waiting
-	// for existing ones to finish.
-	d.shutdownWait.Add(1)
-	defer d.shutdownWait.Done()
+	d.rpcRW.RLock()
+	defer d.rpcRW.RUnlock()
 
 	dctx, err := d.isRunningLocked()
 	if err != nil {
@@ -1140,20 +1132,13 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
 // Node should send new heartbeat earlier than now + TTL, otherwise it will
 // be deregistered from dispatcher and its status will be updated to NodeStatus_DOWN
 func (d *Dispatcher) Heartbeat(ctx context.Context, r *api.HeartbeatRequest) (*api.HeartbeatResponse, error) {
-	// shutdownWait.Add() followed by isRunning() to ensures that
-	// if this rpc sees the dispatcher running,
-	// it will already have called Add() on the shutdownWait wait,
-	// which ensures that Stop() will wait for this rpc to complete.
-	// Note that Stop() first does Dispatcher.ctx.cancel() followed by
-	// shutdownWait.Wait() to make sure new rpc's don't start before waiting
-	// for existing ones to finish.
-	d.shutdownWait.Add(1)
-	defer d.shutdownWait.Done()
+	d.rpcRW.RLock()
+	defer d.rpcRW.RUnlock()
 
-	// isRunningLocked() is not needed since its OK if
-	// the dispatcher context is cancelled while this call is in progress
-	// since Stop() which cancels the dispatcher context will wait for
-	// Heartbeat() to complete.
+	// Its OK to call isRunning() here instead of isRunningLocked()
+	// because of the rpcRW readlock above.
+	// TODO(anshul) other uses of isRunningLocked() can probably
+	// also be removed.
 	if !d.isRunning() {
 		return nil, status.Errorf(codes.Aborted, "dispatcher is stopped")
 	}
@@ -1192,15 +1177,8 @@ func (d *Dispatcher) getRootCACert() []byte {
 // a special boolean field Disconnect which if true indicates that node should
 // reconnect to another Manager immediately.
 func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error {
-	// shutdownWait.Add() followed by isRunning() to ensures that
-	// if this rpc sees the dispatcher running,
-	// it will already have called Add() on the shutdownWait wait,
-	// which ensures that Stop() will wait for this rpc to complete.
-	// Note that Stop() first does Dispatcher.ctx.cancel() followed by
-	// shutdownWait.Wait() to make sure new rpc's don't start before waiting
-	// for existing ones to finish.
-	d.shutdownWait.Add(1)
-	defer d.shutdownWait.Done()
+	d.rpcRW.RLock()
+	defer d.rpcRW.RUnlock()
 
 	dctx, err := d.isRunningLocked()
 	if err != nil {
@@ -1208,6 +1186,7 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
 	}
 	ctx := stream.Context()
+
 	nodeInfo, err := ca.RemoteNode(ctx)
 	if err != nil {
 		return err
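The Heartbeat hunk above drops isRunningLocked() in favor of the plain isRunning() check, on the argument that the rpcRW read lock already orders the handler against stop()'s teardown. The sketch below is a rough model of the two variants under that assumption; the names and semantics are simplified stand-ins, not the actual swarmkit implementation:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// dispatcher condenses the fields the two checks care about.
type dispatcher struct {
	mu     sync.Mutex   // guards ctx for callers not holding rpcRW
	rpcRW  sync.RWMutex // handlers read-lock; stop() write-locks
	ctx    context.Context
	cancel context.CancelFunc
}

// isRunning is the unlocked check: cheap, but only meaningful when the
// caller already holds a lock that orders it against shutdown.
func (d *dispatcher) isRunning() bool {
	if d.ctx == nil {
		return false
	}
	select {
	case <-d.ctx.Done():
		return false
	default:
		return true
	}
}

// isRunningLocked is the mutex-guarded variant used by handlers that do
// not rely on rpcRW for their lifetime.
func (d *dispatcher) isRunningLocked() (context.Context, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if !d.isRunning() {
		return nil, errors.New("dispatcher is stopped")
	}
	return d.ctx, nil
}

// heartbeat mirrors the Heartbeat hunk: under RLock, the cheaper
// unlocked check is used.
func (d *dispatcher) heartbeat() error {
	d.rpcRW.RLock()
	defer d.rpcRW.RUnlock()

	if !d.isRunning() {
		return errors.New("dispatcher is stopped")
	}
	return nil // ... record the heartbeat here ...
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	d := &dispatcher{ctx: ctx, cancel: cancel}

	fmt.Println(d.heartbeat()) // <nil> while running
	if _, err := d.isRunningLocked(); err == nil {
		fmt.Println("still running under the locked check")
	}
	d.cancel()
	fmt.Println(d.heartbeat()) // dispatcher is stopped
}
```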