mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
d80c4244d3
The process of pulling an image spawns a new goroutine for each layer in the image manifest. If any of these downloads fail we would stop everything and return the error, even though other goroutines would still be running and writing output through a progress reader which is attached to an http response writer. Since the request handler had already returned from the first error, the http server panics when one of these download goroutines makes a write to the response writer buffer. This patch prevents this crash in the daemon http server by waiting for all of the download goroutines to complete, even if one of them fails. Only then does it return, terminating the request handler. Docker-DCO-1.1-Signed-off-by: Josh Hawn <josh.hawn@docker.com> (github: jlhawn)
439 lines
12 KiB
Go
439 lines
12 KiB
Go
package graph
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"os"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/docker/distribution"
|
|
"github.com/docker/distribution/digest"
|
|
"github.com/docker/distribution/manifest"
|
|
"github.com/docker/docker/image"
|
|
"github.com/docker/docker/pkg/progressreader"
|
|
"github.com/docker/docker/pkg/streamformatter"
|
|
"github.com/docker/docker/pkg/stringid"
|
|
"github.com/docker/docker/registry"
|
|
"github.com/docker/docker/trust"
|
|
"github.com/docker/docker/utils"
|
|
"github.com/docker/libtrust"
|
|
"golang.org/x/net/context"
|
|
)
|
|
|
|
type v2Puller struct {
|
|
*TagStore
|
|
endpoint registry.APIEndpoint
|
|
config *ImagePullConfig
|
|
sf *streamformatter.StreamFormatter
|
|
repoInfo *registry.RepositoryInfo
|
|
repo distribution.Repository
|
|
sessionID string
|
|
}
|
|
|
|
func (p *v2Puller) Pull(tag string) (fallback bool, err error) {
|
|
// TODO(tiborvass): was ReceiveTimeout
|
|
p.repo, err = NewV2Repository(p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig)
|
|
if err != nil {
|
|
logrus.Debugf("Error getting v2 registry: %v", err)
|
|
return true, err
|
|
}
|
|
|
|
p.sessionID = stringid.GenerateRandomID()
|
|
|
|
if err := p.pullV2Repository(tag); err != nil {
|
|
if registry.ContinueOnError(err) {
|
|
logrus.Debugf("Error trying v2 registry: %v", err)
|
|
return true, err
|
|
}
|
|
return false, err
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
func (p *v2Puller) pullV2Repository(tag string) (err error) {
|
|
var tags []string
|
|
taggedName := p.repoInfo.LocalName
|
|
if len(tag) > 0 {
|
|
tags = []string{tag}
|
|
taggedName = utils.ImageReference(p.repoInfo.LocalName, tag)
|
|
} else {
|
|
var err error
|
|
|
|
manSvc, err := p.repo.Manifests(context.Background())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
tags, err = manSvc.Tags()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
}
|
|
|
|
c, err := p.poolAdd("pull", taggedName)
|
|
if err != nil {
|
|
if c != nil {
|
|
// Another pull of the same repository is already taking place; just wait for it to finish
|
|
p.sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", p.repoInfo.CanonicalName)
|
|
<-c
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
defer p.poolRemove("pull", taggedName)
|
|
|
|
var layersDownloaded bool
|
|
for _, tag := range tags {
|
|
// pulledNew is true if either new layers were downloaded OR if existing images were newly tagged
|
|
// TODO(tiborvass): should we change the name of `layersDownload`? What about message in WriteStatus?
|
|
pulledNew, err := p.pullV2Tag(tag, taggedName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
layersDownloaded = layersDownloaded || pulledNew
|
|
}
|
|
|
|
writeStatus(taggedName, p.config.OutStream, p.sf, layersDownloaded)
|
|
|
|
return nil
|
|
}
|
|
|
|
// downloadInfo is used to pass information from download to extractor
|
|
type downloadInfo struct {
|
|
img *image.Image
|
|
tmpFile *os.File
|
|
digest digest.Digest
|
|
layer distribution.ReadSeekCloser
|
|
size int64
|
|
err chan error
|
|
out io.Writer // Download progress is written here.
|
|
}
|
|
|
|
// errVerification is the error value used to signal a failed verification.
type errVerification struct{}

// Error implements the error interface.
func (errVerification) Error() string {
	const msg = "verification failed"
	return msg
}
|
|
|
|
func (p *v2Puller) download(di *downloadInfo) {
|
|
logrus.Debugf("pulling blob %q to %s", di.digest, di.img.ID)
|
|
|
|
out := di.out
|
|
|
|
if c, err := p.poolAdd("pull", "img:"+di.img.ID); err != nil {
|
|
if c != nil {
|
|
out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Layer already being pulled by another client. Waiting.", nil))
|
|
<-c
|
|
out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil))
|
|
} else {
|
|
logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", di.img.ID, err)
|
|
}
|
|
di.err <- nil
|
|
return
|
|
}
|
|
|
|
defer p.poolRemove("pull", "img:"+di.img.ID)
|
|
tmpFile, err := ioutil.TempFile("", "GetImageBlob")
|
|
if err != nil {
|
|
di.err <- err
|
|
return
|
|
}
|
|
|
|
blobs := p.repo.Blobs(nil)
|
|
|
|
desc, err := blobs.Stat(nil, di.digest)
|
|
if err != nil {
|
|
logrus.Debugf("Error statting layer: %v", err)
|
|
di.err <- err
|
|
return
|
|
}
|
|
di.size = desc.Size
|
|
|
|
layerDownload, err := blobs.Open(nil, di.digest)
|
|
if err != nil {
|
|
logrus.Debugf("Error fetching layer: %v", err)
|
|
di.err <- err
|
|
return
|
|
}
|
|
defer layerDownload.Close()
|
|
|
|
verifier, err := digest.NewDigestVerifier(di.digest)
|
|
if err != nil {
|
|
di.err <- err
|
|
return
|
|
}
|
|
|
|
reader := progressreader.New(progressreader.Config{
|
|
In: ioutil.NopCloser(io.TeeReader(layerDownload, verifier)),
|
|
Out: out,
|
|
Formatter: p.sf,
|
|
Size: di.size,
|
|
NewLines: false,
|
|
ID: stringid.TruncateID(di.img.ID),
|
|
Action: "Downloading",
|
|
})
|
|
io.Copy(tmpFile, reader)
|
|
|
|
out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Verifying Checksum", nil))
|
|
|
|
if !verifier.Verified() {
|
|
err = fmt.Errorf("filesystem layer verification failed for digest %s", di.digest)
|
|
logrus.Error(err)
|
|
di.err <- err
|
|
return
|
|
}
|
|
|
|
out.Write(p.sf.FormatProgress(stringid.TruncateID(di.img.ID), "Download complete", nil))
|
|
|
|
logrus.Debugf("Downloaded %s to tempfile %s", di.img.ID, tmpFile.Name())
|
|
di.tmpFile = tmpFile
|
|
di.layer = layerDownload
|
|
|
|
di.err <- nil
|
|
}
|
|
|
|
func (p *v2Puller) pullV2Tag(tag, taggedName string) (verified bool, err error) {
|
|
logrus.Debugf("Pulling tag from V2 registry: %q", tag)
|
|
out := p.config.OutStream
|
|
|
|
manSvc, err := p.repo.Manifests(context.Background())
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
manifest, err := manSvc.GetByTag(tag)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
verified, err = p.validateManifest(manifest, tag)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if verified {
|
|
logrus.Printf("Image manifest for %s has been verified", taggedName)
|
|
}
|
|
|
|
// By using a pipeWriter for each of the downloads to write their progress
|
|
// to, we can avoid an issue where this function returns an error but
|
|
// leaves behind running download goroutines. By splitting the writer
|
|
// with a pipe, we can close the pipe if there is any error, consequently
|
|
// causing each download to cancel due to an error writing to this pipe.
|
|
pipeReader, pipeWriter := io.Pipe()
|
|
go func() {
|
|
if _, err := io.Copy(out, pipeReader); err != nil {
|
|
logrus.Errorf("error copying from layer download progress reader: %s", err)
|
|
}
|
|
}()
|
|
defer func() {
|
|
if err != nil {
|
|
// All operations on the pipe are synchronous. This call will wait
|
|
// until all current readers/writers are done using the pipe then
|
|
// set the error. All successive reads/writes will return with this
|
|
// error.
|
|
pipeWriter.CloseWithError(errors.New("download canceled"))
|
|
}
|
|
}()
|
|
|
|
out.Write(p.sf.FormatStatus(tag, "Pulling from %s", p.repo.Name()))
|
|
|
|
downloads := make([]downloadInfo, len(manifest.FSLayers))
|
|
|
|
layerIDs := []string{}
|
|
defer func() {
|
|
p.graph.Release(p.sessionID, layerIDs...)
|
|
}()
|
|
|
|
for i := len(manifest.FSLayers) - 1; i >= 0; i-- {
|
|
img, err := image.NewImgJSON([]byte(manifest.History[i].V1Compatibility))
|
|
if err != nil {
|
|
logrus.Debugf("error getting image v1 json: %v", err)
|
|
return false, err
|
|
}
|
|
downloads[i].img = img
|
|
downloads[i].digest = manifest.FSLayers[i].BlobSum
|
|
|
|
p.graph.Retain(p.sessionID, img.ID)
|
|
layerIDs = append(layerIDs, img.ID)
|
|
|
|
// Check if exists
|
|
if p.graph.Exists(img.ID) {
|
|
logrus.Debugf("Image already exists: %s", img.ID)
|
|
continue
|
|
}
|
|
|
|
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pulling fs layer", nil))
|
|
|
|
downloads[i].err = make(chan error)
|
|
downloads[i].out = pipeWriter
|
|
go p.download(&downloads[i])
|
|
}
|
|
|
|
var tagUpdated bool
|
|
for i := len(downloads) - 1; i >= 0; i-- {
|
|
d := &downloads[i]
|
|
if d.err != nil {
|
|
if err := <-d.err; err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
if d.layer != nil {
|
|
// if tmpFile is empty assume download and extracted elsewhere
|
|
defer os.Remove(d.tmpFile.Name())
|
|
defer d.tmpFile.Close()
|
|
d.tmpFile.Seek(0, 0)
|
|
if d.tmpFile != nil {
|
|
|
|
reader := progressreader.New(progressreader.Config{
|
|
In: d.tmpFile,
|
|
Out: out,
|
|
Formatter: p.sf,
|
|
Size: d.size,
|
|
NewLines: false,
|
|
ID: stringid.TruncateID(d.img.ID),
|
|
Action: "Extracting",
|
|
})
|
|
|
|
err = p.graph.Register(d.img, reader)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
if err := p.graph.SetDigest(d.img.ID, d.digest); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
// FIXME: Pool release here for parallel tag pull (ensures any downloads block until fully extracted)
|
|
}
|
|
out.Write(p.sf.FormatProgress(stringid.TruncateID(d.img.ID), "Pull complete", nil))
|
|
tagUpdated = true
|
|
} else {
|
|
out.Write(p.sf.FormatProgress(stringid.TruncateID(d.img.ID), "Already exists", nil))
|
|
}
|
|
}
|
|
|
|
manifestDigest, _, err := digestFromManifest(manifest, p.repoInfo.LocalName)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
// Check for new tag if no layers downloaded
|
|
if !tagUpdated {
|
|
repo, err := p.Get(p.repoInfo.LocalName)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if repo != nil {
|
|
if _, exists := repo[tag]; !exists {
|
|
tagUpdated = true
|
|
}
|
|
} else {
|
|
tagUpdated = true
|
|
}
|
|
}
|
|
|
|
if verified && tagUpdated {
|
|
out.Write(p.sf.FormatStatus(p.repo.Name()+":"+tag, "The image you are pulling has been verified. Important: image verification is a tech preview feature and should not be relied on to provide security."))
|
|
}
|
|
|
|
if utils.DigestReference(tag) {
|
|
// TODO(stevvooe): Ideally, we should always set the digest so we can
|
|
// use the digest whether we pull by it or not. Unfortunately, the tag
|
|
// store treats the digest as a separate tag, meaning there may be an
|
|
// untagged digest image that would seem to be dangling by a user.
|
|
if err = p.SetDigest(p.repoInfo.LocalName, tag, downloads[0].img.ID); err != nil {
|
|
return false, err
|
|
}
|
|
} else {
|
|
// only set the repository/tag -> image ID mapping when pulling by tag (i.e. not by digest)
|
|
if err = p.Tag(p.repoInfo.LocalName, tag, downloads[0].img.ID, true); err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
|
|
if manifestDigest != "" {
|
|
out.Write(p.sf.FormatStatus("", "Digest: %s", manifestDigest))
|
|
}
|
|
|
|
return tagUpdated, nil
|
|
}
|
|
|
|
// verifyTrustedKeys checks the keys provided against the trust store,
|
|
// ensuring that the provided keys are trusted for the namespace. The keys
|
|
// provided from this method must come from the signatures provided as part of
|
|
// the manifest JWS package, obtained from unpackSignedManifest or libtrust.
|
|
func (p *v2Puller) verifyTrustedKeys(namespace string, keys []libtrust.PublicKey) (verified bool, err error) {
|
|
if namespace[0] != '/' {
|
|
namespace = "/" + namespace
|
|
}
|
|
|
|
for _, key := range keys {
|
|
b, err := key.MarshalJSON()
|
|
if err != nil {
|
|
return false, fmt.Errorf("error marshalling public key: %s", err)
|
|
}
|
|
// Check key has read/write permission (0x03)
|
|
v, err := p.trustService.CheckKey(namespace, b, 0x03)
|
|
if err != nil {
|
|
vErr, ok := err.(trust.NotVerifiedError)
|
|
if !ok {
|
|
return false, fmt.Errorf("error running key check: %s", err)
|
|
}
|
|
logrus.Debugf("Key check result: %v", vErr)
|
|
}
|
|
verified = v
|
|
}
|
|
|
|
if verified {
|
|
logrus.Debug("Key check result: verified")
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (p *v2Puller) validateManifest(m *manifest.SignedManifest, tag string) (verified bool, err error) {
|
|
// If pull by digest, then verify the manifest digest. NOTE: It is
|
|
// important to do this first, before any other content validation. If the
|
|
// digest cannot be verified, don't even bother with those other things.
|
|
if manifestDigest, err := digest.ParseDigest(tag); err == nil {
|
|
verifier, err := digest.NewDigestVerifier(manifestDigest)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
payload, err := m.Payload()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if _, err := verifier.Write(payload); err != nil {
|
|
return false, err
|
|
}
|
|
if !verifier.Verified() {
|
|
err := fmt.Errorf("image verification failed for digest %s", manifestDigest)
|
|
logrus.Error(err)
|
|
return false, err
|
|
}
|
|
}
|
|
|
|
// TODO(tiborvass): what's the usecase for having manifest == nil and err == nil ? Shouldn't be the error be "DoesNotExist" ?
|
|
if m == nil {
|
|
return false, fmt.Errorf("image manifest does not exist for tag %q", tag)
|
|
}
|
|
if m.SchemaVersion != 1 {
|
|
return false, fmt.Errorf("unsupported schema version %d for tag %q", m.SchemaVersion, tag)
|
|
}
|
|
if len(m.FSLayers) != len(m.History) {
|
|
return false, fmt.Errorf("length of history not equal to number of layers for tag %q", tag)
|
|
}
|
|
if len(m.FSLayers) == 0 {
|
|
return false, fmt.Errorf("no FSLayers in manifest for tag %q", tag)
|
|
}
|
|
keys, err := manifest.Verify(m)
|
|
if err != nil {
|
|
return false, fmt.Errorf("error verifying manifest for tag %q: %v", tag, err)
|
|
}
|
|
verified, err = p.verifyTrustedKeys(m.Name, keys)
|
|
if err != nil {
|
|
return false, fmt.Errorf("error verifying manifest keys: %v", err)
|
|
}
|
|
return verified, nil
|
|
}
|