1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/graph/pull_v2.go
Josh Hawn d80c4244d3 [graph] Use a pipe for downloads to write progress
The process of pulling an image spawns a new goroutine for each layer in the
image manifest. If any of these downloads fail we would stop everything and
return the error, even though other goroutines would still be running and
writing output through a progress reader which is attached to an http response
writer. Since the request handler had already returned from the first error,
the http server panics when one of these download goroutines makes a write to
the response writer buffer.

This patch prevents this crash in the daemon http server by waiting for all of
the download goroutines to complete, even if one of them fails. Only then does
it return, terminating the request handler.

Docker-DCO-1.1-Signed-off-by: Josh Hawn <josh.hawn@docker.com> (github: jlhawn)
2015-08-05 18:13:39 -07:00

439 lines
12 KiB
Go

package graph
import (
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/docker/image"
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/registry"
"github.com/docker/docker/trust"
"github.com/docker/docker/utils"
"github.com/docker/libtrust"
"golang.org/x/net/context"
)
type v2Puller struct {
*TagStore
endpoint registry.APIEndpoint
config *ImagePullConfig
sf *streamformatter.StreamFormatter
repoInfo *registry.RepositoryInfo
repo distribution.Repository
sessionID string
}
func (p *v2Puller) Pull(tag string) (fallback bool, err error) {
// TODO(tiborvass): was ReceiveTimeout
p.repo, err = NewV2Repository(p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig)
if err != nil {
logrus.Debugf("Error getting v2 registry: %v", err)
return true, err
}
p.sessionID = stringid.GenerateRandomID()
if err := p.pullV2Repository(tag); err != nil {
if registry.ContinueOnError(err) {
logrus.Debugf("Error trying v2 registry: %v", err)
return true, err
}
return false, err
}
return false, nil
}
func (p *v2Puller) pullV2Repository(tag string) (err error) {
var tags []string
taggedName := p.repoInfo.LocalName
if len(tag) > 0 {
tags = []string{tag}
taggedName = utils.ImageReference(p.repoInfo.LocalName, tag)
} else {
var err error
manSvc, err := p.repo.Manifests(context.Background())
if err != nil {
return err
}
tags, err = manSvc.Tags()
if err != nil {
return err
}
}
c, err := p.poolAdd("pull", taggedName)
if err != nil {
if c != nil {
// Another pull of the same repository is already taking place; just wait for it to finish
p.sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", p.repoInfo.CanonicalName)
<-c
return nil
}
return err
}
defer p.poolRemove("pull", taggedName)
var layersDownloaded bool
for _, tag := range tags {
// pulledNew is true if either new layers were downloaded OR if existing images were newly tagged
// TODO(tiborvass): should we change the name of `layersDownload`? What about message in WriteStatus?
pulledNew, err := p.pullV2Tag(tag, taggedName)
if err != nil {
return err
}
layersDownloaded = layersDownloaded || pulledNew
}
writeStatus(taggedName, p.config.OutStream, p.sf, layersDownloaded)
return nil
}
// downloadInfo is used to pass information from download to extractor
type downloadInfo struct {
img *image.Image
tmpFile *os.File
digest digest.Digest
layer distribution.ReadSeekCloser
size int64
err chan error
out io.Writer // Download progress is written here.
}
type errVerification struct{}
func (errVerification) Error() string { return "verification failed" }
func (p *v2Puller) download(di *downloadInfo) {
logrus.Debugf("pulling blob %q to %s", di.digest, di.img.ID)
out := di.out
if c, err := p.poolAdd("pull", "img:"+di.img.ID); err != nil {
if c != nil {
out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Layer already being pulled by another client. Waiting.", nil))
<-c
out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil))
} else {
logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", di.img.ID, err)
}
di.err <- nil
return
}
defer p.poolRemove("pull", "img:"+di.img.ID)
tmpFile, err := ioutil.TempFile("", "GetImageBlob")
if err != nil {
di.err <- err
return
}
blobs := p.repo.Blobs(nil)
desc, err := blobs.Stat(nil, di.digest)
if err != nil {
logrus.Debugf("Error statting layer: %v", err)
di.err <- err
return
}
di.size = desc.Size
layerDownload, err := blobs.Open(nil, di.digest)
if err != nil {
logrus.Debugf("Error fetching layer: %v", err)
di.err <- err
return
}
defer layerDownload.Close()
verifier, err := digest.NewDigestVerifier(di.digest)
if err != nil {
di.err <- err
return
}
reader := progressreader.New(progressreader.Config{
In: ioutil.NopCloser(io.TeeReader(layerDownload, verifier)),
Out: out,
Formatter: p.sf,
Size: di.size,
NewLines: false,
ID: stringid.TruncateID(di.img.ID),
Action: "Downloading",
})
io.Copy(tmpFile, reader)
out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Verifying Checksum", nil))
if !verifier.Verified() {
err = fmt.Errorf("filesystem layer verification failed for digest %s", di.digest)
logrus.Error(err)
di.err <- err
return
}
out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil))
logrus.Debugf("Downloaded %s to tempfile %s", di.img.ID, tmpFile.Name())
di.tmpFile = tmpFile
di.layer = layerDownload
di.err <- nil
}
func (p *v2Puller) pullV2Tag(tag, taggedName string) (verified bool, err error) {
logrus.Debugf("Pulling tag from V2 registry: %q", tag)
out := p.config.OutStream
manSvc, err := p.repo.Manifests(context.Background())
if err != nil {
return false, err
}
manifest, err := manSvc.GetByTag(tag)
if err != nil {
return false, err
}
verified, err = p.validateManifest(manifest, tag)
if err != nil {
return false, err
}
if verified {
logrus.Printf("Image manifest for %s has been verified", taggedName)
}
// By using a pipeWriter for each of the downloads to write their progress
// to, we can avoid an issue where this function returns an error but
// leaves behind running download goroutines. By splitting the writer
// with a pipe, we can close the pipe if there is any error, consequently
// causing each download to cancel due to an error writing to this pipe.
pipeReader, pipeWriter := io.Pipe()
go func() {
if _, err := io.Copy(out, pipeReader); err != nil {
logrus.Errorf("error copying from layer download progress reader: %s", err)
}
}()
defer func() {
if err != nil {
// All operations on the pipe are synchronous. This call will wait
// until all current readers/writers are done using the pipe then
// set the error. All successive reads/writes will return with this
// error.
pipeWriter.CloseWithError(errors.New("download canceled"))
}
}()
out.Write(p.sf.FormatStatus(tag, "Pulling from %s", p.repo.Name()))
downloads := make([]downloadInfo, len(manifest.FSLayers))
layerIDs := []string{}
defer func() {
p.graph.Release(p.sessionID, layerIDs...)
}()
for i := len(manifest.FSLayers) - 1; i >= 0; i-- {
img, err := image.NewImgJSON([]byte(manifest.History[i].V1Compatibility))
if err != nil {
logrus.Debugf("error getting image v1 json: %v", err)
return false, err
}
downloads[i].img = img
downloads[i].digest = manifest.FSLayers[i].BlobSum
p.graph.Retain(p.sessionID, img.ID)
layerIDs = append(layerIDs, img.ID)
// Check if exists
if p.graph.Exists(img.ID) {
logrus.Debugf("Image already exists: %s", img.ID)
continue
}
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pulling fs layer", nil))
downloads[i].err = make(chan error)
downloads[i].out = pipeWriter
go p.download(&downloads[i])
}
var tagUpdated bool
for i := len(downloads) - 1; i >= 0; i-- {
d := &downloads[i]
if d.err != nil {
if err := <-d.err; err != nil {
return false, err
}
}
if d.layer != nil {
// if tmpFile is empty assume download and extracted elsewhere
defer os.Remove(d.tmpFile.Name())
defer d.tmpFile.Close()
d.tmpFile.Seek(0, 0)
if d.tmpFile != nil {
reader := progressreader.New(progressreader.Config{
In: d.tmpFile,
Out: out,
Formatter: p.sf,
Size: d.size,
NewLines: false,
ID: stringid.TruncateID(d.img.ID),
Action: "Extracting",
})
err = p.graph.Register(d.img, reader)
if err != nil {
return false, err
}
if err := p.graph.SetDigest(d.img.ID, d.digest); err != nil {
return false, err
}
// FIXME: Pool release here for parallel tag pull (ensures any downloads block until fully extracted)
}
out.Write(p.sf.FormatProgress(stringid.TruncateID(d.img.ID), "Pull complete", nil))
tagUpdated = true
} else {
out.Write(p.sf.FormatProgress(stringid.TruncateID(d.img.ID), "Already exists", nil))
}
}
manifestDigest, _, err := digestFromManifest(manifest, p.repoInfo.LocalName)
if err != nil {
return false, err
}
// Check for new tag if no layers downloaded
if !tagUpdated {
repo, err := p.Get(p.repoInfo.LocalName)
if err != nil {
return false, err
}
if repo != nil {
if _, exists := repo[tag]; !exists {
tagUpdated = true
}
} else {
tagUpdated = true
}
}
if verified && tagUpdated {
out.Write(p.sf.FormatStatus(p.repo.Name()+":"+tag, "The image you are pulling has been verified. Important: image verification is a tech preview feature and should not be relied on to provide security."))
}
if utils.DigestReference(tag) {
// TODO(stevvooe): Ideally, we should always set the digest so we can
// use the digest whether we pull by it or not. Unfortunately, the tag
// store treats the digest as a separate tag, meaning there may be an
// untagged digest image that would seem to be dangling by a user.
if err = p.SetDigest(p.repoInfo.LocalName, tag, downloads[0].img.ID); err != nil {
return false, err
}
} else {
// only set the repository/tag -> image ID mapping when pulling by tag (i.e. not by digest)
if err = p.Tag(p.repoInfo.LocalName, tag, downloads[0].img.ID, true); err != nil {
return false, err
}
}
if manifestDigest != "" {
out.Write(p.sf.FormatStatus("", "Digest: %s", manifestDigest))
}
return tagUpdated, nil
}
// verifyTrustedKeys checks the keys provided against the trust store,
// ensuring that the provided keys are trusted for the namespace. The keys
// provided from this method must come from the signatures provided as part of
// the manifest JWS package, obtained from unpackSignedManifest or libtrust.
func (p *v2Puller) verifyTrustedKeys(namespace string, keys []libtrust.PublicKey) (verified bool, err error) {
if namespace[0] != '/' {
namespace = "/" + namespace
}
for _, key := range keys {
b, err := key.MarshalJSON()
if err != nil {
return false, fmt.Errorf("error marshalling public key: %s", err)
}
// Check key has read/write permission (0x03)
v, err := p.trustService.CheckKey(namespace, b, 0x03)
if err != nil {
vErr, ok := err.(trust.NotVerifiedError)
if !ok {
return false, fmt.Errorf("error running key check: %s", err)
}
logrus.Debugf("Key check result: %v", vErr)
}
verified = v
}
if verified {
logrus.Debug("Key check result: verified")
}
return
}
func (p *v2Puller) validateManifest(m *manifest.SignedManifest, tag string) (verified bool, err error) {
// If pull by digest, then verify the manifest digest. NOTE: It is
// important to do this first, before any other content validation. If the
// digest cannot be verified, don't even bother with those other things.
if manifestDigest, err := digest.ParseDigest(tag); err == nil {
verifier, err := digest.NewDigestVerifier(manifestDigest)
if err != nil {
return false, err
}
payload, err := m.Payload()
if err != nil {
return false, err
}
if _, err := verifier.Write(payload); err != nil {
return false, err
}
if !verifier.Verified() {
err := fmt.Errorf("image verification failed for digest %s", manifestDigest)
logrus.Error(err)
return false, err
}
}
// TODO(tiborvass): what's the usecase for having manifest == nil and err == nil ? Shouldn't be the error be "DoesNotExist" ?
if m == nil {
return false, fmt.Errorf("image manifest does not exist for tag %q", tag)
}
if m.SchemaVersion != 1 {
return false, fmt.Errorf("unsupported schema version %d for tag %q", m.SchemaVersion, tag)
}
if len(m.FSLayers) != len(m.History) {
return false, fmt.Errorf("length of history not equal to number of layers for tag %q", tag)
}
if len(m.FSLayers) == 0 {
return false, fmt.Errorf("no FSLayers in manifest for tag %q", tag)
}
keys, err := manifest.Verify(m)
if err != nil {
return false, fmt.Errorf("error verifying manifest for tag %q: %v", tag, err)
}
verified, err = p.verifyTrustedKeys(m.Name, keys)
if err != nil {
return false, fmt.Errorf("error verifying manifest keys: %v", err)
}
return verified, nil
}