From 0e71e368a8a781f593b25fdd1318d3882e6d28e5 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Wed, 24 Jul 2013 15:41:34 +0000 Subject: [PATCH] Add ID to JSONMessage in pull Use goroutines to pull in parallel If multiple images pulled at the same time, each progress is displayed on a new line --- commands.go | 2 +- graph.go | 2 +- server.go | 63 ++++++++++++++++++++++++++++++-------------------- utils/utils.go | 13 +++++++---- 4 files changed, 49 insertions(+), 31 deletions(-) diff --git a/commands.go b/commands.go index 2d8ea4efb5..946c83dd11 100644 --- a/commands.go +++ b/commands.go @@ -196,7 +196,7 @@ func (cli *DockerCli) CmdBuild(args ...string) error { // FIXME: ProgressReader shouldn't be this annoyning to use if context != nil { sf := utils.NewStreamFormatter(false) - body = utils.ProgressReader(ioutil.NopCloser(context), 0, cli.err, sf.FormatProgress("Uploading context", "%v bytes%0.0s%0.0s"), sf) + body = utils.ProgressReader(ioutil.NopCloser(context), 0, cli.err, sf.FormatProgress("Uploading context", "%v bytes%0.0s%0.0s", ""), sf) } // Upload the build context v := &url.Values{} diff --git a/graph.go b/graph.go index 42d1bdbd4c..3ae342e7c5 100644 --- a/graph.go +++ b/graph.go @@ -175,7 +175,7 @@ func (graph *Graph) TempLayerArchive(id string, compression Compression, sf *uti if err != nil { return nil, err } - return NewTempArchive(utils.ProgressReader(ioutil.NopCloser(archive), 0, output, sf.FormatProgress("Buffering to disk", "%v/%v (%v)"), sf), tmp.Root) + return NewTempArchive(utils.ProgressReader(ioutil.NopCloser(archive), 0, output, sf.FormatProgress("Buffering to disk", "%v/%v (%v)", ""), sf), tmp.Root) } // Mktemp creates a temporary sub-directory inside the graph's filesystem. diff --git a/server.go b/server.go index 4179a1e160..1a221d3f00 100644 --- a/server.go +++ b/server.go @@ -145,7 +145,7 @@ func (srv *Server) ImageInsert(name, url, path string, out io.Writer, sf *utils. return "", err } - if err := c.Inject(utils.ProgressReader(file.Body, int(file.ContentLength), out, sf.FormatProgress("Downloading", "%8v/%v (%v)"), sf), path); err != nil { + if err := c.Inject(utils.ProgressReader(file.Body, int(file.ContentLength), out, sf.FormatProgress("Downloading", "%8v/%v (%v)", ""), sf), path); err != nil { return "", err } // FIXME: Handle custom repo, tag comment, author @@ -425,7 +425,7 @@ func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoin return err } defer layer.Close() - if err := srv.runtime.graph.Register(utils.ProgressReader(layer, imgSize, out, sf.FormatProgress("Downloading", "%8v/%v (%v)"), sf), false, img); err != nil { + if err := srv.runtime.graph.Register(utils.ProgressReader(layer, imgSize, out, sf.FormatProgress("Downloading", "%8v/%v (%v)", id), sf), false, img); err != nil { return err } } @@ -477,30 +477,43 @@ func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName repoData.ImgList[id].Tag = askedTag } - for _, img := range repoData.ImgList { - if askedTag != "" && img.Tag != askedTag { - utils.Debugf("(%s) does not match %s (id: %s), skipping", img.Tag, askedTag, img.ID) - continue - } - - if img.Tag == "" { - utils.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID) - continue - } - out.Write(sf.FormatStatus("Pulling image %s (%s) from %s", img.ID, img.Tag, localName)) - success := false - for _, ep := range repoData.Endpoints { - if err := srv.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil { - out.Write(sf.FormatStatus("Error while retrieving image for tag: %s (%s); checking next endpoint", askedTag, err)) - continue + errors := make(chan error) + for _, image := range repoData.ImgList { + go func(img *registry.ImgData) { + if askedTag != "" && img.Tag != askedTag { + utils.Debugf("(%s) does not match %s (id: %s), skipping", img.Tag, askedTag, img.ID) + errors <- nil + return } - success = true - break - } - if !success { - return fmt.Errorf("Could not find repository on any of the indexed registries.") + + if img.Tag == "" { + utils.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID) + errors <- nil + return + } + out.Write(sf.FormatStatus("Pulling image %s (%s) from %s", img.ID, img.Tag, localName)) + success := false + for _, ep := range repoData.Endpoints { + if err := srv.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil { + out.Write(sf.FormatStatus("Error while retrieving image for tag: %s (%s); checking next endpoint", askedTag, err)) + continue + } + success = true + break + } + if !success { + errors <- fmt.Errorf("Could not find repository on any of the indexed registries.") + } + errors <- nil + }(image) + } + + for i := 0; i < len(repoData.ImgList); i++ { + if err := <-errors; err != nil { + return err } } + for tag, id := range tagsList { if askedTag != "" && tag != askedTag { continue @@ -748,7 +761,7 @@ func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgID, } // Send the layer - if err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf.FormatProgress("Pushing", "%8v/%v (%v)"), sf), ep, token); err != nil { + if err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf.FormatProgress("Pushing", "%8v/%v (%v)", ""), sf), ep, token); err != nil { return err } return nil @@ -818,7 +831,7 @@ func (srv *Server) ImageImport(src, repo, tag string, in io.Reader, out io.Write if err != nil { return err } - archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), out, sf.FormatProgress("Importing", "%8v/%v (%v)"), sf) + archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), out, sf.FormatProgress("Importing", "%8v/%v (%v)", ""), sf) } img, err := srv.runtime.graph.Create(archive, nil, "Imported from "+src, "", nil) if err != nil { diff --git a/utils/utils.go b/utils/utils.go index acb015becd..ffba2352a7 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -107,7 +107,7 @@ func (r *progressReader) Close() error { func ProgressReader(r io.ReadCloser, size int, output io.Writer, template []byte, sf *StreamFormatter) *progressReader { tpl := string(template) if tpl == "" { - tpl = string(sf.FormatProgress("", "%8v/%v (%v)")) + tpl = string(sf.FormatProgress("", "%8v/%v (%v)", "")) } return &progressReader{r, NewWriteFlusher(output), size, 0, 0, tpl, sf} } @@ -587,11 +587,14 @@ type NopFlusher struct{} func (f *NopFlusher) Flush() {} type WriteFlusher struct { + sync.Mutex w io.Writer flusher http.Flusher } func (wf *WriteFlusher) Write(b []byte) (n int, err error) { + wf.Lock() + defer wf.Unlock() n, err = wf.w.Write(b) wf.flusher.Flush() return n, err @@ -619,7 +622,9 @@ func (jm *JSONMessage) Display(out io.Writer) (error) { if jm.Time != 0 { fmt.Fprintf(out, "[%s] ", time.Unix(jm.Time, 0)) } - if jm.Progress != "" { + if jm.Progress != "" && jm.ID != ""{ + fmt.Fprintf(out, "\n%s %s %s\r", jm.Status, jm.ID, jm.Progress) + } else if jm.Progress != "" { fmt.Fprintf(out, "%s %s\r", jm.Status, jm.Progress) } else if jm.Error != "" { return fmt.Errorf(jm.Error) @@ -665,10 +670,10 @@ func (sf *StreamFormatter) FormatError(err error) []byte { return []byte("Error: " + err.Error() + "\r\n") } -func (sf *StreamFormatter) FormatProgress(action, str string) []byte { +func (sf *StreamFormatter) FormatProgress(action, str, id string) []byte { sf.used = true if sf.json { - b, err := json.Marshal(&JSONMessage{Status: action, Progress: str}) + b, err := json.Marshal(&JSONMessage{Status: action, Progress: str, ID:id}) if err != nil { return nil }