From 12b278d3540bc32699e8c2197b556188fd98b77b Mon Sep 17 00:00:00 2001 From: bobby abbott Date: Tue, 24 Feb 2015 00:51:46 -0800 Subject: [PATCH] Remove import to utils in progressreader Added method in StreamFormatter to handle calls from progressreader. Solves #10959 Signed-off-by: bobby abbott --- api/client/commands.go | 10 +++- builder/internals.go | 11 ++++- graph/graph.go | 15 ++++-- graph/import.go | 11 ++++- graph/pull.go | 30 ++++++++++-- graph/push.go | 23 +++++++++- pkg/progressreader/progressreader.go | 69 ++++++++++++++++++++++++++++ utils/progressreader.go | 55 ---------------------- utils/streamformatter.go | 11 ++++- 9 files changed, 168 insertions(+), 67 deletions(-) create mode 100644 pkg/progressreader/progressreader.go delete mode 100644 utils/progressreader.go diff --git a/api/client/commands.go b/api/client/commands.go index a682befa70..e62fdeacef 100644 --- a/api/client/commands.go +++ b/api/client/commands.go @@ -40,6 +40,7 @@ import ( "github.com/docker/docker/pkg/networkfs/resolvconf" "github.com/docker/docker/pkg/parsers" "github.com/docker/docker/pkg/parsers/filters" + "github.com/docker/docker/pkg/progressreader" "github.com/docker/docker/pkg/promise" "github.com/docker/docker/pkg/signal" "github.com/docker/docker/pkg/symlink" @@ -232,7 +233,14 @@ func (cli *DockerCli) CmdBuild(args ...string) error { // FIXME: ProgressReader shouldn't be this annoying to use if context != nil { sf := utils.NewStreamFormatter(false) - body = utils.ProgressReader(context, 0, cli.out, sf, true, "", "Sending build context to Docker daemon") + body = progressreader.New(progressreader.Config{ + In: context, + Out: cli.out, + Formatter: sf, + NewLines: true, + ID: "", + Action: "Sending build context to Docker daemon", + }) } // Send the build context v := &url.Values{} diff --git a/builder/internals.go b/builder/internals.go index f6b929f2b6..de066a127a 100644 --- a/builder/internals.go +++ b/builder/internals.go @@ -28,6 +28,7 @@ import ( "github.com/docker/docker/pkg/common" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/parsers" + "github.com/docker/docker/pkg/progressreader" "github.com/docker/docker/pkg/symlink" "github.com/docker/docker/pkg/system" "github.com/docker/docker/pkg/tarsum" @@ -268,7 +269,15 @@ func calcCopyInfo(b *Builder, cmdName string, cInfos *[]*copyInfo, origPath stri } // Download and dump result to tmp file - if _, err := io.Copy(tmpFile, utils.ProgressReader(resp.Body, int(resp.ContentLength), b.OutOld, b.StreamFormatter, true, "", "Downloading")); err != nil { + if _, err := io.Copy(tmpFile, progressreader.New(progressreader.Config{ + In: resp.Body, + Out: b.OutOld, + Formatter: b.StreamFormatter, + Size: int(resp.ContentLength), + NewLines: true, + ID: "", + Action: "Downloading", + })); err != nil { tmpFile.Close() return err } diff --git a/graph/graph.go b/graph/graph.go index e8917a9c7d..ecb52a0c5a 100644 --- a/graph/graph.go +++ b/graph/graph.go @@ -18,6 +18,7 @@ import ( "github.com/docker/docker/image" "github.com/docker/docker/pkg/archive" "github.com/docker/docker/pkg/common" + "github.com/docker/docker/pkg/progressreader" "github.com/docker/docker/pkg/truncindex" "github.com/docker/docker/runconfig" "github.com/docker/docker/utils" @@ -210,9 +211,17 @@ func (graph *Graph) TempLayerArchive(id string, sf *utils.StreamFormatter, outpu if err != nil { return nil, err } - progress := utils.ProgressReader(a, 0, output, sf, false, common.TruncateID(id), "Buffering to disk") - defer progress.Close() - return archive.NewTempArchive(progress, tmp) + progressReader := progressreader.New(progressreader.Config{ + In: a, + Out: output, + Formatter: sf, + Size: 0, + NewLines: false, + ID: common.TruncateID(id), + Action: "Buffering to disk", + }) + defer progressReader.Close() + return archive.NewTempArchive(progressReader, tmp) } // Mktemp creates a temporary sub-directory inside the graph's filesystem. diff --git a/graph/import.go b/graph/import.go index 41f4b4f3f1..3a83fcf5f4 100644 --- a/graph/import.go +++ b/graph/import.go @@ -9,6 +9,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/docker/docker/engine" "github.com/docker/docker/pkg/archive" + "github.com/docker/docker/pkg/progressreader" "github.com/docker/docker/runconfig" "github.com/docker/docker/utils" ) @@ -48,7 +49,15 @@ func (s *TagStore) CmdImport(job *engine.Job) engine.Status { if err != nil { return job.Error(err) } - progressReader := utils.ProgressReader(resp.Body, int(resp.ContentLength), job.Stdout, sf, true, "", "Importing") + progressReader := progressreader.New(progressreader.Config{ + In: resp.Body, + Out: job.Stdout, + Formatter: sf, + Size: int(resp.ContentLength), + NewLines: true, + ID: "", + Action: "Importing", + }) defer progressReader.Close() archive = progressReader } diff --git a/graph/pull.go b/graph/pull.go index 27830f64e3..ecf0309b17 100644 --- a/graph/pull.go +++ b/graph/pull.go @@ -14,6 +14,7 @@ import ( "github.com/docker/docker/engine" "github.com/docker/docker/image" "github.com/docker/docker/pkg/common" + "github.com/docker/docker/pkg/progressreader" "github.com/docker/docker/pkg/tarsum" "github.com/docker/docker/registry" "github.com/docker/docker/utils" @@ -337,7 +338,15 @@ func (s *TagStore) pullImage(r *registry.Session, out io.Writer, imgID, endpoint defer layer.Close() err = s.graph.Register(img, - utils.ProgressReader(layer, imgSize, out, sf, false, common.TruncateID(id), "Downloading")) + progressreader.New(progressreader.Config{ + In: layer, + Out: out, + Formatter: sf, + Size: imgSize, + NewLines: false, + ID: common.TruncateID(id), + Action: "Downloading", + })) if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries { time.Sleep(time.Duration(j) * 500 * time.Millisecond) continue @@ -496,7 +505,15 @@ func (s *TagStore) pullV2Tag(eng *engine.Engine, r *registry.Session, out io.Wri return fmt.Errorf("unable to wrap image blob reader with TarSum: %s", err) } - if _, err := io.Copy(tmpFile, utils.ProgressReader(ioutil.NopCloser(tarSumReader), int(l), out, sf, false, common.TruncateID(img.ID), "Downloading")); err != nil { + if _, err := io.Copy(tmpFile, progressreader.New(progressreader.Config{ + In: ioutil.NopCloser(tarSumReader), + Out: out, + Formatter: sf, + Size: int(l), + NewLines: false, + ID: common.TruncateID(img.ID), + Action: "Downloading", + })); err != nil { return fmt.Errorf("unable to copy v2 image blob data: %s", err) } @@ -548,7 +565,14 @@ func (s *TagStore) pullV2Tag(eng *engine.Engine, r *registry.Session, out io.Wri d.tmpFile.Seek(0, 0) if d.tmpFile != nil { err = s.graph.Register(d.img, - utils.ProgressReader(d.tmpFile, int(d.length), out, sf, false, common.TruncateID(d.img.ID), "Extracting")) + progressreader.New(progressreader.Config{ + In: d.tmpFile, + Out: out, + Formatter: sf, + Size: int(d.length), + ID: common.TruncateID(d.img.ID), + Action: "Extracting", + })) if err != nil { return false, err } diff --git a/graph/push.go b/graph/push.go index 91ac107818..117f535b50 100644 --- a/graph/push.go +++ b/graph/push.go @@ -16,6 +16,7 @@ import ( "github.com/docker/docker/engine" "github.com/docker/docker/image" "github.com/docker/docker/pkg/common" + "github.com/docker/docker/pkg/progressreader" "github.com/docker/docker/pkg/tarsum" "github.com/docker/docker/registry" "github.com/docker/docker/runconfig" @@ -258,7 +259,16 @@ func (s *TagStore) pushImage(r *registry.Session, out io.Writer, imgID, ep strin // Send the layer log.Debugf("rendered layer for %s of [%d] size", imgData.ID, layerData.Size) - checksum, checksumPayload, err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf, false, common.TruncateID(imgData.ID), "Pushing"), ep, token, jsonRaw) + checksum, checksumPayload, err := r.PushImageLayerRegistry(imgData.ID, + progressreader.New(progressreader.Config{ + In: layerData, + Out: out, + Formatter: sf, + Size: int(layerData.Size), + NewLines: false, + ID: common.TruncateID(imgData.ID), + Action: "Pushing", + }), ep, token, jsonRaw) if err != nil { return "", err } @@ -459,7 +469,16 @@ func (s *TagStore) pushV2Image(r *registry.Session, img *image.Image, endpoint * // Send the layer log.Debugf("rendered layer for %s of [%d] size", img.ID, size) - if err := r.PutV2ImageBlob(endpoint, imageName, sumParts[0], sumParts[1], utils.ProgressReader(tf, int(size), out, sf, false, common.TruncateID(img.ID), "Pushing"), auth); err != nil { + if err := r.PutV2ImageBlob(endpoint, imageName, sumParts[0], sumParts[1], + progressreader.New(progressreader.Config{ + In: tf, + Out: out, + Formatter: sf, + Size: int(size), + NewLines: false, + ID: common.TruncateID(img.ID), + Action: "Pushing", + }), auth); err != nil { out.Write(sf.FormatProgress(common.TruncateID(img.ID), "Image push failed", nil)) return "", err } diff --git a/pkg/progressreader/progressreader.go b/pkg/progressreader/progressreader.go new file mode 100644 index 0000000000..730559e9fb --- /dev/null +++ b/pkg/progressreader/progressreader.go @@ -0,0 +1,69 @@ +package progressreader + +import ( + "io" +) + +type StreamFormatter interface { + FormatProg(string, string, interface{}) []byte + FormatStatus(string, string, ...interface{}) []byte + FormatError(error) []byte +} + +type PR_JSONProgress interface { + GetCurrent() int + GetTotal() int +} + +type JSONProg struct { + Current int + Total int +} + +func (j *JSONProg) GetCurrent() int { + return j.Current +} +func (j *JSONProg) GetTotal() int { + return j.Total +} + +// Reader with progress bar +type Config struct { + In io.ReadCloser // Stream to read from + Out io.Writer // Where to send progress bar to + Formatter StreamFormatter + Size int + Current int + LastUpdate int + NewLines bool + ID string + Action string +} + +func New(newReader Config) *Config { + return &newReader +} +func (config *Config) Read(p []byte) (n int, err error) { + read, err := config.In.Read(p) + config.Current += read + updateEvery := 1024 * 512 //512kB + if config.Size > 0 { + // Update progress for every 1% read if 1% < 512kB + if increment := int(0.01 * float64(config.Size)); increment < updateEvery { + updateEvery = increment + } + } + if config.Current-config.LastUpdate > updateEvery || err != nil { + config.Out.Write(config.Formatter.FormatProg(config.ID, config.Action, &JSONProg{Current: config.Current, Total: config.Size})) + config.LastUpdate = config.Current + } + // Send newline when complete + if config.NewLines && err != nil && read == 0 { + config.Out.Write(config.Formatter.FormatStatus("", "")) + } + return read, err +} +func (config *Config) Close() error { + config.Out.Write(config.Formatter.FormatProg(config.ID, config.Action, &JSONProg{Current: config.Current, Total: config.Size})) + return config.In.Close() +} diff --git a/utils/progressreader.go b/utils/progressreader.go deleted file mode 100644 index 87eae8ba73..0000000000 --- a/utils/progressreader.go +++ /dev/null @@ -1,55 +0,0 @@ -package utils - -import ( - "io" - "time" -) - -// Reader with progress bar -type progressReader struct { - reader io.ReadCloser // Stream to read from - output io.Writer // Where to send progress bar to - progress JSONProgress - lastUpdate int // How many bytes read at least update - ID string - action string - sf *StreamFormatter - newLine bool -} - -func (r *progressReader) Read(p []byte) (n int, err error) { - read, err := r.reader.Read(p) - r.progress.Current += read - updateEvery := 1024 * 512 //512kB - if r.progress.Total > 0 { - // Update progress for every 1% read if 1% < 512kB - if increment := int(0.01 * float64(r.progress.Total)); increment < updateEvery { - updateEvery = increment - } - } - if r.progress.Current-r.lastUpdate > updateEvery || err != nil { - r.output.Write(r.sf.FormatProgress(r.ID, r.action, &r.progress)) - r.lastUpdate = r.progress.Current - } - // Send newline when complete - if r.newLine && err != nil && read == 0 { - r.output.Write(r.sf.FormatStatus("", "")) - } - return read, err -} -func (r *progressReader) Close() error { - r.progress.Current = r.progress.Total - r.output.Write(r.sf.FormatProgress(r.ID, r.action, &r.progress)) - return r.reader.Close() -} -func ProgressReader(r io.ReadCloser, size int, output io.Writer, sf *StreamFormatter, newline bool, ID, action string) *progressReader { - return &progressReader{ - reader: r, - output: NewWriteFlusher(output), - ID: ID, - action: action, - progress: JSONProgress{Total: size, Start: time.Now().UTC().Unix()}, - sf: sf, - newLine: newline, - } -} diff --git a/utils/streamformatter.go b/utils/streamformatter.go index d0bc295bb3..e5b15f9835 100644 --- a/utils/streamformatter.go +++ b/utils/streamformatter.go @@ -3,6 +3,7 @@ package utils import ( "encoding/json" "fmt" + "github.com/docker/docker/pkg/progressreader" "io" ) @@ -54,7 +55,15 @@ func (sf *StreamFormatter) FormatError(err error) []byte { } return []byte("Error: " + err.Error() + streamNewline) } - +func (sf *StreamFormatter) FormatProg(id, action string, p interface{}) []byte { + switch progress := p.(type) { + case *JSONProgress: + return sf.FormatProgress(id, action, progress) + case progressreader.PR_JSONProgress: + return sf.FormatProgress(id, action, &JSONProgress{Current: progress.GetCurrent(), Total: progress.GetTotal()}) + } + return nil +} func (sf *StreamFormatter) FormatProgress(id, action string, progress *JSONProgress) []byte { if progress == nil { progress = &JSONProgress{}