1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/distribution/push_v2.go
Aaron Lehmann 21278efaee Make TarStream return an io.ReadCloser
Currently, the resources associated with the io.Reader returned by
TarStream are only freed when it is read until EOF. This means that
partial uploads or exports (for example, in the case of a full disk or
severed connection) can leak a goroutine and open file. This commit
changes TarStream to return an io.ReadCloser. Resources are freed when
Close is called.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
2015-11-25 16:39:54 -08:00

411 lines
11 KiB
Go

package distribution
import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/reference"
"github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/image"
"github.com/docker/docker/image/v1"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/registry"
"github.com/docker/docker/tag"
"golang.org/x/net/context"
)
// v2Pusher holds the state needed to push a reference to a v2 registry
// endpoint.
type v2Pusher struct {
// blobSumService is queried for blobsums previously associated with a
// layer's DiffID and is updated after a layer has been uploaded.
blobSumService *metadata.BlobSumService
// ref is the reference being pushed; when it carries a tag only that tag
// is pushed, otherwise every reference known under the name is pushed.
ref reference.Named
// endpoint and repoInfo are handed to NewV2Repository to build repo.
endpoint registry.APIEndpoint
repoInfo *registry.RepositoryInfo
// config supplies the stores, output stream, trust key and auth settings
// used while pushing.
config *ImagePushConfig
// sf formats the status and progress messages written to the output stream.
sf *streamformatter.StreamFormatter
// repo is the remote repository client; it is assigned by Push.
repo distribution.Repository
// layersPushed is the set of layers known to exist on the remote side.
// This avoids redundant queries when pushing multiple tags that
// involve the same layers.
layersPushed map[digest.Digest]bool
}
// Push pushes the pusher's reference to the v2 registry endpoint.
//
// When p.ref carries a tag, only that tag is pushed; otherwise every
// reference the tag store knows under that name is pushed, one after the
// other, stopping at the first failure.
//
// The fallback result is true only when the v2 repository client could not
// be created; every later failure is returned with fallback set to false.
func (p *v2Pusher) Push() (fallback bool, err error) {
	p.repo, err = NewV2Repository(p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull")
	if err != nil {
		logrus.Debugf("Error getting v2 registry: %v", err)
		return true, err
	}

	localName := p.repoInfo.LocalName.Name()

	var associations []tag.Association
	if _, isTagged := p.ref.(reference.Tagged); isTagged {
		// A distinct name is used for the lookup error so that it does not
		// shadow the named result.
		imageID, getErr := p.config.TagStore.Get(p.ref)
		if getErr != nil {
			return false, fmt.Errorf("tag does not exist: %s", p.ref.String())
		}
		associations = []tag.Association{
			{
				Ref:     p.ref,
				ImageID: imageID,
			},
		}
	} else {
		// Push all tags
		associations = p.config.TagStore.ReferencesByName(p.ref)
	}

	// Neither branch above can leave an error behind, so the only remaining
	// failure at this point is having nothing to push.
	if len(associations) == 0 {
		return false, fmt.Errorf("no tags to push for %s", localName)
	}

	for _, association := range associations {
		if err := p.pushV2Tag(association); err != nil {
			return false, err
		}
	}

	return false, nil
}
// pushV2Tag pushes the image behind a single tag association: it makes sure
// every layer (including the empty layer, when the history references one)
// is present on the remote side, then builds, signs and uploads the schema1
// manifest for the image.
func (p *v2Pusher) pushV2Tag(association tag.Association) error {
	ref := association.Ref
	logrus.Debugf("Pushing repository: %s", ref.String())

	img, err := p.config.ImageStore.Get(association.ImageID)
	if err != nil {
		return fmt.Errorf("could not find image from tag %s: %v", ref.String(), err)
	}

	out := p.config.OutStream

	// Start from the empty layer and replace it with the real top layer when
	// the image has a non-empty chain ID.
	var top layer.Layer = layer.EmptyLayer
	if chainID := img.RootFS.ChainID(); chainID != "" {
		top, err = p.config.LayerStore.Get(chainID)
		if err != nil {
			return fmt.Errorf("failed to get top layer from image: %v", err)
		}
		defer layer.ReleaseAndLog(p.config.LayerStore, top)
	}

	fsLayers := make(map[layer.DiffID]schema1.FSLayer)

	// The empty layer only has to be pushed once, so stop at the first
	// history entry that needs it.
	for _, h := range img.History {
		if !h.EmptyLayer {
			continue
		}
		emptyDigest, err := p.pushLayerIfNecessary(out, layer.EmptyLayer)
		if err != nil {
			return err
		}
		p.layersPushed[emptyDigest] = true
		fsLayers[layer.EmptyLayer.DiffID()] = schema1.FSLayer{BlobSum: emptyDigest}
		break
	}

	// Walk from the top layer down through its parents, one step per DiffID
	// recorded in the image's RootFS.
	current := top
	for range img.RootFS.DiffIDs {
		layerDigest, err := p.pushLayerIfNecessary(out, current)
		if err != nil {
			return err
		}
		p.layersPushed[layerDigest] = true
		fsLayers[current.DiffID()] = schema1.FSLayer{BlobSum: layerDigest}
		current = current.Parent()
	}

	tagged, isTagged := ref.(reference.Tagged)
	var tagName string
	if isTagged {
		tagName = tagged.Tag()
	}

	m, err := CreateV2Manifest(p.repo.Name(), tagName, img, fsLayers)
	if err != nil {
		return err
	}

	logrus.Infof("Signed manifest for %s using daemon's key: %s", ref.String(), p.config.TrustKey.KeyID())
	signed, err := schema1.Sign(m, p.config.TrustKey)
	if err != nil {
		return err
	}

	manifestDigest, manifestSize, err := digestFromManifest(signed, p.repo.Name())
	if err != nil {
		return err
	}
	if manifestDigest != "" && isTagged {
		// NOTE: do not change this format without first changing the trust client
		// code. This information is used to determine what was pushed and should be signed.
		out.Write(p.sf.FormatStatus("", "%s: digest: %s size: %d", tagged.Tag(), manifestDigest, manifestSize))
	}

	manSvc, err := p.repo.Manifests(context.Background())
	if err != nil {
		return err
	}
	return manSvc.Put(signed)
}
// pushLayerIfNecessary returns the digest under which layer l is available
// in the remote repository, uploading the layer first when none of the
// blobsums already recorded for its DiffID is found there.
//
// Progress and failure messages are written to out. After an upload the
// DiffID-to-blobsum mapping is recorded in the blobsum service.
func (p *v2Pusher) pushLayerIfNecessary(out io.Writer, l layer.Layer) (digest.Digest, error) {
	diffID := l.DiffID()
	logrus.Debugf("Pushing layer: %s", diffID)

	// Only consult the registry when blobsums are recorded for this DiffID;
	// a failed lookup simply falls through to the upload below.
	if candidates, lookupErr := p.blobSumService.GetBlobSums(diffID); lookupErr == nil {
		known, found, err := p.blobSumAlreadyExists(candidates)
		switch {
		case err != nil:
			out.Write(p.sf.FormatProgress(stringid.TruncateID(string(diffID)), "Image push failed", nil))
			return "", err
		case found:
			out.Write(p.sf.FormatProgress(stringid.TruncateID(string(diffID)), "Layer already exists", nil))
			return known, nil
		}
	}

	// No usable blobsum was found on the remote side, so upload the blob.
	uploaded, err := p.pushV2Layer(p.repo.Blobs(context.Background()), l)
	if err != nil {
		return "", err
	}

	// Remember which blobsum this DiffID was pushed as.
	if err := p.blobSumService.Add(diffID, uploaded); err != nil {
		return "", err
	}

	return uploaded, nil
}
// blobSumAlreadyExists reports whether the registry already knows about any
// of the digests in blobsums. The first digest found is returned together
// with true. A digest the registry reports as unknown is skipped; any other
// Stat error aborts the search and is returned.
func (p *v2Pusher) blobSumAlreadyExists(blobsums []digest.Digest) (digest.Digest, bool, error) {
	for i := range blobsums {
		candidate := blobsums[i]

		// Already recorded as present on the remote side: no stat needed.
		if p.layersPushed[candidate] {
			return candidate, true, nil
		}

		_, statErr := p.repo.Blobs(context.Background()).Stat(context.Background(), candidate)
		if statErr == nil {
			return candidate, true, nil
		}
		if statErr != distribution.ErrBlobUnknown {
			return "", false, statErr
		}
		// The registry does not have this blob; try the next candidate.
	}
	return "", false, nil
}
// CreateV2Manifest builds a schema1 manifest for img from its history and
// the FSLayer digests in fsLayers, which is keyed by DiffID.
//
// History entries are emitted in reverse order, so index 0 of the resulting
// FSLayers and History slices describes the newest entry. It returns an
// error when the history is empty, when the history references more
// non-empty layers than RootFS.DiffIDs contains, or when fsLayers lacks an
// entry for a required DiffID.
//
// FIXME: This should be moved to the distribution repo, since it will also
// be useful for converting new manifests to the old format.
func CreateV2Manifest(name, tag string, img *image.Image, fsLayers map[layer.DiffID]schema1.FSLayer) (*schema1.Manifest, error) {
	historyLen := len(img.History)
	if historyLen == 0 {
		return nil, errors.New("empty history when trying to create V2 manifest")
	}

	// Generate IDs for each layer
	// For non-top-level layers, create fake V1Compatibility strings that
	// fit the format and don't collide with anything else, but don't
	// result in runnable images on their own.
	type v1Compatibility struct {
		ID              string    `json:"id"`
		Parent          string    `json:"parent,omitempty"`
		Comment         string    `json:"comment,omitempty"`
		Created         time.Time `json:"created"`
		ContainerConfig struct {
			Cmd []string
		} `json:"container_config,omitempty"`
		ThrowAway bool `json:"throwaway,omitempty"`
	}

	var (
		fsLayerList  = make([]schema1.FSLayer, historyLen)
		history      = make([]schema1.History, historyLen)
		parent       string
		layerCounter int
	)

	// Every entry except the newest one gets a synthetic compatibility
	// record; the newest entry is handled after the loop.
	topIndex := historyLen - 1
	for i, h := range img.History[:topIndex] {
		var diffID layer.DiffID
		if !h.EmptyLayer {
			if layerCounter >= len(img.RootFS.DiffIDs) {
				return nil, errors.New("too many non-empty layers in History section")
			}
			diffID = img.RootFS.DiffIDs[layerCounter]
			layerCounter++
		} else {
			diffID = layer.EmptyLayer.DiffID()
		}

		entryLayer, ok := fsLayers[diffID]
		if !ok {
			return nil, fmt.Errorf("missing layer in CreateV2Manifest: %s", diffID.String())
		}

		// The fake ID is derived from the blobsum and the parent's ID.
		idDigest, err := digest.FromBytes([]byte(entryLayer.BlobSum.Hex() + " " + parent))
		if err != nil {
			return nil, err
		}
		v1ID := idDigest.Hex()

		compat := v1Compatibility{
			ID:        v1ID,
			Parent:    parent,
			Comment:   h.Comment,
			Created:   h.Created,
			ThrowAway: h.EmptyLayer,
		}
		compat.ContainerConfig.Cmd = []string{h.CreatedBy}

		encoded, err := json.Marshal(&compat)
		if err != nil {
			return nil, err
		}

		// Store in reverse: the oldest history entry lands in the last slot.
		slot := topIndex - i
		history[slot].V1Compatibility = string(encoded)
		fsLayerList[slot] = entryLayer

		parent = v1ID
	}

	latestHistory := img.History[topIndex]

	var topDiffID layer.DiffID
	if !latestHistory.EmptyLayer {
		if layerCounter >= len(img.RootFS.DiffIDs) {
			return nil, errors.New("too many non-empty layers in History section")
		}
		topDiffID = img.RootFS.DiffIDs[layerCounter]
	} else {
		topDiffID = layer.EmptyLayer.DiffID()
	}

	topLayer, ok := fsLayers[topDiffID]
	if !ok {
		return nil, fmt.Errorf("missing layer in CreateV2Manifest: %s", topDiffID.String())
	}

	// The newest entry's ID additionally covers the raw image JSON.
	topDigest, err := digest.FromBytes([]byte(topLayer.BlobSum.Hex() + " " + parent + " " + string(img.RawJSON())))
	if err != nil {
		return nil, err
	}
	fsLayerList[0] = topLayer

	// Top-level v1compatibility string should be a modified version of the
	// image config.
	transformedConfig, err := v1.MakeV1ConfigFromConfig(img, topDigest.Hex(), parent, latestHistory.EmptyLayer)
	if err != nil {
		return nil, err
	}
	history[0].V1Compatibility = string(transformedConfig)

	// windows-only baselayer setup
	if err := setupBaseLayer(history, *img.RootFS); err != nil {
		return nil, err
	}

	result := &schema1.Manifest{
		Versioned: manifest.Versioned{
			SchemaVersion: 1,
		},
		Name:         name,
		Tag:          tag,
		Architecture: img.Architecture,
		FSLayers:     fsLayerList,
		History:      history,
	}
	return result, nil
}
// rawJSON marshals value to JSON and returns a pointer to the encoded bytes
// as a json.RawMessage. It returns nil when value cannot be marshalled.
func rawJSON(value interface{}) *json.RawMessage {
	encoded, err := json.Marshal(value)
	if err != nil {
		return nil
	}
	raw := json.RawMessage(encoded)
	return &raw
}
// pushV2Layer uploads layer l through the blob service bs and returns the
// digest of the bytes that were sent.
//
// The layer's tar stream is wrapped in a progress reader, passed through
// compress, and hashed with the canonical digest algorithm while it is
// written to the upload. Progress messages go to the configured output
// stream.
func (p *v2Pusher) pushV2Layer(bs distribution.BlobService, l layer.Layer) (digest.Digest, error) {
	out := p.config.OutStream
	displayID := stringid.TruncateID(string(l.DiffID()))
	out.Write(p.sf.FormatProgress(displayID, "Preparing", nil))

	tarStream, err := l.TarStream()
	if err != nil {
		return "", err
	}
	defer tarStream.Close()

	// Send the layer
	upload, err := bs.Create(context.Background())
	if err != nil {
		return "", err
	}
	defer upload.Close()

	// don't care if this fails; best effort
	size, _ := l.DiffSize()

	progressCfg := progressreader.Config{
		// The tar stream is closed by the deferred call above, so the
		// progress reader gets a no-op closer.
		In:        ioutil.NopCloser(tarStream),
		Out:       out,
		Formatter: p.sf,
		Size:      size,
		NewLines:  false,
		ID:        displayID,
		Action:    "Pushing",
	}
	compressed := compress(progressreader.New(progressCfg))

	// Hash the compressed bytes as they are handed to the upload.
	digester := digest.Canonical.New()
	hashed := io.TeeReader(compressed, digester.Hash())

	out.Write(p.sf.FormatProgress(displayID, "Pushing", nil))
	written, err := upload.ReadFrom(hashed)
	compressed.Close()
	if err != nil {
		return "", err
	}

	dgst := digester.Digest()
	if _, err := upload.Commit(context.Background(), distribution.Descriptor{Digest: dgst}); err != nil {
		return "", err
	}

	logrus.Debugf("uploaded layer %s (%s), %d bytes", l.DiffID(), dgst, written)
	out.Write(p.sf.FormatProgress(displayID, "Pushed", nil))

	return dgst, nil
}