mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
1c62e0ae4e
As this feature requires more testing, it is much safer to get the underlying changes into the codebase first and then enable the feature in another release, after proper testing and verification can be done. Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
569 lines
16 KiB
Go
569 lines
16 KiB
Go
package graph
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"net"
|
|
"net/url"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/docker/docker/engine"
|
|
"github.com/docker/docker/image"
|
|
"github.com/docker/docker/pkg/log"
|
|
"github.com/docker/docker/registry"
|
|
"github.com/docker/docker/utils"
|
|
"github.com/docker/libtrust"
|
|
)
|
|
|
|
func (s *TagStore) verifyManifest(eng *engine.Engine, manifestBytes []byte) (*registry.ManifestData, bool, error) {
|
|
sig, err := libtrust.ParsePrettySignature(manifestBytes, "signatures")
|
|
if err != nil {
|
|
return nil, false, fmt.Errorf("error parsing payload: %s", err)
|
|
}
|
|
keys, err := sig.Verify()
|
|
if err != nil {
|
|
return nil, false, fmt.Errorf("error verifying payload: %s", err)
|
|
}
|
|
|
|
payload, err := sig.Payload()
|
|
if err != nil {
|
|
return nil, false, fmt.Errorf("error retrieving payload: %s", err)
|
|
}
|
|
|
|
var manifest registry.ManifestData
|
|
if err := json.Unmarshal(payload, &manifest); err != nil {
|
|
return nil, false, fmt.Errorf("error unmarshalling manifest: %s", err)
|
|
}
|
|
|
|
var verified bool
|
|
for _, key := range keys {
|
|
job := eng.Job("trust_key_check")
|
|
b, err := key.MarshalJSON()
|
|
if err != nil {
|
|
return nil, false, fmt.Errorf("error marshalling public key: %s", err)
|
|
}
|
|
namespace := manifest.Name
|
|
if namespace[0] != '/' {
|
|
namespace = "/" + namespace
|
|
}
|
|
stdoutBuffer := bytes.NewBuffer(nil)
|
|
|
|
job.Args = append(job.Args, namespace)
|
|
job.Setenv("PublicKey", string(b))
|
|
// Check key has read/write permission (0x03)
|
|
job.SetenvInt("Permission", 0x03)
|
|
job.Stdout.Add(stdoutBuffer)
|
|
if err = job.Run(); err != nil {
|
|
return nil, false, fmt.Errorf("error running key check: %s", err)
|
|
}
|
|
result := engine.Tail(stdoutBuffer, 1)
|
|
log.Debugf("Key check result: %q", result)
|
|
if result == "verified" {
|
|
verified = true
|
|
}
|
|
}
|
|
|
|
return &manifest, verified, nil
|
|
}
|
|
|
|
func (s *TagStore) CmdPull(job *engine.Job) engine.Status {
|
|
if n := len(job.Args); n != 1 && n != 2 {
|
|
return job.Errorf("Usage: %s IMAGE [TAG]", job.Name)
|
|
}
|
|
|
|
var (
|
|
localName = job.Args[0]
|
|
tag string
|
|
sf = utils.NewStreamFormatter(job.GetenvBool("json"))
|
|
authConfig = ®istry.AuthConfig{}
|
|
metaHeaders map[string][]string
|
|
mirrors []string
|
|
)
|
|
|
|
if len(job.Args) > 1 {
|
|
tag = job.Args[1]
|
|
}
|
|
|
|
job.GetenvJson("authConfig", authConfig)
|
|
job.GetenvJson("metaHeaders", &metaHeaders)
|
|
|
|
c, err := s.poolAdd("pull", localName+":"+tag)
|
|
if err != nil {
|
|
if c != nil {
|
|
// Another pull of the same repository is already taking place; just wait for it to finish
|
|
job.Stdout.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", localName))
|
|
<-c
|
|
return engine.StatusOK
|
|
}
|
|
return job.Error(err)
|
|
}
|
|
defer s.poolRemove("pull", localName+":"+tag)
|
|
|
|
// Resolve the Repository name from fqn to endpoint + name
|
|
hostname, remoteName, err := registry.ResolveRepositoryName(localName)
|
|
if err != nil {
|
|
return job.Error(err)
|
|
}
|
|
|
|
endpoint, err := registry.NewEndpoint(hostname)
|
|
if err != nil {
|
|
return job.Error(err)
|
|
}
|
|
|
|
r, err := registry.NewSession(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, true)
|
|
if err != nil {
|
|
return job.Error(err)
|
|
}
|
|
|
|
var isOfficial bool
|
|
if endpoint.VersionString(1) == registry.IndexServerAddress() {
|
|
// If pull "index.docker.io/foo/bar", it's stored locally under "foo/bar"
|
|
localName = remoteName
|
|
|
|
isOfficial = isOfficialName(remoteName)
|
|
if isOfficial && strings.IndexRune(remoteName, '/') == -1 {
|
|
remoteName = "library/" + remoteName
|
|
}
|
|
|
|
// Use provided mirrors, if any
|
|
mirrors = s.mirrors
|
|
}
|
|
|
|
if err = s.pullRepository(r, job.Stdout, localName, remoteName, tag, sf, job.GetenvBool("parallel"), mirrors); err != nil {
|
|
return job.Error(err)
|
|
}
|
|
|
|
return engine.StatusOK
|
|
}
|
|
|
|
func (s *TagStore) pullRepository(r *registry.Session, out io.Writer, localName, remoteName, askedTag string, sf *utils.StreamFormatter, parallel bool, mirrors []string) error {
|
|
out.Write(sf.FormatStatus("", "Pulling repository %s", localName))
|
|
|
|
repoData, err := r.GetRepositoryData(remoteName)
|
|
if err != nil {
|
|
if strings.Contains(err.Error(), "HTTP code: 404") {
|
|
return fmt.Errorf("Error: image %s not found", remoteName)
|
|
}
|
|
// Unexpected HTTP error
|
|
return err
|
|
}
|
|
|
|
log.Debugf("Retrieving the tag list")
|
|
tagsList, err := r.GetRemoteTags(repoData.Endpoints, remoteName, repoData.Tokens)
|
|
if err != nil {
|
|
log.Errorf("%v", err)
|
|
return err
|
|
}
|
|
|
|
for tag, id := range tagsList {
|
|
repoData.ImgList[id] = ®istry.ImgData{
|
|
ID: id,
|
|
Tag: tag,
|
|
Checksum: "",
|
|
}
|
|
}
|
|
|
|
log.Debugf("Registering tags")
|
|
// If no tag has been specified, pull them all
|
|
var imageId string
|
|
if askedTag == "" {
|
|
for tag, id := range tagsList {
|
|
repoData.ImgList[id].Tag = tag
|
|
}
|
|
} else {
|
|
// Otherwise, check that the tag exists and use only that one
|
|
id, exists := tagsList[askedTag]
|
|
if !exists {
|
|
return fmt.Errorf("Tag %s not found in repository %s", askedTag, localName)
|
|
}
|
|
imageId = id
|
|
repoData.ImgList[id].Tag = askedTag
|
|
}
|
|
|
|
errors := make(chan error)
|
|
|
|
layers_downloaded := false
|
|
for _, image := range repoData.ImgList {
|
|
downloadImage := func(img *registry.ImgData) {
|
|
if askedTag != "" && img.Tag != askedTag {
|
|
log.Debugf("(%s) does not match %s (id: %s), skipping", img.Tag, askedTag, img.ID)
|
|
if parallel {
|
|
errors <- nil
|
|
}
|
|
return
|
|
}
|
|
|
|
if img.Tag == "" {
|
|
log.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID)
|
|
if parallel {
|
|
errors <- nil
|
|
}
|
|
return
|
|
}
|
|
|
|
// ensure no two downloads of the same image happen at the same time
|
|
if c, err := s.poolAdd("pull", "img:"+img.ID); err != nil {
|
|
if c != nil {
|
|
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
|
|
<-c
|
|
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
|
|
} else {
|
|
log.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
|
|
}
|
|
if parallel {
|
|
errors <- nil
|
|
}
|
|
return
|
|
}
|
|
defer s.poolRemove("pull", "img:"+img.ID)
|
|
|
|
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, localName), nil))
|
|
success := false
|
|
var lastErr, err error
|
|
var is_downloaded bool
|
|
if mirrors != nil {
|
|
for _, ep := range mirrors {
|
|
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, mirror: %s", img.Tag, localName, ep), nil))
|
|
if is_downloaded, err = s.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
|
|
// Don't report errors when pulling from mirrors.
|
|
log.Debugf("Error pulling image (%s) from %s, mirror: %s, %s", img.Tag, localName, ep, err)
|
|
continue
|
|
}
|
|
layers_downloaded = layers_downloaded || is_downloaded
|
|
success = true
|
|
break
|
|
}
|
|
}
|
|
if !success {
|
|
for _, ep := range repoData.Endpoints {
|
|
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, localName, ep), nil))
|
|
if is_downloaded, err = s.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
|
|
// It's not ideal that only the last error is returned, it would be better to concatenate the errors.
|
|
// As the error is also given to the output stream the user will see the error.
|
|
lastErr = err
|
|
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, localName, ep, err), nil))
|
|
continue
|
|
}
|
|
layers_downloaded = layers_downloaded || is_downloaded
|
|
success = true
|
|
break
|
|
}
|
|
}
|
|
if !success {
|
|
err := fmt.Errorf("Error pulling image (%s) from %s, %v", img.Tag, localName, lastErr)
|
|
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), err.Error(), nil))
|
|
if parallel {
|
|
errors <- err
|
|
return
|
|
}
|
|
}
|
|
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
|
|
|
|
if parallel {
|
|
errors <- nil
|
|
}
|
|
}
|
|
|
|
if parallel {
|
|
go downloadImage(image)
|
|
} else {
|
|
downloadImage(image)
|
|
}
|
|
}
|
|
if parallel {
|
|
var lastError error
|
|
for i := 0; i < len(repoData.ImgList); i++ {
|
|
if err := <-errors; err != nil {
|
|
lastError = err
|
|
}
|
|
}
|
|
if lastError != nil {
|
|
return lastError
|
|
}
|
|
|
|
}
|
|
for tag, id := range tagsList {
|
|
if askedTag != "" && id != imageId {
|
|
continue
|
|
}
|
|
if err := s.Set(localName, tag, id, true); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
requestedTag := localName
|
|
if len(askedTag) > 0 {
|
|
requestedTag = localName + ":" + askedTag
|
|
}
|
|
WriteStatus(requestedTag, out, sf, layers_downloaded)
|
|
return nil
|
|
}
|
|
|
|
func (s *TagStore) pullImage(r *registry.Session, out io.Writer, imgID, endpoint string, token []string, sf *utils.StreamFormatter) (bool, error) {
|
|
history, err := r.GetRemoteHistory(imgID, endpoint, token)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pulling dependent layers", nil))
|
|
// FIXME: Try to stream the images?
|
|
// FIXME: Launch the getRemoteImage() in goroutines
|
|
|
|
layers_downloaded := false
|
|
for i := len(history) - 1; i >= 0; i-- {
|
|
id := history[i]
|
|
|
|
// ensure no two downloads of the same layer happen at the same time
|
|
if c, err := s.poolAdd("pull", "layer:"+id); err != nil {
|
|
log.Debugf("Image (id: %s) pull is already running, skipping: %v", id, err)
|
|
<-c
|
|
}
|
|
defer s.poolRemove("pull", "layer:"+id)
|
|
|
|
if !s.graph.Exists(id) {
|
|
out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling metadata", nil))
|
|
var (
|
|
imgJSON []byte
|
|
imgSize int
|
|
err error
|
|
img *image.Image
|
|
)
|
|
retries := 5
|
|
for j := 1; j <= retries; j++ {
|
|
imgJSON, imgSize, err = r.GetRemoteImageJSON(id, endpoint, token)
|
|
if err != nil && j == retries {
|
|
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
|
|
return layers_downloaded, err
|
|
} else if err != nil {
|
|
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
|
|
continue
|
|
}
|
|
img, err = image.NewImgJSON(imgJSON)
|
|
layers_downloaded = true
|
|
if err != nil && j == retries {
|
|
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
|
|
return layers_downloaded, fmt.Errorf("Failed to parse json: %s", err)
|
|
} else if err != nil {
|
|
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
|
|
continue
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
|
|
for j := 1; j <= retries; j++ {
|
|
// Get the layer
|
|
status := "Pulling fs layer"
|
|
if j > 1 {
|
|
status = fmt.Sprintf("Pulling fs layer [retries: %d]", j)
|
|
}
|
|
out.Write(sf.FormatProgress(utils.TruncateID(id), status, nil))
|
|
layer, err := r.GetRemoteImageLayer(img.ID, endpoint, token, int64(imgSize))
|
|
if uerr, ok := err.(*url.Error); ok {
|
|
err = uerr.Err
|
|
}
|
|
if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
|
|
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
|
|
continue
|
|
} else if err != nil {
|
|
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
|
|
return layers_downloaded, err
|
|
}
|
|
layers_downloaded = true
|
|
defer layer.Close()
|
|
|
|
err = s.graph.Register(img, imgJSON,
|
|
utils.ProgressReader(layer, imgSize, out, sf, false, utils.TruncateID(id), "Downloading"))
|
|
if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
|
|
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
|
|
continue
|
|
} else if err != nil {
|
|
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error downloading dependent layers", nil))
|
|
return layers_downloaded, err
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
out.Write(sf.FormatProgress(utils.TruncateID(id), "Download complete", nil))
|
|
}
|
|
return layers_downloaded, nil
|
|
}
|
|
|
|
func WriteStatus(requestedTag string, out io.Writer, sf *utils.StreamFormatter, layers_downloaded bool) {
|
|
if layers_downloaded {
|
|
out.Write(sf.FormatStatus("", "Status: Downloaded newer image for %s", requestedTag))
|
|
} else {
|
|
out.Write(sf.FormatStatus("", "Status: Image is up to date for %s", requestedTag))
|
|
}
|
|
}
|
|
|
|
// downloadInfo is used to pass information from download to extractor
|
|
type downloadInfo struct {
|
|
imgJSON []byte
|
|
img *image.Image
|
|
tmpFile *os.File
|
|
length int64
|
|
downloaded bool
|
|
err chan error
|
|
}
|
|
|
|
func (s *TagStore) pullV2Repository(eng *engine.Engine, r *registry.Session, out io.Writer, localName, remoteName, tag string, sf *utils.StreamFormatter, parallel bool) error {
|
|
if tag == "" {
|
|
log.Debugf("Pulling tag list from V2 registry for %s", remoteName)
|
|
tags, err := r.GetV2RemoteTags(remoteName, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, t := range tags {
|
|
if err := s.pullV2Tag(eng, r, out, localName, remoteName, t, sf, parallel); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
} else {
|
|
if err := s.pullV2Tag(eng, r, out, localName, remoteName, tag, sf, parallel); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *TagStore) pullV2Tag(eng *engine.Engine, r *registry.Session, out io.Writer, localName, remoteName, tag string, sf *utils.StreamFormatter, parallel bool) error {
|
|
log.Debugf("Pulling tag from V2 registry: %q", tag)
|
|
manifestBytes, err := r.GetV2ImageManifest(remoteName, tag, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
manifest, verified, err := s.verifyManifest(eng, manifestBytes)
|
|
if err != nil {
|
|
return fmt.Errorf("error verifying manifest: %s", err)
|
|
}
|
|
|
|
if len(manifest.BlobSums) != len(manifest.History) {
|
|
return fmt.Errorf("length of history not equal to number of layers")
|
|
}
|
|
|
|
if verified {
|
|
out.Write(sf.FormatStatus("", "The image you are pulling has been digitally signed by Docker, Inc."))
|
|
}
|
|
out.Write(sf.FormatStatus(tag, "Pulling from %s", localName))
|
|
|
|
downloads := make([]downloadInfo, len(manifest.BlobSums))
|
|
|
|
for i := len(manifest.BlobSums) - 1; i >= 0; i-- {
|
|
var (
|
|
sumStr = manifest.BlobSums[i]
|
|
imgJSON = []byte(manifest.History[i])
|
|
)
|
|
|
|
img, err := image.NewImgJSON(imgJSON)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to parse json: %s", err)
|
|
}
|
|
downloads[i].img = img
|
|
|
|
// Check if exists
|
|
if s.graph.Exists(img.ID) {
|
|
log.Debugf("Image already exists: %s", img.ID)
|
|
continue
|
|
}
|
|
|
|
chunks := strings.SplitN(sumStr, ":", 2)
|
|
if len(chunks) < 2 {
|
|
return fmt.Errorf("expected 2 parts in the sumStr, got %#v", chunks)
|
|
}
|
|
sumType, checksum := chunks[0], chunks[1]
|
|
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Pulling fs layer", nil))
|
|
|
|
downloadFunc := func(di *downloadInfo) error {
|
|
log.Infof("pulling blob %q to V1 img %s", sumStr, img.ID)
|
|
|
|
if c, err := s.poolAdd("pull", "img:"+img.ID); err != nil {
|
|
if c != nil {
|
|
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
|
|
<-c
|
|
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
|
|
} else {
|
|
log.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
|
|
}
|
|
} else {
|
|
tmpFile, err := ioutil.TempFile("", "GetV2ImageBlob")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
r, l, err := r.GetV2ImageBlobReader(remoteName, sumType, checksum, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer r.Close()
|
|
io.Copy(tmpFile, utils.ProgressReader(r, int(l), out, sf, false, utils.TruncateID(img.ID), "Downloading"))
|
|
|
|
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
|
|
|
|
log.Debugf("Downloaded %s to tempfile %s", img.ID, tmpFile.Name())
|
|
di.tmpFile = tmpFile
|
|
di.length = l
|
|
di.downloaded = true
|
|
}
|
|
di.imgJSON = imgJSON
|
|
defer s.poolRemove("pull", "img:"+img.ID)
|
|
|
|
return nil
|
|
}
|
|
|
|
if parallel {
|
|
downloads[i].err = make(chan error)
|
|
go func(di *downloadInfo) {
|
|
di.err <- downloadFunc(di)
|
|
}(&downloads[i])
|
|
} else {
|
|
err := downloadFunc(&downloads[i])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
for i := len(downloads) - 1; i >= 0; i-- {
|
|
d := &downloads[i]
|
|
if d.err != nil {
|
|
err := <-d.err
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if d.downloaded {
|
|
// if tmpFile is empty assume download and extracted elsewhere
|
|
defer os.Remove(d.tmpFile.Name())
|
|
defer d.tmpFile.Close()
|
|
d.tmpFile.Seek(0, 0)
|
|
if d.tmpFile != nil {
|
|
err = s.graph.Register(d.img, d.imgJSON,
|
|
utils.ProgressReader(d.tmpFile, int(d.length), out, sf, false, utils.TruncateID(d.img.ID), "Extracting"))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// FIXME: Pool release here for parallel tag pull (ensures any downloads block until fully extracted)
|
|
}
|
|
out.Write(sf.FormatProgress(utils.TruncateID(d.img.ID), "Pull complete", nil))
|
|
|
|
} else {
|
|
out.Write(sf.FormatProgress(utils.TruncateID(d.img.ID), "Already exists", nil))
|
|
}
|
|
|
|
}
|
|
|
|
if err = s.Set(localName, tag, downloads[0].img.ID, true); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|