Update vendored swarmkit to 5fe1f720
Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
parent 9d898b872e
commit 4441e390d0
7 changed files with 86 additions and 83 deletions
@@ -100,7 +100,7 @@ github.com/docker/containerd 03e5862ec0d8d3b3f750e19fca3ee367e13c090e
 github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4

 # cluster
-github.com/docker/swarmkit deec7ba2c4ef48f20ebe9674afbcced606a5339e
+github.com/docker/swarmkit 5fe1f720da9c8ee66b49907d6ee415e9bfb1e5ef
 github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
 github.com/gogo/protobuf v0.3
 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
vendor/github.com/docker/swarmkit/manager/controlapi/service.go (generated, vendored; 17 changes)

@@ -4,7 +4,6 @@ import (
 	"errors"
 	"path/filepath"
 	"reflect"
-	"regexp"
 	"strconv"
 	"strings"

@@ -26,9 +25,6 @@ var (
 	errModeChangeNotAllowed = errors.New("service mode change is not allowed")
 )

-// Regexp pattern for hostname to conform RFC 1123
-var hostnamePattern = regexp.MustCompile("^(([[:alnum:]]|[[:alnum:]][[:alnum:]\\-]*[[:alnum:]])\\.)*([[:alnum:]]|[[:alnum:]][[:alnum:]\\-]*[[:alnum:]])$")
-
 func validateResources(r *api.Resources) error {
 	if r == nil {
 		return nil
@@ -115,10 +111,6 @@ func validateContainerSpec(container *api.ContainerSpec) error {
 		return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: missing in service spec")
 	}

-	if err := validateHostname(container.Hostname); err != nil {
-		return err
-	}
-
 	if container.Image == "" {
 		return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: image reference must be provided")
 	}
@@ -138,15 +130,6 @@ func validateContainerSpec(container *api.ContainerSpec) error {
 	return nil
 }

-func validateHostname(hostname string) error {
-	if hostname != "" {
-		if len(hostname) > 63 || !hostnamePattern.MatchString(hostname) {
-			return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: %s is not valid hostname", hostname)
-		}
-	}
-	return nil
-}
-
 func validateTask(taskSpec api.TaskSpec) error {
 	if err := validateResourceRequirements(taskSpec.Resources); err != nil {
 		return err
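These hunks remove the control API's hostname validation wholesale: the regexp import, the RFC 1123 pattern, and the validateHostname helper. For reference, the removed check is equivalent to this standalone sketch (package and function names here are illustrative, not swarmkit's):

package main

import (
	"fmt"
	"regexp"
)

// hostnamePattern is the removed RFC 1123 pattern: dot-separated labels of
// alphanumerics and hyphens, each starting and ending with an alphanumeric.
var hostnamePattern = regexp.MustCompile(`^(([[:alnum:]]|[[:alnum:]][[:alnum:]\-]*[[:alnum:]])\.)*([[:alnum:]]|[[:alnum:]][[:alnum:]\-]*[[:alnum:]])$`)

// validHostname mirrors the removed validateHostname: an empty hostname is
// accepted, otherwise it must match the pattern and be at most 63 characters.
func validHostname(hostname string) bool {
	if hostname == "" {
		return true
	}
	return len(hostname) <= 63 && hostnamePattern.MatchString(hostname)
}

func main() {
	fmt.Println(validHostname("web-01.example")) // true
	fmt.Println(validHostname("-bad.example"))   // false: label starts with a hyphen
}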
vendor/github.com/docker/swarmkit/manager/logbroker/broker.go (generated, vendored; 5 changes)

@@ -152,7 +152,8 @@ func (lb *LogBroker) unregisterSubscription(subscription *subscription) {
 	defer lb.mu.Unlock()

 	delete(lb.registeredSubscriptions, subscription.message.ID)
-	subscription.message.Close = true
+
+	subscription.Close()
 	lb.subscriptionQueue.Publish(subscription)
 }

@@ -321,7 +322,7 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
 		case v := <-subscriptionCh:
 			subscription := v.(*subscription)

-			if subscription.message.Close {
+			if subscription.Closed() {
 				log.WithField("subscription.id", subscription.message.ID).Debug("subscription closed")
 				delete(activeSubscriptions, subscription.message.ID)
 			} else {
vendor/github.com/docker/swarmkit/manager/logbroker/subscription.go (generated, vendored; 12 changes)

@@ -137,6 +137,18 @@ func (s *subscription) Err() error {
 	return fmt.Errorf("warning: incomplete log stream. some logs could not be retrieved for the following reasons: %s", strings.Join(messages, ", "))
 }

+func (s *subscription) Close() {
+	s.mu.Lock()
+	s.message.Close = true
+	s.mu.Unlock()
+}
+
+func (s *subscription) Closed() bool {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	return s.message.Close
+}
+
 func (s *subscription) match() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
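The broker and subscription hunks are one change: direct reads and writes of subscription.message.Close are replaced by Close and Closed accessors that take the subscription's lock, so the flag can no longer be read in ListenSubscriptions while unregisterSubscription is writing it. A minimal standalone sketch of the same pattern (the type and names here are illustrative):

package main

import (
	"fmt"
	"sync"
)

// closable guards a boolean flag with an RWMutex so concurrent writers
// (Close) and readers (Closed) never race on the flag.
type closable struct {
	mu     sync.RWMutex
	closed bool
}

func (c *closable) Close() {
	c.mu.Lock()
	c.closed = true
	c.mu.Unlock()
}

func (c *closable) Closed() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.closed
}

func main() {
	var c closable
	done := make(chan struct{})
	go func() { c.Close(); close(done) }() // concurrent writer
	_ = c.Closed()                         // concurrent reader, race-free
	<-done
	fmt.Println(c.Closed()) // true
}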
vendor/github.com/docker/swarmkit/manager/state/raft/raft.go (generated, vendored; 92 changes)

@@ -107,8 +107,8 @@ type Node struct {

 	confState    raftpb.ConfState
 	appliedIndex uint64
-	snapshotIndex uint64
-	writtenIndex  uint64
+	snapshotMeta    raftpb.SnapshotMetadata
+	writtenWALIndex uint64

 	ticker clock.Ticker
 	doneCh chan struct{}
@@ -123,7 +123,7 @@ type Node struct {
 	// used for membership management checks
 	membershipLock sync.Mutex

-	snapshotInProgress chan uint64
+	snapshotInProgress chan raftpb.SnapshotMetadata
 	asyncTasks         sync.WaitGroup

 	// stopped chan is used for notifying grpc handlers that raft node going
@@ -266,8 +266,8 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {

 	n.confState = snapshot.Metadata.ConfState
 	n.appliedIndex = snapshot.Metadata.Index
-	n.snapshotIndex = snapshot.Metadata.Index
-	n.writtenIndex, _ = n.raftStore.LastIndex() // lastIndex always returns nil as an error
+	n.snapshotMeta = snapshot.Metadata
+	n.writtenWALIndex, _ = n.raftStore.LastIndex() // lastIndex always returns nil as an error

 	if loadAndStartErr == storage.ErrNoWAL {
 		if n.opts.JoinAddr != "" {
@@ -424,7 +424,7 @@ func (n *Node) Run(ctx context.Context) error {
 				log.G(ctx).WithError(err).Error("failed to restore from snapshot")
 			}
 			n.appliedIndex = rd.Snapshot.Metadata.Index
-			n.snapshotIndex = rd.Snapshot.Metadata.Index
+			n.snapshotMeta = rd.Snapshot.Metadata
 			n.confState = rd.Snapshot.Metadata.ConfState
 		}

@@ -475,8 +475,8 @@ func (n *Node) Run(ctx context.Context) error {

 			// Trigger a snapshot every once in awhile
 			if n.snapshotInProgress == nil &&
-				(n.needsSnapshot() || raftConfig.SnapshotInterval > 0 &&
-					n.appliedIndex-n.snapshotIndex >= raftConfig.SnapshotInterval) {
+				(n.needsSnapshot(ctx) || raftConfig.SnapshotInterval > 0 &&
+					n.appliedIndex-n.snapshotMeta.Index >= raftConfig.SnapshotInterval) {
 				n.doSnapshot(ctx, raftConfig)
 			}
@@ -507,17 +507,21 @@ func (n *Node) Run(ctx context.Context) error {
 				}
 			}

-		case snapshotIndex := <-n.snapshotInProgress:
-			if snapshotIndex > n.snapshotIndex {
-				n.snapshotIndex = snapshotIndex
+		case snapshotMeta := <-n.snapshotInProgress:
+			raftConfig := n.getCurrentRaftConfig()
+			if snapshotMeta.Index > n.snapshotMeta.Index {
+				n.snapshotMeta = snapshotMeta
+				if err := n.raftLogger.GC(snapshotMeta.Index, snapshotMeta.Term, raftConfig.KeepOldSnapshots); err != nil {
+					log.G(ctx).WithError(err).Error("failed to clean up old snapshots and WALs")
+				}
 			}
 			n.snapshotInProgress = nil
 			n.maybeMarkRotationFinished(ctx)
-			if n.rotationQueued && n.needsSnapshot() {
+			if n.rotationQueued && n.needsSnapshot(ctx) {
 				// there was a key rotation that took place before while the snapshot
 				// was in progress - we have to take another snapshot and encrypt with the new key
 				n.rotationQueued = false
-				n.doSnapshot(ctx, n.getCurrentRaftConfig())
+				n.doSnapshot(ctx, raftConfig)
 			}
 		case <-n.keyRotator.RotationNotify():
 			// There are 2 separate checks: rotationQueued, and n.needsSnapshot().
@@ -529,7 +533,7 @@ func (n *Node) Run(ctx context.Context) error {
 			switch {
 			case n.snapshotInProgress != nil:
 				n.rotationQueued = true
-			case n.needsSnapshot():
+			case n.needsSnapshot(ctx):
 				n.doSnapshot(ctx, n.getCurrentRaftConfig())
 			}
 		case <-n.removeRaftCh:
@@ -544,7 +548,7 @@ func (n *Node) Run(ctx context.Context) error {
 	}
 }

-func (n *Node) needsSnapshot() bool {
+func (n *Node) needsSnapshot(ctx context.Context) bool {
 	if n.waitForAppliedIndex == 0 && n.keyRotator.NeedsRotation() {
 		keys := n.keyRotator.GetKeys()
 		if keys.PendingDEK != nil {
@@ -552,23 +556,40 @@ func (n *Node) needsSnapshot() bool {
 			// we want to wait for the last index written with the old DEK to be committed, else a snapshot taken
 			// may have an index less than the index of a WAL written with an old DEK. We want the next snapshot
 			// written with the new key to supercede any WAL written with an old DEK.
-			n.waitForAppliedIndex = n.writtenIndex
-			// if there is already a snapshot at this index, bump the index up one, because we want the next snapshot
-			if n.waitForAppliedIndex == n.snapshotIndex {
-				n.waitForAppliedIndex++
+			n.waitForAppliedIndex = n.writtenWALIndex
+			// if there is already a snapshot at this index or higher, bump the wait index up to 1 higher than the current
+			// snapshot index, because the rotation cannot be completed until the next snapshot
+			if n.waitForAppliedIndex <= n.snapshotMeta.Index {
+				n.waitForAppliedIndex = n.snapshotMeta.Index + 1
 			}
+			log.G(ctx).Debugf(
+				"beginning raft DEK rotation - last indices written with the old key are (snapshot: %d, WAL: %d) - waiting for snapshot of index %d to be written before rotation can be completed", n.snapshotMeta.Index, n.writtenWALIndex, n.waitForAppliedIndex)
 		}
 	}

-	return n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.appliedIndex
+	result := n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.appliedIndex
+	if result {
+		log.G(ctx).Debugf(
+			"a snapshot at index %d is needed in order to complete raft DEK rotation - a snapshot with index >= %d can now be triggered",
+			n.waitForAppliedIndex, n.appliedIndex)
+	}
+	return result
 }

 func (n *Node) maybeMarkRotationFinished(ctx context.Context) {
-	if n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.snapshotIndex {
+	if n.waitForAppliedIndex > 0 && n.waitForAppliedIndex <= n.snapshotMeta.Index {
 		// this means we tried to rotate - so finish the rotation
 		if err := n.keyRotator.UpdateKeys(EncryptionKeys{CurrentDEK: n.raftLogger.EncryptionKey}); err != nil {
 			log.G(ctx).WithError(err).Error("failed to update encryption keys after a successful rotation")
 		} else {
+			log.G(ctx).Debugf(
+				"a snapshot with index %d is available, which completes the DEK rotation requiring a snapshot of at least index %d - throwing away DEK and older snapshots encrypted with the old key",
+				n.snapshotMeta.Index, n.waitForAppliedIndex)
 			n.waitForAppliedIndex = 0
+
+			if err := n.raftLogger.GC(n.snapshotMeta.Index, n.snapshotMeta.Term, 0); err != nil {
+				log.G(ctx).WithError(err).Error("failed to remove old snapshots and WALs that were written with the previous raft DEK")
+			}
 		}
 	}
 }
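Behind the new logging, the rotation bookkeeping is simple index arithmetic: rotation can finish only once a snapshot covers every WAL entry written with the old key, so the wait target is the last written WAL index, bumped past the current snapshot if one already covers it. A small sketch of that rule (the helper is illustrative, not swarmkit API):

package main

import "fmt"

// nextRotationTarget returns the snapshot index that must be reached before
// a raft DEK rotation can complete: the last WAL index written with the old
// key, bumped to one past the current snapshot if that snapshot already
// covers it.
func nextRotationTarget(writtenWALIndex, snapshotIndex uint64) uint64 {
	target := writtenWALIndex
	if target <= snapshotIndex {
		target = snapshotIndex + 1
	}
	return target
}

func main() {
	fmt.Println(nextRotationTarget(100, 90)) // 100: wait for the WAL tail
	fmt.Println(nextRotationTarget(80, 90))  // 91: must pass the existing snapshot
}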
@@ -950,13 +971,13 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa
 		defer cancel()

 		if err := member.HealthCheck(healthCtx); err != nil {
-			n.processRaftMessageLogger(ctx, msg).Debug("member which sent vote request failed health check")
+			n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("member which sent vote request failed health check")
 			return &api.ProcessRaftMessageResponse{}, nil
 		}
 	}

 	if msg.Message.Type == raftpb.MsgProp {
-		// We don't accepted forwarded proposals. Our
+		// We don't accept forwarded proposals. Our
 		// current architecture depends on only the leader
 		// making proposals, so in-flight proposals can be
 		// guaranteed not to conflict.
@@ -1245,8 +1266,8 @@ func (n *Node) saveToStorage(

 	if len(entries) > 0 {
 		lastIndex := entries[len(entries)-1].Index
-		if lastIndex > n.writtenIndex {
-			n.writtenIndex = lastIndex
+		if lastIndex > n.writtenWALIndex {
+			n.writtenWALIndex = lastIndex
 		}
 	}

@@ -1296,25 +1317,32 @@ func (n *Node) sendToMember(ctx context.Context, members map[uint64]*membership.
 	defer n.asyncTasks.Done()
 	defer close(thisSend)

-	ctx, cancel := context.WithTimeout(ctx, n.opts.SendTimeout)
-	defer cancel()
-
 	if lastSend != nil {
+		waitCtx, waitCancel := context.WithTimeout(ctx, n.opts.SendTimeout)
+		defer waitCancel()
+
 		select {
 		case <-lastSend:
-		case <-ctx.Done():
+		case <-waitCtx.Done():
 			return
 		}
+
+		select {
+		case <-waitCtx.Done():
+			return
+		default:
+		}
 	}

+	ctx, cancel := context.WithTimeout(ctx, n.opts.SendTimeout)
+	defer cancel()
+
 	if n.cluster.IsIDRemoved(m.To) {
 		// Should not send to removed members
 		return
 	}

-	var (
-		conn *membership.Member
-	)
+	var conn *membership.Member
 	if toMember, ok := members[m.To]; ok {
 		conn = toMember
 	} else {
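The sendToMember hunk rescopes the timeouts: previously one SendTimeout context covered both the wait for the previous in-flight send and the send itself, so a slow predecessor could exhaust the new send's budget; now the wait gets its own context, re-checked after the previous send completes, and a fresh context is created for the actual send. A minimal sketch of the pattern, with assumed names:

package main

import (
	"context"
	"fmt"
	"time"
)

const sendTimeout = 100 * time.Millisecond // stand-in for n.opts.SendTimeout

// send waits for the previous send under its own timeout, then performs the
// new send under a fresh timeout, so waiting never eats into the send budget.
func send(ctx context.Context, lastSend <-chan struct{}, doSend func(context.Context)) {
	if lastSend != nil {
		waitCtx, waitCancel := context.WithTimeout(ctx, sendTimeout)
		defer waitCancel()
		select {
		case <-lastSend:
		case <-waitCtx.Done():
			return // previous send still blocked; give up on this one
		}
	}

	sendCtx, cancel := context.WithTimeout(ctx, sendTimeout) // fresh budget
	defer cancel()
	doSend(sendCtx)
}

func main() {
	prev := make(chan struct{})
	close(prev) // pretend the previous send already finished
	send(context.Background(), prev, func(ctx context.Context) {
		deadline, ok := ctx.Deadline()
		fmt.Println("sending with fresh deadline:", ok, time.Until(deadline) > 0)
	})
}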
vendor/github.com/docker/swarmkit/manager/state/raft/storage.go (generated, vendored; 14 changes)

@@ -167,11 +167,11 @@ func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) {

 	viewStarted := make(chan struct{})
 	n.asyncTasks.Add(1)
-	n.snapshotInProgress = make(chan uint64, 1) // buffered in case Shutdown is called during the snapshot
-	go func(appliedIndex, snapshotIndex uint64) {
+	n.snapshotInProgress = make(chan raftpb.SnapshotMetadata, 1) // buffered in case Shutdown is called during the snapshot
+	go func(appliedIndex uint64, snapshotMeta raftpb.SnapshotMetadata) {
 		defer func() {
 			n.asyncTasks.Done()
-			n.snapshotInProgress <- snapshotIndex
+			n.snapshotInProgress <- snapshotMeta
 		}()
 		var err error
 		n.memoryStore.View(func(tx store.ReadTx) {
@@ -197,7 +197,7 @@ func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) {
 				log.G(ctx).WithError(err).Error("failed to save snapshot")
 				return
 			}
-			snapshotIndex = appliedIndex
+			snapshotMeta = snap.Metadata

 			if appliedIndex > raftConfig.LogEntriesForSlowFollowers {
 				err := n.raftStore.Compact(appliedIndex - raftConfig.LogEntriesForSlowFollowers)
@@ -205,14 +205,10 @@ func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) {
 					log.G(ctx).WithError(err).Error("failed to compact snapshot")
 				}
 			}
-
-			if err := n.raftLogger.GC(snap.Metadata.Index, snap.Metadata.Term, raftConfig.KeepOldSnapshots); err != nil {
-				log.G(ctx).WithError(err).Error("failed to clean up old snapshots and WALs")
-			}
 		} else if err != raft.ErrSnapOutOfDate {
 			log.G(ctx).WithError(err).Error("failed to create snapshot")
 		}
-	}(n.appliedIndex, n.snapshotIndex)
+	}(n.appliedIndex, n.snapshotMeta)

 	// Wait for the goroutine to establish a read transaction, to make
 	// sure it sees the state as of this moment.
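Two related moves here: the snapshot goroutine now hands back the full raftpb.SnapshotMetadata (index and term) instead of a bare index, and log/snapshot GC moves out of the goroutine into the Run loop, where it runs against the current raft config. The handoff itself is the familiar buffered-channel pattern, sketched below with simplified names (the struct stands in for raftpb.SnapshotMetadata):

package main

import "fmt"

type snapshotMetadata struct{ Index, Term uint64 }

func main() {
	// Buffered, as in doSnapshot: the goroutine can always deliver its
	// result, even if the receiver has already shut down.
	inProgress := make(chan snapshotMetadata, 1)

	go func(meta snapshotMetadata) {
		// The deferred send observes the final value of meta, whether or
		// not a new snapshot was actually taken below.
		defer func() { inProgress <- meta }()
		meta = snapshotMetadata{Index: 42, Term: 7} // pretend a snapshot succeeded
	}(snapshotMetadata{Index: 40, Term: 7}) // seeded with the previous metadata

	got := <-inProgress
	fmt.Printf("snapshot completed at index %d, term %d\n", got.Index, got.Term)
}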
vendor/github.com/docker/swarmkit/manager/state/raft/storage/storage.go (generated, vendored; 23 changes)

@@ -2,7 +2,6 @@ package storage

 import (
 	"fmt"
-	"io/ioutil"
 	"os"
 	"path/filepath"
 	"sync"
@@ -358,7 +357,7 @@ func (e *EncryptedRaftLogger) Close(ctx context.Context) {
 		e.snapshotter = nil
 	}

-// Clear closes the existing WAL and moves away the WAL and snapshot.
+// Clear closes the existing WAL and removes the WAL and snapshot.
 func (e *EncryptedRaftLogger) Clear(ctx context.Context) error {
 	e.encoderMu.Lock()
 	defer e.encoderMu.Unlock()
@@ -370,23 +369,7 @@ func (e *EncryptedRaftLogger) Clear(ctx context.Context) error {
 	}
 	e.snapshotter = nil

-	newWALDir, err := ioutil.TempDir(e.StateDir, "wal.")
-	if err != nil {
-		return err
-	}
-	os.RemoveAll(newWALDir)
-	if err = os.Rename(e.walDir(), newWALDir); err != nil {
-		return err
-	}
-
-	newSnapDir, err := ioutil.TempDir(e.StateDir, "snap.")
-	if err != nil {
-		return err
-	}
-	os.RemoveAll(newSnapDir)
-	if err := os.Rename(e.snapDir(), newSnapDir); err != nil {
-		return err
-	}
-
+	os.RemoveAll(e.walDir())
+	os.RemoveAll(e.snapDir())
 	return nil
 }
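Clear is also simplified: instead of creating temp directories with ioutil.TempDir and renaming the WAL and snapshot directories into them (four error paths, and the moved-aside copies were never read again), it now deletes both directories outright. A compact sketch of the new behavior under an assumed directory layout:

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// clearState mirrors the simplified Clear: delete the WAL and snapshot
// directories in place. The "wal"/"snap" layout is assumed for illustration.
func clearState(stateDir string) {
	os.RemoveAll(filepath.Join(stateDir, "wal"))
	os.RemoveAll(filepath.Join(stateDir, "snap"))
}

func main() {
	dir, err := os.MkdirTemp("", "raft-state-")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	os.MkdirAll(filepath.Join(dir, "wal"), 0o755)
	clearState(dir)

	_, err = os.Stat(filepath.Join(dir, "wal"))
	fmt.Println(os.IsNotExist(err)) // true: the WAL directory is gone
}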