moby/moby: graph/pull_v2.go
Aaron Lehmann, commit 257c59251b: Vendor updated version of docker/distribution
This updates the vendored docker/distribution to the current master
branch.

Note the following changes:

- The manifest package was split into manifest/schema1. Most references
  to the manifest package in the engine needed to be updated to use
  schema1 instead.

- Validation functions in api/v2 were replaced by the
  distribution/reference package. The engine code has been updated to
  use the reference package for validation where necessary. A future PR
  will change the engine to use the types defined in
  distribution/reference more comprehensively.

- The reference package explicitly allows double _ characters in
  repository names. registry_test.go was updated for this.

- TestPullFailsWithAlteredManifest was corrupting the manifest JSON, now
  that the schema1 package unmarshals the correct payload. The test is
  being changed to modify the JSON without affecting its length, which
  allows the pull to succeed to the point where digest validation
  happens.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
2015-11-02 12:40:18 -08:00

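As a point of reference for the validation change described above, here is a minimal, hypothetical sketch of repository-name validation through the vendored reference package. It assumes reference.WithName is the validation entry point (as in the present-day github.com/docker/distribution/reference API); the engine's actual call sites may differ:

package main

import (
	"fmt"

	"github.com/docker/distribution/reference"
)

func main() {
	// Double _ characters are explicitly allowed in repository names;
	// uppercase characters are not. (Illustrative inputs only.)
	for _, name := range []string{"library/redis", "foo__bar/img", "Bad/UPPERCASE"} {
		if _, err := reference.WithName(name); err != nil {
			fmt.Printf("%s: invalid repository name: %v\n", name, err)
		} else {
			fmt.Printf("%s: valid repository name\n", name)
		}
	}
}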

package graph

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"os"

	"github.com/Sirupsen/logrus"
	"github.com/docker/distribution"
	"github.com/docker/distribution/digest"
	"github.com/docker/distribution/manifest/schema1"
	"github.com/docker/docker/image"
	"github.com/docker/docker/pkg/broadcaster"
	"github.com/docker/docker/pkg/progressreader"
	"github.com/docker/docker/pkg/streamformatter"
	"github.com/docker/docker/pkg/stringid"
	"github.com/docker/docker/registry"
	"github.com/docker/docker/utils"
	"golang.org/x/net/context"
)

type v2Puller struct {
	*TagStore
	endpoint  registry.APIEndpoint
	config    *ImagePullConfig
	sf        *streamformatter.StreamFormatter
	repoInfo  *registry.RepositoryInfo
	repo      distribution.Repository
	sessionID string
}
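
// Pull downloads the repository content for tag from a v2 registry endpoint.
// An empty tag means every tag in the repository. The fallback return value
// reports whether the caller should retry the pull through the v1 protocol.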
func (p *v2Puller) Pull(tag string) (fallback bool, err error) {
	// TODO(tiborvass): was ReceiveTimeout
	p.repo, err = NewV2Repository(p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull")
	if err != nil {
		logrus.Warnf("Error getting v2 registry: %v", err)
		return true, err
	}

	p.sessionID = stringid.GenerateRandomID()

	if err := p.pullV2Repository(tag); err != nil {
		if registry.ContinueOnError(err) {
			logrus.Debugf("Error trying v2 registry: %v", err)
			return true, err
		}
		return false, err
	}
	return false, nil
}
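
// pullV2Repository pulls the given tag, or every tag in the repository when
// tag is empty. Concurrent pulls of the same repository are coalesced: later
// callers attach their output to the first pull's broadcaster and wait for
// it to complete instead of starting a second download.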
func (p *v2Puller) pullV2Repository(tag string) (err error) {
	var tags []string
	taggedName := p.repoInfo.LocalName
	if len(tag) > 0 {
		tags = []string{tag}
		taggedName = utils.ImageReference(p.repoInfo.LocalName, tag)
	} else {
		var err error

		manSvc, err := p.repo.Manifests(context.Background())
		if err != nil {
			return err
		}

		tags, err = manSvc.Tags()
		if err != nil {
			return err
		}
	}

	poolKey := "v2:" + taggedName
	broadcaster, found := p.poolAdd("pull", poolKey)
	broadcaster.Add(p.config.OutStream)
	if found {
		// Another pull of the same repository is already taking place; just wait for it to finish
		return broadcaster.Wait()
	}

	// This must use a closure so it captures the value of err when the
	// function returns, not when the 'defer' is evaluated.
	defer func() {
		p.poolRemoveWithError("pull", poolKey, err)
	}()

	var layersDownloaded bool
	for _, tag := range tags {
		// pulledNew is true if either new layers were downloaded OR if existing images were newly tagged
		// TODO(tiborvass): should we change the name of `layersDownload`? What about message in WriteStatus?
		pulledNew, err := p.pullV2Tag(broadcaster, tag, taggedName)
		if err != nil {
			return err
		}
		layersDownloaded = layersDownloaded || pulledNew
	}

	writeStatus(taggedName, broadcaster, p.sf, layersDownloaded)

	return nil
}

// downloadInfo is used to pass information from download to extractor
type downloadInfo struct {
	img         contentAddressableDescriptor
	imgIndex    int
	tmpFile     *os.File
	digest      digest.Digest
	layer       distribution.ReadSeekCloser
	size        int64
	err         chan error
	poolKey     string
	broadcaster *broadcaster.Buffered
}

// contentAddressableDescriptor is used to pass image data from a manifest to the
// graph.
type contentAddressableDescriptor struct {
	id              string
	parent          string
	strongID        digest.Digest
	compatibilityID string
	config          []byte
	v1Compatibility []byte
}

func newContentAddressableImage(v1Compatibility []byte, blobSum digest.Digest, parent digest.Digest) (contentAddressableDescriptor, error) {
	img := contentAddressableDescriptor{
		v1Compatibility: v1Compatibility,
	}

	var err error
	img.config, err = image.MakeImageConfig(v1Compatibility, blobSum, parent)
	if err != nil {
		return img, err
	}
	img.strongID, err = image.StrongID(img.config)
	if err != nil {
		return img, err
	}

	unmarshalledConfig, err := image.NewImgJSON(v1Compatibility)
	if err != nil {
		return img, err
	}

	img.compatibilityID = unmarshalledConfig.ID
	img.id = img.strongID.Hex()

	return img, nil
}

// ID returns the actual ID to be used for the downloaded image. This may be
// a computed ID.
func (img contentAddressableDescriptor) ID() string {
	return img.id
}

// Parent returns the parent ID to be used for the image. This may be a
// computed ID.
func (img contentAddressableDescriptor) Parent() string {
	return img.parent
}

// MarshalConfig renders the image structure into JSON.
func (img contentAddressableDescriptor) MarshalConfig() ([]byte, error) {
	return img.config, nil
}

type errVerification struct{}

func (errVerification) Error() string { return "verification failed" }
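
// download fetches a single layer blob into di.tmpFile, verifying the bytes
// against di.digest as they stream in. The outcome is reported on the di.err
// channel; on success, di.layer is left non-nil as a marker that this call
// (rather than a concurrent pull session) performed the download.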
func (p *v2Puller) download(di *downloadInfo) {
	logrus.Debugf("pulling blob %q to %s", di.digest, di.img.id)

	blobs := p.repo.Blobs(context.Background())

	desc, err := blobs.Stat(context.Background(), di.digest)
	if err != nil {
		logrus.Debugf("Error statting layer: %v", err)
		di.err <- err
		return
	}
	di.size = desc.Size

	layerDownload, err := blobs.Open(context.Background(), di.digest)
	if err != nil {
		logrus.Debugf("Error fetching layer: %v", err)
		di.err <- err
		return
	}
	defer layerDownload.Close()

	verifier, err := digest.NewDigestVerifier(di.digest)
	if err != nil {
		di.err <- err
		return
	}

	reader := progressreader.New(progressreader.Config{
		In:        ioutil.NopCloser(io.TeeReader(layerDownload, verifier)),
		Out:       di.broadcaster,
		Formatter: p.sf,
		Size:      di.size,
		NewLines:  false,
		ID:        stringid.TruncateID(di.img.id),
		Action:    "Downloading",
	})
	// Don't ignore copy errors: a short write to the temp file would
	// otherwise surface later as a confusing verification failure.
	if _, err := io.Copy(di.tmpFile, reader); err != nil {
		di.err <- err
		return
	}

	di.broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.id), "Verifying Checksum", nil))

	if !verifier.Verified() {
		err = fmt.Errorf("filesystem layer verification failed for digest %s", di.digest)
		logrus.Error(err)
		di.err <- err
		return
	}

	di.broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.id), "Download complete", nil))

	logrus.Debugf("Downloaded %s to tempfile %s", di.img.id, di.tmpFile.Name())
	di.layer = layerDownload

	di.err <- nil
}
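
// pullV2Tag fetches and verifies the manifest for a single tag, downloads any
// layers missing from the graph in parallel, and registers them bottom-up. It
// reports true when new layers were downloaded or a new tag mapping was set.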
func (p *v2Puller) pullV2Tag(out io.Writer, tag, taggedName string) (tagUpdated bool, err error) {
	logrus.Debugf("Pulling tag from V2 registry: %q", tag)

	manSvc, err := p.repo.Manifests(context.Background())
	if err != nil {
		return false, err
	}

	unverifiedManifest, err := manSvc.GetByTag(tag)
	if err != nil {
		return false, err
	}
	if unverifiedManifest == nil {
		return false, fmt.Errorf("image manifest does not exist for tag %q", tag)
	}
	var verifiedManifest *schema1.Manifest
	verifiedManifest, err = verifyManifest(unverifiedManifest, tag)
	if err != nil {
		return false, err
	}

	// remove duplicate layers and check parent chain validity
	err = fixManifestLayers(verifiedManifest)
	if err != nil {
		return false, err
	}

	imgs, err := p.getImageInfos(verifiedManifest)
	if err != nil {
		return false, err
	}

	out.Write(p.sf.FormatStatus(tag, "Pulling from %s", p.repo.Name()))

	var downloads []*downloadInfo
	var layerIDs []string
	defer func() {
		p.graph.Release(p.sessionID, layerIDs...)

		for _, d := range downloads {
			p.poolRemoveWithError("pull", d.poolKey, err)
			if d.tmpFile != nil {
				d.tmpFile.Close()
				if err := os.RemoveAll(d.tmpFile.Name()); err != nil {
					logrus.Errorf("Failed to remove temp file: %s", d.tmpFile.Name())
				}
			}
		}
	}()
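
	// The manifest lists layers top-down, so iterate backwards to schedule
	// the base layer's download first.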
	for i := len(verifiedManifest.FSLayers) - 1; i >= 0; i-- {
		img := imgs[i]

		p.graph.Retain(p.sessionID, img.id)
		layerIDs = append(layerIDs, img.id)

		p.graph.imageMutex.Lock(img.id)

		// Check if exists
		if p.graph.Exists(img.id) {
			if err := p.validateImageInGraph(img.id, imgs, i); err != nil {
				p.graph.imageMutex.Unlock(img.id)
				return false, fmt.Errorf("image validation failed: %v", err)
			}
			logrus.Debugf("Image already exists: %s", img.id)
			p.graph.imageMutex.Unlock(img.id)
			continue
		}
		p.graph.imageMutex.Unlock(img.id)

		out.Write(p.sf.FormatProgress(stringid.TruncateID(img.id), "Pulling fs layer", nil))

		d := &downloadInfo{
			img:      img,
			imgIndex: i,
			poolKey:  "v2layer:" + img.id,
			digest:   verifiedManifest.FSLayers[i].BlobSum,
			// TODO: seems like this chan buffer solved hanging problem in go1.5,
			// this can indicate some deeper problem that somehow we never take
			// error from channel in loop below
			err: make(chan error, 1),
		}

		tmpFile, err := ioutil.TempFile("", "GetImageBlob")
		if err != nil {
			return false, err
		}
		d.tmpFile = tmpFile

		downloads = append(downloads, d)

		broadcaster, found := p.poolAdd("pull", d.poolKey)
		broadcaster.Add(out)
		d.broadcaster = broadcaster
		if found {
			d.err <- nil
		} else {
			go p.download(d)
		}
	}
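
	// Consume the downloads in the same base-first order, extracting and
	// registering each layer in the graph before moving up the chain.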
	for _, d := range downloads {
		if err := <-d.err; err != nil {
			return false, err
		}

		if d.layer == nil {
			// Wait for a different pull to download and extract
			// this layer.
			err = d.broadcaster.Wait()
			if err != nil {
				return false, err
			}
			continue
		}

		d.tmpFile.Seek(0, 0)
		err := func() error {
			reader := progressreader.New(progressreader.Config{
				In:        d.tmpFile,
				Out:       d.broadcaster,
				Formatter: p.sf,
				Size:      d.size,
				NewLines:  false,
				ID:        stringid.TruncateID(d.img.id),
				Action:    "Extracting",
			})

			p.graph.imagesMutex.Lock()
			defer p.graph.imagesMutex.Unlock()

			p.graph.imageMutex.Lock(d.img.id)
			defer p.graph.imageMutex.Unlock(d.img.id)

			// Must recheck the data on disk if any exists.
			// This protects against races where something
			// else is written to the graph under this ID
			// after attemptIDReuse.
			if p.graph.Exists(d.img.id) {
				if err := p.validateImageInGraph(d.img.id, imgs, d.imgIndex); err != nil {
					return fmt.Errorf("image validation failed: %v", err)
				}
			}

			if err := p.graph.register(d.img, reader); err != nil {
				return err
			}

			if err := p.graph.setLayerDigest(d.img.id, d.digest); err != nil {
				return err
			}

			if err := p.graph.setV1CompatibilityConfig(d.img.id, d.img.v1Compatibility); err != nil {
				return err
			}
			return nil
		}()
		if err != nil {
			return false, err
		}

		d.broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(d.img.id), "Pull complete", nil))
		d.broadcaster.Close()
		tagUpdated = true
	}

	manifestDigest, _, err := digestFromManifest(unverifiedManifest, p.repoInfo.LocalName)
	if err != nil {
		return false, err
	}

	// Check for new tag if no layers downloaded
	if !tagUpdated {
		repo, err := p.Get(p.repoInfo.LocalName)
		if err != nil {
			return false, err
		}
		if repo != nil {
			if _, exists := repo[tag]; !exists {
				tagUpdated = true
			}
		} else {
			tagUpdated = true
		}
	}

	firstID := layerIDs[len(layerIDs)-1]
	if utils.DigestReference(tag) {
		// TODO(stevvooe): Ideally, we should always set the digest so we can
		// use the digest whether we pull by it or not. Unfortunately, the tag
		// store treats the digest as a separate tag, meaning there may be an
		// untagged digest image that would seem to be dangling by a user.
		if err = p.SetDigest(p.repoInfo.LocalName, tag, firstID); err != nil {
			return false, err
		}
	} else {
		// only set the repository/tag -> image ID mapping when pulling by tag (i.e. not by digest)
		if err = p.Tag(p.repoInfo.LocalName, tag, firstID, true); err != nil {
			return false, err
		}
	}

	if manifestDigest != "" {
		out.Write(p.sf.FormatStatus("", "Digest: %s", manifestDigest))
	}

	return tagUpdated, nil
}
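
// verifyManifest checks the integrity of the signed manifest for tag. When
// tag parses as a digest (a pull by digest), the manifest payload is first
// verified against that digest. The manifest must then be schema version 1
// and carry a non-empty FSLayers list whose length matches History.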
func verifyManifest(signedManifest *schema1.SignedManifest, tag string) (m *schema1.Manifest, err error) {
	// If pull by digest, then verify the manifest digest. NOTE: It is
	// important to do this first, before any other content validation. If the
	// digest cannot be verified, don't even bother with those other things.
	if manifestDigest, err := digest.ParseDigest(tag); err == nil {
		verifier, err := digest.NewDigestVerifier(manifestDigest)
		if err != nil {
			return nil, err
		}
		payload, err := signedManifest.Payload()
		if err != nil {
			// If this failed, the signatures section was corrupted
			// or missing. Treat the entire manifest as the payload.
			payload = signedManifest.Raw
		}
		if _, err := verifier.Write(payload); err != nil {
			return nil, err
		}
		if !verifier.Verified() {
			err := fmt.Errorf("image verification failed for digest %s", manifestDigest)
			logrus.Error(err)
			return nil, err
		}

		var verifiedManifest schema1.Manifest
		if err = json.Unmarshal(payload, &verifiedManifest); err != nil {
			return nil, err
		}
		m = &verifiedManifest
	} else {
		m = &signedManifest.Manifest
	}

	if m.SchemaVersion != 1 {
		return nil, fmt.Errorf("unsupported schema version %d for tag %q", m.SchemaVersion, tag)
	}
	if len(m.FSLayers) != len(m.History) {
		return nil, fmt.Errorf("length of history not equal to number of layers for tag %q", tag)
	}
	if len(m.FSLayers) == 0 {
		return nil, fmt.Errorf("no FSLayers in manifest for tag %q", tag)
	}

	return m, nil
}

// fixManifestLayers removes repeated layers from the manifest and checks the
// correctness of the parent chain.
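// For example (illustrative IDs, not from a real manifest): a history chain
// listing IDs [a, a, b] from top to bottom collapses to [a, b], provided a's
// recorded parent is b; any other parent mismatch is rejected with an error.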
func fixManifestLayers(m *schema1.Manifest) error {
	images := make([]*image.Image, len(m.FSLayers))
	for i := range m.FSLayers {
		img, err := image.NewImgJSON([]byte(m.History[i].V1Compatibility))
		if err != nil {
			return err
		}
		images[i] = img
		if err := image.ValidateID(img.ID); err != nil {
			return err
		}
	}

	if images[len(images)-1].Parent != "" {
		return errors.New("Invalid parent ID in the base layer of the image.")
	}

	// reject non-adjacent duplicate IDs up front; registering the same ID
	// twice would otherwise deadlock the pull
	idmap := make(map[string]struct{})

	var lastID string
	for _, img := range images {
		// adjacent duplicates are allowed here; the removal loop below collapses them
		if _, exists := idmap[img.ID]; img.ID != lastID && exists {
			return fmt.Errorf("ID %+v appears multiple times in manifest", img.ID)
		}
		lastID = img.ID
		idmap[lastID] = struct{}{}
	}

	// backwards loop so that we keep the remaining indexes after removing items
	for i := len(images) - 2; i >= 0; i-- {
		if images[i].ID == images[i+1].ID { // repeated ID. remove and continue
			m.FSLayers = append(m.FSLayers[:i], m.FSLayers[i+1:]...)
			m.History = append(m.History[:i], m.History[i+1:]...)
		} else if images[i].Parent != images[i+1].ID {
			return fmt.Errorf("Invalid parent ID. Expected %v, got %v.", images[i+1].ID, images[i].Parent)
		}
	}

	return nil
}

// getImageInfos returns a contentAddressableDescriptor for every image in the
// manifest. These objects carry both the computed strongIDs and the
// compatibilityIDs found in the v1Compatibility objects.
func (p *v2Puller) getImageInfos(m *schema1.Manifest) ([]contentAddressableDescriptor, error) {
	imgs := make([]contentAddressableDescriptor, len(m.FSLayers))

	var parent digest.Digest
	for i := len(imgs) - 1; i >= 0; i-- {
		var err error
		imgs[i], err = newContentAddressableImage([]byte(m.History[i].V1Compatibility), m.FSLayers[i].BlobSum, parent)
		if err != nil {
			return nil, err
		}
		parent = imgs[i].strongID
	}

	p.attemptIDReuse(imgs)

	return imgs, nil
}

// attemptIDReuse makes a best-effort attempt to match verified
// compatibilityIDs already in the graph with the computed strongIDs so we can
// keep using them. This process never fails, but may simply return the
// strongIDs if none of the compatibilityIDs exists or can be verified. If the
// strongIDs themselves fail verification, we deterministically generate
// alternate IDs to use until we find one that's available or already exists
// with the correct data.
func (p *v2Puller) attemptIDReuse(imgs []contentAddressableDescriptor) {
	// This function needs to be protected with a global lock, because it
	// locks multiple IDs at once, and there's no good way to make sure
	// the locking happens in a deterministic order.
	p.graph.imagesMutex.Lock()
	defer p.graph.imagesMutex.Unlock()

	idMap := make(map[string]struct{})
	for _, img := range imgs {
		idMap[img.id] = struct{}{}
		idMap[img.compatibilityID] = struct{}{}

		if p.graph.Exists(img.compatibilityID) {
			if _, err := p.graph.GenerateV1CompatibilityChain(img.compatibilityID); err != nil {
				logrus.Debugf("Migration v1Compatibility generation error: %v", err)
				return
			}
		}
	}
	for id := range idMap {
		p.graph.imageMutex.Lock(id)
		defer p.graph.imageMutex.Unlock(id)
	}

	// continueReuse controls whether the function will try to find
	// existing layers on disk under the old v1 IDs, to avoid repulling
	// them. The hashes are checked to ensure these layers are okay to
	// use. continueReuse starts out as true, but is set to false if
	// the code encounters something that doesn't match the expected hash.
	continueReuse := true

	for i := len(imgs) - 1; i >= 0; i-- {
		if p.graph.Exists(imgs[i].id) {
			// Found an image in the graph under the strongID. Validate the
			// image before using it.
			if err := p.validateImageInGraph(imgs[i].id, imgs, i); err != nil {
				continueReuse = false
				logrus.Debugf("not using existing strongID: %v", err)

				// The strong ID existed in the graph but didn't
				// validate successfully. Treat the graph like a
				// hash table with probing... compute SHA256(id)
				// until we find an ID that either doesn't already
				// exist in the graph, or has existing content
				// that validates successfully.
				for {
					if err := p.tryNextID(imgs, i, idMap); err != nil {
						logrus.Debug(err.Error())
					} else {
						break
					}
				}
			}
			continue
		}

		if continueReuse {
			compatibilityID := imgs[i].compatibilityID
			if err := p.validateImageInGraph(compatibilityID, imgs, i); err != nil {
				logrus.Debugf("stopping ID reuse: %v", err)
				continueReuse = false
			} else {
				// The compatibility ID exists in the graph and was
				// validated. Use it.
				imgs[i].id = compatibilityID
			}
		}
	}

	// fix up the parents of the images
	for i := 0; i < len(imgs); i++ {
		if i == len(imgs)-1 { // Base layer
			imgs[i].parent = ""
		} else {
			imgs[i].parent = imgs[i+1].id
		}
	}
}

// validateImageInGraph checks that an image in the graph has the expected
// strongID. id is the entry in the graph to check, imgs is the slice of
// images being processed (for access to the parent), and i is the index
// into this slice which the graph entry should be checked against.
func (p *v2Puller) validateImageInGraph(id string, imgs []contentAddressableDescriptor, i int) error {
	img, err := p.graph.Get(id)
	if err != nil {
		return fmt.Errorf("missing: %v", err)
	}
	layerID, err := p.graph.getLayerDigest(id)
	if err != nil {
		return fmt.Errorf("digest: %v", err)
	}
	var parentID digest.Digest
	if i != len(imgs)-1 {
		if img.Parent != imgs[i+1].id { // comparing that graph points to validated ID
			return fmt.Errorf("parent: %v %v", img.Parent, imgs[i+1].id)
		}
		parentID = imgs[i+1].strongID
	} else if img.Parent != "" {
		return fmt.Errorf("unexpected parent: %v", img.Parent)
	}

	v1Config, err := p.graph.getV1CompatibilityConfig(img.ID)
	if err != nil {
		return fmt.Errorf("v1Compatibility: %v %v", img.ID, err)
	}

	json, err := image.MakeImageConfig(v1Config, layerID, parentID)
	if err != nil {
		return fmt.Errorf("make config: %v", err)
	}

	if dgst, err := image.StrongID(json); err == nil && dgst == imgs[i].strongID {
		logrus.Debugf("Validated %v as %v", dgst, id)
	} else {
		return fmt.Errorf("digest mismatch: %v %v, error: %v", dgst, imgs[i].strongID, err)
	}

	// All clear
	return nil
}
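
// tryNextID advances imgs[i] to the next ID in the deterministic probe
// sequence by hashing the current ID. If that ID already exists in the graph,
// it must validate against the expected strongID; a non-nil error tells the
// caller to keep probing.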
func (p *v2Puller) tryNextID(imgs []contentAddressableDescriptor, i int, idMap map[string]struct{}) error {
	nextID, _ := digest.FromBytes([]byte(imgs[i].id))
	imgs[i].id = nextID.Hex()

	if _, exists := idMap[imgs[i].id]; !exists {
		p.graph.imageMutex.Lock(imgs[i].id)
		defer p.graph.imageMutex.Unlock(imgs[i].id)
	}

	if p.graph.Exists(imgs[i].id) {
		if err := p.validateImageInGraph(imgs[i].id, imgs, i); err != nil {
			return fmt.Errorf("not using existing strongID permutation %s: %v", imgs[i].id, err)
		}
	}
	return nil
}