From 629815b42472635c87ec6ce9ed4ec37019ae4ffa Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Tue, 27 Jan 2015 18:10:28 -0800 Subject: [PATCH] Buffer tar file on v2 push fixes #10312 fixes #10306 Signed-off-by: Derek McGowan (github: dmcgowan) --- graph/graph.go | 22 +++++++++++++++++++++ graph/push.go | 52 ++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 60 insertions(+), 14 deletions(-) diff --git a/graph/graph.go b/graph/graph.go index 30bea0470f..f7b9fc4f10 100644 --- a/graph/graph.go +++ b/graph/graph.go @@ -223,6 +223,28 @@ func (graph *Graph) Mktemp(id string) (string, error) { return dir, nil } +func (graph *Graph) newTempFile() (*os.File, error) { + tmp, err := graph.Mktemp("") + if err != nil { + return nil, err + } + return ioutil.TempFile(tmp, "") +} + +func bufferToFile(f *os.File, src io.Reader) (int64, error) { + n, err := io.Copy(f, src) + if err != nil { + return n, err + } + if err = f.Sync(); err != nil { + return n, err + } + if _, err := f.Seek(0, 0); err != nil { + return n, err + } + return n, nil +} + // setupInitLayer populates a directory with mountpoints suitable // for bind-mounting dockerinit into the container. The mountpoint is simply an // empty file at /.dockerinit diff --git a/graph/push.go b/graph/push.go index d3f3596e06..fafa41b9bf 100644 --- a/graph/push.go +++ b/graph/push.go @@ -322,16 +322,6 @@ func (s *TagStore) pushV2Repository(r *registry.Session, eng *engine.Engine, out return fmt.Errorf("Failed to parse json: %s", err) } - img, err = s.graph.Get(img.ID) - if err != nil { - return err - } - - arch, err := img.TarLayer() - if err != nil { - return fmt.Errorf("Could not get tar layer: %s", err) - } - // Call mount blob exists, err := r.HeadV2ImageBlob(endpoint, repoInfo.RemoteName, sumParts[0], manifestSum, auth) if err != nil { @@ -340,12 +330,9 @@ func (s *TagStore) pushV2Repository(r *registry.Session, eng *engine.Engine, out } if !exists { - err = r.PutV2ImageBlob(endpoint, repoInfo.RemoteName, sumParts[0], manifestSum, utils.ProgressReader(arch, int(img.Size), out, sf, false, utils.TruncateID(img.ID), "Pushing"), auth) - if err != nil { - out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image push failed", nil)) + if err := s.PushV2Image(r, img, endpoint, repoInfo.RemoteName, sumParts[0], manifestSum, sf, out, auth); err != nil { return err } - out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image successfully pushed", nil)) } else { out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image already exists", nil)) } @@ -355,6 +342,43 @@ func (s *TagStore) pushV2Repository(r *registry.Session, eng *engine.Engine, out return r.PutV2ImageManifest(endpoint, repoInfo.RemoteName, tag, bytes.NewReader([]byte(manifestBytes)), auth) } +// PushV2Image pushes the image content to the v2 registry, first buffering the contents to disk +func (s *TagStore) PushV2Image(r *registry.Session, img *image.Image, endpoint *registry.Endpoint, imageName, sumType, sumStr string, sf *utils.StreamFormatter, out io.Writer, auth *registry.RequestAuthorization) error { + out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Buffering to Disk", nil)) + + image, err := s.graph.Get(img.ID) + if err != nil { + return err + } + arch, err := image.TarLayer() + if err != nil { + return err + } + tf, err := s.graph.newTempFile() + if err != nil { + return err + } + defer func() { + tf.Close() + os.Remove(tf.Name()) + }() + + size, err := bufferToFile(tf, arch) + if err != nil { + return err + } + + // Send the layer + log.Debugf("rendered layer for %s of [%d] size", img.ID, size) + + if err := r.PutV2ImageBlob(endpoint, imageName, sumType, sumStr, utils.ProgressReader(tf, int(size), out, sf, false, utils.TruncateID(img.ID), "Pushing"), auth); err != nil { + out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image push failed", nil)) + return err + } + out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image successfully pushed", nil)) + return nil +} + // FIXME: Allow to interrupt current push when new push of same image is done. func (s *TagStore) CmdPush(job *engine.Job) engine.Status { if n := len(job.Args); n != 1 {