mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
ce61a1ed98
Moby works perfectly when you are in a situation when one has a good and stable internet connection. Operating in area's where internet connectivity is likely to be lost in undetermined intervals, like a satellite connection or 4G/LTE in rural area's, can become a problem when pulling a new image. When connection is lost while image layers are being pulled, Moby will try to reconnect up to 5 times. If this fails, the incompletely downloaded layers are lost will need to be completely downloaded again during the next pull request. This means that we are using more data than we might have to. Pulling a layer multiple times from the start can become costly over a satellite or 4G/LTE connection. As these techniques (especially 4G) quite common in IoT and Moby is used to run Azure IoT Edge devices, I would like to add a settable maximum download attempts. The maximum download attempts is currently set at 5 (distribution/xfer/download.go). I would like to change this constant to a variable that the user can set. The default will still be 5, so nothing will change from the current version unless specified when starting the daemon with the added flag or in the config file. I added a default value of 5 for DefaultMaxDownloadAttempts and a settable max-download-attempts in the daemon config file. It is also added to the config of dockerd so it can be set with a flag when starting the daemon. This value gets stored in the imageService of the daemon when it is initiated and can be passed to the NewLayerDownloadManager as a parameter. It will be stored in the LayerDownloadManager when initiated. This enables us to set the max amount of retries in makeDownoadFunc equal to the max download attempts. I also added some tests that are based on maxConcurrentDownloads/maxConcurrentUploads. You can pull this version and test in a development container. Either create a config `file /etc/docker/daemon.json` with `{"max-download-attempts"=3}``, or use `dockerd --max-download-attempts=3 -D &` to start up the dockerd. Start downloading a container and disconnect from the internet whilst downloading. The result would be that it stops pulling after three attempts. Signed-off-by: Lukas Heeren <lukas-heeren@hotmail.com> Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
430 lines
12 KiB
Go
430 lines
12 KiB
Go
package xfer // import "github.com/docker/docker/distribution/xfer"
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"runtime"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/docker/distribution"
|
|
"github.com/docker/docker/image"
|
|
"github.com/docker/docker/layer"
|
|
"github.com/docker/docker/pkg/progress"
|
|
digest "github.com/opencontainers/go-digest"
|
|
"gotest.tools/assert"
|
|
)
|
|
|
|
const maxDownloadConcurrency = 3
|
|
|
|
type mockLayer struct {
|
|
layerData bytes.Buffer
|
|
diffID layer.DiffID
|
|
chainID layer.ChainID
|
|
parent layer.Layer
|
|
}
|
|
|
|
func (ml *mockLayer) TarStream() (io.ReadCloser, error) {
|
|
return ioutil.NopCloser(bytes.NewBuffer(ml.layerData.Bytes())), nil
|
|
}
|
|
|
|
func (ml *mockLayer) TarStreamFrom(layer.ChainID) (io.ReadCloser, error) {
|
|
return nil, fmt.Errorf("not implemented")
|
|
}
|
|
|
|
func (ml *mockLayer) ChainID() layer.ChainID {
|
|
return ml.chainID
|
|
}
|
|
|
|
func (ml *mockLayer) DiffID() layer.DiffID {
|
|
return ml.diffID
|
|
}
|
|
|
|
func (ml *mockLayer) Parent() layer.Layer {
|
|
return ml.parent
|
|
}
|
|
|
|
func (ml *mockLayer) Size() (size int64, err error) {
|
|
return 0, nil
|
|
}
|
|
|
|
func (ml *mockLayer) DiffSize() (size int64, err error) {
|
|
return 0, nil
|
|
}
|
|
|
|
func (ml *mockLayer) Metadata() (map[string]string, error) {
|
|
return make(map[string]string), nil
|
|
}
|
|
|
|
type mockLayerStore struct {
|
|
layers map[layer.ChainID]*mockLayer
|
|
}
|
|
|
|
func createChainIDFromParent(parent layer.ChainID, dgsts ...layer.DiffID) layer.ChainID {
|
|
if len(dgsts) == 0 {
|
|
return parent
|
|
}
|
|
if parent == "" {
|
|
return createChainIDFromParent(layer.ChainID(dgsts[0]), dgsts[1:]...)
|
|
}
|
|
// H = "H(n-1) SHA256(n)"
|
|
dgst := digest.FromBytes([]byte(string(parent) + " " + string(dgsts[0])))
|
|
return createChainIDFromParent(layer.ChainID(dgst), dgsts[1:]...)
|
|
}
|
|
|
|
func (ls *mockLayerStore) Map() map[layer.ChainID]layer.Layer {
|
|
layers := map[layer.ChainID]layer.Layer{}
|
|
|
|
for k, v := range ls.layers {
|
|
layers[k] = v
|
|
}
|
|
|
|
return layers
|
|
}
|
|
|
|
func (ls *mockLayerStore) Register(reader io.Reader, parentID layer.ChainID) (layer.Layer, error) {
|
|
return ls.RegisterWithDescriptor(reader, parentID, distribution.Descriptor{})
|
|
}
|
|
|
|
func (ls *mockLayerStore) RegisterWithDescriptor(reader io.Reader, parentID layer.ChainID, _ distribution.Descriptor) (layer.Layer, error) {
|
|
var (
|
|
parent layer.Layer
|
|
err error
|
|
)
|
|
|
|
if parentID != "" {
|
|
parent, err = ls.Get(parentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
l := &mockLayer{parent: parent}
|
|
_, err = l.layerData.ReadFrom(reader)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
l.diffID = layer.DiffID(digest.FromBytes(l.layerData.Bytes()))
|
|
l.chainID = createChainIDFromParent(parentID, l.diffID)
|
|
|
|
ls.layers[l.chainID] = l
|
|
return l, nil
|
|
}
|
|
|
|
func (ls *mockLayerStore) Get(chainID layer.ChainID) (layer.Layer, error) {
|
|
l, ok := ls.layers[chainID]
|
|
if !ok {
|
|
return nil, layer.ErrLayerDoesNotExist
|
|
}
|
|
return l, nil
|
|
}
|
|
|
|
func (ls *mockLayerStore) Release(l layer.Layer) ([]layer.Metadata, error) {
|
|
return []layer.Metadata{}, nil
|
|
}
|
|
func (ls *mockLayerStore) CreateRWLayer(string, layer.ChainID, *layer.CreateRWLayerOpts) (layer.RWLayer, error) {
|
|
return nil, errors.New("not implemented")
|
|
}
|
|
|
|
func (ls *mockLayerStore) GetRWLayer(string) (layer.RWLayer, error) {
|
|
return nil, errors.New("not implemented")
|
|
}
|
|
|
|
func (ls *mockLayerStore) ReleaseRWLayer(layer.RWLayer) ([]layer.Metadata, error) {
|
|
return nil, errors.New("not implemented")
|
|
}
|
|
func (ls *mockLayerStore) GetMountID(string) (string, error) {
|
|
return "", errors.New("not implemented")
|
|
}
|
|
|
|
func (ls *mockLayerStore) Cleanup() error {
|
|
return nil
|
|
}
|
|
|
|
func (ls *mockLayerStore) DriverStatus() [][2]string {
|
|
return [][2]string{}
|
|
}
|
|
|
|
func (ls *mockLayerStore) DriverName() string {
|
|
return "mock"
|
|
}
|
|
|
|
type mockDownloadDescriptor struct {
|
|
currentDownloads *int32
|
|
id string
|
|
diffID layer.DiffID
|
|
registeredDiffID layer.DiffID
|
|
expectedDiffID layer.DiffID
|
|
simulateRetries int
|
|
retries int
|
|
}
|
|
|
|
// Key returns the key used to deduplicate downloads.
|
|
func (d *mockDownloadDescriptor) Key() string {
|
|
return d.id
|
|
}
|
|
|
|
// ID returns the ID for display purposes.
|
|
func (d *mockDownloadDescriptor) ID() string {
|
|
return d.id
|
|
}
|
|
|
|
// DiffID should return the DiffID for this layer, or an error
|
|
// if it is unknown (for example, if it has not been downloaded
|
|
// before).
|
|
func (d *mockDownloadDescriptor) DiffID() (layer.DiffID, error) {
|
|
if d.diffID != "" {
|
|
return d.diffID, nil
|
|
}
|
|
return "", errors.New("no diffID available")
|
|
}
|
|
|
|
func (d *mockDownloadDescriptor) Registered(diffID layer.DiffID) {
|
|
d.registeredDiffID = diffID
|
|
}
|
|
|
|
func (d *mockDownloadDescriptor) mockTarStream() io.ReadCloser {
|
|
// The mock implementation returns the ID repeated 5 times as a tar
|
|
// stream instead of actual tar data. The data is ignored except for
|
|
// computing IDs.
|
|
return ioutil.NopCloser(bytes.NewBuffer([]byte(d.id + d.id + d.id + d.id + d.id)))
|
|
}
|
|
|
|
// Download is called to perform the download.
|
|
func (d *mockDownloadDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) {
|
|
if d.currentDownloads != nil {
|
|
defer atomic.AddInt32(d.currentDownloads, -1)
|
|
|
|
if atomic.AddInt32(d.currentDownloads, 1) > maxDownloadConcurrency {
|
|
return nil, 0, errors.New("concurrency limit exceeded")
|
|
}
|
|
}
|
|
|
|
// Sleep a bit to simulate a time-consuming download.
|
|
for i := int64(0); i <= 10; i++ {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, 0, ctx.Err()
|
|
case <-time.After(10 * time.Millisecond):
|
|
progressOutput.WriteProgress(progress.Progress{ID: d.ID(), Action: "Downloading", Current: i, Total: 10})
|
|
}
|
|
}
|
|
|
|
if d.retries < d.simulateRetries {
|
|
d.retries++
|
|
return nil, 0, fmt.Errorf("simulating download attempt %d/%d", d.retries, d.simulateRetries)
|
|
}
|
|
|
|
return d.mockTarStream(), 0, nil
|
|
}
|
|
|
|
func (d *mockDownloadDescriptor) Close() {
|
|
}
|
|
|
|
func downloadDescriptors(currentDownloads *int32) []DownloadDescriptor {
|
|
return []DownloadDescriptor{
|
|
&mockDownloadDescriptor{
|
|
currentDownloads: currentDownloads,
|
|
id: "id1",
|
|
expectedDiffID: layer.DiffID("sha256:68e2c75dc5c78ea9240689c60d7599766c213ae210434c53af18470ae8c53ec1"),
|
|
},
|
|
&mockDownloadDescriptor{
|
|
currentDownloads: currentDownloads,
|
|
id: "id2",
|
|
expectedDiffID: layer.DiffID("sha256:64a636223116aa837973a5d9c2bdd17d9b204e4f95ac423e20e65dfbb3655473"),
|
|
},
|
|
&mockDownloadDescriptor{
|
|
currentDownloads: currentDownloads,
|
|
id: "id3",
|
|
expectedDiffID: layer.DiffID("sha256:58745a8bbd669c25213e9de578c4da5c8ee1c836b3581432c2b50e38a6753300"),
|
|
},
|
|
&mockDownloadDescriptor{
|
|
currentDownloads: currentDownloads,
|
|
id: "id2",
|
|
expectedDiffID: layer.DiffID("sha256:64a636223116aa837973a5d9c2bdd17d9b204e4f95ac423e20e65dfbb3655473"),
|
|
},
|
|
&mockDownloadDescriptor{
|
|
currentDownloads: currentDownloads,
|
|
id: "id4",
|
|
expectedDiffID: layer.DiffID("sha256:0dfb5b9577716cc173e95af7c10289322c29a6453a1718addc00c0c5b1330936"),
|
|
simulateRetries: 1,
|
|
},
|
|
&mockDownloadDescriptor{
|
|
currentDownloads: currentDownloads,
|
|
id: "id5",
|
|
expectedDiffID: layer.DiffID("sha256:0a5f25fa1acbc647f6112a6276735d0fa01e4ee2aa7ec33015e337350e1ea23d"),
|
|
},
|
|
}
|
|
}
|
|
|
|
func TestSuccessfulDownload(t *testing.T) {
|
|
// TODO Windows: Fix this unit text
|
|
if runtime.GOOS == "windows" {
|
|
t.Skip("Needs fixing on Windows")
|
|
}
|
|
|
|
layerStore := &mockLayerStore{make(map[layer.ChainID]*mockLayer)}
|
|
lsMap := make(map[string]layer.Store)
|
|
lsMap[runtime.GOOS] = layerStore
|
|
ldm := NewLayerDownloadManager(lsMap, maxDownloadConcurrency, func(m *LayerDownloadManager) { m.waitDuration = time.Millisecond })
|
|
|
|
progressChan := make(chan progress.Progress)
|
|
progressDone := make(chan struct{})
|
|
receivedProgress := make(map[string]progress.Progress)
|
|
|
|
go func() {
|
|
for p := range progressChan {
|
|
receivedProgress[p.ID] = p
|
|
}
|
|
close(progressDone)
|
|
}()
|
|
|
|
var currentDownloads int32
|
|
descriptors := downloadDescriptors(¤tDownloads)
|
|
|
|
firstDescriptor := descriptors[0].(*mockDownloadDescriptor)
|
|
|
|
// Pre-register the first layer to simulate an already-existing layer
|
|
l, err := layerStore.Register(firstDescriptor.mockTarStream(), "")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
firstDescriptor.diffID = l.DiffID()
|
|
|
|
rootFS, releaseFunc, err := ldm.Download(context.Background(), *image.NewRootFS(), runtime.GOOS, descriptors, progress.ChanOutput(progressChan))
|
|
if err != nil {
|
|
t.Fatalf("download error: %v", err)
|
|
}
|
|
|
|
releaseFunc()
|
|
|
|
close(progressChan)
|
|
<-progressDone
|
|
|
|
if len(rootFS.DiffIDs) != len(descriptors) {
|
|
t.Fatal("got wrong number of diffIDs in rootfs")
|
|
}
|
|
|
|
for i, d := range descriptors {
|
|
descriptor := d.(*mockDownloadDescriptor)
|
|
|
|
if descriptor.diffID != "" {
|
|
if receivedProgress[d.ID()].Action != "Already exists" {
|
|
t.Fatalf("did not get 'Already exists' message for %v", d.ID())
|
|
}
|
|
} else if receivedProgress[d.ID()].Action != "Pull complete" {
|
|
t.Fatalf("did not get 'Pull complete' message for %v", d.ID())
|
|
}
|
|
|
|
if rootFS.DiffIDs[i] != descriptor.expectedDiffID {
|
|
t.Fatalf("rootFS item %d has the wrong diffID (expected: %v got: %v)", i, descriptor.expectedDiffID, rootFS.DiffIDs[i])
|
|
}
|
|
|
|
if descriptor.diffID == "" && descriptor.registeredDiffID != rootFS.DiffIDs[i] {
|
|
t.Fatal("diffID mismatch between rootFS and Registered callback")
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestCancelledDownload(t *testing.T) {
|
|
layerStore := &mockLayerStore{make(map[layer.ChainID]*mockLayer)}
|
|
lsMap := make(map[string]layer.Store)
|
|
lsMap[runtime.GOOS] = layerStore
|
|
ldm := NewLayerDownloadManager(lsMap, maxDownloadConcurrency, func(m *LayerDownloadManager) { m.waitDuration = time.Millisecond })
|
|
progressChan := make(chan progress.Progress)
|
|
progressDone := make(chan struct{})
|
|
|
|
go func() {
|
|
for range progressChan {
|
|
}
|
|
close(progressDone)
|
|
}()
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
go func() {
|
|
<-time.After(time.Millisecond)
|
|
cancel()
|
|
}()
|
|
|
|
descriptors := downloadDescriptors(nil)
|
|
_, _, err := ldm.Download(ctx, *image.NewRootFS(), runtime.GOOS, descriptors, progress.ChanOutput(progressChan))
|
|
if err != context.Canceled {
|
|
t.Fatal("expected download to be cancelled")
|
|
}
|
|
|
|
close(progressChan)
|
|
<-progressDone
|
|
}
|
|
|
|
func TestMaxDownloadAttempts(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
simulateRetries int
|
|
maxDownloadAttempts int
|
|
expectedErr string
|
|
}{
|
|
{
|
|
name: "max-attempts=5, succeed at 2nd attempt",
|
|
simulateRetries: 2,
|
|
maxDownloadAttempts: 5,
|
|
},
|
|
{
|
|
name: "max-attempts=5, succeed at 5th attempt",
|
|
simulateRetries: 5,
|
|
maxDownloadAttempts: 5,
|
|
},
|
|
{
|
|
name: "max-attempts=5, fail at 6th attempt",
|
|
simulateRetries: 6,
|
|
maxDownloadAttempts: 5,
|
|
expectedErr: "simulating download attempt 5/6",
|
|
},
|
|
{
|
|
name: "max-attempts=0, fail after 1 attempt",
|
|
simulateRetries: 1,
|
|
maxDownloadAttempts: 0,
|
|
expectedErr: "simulating download attempt 1/1",
|
|
},
|
|
}
|
|
for _, tc := range tests {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
t.Parallel()
|
|
layerStore := &mockLayerStore{make(map[layer.ChainID]*mockLayer)}
|
|
lsMap := make(map[string]layer.Store)
|
|
lsMap[runtime.GOOS] = layerStore
|
|
ldm := NewLayerDownloadManager(
|
|
lsMap,
|
|
maxDownloadConcurrency,
|
|
func(m *LayerDownloadManager) {
|
|
m.waitDuration = time.Millisecond
|
|
m.maxDownloadAttempts = tc.maxDownloadAttempts
|
|
})
|
|
|
|
progressChan := make(chan progress.Progress)
|
|
progressDone := make(chan struct{})
|
|
|
|
go func() {
|
|
for range progressChan {
|
|
}
|
|
close(progressDone)
|
|
}()
|
|
|
|
var currentDownloads int32
|
|
descriptors := downloadDescriptors(¤tDownloads)
|
|
descriptors[4].(*mockDownloadDescriptor).simulateRetries = tc.simulateRetries
|
|
|
|
_, _, err := ldm.Download(context.Background(), *image.NewRootFS(), runtime.GOOS, descriptors, progress.ChanOutput(progressChan))
|
|
if tc.expectedErr == "" {
|
|
assert.NilError(t, err)
|
|
} else {
|
|
assert.Error(t, err, tc.expectedErr)
|
|
}
|
|
})
|
|
}
|
|
}
|