mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
274baf70bf
Properly verify manifests and layer digests on pull
720 lines
22 KiB
Go
720 lines
22 KiB
Go
package graph
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"net"
|
|
"net/url"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/docker/distribution/digest"
|
|
"github.com/docker/docker/cliconfig"
|
|
"github.com/docker/docker/image"
|
|
"github.com/docker/docker/pkg/progressreader"
|
|
"github.com/docker/docker/pkg/streamformatter"
|
|
"github.com/docker/docker/pkg/stringid"
|
|
"github.com/docker/docker/pkg/transport"
|
|
"github.com/docker/docker/registry"
|
|
"github.com/docker/docker/utils"
|
|
)
|
|
|
|
type ImagePullConfig struct {
|
|
MetaHeaders map[string][]string
|
|
AuthConfig *cliconfig.AuthConfig
|
|
OutStream io.Writer
|
|
}
|
|
|
|
func (s *TagStore) Pull(image string, tag string, imagePullConfig *ImagePullConfig) error {
|
|
var (
|
|
sf = streamformatter.NewJSONStreamFormatter()
|
|
)
|
|
|
|
// Resolve the Repository name from fqn to RepositoryInfo
|
|
repoInfo, err := s.registryService.ResolveRepository(image)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := validateRepoName(repoInfo.LocalName); err != nil {
|
|
return err
|
|
}
|
|
|
|
c, err := s.poolAdd("pull", utils.ImageReference(repoInfo.LocalName, tag))
|
|
if err != nil {
|
|
if c != nil {
|
|
// Another pull of the same repository is already taking place; just wait for it to finish
|
|
imagePullConfig.OutStream.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", repoInfo.LocalName))
|
|
<-c
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
defer s.poolRemove("pull", utils.ImageReference(repoInfo.LocalName, tag))
|
|
|
|
logName := repoInfo.LocalName
|
|
if tag != "" {
|
|
logName = utils.ImageReference(logName, tag)
|
|
}
|
|
|
|
// Attempt pulling official content from a provided v2 mirror
|
|
if repoInfo.Index.Official {
|
|
v2mirrorEndpoint, v2mirrorRepoInfo, err := configureV2Mirror(repoInfo, s.registryService)
|
|
if err != nil {
|
|
logrus.Errorf("Error configuring mirrors: %s", err)
|
|
return err
|
|
}
|
|
|
|
if v2mirrorEndpoint != nil {
|
|
logrus.Debugf("Attempting to pull from v2 mirror: %s", v2mirrorEndpoint.URL)
|
|
return s.pullFromV2Mirror(v2mirrorEndpoint, v2mirrorRepoInfo, imagePullConfig, tag, sf, logName)
|
|
}
|
|
}
|
|
|
|
logrus.Debugf("pulling image from host %q with remote name %q", repoInfo.Index.Name, repoInfo.RemoteName)
|
|
|
|
endpoint, err := repoInfo.GetEndpoint(imagePullConfig.MetaHeaders)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// TODO(tiborvass): reuse client from endpoint?
|
|
// Adds Docker-specific headers as well as user-specified headers (metaHeaders)
|
|
tr := transport.NewTransport(
|
|
registry.NewTransport(registry.ReceiveTimeout, endpoint.IsSecure),
|
|
registry.DockerHeaders(imagePullConfig.MetaHeaders)...,
|
|
)
|
|
client := registry.HTTPClient(tr)
|
|
r, err := registry.NewSession(client, imagePullConfig.AuthConfig, endpoint)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(repoInfo.Index.Mirrors) == 0 && (repoInfo.Index.Official || endpoint.Version == registry.APIVersion2) {
|
|
if repoInfo.Official {
|
|
s.trustService.UpdateBase()
|
|
}
|
|
|
|
logrus.Debugf("pulling v2 repository with local name %q", repoInfo.LocalName)
|
|
if err := s.pullV2Repository(r, imagePullConfig.OutStream, repoInfo, tag, sf); err == nil {
|
|
s.eventsService.Log("pull", logName, "")
|
|
return nil
|
|
} else if err != registry.ErrDoesNotExist && err != ErrV2RegistryUnavailable {
|
|
logrus.Errorf("Error from V2 registry: %s", err)
|
|
}
|
|
|
|
logrus.Debug("image does not exist on v2 registry, falling back to v1")
|
|
}
|
|
|
|
if utils.DigestReference(tag) {
|
|
return fmt.Errorf("pulling with digest reference failed from v2 registry")
|
|
}
|
|
|
|
logrus.Debugf("pulling v1 repository with local name %q", repoInfo.LocalName)
|
|
if err = s.pullRepository(r, imagePullConfig.OutStream, repoInfo, tag, sf); err != nil {
|
|
return err
|
|
}
|
|
|
|
s.eventsService.Log("pull", logName, "")
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
func makeMirrorRepoInfo(repoInfo *registry.RepositoryInfo, mirror string) *registry.RepositoryInfo {
|
|
mirrorRepo := ®istry.RepositoryInfo{
|
|
RemoteName: repoInfo.RemoteName,
|
|
LocalName: repoInfo.LocalName,
|
|
CanonicalName: repoInfo.CanonicalName,
|
|
Official: false,
|
|
|
|
Index: ®istry.IndexInfo{
|
|
Official: false,
|
|
Secure: repoInfo.Index.Secure,
|
|
Name: mirror,
|
|
Mirrors: []string{},
|
|
},
|
|
}
|
|
return mirrorRepo
|
|
}
|
|
|
|
func configureV2Mirror(repoInfo *registry.RepositoryInfo, s *registry.Service) (*registry.Endpoint, *registry.RepositoryInfo, error) {
|
|
mirrors := repoInfo.Index.Mirrors
|
|
if len(mirrors) == 0 {
|
|
// no mirrors configured
|
|
return nil, nil, nil
|
|
}
|
|
|
|
v1MirrorCount := 0
|
|
var v2MirrorEndpoint *registry.Endpoint
|
|
var v2MirrorRepoInfo *registry.RepositoryInfo
|
|
for _, mirror := range mirrors {
|
|
mirrorRepoInfo := makeMirrorRepoInfo(repoInfo, mirror)
|
|
endpoint, err := registry.NewEndpoint(mirrorRepoInfo.Index, nil)
|
|
if err != nil {
|
|
logrus.Errorf("Unable to create endpoint for %s: %s", mirror, err)
|
|
continue
|
|
}
|
|
if endpoint.Version == 2 {
|
|
if v2MirrorEndpoint == nil {
|
|
v2MirrorEndpoint = endpoint
|
|
v2MirrorRepoInfo = mirrorRepoInfo
|
|
} else {
|
|
// > 1 v2 mirrors given
|
|
return nil, nil, fmt.Errorf("multiple v2 mirrors configured")
|
|
}
|
|
} else {
|
|
v1MirrorCount++
|
|
}
|
|
}
|
|
|
|
if v1MirrorCount == len(mirrors) {
|
|
// OK, but mirrors are v1
|
|
return nil, nil, nil
|
|
}
|
|
if v2MirrorEndpoint != nil && v1MirrorCount == 0 {
|
|
// OK, 1 v2 mirror specified
|
|
return v2MirrorEndpoint, v2MirrorRepoInfo, nil
|
|
}
|
|
if v2MirrorEndpoint != nil && v1MirrorCount > 0 {
|
|
return nil, nil, fmt.Errorf("v1 and v2 mirrors configured")
|
|
}
|
|
// No endpoint could be established with the given mirror configurations
|
|
// Fallback to pulling from the hub as per v1 behavior.
|
|
return nil, nil, nil
|
|
}
|
|
|
|
func (s *TagStore) pullFromV2Mirror(mirrorEndpoint *registry.Endpoint, repoInfo *registry.RepositoryInfo,
|
|
imagePullConfig *ImagePullConfig, tag string, sf *streamformatter.StreamFormatter, logName string) error {
|
|
|
|
tr := transport.NewTransport(
|
|
registry.NewTransport(registry.ReceiveTimeout, mirrorEndpoint.IsSecure),
|
|
registry.DockerHeaders(imagePullConfig.MetaHeaders)...,
|
|
)
|
|
client := registry.HTTPClient(tr)
|
|
mirrorSession, err := registry.NewSession(client, &cliconfig.AuthConfig{}, mirrorEndpoint)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
logrus.Debugf("Pulling v2 repository with local name %q from %s", repoInfo.LocalName, mirrorEndpoint.URL)
|
|
if err := s.pullV2Repository(mirrorSession, imagePullConfig.OutStream, repoInfo, tag, sf); err != nil {
|
|
return err
|
|
}
|
|
s.eventsService.Log("pull", logName, "")
|
|
return nil
|
|
}
|
|
|
|
func (s *TagStore) pullRepository(r *registry.Session, out io.Writer, repoInfo *registry.RepositoryInfo, askedTag string, sf *streamformatter.StreamFormatter) error {
|
|
out.Write(sf.FormatStatus("", "Pulling repository %s", repoInfo.CanonicalName))
|
|
|
|
repoData, err := r.GetRepositoryData(repoInfo.RemoteName)
|
|
if err != nil {
|
|
if strings.Contains(err.Error(), "HTTP code: 404") {
|
|
return fmt.Errorf("Error: image %s not found", utils.ImageReference(repoInfo.RemoteName, askedTag))
|
|
}
|
|
// Unexpected HTTP error
|
|
return err
|
|
}
|
|
|
|
logrus.Debugf("Retrieving the tag list")
|
|
tagsList, err := r.GetRemoteTags(repoData.Endpoints, repoInfo.RemoteName)
|
|
if err != nil {
|
|
logrus.Errorf("unable to get remote tags: %s", err)
|
|
return err
|
|
}
|
|
|
|
for tag, id := range tagsList {
|
|
repoData.ImgList[id] = ®istry.ImgData{
|
|
ID: id,
|
|
Tag: tag,
|
|
Checksum: "",
|
|
}
|
|
}
|
|
|
|
logrus.Debugf("Registering tags")
|
|
// If no tag has been specified, pull them all
|
|
if askedTag == "" {
|
|
for tag, id := range tagsList {
|
|
repoData.ImgList[id].Tag = tag
|
|
}
|
|
} else {
|
|
// Otherwise, check that the tag exists and use only that one
|
|
id, exists := tagsList[askedTag]
|
|
if !exists {
|
|
return fmt.Errorf("Tag %s not found in repository %s", askedTag, repoInfo.CanonicalName)
|
|
}
|
|
repoData.ImgList[id].Tag = askedTag
|
|
}
|
|
|
|
errors := make(chan error)
|
|
|
|
layersDownloaded := false
|
|
for _, image := range repoData.ImgList {
|
|
downloadImage := func(img *registry.ImgData) {
|
|
if askedTag != "" && img.Tag != askedTag {
|
|
errors <- nil
|
|
return
|
|
}
|
|
|
|
if img.Tag == "" {
|
|
logrus.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID)
|
|
errors <- nil
|
|
return
|
|
}
|
|
|
|
// ensure no two downloads of the same image happen at the same time
|
|
if c, err := s.poolAdd("pull", "img:"+img.ID); err != nil {
|
|
if c != nil {
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
|
|
<-c
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil))
|
|
} else {
|
|
logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
|
|
}
|
|
errors <- nil
|
|
return
|
|
}
|
|
defer s.poolRemove("pull", "img:"+img.ID)
|
|
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, repoInfo.CanonicalName), nil))
|
|
success := false
|
|
var lastErr, err error
|
|
var isDownloaded bool
|
|
for _, ep := range repoInfo.Index.Mirrors {
|
|
// Ensure endpoint is v1
|
|
ep = ep + "v1/"
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, mirror: %s", img.Tag, repoInfo.CanonicalName, ep), nil))
|
|
if isDownloaded, err = s.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
|
|
// Don't report errors when pulling from mirrors.
|
|
logrus.Debugf("Error pulling image (%s) from %s, mirror: %s, %s", img.Tag, repoInfo.CanonicalName, ep, err)
|
|
continue
|
|
}
|
|
layersDownloaded = layersDownloaded || isDownloaded
|
|
success = true
|
|
break
|
|
}
|
|
if !success {
|
|
for _, ep := range repoData.Endpoints {
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, repoInfo.CanonicalName, ep), nil))
|
|
if isDownloaded, err = s.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
|
|
// It's not ideal that only the last error is returned, it would be better to concatenate the errors.
|
|
// As the error is also given to the output stream the user will see the error.
|
|
lastErr = err
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, repoInfo.CanonicalName, ep, err), nil))
|
|
continue
|
|
}
|
|
layersDownloaded = layersDownloaded || isDownloaded
|
|
success = true
|
|
break
|
|
}
|
|
}
|
|
if !success {
|
|
err := fmt.Errorf("Error pulling image (%s) from %s, %v", img.Tag, repoInfo.CanonicalName, lastErr)
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), err.Error(), nil))
|
|
errors <- err
|
|
return
|
|
}
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil))
|
|
|
|
errors <- nil
|
|
}
|
|
|
|
go downloadImage(image)
|
|
}
|
|
|
|
var lastError error
|
|
for i := 0; i < len(repoData.ImgList); i++ {
|
|
if err := <-errors; err != nil {
|
|
lastError = err
|
|
}
|
|
}
|
|
if lastError != nil {
|
|
return lastError
|
|
}
|
|
|
|
for tag, id := range tagsList {
|
|
if askedTag != "" && tag != askedTag {
|
|
continue
|
|
}
|
|
if err := s.Tag(repoInfo.LocalName, tag, id, true); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
requestedTag := repoInfo.CanonicalName
|
|
if len(askedTag) > 0 {
|
|
requestedTag = utils.ImageReference(repoInfo.CanonicalName, askedTag)
|
|
}
|
|
WriteStatus(requestedTag, out, sf, layersDownloaded)
|
|
return nil
|
|
}
|
|
|
|
func (s *TagStore) pullImage(r *registry.Session, out io.Writer, imgID, endpoint string, token []string, sf *streamformatter.StreamFormatter) (bool, error) {
|
|
history, err := r.GetRemoteHistory(imgID, endpoint)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(imgID), "Pulling dependent layers", nil))
|
|
// FIXME: Try to stream the images?
|
|
// FIXME: Launch the getRemoteImage() in goroutines
|
|
|
|
layersDownloaded := false
|
|
for i := len(history) - 1; i >= 0; i-- {
|
|
id := history[i]
|
|
|
|
// ensure no two downloads of the same layer happen at the same time
|
|
if c, err := s.poolAdd("pull", "layer:"+id); err != nil {
|
|
logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", id, err)
|
|
<-c
|
|
}
|
|
defer s.poolRemove("pull", "layer:"+id)
|
|
|
|
if !s.graph.Exists(id) {
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(id), "Pulling metadata", nil))
|
|
var (
|
|
imgJSON []byte
|
|
imgSize int
|
|
err error
|
|
img *image.Image
|
|
)
|
|
retries := 5
|
|
for j := 1; j <= retries; j++ {
|
|
imgJSON, imgSize, err = r.GetRemoteImageJSON(id, endpoint)
|
|
if err != nil && j == retries {
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil))
|
|
return layersDownloaded, err
|
|
} else if err != nil {
|
|
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
|
|
continue
|
|
}
|
|
img, err = image.NewImgJSON(imgJSON)
|
|
layersDownloaded = true
|
|
if err != nil && j == retries {
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil))
|
|
return layersDownloaded, fmt.Errorf("Failed to parse json: %s", err)
|
|
} else if err != nil {
|
|
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
|
|
continue
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
|
|
for j := 1; j <= retries; j++ {
|
|
// Get the layer
|
|
status := "Pulling fs layer"
|
|
if j > 1 {
|
|
status = fmt.Sprintf("Pulling fs layer [retries: %d]", j)
|
|
}
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(id), status, nil))
|
|
layer, err := r.GetRemoteImageLayer(img.ID, endpoint, int64(imgSize))
|
|
if uerr, ok := err.(*url.Error); ok {
|
|
err = uerr.Err
|
|
}
|
|
if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
|
|
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
|
|
continue
|
|
} else if err != nil {
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil))
|
|
return layersDownloaded, err
|
|
}
|
|
layersDownloaded = true
|
|
defer layer.Close()
|
|
|
|
err = s.graph.Register(img,
|
|
progressreader.New(progressreader.Config{
|
|
In: layer,
|
|
Out: out,
|
|
Formatter: sf,
|
|
Size: imgSize,
|
|
NewLines: false,
|
|
ID: stringid.TruncateID(id),
|
|
Action: "Downloading",
|
|
}))
|
|
if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
|
|
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
|
|
continue
|
|
} else if err != nil {
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(id), "Error downloading dependent layers", nil))
|
|
return layersDownloaded, err
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(id), "Download complete", nil))
|
|
}
|
|
return layersDownloaded, nil
|
|
}
|
|
|
|
func WriteStatus(requestedTag string, out io.Writer, sf *streamformatter.StreamFormatter, layersDownloaded bool) {
|
|
if layersDownloaded {
|
|
out.Write(sf.FormatStatus("", "Status: Downloaded newer image for %s", requestedTag))
|
|
} else {
|
|
out.Write(sf.FormatStatus("", "Status: Image is up to date for %s", requestedTag))
|
|
}
|
|
}
|
|
|
|
func (s *TagStore) pullV2Repository(r *registry.Session, out io.Writer, repoInfo *registry.RepositoryInfo, tag string, sf *streamformatter.StreamFormatter) error {
|
|
endpoint, err := r.V2RegistryEndpoint(repoInfo.Index)
|
|
if err != nil {
|
|
if repoInfo.Index.Official {
|
|
logrus.Debugf("Unable to pull from V2 registry, falling back to v1: %s", err)
|
|
return ErrV2RegistryUnavailable
|
|
}
|
|
return fmt.Errorf("error getting registry endpoint: %s", err)
|
|
}
|
|
auth, err := r.GetV2Authorization(endpoint, repoInfo.RemoteName, true)
|
|
if err != nil {
|
|
return fmt.Errorf("error getting authorization: %s", err)
|
|
}
|
|
var layersDownloaded bool
|
|
if tag == "" {
|
|
logrus.Debugf("Pulling tag list from V2 registry for %s", repoInfo.CanonicalName)
|
|
tags, err := r.GetV2RemoteTags(endpoint, repoInfo.RemoteName, auth)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(tags) == 0 {
|
|
return registry.ErrDoesNotExist
|
|
}
|
|
for _, t := range tags {
|
|
if downloaded, err := s.pullV2Tag(r, out, endpoint, repoInfo, t, sf, auth); err != nil {
|
|
return err
|
|
} else if downloaded {
|
|
layersDownloaded = true
|
|
}
|
|
}
|
|
} else {
|
|
if downloaded, err := s.pullV2Tag(r, out, endpoint, repoInfo, tag, sf, auth); err != nil {
|
|
return err
|
|
} else if downloaded {
|
|
layersDownloaded = true
|
|
}
|
|
}
|
|
|
|
requestedTag := repoInfo.CanonicalName
|
|
if len(tag) > 0 {
|
|
requestedTag = utils.ImageReference(repoInfo.CanonicalName, tag)
|
|
}
|
|
WriteStatus(requestedTag, out, sf, layersDownloaded)
|
|
return nil
|
|
}
|
|
|
|
func (s *TagStore) pullV2Tag(r *registry.Session, out io.Writer, endpoint *registry.Endpoint, repoInfo *registry.RepositoryInfo, tag string, sf *streamformatter.StreamFormatter, auth *registry.RequestAuthorization) (bool, error) {
|
|
logrus.Debugf("Pulling tag from V2 registry: %q", tag)
|
|
|
|
remoteDigest, manifestBytes, err := r.GetV2ImageManifest(endpoint, repoInfo.RemoteName, tag, auth)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
// loadManifest ensures that the manifest payload has the expected digest
|
|
// if the tag is a digest reference.
|
|
localDigest, manifest, verified, err := s.loadManifest(manifestBytes, tag, remoteDigest)
|
|
if err != nil {
|
|
return false, fmt.Errorf("error verifying manifest: %s", err)
|
|
}
|
|
|
|
if verified {
|
|
logrus.Printf("Image manifest for %s has been verified", utils.ImageReference(repoInfo.CanonicalName, tag))
|
|
}
|
|
out.Write(sf.FormatStatus(tag, "Pulling from %s", repoInfo.CanonicalName))
|
|
|
|
// downloadInfo is used to pass information from download to extractor
|
|
type downloadInfo struct {
|
|
imgJSON []byte
|
|
img *image.Image
|
|
digest digest.Digest
|
|
tmpFile *os.File
|
|
length int64
|
|
downloaded bool
|
|
err chan error
|
|
}
|
|
|
|
downloads := make([]downloadInfo, len(manifest.FSLayers))
|
|
|
|
for i := len(manifest.FSLayers) - 1; i >= 0; i-- {
|
|
var (
|
|
sumStr = manifest.FSLayers[i].BlobSum
|
|
imgJSON = []byte(manifest.History[i].V1Compatibility)
|
|
)
|
|
|
|
img, err := image.NewImgJSON(imgJSON)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to parse json: %s", err)
|
|
}
|
|
downloads[i].img = img
|
|
|
|
// Check if exists
|
|
if s.graph.Exists(img.ID) {
|
|
logrus.Debugf("Image already exists: %s", img.ID)
|
|
continue
|
|
}
|
|
|
|
dgst, err := digest.ParseDigest(sumStr)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
downloads[i].digest = dgst
|
|
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Pulling fs layer", nil))
|
|
|
|
downloadFunc := func(di *downloadInfo) error {
|
|
logrus.Debugf("pulling blob %q to V1 img %s", sumStr, img.ID)
|
|
|
|
if c, err := s.poolAdd("pull", "img:"+img.ID); err != nil {
|
|
if c != nil {
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
|
|
<-c
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil))
|
|
} else {
|
|
logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
|
|
}
|
|
} else {
|
|
defer s.poolRemove("pull", "img:"+img.ID)
|
|
tmpFile, err := ioutil.TempFile("", "GetV2ImageBlob")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
r, l, err := r.GetV2ImageBlobReader(endpoint, repoInfo.RemoteName, di.digest, auth)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer r.Close()
|
|
|
|
verifier, err := digest.NewDigestVerifier(di.digest)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := io.Copy(tmpFile, progressreader.New(progressreader.Config{
|
|
In: ioutil.NopCloser(io.TeeReader(r, verifier)),
|
|
Out: out,
|
|
Formatter: sf,
|
|
Size: int(l),
|
|
NewLines: false,
|
|
ID: stringid.TruncateID(img.ID),
|
|
Action: "Downloading",
|
|
})); err != nil {
|
|
return fmt.Errorf("unable to copy v2 image blob data: %s", err)
|
|
}
|
|
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Verifying Checksum", nil))
|
|
|
|
if !verifier.Verified() {
|
|
return fmt.Errorf("image layer digest verification failed for %q", di.digest)
|
|
}
|
|
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil))
|
|
|
|
logrus.Debugf("Downloaded %s to tempfile %s", img.ID, tmpFile.Name())
|
|
di.tmpFile = tmpFile
|
|
di.length = l
|
|
di.downloaded = true
|
|
}
|
|
di.imgJSON = imgJSON
|
|
|
|
return nil
|
|
}
|
|
|
|
downloads[i].err = make(chan error)
|
|
go func(di *downloadInfo) {
|
|
di.err <- downloadFunc(di)
|
|
}(&downloads[i])
|
|
}
|
|
|
|
var tagUpdated bool
|
|
for i := len(downloads) - 1; i >= 0; i-- {
|
|
d := &downloads[i]
|
|
if d.err != nil {
|
|
if err := <-d.err; err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
if d.downloaded {
|
|
// if tmpFile is empty assume download and extracted elsewhere
|
|
defer os.Remove(d.tmpFile.Name())
|
|
defer d.tmpFile.Close()
|
|
d.tmpFile.Seek(0, 0)
|
|
if d.tmpFile != nil {
|
|
err = s.graph.Register(d.img,
|
|
progressreader.New(progressreader.Config{
|
|
In: d.tmpFile,
|
|
Out: out,
|
|
Formatter: sf,
|
|
Size: int(d.length),
|
|
ID: stringid.TruncateID(d.img.ID),
|
|
Action: "Extracting",
|
|
}))
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
// FIXME: Pool release here for parallel tag pull (ensures any downloads block until fully extracted)
|
|
}
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(d.img.ID), "Pull complete", nil))
|
|
tagUpdated = true
|
|
} else {
|
|
out.Write(sf.FormatProgress(stringid.TruncateID(d.img.ID), "Already exists", nil))
|
|
}
|
|
|
|
}
|
|
|
|
// Check for new tag if no layers downloaded
|
|
if !tagUpdated {
|
|
repo, err := s.Get(repoInfo.LocalName)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if repo != nil {
|
|
if _, exists := repo[tag]; !exists {
|
|
tagUpdated = true
|
|
}
|
|
} else {
|
|
tagUpdated = true
|
|
}
|
|
}
|
|
|
|
if verified && tagUpdated {
|
|
out.Write(sf.FormatStatus(utils.ImageReference(repoInfo.CanonicalName, tag), "The image you are pulling has been verified. Important: image verification is a tech preview feature and should not be relied on to provide security."))
|
|
}
|
|
|
|
if localDigest != remoteDigest { // this is not a verification check.
|
|
// NOTE(stevvooe): This is a very defensive branch and should never
|
|
// happen, since all manifest digest implementations use the same
|
|
// algorithm.
|
|
logrus.WithFields(
|
|
logrus.Fields{
|
|
"local": localDigest,
|
|
"remote": remoteDigest,
|
|
}).Debugf("local digest does not match remote")
|
|
|
|
out.Write(sf.FormatStatus("", "Remote Digest: %s", remoteDigest))
|
|
}
|
|
|
|
out.Write(sf.FormatStatus("", "Digest: %s", localDigest))
|
|
|
|
if tag == localDigest.String() {
|
|
// TODO(stevvooe): Ideally, we should always set the digest so we can
|
|
// use the digest whether we pull by it or not. Unfortunately, the tag
|
|
// store treats the digest as a separate tag, meaning there may be an
|
|
// untagged digest image that would seem to be dangling by a user.
|
|
|
|
if err = s.SetDigest(repoInfo.LocalName, localDigest.String(), downloads[0].img.ID); err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
|
|
if !utils.DigestReference(tag) {
|
|
// only set the repository/tag -> image ID mapping when pulling by tag (i.e. not by digest)
|
|
if err = s.Tag(repoInfo.LocalName, tag, downloads[0].img.ID, true); err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
|
|
return tagUpdated, nil
|
|
}
|