1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/graph/pull.go
Jessica Frazelle c49cd3d2a5 Make debugs logs suck less.
Docker-DCO-1.1-Signed-off-by: Jessica Frazelle <jess@docker.com> (github: jfrazelle)
2015-01-23 13:20:29 -08:00

563 lines
17 KiB
Go

package graph
import (
"fmt"
"io"
"io/ioutil"
"net"
"net/url"
"os"
"strings"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/docker/engine"
"github.com/docker/docker/image"
"github.com/docker/docker/pkg/tarsum"
"github.com/docker/docker/registry"
"github.com/docker/docker/utils"
)
func (s *TagStore) CmdPull(job *engine.Job) engine.Status {
if n := len(job.Args); n != 1 && n != 2 {
return job.Errorf("Usage: %s IMAGE [TAG]", job.Name)
}
var (
localName = job.Args[0]
tag string
sf = utils.NewStreamFormatter(job.GetenvBool("json"))
authConfig = &registry.AuthConfig{}
metaHeaders map[string][]string
)
// Resolve the Repository name from fqn to RepositoryInfo
repoInfo, err := registry.ResolveRepositoryInfo(job, localName)
if err != nil {
return job.Error(err)
}
if len(job.Args) > 1 {
tag = job.Args[1]
}
job.GetenvJson("authConfig", authConfig)
job.GetenvJson("metaHeaders", &metaHeaders)
c, err := s.poolAdd("pull", repoInfo.LocalName+":"+tag)
if err != nil {
if c != nil {
// Another pull of the same repository is already taking place; just wait for it to finish
job.Stdout.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", repoInfo.LocalName))
<-c
return engine.StatusOK
}
return job.Error(err)
}
defer s.poolRemove("pull", repoInfo.LocalName+":"+tag)
log.Debugf("pulling image from host %q with remote name %q", repoInfo.Index.Name, repoInfo.RemoteName)
endpoint, err := repoInfo.GetEndpoint()
if err != nil {
return job.Error(err)
}
r, err := registry.NewSession(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, true)
if err != nil {
return job.Error(err)
}
logName := repoInfo.LocalName
if tag != "" {
logName += ":" + tag
}
if len(repoInfo.Index.Mirrors) == 0 && (repoInfo.Index.Official || endpoint.Version == registry.APIVersion2) {
j := job.Eng.Job("trust_update_base")
if err = j.Run(); err != nil {
log.Errorf("error updating trust base graph: %s", err)
}
log.Debugf("pulling v2 repository with local name %q", repoInfo.LocalName)
if err := s.pullV2Repository(job.Eng, r, job.Stdout, repoInfo, tag, sf, job.GetenvBool("parallel")); err == nil {
if err = job.Eng.Job("log", "pull", logName, "").Run(); err != nil {
log.Errorf("Error logging event 'pull' for %s: %s", logName, err)
}
return engine.StatusOK
} else if err != registry.ErrDoesNotExist {
log.Errorf("Error from V2 registry: %s", err)
}
log.Debug("image does not exist on v2 registry, falling back to v1")
}
log.Debugf("pulling v1 repository with local name %q", repoInfo.LocalName)
if err = s.pullRepository(r, job.Stdout, repoInfo, tag, sf, job.GetenvBool("parallel")); err != nil {
return job.Error(err)
}
if err = job.Eng.Job("log", "pull", logName, "").Run(); err != nil {
log.Errorf("Error logging event 'pull' for %s: %s", logName, err)
}
return engine.StatusOK
}
func (s *TagStore) pullRepository(r *registry.Session, out io.Writer, repoInfo *registry.RepositoryInfo, askedTag string, sf *utils.StreamFormatter, parallel bool) error {
out.Write(sf.FormatStatus("", "Pulling repository %s", repoInfo.CanonicalName))
repoData, err := r.GetRepositoryData(repoInfo.RemoteName)
if err != nil {
if strings.Contains(err.Error(), "HTTP code: 404") {
return fmt.Errorf("Error: image %s:%s not found", repoInfo.RemoteName, askedTag)
}
// Unexpected HTTP error
return err
}
log.Debugf("Retrieving the tag list")
tagsList, err := r.GetRemoteTags(repoData.Endpoints, repoInfo.RemoteName, repoData.Tokens)
if err != nil {
log.Errorf("unable to get remote tags: %s", err)
return err
}
for tag, id := range tagsList {
repoData.ImgList[id] = &registry.ImgData{
ID: id,
Tag: tag,
Checksum: "",
}
}
log.Debugf("Registering tags")
// If no tag has been specified, pull them all
var imageId string
if askedTag == "" {
for tag, id := range tagsList {
repoData.ImgList[id].Tag = tag
}
} else {
// Otherwise, check that the tag exists and use only that one
id, exists := tagsList[askedTag]
if !exists {
return fmt.Errorf("Tag %s not found in repository %s", askedTag, repoInfo.CanonicalName)
}
imageId = id
repoData.ImgList[id].Tag = askedTag
}
errors := make(chan error)
layers_downloaded := false
for _, image := range repoData.ImgList {
downloadImage := func(img *registry.ImgData) {
if askedTag != "" && img.Tag != askedTag {
if parallel {
errors <- nil
}
return
}
if img.Tag == "" {
log.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID)
if parallel {
errors <- nil
}
return
}
// ensure no two downloads of the same image happen at the same time
if c, err := s.poolAdd("pull", "img:"+img.ID); err != nil {
if c != nil {
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
<-c
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
} else {
log.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
}
if parallel {
errors <- nil
}
return
}
defer s.poolRemove("pull", "img:"+img.ID)
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, repoInfo.CanonicalName), nil))
success := false
var lastErr, err error
var is_downloaded bool
for _, ep := range repoInfo.Index.Mirrors {
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, mirror: %s", img.Tag, repoInfo.CanonicalName, ep), nil))
if is_downloaded, err = s.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
// Don't report errors when pulling from mirrors.
log.Debugf("Error pulling image (%s) from %s, mirror: %s, %s", img.Tag, repoInfo.CanonicalName, ep, err)
continue
}
layers_downloaded = layers_downloaded || is_downloaded
success = true
break
}
if !success {
for _, ep := range repoData.Endpoints {
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, repoInfo.CanonicalName, ep), nil))
if is_downloaded, err = s.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
// It's not ideal that only the last error is returned, it would be better to concatenate the errors.
// As the error is also given to the output stream the user will see the error.
lastErr = err
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, repoInfo.CanonicalName, ep, err), nil))
continue
}
layers_downloaded = layers_downloaded || is_downloaded
success = true
break
}
}
if !success {
err := fmt.Errorf("Error pulling image (%s) from %s, %v", img.Tag, repoInfo.CanonicalName, lastErr)
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), err.Error(), nil))
if parallel {
errors <- err
return
}
}
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
if parallel {
errors <- nil
}
}
if parallel {
go downloadImage(image)
} else {
downloadImage(image)
}
}
if parallel {
var lastError error
for i := 0; i < len(repoData.ImgList); i++ {
if err := <-errors; err != nil {
lastError = err
}
}
if lastError != nil {
return lastError
}
}
for tag, id := range tagsList {
if askedTag != "" && id != imageId {
continue
}
if err := s.Set(repoInfo.LocalName, tag, id, true); err != nil {
return err
}
}
requestedTag := repoInfo.CanonicalName
if len(askedTag) > 0 {
requestedTag = repoInfo.CanonicalName + ":" + askedTag
}
WriteStatus(requestedTag, out, sf, layers_downloaded)
return nil
}
func (s *TagStore) pullImage(r *registry.Session, out io.Writer, imgID, endpoint string, token []string, sf *utils.StreamFormatter) (bool, error) {
history, err := r.GetRemoteHistory(imgID, endpoint, token)
if err != nil {
return false, err
}
out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pulling dependent layers", nil))
// FIXME: Try to stream the images?
// FIXME: Launch the getRemoteImage() in goroutines
layers_downloaded := false
for i := len(history) - 1; i >= 0; i-- {
id := history[i]
// ensure no two downloads of the same layer happen at the same time
if c, err := s.poolAdd("pull", "layer:"+id); err != nil {
log.Debugf("Image (id: %s) pull is already running, skipping: %v", id, err)
<-c
}
defer s.poolRemove("pull", "layer:"+id)
if !s.graph.Exists(id) {
out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling metadata", nil))
var (
imgJSON []byte
imgSize int
err error
img *image.Image
)
retries := 5
for j := 1; j <= retries; j++ {
imgJSON, imgSize, err = r.GetRemoteImageJSON(id, endpoint, token)
if err != nil && j == retries {
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
return layers_downloaded, err
} else if err != nil {
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
continue
}
img, err = image.NewImgJSON(imgJSON)
layers_downloaded = true
if err != nil && j == retries {
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
return layers_downloaded, fmt.Errorf("Failed to parse json: %s", err)
} else if err != nil {
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
continue
} else {
break
}
}
for j := 1; j <= retries; j++ {
// Get the layer
status := "Pulling fs layer"
if j > 1 {
status = fmt.Sprintf("Pulling fs layer [retries: %d]", j)
}
out.Write(sf.FormatProgress(utils.TruncateID(id), status, nil))
layer, err := r.GetRemoteImageLayer(img.ID, endpoint, token, int64(imgSize))
if uerr, ok := err.(*url.Error); ok {
err = uerr.Err
}
if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
continue
} else if err != nil {
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
return layers_downloaded, err
}
layers_downloaded = true
defer layer.Close()
err = s.graph.Register(img,
utils.ProgressReader(layer, imgSize, out, sf, false, utils.TruncateID(id), "Downloading"))
if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
continue
} else if err != nil {
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error downloading dependent layers", nil))
return layers_downloaded, err
} else {
break
}
}
}
out.Write(sf.FormatProgress(utils.TruncateID(id), "Download complete", nil))
}
return layers_downloaded, nil
}
func WriteStatus(requestedTag string, out io.Writer, sf *utils.StreamFormatter, layers_downloaded bool) {
if layers_downloaded {
out.Write(sf.FormatStatus("", "Status: Downloaded newer image for %s", requestedTag))
} else {
out.Write(sf.FormatStatus("", "Status: Image is up to date for %s", requestedTag))
}
}
// downloadInfo is used to pass information from download to extractor
type downloadInfo struct {
imgJSON []byte
img *image.Image
tmpFile *os.File
length int64
downloaded bool
err chan error
}
func (s *TagStore) pullV2Repository(eng *engine.Engine, r *registry.Session, out io.Writer, repoInfo *registry.RepositoryInfo, tag string, sf *utils.StreamFormatter, parallel bool) error {
endpoint, err := r.V2RegistryEndpoint(repoInfo.Index)
if err != nil {
return fmt.Errorf("error getting registry endpoint: %s", err)
}
auth, err := r.GetV2Authorization(endpoint, repoInfo.RemoteName, true)
if err != nil {
return fmt.Errorf("error getting authorization: %s", err)
}
var layersDownloaded bool
if tag == "" {
log.Debugf("Pulling tag list from V2 registry for %s", repoInfo.CanonicalName)
tags, err := r.GetV2RemoteTags(endpoint, repoInfo.RemoteName, auth)
if err != nil {
return err
}
for _, t := range tags {
if downloaded, err := s.pullV2Tag(eng, r, out, endpoint, repoInfo, t, sf, parallel, auth); err != nil {
return err
} else if downloaded {
layersDownloaded = true
}
}
} else {
if downloaded, err := s.pullV2Tag(eng, r, out, endpoint, repoInfo, tag, sf, parallel, auth); err != nil {
return err
} else if downloaded {
layersDownloaded = true
}
}
requestedTag := repoInfo.CanonicalName
if len(tag) > 0 {
requestedTag = repoInfo.CanonicalName + ":" + tag
}
WriteStatus(requestedTag, out, sf, layersDownloaded)
return nil
}
func (s *TagStore) pullV2Tag(eng *engine.Engine, r *registry.Session, out io.Writer, endpoint *registry.Endpoint, repoInfo *registry.RepositoryInfo, tag string, sf *utils.StreamFormatter, parallel bool, auth *registry.RequestAuthorization) (bool, error) {
log.Debugf("Pulling tag from V2 registry: %q", tag)
manifestBytes, err := r.GetV2ImageManifest(endpoint, repoInfo.RemoteName, tag, auth)
if err != nil {
return false, err
}
manifest, verified, err := s.verifyManifest(eng, manifestBytes)
if err != nil {
return false, fmt.Errorf("error verifying manifest: %s", err)
}
if err := checkValidManifest(manifest); err != nil {
return false, err
}
if verified {
log.Printf("Image manifest for %s:%s has been verified", repoInfo.CanonicalName, tag)
} else {
out.Write(sf.FormatStatus(tag, "Pulling from %s", repoInfo.CanonicalName))
}
downloads := make([]downloadInfo, len(manifest.FSLayers))
for i := len(manifest.FSLayers) - 1; i >= 0; i-- {
var (
sumStr = manifest.FSLayers[i].BlobSum
imgJSON = []byte(manifest.History[i].V1Compatibility)
)
img, err := image.NewImgJSON(imgJSON)
if err != nil {
return false, fmt.Errorf("failed to parse json: %s", err)
}
downloads[i].img = img
// Check if exists
if s.graph.Exists(img.ID) {
log.Debugf("Image already exists: %s", img.ID)
continue
}
chunks := strings.SplitN(sumStr, ":", 2)
if len(chunks) < 2 {
return false, fmt.Errorf("expected 2 parts in the sumStr, got %#v", chunks)
}
sumType, checksum := chunks[0], chunks[1]
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Pulling fs layer", nil))
downloadFunc := func(di *downloadInfo) error {
log.Debugf("pulling blob %q to V1 img %s", sumStr, img.ID)
if c, err := s.poolAdd("pull", "img:"+img.ID); err != nil {
if c != nil {
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
<-c
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
} else {
log.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
}
} else {
defer s.poolRemove("pull", "img:"+img.ID)
tmpFile, err := ioutil.TempFile("", "GetV2ImageBlob")
if err != nil {
return err
}
r, l, err := r.GetV2ImageBlobReader(endpoint, repoInfo.RemoteName, sumType, checksum, auth)
if err != nil {
return err
}
defer r.Close()
// Wrap the reader with the appropriate TarSum reader.
tarSumReader, err := tarsum.NewTarSumForLabel(r, true, sumType)
if err != nil {
return fmt.Errorf("unable to wrap image blob reader with TarSum: %s", err)
}
io.Copy(tmpFile, utils.ProgressReader(ioutil.NopCloser(tarSumReader), int(l), out, sf, false, utils.TruncateID(img.ID), "Downloading"))
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Verifying Checksum", nil))
if finalChecksum := tarSumReader.Sum(nil); !strings.EqualFold(finalChecksum, sumStr) {
return fmt.Errorf("image verification failed: checksum mismatch - expected %q but got %q", sumStr, finalChecksum)
}
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
log.Debugf("Downloaded %s to tempfile %s", img.ID, tmpFile.Name())
di.tmpFile = tmpFile
di.length = l
di.downloaded = true
}
di.imgJSON = imgJSON
return nil
}
if parallel {
downloads[i].err = make(chan error)
go func(di *downloadInfo) {
di.err <- downloadFunc(di)
}(&downloads[i])
} else {
err := downloadFunc(&downloads[i])
if err != nil {
return false, err
}
}
}
var layersDownloaded bool
for i := len(downloads) - 1; i >= 0; i-- {
d := &downloads[i]
if d.err != nil {
err := <-d.err
if err != nil {
return false, err
}
}
if d.downloaded {
// if tmpFile is empty assume download and extracted elsewhere
defer os.Remove(d.tmpFile.Name())
defer d.tmpFile.Close()
d.tmpFile.Seek(0, 0)
if d.tmpFile != nil {
err = s.graph.Register(d.img,
utils.ProgressReader(d.tmpFile, int(d.length), out, sf, false, utils.TruncateID(d.img.ID), "Extracting"))
if err != nil {
return false, err
}
// FIXME: Pool release here for parallel tag pull (ensures any downloads block until fully extracted)
}
out.Write(sf.FormatProgress(utils.TruncateID(d.img.ID), "Pull complete", nil))
layersDownloaded = true
} else {
out.Write(sf.FormatProgress(utils.TruncateID(d.img.ID), "Already exists", nil))
}
}
out.Write(sf.FormatStatus(repoInfo.CanonicalName+":"+tag, "The image you are pulling has been verified. Important: image verification is a tech preview feature and should not be relied on to provide security."))
if err = s.Set(repoInfo.LocalName, tag, downloads[0].img.ID, true); err != nil {
return false, err
}
return layersDownloaded, nil
}