1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/graph/push_v2.go
Tonis Tiigi 504e67b867 Calculate hash based image IDs on pull
Generate a hash chain involving the image configuration, layer digests,
and parent image hashes. Use the digests to compute IDs for each image
in a manifest, instead of using the remotely specified IDs.

To avoid breaking users' caches, check for images already in the graph
under old IDs, and avoid repulling an image if the version on disk under
the legacy ID ends up with the same digest that was computed from the
manifest for that image.

When a calculated ID already exists in the graph but can't be verified,
continue trying SHA256(digest) until a suitable ID is found.

"save" and "load" are not changed to use a similar scheme. "load" will
preserve the IDs present in the tar file.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
2015-10-12 10:51:28 -07:00

272 lines
7.2 KiB
Go

package graph
import (
"fmt"
"io"
"io/ioutil"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/docker/image"
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/registry"
"github.com/docker/docker/runconfig"
"github.com/docker/docker/utils"
"golang.org/x/net/context"
)
type v2Pusher struct {
*TagStore
endpoint registry.APIEndpoint
localRepo Repository
repoInfo *registry.RepositoryInfo
config *ImagePushConfig
sf *streamformatter.StreamFormatter
repo distribution.Repository
// layersPushed is the set of layers known to exist on the remote side.
// This avoids redundant queries when pushing multiple tags that
// involve the same layers.
layersPushed map[digest.Digest]bool
}
func (p *v2Pusher) Push() (fallback bool, err error) {
p.repo, err = NewV2Repository(p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull")
if err != nil {
logrus.Debugf("Error getting v2 registry: %v", err)
return true, err
}
return false, p.pushV2Repository(p.config.Tag)
}
func (p *v2Pusher) getImageTags(askedTag string) ([]string, error) {
logrus.Debugf("Checking %q against %#v", askedTag, p.localRepo)
if len(askedTag) > 0 {
if _, ok := p.localRepo[askedTag]; !ok || utils.DigestReference(askedTag) {
return nil, fmt.Errorf("Tag does not exist for %s", askedTag)
}
return []string{askedTag}, nil
}
var tags []string
for tag := range p.localRepo {
if !utils.DigestReference(tag) {
tags = append(tags, tag)
}
}
return tags, nil
}
func (p *v2Pusher) pushV2Repository(tag string) error {
localName := p.repoInfo.LocalName
if _, found := p.poolAdd("push", localName); found {
return fmt.Errorf("push or pull %s is already in progress", localName)
}
defer p.poolRemove("push", localName)
tags, err := p.getImageTags(tag)
if err != nil {
return fmt.Errorf("error getting tags for %s: %s", localName, err)
}
if len(tags) == 0 {
return fmt.Errorf("no tags to push for %s", localName)
}
for _, tag := range tags {
if err := p.pushV2Tag(tag); err != nil {
return err
}
}
return nil
}
func (p *v2Pusher) pushV2Tag(tag string) error {
logrus.Debugf("Pushing repository: %s:%s", p.repo.Name(), tag)
layerID, exists := p.localRepo[tag]
if !exists {
return fmt.Errorf("tag does not exist: %s", tag)
}
layersSeen := make(map[string]bool)
layer, err := p.graph.Get(layerID)
if err != nil {
return err
}
m := &manifest.Manifest{
Versioned: manifest.Versioned{
SchemaVersion: 1,
},
Name: p.repo.Name(),
Tag: tag,
Architecture: layer.Architecture,
FSLayers: []manifest.FSLayer{},
History: []manifest.History{},
}
var metadata runconfig.Config
if layer != nil && layer.Config != nil {
metadata = *layer.Config
}
out := p.config.OutStream
for ; layer != nil; layer, err = p.graph.GetParent(layer) {
if err != nil {
return err
}
// break early if layer has already been seen in this image,
// this prevents infinite loops on layers which loopback, this
// cannot be prevented since layer IDs are not merkle hashes
// TODO(dmcgowan): throw error if no valid use case is found
if layersSeen[layer.ID] {
break
}
logrus.Debugf("Pushing layer: %s", layer.ID)
if layer.Config != nil && metadata.Image != layer.ID {
if err := runconfig.Merge(&metadata, layer.Config); err != nil {
return err
}
}
var exists bool
dgst, err := p.graph.GetLayerDigest(layer.ID)
switch err {
case nil:
if p.layersPushed[dgst] {
exists = true
// break out of switch, it is already known that
// the push is not needed and therefore doing a
// stat is unnecessary
break
}
_, err := p.repo.Blobs(context.Background()).Stat(context.Background(), dgst)
switch err {
case nil:
exists = true
out.Write(p.sf.FormatProgress(stringid.TruncateID(layer.ID), "Image already exists", nil))
case distribution.ErrBlobUnknown:
// nop
default:
out.Write(p.sf.FormatProgress(stringid.TruncateID(layer.ID), "Image push failed", nil))
return err
}
case ErrDigestNotSet:
// nop
case digest.ErrDigestInvalidFormat, digest.ErrDigestUnsupported:
return fmt.Errorf("error getting image checksum: %v", err)
}
// if digest was empty or not saved, or if blob does not exist on the remote repository,
// then fetch it.
if !exists {
if pushDigest, err := p.pushV2Image(p.repo.Blobs(context.Background()), layer); err != nil {
return err
} else if pushDigest != dgst {
// Cache new checksum
if err := p.graph.SetLayerDigest(layer.ID, pushDigest); err != nil {
return err
}
dgst = pushDigest
}
}
// read v1Compatibility config, generate new if needed
jsonData, err := p.graph.GenerateV1CompatibilityChain(layer.ID)
if err != nil {
return err
}
m.FSLayers = append(m.FSLayers, manifest.FSLayer{BlobSum: dgst})
m.History = append(m.History, manifest.History{V1Compatibility: string(jsonData)})
layersSeen[layer.ID] = true
p.layersPushed[dgst] = true
}
logrus.Infof("Signed manifest for %s:%s using daemon's key: %s", p.repo.Name(), tag, p.trustKey.KeyID())
signed, err := manifest.Sign(m, p.trustKey)
if err != nil {
return err
}
manifestDigest, manifestSize, err := digestFromManifest(signed, p.repo.Name())
if err != nil {
return err
}
if manifestDigest != "" {
out.Write(p.sf.FormatStatus("", "%s: digest: %s size: %d", tag, manifestDigest, manifestSize))
}
manSvc, err := p.repo.Manifests(context.Background())
if err != nil {
return err
}
return manSvc.Put(signed)
}
func (p *v2Pusher) pushV2Image(bs distribution.BlobService, img *image.Image) (digest.Digest, error) {
out := p.config.OutStream
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Preparing", nil))
image, err := p.graph.Get(img.ID)
if err != nil {
return "", err
}
arch, err := p.graph.TarLayer(image)
if err != nil {
return "", err
}
defer arch.Close()
// Send the layer
layerUpload, err := bs.Create(context.Background())
if err != nil {
return "", err
}
defer layerUpload.Close()
digester := digest.Canonical.New()
tee := io.TeeReader(arch, digester.Hash())
reader := progressreader.New(progressreader.Config{
In: ioutil.NopCloser(tee), // we'll take care of close here.
Out: out,
Formatter: p.sf,
// TODO(stevvooe): This may cause a size reporting error. Try to get
// this from tar-split or elsewhere. The main issue here is that we
// don't want to buffer to disk *just* to calculate the size.
Size: img.Size,
NewLines: false,
ID: stringid.TruncateID(img.ID),
Action: "Pushing",
})
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pushing", nil))
nn, err := io.Copy(layerUpload, reader)
if err != nil {
return "", err
}
dgst := digester.Digest()
if _, err := layerUpload.Commit(context.Background(), distribution.Descriptor{Digest: dgst}); err != nil {
return "", err
}
logrus.Debugf("uploaded layer %s (%s), %d bytes", img.ID, dgst, nn)
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pushed", nil))
return dgst, nil
}