Merge pull request #15353 from jlhawn/fixing-concurrency-trust

[graph] Use a pipe for downloads to write progress
This commit is contained in:
Alexander Morozov 2015-08-05 18:52:27 -07:00
commit af9dc3050b
2 changed files with 28 additions and 4 deletions

View File

@ -1,6 +1,7 @@
package graph
import (
"errors"
"fmt"
"io"
"io/ioutil"
@ -108,6 +109,7 @@ type downloadInfo struct {
layer distribution.ReadSeekCloser
size int64
err chan error
out io.Writer // Download progress is written here.
}
type errVerification struct{}
@ -117,7 +119,7 @@ func (errVerification) Error() string { return "verification failed" }
func (p *v2Puller) download(di *downloadInfo) {
logrus.Debugf("pulling blob %q to %s", di.digest, di.img.ID)
out := p.config.OutStream
out := di.out
if c, err := p.poolAdd("pull", "img:"+di.img.ID); err != nil {
if c != nil {
@ -191,7 +193,7 @@ func (p *v2Puller) download(di *downloadInfo) {
di.err <- nil
}
func (p *v2Puller) pullV2Tag(tag, taggedName string) (bool, error) {
func (p *v2Puller) pullV2Tag(tag, taggedName string) (verified bool, err error) {
logrus.Debugf("Pulling tag from V2 registry: %q", tag)
out := p.config.OutStream
@ -204,7 +206,7 @@ func (p *v2Puller) pullV2Tag(tag, taggedName string) (bool, error) {
if err != nil {
return false, err
}
verified, err := p.validateManifest(manifest, tag)
verified, err = p.validateManifest(manifest, tag)
if err != nil {
return false, err
}
@ -212,6 +214,27 @@ func (p *v2Puller) pullV2Tag(tag, taggedName string) (bool, error) {
logrus.Printf("Image manifest for %s has been verified", taggedName)
}
// By using a pipeWriter for each of the downloads to write their progress
// to, we can avoid an issue where this function returns an error but
// leaves behind running download goroutines. By splitting the writer
// with a pipe, we can close the pipe if there is any error, consequently
// causing each download to cancel due to an error writing to this pipe.
pipeReader, pipeWriter := io.Pipe()
go func() {
if _, err := io.Copy(out, pipeReader); err != nil {
logrus.Errorf("error copying from layer download progress reader: %s", err)
}
}()
defer func() {
if err != nil {
// All operations on the pipe are synchronous. This call will wait
// until all current readers/writers are done using the pipe then
// set the error. All successive reads/writes will return with this
// error.
pipeWriter.CloseWithError(errors.New("download canceled"))
}
}()
out.Write(p.sf.FormatStatus(tag, "Pulling from %s", p.repo.Name()))
downloads := make([]downloadInfo, len(manifest.FSLayers))
@ -242,6 +265,7 @@ func (p *v2Puller) pullV2Tag(tag, taggedName string) (bool, error) {
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pulling fs layer", nil))
downloads[i].err = make(chan error)
downloads[i].out = pipeWriter
go p.download(&downloads[i])
}

View File

@ -446,7 +446,7 @@ func (s *DockerRegistrySuite) TestPullFailsWithAlteredManifest(c *check.C) {
imageReference := fmt.Sprintf("%s@%s", repoName, manifestDigest)
out, exitStatus, _ := dockerCmdWithError("pull", imageReference)
if exitStatus == 0 {
c.Fatalf("expected a zero exit status but got %d: %s", exitStatus, out)
c.Fatalf("expected a non-zero exit status but got %d: %s", exitStatus, out)
}
expectedErrorMsg := fmt.Sprintf("image verification failed for digest %s", manifestDigest)