1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/distribution/pull_v1.go
Aaron Lehmann 572ce80230 Improved push and pull with upload manager and download manager
This commit adds a transfer manager which deduplicates and schedules
transfers, and also an upload manager and download manager that build on
top of the transfer manager to provide high-level interfaces for uploads
and downloads. The push and pull code is modified to use these building
blocks.

Some benefits of the changes:

- Simplification of push/pull code
- Pushes can upload layers concurrently
- Failed downloads and uploads are retried after backoff delays
- Cancellation is supported, but individual transfers will only be
  cancelled if all pushes or pulls using them are cancelled.
- The distribution code is decoupled from Docker Engine packages and API
  conventions (i.e. streamformatter), which will make it easier to split
  out.

This commit also includes unit tests for the new distribution/xfer
package. The tests cover 87.8% of the statements in the package.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
2015-12-09 19:13:35 -08:00

343 lines
11 KiB
Go

package distribution
import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/url"
"strings"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/client/transport"
"github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/distribution/xfer"
"github.com/docker/docker/image"
"github.com/docker/docker/image/v1"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/progress"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/registry"
"golang.org/x/net/context"
)
type v1Puller struct {
v1IDService *metadata.V1IDService
endpoint registry.APIEndpoint
config *ImagePullConfig
repoInfo *registry.RepositoryInfo
session *registry.Session
}
func (p *v1Puller) Pull(ctx context.Context, ref reference.Named) (fallback bool, err error) {
if _, isDigested := ref.(reference.Digested); isDigested {
// Allowing fallback, because HTTPS v1 is before HTTP v2
return true, registry.ErrNoSupport{Err: errors.New("Cannot pull by digest with v1 registry")}
}
tlsConfig, err := p.config.RegistryService.TLSConfig(p.repoInfo.Index.Name)
if err != nil {
return false, err
}
// Adds Docker-specific headers as well as user-specified headers (metaHeaders)
tr := transport.NewTransport(
// TODO(tiborvass): was ReceiveTimeout
registry.NewTransport(tlsConfig),
registry.DockerHeaders(p.config.MetaHeaders)...,
)
client := registry.HTTPClient(tr)
v1Endpoint, err := p.endpoint.ToV1Endpoint(p.config.MetaHeaders)
if err != nil {
logrus.Debugf("Could not get v1 endpoint: %v", err)
return true, err
}
p.session, err = registry.NewSession(client, p.config.AuthConfig, v1Endpoint)
if err != nil {
// TODO(dmcgowan): Check if should fallback
logrus.Debugf("Fallback from error: %s", err)
return true, err
}
if err := p.pullRepository(ctx, ref); err != nil {
// TODO(dmcgowan): Check if should fallback
return false, err
}
progress.Message(p.config.ProgressOutput, "", p.repoInfo.CanonicalName.Name()+": this image was pulled from a legacy registry. Important: This registry version will not be supported in future versions of docker.")
return false, nil
}
func (p *v1Puller) pullRepository(ctx context.Context, ref reference.Named) error {
progress.Message(p.config.ProgressOutput, "", "Pulling repository "+p.repoInfo.CanonicalName.Name())
repoData, err := p.session.GetRepositoryData(p.repoInfo.RemoteName)
if err != nil {
if strings.Contains(err.Error(), "HTTP code: 404") {
return fmt.Errorf("Error: image %s not found", p.repoInfo.RemoteName.Name())
}
// Unexpected HTTP error
return err
}
logrus.Debugf("Retrieving the tag list")
var tagsList map[string]string
tagged, isTagged := ref.(reference.Tagged)
if !isTagged {
tagsList, err = p.session.GetRemoteTags(repoData.Endpoints, p.repoInfo.RemoteName)
} else {
var tagID string
tagsList = make(map[string]string)
tagID, err = p.session.GetRemoteTag(repoData.Endpoints, p.repoInfo.RemoteName, tagged.Tag())
if err == registry.ErrRepoNotFound {
return fmt.Errorf("Tag %s not found in repository %s", tagged.Tag(), p.repoInfo.CanonicalName.Name())
}
tagsList[tagged.Tag()] = tagID
}
if err != nil {
logrus.Errorf("unable to get remote tags: %s", err)
return err
}
for tag, id := range tagsList {
repoData.ImgList[id] = &registry.ImgData{
ID: id,
Tag: tag,
Checksum: "",
}
}
layersDownloaded := false
for _, imgData := range repoData.ImgList {
if isTagged && imgData.Tag != tagged.Tag() {
continue
}
err := p.downloadImage(ctx, repoData, imgData, &layersDownloaded)
if err != nil {
return err
}
}
localNameRef := p.repoInfo.LocalName
if isTagged {
localNameRef, err = reference.WithTag(localNameRef, tagged.Tag())
if err != nil {
localNameRef = p.repoInfo.LocalName
}
}
writeStatus(localNameRef.String(), p.config.ProgressOutput, layersDownloaded)
return nil
}
func (p *v1Puller) downloadImage(ctx context.Context, repoData *registry.RepositoryData, img *registry.ImgData, layersDownloaded *bool) error {
if img.Tag == "" {
logrus.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID)
return nil
}
localNameRef, err := reference.WithTag(p.repoInfo.LocalName, img.Tag)
if err != nil {
retErr := fmt.Errorf("Image (id: %s) has invalid tag: %s", img.ID, img.Tag)
logrus.Debug(retErr.Error())
return retErr
}
if err := v1.ValidateID(img.ID); err != nil {
return err
}
progress.Updatef(p.config.ProgressOutput, stringid.TruncateID(img.ID), "Pulling image (%s) from %s", img.Tag, p.repoInfo.CanonicalName.Name())
success := false
var lastErr error
for _, ep := range p.repoInfo.Index.Mirrors {
ep += "v1/"
progress.Updatef(p.config.ProgressOutput, stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, mirror: %s", img.Tag, p.repoInfo.CanonicalName.Name(), ep))
if err = p.pullImage(ctx, img.ID, ep, localNameRef, layersDownloaded); err != nil {
// Don't report errors when pulling from mirrors.
logrus.Debugf("Error pulling image (%s) from %s, mirror: %s, %s", img.Tag, p.repoInfo.CanonicalName.Name(), ep, err)
continue
}
success = true
break
}
if !success {
for _, ep := range repoData.Endpoints {
progress.Updatef(p.config.ProgressOutput, stringid.TruncateID(img.ID), "Pulling image (%s) from %s, endpoint: %s", img.Tag, p.repoInfo.CanonicalName.Name(), ep)
if err = p.pullImage(ctx, img.ID, ep, localNameRef, layersDownloaded); err != nil {
// It's not ideal that only the last error is returned, it would be better to concatenate the errors.
// As the error is also given to the output stream the user will see the error.
lastErr = err
progress.Updatef(p.config.ProgressOutput, stringid.TruncateID(img.ID), "Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, p.repoInfo.CanonicalName.Name(), ep, err)
continue
}
success = true
break
}
}
if !success {
err := fmt.Errorf("Error pulling image (%s) from %s, %v", img.Tag, p.repoInfo.CanonicalName.Name(), lastErr)
progress.Update(p.config.ProgressOutput, stringid.TruncateID(img.ID), err.Error())
return err
}
progress.Update(p.config.ProgressOutput, stringid.TruncateID(img.ID), "Download complete")
return nil
}
func (p *v1Puller) pullImage(ctx context.Context, v1ID, endpoint string, localNameRef reference.Named, layersDownloaded *bool) (err error) {
var history []string
history, err = p.session.GetRemoteHistory(v1ID, endpoint)
if err != nil {
return err
}
if len(history) < 1 {
return fmt.Errorf("empty history for image %s", v1ID)
}
progress.Update(p.config.ProgressOutput, stringid.TruncateID(v1ID), "Pulling dependent layers")
var (
descriptors []xfer.DownloadDescriptor
newHistory []image.History
imgJSON []byte
imgSize int64
)
// Iterate over layers, in order from bottom-most to top-most. Download
// config for all layers and create descriptors.
for i := len(history) - 1; i >= 0; i-- {
v1LayerID := history[i]
imgJSON, imgSize, err = p.downloadLayerConfig(v1LayerID, endpoint)
if err != nil {
return err
}
// Create a new-style config from the legacy configs
h, err := v1.HistoryFromConfig(imgJSON, false)
if err != nil {
return err
}
newHistory = append(newHistory, h)
layerDescriptor := &v1LayerDescriptor{
v1LayerID: v1LayerID,
indexName: p.repoInfo.Index.Name,
endpoint: endpoint,
v1IDService: p.v1IDService,
layersDownloaded: layersDownloaded,
layerSize: imgSize,
session: p.session,
}
descriptors = append(descriptors, layerDescriptor)
}
rootFS := image.NewRootFS()
resultRootFS, release, err := p.config.DownloadManager.Download(ctx, *rootFS, descriptors, p.config.ProgressOutput)
if err != nil {
return err
}
defer release()
config, err := v1.MakeConfigFromV1Config(imgJSON, &resultRootFS, newHistory)
if err != nil {
return err
}
imageID, err := p.config.ImageStore.Create(config)
if err != nil {
return err
}
if err := p.config.TagStore.AddTag(localNameRef, imageID, true); err != nil {
return err
}
return nil
}
func (p *v1Puller) downloadLayerConfig(v1LayerID, endpoint string) (imgJSON []byte, imgSize int64, err error) {
progress.Update(p.config.ProgressOutput, stringid.TruncateID(v1LayerID), "Pulling metadata")
retries := 5
for j := 1; j <= retries; j++ {
imgJSON, imgSize, err := p.session.GetRemoteImageJSON(v1LayerID, endpoint)
if err != nil && j == retries {
progress.Update(p.config.ProgressOutput, stringid.TruncateID(v1LayerID), "Error pulling layer metadata")
return nil, 0, err
} else if err != nil {
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
continue
}
return imgJSON, imgSize, nil
}
// not reached
return nil, 0, nil
}
type v1LayerDescriptor struct {
v1LayerID string
indexName string
endpoint string
v1IDService *metadata.V1IDService
layersDownloaded *bool
layerSize int64
session *registry.Session
}
func (ld *v1LayerDescriptor) Key() string {
return "v1:" + ld.v1LayerID
}
func (ld *v1LayerDescriptor) ID() string {
return stringid.TruncateID(ld.v1LayerID)
}
func (ld *v1LayerDescriptor) DiffID() (layer.DiffID, error) {
return ld.v1IDService.Get(ld.v1LayerID, ld.indexName)
}
func (ld *v1LayerDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) {
progress.Update(progressOutput, ld.ID(), "Pulling fs layer")
layerReader, err := ld.session.GetRemoteImageLayer(ld.v1LayerID, ld.endpoint, ld.layerSize)
if err != nil {
progress.Update(progressOutput, ld.ID(), "Error pulling dependent layers")
if uerr, ok := err.(*url.Error); ok {
err = uerr.Err
}
if terr, ok := err.(net.Error); ok && terr.Timeout() {
return nil, 0, err
}
return nil, 0, xfer.DoNotRetry{Err: err}
}
*ld.layersDownloaded = true
tmpFile, err := ioutil.TempFile("", "GetImageBlob")
if err != nil {
layerReader.Close()
return nil, 0, err
}
reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, layerReader), progressOutput, ld.layerSize, ld.ID(), "Downloading")
defer reader.Close()
_, err = io.Copy(tmpFile, reader)
if err != nil {
return nil, 0, err
}
progress.Update(progressOutput, ld.ID(), "Download complete")
logrus.Debugf("Downloaded %s to tempfile %s", ld.ID(), tmpFile.Name())
tmpFile.Seek(0, 0)
return ioutils.NewReadCloserWrapper(tmpFile, tmpFileCloser(tmpFile)), ld.layerSize, nil
}
func (ld *v1LayerDescriptor) Registered(diffID layer.DiffID) {
// Cache mapping from this layer's DiffID to the blobsum
ld.v1IDService.Set(ld.v1LayerID, ld.indexName, diffID)
}