Merge pull request #10372 from dmcgowan/v2-registry-buffer-push

Buffer tar file on v2 push
This commit is contained in:
Arnaud Porterie 2015-01-28 08:47:21 -08:00
commit 2cb82c11cf
2 changed files with 60 additions and 14 deletions

View File

@ -223,6 +223,28 @@ func (graph *Graph) Mktemp(id string) (string, error) {
return dir, nil
}
func (graph *Graph) newTempFile() (*os.File, error) {
tmp, err := graph.Mktemp("")
if err != nil {
return nil, err
}
return ioutil.TempFile(tmp, "")
}
func bufferToFile(f *os.File, src io.Reader) (int64, error) {
n, err := io.Copy(f, src)
if err != nil {
return n, err
}
if err = f.Sync(); err != nil {
return n, err
}
if _, err := f.Seek(0, 0); err != nil {
return n, err
}
return n, nil
}
// setupInitLayer populates a directory with mountpoints suitable
// for bind-mounting dockerinit into the container. The mountpoint is simply an
// empty file at /.dockerinit

View File

@ -322,16 +322,6 @@ func (s *TagStore) pushV2Repository(r *registry.Session, eng *engine.Engine, out
return fmt.Errorf("Failed to parse json: %s", err)
}
img, err = s.graph.Get(img.ID)
if err != nil {
return err
}
arch, err := img.TarLayer()
if err != nil {
return fmt.Errorf("Could not get tar layer: %s", err)
}
// Call mount blob
exists, err := r.HeadV2ImageBlob(endpoint, repoInfo.RemoteName, sumParts[0], manifestSum, auth)
if err != nil {
@ -340,12 +330,9 @@ func (s *TagStore) pushV2Repository(r *registry.Session, eng *engine.Engine, out
}
if !exists {
err = r.PutV2ImageBlob(endpoint, repoInfo.RemoteName, sumParts[0], manifestSum, utils.ProgressReader(arch, int(img.Size), out, sf, false, utils.TruncateID(img.ID), "Pushing"), auth)
if err != nil {
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image push failed", nil))
if err := s.PushV2Image(r, img, endpoint, repoInfo.RemoteName, sumParts[0], manifestSum, sf, out, auth); err != nil {
return err
}
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image successfully pushed", nil))
} else {
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image already exists", nil))
}
@ -355,6 +342,43 @@ func (s *TagStore) pushV2Repository(r *registry.Session, eng *engine.Engine, out
return r.PutV2ImageManifest(endpoint, repoInfo.RemoteName, tag, bytes.NewReader([]byte(manifestBytes)), auth)
}
// PushV2Image pushes the image content to the v2 registry, first buffering the contents to disk
func (s *TagStore) PushV2Image(r *registry.Session, img *image.Image, endpoint *registry.Endpoint, imageName, sumType, sumStr string, sf *utils.StreamFormatter, out io.Writer, auth *registry.RequestAuthorization) error {
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Buffering to Disk", nil))
image, err := s.graph.Get(img.ID)
if err != nil {
return err
}
arch, err := image.TarLayer()
if err != nil {
return err
}
tf, err := s.graph.newTempFile()
if err != nil {
return err
}
defer func() {
tf.Close()
os.Remove(tf.Name())
}()
size, err := bufferToFile(tf, arch)
if err != nil {
return err
}
// Send the layer
log.Debugf("rendered layer for %s of [%d] size", img.ID, size)
if err := r.PutV2ImageBlob(endpoint, imageName, sumType, sumStr, utils.ProgressReader(tf, int(size), out, sf, false, utils.TruncateID(img.ID), "Pushing"), auth); err != nil {
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image push failed", nil))
return err
}
out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Image successfully pushed", nil))
return nil
}
// FIXME: Allow to interrupt current push when new push of same image is done.
func (s *TagStore) CmdPush(job *engine.Job) engine.Status {
if n := len(job.Args); n != 1 {