From aca0bdab13a774356619254e63db3defad029996 Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Wed, 7 Dec 2016 17:59:31 -0800 Subject: [PATCH] Update vendored swarmkit to 999addf Signed-off-by: Aaron Lehmann --- vendor.conf | 2 +- .../swarmkit/manager/controlapi/service.go | 17 ---- .../swarmkit/manager/logbroker/broker.go | 5 +- .../manager/logbroker/subscription.go | 12 +++ .../docker/swarmkit/manager/manager.go | 22 ++++- .../swarmkit/manager/state/raft/raft.go | 96 ++++++++++++------- .../swarmkit/manager/state/raft/storage.go | 14 +-- .../manager/state/raft/storage/storage.go | 23 +---- .../github.com/docker/swarmkit/node/node.go | 3 +- 9 files changed, 106 insertions(+), 88 deletions(-) diff --git a/vendor.conf b/vendor.conf index 5d272a83e1..e523b0357e 100644 --- a/vendor.conf +++ b/vendor.conf @@ -100,7 +100,7 @@ github.com/docker/containerd 03e5862ec0d8d3b3f750e19fca3ee367e13c090e github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4 # cluster -github.com/docker/swarmkit 522d951f733c821cdc33cccca6127c15a2b6de38 +github.com/docker/swarmkit 999addf86dad33479756c83620ed727ef50bce57 github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9 github.com/gogo/protobuf v0.3 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a diff --git a/vendor/github.com/docker/swarmkit/manager/controlapi/service.go b/vendor/github.com/docker/swarmkit/manager/controlapi/service.go index f241836059..87e5d65af2 100644 --- a/vendor/github.com/docker/swarmkit/manager/controlapi/service.go +++ b/vendor/github.com/docker/swarmkit/manager/controlapi/service.go @@ -4,7 +4,6 @@ import ( "errors" "path/filepath" "reflect" - "regexp" "strconv" "strings" @@ -26,9 +25,6 @@ var ( errModeChangeNotAllowed = errors.New("service mode change is not allowed") ) -// Regexp pattern for hostname to conform RFC 1123 -var hostnamePattern = regexp.MustCompile("^(([[:alnum:]]|[[:alnum:]][[:alnum:]\\-]*[[:alnum:]])\\.)*([[:alnum:]]|[[:alnum:]][[:alnum:]\\-]*[[:alnum:]])$") - func validateResources(r *api.Resources) error { if r == nil { return nil @@ -115,10 +111,6 @@ func validateContainerSpec(container *api.ContainerSpec) error { return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: missing in service spec") } - if err := validateHostname(container.Hostname); err != nil { - return err - } - if container.Image == "" { return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: image reference must be provided") } @@ -138,15 +130,6 @@ func validateContainerSpec(container *api.ContainerSpec) error { return nil } -func validateHostname(hostname string) error { - if hostname != "" { - if len(hostname) > 63 || !hostnamePattern.MatchString(hostname) { - return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: %s is not valid hostname", hostname) - } - } - return nil -} - func validateTask(taskSpec api.TaskSpec) error { if err := validateResourceRequirements(taskSpec.Resources); err != nil { return err diff --git a/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go b/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go index 3492b946fe..bec6fe434e 100644 --- a/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go +++ b/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go @@ -152,7 +152,8 @@ func (lb *LogBroker) unregisterSubscription(subscription *subscription) { defer lb.mu.Unlock() delete(lb.registeredSubscriptions, subscription.message.ID) - subscription.message.Close = true + + subscription.Close() lb.subscriptionQueue.Publish(subscription) } @@ -321,7 
+322,7 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest case v := <-subscriptionCh: subscription := v.(*subscription) - if subscription.message.Close { + if subscription.Closed() { log.WithField("subscription.id", subscription.message.ID).Debug("subscription closed") delete(activeSubscriptions, subscription.message.ID) } else { diff --git a/vendor/github.com/docker/swarmkit/manager/logbroker/subscription.go b/vendor/github.com/docker/swarmkit/manager/logbroker/subscription.go index 6fbaf101b8..6b3295ae62 100644 --- a/vendor/github.com/docker/swarmkit/manager/logbroker/subscription.go +++ b/vendor/github.com/docker/swarmkit/manager/logbroker/subscription.go @@ -137,6 +137,18 @@ func (s *subscription) Err() error { return fmt.Errorf("warning: incomplete log stream. some logs could not be retrieved for the following reasons: %s", strings.Join(messages, ", ")) } +func (s *subscription) Close() { + s.mu.Lock() + s.message.Close = true + s.mu.Unlock() +} + +func (s *subscription) Closed() bool { + s.mu.RLock() + defer s.mu.RUnlock() + return s.message.Close +} + func (s *subscription) match() { s.mu.Lock() defer s.mu.Unlock() diff --git a/vendor/github.com/docker/swarmkit/manager/manager.go b/vendor/github.com/docker/swarmkit/manager/manager.go index f5d5e87f47..5f5735d18c 100644 --- a/vendor/github.com/docker/swarmkit/manager/manager.go +++ b/vendor/github.com/docker/swarmkit/manager/manager.go @@ -385,26 +385,39 @@ func (m *Manager) Run(parent context.Context) error { close(m.started) + errCh := make(chan error, 1) go func() { err := m.raftNode.Run(ctx) if err != nil { + errCh <- err log.G(ctx).WithError(err).Error("raft node stopped") m.Stop(ctx) } }() - if err := raft.WaitForLeader(ctx, m.raftNode); err != nil { + returnErr := func(err error) error { + select { + case runErr := <-errCh: + if runErr == raft.ErrMemberRemoved { + return runErr + } + default: + } return err } + if err := raft.WaitForLeader(ctx, m.raftNode); err != nil { + return returnErr(err) + } + c, err := raft.WaitForCluster(ctx, m.raftNode) if err != nil { - return err + return returnErr(err) } raftConfig := c.Spec.Raft if err := m.watchForKEKChanges(ctx); err != nil { - return err + return returnErr(err) } if int(raftConfig.ElectionTick) != m.raftNode.Config.ElectionTick { @@ -423,7 +436,8 @@ func (m *Manager) Run(parent context.Context) error { } m.mu.Unlock() m.Stop(ctx) - return err + + return returnErr(err) } const stopTimeout = 8 * time.Second diff --git a/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go b/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go index 9ae9293ff8..721437b18e 100644 --- a/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go +++ b/vendor/github.com/docker/swarmkit/manager/state/raft/raft.go @@ -109,10 +109,10 @@ type Node struct { // shutting down the node. 
waitProp sync.WaitGroup - confState raftpb.ConfState - appliedIndex uint64 - snapshotIndex uint64 - writtenIndex uint64 + confState raftpb.ConfState + appliedIndex uint64 + snapshotMeta raftpb.SnapshotMetadata + writtenWALIndex uint64 ticker clock.Ticker doneCh chan struct{} @@ -127,7 +127,7 @@ type Node struct { // used for membership management checks membershipLock sync.Mutex - snapshotInProgress chan uint64 + snapshotInProgress chan raftpb.SnapshotMetadata asyncTasks sync.WaitGroup // stopped chan is used for notifying grpc handlers that raft node going @@ -270,8 +270,8 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) { n.confState = snapshot.Metadata.ConfState n.appliedIndex = snapshot.Metadata.Index - n.snapshotIndex = snapshot.Metadata.Index - n.writtenIndex, _ = n.raftStore.LastIndex() // lastIndex always returns nil as an error + n.snapshotMeta = snapshot.Metadata + n.writtenWALIndex, _ = n.raftStore.LastIndex() // lastIndex always returns nil as an error if loadAndStartErr == storage.ErrNoWAL { if n.opts.JoinAddr != "" { @@ -428,7 +428,7 @@ func (n *Node) Run(ctx context.Context) error { log.G(ctx).WithError(err).Error("failed to restore from snapshot") } n.appliedIndex = rd.Snapshot.Metadata.Index - n.snapshotIndex = rd.Snapshot.Metadata.Index + n.snapshotMeta = rd.Snapshot.Metadata n.confState = rd.Snapshot.Metadata.ConfState } @@ -479,8 +479,8 @@ func (n *Node) Run(ctx context.Context) error { // Trigger a snapshot every once in awhile if n.snapshotInProgress == nil && - (n.needsSnapshot() || raftConfig.SnapshotInterval > 0 && - n.appliedIndex-n.snapshotIndex >= raftConfig.SnapshotInterval) { + (n.needsSnapshot(ctx) || raftConfig.SnapshotInterval > 0 && + n.appliedIndex-n.snapshotMeta.Index >= raftConfig.SnapshotInterval) { n.doSnapshot(ctx, raftConfig) } @@ -511,17 +511,21 @@ func (n *Node) Run(ctx context.Context) error { } } - case snapshotIndex := <-n.snapshotInProgress: - if snapshotIndex > n.snapshotIndex { - n.snapshotIndex = snapshotIndex + case snapshotMeta := <-n.snapshotInProgress: + raftConfig := n.getCurrentRaftConfig() + if snapshotMeta.Index > n.snapshotMeta.Index { + n.snapshotMeta = snapshotMeta + if err := n.raftLogger.GC(snapshotMeta.Index, snapshotMeta.Term, raftConfig.KeepOldSnapshots); err != nil { + log.G(ctx).WithError(err).Error("failed to clean up old snapshots and WALs") + } } n.snapshotInProgress = nil n.maybeMarkRotationFinished(ctx) - if n.rotationQueued && n.needsSnapshot() { + if n.rotationQueued && n.needsSnapshot(ctx) { // there was a key rotation that took place before while the snapshot // was in progress - we have to take another snapshot and encrypt with the new key n.rotationQueued = false - n.doSnapshot(ctx, n.getCurrentRaftConfig()) + n.doSnapshot(ctx, raftConfig) } case <-n.keyRotator.RotationNotify(): // There are 2 separate checks: rotationQueued, and n.needsSnapshot(). 
@@ -533,7 +537,7 @@ func (n *Node) Run(ctx context.Context) error { switch { case n.snapshotInProgress != nil: n.rotationQueued = true - case n.needsSnapshot(): + case n.needsSnapshot(ctx): n.doSnapshot(ctx, n.getCurrentRaftConfig()) } case <-n.removeRaftCh: @@ -548,7 +552,7 @@ func (n *Node) Run(ctx context.Context) error { } } -func (n *Node) needsSnapshot() bool { +func (n *Node) needsSnapshot(ctx context.Context) bool { if n.waitForAppliedIndex == 0 && n.keyRotator.NeedsRotation() { keys := n.keyRotator.GetKeys() if keys.PendingDEK != nil { @@ -556,23 +560,40 @@ func (n *Node) needsSnapshot() bool { // we want to wait for the last index written with the old DEK to be commited, else a snapshot taken // may have an index less than the index of a WAL written with an old DEK. We want the next snapshot // written with the new key to supercede any WAL written with an old DEK. - n.waitForAppliedIndex = n.writtenIndex - // if there is already a snapshot at this index, bump the index up one, because we want the next snapshot - if n.waitForAppliedIndex == n.snapshotIndex { - n.waitForAppliedIndex++ + n.waitForAppliedIndex = n.writtenWALIndex + // if there is already a snapshot at this index or higher, bump the wait index up to 1 higher than the current + // snapshot index, because the rotation cannot be completed until the next snapshot + if n.waitForAppliedIndex <= n.snapshotMeta.Index { + n.waitForAppliedIndex = n.snapshotMeta.Index + 1 } + log.G(ctx).Debugf( + "beginning raft DEK rotation - last indices written with the old key are (snapshot: %d, WAL: %d) - waiting for snapshot of index %d to be written before rotation can be completed", n.snapshotMeta.Index, n.writtenWALIndex, n.waitForAppliedIndex) } } - return n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.appliedIndex + + result := n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.appliedIndex + if result { + log.G(ctx).Debugf( + "a snapshot at index %d is needed in order to complete raft DEK rotation - a snapshot with index >= %d can now be triggered", + n.waitForAppliedIndex, n.appliedIndex) + } + return result } func (n *Node) maybeMarkRotationFinished(ctx context.Context) { - if n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.snapshotIndex { + if n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.snapshotMeta.Index { // this means we tried to rotate - so finish the rotation if err := n.keyRotator.UpdateKeys(EncryptionKeys{CurrentDEK: n.raftLogger.EncryptionKey}); err != nil { log.G(ctx).WithError(err).Error("failed to update encryption keys after a successful rotation") } else { + log.G(ctx).Debugf( + "a snapshot with index %d is available, which completes the DEK rotation requiring a snapshot of at least index %d - throwing away DEK and older snapshots encrypted with the old key", + n.snapshotMeta.Index, n.waitForAppliedIndex) n.waitForAppliedIndex = 0 + + if err := n.raftLogger.GC(n.snapshotMeta.Index, n.snapshotMeta.Term, 0); err != nil { + log.G(ctx).WithError(err).Error("failed to remove old snapshots and WALs that were written with the previous raft DEK") + } } } } @@ -954,13 +975,13 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa defer cancel() if err := member.HealthCheck(healthCtx); err != nil { - n.processRaftMessageLogger(ctx, msg).Debug("member which sent vote request failed health check") + n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("member which sent vote request failed health check") return &api.ProcessRaftMessageResponse{}, nil } } if 
msg.Message.Type == raftpb.MsgProp { - // We don't accepted forwarded proposals. Our + // We don't accept forwarded proposals. Our // current architecture depends on only the leader // making proposals, so in-flight proposals can be // guaranteed not to conflict. @@ -1249,8 +1270,8 @@ func (n *Node) saveToStorage( if len(entries) > 0 { lastIndex := entries[len(entries)-1].Index - if lastIndex > n.writtenIndex { - n.writtenIndex = lastIndex + if lastIndex > n.writtenWALIndex { + n.writtenWALIndex = lastIndex } } @@ -1300,25 +1321,32 @@ func (n *Node) sendToMember(ctx context.Context, members map[uint64]*membership. defer n.asyncTasks.Done() defer close(thisSend) - ctx, cancel := context.WithTimeout(ctx, n.opts.SendTimeout) - defer cancel() - if lastSend != nil { + waitCtx, waitCancel := context.WithTimeout(ctx, n.opts.SendTimeout) + defer waitCancel() + select { case <-lastSend: - case <-ctx.Done(): + case <-waitCtx.Done(): return } + + select { + case <-waitCtx.Done(): + return + default: + } } + ctx, cancel := context.WithTimeout(ctx, n.opts.SendTimeout) + defer cancel() + if n.cluster.IsIDRemoved(m.To) { // Should not send to removed members return } - var ( - conn *membership.Member - ) + var conn *membership.Member if toMember, ok := members[m.To]; ok { conn = toMember } else { diff --git a/vendor/github.com/docker/swarmkit/manager/state/raft/storage.go b/vendor/github.com/docker/swarmkit/manager/state/raft/storage.go index a763c03dce..b1c74e0357 100644 --- a/vendor/github.com/docker/swarmkit/manager/state/raft/storage.go +++ b/vendor/github.com/docker/swarmkit/manager/state/raft/storage.go @@ -167,11 +167,11 @@ func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) { viewStarted := make(chan struct{}) n.asyncTasks.Add(1) - n.snapshotInProgress = make(chan uint64, 1) // buffered in case Shutdown is called during the snapshot - go func(appliedIndex, snapshotIndex uint64) { + n.snapshotInProgress = make(chan raftpb.SnapshotMetadata, 1) // buffered in case Shutdown is called during the snapshot + go func(appliedIndex uint64, snapshotMeta raftpb.SnapshotMetadata) { defer func() { n.asyncTasks.Done() - n.snapshotInProgress <- snapshotIndex + n.snapshotInProgress <- snapshotMeta }() var err error n.memoryStore.View(func(tx store.ReadTx) { @@ -197,7 +197,7 @@ func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) { log.G(ctx).WithError(err).Error("failed to save snapshot") return } - snapshotIndex = appliedIndex + snapshotMeta = snap.Metadata if appliedIndex > raftConfig.LogEntriesForSlowFollowers { err := n.raftStore.Compact(appliedIndex - raftConfig.LogEntriesForSlowFollowers) @@ -205,14 +205,10 @@ func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) { log.G(ctx).WithError(err).Error("failed to compact snapshot") } } - - if err := n.raftLogger.GC(snap.Metadata.Index, snap.Metadata.Term, raftConfig.KeepOldSnapshots); err != nil { - log.G(ctx).WithError(err).Error("failed to clean up old snapshots and WALs") - } } else if err != raft.ErrSnapOutOfDate { log.G(ctx).WithError(err).Error("failed to create snapshot") } - }(n.appliedIndex, n.snapshotIndex) + }(n.appliedIndex, n.snapshotMeta) // Wait for the goroutine to establish a read transaction, to make // sure it sees the state as of this moment. 
diff --git a/vendor/github.com/docker/swarmkit/manager/state/raft/storage/storage.go b/vendor/github.com/docker/swarmkit/manager/state/raft/storage/storage.go index 3bddedd025..a737aabde3 100644 --- a/vendor/github.com/docker/swarmkit/manager/state/raft/storage/storage.go +++ b/vendor/github.com/docker/swarmkit/manager/state/raft/storage/storage.go @@ -2,7 +2,6 @@ package storage import ( "fmt" - "io/ioutil" "os" "path/filepath" "sync" @@ -358,7 +357,7 @@ func (e *EncryptedRaftLogger) Close(ctx context.Context) { e.snapshotter = nil } -// Clear closes the existing WAL and moves away the WAL and snapshot. +// Clear closes the existing WAL and removes the WAL and snapshot. func (e *EncryptedRaftLogger) Clear(ctx context.Context) error { e.encoderMu.Lock() defer e.encoderMu.Unlock() @@ -370,23 +369,7 @@ func (e *EncryptedRaftLogger) Clear(ctx context.Context) error { } e.snapshotter = nil - newWALDir, err := ioutil.TempDir(e.StateDir, "wal.") - if err != nil { - return err - } - os.RemoveAll(newWALDir) - if err = os.Rename(e.walDir(), newWALDir); err != nil { - return err - } - - newSnapDir, err := ioutil.TempDir(e.StateDir, "snap.") - if err != nil { - return err - } - os.RemoveAll(newSnapDir) - if err := os.Rename(e.snapDir(), newSnapDir); err != nil { - return err - } - + os.RemoveAll(e.walDir()) + os.RemoveAll(e.snapDir()) return nil } diff --git a/vendor/github.com/docker/swarmkit/node/node.go b/vendor/github.com/docker/swarmkit/node/node.go index 2cc3b4bb25..e055fc23ff 100644 --- a/vendor/github.com/docker/swarmkit/node/node.go +++ b/vendor/github.com/docker/swarmkit/node/node.go @@ -22,6 +22,7 @@ import ( "github.com/docker/swarmkit/log" "github.com/docker/swarmkit/manager" "github.com/docker/swarmkit/manager/encryption" + "github.com/docker/swarmkit/manager/state/raft" "github.com/docker/swarmkit/remotes" "github.com/docker/swarmkit/xnet" "github.com/pkg/errors" @@ -718,7 +719,7 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig case <-done: // Fail out if m.Run() returns error, otherwise wait for // role change. - if runErr != nil { + if runErr != nil && runErr != raft.ErrMemberRemoved { err = runErr } else { err = <-roleChanged
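The logbroker hunks above stop writing subscription.message.Close directly from the broker and instead go through mutex-guarded Close/Closed accessors, so unregisterSubscription and the ListenSubscriptions loop no longer race on the flag. Below is a minimal, standalone sketch of that synchronized close-flag pattern under the assumption that only the flag needs guarding; the closableStream type and its names are illustrative stand-ins, not the vendored swarmkit types.

	package main

	import (
		"fmt"
		"sync"
	)

	// closableStream mirrors the shape of the subscription change: the closed
	// flag is only touched through mutex-guarded accessors rather than being
	// written directly by the code that tears the stream down.
	type closableStream struct {
		mu     sync.RWMutex
		closed bool
	}

	// Close marks the stream closed under the write lock.
	func (s *closableStream) Close() {
		s.mu.Lock()
		s.closed = true
		s.mu.Unlock()
	}

	// Closed reports the flag under the read lock, so concurrent readers
	// (a ListenSubscriptions-style loop, for example) never race with Close.
	func (s *closableStream) Closed() bool {
		s.mu.RLock()
		defer s.mu.RUnlock()
		return s.closed
	}

	func main() {
		s := &closableStream{}
		var wg sync.WaitGroup
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Close() // one goroutine closes...
		}()
		wg.Wait()
		fmt.Println("closed:", s.Closed()) // ...another observes the flag safely
	}

Using an RWMutex keeps concurrent status checks cheap (readers share the lock) while Close still takes exclusive access for the single write, which matches the read-mostly access pattern of the broker's subscription bookkeeping.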