1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Attempt to resume downloads after certain errors

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
This commit is contained in:
Aaron Lehmann 2016-01-26 11:19:18 -08:00
parent f425529e7e
commit 056bf9f25e

View file

@ -17,6 +17,7 @@ import (
"github.com/docker/distribution/manifest/schema2" "github.com/docker/distribution/manifest/schema2"
"github.com/docker/distribution/registry/api/errcode" "github.com/docker/distribution/registry/api/errcode"
"github.com/docker/distribution/registry/client" "github.com/docker/distribution/registry/client"
"github.com/docker/distribution/registry/client/transport"
"github.com/docker/docker/distribution/metadata" "github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/distribution/xfer" "github.com/docker/docker/distribution/xfer"
"github.com/docker/docker/image" "github.com/docker/docker/image"
@ -115,6 +116,7 @@ type v2LayerDescriptor struct {
repo distribution.Repository repo distribution.Repository
V2MetadataService *metadata.V2MetadataService V2MetadataService *metadata.V2MetadataService
tmpFile *os.File tmpFile *os.File
verifier digest.Verifier
} }
func (ld *v2LayerDescriptor) Key() string { func (ld *v2LayerDescriptor) Key() string {
@ -132,29 +134,56 @@ func (ld *v2LayerDescriptor) DiffID() (layer.DiffID, error) {
func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) { func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) {
logrus.Debugf("pulling blob %q", ld.digest) logrus.Debugf("pulling blob %q", ld.digest)
var err error var (
err error
offset int64
)
if ld.tmpFile == nil { if ld.tmpFile == nil {
ld.tmpFile, err = createDownloadFile() ld.tmpFile, err = createDownloadFile()
} else {
_, err = ld.tmpFile.Seek(0, os.SEEK_SET)
}
if err != nil { if err != nil {
return nil, 0, xfer.DoNotRetry{Err: err} return nil, 0, xfer.DoNotRetry{Err: err}
} }
} else {
offset, err = ld.tmpFile.Seek(0, os.SEEK_END)
if err != nil {
logrus.Debugf("error seeking to end of download file: %v", err)
offset = 0
ld.tmpFile.Close()
if err := os.Remove(ld.tmpFile.Name()); err != nil {
logrus.Errorf("Failed to remove temp file: %s", ld.tmpFile.Name())
}
ld.tmpFile, err = createDownloadFile()
if err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
} else if offset != 0 {
logrus.Debugf("attempting to resume download of %q from %d bytes", ld.digest, offset)
}
}
tmpFile := ld.tmpFile tmpFile := ld.tmpFile
blobs := ld.repo.Blobs(ctx) blobs := ld.repo.Blobs(ctx)
layerDownload, err := blobs.Open(ctx, ld.digest) layerDownload, err := blobs.Open(ctx, ld.digest)
if err != nil { if err != nil {
logrus.Debugf("Error statting layer: %v", err) logrus.Debugf("Error initiating layer download: %v", err)
if err == distribution.ErrBlobUnknown { if err == distribution.ErrBlobUnknown {
return nil, 0, xfer.DoNotRetry{Err: err} return nil, 0, xfer.DoNotRetry{Err: err}
} }
return nil, 0, retryOnError(err) return nil, 0, retryOnError(err)
} }
if offset != 0 {
_, err := layerDownload.Seek(offset, os.SEEK_SET)
if err != nil {
if err := ld.truncateDownloadFile(); err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
return nil, 0, err
}
}
size, err := layerDownload.Seek(0, os.SEEK_END) size, err := layerDownload.Seek(0, os.SEEK_END)
if err != nil { if err != nil {
// Seek failed, perhaps because there was no Content-Length // Seek failed, perhaps because there was no Content-Length
@ -162,43 +191,59 @@ func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progre
// still continue without a progress bar. // still continue without a progress bar.
size = 0 size = 0
} else { } else {
// Restore the seek offset at the beginning of the stream. if size != 0 && offset > size {
_, err = layerDownload.Seek(0, os.SEEK_SET) logrus.Debugf("Partial download is larger than full blob. Starting over")
offset = 0
if err := ld.truncateDownloadFile(); err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
}
// Restore the seek offset either at the beginning of the
// stream, or just after the last byte we have from previous
// attempts.
_, err = layerDownload.Seek(offset, os.SEEK_SET)
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
} }
reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, layerDownload), progressOutput, size, ld.ID(), "Downloading") reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, layerDownload), progressOutput, size-offset, ld.ID(), "Downloading")
defer reader.Close() defer reader.Close()
verifier, err := digest.NewDigestVerifier(ld.digest) if ld.verifier == nil {
ld.verifier, err = digest.NewDigestVerifier(ld.digest)
if err != nil { if err != nil {
return nil, 0, xfer.DoNotRetry{Err: err} return nil, 0, xfer.DoNotRetry{Err: err}
} }
_, err = io.Copy(tmpFile, io.TeeReader(reader, verifier))
if err != nil {
tmpFile.Close()
if err := os.Remove(tmpFile.Name()); err != nil {
logrus.Errorf("Failed to remove temp file: %s", tmpFile.Name())
} }
ld.tmpFile = nil
_, err = io.Copy(tmpFile, io.TeeReader(reader, ld.verifier))
if err != nil {
if err == transport.ErrWrongCodeForByteRange {
if err := ld.truncateDownloadFile(); err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
}
return nil, 0, err
}
return nil, 0, retryOnError(err) return nil, 0, retryOnError(err)
} }
progress.Update(progressOutput, ld.ID(), "Verifying Checksum") progress.Update(progressOutput, ld.ID(), "Verifying Checksum")
if !verifier.Verified() { if !ld.verifier.Verified() {
err = fmt.Errorf("filesystem layer verification failed for digest %s", ld.digest) err = fmt.Errorf("filesystem layer verification failed for digest %s", ld.digest)
logrus.Error(err) logrus.Error(err)
tmpFile.Close() // Allow a retry if this digest verification error happened
if err := os.Remove(tmpFile.Name()); err != nil { // after a resumed download.
logrus.Errorf("Failed to remove temp file: %s", tmpFile.Name()) if offset != 0 {
if err := ld.truncateDownloadFile(); err != nil {
return nil, 0, xfer.DoNotRetry{Err: err}
} }
ld.tmpFile = nil
return nil, 0, err
}
return nil, 0, xfer.DoNotRetry{Err: err} return nil, 0, xfer.DoNotRetry{Err: err}
} }
@ -213,6 +258,7 @@ func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progre
logrus.Errorf("Failed to remove temp file: %s", tmpFile.Name()) logrus.Errorf("Failed to remove temp file: %s", tmpFile.Name())
} }
ld.tmpFile = nil ld.tmpFile = nil
ld.verifier = nil
return nil, 0, xfer.DoNotRetry{Err: err} return nil, 0, xfer.DoNotRetry{Err: err}
} }
return tmpFile, size, nil return tmpFile, size, nil
@ -227,6 +273,23 @@ func (ld *v2LayerDescriptor) Close() {
} }
} }
func (ld *v2LayerDescriptor) truncateDownloadFile() error {
// Need a new hash context since we will be redoing the download
ld.verifier = nil
if _, err := ld.tmpFile.Seek(0, os.SEEK_SET); err != nil {
logrus.Debugf("error seeking to beginning of download file: %v", err)
return err
}
if err := ld.tmpFile.Truncate(0); err != nil {
logrus.Debugf("error truncating download file: %v", err)
return err
}
return nil
}
func (ld *v2LayerDescriptor) Registered(diffID layer.DiffID) { func (ld *v2LayerDescriptor) Registered(diffID layer.DiffID) {
// Cache mapping from this layer's DiffID to the blobsum // Cache mapping from this layer's DiffID to the blobsum
ld.V2MetadataService.Add(diffID, metadata.V2Metadata{Digest: ld.digest, SourceRepository: ld.repoInfo.FullName()}) ld.V2MetadataService.Add(diffID, metadata.V2Metadata{Digest: ld.digest, SourceRepository: ld.repoInfo.FullName()})