1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Merge pull request #15493 from stevvooe/no-buffer-on-push

Avoid buffering to tempfile when pushing with V2
This commit is contained in:
Alexander Morozov 2015-08-26 13:59:01 -07:00
commit 70a814c3f3
5 changed files with 32 additions and 53 deletions

View file

@ -2,7 +2,6 @@ package graph
import ( import (
"compress/gzip" "compress/gzip"
"crypto/sha256"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -340,26 +339,6 @@ func (graph *Graph) newTempFile() (*os.File, error) {
return ioutil.TempFile(tmp, "") return ioutil.TempFile(tmp, "")
} }
func bufferToFile(f *os.File, src io.Reader) (int64, digest.Digest, error) {
var (
h = sha256.New()
w = gzip.NewWriter(io.MultiWriter(f, h))
)
_, err := io.Copy(w, src)
w.Close()
if err != nil {
return 0, "", err
}
n, err := f.Seek(0, os.SEEK_CUR)
if err != nil {
return 0, "", err
}
if _, err := f.Seek(0, 0); err != nil {
return 0, "", err
}
return n, digest.NewDigest("sha256", h), nil
}
// Delete atomically removes an image from the graph. // Delete atomically removes an image from the graph.
func (graph *Graph) Delete(name string) error { func (graph *Graph) Delete(name string) error {
id, err := graph.idIndex.Get(name) id, err := graph.idIndex.Get(name)

View file

@ -2,8 +2,8 @@ package graph
import ( import (
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"os"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/distribution" "github.com/docker/distribution"
@ -199,7 +199,7 @@ func (p *v2Pusher) pushV2Tag(tag string) error {
func (p *v2Pusher) pushV2Image(bs distribution.BlobService, img *image.Image) (digest.Digest, error) { func (p *v2Pusher) pushV2Image(bs distribution.BlobService, img *image.Image) (digest.Digest, error) {
out := p.config.OutStream out := p.config.OutStream
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Buffering to Disk", nil)) out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Preparing", nil))
image, err := p.graph.Get(img.ID) image, err := p.graph.Get(img.ID)
if err != nil { if err != nil {
@ -209,52 +209,46 @@ func (p *v2Pusher) pushV2Image(bs distribution.BlobService, img *image.Image) (d
if err != nil { if err != nil {
return "", err return "", err
} }
defer arch.Close()
tf, err := p.graph.newTempFile()
if err != nil {
return "", err
}
defer func() {
tf.Close()
os.Remove(tf.Name())
}()
size, dgst, err := bufferToFile(tf, arch)
if err != nil {
return "", err
}
// Send the layer // Send the layer
logrus.Debugf("rendered layer for %s of [%d] size", img.ID, size)
layerUpload, err := bs.Create(context.Background()) layerUpload, err := bs.Create(context.Background())
if err != nil { if err != nil {
return "", err return "", err
} }
defer layerUpload.Close() defer layerUpload.Close()
digester := digest.Canonical.New()
tee := io.TeeReader(arch, digester.Hash())
reader := progressreader.New(progressreader.Config{ reader := progressreader.New(progressreader.Config{
In: ioutil.NopCloser(tf), In: ioutil.NopCloser(tee), // we'll take care of close here.
Out: out, Out: out,
Formatter: p.sf, Formatter: p.sf,
Size: size,
// TODO(stevvooe): This may cause a size reporting error. Try to get
// this from tar-split or elsewhere. The main issue here is that we
// don't want to buffer to disk *just* to calculate the size.
Size: img.Size,
NewLines: false, NewLines: false,
ID: stringid.TruncateID(img.ID), ID: stringid.TruncateID(img.ID),
Action: "Pushing", Action: "Pushing",
}) })
n, err := layerUpload.ReadFrom(reader)
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pushing", nil))
nn, err := io.Copy(layerUpload, reader)
if err != nil { if err != nil {
return "", err return "", err
} }
if n != size {
return "", fmt.Errorf("short upload: only wrote %d of %d", n, size)
}
desc := distribution.Descriptor{Digest: dgst} dgst := digester.Digest()
if _, err := layerUpload.Commit(context.Background(), desc); err != nil { if _, err := layerUpload.Commit(context.Background(), distribution.Descriptor{Digest: dgst}); err != nil {
return "", err return "", err
} }
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Image successfully pushed", nil)) logrus.Debugf("uploaded layer %s (%s), %d bytes", img.ID, dgst, nn)
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pushed", nil))
return dgst, nil return dgst, nil
} }

View file

@ -108,7 +108,7 @@ func (s *DockerRegistrySuite) TestPushInterrupt(c *check.C) {
} }
// Interrupt push (yes, we have no idea at what point it will get killed). // Interrupt push (yes, we have no idea at what point it will get killed).
time.Sleep(200 * time.Millisecond) time.Sleep(50 * time.Millisecond) // dependent on race condition.
if err := pushCmd.Process.Kill(); err != nil { if err := pushCmd.Process.Kill(); err != nil {
c.Fatalf("Failed to kill push process: %v", err) c.Fatalf("Failed to kill push process: %v", err)
} }

View file

@ -67,8 +67,14 @@ func (p *JSONProgress) String() string {
} }
pbBox = fmt.Sprintf("[%s>%s] ", strings.Repeat("=", percentage), strings.Repeat(" ", numSpaces)) pbBox = fmt.Sprintf("[%s>%s] ", strings.Repeat("=", percentage), strings.Repeat(" ", numSpaces))
} }
numbersBox = fmt.Sprintf("%8v/%v", current, total) numbersBox = fmt.Sprintf("%8v/%v", current, total)
if p.Current > p.Total {
// remove total display if the reported current is wonky.
numbersBox = fmt.Sprintf("%8v", current)
}
if p.Current > 0 && p.Start > 0 && percentage < 50 { if p.Current > 0 && p.Start > 0 && percentage < 50 {
fromStart := time.Now().UTC().Sub(time.Unix(p.Start, 0)) fromStart := time.Now().UTC().Sub(time.Unix(p.Start, 0))
perEntry := fromStart / time.Duration(p.Current) perEntry := fromStart / time.Duration(p.Current)

View file

@ -3,12 +3,12 @@ package jsonmessage
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"strings"
"testing" "testing"
"time" "time"
"github.com/docker/docker/pkg/term" "github.com/docker/docker/pkg/term"
"github.com/docker/docker/pkg/timeutils" "github.com/docker/docker/pkg/timeutils"
"strings"
) )
func TestError(t *testing.T) { func TestError(t *testing.T) {
@ -45,7 +45,7 @@ func TestProgress(t *testing.T) {
} }
// this number can't be negative gh#7136 // this number can't be negative gh#7136
expected = "[==================================================>] 50 B/40 B" expected = "[==================================================>] 50 B"
jp5 := JSONProgress{Current: 50, Total: 40} jp5 := JSONProgress{Current: 50, Total: 40}
if jp5.String() != expected { if jp5.String() != expected {
t.Fatalf("Expected %q, got %q", expected, jp5.String()) t.Fatalf("Expected %q, got %q", expected, jp5.String())