Mirror of https://github.com/moby/moby.git (synced 2022-11-09 12:21:53 -05:00)
Commit ce61a1ed98
Moby works well when you have a good and stable internet connection. Operating in areas where connectivity is likely to be lost at undetermined intervals, for example over a satellite link or 4G/LTE in rural areas, can become a problem when pulling a new image. When the connection drops while image layers are being pulled, Moby tries to reconnect up to 5 times. If this fails, the incompletely downloaded layers are lost and must be downloaded again from the start on the next pull. This means we use more data than we have to, and pulling a layer multiple times from scratch can become costly over a satellite or 4G/LTE connection. As these technologies (especially 4G) are quite common in IoT, and Moby is used to run Azure IoT Edge devices, I would like to add a configurable maximum number of download attempts.

The maximum number of download attempts is currently a constant set to 5 (distribution/xfer/download.go). I would like to turn this constant into a value the user can set. The default remains 5, so nothing changes from the current behavior unless a value is specified when starting the daemon, either with the added flag or in the config file.

I added a default value of 5 for DefaultMaxDownloadAttempts and a settable max-download-attempts option in the daemon config file. It is also added to the dockerd config so it can be set with a flag when starting the daemon. The value is stored in the daemon's imageService when it is initialized and is passed to NewLayerDownloadManager as a parameter, which stores it in the LayerDownloadManager when that is created. This lets us set the maximum number of retries in makeDownloadFunc equal to the configured maximum download attempts. I also added some tests based on the existing maxConcurrentDownloads/maxConcurrentUploads tests.

You can pull this version and test it in a development container. Either create the config file `/etc/docker/daemon.json` with `{"max-download-attempts": 3}`, or start the daemon with `dockerd --max-download-attempts=3 -D &`. Start pulling an image and disconnect from the internet while it is downloading. The result is that the pull stops after three attempts.

Signed-off-by: Lukas Heeren <lukas-heeren@hotmail.com>
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
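As a rough illustration of the option added here, below is a minimal sketch of how a caller could pass the configured limit through the new functional option. Only `NewLayerDownloadManager` and `WithMaxDownloadAttempts` come from the file in this change; the surrounding package, helper name, and parameters are illustrative, not the daemon's actual wiring.

```go
package example

import (
	"github.com/docker/docker/distribution/xfer"
	"github.com/docker/docker/layer"
)

// newDownloadManager is a hypothetical helper: it builds a LayerDownloadManager
// whose retry limit comes from daemon configuration instead of the package
// default of 5. Omitting the option keeps the default behaviour.
func newDownloadManager(layerStores map[string]layer.Store, concurrencyLimit, maxDownloadAttempts int) *xfer.LayerDownloadManager {
	return xfer.NewLayerDownloadManager(
		layerStores,
		concurrencyLimit,
		xfer.WithMaxDownloadAttempts(maxDownloadAttempts),
	)
}
```

In the description above, the value handed to the option would come from `max-download-attempts` in `/etc/docker/daemon.json` or from the `--max-download-attempts` flag.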
484 lines · 14 KiB · Go
package xfer // import "github.com/docker/docker/distribution/xfer"

import (
	"context"
	"errors"
	"fmt"
	"io"
	"runtime"
	"time"

	"github.com/docker/distribution"
	"github.com/docker/docker/image"
	"github.com/docker/docker/layer"
	"github.com/docker/docker/pkg/archive"
	"github.com/docker/docker/pkg/ioutils"
	"github.com/docker/docker/pkg/progress"
	"github.com/docker/docker/pkg/system"
	"github.com/sirupsen/logrus"
)

const maxDownloadAttempts = 5

// LayerDownloadManager figures out which layers need to be downloaded, then
// registers and downloads those, taking into account dependencies between
// layers.
type LayerDownloadManager struct {
	layerStores         map[string]layer.Store
	tm                  TransferManager
	waitDuration        time.Duration
	maxDownloadAttempts int
}

// SetConcurrency sets the max concurrent downloads for each pull
func (ldm *LayerDownloadManager) SetConcurrency(concurrency int) {
	ldm.tm.SetConcurrency(concurrency)
}

// NewLayerDownloadManager returns a new LayerDownloadManager.
func NewLayerDownloadManager(layerStores map[string]layer.Store, concurrencyLimit int, options ...func(*LayerDownloadManager)) *LayerDownloadManager {
	manager := LayerDownloadManager{
		layerStores:         layerStores,
		tm:                  NewTransferManager(concurrencyLimit),
		waitDuration:        time.Second,
		maxDownloadAttempts: maxDownloadAttempts,
	}
	for _, option := range options {
		option(&manager)
	}
	return &manager
}

// WithMaxDownloadAttempts configures the maximum number of download
// attempts for a download manager.
func WithMaxDownloadAttempts(max int) func(*LayerDownloadManager) {
	return func(dlm *LayerDownloadManager) {
		dlm.maxDownloadAttempts = max
	}
}

type downloadTransfer struct {
	Transfer

	layerStore layer.Store
	layer      layer.Layer
	err        error
}

// result returns the layer resulting from the download, if the download
// and registration were successful.
func (d *downloadTransfer) result() (layer.Layer, error) {
	return d.layer, d.err
}

// A DownloadDescriptor references a layer that may need to be downloaded.
type DownloadDescriptor interface {
	// Key returns the key used to deduplicate downloads.
	Key() string
	// ID returns the ID for display purposes.
	ID() string
	// DiffID should return the DiffID for this layer, or an error
	// if it is unknown (for example, if it has not been downloaded
	// before).
	DiffID() (layer.DiffID, error)
	// Download is called to perform the download.
	Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error)
	// Close is called when the download manager is finished with this
	// descriptor and will not call Download again or read from the reader
	// that Download returned.
	Close()
}

// DownloadDescriptorWithRegistered is a DownloadDescriptor that has an
// additional Registered method which gets called after a downloaded layer is
// registered. This allows the user of the download manager to know the DiffID
// of each registered layer. This method is called if a cast to
// DownloadDescriptorWithRegistered is successful.
type DownloadDescriptorWithRegistered interface {
	DownloadDescriptor
	Registered(diffID layer.DiffID)
}

// Download is a blocking function which ensures the requested layers are
// present in the layer store. It uses the string returned by the Key method to
// deduplicate downloads. If a given layer is not already known to be present in
// the layer store, and the key is not used by an in-progress download, the
// Download method is called to get the layer tar data. Layers are then
// registered in the appropriate order. The caller must call the returned
// release function once it is done with the returned RootFS object.
func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS image.RootFS, os string, layers []DownloadDescriptor, progressOutput progress.Output) (image.RootFS, func(), error) {
	var (
		topLayer       layer.Layer
		topDownload    *downloadTransfer
		watcher        *Watcher
		missingLayer   bool
		transferKey    = ""
		downloadsByKey = make(map[string]*downloadTransfer)
	)

	// Assume that the operating system is the host OS if blank, and validate it
	// to ensure we don't cause a panic by an invalid index into the layerstores.
	if os == "" {
		os = runtime.GOOS
	}
	if !system.IsOSSupported(os) {
		return image.RootFS{}, nil, system.ErrNotSupportedOperatingSystem
	}

	rootFS := initialRootFS
	for _, descriptor := range layers {
		key := descriptor.Key()
		transferKey += key

		if !missingLayer {
			missingLayer = true
			diffID, err := descriptor.DiffID()
			if err == nil {
				getRootFS := rootFS
				getRootFS.Append(diffID)
				l, err := ldm.layerStores[os].Get(getRootFS.ChainID())
				if err == nil {
					// Layer already exists.
					logrus.Debugf("Layer already exists: %s", descriptor.ID())
					progress.Update(progressOutput, descriptor.ID(), "Already exists")
					if topLayer != nil {
						layer.ReleaseAndLog(ldm.layerStores[os], topLayer)
					}
					topLayer = l
					missingLayer = false
					rootFS.Append(diffID)
					// Register this repository as a source of this layer.
					withRegistered, hasRegistered := descriptor.(DownloadDescriptorWithRegistered)
					if hasRegistered { // As layerstore may set the driver
						withRegistered.Registered(diffID)
					}
					continue
				}
			}
		}

		// Does this layer have the same data as a previous layer in
		// the stack? If so, avoid downloading it more than once.
		var topDownloadUncasted Transfer
		if existingDownload, ok := downloadsByKey[key]; ok {
			xferFunc := ldm.makeDownloadFuncFromDownload(descriptor, existingDownload, topDownload, os)
			defer topDownload.Transfer.Release(watcher)
			topDownloadUncasted, watcher = ldm.tm.Transfer(transferKey, xferFunc, progressOutput)
			topDownload = topDownloadUncasted.(*downloadTransfer)
			continue
		}

		// Layer is not known to exist - download and register it.
		progress.Update(progressOutput, descriptor.ID(), "Pulling fs layer")

		var xferFunc DoFunc
		if topDownload != nil {
			xferFunc = ldm.makeDownloadFunc(descriptor, "", topDownload, os)
			defer topDownload.Transfer.Release(watcher)
		} else {
			xferFunc = ldm.makeDownloadFunc(descriptor, rootFS.ChainID(), nil, os)
		}
		topDownloadUncasted, watcher = ldm.tm.Transfer(transferKey, xferFunc, progressOutput)
		topDownload = topDownloadUncasted.(*downloadTransfer)
		downloadsByKey[key] = topDownload
	}

	if topDownload == nil {
		return rootFS, func() {
			if topLayer != nil {
				layer.ReleaseAndLog(ldm.layerStores[os], topLayer)
			}
		}, nil
	}

	// Won't be using the list built up so far - will generate it
	// from downloaded layers instead.
	rootFS.DiffIDs = []layer.DiffID{}

	defer func() {
		if topLayer != nil {
			layer.ReleaseAndLog(ldm.layerStores[os], topLayer)
		}
	}()

	select {
	case <-ctx.Done():
		topDownload.Transfer.Release(watcher)
		return rootFS, func() {}, ctx.Err()
	case <-topDownload.Done():
		break
	}

	l, err := topDownload.result()
	if err != nil {
		topDownload.Transfer.Release(watcher)
		return rootFS, func() {}, err
	}

	// Must do this exactly len(layers) times, so we don't include the
	// base layer on Windows.
	for range layers {
		if l == nil {
			topDownload.Transfer.Release(watcher)
			return rootFS, func() {}, errors.New("internal error: too few parent layers")
		}
		rootFS.DiffIDs = append([]layer.DiffID{l.DiffID()}, rootFS.DiffIDs...)
		l = l.Parent()
	}
	return rootFS, func() { topDownload.Transfer.Release(watcher) }, err
}

// makeDownloadFunc returns a function that performs the layer download and
// registration. If parentDownload is non-nil, it waits for that download to
// complete before the registration step, and registers the downloaded data
// on top of parentDownload's resulting layer. Otherwise, it registers the
// layer on top of the ChainID given by parentLayer.
func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, parentLayer layer.ChainID, parentDownload *downloadTransfer, os string) DoFunc {
	return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
		d := &downloadTransfer{
			Transfer:   NewTransfer(),
			layerStore: ldm.layerStores[os],
		}

		go func() {
			defer func() {
				close(progressChan)
			}()

			progressOutput := progress.ChanOutput(progressChan)

			select {
			case <-start:
			default:
				progress.Update(progressOutput, descriptor.ID(), "Waiting")
				<-start
			}

			if parentDownload != nil {
				// Did the parent download already fail or get
				// cancelled?
				select {
				case <-parentDownload.Done():
					_, err := parentDownload.result()
					if err != nil {
						d.err = err
						return
					}
				default:
				}
			}

			var (
				downloadReader io.ReadCloser
				size           int64
				err            error
				retries        int
			)

			defer descriptor.Close()

			for {
				downloadReader, size, err = descriptor.Download(d.Transfer.Context(), progressOutput)
				if err == nil {
					break
				}

				// If an error was returned because the context
				// was cancelled, we shouldn't retry.
				select {
				case <-d.Transfer.Context().Done():
					d.err = err
					return
				default:
				}

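				// Give up if the descriptor reports a permanent failure
				// (DoNotRetry), or once the configured maximum number of
				// download attempts has been exceeded.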
				retries++
				if _, isDNR := err.(DoNotRetry); isDNR || retries > ldm.maxDownloadAttempts {
					logrus.Errorf("Download failed after %d attempts: %v", retries, err)
					d.err = err
					return
				}

				logrus.Infof("Download failed, retrying (%d/%d): %v", retries, ldm.maxDownloadAttempts, err)
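				// Back off before the next attempt: wait retries*5 ticks of
				// waitDuration (one second by default), counting down one
				// tick at a time so cancellation is still observed promptly.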
				delay := retries * 5
				ticker := time.NewTicker(ldm.waitDuration)

			selectLoop:
				for {
					progress.Updatef(progressOutput, descriptor.ID(), "Retrying in %d second%s", delay, (map[bool]string{true: "s"})[delay != 1])
					select {
					case <-ticker.C:
						delay--
						if delay == 0 {
							ticker.Stop()
							break selectLoop
						}
					case <-d.Transfer.Context().Done():
						ticker.Stop()
						d.err = errors.New("download cancelled during retry delay")
						return
					}

				}
			}

			close(inactive)

			if parentDownload != nil {
				select {
				case <-d.Transfer.Context().Done():
					d.err = errors.New("layer registration cancelled")
					downloadReader.Close()
					return
				case <-parentDownload.Done():
				}

				l, err := parentDownload.result()
				if err != nil {
					d.err = err
					downloadReader.Close()
					return
				}
				parentLayer = l.ChainID()
			}

			reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(d.Transfer.Context(), downloadReader), progressOutput, size, descriptor.ID(), "Extracting")
			defer reader.Close()

			inflatedLayerData, err := archive.DecompressStream(reader)
			if err != nil {
				d.err = fmt.Errorf("could not get decompression stream: %v", err)
				return
			}

			var src distribution.Descriptor
			if fs, ok := descriptor.(distribution.Describable); ok {
				src = fs.Descriptor()
			}
			if ds, ok := d.layerStore.(layer.DescribableStore); ok {
				d.layer, err = ds.RegisterWithDescriptor(inflatedLayerData, parentLayer, src)
			} else {
				d.layer, err = d.layerStore.Register(inflatedLayerData, parentLayer)
			}
			if err != nil {
				select {
				case <-d.Transfer.Context().Done():
					d.err = errors.New("layer registration cancelled")
				default:
					d.err = fmt.Errorf("failed to register layer: %v", err)
				}
				return
			}

			progress.Update(progressOutput, descriptor.ID(), "Pull complete")
			withRegistered, hasRegistered := descriptor.(DownloadDescriptorWithRegistered)
			if hasRegistered {
				withRegistered.Registered(d.layer.DiffID())
			}

			// Doesn't actually need to be its own goroutine, but
			// done like this so we can defer close(c).
			go func() {
				<-d.Transfer.Released()
				if d.layer != nil {
					layer.ReleaseAndLog(d.layerStore, d.layer)
				}
			}()
		}()

		return d
	}
}

// makeDownloadFuncFromDownload returns a function that performs the layer
// registration when the layer data is coming from an existing download. It
// waits for sourceDownload and parentDownload to complete, and then
// reregisters the data from sourceDownload's top layer on top of
// parentDownload. This function does not log progress output because it would
// interfere with the progress reporting for sourceDownload, which has the same
// Key.
func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor DownloadDescriptor, sourceDownload *downloadTransfer, parentDownload *downloadTransfer, os string) DoFunc {
	return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
		d := &downloadTransfer{
			Transfer:   NewTransfer(),
			layerStore: ldm.layerStores[os],
		}

		go func() {
			defer func() {
				close(progressChan)
			}()

			<-start

			close(inactive)

			select {
			case <-d.Transfer.Context().Done():
				d.err = errors.New("layer registration cancelled")
				return
			case <-parentDownload.Done():
			}

			l, err := parentDownload.result()
			if err != nil {
				d.err = err
				return
			}
			parentLayer := l.ChainID()

			// sourceDownload should have already finished if
			// parentDownload finished, but wait for it explicitly
			// to be sure.
			select {
			case <-d.Transfer.Context().Done():
				d.err = errors.New("layer registration cancelled")
				return
			case <-sourceDownload.Done():
			}

			l, err = sourceDownload.result()
			if err != nil {
				d.err = err
				return
			}

			layerReader, err := l.TarStream()
			if err != nil {
				d.err = err
				return
			}
			defer layerReader.Close()

			var src distribution.Descriptor
			if fs, ok := l.(distribution.Describable); ok {
				src = fs.Descriptor()
			}
			if ds, ok := d.layerStore.(layer.DescribableStore); ok {
				d.layer, err = ds.RegisterWithDescriptor(layerReader, parentLayer, src)
			} else {
				d.layer, err = d.layerStore.Register(layerReader, parentLayer)
			}
			if err != nil {
				d.err = fmt.Errorf("failed to register layer: %v", err)
				return
			}

			withRegistered, hasRegistered := descriptor.(DownloadDescriptorWithRegistered)
			if hasRegistered {
				withRegistered.Registered(d.layer.DiffID())
			}

			// Doesn't actually need to be its own goroutine, but
			// done like this so we can defer close(c).
			go func() {
				<-d.Transfer.Released()
				if d.layer != nil {
					layer.ReleaseAndLog(d.layerStore, d.layer)
				}
			}()
		}()

		return d
	}
}