mirror of
synced 2022-11-09 12:21:53 -05:00

As this feature requires more testing, it is much safer to get the underlying changes into the codebase first and then enable the feature in another release, after proper testing and verification can be done. Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
569 lines
16 KiB
569 lines
16 KiB
package graph
import (
func (s *TagStore) verifyManifest(eng *engine.Engine, manifestBytes []byte) (*registry.ManifestData, bool, error) {
sig, err := libtrust.ParsePrettySignature(manifestBytes, "signatures")
if err != nil {
return nil, false, fmt.Errorf("error parsing payload: %s", err)
keys, err := sig.Verify()
if err != nil {
return nil, false, fmt.Errorf("error verifying payload: %s", err)
payload, err := sig.Payload()
if err != nil {
return nil, false, fmt.Errorf("error retrieving payload: %s", err)
var manifest registry.ManifestData
if err := json.Unmarshal(payload, &manifest); err != nil {
return nil, false, fmt.Errorf("error unmarshalling manifest: %s", err)
var verified bool
for _, key := range keys {
job := eng.Job("trust_key_check")
b, err := key.MarshalJSON()
if err != nil {
return nil, false, fmt.Errorf("error marshalling public key: %s", err)
namespace := manifest.Name
if namespace[0] != '/' {
namespace = "/" + namespace
stdoutBuffer := bytes.NewBuffer(nil)
job.Args = append(job.Args, namespace)
job.Setenv("PublicKey", string(b))
// Check key has read/write permission (0x03)
job.SetenvInt("Permission", 0x03)
if err = job.Run(); err != nil {
return nil, false, fmt.Errorf("error running key check: %s", err)
result := engine.Tail(stdoutBuffer, 1)
log.Debugf("Key check result: %q", result)
if result == "verified" {
verified = true
return &manifest, verified, nil
func (s *TagStore) CmdPull(job *engine.Job) engine.Status {
if n := len(job.Args); n != 1 && n != 2 {
return job.Errorf("Usage: %s IMAGE [TAG]", job.Name)
var (
localName = job.Args[0]
tag string
sf = utils.NewStreamFormatter(job.GetenvBool("json"))
authConfig = ®istry.AuthConfig{}
metaHeaders map[string][]string
mirrors []string
if len(job.Args) > 1 {
tag = job.Args[1]
job.GetenvJson("authConfig", authConfig)
job.GetenvJson("metaHeaders", &metaHeaders)
c, err := s.poolAdd("pull", localName+":"+tag)
if err != nil {
if c != nil {
// Another pull of the same repository is already taking place; just wait for it to finish
job.Stdout.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", localName))
return engine.StatusOK
return job.Error(err)
defer s.poolRemove("pull", localName+":"+tag)
// Resolve the Repository name from fqn to endpoint + name
hostname, remoteName, err := registry.ResolveRepositoryName(localName)
if err != nil {
return job.Error(err)
endpoint, err := registry.NewEndpoint(hostname)
if err != nil {
return job.Error(err)
r, err := registry.NewSession(authConfig, registry.HTTPRequestFactory(metaHeaders), endpoint, true)
if err != nil {
return job.Error(err)
var isOfficial bool
if endpoint.VersionString(1) == registry.IndexServerAddress() {
// If pull "index.docker.io/foo/bar", it's stored locally under "foo/bar"
localName = remoteName
isOfficial = isOfficialName(remoteName)
if isOfficial && strings.IndexRune(remoteName, '/') == -1 {
remoteName = "library/" + remoteName
// Use provided mirrors, if any
mirrors = s.mirrors
if err = s.pullRepository(r, job.Stdout, localName, remoteName, tag, sf, job.GetenvBool("parallel"), mirrors); err != nil {
return job.Error(err)
return engine.StatusOK
// pullRepository pulls the images of remoteName through the registry session
// r and records their tags in the local store under localName.
//
// It fetches the repository data and the remote tag list, pulls each tagged
// image (restricted to askedTag when it is non-empty) by trying the given
// mirrors first and the repository's endpoints second, stores the tags with
// s.Set and finally writes a status line to out via WriteStatus. When
// parallel is true each image is pulled in its own goroutine and results are
// collected from the errors channel.
//
// NOTE(review): this listing appears to have lost its closing braces and bare
// single-token statements (such as return/continue/break) during extraction;
// confirm the exact control flow against the upstream file.
func (s *TagStore) pullRepository(r *registry.Session, out io.Writer, localName, remoteName, askedTag string, sf *utils.StreamFormatter, parallel bool, mirrors []string) error {
out.Write(sf.FormatStatus("", "Pulling repository %s", localName))
// Fetch the repository description; its ImgList, Endpoints and Tokens are used below.
repoData, err := r.GetRepositoryData(remoteName)
if err != nil {
// A 404 from the registry is turned into an "image not found" error.
if strings.Contains(err.Error(), "HTTP code: 404") {
return fmt.Errorf("Error: image %s not found", remoteName)
// Unexpected HTTP error
return err
log.Debugf("Retrieving the tag list")
tagsList, err := r.GetRemoteTags(repoData.Endpoints, remoteName, repoData.Tokens)
if err != nil {
log.Errorf("%v", err)
return err
// Give every remote tag an ImgList entry keyed by image ID.
// NOTE(review): "®istry" below looks like a mis-encoded "&registry" — confirm.
for tag, id := range tagsList {
repoData.ImgList[id] = ®istry.ImgData{
ID: id,
Tag: tag,
Checksum: "",
log.Debugf("Registering tags")
// If no tag has been specified, pull them all
var imageId string
if askedTag == "" {
for tag, id := range tagsList {
repoData.ImgList[id].Tag = tag
} else {
// Otherwise, check that the tag exists and use only that one
id, exists := tagsList[askedTag]
if !exists {
return fmt.Errorf("Tag %s not found in repository %s", askedTag, localName)
// imageId is compared against each tag's image ID in the tagging loop at the end.
imageId = id
repoData.ImgList[id].Tag = askedTag
// errors is unbuffered; in parallel mode downloadImage sends its outcome on it.
errors := make(chan error)
layers_downloaded := false
for _, image := range repoData.ImgList {
// downloadImage pulls a single image and, in parallel mode, reports the
// outcome (nil or an error) on the errors channel.
downloadImage := func(img *registry.ImgData) {
// Images whose tag differs from the requested one are skipped.
if askedTag != "" && img.Tag != askedTag {
log.Debugf("(%s) does not match %s (id: %s), skipping", img.Tag, askedTag, img.ID)
if parallel {
errors <- nil
// Untagged images are skipped as well.
if img.Tag == "" {
log.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID)
if parallel {
errors <- nil
// ensure no two downloads of the same image happen at the same time
if c, err := s.poolAdd("pull", "img:"+img.ID); err != nil {
if c != nil {
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
} else {
log.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
if parallel {
errors <- nil
defer s.poolRemove("pull", "img:"+img.ID)
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, localName), nil))
// success is set once a mirror or an endpoint has delivered the image.
success := false
var lastErr, err error
var is_downloaded bool
if mirrors != nil {
for _, ep := range mirrors {
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, mirror: %s", img.Tag, localName, ep), nil))
if is_downloaded, err = s.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
// Don't report errors when pulling from mirrors.
log.Debugf("Error pulling image (%s) from %s, mirror: %s, %s", img.Tag, localName, ep, err)
layers_downloaded = layers_downloaded || is_downloaded
success = true
// Fall back to the repository's own endpoints when no mirror succeeded.
if !success {
for _, ep := range repoData.Endpoints {
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, localName, ep), nil))
if is_downloaded, err = s.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
// It's not ideal that only the last error is returned, it would be better to concatenate the errors.
// As the error is also given to the output stream the user will see the error.
lastErr = err
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, localName, ep, err), nil))
layers_downloaded = layers_downloaded || is_downloaded
success = true
// Every source failed: report the last endpoint error on the output
// stream and, in parallel mode, on the errors channel.
if !success {
err := fmt.Errorf("Error pulling image (%s) from %s, %v", img.Tag, localName, lastErr)
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), err.Error(), nil))
if parallel {
errors <- err
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
if parallel {
errors <- nil
// In parallel mode each image is pulled in its own goroutine.
// NOTE(review): the body of the else branch is not visible in this listing;
// presumably it calls downloadImage synchronously — confirm.
if parallel {
go downloadImage(image)
} else {
// Receive one result per ImgList entry; the last non-nil error is returned.
if parallel {
var lastError error
for i := 0; i < len(repoData.ImgList); i++ {
if err := <-errors; err != nil {
lastError = err
if lastError != nil {
return lastError
// Record the pulled tags in the local store.
// NOTE(review): the body of the askedTag/imageId check is not visible here;
// presumably it skips tags that point at other images — confirm.
for tag, id := range tagsList {
if askedTag != "" && id != imageId {
if err := s.Set(localName, tag, id, true); err != nil {
return err
// The status line names the repository, plus the tag when one was requested.
requestedTag := localName
if len(askedTag) > 0 {
requestedTag = localName + ":" + askedTag
WriteStatus(requestedTag, out, sf, layers_downloaded)
return nil
// pullImage pulls the image imgID and the layers in its remote history from
// endpoint, using token for the registry calls, and registers each layer that
// is not yet present in s.graph. Progress is written to out using sf.
//
// The history is walked from its last entry to its first. Metadata and layer
// downloads are each attempted up to 5 times, sleeping j*500ms after a failed
// attempt j. The boolean result is the layers_downloaded flag, which is set
// once image JSON has been fetched or a layer stream has been opened.
//
// NOTE(review): this listing appears to have lost its closing braces and bare
// single-token statements (such as continue/break) during extraction; confirm
// the exact retry control flow against the upstream file.
func (s *TagStore) pullImage(r *registry.Session, out io.Writer, imgID, endpoint string, token []string, sf *utils.StreamFormatter) (bool, error) {
history, err := r.GetRemoteHistory(imgID, endpoint, token)
if err != nil {
return false, err
out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pulling dependent layers", nil))
// FIXME: Try to stream the images?
// FIXME: Launch the getRemoteImage() in goroutines
layers_downloaded := false
// Walk the history from its last entry down to index 0.
for i := len(history) - 1; i >= 0; i-- {
id := history[i]
// ensure no two downloads of the same layer happen at the same time
if c, err := s.poolAdd("pull", "layer:"+id); err != nil {
log.Debugf("Image (id: %s) pull is already running, skipping: %v", id, err)
// This defer sits inside the loop, so every pool entry is released only
// when pullImage returns.
defer s.poolRemove("pull", "layer:"+id)
// Only layers missing from the local graph are fetched.
if !s.graph.Exists(id) {
out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling metadata", nil))
var (
imgJSON []byte
imgSize int
err error
img *image.Image
retries := 5
// Fetch and parse the image JSON; the error is returned on the final attempt.
for j := 1; j <= retries; j++ {
imgJSON, imgSize, err = r.GetRemoteImageJSON(id, endpoint, token)
if err != nil && j == retries {
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
return layers_downloaded, err
} else if err != nil {
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
img, err = image.NewImgJSON(imgJSON)
// The flag is set before the parse error is checked.
layers_downloaded = true
if err != nil && j == retries {
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
return layers_downloaded, fmt.Errorf("Failed to parse json: %s", err)
} else if err != nil {
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
} else {
// Download the layer and register it in the graph.
for j := 1; j <= retries; j++ {
// Get the layer
status := "Pulling fs layer"
if j > 1 {
status = fmt.Sprintf("Pulling fs layer [retries: %d]", j)
out.Write(sf.FormatProgress(utils.TruncateID(id), status, nil))
layer, err := r.GetRemoteImageLayer(img.ID, endpoint, token, int64(imgSize))
// Unwrap *url.Error so the timeout check below sees the underlying error.
if uerr, ok := err.(*url.Error); ok {
err = uerr.Err
// Only timeouts before the last attempt lead to the sleep; any other error is returned.
if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
} else if err != nil {
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil))
return layers_downloaded, err
layers_downloaded = true
// Deferred inside the loop: layer streams are closed when pullImage returns.
defer layer.Close()
err = s.graph.Register(img, imgJSON,
utils.ProgressReader(layer, imgSize, out, sf, false, utils.TruncateID(id), "Downloading"))
if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
time.Sleep(time.Duration(j) * 500 * time.Millisecond)
} else if err != nil {
out.Write(sf.FormatProgress(utils.TruncateID(id), "Error downloading dependent layers", nil))
return layers_downloaded, err
} else {
out.Write(sf.FormatProgress(utils.TruncateID(id), "Download complete", nil))
return layers_downloaded, nil
func WriteStatus(requestedTag string, out io.Writer, sf *utils.StreamFormatter, layers_downloaded bool) {
if layers_downloaded {
out.Write(sf.FormatStatus("", "Status: Downloaded newer image for %s", requestedTag))
} else {
out.Write(sf.FormatStatus("", "Status: Image is up to date for %s", requestedTag))
// downloadInfo is used to pass information from download to extractor
type downloadInfo struct {
imgJSON []byte
img *image.Image
tmpFile *os.File
length int64
downloaded bool
err chan error
func (s *TagStore) pullV2Repository(eng *engine.Engine, r *registry.Session, out io.Writer, localName, remoteName, tag string, sf *utils.StreamFormatter, parallel bool) error {
if tag == "" {
log.Debugf("Pulling tag list from V2 registry for %s", remoteName)
tags, err := r.GetV2RemoteTags(remoteName, nil)
if err != nil {
return err
for _, t := range tags {
if err := s.pullV2Tag(eng, r, out, localName, remoteName, t, sf, parallel); err != nil {
return err
} else {
if err := s.pullV2Tag(eng, r, out, localName, remoteName, tag, sf, parallel); err != nil {
return err
return nil
// pullV2Tag pulls a single tag of remoteName from a V2 registry through the
// session r and tags the result locally as localName:tag.
//
// It fetches and verifies the image manifest, requires the manifest to carry
// as many blob sums as history entries, downloads each missing layer blob
// into a temporary file (one goroutine per layer when parallel is true),
// registers the downloaded layers in s.graph, and finally tags the image at
// index 0 of the manifest. Progress and status are written to out using sf.
//
// NOTE(review): this listing appears to have lost its closing braces and bare
// single-token statements (such as continue and the goroutine invocation)
// during extraction; confirm the exact control flow against the upstream file.
func (s *TagStore) pullV2Tag(eng *engine.Engine, r *registry.Session, out io.Writer, localName, remoteName, tag string, sf *utils.StreamFormatter, parallel bool) error {
log.Debugf("Pulling tag from V2 registry: %q", tag)
manifestBytes, err := r.GetV2ImageManifest(remoteName, tag, nil)
if err != nil {
return err
manifest, verified, err := s.verifyManifest(eng, manifestBytes)
if err != nil {
return fmt.Errorf("error verifying manifest: %s", err)
// BlobSums[i] and History[i] are read as a pair below, so the lengths must match.
if len(manifest.BlobSums) != len(manifest.History) {
return fmt.Errorf("length of history not equal to number of layers")
if verified {
out.Write(sf.FormatStatus("", "The image you are pulling has been digitally signed by Docker, Inc."))
out.Write(sf.FormatStatus(tag, "Pulling from %s", localName))
// One downloadInfo per layer, indexed like the manifest.
downloads := make([]downloadInfo, len(manifest.BlobSums))
// Walk the layers from the last manifest entry down to index 0.
for i := len(manifest.BlobSums) - 1; i >= 0; i-- {
var (
sumStr = manifest.BlobSums[i]
imgJSON = []byte(manifest.History[i])
img, err := image.NewImgJSON(imgJSON)
if err != nil {
return fmt.Errorf("failed to parse json: %s", err)
downloads[i].img = img
// Check if exists
if s.graph.Exists(img.ID) {
log.Debugf("Image already exists: %s", img.ID)
// The blob sum is split at the first ":" into a sum type and a checksum.
chunks := strings.SplitN(sumStr, ":", 2)
if len(chunks) < 2 {
return fmt.Errorf("expected 2 parts in the sumStr, got %#v", chunks)
sumType, checksum := chunks[0], chunks[1]
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Pulling fs layer", nil))
// downloadFunc downloads this layer's blob into a temporary file and
// records the result in di.
downloadFunc := func(di *downloadInfo) error {
log.Infof("pulling blob %q to V1 img %s", sumStr, img.ID)
if c, err := s.poolAdd("pull", "img:"+img.ID); err != nil {
if c != nil {
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil))
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
} else {
log.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
} else {
tmpFile, err := ioutil.TempFile("", "GetV2ImageBlob")
if err != nil {
return err
// Note: this r shadows the registry session for the rest of the scope.
r, l, err := r.GetV2ImageBlobReader(remoteName, sumType, checksum, nil)
if err != nil {
return err
defer r.Close()
// NOTE(review): the io.Copy result is discarded, so a failed or short
// copy is not detected here.
io.Copy(tmpFile, utils.ProgressReader(r, int(l), out, sf, false, utils.TruncateID(img.ID), "Downloading"))
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil))
log.Debugf("Downloaded %s to tempfile %s", img.ID, tmpFile.Name())
di.tmpFile = tmpFile
di.length = l
di.downloaded = true
di.imgJSON = imgJSON
defer s.poolRemove("pull", "img:"+img.ID)
return nil
// Parallel pulls run downloadFunc in a goroutine that reports on di.err;
// otherwise the download runs inline and its error aborts the pull.
if parallel {
downloads[i].err = make(chan error)
go func(di *downloadInfo) {
di.err <- downloadFunc(di)
} else {
err := downloadFunc(&downloads[i])
if err != nil {
return err
// Second pass, again from the last layer down to index 0: wait for each
// download and register the downloaded layers.
for i := len(downloads) - 1; i >= 0; i-- {
d := &downloads[i]
// d.err is only non-nil for parallel pulls.
if d.err != nil {
err := <-d.err
if err != nil {
return err
if d.downloaded {
// if tmpFile is empty assume download and extracted elsewhere
// NOTE(review): d.tmpFile is dereferenced by the three statements below
// before the nil check that follows them.
defer os.Remove(d.tmpFile.Name())
defer d.tmpFile.Close()
d.tmpFile.Seek(0, 0)
if d.tmpFile != nil {
err = s.graph.Register(d.img, d.imgJSON,
utils.ProgressReader(d.tmpFile, int(d.length), out, sf, false, utils.TruncateID(d.img.ID), "Extracting"))
if err != nil {
return err
// FIXME: Pool release here for parallel tag pull (ensures any downloads block until fully extracted)
out.Write(sf.FormatProgress(utils.TruncateID(d.img.ID), "Pull complete", nil))
} else {
out.Write(sf.FormatProgress(utils.TruncateID(d.img.ID), "Already exists", nil))
// The tag is set on the image at index 0 of the manifest.
if err = s.Set(localName, tag, downloads[0].img.ID, true); err != nil {
return err
return nil