Remove import to utils in progressreader

Added a method to StreamFormatter to handle calls from progressreader. Solves #10959.

Signed-off-by: bobby abbott <ttobbaybbob@gmail.com>

parent 558c3b09db
commit 12b278d354

9 changed files with 168 additions and 67 deletions
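In outline, every call site moves from the positional utils.ProgressReader helper to a Config struct built by the new pkg/progressreader package; the CmdBuild hunk below is representative:

    // before
    body = utils.ProgressReader(context, 0, cli.out, sf, true, "", "Sending build context to Docker daemon")

    // after
    body = progressreader.New(progressreader.Config{
        In:        context,
        Out:       cli.out,
        Formatter: sf,
        NewLines:  true,
        ID:        "",
        Action:    "Sending build context to Docker daemon",
    })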
@@ -40,6 +40,7 @@ import (
     "github.com/docker/docker/pkg/networkfs/resolvconf"
     "github.com/docker/docker/pkg/parsers"
     "github.com/docker/docker/pkg/parsers/filters"
+    "github.com/docker/docker/pkg/progressreader"
     "github.com/docker/docker/pkg/promise"
     "github.com/docker/docker/pkg/signal"
     "github.com/docker/docker/pkg/symlink"
@@ -232,7 +233,14 @@ func (cli *DockerCli) CmdBuild(args ...string) error {
     // FIXME: ProgressReader shouldn't be this annoying to use
     if context != nil {
         sf := utils.NewStreamFormatter(false)
-        body = utils.ProgressReader(context, 0, cli.out, sf, true, "", "Sending build context to Docker daemon")
+        body = progressreader.New(progressreader.Config{
+            In:        context,
+            Out:       cli.out,
+            Formatter: sf,
+            NewLines:  true,
+            ID:        "",
+            Action:    "Sending build context to Docker daemon",
+        })
     }
     // Send the build context
     v := &url.Values{}
@@ -28,6 +28,7 @@ import (
     "github.com/docker/docker/pkg/common"
     "github.com/docker/docker/pkg/ioutils"
     "github.com/docker/docker/pkg/parsers"
+    "github.com/docker/docker/pkg/progressreader"
     "github.com/docker/docker/pkg/symlink"
     "github.com/docker/docker/pkg/system"
     "github.com/docker/docker/pkg/tarsum"
@@ -268,7 +269,15 @@ func calcCopyInfo(b *Builder, cmdName string, cInfos *[]*copyInfo, origPath stri
     }

     // Download and dump result to tmp file
-    if _, err := io.Copy(tmpFile, utils.ProgressReader(resp.Body, int(resp.ContentLength), b.OutOld, b.StreamFormatter, true, "", "Downloading")); err != nil {
+    if _, err := io.Copy(tmpFile, progressreader.New(progressreader.Config{
+        In:        resp.Body,
+        Out:       b.OutOld,
+        Formatter: b.StreamFormatter,
+        Size:      int(resp.ContentLength),
+        NewLines:  true,
+        ID:        "",
+        Action:    "Downloading",
+    })); err != nil {
         tmpFile.Close()
         return err
     }
@@ -18,6 +18,7 @@ import (
     "github.com/docker/docker/image"
     "github.com/docker/docker/pkg/archive"
     "github.com/docker/docker/pkg/common"
+    "github.com/docker/docker/pkg/progressreader"
     "github.com/docker/docker/pkg/truncindex"
     "github.com/docker/docker/runconfig"
     "github.com/docker/docker/utils"
@@ -210,9 +211,17 @@ func (graph *Graph) TempLayerArchive(id string, sf *utils.StreamFormatter, outpu
     if err != nil {
         return nil, err
     }
-    progress := utils.ProgressReader(a, 0, output, sf, false, common.TruncateID(id), "Buffering to disk")
-    defer progress.Close()
-    return archive.NewTempArchive(progress, tmp)
+    progressReader := progressreader.New(progressreader.Config{
+        In:        a,
+        Out:       output,
+        Formatter: sf,
+        Size:      0,
+        NewLines:  false,
+        ID:        common.TruncateID(id),
+        Action:    "Buffering to disk",
+    })
+    defer progressReader.Close()
+    return archive.NewTempArchive(progressReader, tmp)
 }

 // Mktemp creates a temporary sub-directory inside the graph's filesystem.
@@ -9,6 +9,7 @@ import (
     log "github.com/Sirupsen/logrus"
     "github.com/docker/docker/engine"
     "github.com/docker/docker/pkg/archive"
+    "github.com/docker/docker/pkg/progressreader"
     "github.com/docker/docker/runconfig"
     "github.com/docker/docker/utils"
 )
@@ -48,7 +49,15 @@ func (s *TagStore) CmdImport(job *engine.Job) engine.Status {
         if err != nil {
             return job.Error(err)
         }
-        progressReader := utils.ProgressReader(resp.Body, int(resp.ContentLength), job.Stdout, sf, true, "", "Importing")
+        progressReader := progressreader.New(progressreader.Config{
+            In:        resp.Body,
+            Out:       job.Stdout,
+            Formatter: sf,
+            Size:      int(resp.ContentLength),
+            NewLines:  true,
+            ID:        "",
+            Action:    "Importing",
+        })
         defer progressReader.Close()
         archive = progressReader
     }
@@ -14,6 +14,7 @@ import (
     "github.com/docker/docker/engine"
     "github.com/docker/docker/image"
     "github.com/docker/docker/pkg/common"
+    "github.com/docker/docker/pkg/progressreader"
     "github.com/docker/docker/pkg/tarsum"
     "github.com/docker/docker/registry"
     "github.com/docker/docker/utils"
@@ -337,7 +338,15 @@ func (s *TagStore) pullImage(r *registry.Session, out io.Writer, imgID, endpoint
         defer layer.Close()

         err = s.graph.Register(img,
-            utils.ProgressReader(layer, imgSize, out, sf, false, common.TruncateID(id), "Downloading"))
+            progressreader.New(progressreader.Config{
+                In:        layer,
+                Out:       out,
+                Formatter: sf,
+                Size:      imgSize,
+                NewLines:  false,
+                ID:        common.TruncateID(id),
+                Action:    "Downloading",
+            }))
         if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries {
             time.Sleep(time.Duration(j) * 500 * time.Millisecond)
             continue
@@ -496,7 +505,15 @@ func (s *TagStore) pullV2Tag(eng *engine.Engine, r *registry.Session, out io.Wri
             return fmt.Errorf("unable to wrap image blob reader with TarSum: %s", err)
         }

-        if _, err := io.Copy(tmpFile, utils.ProgressReader(ioutil.NopCloser(tarSumReader), int(l), out, sf, false, common.TruncateID(img.ID), "Downloading")); err != nil {
+        if _, err := io.Copy(tmpFile, progressreader.New(progressreader.Config{
+            In:        ioutil.NopCloser(tarSumReader),
+            Out:       out,
+            Formatter: sf,
+            Size:      int(l),
+            NewLines:  false,
+            ID:        common.TruncateID(img.ID),
+            Action:    "Downloading",
+        })); err != nil {
             return fmt.Errorf("unable to copy v2 image blob data: %s", err)
         }

@@ -548,7 +565,14 @@ func (s *TagStore) pullV2Tag(eng *engine.Engine, r *registry.Session, out io.Wri
             d.tmpFile.Seek(0, 0)
             if d.tmpFile != nil {
                 err = s.graph.Register(d.img,
-                    utils.ProgressReader(d.tmpFile, int(d.length), out, sf, false, common.TruncateID(d.img.ID), "Extracting"))
+                    progressreader.New(progressreader.Config{
+                        In:        d.tmpFile,
+                        Out:       out,
+                        Formatter: sf,
+                        Size:      int(d.length),
+                        ID:        common.TruncateID(d.img.ID),
+                        Action:    "Extracting",
+                    }))
                 if err != nil {
                     return false, err
                 }
@@ -16,6 +16,7 @@ import (
     "github.com/docker/docker/engine"
     "github.com/docker/docker/image"
     "github.com/docker/docker/pkg/common"
+    "github.com/docker/docker/pkg/progressreader"
     "github.com/docker/docker/pkg/tarsum"
     "github.com/docker/docker/registry"
     "github.com/docker/docker/runconfig"
@@ -258,7 +259,16 @@ func (s *TagStore) pushImage(r *registry.Session, out io.Writer, imgID, ep strin
     // Send the layer
     log.Debugf("rendered layer for %s of [%d] size", imgData.ID, layerData.Size)

-    checksum, checksumPayload, err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf, false, common.TruncateID(imgData.ID), "Pushing"), ep, token, jsonRaw)
+    checksum, checksumPayload, err := r.PushImageLayerRegistry(imgData.ID,
+        progressreader.New(progressreader.Config{
+            In:        layerData,
+            Out:       out,
+            Formatter: sf,
+            Size:      int(layerData.Size),
+            NewLines:  false,
+            ID:        common.TruncateID(imgData.ID),
+            Action:    "Pushing",
+        }), ep, token, jsonRaw)
     if err != nil {
         return "", err
     }
@@ -459,7 +469,16 @@ func (s *TagStore) pushV2Image(r *registry.Session, img *image.Image, endpoint *
     // Send the layer
     log.Debugf("rendered layer for %s of [%d] size", img.ID, size)

-    if err := r.PutV2ImageBlob(endpoint, imageName, sumParts[0], sumParts[1], utils.ProgressReader(tf, int(size), out, sf, false, common.TruncateID(img.ID), "Pushing"), auth); err != nil {
+    if err := r.PutV2ImageBlob(endpoint, imageName, sumParts[0], sumParts[1],
+        progressreader.New(progressreader.Config{
+            In:        tf,
+            Out:       out,
+            Formatter: sf,
+            Size:      int(size),
+            NewLines:  false,
+            ID:        common.TruncateID(img.ID),
+            Action:    "Pushing",
+        }), auth); err != nil {
         out.Write(sf.FormatProgress(common.TruncateID(img.ID), "Image push failed", nil))
         return "", err
     }
pkg/progressreader/progressreader.go (new file, 69 lines)
@@ -0,0 +1,69 @@
+package progressreader
+
+import (
+    "io"
+)
+
+type StreamFormatter interface {
+    FormatProg(string, string, interface{}) []byte
+    FormatStatus(string, string, ...interface{}) []byte
+    FormatError(error) []byte
+}
+
+type PR_JSONProgress interface {
+    GetCurrent() int
+    GetTotal() int
+}
+
+type JSONProg struct {
+    Current int
+    Total   int
+}
+
+func (j *JSONProg) GetCurrent() int {
+    return j.Current
+}
+func (j *JSONProg) GetTotal() int {
+    return j.Total
+}
+
+// Reader with progress bar
+type Config struct {
+    In         io.ReadCloser // Stream to read from
+    Out        io.Writer     // Where to send progress bar to
+    Formatter  StreamFormatter
+    Size       int
+    Current    int
+    LastUpdate int
+    NewLines   bool
+    ID         string
+    Action     string
+}
+
+func New(newReader Config) *Config {
+    return &newReader
+}
+func (config *Config) Read(p []byte) (n int, err error) {
+    read, err := config.In.Read(p)
+    config.Current += read
+    updateEvery := 1024 * 512 //512kB
+    if config.Size > 0 {
+        // Update progress for every 1% read if 1% < 512kB
+        if increment := int(0.01 * float64(config.Size)); increment < updateEvery {
+            updateEvery = increment
+        }
+    }
+    if config.Current-config.LastUpdate > updateEvery || err != nil {
+        config.Out.Write(config.Formatter.FormatProg(config.ID, config.Action, &JSONProg{Current: config.Current, Total: config.Size}))
+        config.LastUpdate = config.Current
+    }
+    // Send newline when complete
+    if config.NewLines && err != nil && read == 0 {
+        config.Out.Write(config.Formatter.FormatStatus("", ""))
+    }
+    return read, err
+}
+func (config *Config) Close() error {
+    config.Out.Write(config.Formatter.FormatProg(config.ID, config.Action, &JSONProg{Current: config.Current, Total: config.Size}))
+    return config.In.Close()
+}
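As a rough illustration of how the new package is meant to be consumed, the standalone sketch below (not part of the commit) wires progressreader.New to a hypothetical plainFormatter that satisfies the package's StreamFormatter interface; in the daemon the formatter is the *utils.StreamFormatter shown in the call sites above.

// Hypothetical standalone sketch; plainFormatter and main are illustrative only.
package main

import (
    "fmt"
    "io"
    "io/ioutil"
    "os"
    "strings"

    "github.com/docker/docker/pkg/progressreader"
)

// plainFormatter implements progressreader.StreamFormatter with plain text output.
type plainFormatter struct{}

func (plainFormatter) FormatProg(id, action string, p interface{}) []byte {
    // Read passes a *JSONProg, which satisfies PR_JSONProgress.
    if jp, ok := p.(progressreader.PR_JSONProgress); ok {
        return []byte(fmt.Sprintf("%s %s: %d/%d\n", id, action, jp.GetCurrent(), jp.GetTotal()))
    }
    return nil
}
func (plainFormatter) FormatStatus(id, format string, a ...interface{}) []byte {
    return []byte(fmt.Sprintf(format, a...) + "\n")
}
func (plainFormatter) FormatError(err error) []byte {
    return []byte("Error: " + err.Error() + "\n")
}

func main() {
    // Wrap a 1 MiB in-memory stream and report progress to stdout.
    body := ioutil.NopCloser(strings.NewReader(strings.Repeat("x", 1<<20)))
    pr := progressreader.New(progressreader.Config{
        In:        body,
        Out:       os.Stdout,
        Formatter: plainFormatter{},
        Size:      1 << 20,
        NewLines:  true,
        ID:        "example",
        Action:    "Downloading",
    })
    defer pr.Close()
    io.Copy(ioutil.Discard, pr)
}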
@@ -1,55 +0,0 @@
-package utils
-
-import (
-    "io"
-    "time"
-)
-
-// Reader with progress bar
-type progressReader struct {
-    reader     io.ReadCloser // Stream to read from
-    output     io.Writer     // Where to send progress bar to
-    progress   JSONProgress
-    lastUpdate int // How many bytes read at least update
-    ID         string
-    action     string
-    sf         *StreamFormatter
-    newLine    bool
-}
-
-func (r *progressReader) Read(p []byte) (n int, err error) {
-    read, err := r.reader.Read(p)
-    r.progress.Current += read
-    updateEvery := 1024 * 512 //512kB
-    if r.progress.Total > 0 {
-        // Update progress for every 1% read if 1% < 512kB
-        if increment := int(0.01 * float64(r.progress.Total)); increment < updateEvery {
-            updateEvery = increment
-        }
-    }
-    if r.progress.Current-r.lastUpdate > updateEvery || err != nil {
-        r.output.Write(r.sf.FormatProgress(r.ID, r.action, &r.progress))
-        r.lastUpdate = r.progress.Current
-    }
-    // Send newline when complete
-    if r.newLine && err != nil && read == 0 {
-        r.output.Write(r.sf.FormatStatus("", ""))
-    }
-    return read, err
-}
-func (r *progressReader) Close() error {
-    r.progress.Current = r.progress.Total
-    r.output.Write(r.sf.FormatProgress(r.ID, r.action, &r.progress))
-    return r.reader.Close()
-}
-func ProgressReader(r io.ReadCloser, size int, output io.Writer, sf *StreamFormatter, newline bool, ID, action string) *progressReader {
-    return &progressReader{
-        reader:   r,
-        output:   NewWriteFlusher(output),
-        ID:       ID,
-        action:   action,
-        progress: JSONProgress{Total: size, Start: time.Now().UTC().Unix()},
-        sf:       sf,
-        newLine:  newline,
-    }
-}
@@ -3,6 +3,7 @@ package utils
 import (
     "encoding/json"
     "fmt"
+    "github.com/docker/docker/pkg/progressreader"
     "io"
 )

@@ -54,7 +55,15 @@ func (sf *StreamFormatter) FormatError(err error) []byte {
     }
     return []byte("Error: " + err.Error() + streamNewline)
 }
+func (sf *StreamFormatter) FormatProg(id, action string, p interface{}) []byte {
+    switch progress := p.(type) {
+    case *JSONProgress:
+        return sf.FormatProgress(id, action, progress)
+    case progressreader.PR_JSONProgress:
+        return sf.FormatProgress(id, action, &JSONProgress{Current: progress.GetCurrent(), Total: progress.GetTotal()})
+    }
+    return nil
+}
 func (sf *StreamFormatter) FormatProgress(id, action string, progress *JSONProgress) []byte {
     if progress == nil {
         progress = &JSONProgress{}
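The new FormatProg method accepts an interface{} and type-switches on either the local *JSONProgress or the progressreader.PR_JSONProgress interface, so pkg/progressreader can report progress through the utils stream formatter without itself importing the utils package — which is the import the commit title removes.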