diff --git a/distribution/pull_v2.go b/distribution/pull_v2.go index cb07b5172a..0e04c46c32 100644 --- a/distribution/pull_v2.go +++ b/distribution/pull_v2.go @@ -17,6 +17,7 @@ import ( "github.com/docker/distribution/manifest/schema2" "github.com/docker/distribution/registry/api/errcode" "github.com/docker/distribution/registry/client" + "github.com/docker/distribution/registry/client/transport" "github.com/docker/docker/distribution/metadata" "github.com/docker/docker/distribution/xfer" "github.com/docker/docker/image" @@ -115,6 +116,7 @@ type v2LayerDescriptor struct { repo distribution.Repository V2MetadataService *metadata.V2MetadataService tmpFile *os.File + verifier digest.Verifier } func (ld *v2LayerDescriptor) Key() string { @@ -132,15 +134,33 @@ func (ld *v2LayerDescriptor) DiffID() (layer.DiffID, error) { func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) { logrus.Debugf("pulling blob %q", ld.digest) - var err error + var ( + err error + offset int64 + ) if ld.tmpFile == nil { ld.tmpFile, err = createDownloadFile() + if err != nil { + return nil, 0, xfer.DoNotRetry{Err: err} + } } else { - _, err = ld.tmpFile.Seek(0, os.SEEK_SET) - } - if err != nil { - return nil, 0, xfer.DoNotRetry{Err: err} + offset, err = ld.tmpFile.Seek(0, os.SEEK_END) + if err != nil { + logrus.Debugf("error seeking to end of download file: %v", err) + offset = 0 + + ld.tmpFile.Close() + if err := os.Remove(ld.tmpFile.Name()); err != nil { + logrus.Errorf("Failed to remove temp file: %s", ld.tmpFile.Name()) + } + ld.tmpFile, err = createDownloadFile() + if err != nil { + return nil, 0, xfer.DoNotRetry{Err: err} + } + } else if offset != 0 { + logrus.Debugf("attempting to resume download of %q from %d bytes", ld.digest, offset) + } } tmpFile := ld.tmpFile @@ -148,13 +168,22 @@ func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progre layerDownload, err := blobs.Open(ctx, ld.digest) if err != nil { - logrus.Debugf("Error statting layer: %v", err) + logrus.Debugf("Error initiating layer download: %v", err) if err == distribution.ErrBlobUnknown { return nil, 0, xfer.DoNotRetry{Err: err} } return nil, 0, retryOnError(err) } + if offset != 0 { + _, err := layerDownload.Seek(offset, os.SEEK_SET) + if err != nil { + if err := ld.truncateDownloadFile(); err != nil { + return nil, 0, xfer.DoNotRetry{Err: err} + } + return nil, 0, err + } + } size, err := layerDownload.Seek(0, os.SEEK_END) if err != nil { // Seek failed, perhaps because there was no Content-Length @@ -162,43 +191,59 @@ func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progre // still continue without a progress bar. size = 0 } else { - // Restore the seek offset at the beginning of the stream. - _, err = layerDownload.Seek(0, os.SEEK_SET) + if size != 0 && offset > size { + logrus.Debugf("Partial download is larger than full blob. Starting over") + offset = 0 + if err := ld.truncateDownloadFile(); err != nil { + return nil, 0, xfer.DoNotRetry{Err: err} + } + } + + // Restore the seek offset either at the beginning of the + // stream, or just after the last byte we have from previous + // attempts. + _, err = layerDownload.Seek(offset, os.SEEK_SET) if err != nil { return nil, 0, err } } - reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, layerDownload), progressOutput, size, ld.ID(), "Downloading") + reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, layerDownload), progressOutput, size-offset, ld.ID(), "Downloading") defer reader.Close() - verifier, err := digest.NewDigestVerifier(ld.digest) - if err != nil { - return nil, 0, xfer.DoNotRetry{Err: err} + if ld.verifier == nil { + ld.verifier, err = digest.NewDigestVerifier(ld.digest) + if err != nil { + return nil, 0, xfer.DoNotRetry{Err: err} + } } - _, err = io.Copy(tmpFile, io.TeeReader(reader, verifier)) + _, err = io.Copy(tmpFile, io.TeeReader(reader, ld.verifier)) if err != nil { - tmpFile.Close() - if err := os.Remove(tmpFile.Name()); err != nil { - logrus.Errorf("Failed to remove temp file: %s", tmpFile.Name()) + if err == transport.ErrWrongCodeForByteRange { + if err := ld.truncateDownloadFile(); err != nil { + return nil, 0, xfer.DoNotRetry{Err: err} + } + return nil, 0, err } - ld.tmpFile = nil return nil, 0, retryOnError(err) } progress.Update(progressOutput, ld.ID(), "Verifying Checksum") - if !verifier.Verified() { + if !ld.verifier.Verified() { err = fmt.Errorf("filesystem layer verification failed for digest %s", ld.digest) logrus.Error(err) - tmpFile.Close() - if err := os.Remove(tmpFile.Name()); err != nil { - logrus.Errorf("Failed to remove temp file: %s", tmpFile.Name()) - } - ld.tmpFile = nil + // Allow a retry if this digest verification error happened + // after a resumed download. + if offset != 0 { + if err := ld.truncateDownloadFile(); err != nil { + return nil, 0, xfer.DoNotRetry{Err: err} + } + return nil, 0, err + } return nil, 0, xfer.DoNotRetry{Err: err} } @@ -213,6 +258,7 @@ func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progre logrus.Errorf("Failed to remove temp file: %s", tmpFile.Name()) } ld.tmpFile = nil + ld.verifier = nil return nil, 0, xfer.DoNotRetry{Err: err} } return tmpFile, size, nil @@ -227,6 +273,23 @@ func (ld *v2LayerDescriptor) Close() { } } +func (ld *v2LayerDescriptor) truncateDownloadFile() error { + // Need a new hash context since we will be redoing the download + ld.verifier = nil + + if _, err := ld.tmpFile.Seek(0, os.SEEK_SET); err != nil { + logrus.Debugf("error seeking to beginning of download file: %v", err) + return err + } + + if err := ld.tmpFile.Truncate(0); err != nil { + logrus.Debugf("error truncating download file: %v", err) + return err + } + + return nil +} + func (ld *v2LayerDescriptor) Registered(diffID layer.DiffID) { // Cache mapping from this layer's DiffID to the blobsum ld.V2MetadataService.Add(diffID, metadata.V2Metadata{Digest: ld.digest, SourceRepository: ld.repoInfo.FullName()})