package graph

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"runtime"

	"github.com/Sirupsen/logrus"
	"github.com/docker/distribution"
	"github.com/docker/distribution/digest"
	"github.com/docker/distribution/manifest/schema1"
	"github.com/docker/docker/image"
	"github.com/docker/docker/pkg/broadcaster"
	"github.com/docker/docker/pkg/progressreader"
	"github.com/docker/docker/pkg/streamformatter"
	"github.com/docker/docker/pkg/stringid"
	"github.com/docker/docker/registry"
	"github.com/docker/docker/utils"
	"golang.org/x/net/context"
)

type v2Puller struct {
	*TagStore
	endpoint  registry.APIEndpoint
	config    *ImagePullConfig
	sf        *streamformatter.StreamFormatter
	repoInfo  *registry.RepositoryInfo
	repo      distribution.Repository
	sessionID string
}

func (p *v2Puller) Pull(tag string) (fallback bool, err error) {
	// TODO(tiborvass): was ReceiveTimeout
	p.repo, err = newV2Repository(p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull")
	if err != nil {
		logrus.Warnf("Error getting v2 registry: %v", err)
		return true, err
	}

	p.sessionID = stringid.GenerateRandomID()

	if err := p.pullV2Repository(tag); err != nil {
		if registry.ContinueOnError(err) {
			logrus.Debugf("Error trying v2 registry: %v", err)
			return true, err
		}
		return false, err
	}
	return false, nil
}

func (p *v2Puller) pullV2Repository(tag string) (err error) {
	var tags []string
	taggedName := p.repoInfo.LocalName
	if len(tag) > 0 {
		tags = []string{tag}
		taggedName = utils.ImageReference(p.repoInfo.LocalName, tag)
	} else {
		var err error

		manSvc, err := p.repo.Manifests(context.Background())
		if err != nil {
			return err
		}

		tags, err = manSvc.Tags()
		if err != nil {
			return err
		}
	}

	poolKey := "v2:" + taggedName
	broadcaster, found := p.poolAdd("pull", poolKey)
	broadcaster.Add(p.config.OutStream)
	if found {
		// Another pull of the same repository is already taking place; just wait for it to finish
		return broadcaster.Wait()
	}

	// This must use a closure so it captures the value of err when the
	// function returns, not when the 'defer' is evaluated.
	defer func() {
		p.poolRemoveWithError("pull", poolKey, err)
	}()

	var layersDownloaded bool
	for _, tag := range tags {
		// pulledNew is true if either new layers were downloaded OR if existing images were newly tagged
		// TODO(tiborvass): should we change the name of `layersDownload`? What about message in WriteStatus?
		pulledNew, err := p.pullV2Tag(broadcaster, tag, taggedName)
		if err != nil {
			return err
		}
		layersDownloaded = layersDownloaded || pulledNew
	}

	writeStatus(taggedName, broadcaster, p.sf, layersDownloaded)

	return nil
}

// downloadInfo is used to pass information from download to extractor
type downloadInfo struct {
	img         contentAddressableDescriptor
	imgIndex    int
	tmpFile     *os.File
	digest      digest.Digest
	layer       distribution.ReadSeekCloser
	size        int64
	err         chan error
	poolKey     string
	broadcaster *broadcaster.Buffered
}
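// A downloadInfo is created by pullV2Tag for each layer that is not already
// present in the graph. download (below) fills it in from a separate
// goroutine, streaming the blob into tmpFile and signalling completion on the
// buffered err channel, while pullV2Tag consumes the results in the same
// order the downloads were queued (base layer first).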
// contentAddressableDescriptor is used to pass image data from a manifest to the
// graph.
type contentAddressableDescriptor struct {
	id              string
	parent          string
	strongID        digest.Digest
	compatibilityID string
	config          []byte
	v1Compatibility []byte
}

func newContentAddressableImage(v1Compatibility []byte, blobSum digest.Digest, parent digest.Digest) (contentAddressableDescriptor, error) {
	img := contentAddressableDescriptor{
		v1Compatibility: v1Compatibility,
	}

	var err error
	img.config, err = image.MakeImageConfig(v1Compatibility, blobSum, parent)
	if err != nil {
		return img, err
	}
	img.strongID, err = image.StrongID(img.config)
	if err != nil {
		return img, err
	}

	unmarshalledConfig, err := image.NewImgJSON(v1Compatibility)
	if err != nil {
		return img, err
	}

	img.compatibilityID = unmarshalledConfig.ID
	img.id = img.strongID.Hex()

	return img, nil
}

// ID returns the actual ID to be used for the downloaded image. This may be
// a computed ID.
func (img contentAddressableDescriptor) ID() string {
	return img.id
}

// Parent returns the parent ID to be used for the image. This may be a
// computed ID.
func (img contentAddressableDescriptor) Parent() string {
	return img.parent
}

// MarshalConfig renders the image structure into JSON.
func (img contentAddressableDescriptor) MarshalConfig() ([]byte, error) {
	return img.config, nil
}

type errVerification struct{}

func (errVerification) Error() string { return "verification failed" }

func (p *v2Puller) download(di *downloadInfo) {
	logrus.Debugf("pulling blob %q to %s", di.digest, di.img.id)

	blobs := p.repo.Blobs(context.Background())

	desc, err := blobs.Stat(context.Background(), di.digest)
	if err != nil {
		logrus.Debugf("Error statting layer: %v", err)
		di.err <- err
		return
	}
	di.size = desc.Size

	layerDownload, err := blobs.Open(context.Background(), di.digest)
	if err != nil {
		logrus.Debugf("Error fetching layer: %v", err)
		di.err <- err
		return
	}
	defer layerDownload.Close()

	verifier, err := digest.NewDigestVerifier(di.digest)
	if err != nil {
		di.err <- err
		return
	}

	reader := progressreader.New(progressreader.Config{
		In:        ioutil.NopCloser(io.TeeReader(layerDownload, verifier)),
		Out:       di.broadcaster,
		Formatter: p.sf,
		Size:      di.size,
		NewLines:  false,
		ID:        stringid.TruncateID(di.img.id),
		Action:    "Downloading",
	})
	// Surface copy errors (for example a short write to the temp file)
	// instead of proceeding to verification with a truncated download.
	if _, err := io.Copy(di.tmpFile, reader); err != nil {
		logrus.Debugf("Error copying layer to tempfile: %v", err)
		di.err <- err
		return
	}

	di.broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.id), "Verifying Checksum", nil))

	if !verifier.Verified() {
		err = fmt.Errorf("filesystem layer verification failed for digest %s", di.digest)
		logrus.Error(err)
		di.err <- err
		return
	}

	di.broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.id), "Download complete", nil))

	logrus.Debugf("Downloaded %s to tempfile %s", di.img.id, di.tmpFile.Name())
	di.layer = layerDownload

	di.err <- nil
}
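// pullV2Tag pulls a single tag: it fetches and verifies the signed manifest,
// downloads any layers missing from the graph in parallel, registers them,
// and finally records the tag (or digest) in the tag store. It returns true
// if new layers were downloaded or if the tag did not previously exist
// locally.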
func (p *v2Puller) pullV2Tag(out io.Writer, tag, taggedName string) (tagUpdated bool, err error) {
	logrus.Debugf("Pulling tag from V2 registry: %q", tag)

	manSvc, err := p.repo.Manifests(context.Background())
	if err != nil {
		return false, err
	}

	unverifiedManifest, err := manSvc.GetByTag(tag)
	if err != nil {
		return false, err
	}
	if unverifiedManifest == nil {
		return false, fmt.Errorf("image manifest does not exist for tag %q", tag)
	}
	var verifiedManifest *schema1.Manifest
	verifiedManifest, err = verifyManifest(unverifiedManifest, tag)
	if err != nil {
		return false, err
	}

	// remove duplicate layers and check parent chain validity
	err = fixManifestLayers(verifiedManifest)
	if err != nil {
		return false, err
	}

	imgs, err := p.getImageInfos(verifiedManifest)
	if err != nil {
		return false, err
	}

	out.Write(p.sf.FormatStatus(tag, "Pulling from %s", p.repo.Name()))

	var downloads []*downloadInfo

	var layerIDs []string
	defer func() {
		p.graph.Release(p.sessionID, layerIDs...)

		for _, d := range downloads {
			p.poolRemoveWithError("pull", d.poolKey, err)
			if d.tmpFile != nil {
				d.tmpFile.Close()
				if err := os.RemoveAll(d.tmpFile.Name()); err != nil {
					logrus.Errorf("Failed to remove temp file: %s", d.tmpFile.Name())
				}
			}
		}
	}()

	for i := len(verifiedManifest.FSLayers) - 1; i >= 0; i-- {
		img := imgs[i]

		p.graph.Retain(p.sessionID, img.id)
		layerIDs = append(layerIDs, img.id)

		p.graph.imageMutex.Lock(img.id)

		// Check if exists
		if p.graph.Exists(img.id) {
			if err := p.validateImageInGraph(img.id, imgs, i); err != nil {
				p.graph.imageMutex.Unlock(img.id)
				return false, fmt.Errorf("image validation failed: %v", err)
			}
			logrus.Debugf("Image already exists: %s", img.id)
			p.graph.imageMutex.Unlock(img.id)
			continue
		}
		p.graph.imageMutex.Unlock(img.id)

		out.Write(p.sf.FormatProgress(stringid.TruncateID(img.id), "Pulling fs layer", nil))

		d := &downloadInfo{
			img:      img,
			imgIndex: i,
			poolKey:  "v2layer:" + img.id,
			digest:   verifiedManifest.FSLayers[i].BlobSum,
			// TODO: this channel buffer appears to have worked around a hang
			// seen with go1.5; it may point to a deeper problem where the
			// error is never read from the channel in the loop below.
			err: make(chan error, 1),
		}

		tmpFile, err := ioutil.TempFile("", "GetImageBlob")
		if err != nil {
			return false, err
		}
		d.tmpFile = tmpFile

		downloads = append(downloads, d)

		broadcaster, found := p.poolAdd("pull", d.poolKey)
		broadcaster.Add(out)
		d.broadcaster = broadcaster
		if found {
			d.err <- nil
		} else {
			go p.download(d)
		}
	}

	for _, d := range downloads {
		if err := <-d.err; err != nil {
			return false, err
		}

		if d.layer == nil {
			// Wait for a different pull to download and extract
			// this layer.
			err = d.broadcaster.Wait()
			if err != nil {
				return false, err
			}
			continue
		}

		d.tmpFile.Seek(0, 0)
		err := func() error {
			reader := progressreader.New(progressreader.Config{
				In:        d.tmpFile,
				Out:       d.broadcaster,
				Formatter: p.sf,
				Size:      d.size,
				NewLines:  false,
				ID:        stringid.TruncateID(d.img.id),
				Action:    "Extracting",
			})

			p.graph.imagesMutex.Lock()
			defer p.graph.imagesMutex.Unlock()

			p.graph.imageMutex.Lock(d.img.id)
			defer p.graph.imageMutex.Unlock(d.img.id)

			// Must recheck the data on disk if any exists.
			// This protects against races where something
			// else is written to the graph under this ID
			// after attemptIDReuse.
			if p.graph.Exists(d.img.id) {
				if err := p.validateImageInGraph(d.img.id, imgs, d.imgIndex); err != nil {
					return fmt.Errorf("image validation failed: %v", err)
				}
			}

			if err := p.graph.register(d.img, reader); err != nil {
				return err
			}

			if err := p.graph.setLayerDigest(d.img.id, d.digest); err != nil {
				return err
			}

			if err := p.graph.setV1CompatibilityConfig(d.img.id, d.img.v1Compatibility); err != nil {
				return err
			}

			return nil
		}()
		if err != nil {
			return false, err
		}

		d.broadcaster.Write(p.sf.FormatProgress(stringid.TruncateID(d.img.id), "Pull complete", nil))
		d.broadcaster.Close()

		tagUpdated = true
	}

	manifestDigest, _, err := digestFromManifest(unverifiedManifest, p.repoInfo.LocalName)
	if err != nil {
		return false, err
	}

	// Check for new tag if no layers downloaded
	if !tagUpdated {
		repo, err := p.get(p.repoInfo.LocalName)
		if err != nil {
			return false, err
		}
		if repo != nil {
			if _, exists := repo[tag]; !exists {
				tagUpdated = true
			}
		} else {
			tagUpdated = true
		}
	}

	firstID := layerIDs[len(layerIDs)-1]
	if utils.DigestReference(tag) {
		// TODO(stevvooe): Ideally, we should always set the digest so we can
		// use the digest whether we pull by it or not. Unfortunately, the tag
		// store treats the digest as a separate tag, meaning there may be an
		// untagged digest image that would seem to be dangling to a user.
		if err = p.setDigest(p.repoInfo.LocalName, tag, firstID); err != nil {
			return false, err
		}
	} else {
		// only set the repository/tag -> image ID mapping when pulling by tag (i.e. not by digest)
		if err = p.Tag(p.repoInfo.LocalName, tag, firstID, true); err != nil {
			return false, err
		}
	}

	if manifestDigest != "" {
		out.Write(p.sf.FormatStatus("", "Digest: %s", manifestDigest))
	}

	return tagUpdated, nil
}
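// verifyManifest verifies the signed manifest received for the given tag. If
// the tag is itself a digest reference, the manifest payload is checked
// against that digest before anything else; the manifest is then required to
// be schema version 1, to have matching FSLayers and History lengths, and to
// contain at least one layer.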
func verifyManifest(signedManifest *schema1.SignedManifest, tag string) (m *schema1.Manifest, err error) {
	// If pulling by digest, verify the manifest digest. NOTE: It is
	// important to do this first, before any other content validation. If the
	// digest cannot be verified, don't even bother with those other things.
	if manifestDigest, err := digest.ParseDigest(tag); err == nil {
		verifier, err := digest.NewDigestVerifier(manifestDigest)
		if err != nil {
			return nil, err
		}
		payload, err := signedManifest.Payload()
		if err != nil {
			// If this failed, the signatures section was corrupted
			// or missing. Treat the entire manifest as the payload.
			payload = signedManifest.Raw
		}
		if _, err := verifier.Write(payload); err != nil {
			return nil, err
		}
		if !verifier.Verified() {
			err := fmt.Errorf("image verification failed for digest %s", manifestDigest)
			logrus.Error(err)
			return nil, err
		}

		var verifiedManifest schema1.Manifest
		if err = json.Unmarshal(payload, &verifiedManifest); err != nil {
			return nil, err
		}
		m = &verifiedManifest
	} else {
		m = &signedManifest.Manifest
	}

	if m.SchemaVersion != 1 {
		return nil, fmt.Errorf("unsupported schema version %d for tag %q", m.SchemaVersion, tag)
	}
	if len(m.FSLayers) != len(m.History) {
		return nil, fmt.Errorf("length of history not equal to number of layers for tag %q", tag)
	}
	if len(m.FSLayers) == 0 {
		return nil, fmt.Errorf("no FSLayers in manifest for tag %q", tag)
	}
	return m, nil
}

// fixManifestLayers removes repeated layers from the manifest and checks the
// correctness of the parent chain.
func fixManifestLayers(m *schema1.Manifest) error {
	images := make([]*image.Image, len(m.FSLayers))
	for i := range m.FSLayers {
		img, err := image.NewImgJSON([]byte(m.History[i].V1Compatibility))
		if err != nil {
			return err
		}
		images[i] = img
		if err := image.ValidateID(img.ID); err != nil {
			return err
		}
	}

	if images[len(images)-1].Parent != "" && !allowBaseParentImage {
		// Windows base layer can point to a base layer parent that is not in manifest.
		return errors.New("Invalid parent ID in the base layer of the image.")
	}

	// check for non-consecutive duplicate IDs up front and return an error
	// rather than deadlock later in the pull
	idmap := make(map[string]struct{})

	var lastID string
	for _, img := range images {
		// skip IDs that repeat consecutively; those are removed below
		if _, exists := idmap[img.ID]; img.ID != lastID && exists {
			return fmt.Errorf("ID %+v appears multiple times in manifest", img.ID)
		}
		lastID = img.ID
		idmap[lastID] = struct{}{}
	}

	// backwards loop so that we keep the remaining indexes after removing items
	for i := len(images) - 2; i >= 0; i-- {
		if images[i].ID == images[i+1].ID {
			// repeated ID. remove and continue
			m.FSLayers = append(m.FSLayers[:i], m.FSLayers[i+1:]...)
			m.History = append(m.History[:i], m.History[i+1:]...)
		} else if images[i].Parent != images[i+1].ID {
			return fmt.Errorf("Invalid parent ID. Expected %v, got %v.", images[i+1].ID, images[i].Parent)
		}
	}

	return nil
}
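// As an illustration of fixManifestLayers above (IDs are hypothetical and
// abbreviated, listed child-first as they appear in the manifest): the chain
//
//	c (parent b), b (parent a), b (parent a), a
//
// has the repeated entry for b removed, leaving c, b, a; whereas
//
//	c (parent a), b (parent a), a
//
// fails with an "Invalid parent ID" error because c's parent does not match
// the next entry in the manifest.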
// getImageInfos returns a contentAddressableDescriptor for every image in the
// manifest. These objects contain both calculated strongIDs and
// compatibilityIDs found in the v1Compatibility object.
func (p *v2Puller) getImageInfos(m *schema1.Manifest) ([]contentAddressableDescriptor, error) {
	imgs := make([]contentAddressableDescriptor, len(m.FSLayers))

	var parent digest.Digest
	for i := len(imgs) - 1; i >= 0; i-- {
		var err error
		imgs[i], err = newContentAddressableImage([]byte(m.History[i].V1Compatibility), m.FSLayers[i].BlobSum, parent)
		if err != nil {
			return nil, err
		}
		parent = imgs[i].strongID
	}

	p.attemptIDReuse(imgs)

	// reset the base layer parent for windows
	if allowBaseParentImage {
		var base struct{ Parent string }
		if err := json.Unmarshal(imgs[len(imgs)-1].v1Compatibility, &base); err != nil {
			return nil, err
		}
		if base.Parent != "" {
			imgs[len(imgs)-1].parent = base.Parent
		}
	}

	return imgs, nil
}

// attemptIDReuse does a best attempt to match verified compatibilityIDs
// already in the graph with the computed strongIDs so we can keep using them.
// This process will never fail but may just return the strongIDs if none of
// the compatibilityIDs exist or can be verified. If the strongIDs themselves
// fail verification, we deterministically generate alternate IDs to use until
// we find one that's available or already exists with the correct data.
func (p *v2Puller) attemptIDReuse(imgs []contentAddressableDescriptor) {
	// This function needs to be protected with a global lock, because it
	// locks multiple IDs at once, and there's no good way to make sure
	// the locking happens in a deterministic order.
	p.graph.imagesMutex.Lock()
	defer p.graph.imagesMutex.Unlock()

	idMap := make(map[string]struct{})
	for _, img := range imgs {
		idMap[img.id] = struct{}{}
		idMap[img.compatibilityID] = struct{}{}

		if p.graph.Exists(img.compatibilityID) {
			if _, err := p.graph.generateV1CompatibilityChain(img.compatibilityID); err != nil {
				logrus.Debugf("Migration v1Compatibility generation error: %v", err)
				return
			}
		}
	}
	for id := range idMap {
		p.graph.imageMutex.Lock(id)
		defer p.graph.imageMutex.Unlock(id)
	}

	// continueReuse controls whether the function will try to find
	// existing layers on disk under the old v1 IDs, to avoid repulling
	// them. The hashes are checked to ensure these layers are okay to
	// use. continueReuse starts out as true, but is set to false if
	// the code encounters something that doesn't match the expected hash.
	continueReuse := true

	for i := len(imgs) - 1; i >= 0; i-- {
		// TODO - (swernli:11-16-2015) Skipping content addressable IDs on
		// Windows as a hack for TP4 compat. The correct fix is to ensure that
		// Windows layers do not have anything in them that takes a dependency
		// on the ID of the layer in the management client. This will be fixed
		// in Windows post-TP4.
		if runtime.GOOS == "windows" {
			imgs[i].id = imgs[i].compatibilityID
		}

		if p.graph.Exists(imgs[i].id) {
			// Found an image in the graph under the strongID. Validate the
			// image before using it.
			if err := p.validateImageInGraph(imgs[i].id, imgs, i); err != nil {
				continueReuse = false
				logrus.Debugf("not using existing strongID: %v", err)

				// The strong ID existed in the graph but its contents
				// did not validate, so we can't use it. Treat the
				// graph like a hash table with probing: compute
				// SHA256(id) until we find an ID that either doesn't
				// already exist in the graph, or has existing content
				// that validates successfully.
				for {
					if err := p.tryNextID(imgs, i, idMap); err != nil {
						logrus.Debug(err.Error())
					} else {
						break
					}
				}
			}
			continue
		}

		if continueReuse {
			compatibilityID := imgs[i].compatibilityID
			if err := p.validateImageInGraph(compatibilityID, imgs, i); err != nil {
				logrus.Debugf("stopping ID reuse: %v", err)
				continueReuse = false
			} else {
				// The compatibility ID exists in the graph and was
				// validated. Use it.
				imgs[i].id = compatibilityID
			}
		}
	}

	// fix up the parents of the images
	for i := 0; i < len(imgs); i++ {
		if i == len(imgs)-1 {
			// Base layer
			imgs[i].parent = ""
		} else {
			imgs[i].parent = imgs[i+1].id
		}
	}
}
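// The probe sequence used above when a strongID is present but fails
// validation is deterministic: each candidate ID is the hex form of the
// SHA-256 of the previous candidate's ID string (see tryNextID below), and
// probing stops at the first candidate that is either absent from the graph
// or present with content that validates.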
// validateImageInGraph checks that an image in the graph has the expected
// strongID. id is the entry in the graph to check, imgs is the slice of
// images being processed (for access to the parent), and i is the index
// into this slice against which the graph entry should be checked.
func (p *v2Puller) validateImageInGraph(id string, imgs []contentAddressableDescriptor, i int) error {
	img, err := p.graph.Get(id)
	if err != nil {
		return fmt.Errorf("missing: %v", err)
	}
	if runtime.GOOS == "windows" {
		// TODO - (swernli:11-16-2015) Skipping content addressable IDs on
		// Windows as a hack for TP4 compat. The correct fix is to ensure that
		// Windows layers do not have anything in them that takes a dependency
		// on the ID of the layer in the management client. This will be fixed
		// in Windows post-TP4.
		return nil
	}
	layerID, err := p.graph.getLayerDigest(id)
	if err != nil {
		return fmt.Errorf("digest: %v", err)
	}
	var parentID digest.Digest
	if i != len(imgs)-1 {
		if img.Parent != imgs[i+1].id {
			// check that the graph entry points to the validated parent ID
			return fmt.Errorf("parent: %v %v", img.Parent, imgs[i+1].id)
		}
		parentID = imgs[i+1].strongID
	} else if img.Parent != "" {
		return fmt.Errorf("unexpected parent: %v", img.Parent)
	}

	v1Config, err := p.graph.getV1CompatibilityConfig(img.ID)
	if err != nil {
		return fmt.Errorf("v1Compatibility: %v %v", img.ID, err)
	}

	json, err := image.MakeImageConfig(v1Config, layerID, parentID)
	if err != nil {
		return fmt.Errorf("make config: %v", err)
	}

	if dgst, err := image.StrongID(json); err == nil && dgst == imgs[i].strongID {
		logrus.Debugf("Validated %v as %v", dgst, id)
	} else {
		return fmt.Errorf("digest mismatch: %v %v, error: %v", dgst, imgs[i].strongID, err)
	}

	// All clear
	return nil
}

// tryNextID advances imgs[i].id to the next candidate in the probe sequence
// and returns nil if that candidate is usable (absent from the graph, or
// present with content that validates), or an error otherwise.
func (p *v2Puller) tryNextID(imgs []contentAddressableDescriptor, i int, idMap map[string]struct{}) error {
	nextID, _ := digest.FromBytes([]byte(imgs[i].id))
	imgs[i].id = nextID.Hex()

	if _, exists := idMap[imgs[i].id]; !exists {
		p.graph.imageMutex.Lock(imgs[i].id)
		defer p.graph.imageMutex.Unlock(imgs[i].id)
	}

	if p.graph.Exists(imgs[i].id) {
		if err := p.validateImageInGraph(imgs[i].id, imgs, i); err != nil {
			return fmt.Errorf("not using existing strongID permutation %s: %v", imgs[i].id, err)
		}
	}
	return nil
}