From f29e5dc8a15d0ab8e9c6084e41ee373376051659 Mon Sep 17 00:00:00 2001 From: "Guillaume J. Charmes" Date: Thu, 16 May 2013 12:09:06 -0700 Subject: [PATCH 1/6] Remove hijack from api when not necessary --- api.go | 40 ++++++++-------------------------------- commands.go | 23 +++++++++++++---------- registry/registry.go | 1 - server.go | 11 ++++++----- 4 files changed, 27 insertions(+), 48 deletions(-) diff --git a/api.go b/api.go index 8984d00cd9..4cfb0aac74 100644 --- a/api.go +++ b/api.go @@ -283,23 +283,17 @@ func postImagesCreate(srv *Server, w http.ResponseWriter, r *http.Request, vars src := r.Form.Get("fromSrc") image := r.Form.Get("fromImage") - repo := r.Form.Get("repo") tag := r.Form.Get("tag") + repo := r.Form.Get("repo") - in, out, err := hijackServer(w) - if err != nil { - return err - } - defer in.Close() - fmt.Fprintf(out, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n") if image != "" { //pull registry := r.Form.Get("registry") - if err := srv.ImagePull(image, tag, registry, out); err != nil { - fmt.Fprintf(out, "Error: %s\n", err) + if err := srv.ImagePull(image, tag, registry, w); err != nil { + return err } } else { //import - if err := srv.ImageImport(src, repo, tag, in, out); err != nil { - fmt.Fprintf(out, "Error: %s\n", err) + if err := srv.ImageImport(src, repo, tag, r.Body, w); err != nil { + return err } } return nil @@ -335,15 +329,9 @@ func postImagesInsert(srv *Server, w http.ResponseWriter, r *http.Request, vars } name := vars["name"] - in, out, err := hijackServer(w) - if err != nil { + if err := srv.ImageInsert(name, url, path, w); err != nil { return err } - defer in.Close() - fmt.Fprintf(out, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n") - if err := srv.ImageInsert(name, url, path, out); err != nil { - fmt.Fprintf(out, "Error: %s\n", err) - } return nil } @@ -358,28 +346,16 @@ func postImagesPush(srv *Server, w http.ResponseWriter, r *http.Request, vars ma } name := vars["name"] - in, out, err := hijackServer(w) - if err != nil { + if err := srv.ImagePush(name, registry, w); err != nil { return err } - defer in.Close() - fmt.Fprintf(out, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n") - if err := srv.ImagePush(name, registry, out); err != nil { - fmt.Fprintf(out, "Error: %s\n", err) - } return nil } func postBuild(srv *Server, w http.ResponseWriter, r *http.Request, vars map[string]string) error { - in, out, err := hijackServer(w) - if err != nil { + if err := srv.ImageCreateFromFile(r.Body, w); err != nil { return err } - defer in.Close() - fmt.Fprintf(out, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n") - if err := srv.ImageCreateFromFile(in, out); err != nil { - fmt.Fprintf(out, "Error: %s\n", err) - } return nil } diff --git a/commands.go b/commands.go index 8734da176f..cbd9d146f4 100644 --- a/commands.go +++ b/commands.go @@ -104,7 +104,7 @@ func (cli *DockerCli) CmdInsert(args ...string) error { v.Set("url", cmd.Arg(1)) v.Set("path", cmd.Arg(2)) - err := cli.hijack("POST", "/images/"+cmd.Arg(0)+"?"+v.Encode(), false) + err := cli.stream("POST", "/images/"+cmd.Arg(0)+"?"+v.Encode(), nil, os.Stdout) if err != nil { return err } @@ -117,7 +117,7 @@ func (cli *DockerCli) CmdBuild(args ...string) error { return nil } - err := cli.hijack("POST", "/build", false) + err := cli.stream("POST", "/build", nil, os.Stdout) if err != nil { return err } @@ -571,7 +571,7 @@ func (cli *DockerCli) CmdImport(args ...string) error { v.Set("tag", tag) v.Set("fromSrc", src) - err := cli.hijack("POST", "/images/create?"+v.Encode(), false) + err := cli.stream("POST", "/images/create?"+v.Encode(), os.Stdin, os.Stdout) if err != nil { return err } @@ -628,7 +628,7 @@ func (cli *DockerCli) CmdPush(args ...string) error { v := url.Values{} v.Set("registry", *registry) - if err := cli.hijack("POST", "/images/"+name+"/push?"+v.Encode(), false); err != nil { + if err := cli.stream("POST", "/images/"+name+"/push?"+v.Encode(), nil, os.Stdout); err != nil { return err } return nil @@ -659,7 +659,7 @@ func (cli *DockerCli) CmdPull(args ...string) error { v.Set("tag", *tag) v.Set("registry", *registry) - if err := cli.hijack("POST", "/images/create?"+v.Encode(), false); err != nil { + if err := cli.stream("POST", "/images/create?"+v.Encode(), nil, os.Stdout); err != nil { return err } @@ -864,7 +864,7 @@ func (cli *DockerCli) CmdExport(args ...string) error { return nil } - if err := cli.stream("GET", "/containers/"+cmd.Arg(0)+"/export"); err != nil { + if err := cli.stream("GET", "/containers/"+cmd.Arg(0)+"/export", nil, os.Stdout); err != nil { return err } return nil @@ -1086,7 +1086,7 @@ func (cli *DockerCli) CmdRun(args ...string) error { if statusCode == 404 { v := url.Values{} v.Set("fromImage", config.Image) - err = cli.hijack("POST", "/images/create?"+v.Encode(), false) + err = cli.stream("POST", "/images/create?"+v.Encode(), nil, os.Stderr) if err != nil { return err } @@ -1179,8 +1179,11 @@ func (cli *DockerCli) call(method, path string, data interface{}) ([]byte, int, return body, resp.StatusCode, nil } -func (cli *DockerCli) stream(method, path string) error { - req, err := http.NewRequest(method, fmt.Sprintf("http://%s:%d%s", cli.host, cli.port, path), nil) +func (cli *DockerCli) stream(method, path string, in io.Reader, out io.Writer) error { + if (method == "POST" || method == "PUT") && in == nil { + in = bytes.NewReader([]byte{}) + } + req, err := http.NewRequest(method, fmt.Sprintf("http://%s:%d%s", cli.host, cli.port, path), in) if err != nil { return err } @@ -1204,7 +1207,7 @@ func (cli *DockerCli) stream(method, path string) error { return fmt.Errorf("error: %s", body) } - if _, err := io.Copy(os.Stdout, resp.Body); err != nil { + if _, err := io.Copy(out, resp.Body); err != nil { return err } return nil diff --git a/registry/registry.go b/registry/registry.go index e2ffb292c5..71648d1803 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -175,7 +175,6 @@ func (r *Registry) GetRemoteTags(registries []string, repository string, token [ } func (r *Registry) GetRepositoryData(remote string) (*RepositoryData, error) { - utils.Debugf("Pulling repository %s from %s\r\n", remote, auth.IndexServerAddress()) repositoryTarget := auth.IndexServerAddress() + "/repositories/" + remote + "/images" req, err := http.NewRequest("GET", repositoryTarget, nil) diff --git a/server.go b/server.go index 2f45802ca6..f6a242606d 100644 --- a/server.go +++ b/server.go @@ -2,6 +2,7 @@ package docker import ( "fmt" + "github.com/dotcloud/docker/auth" "github.com/dotcloud/docker/registry" "github.com/dotcloud/docker/utils" "io" @@ -322,8 +323,8 @@ func (srv *Server) pullImage(out io.Writer, imgId, registry string, token []stri return nil } -func (srv *Server) pullRepository(stdout io.Writer, remote, askedTag string) error { - utils.Debugf("Retrieving repository data") +func (srv *Server) pullRepository(out io.Writer, remote, askedTag string) error { + fmt.Fprintf(out, "Pulling repository %s from %s\r\n", remote, auth.IndexServerAddress()) repoData, err := srv.registry.GetRepositoryData(remote) if err != nil { return err @@ -349,11 +350,11 @@ func (srv *Server) pullRepository(stdout io.Writer, remote, askedTag string) err if askedTag != "" && askedTag != img.Tag { continue } - fmt.Fprintf(stdout, "Pulling image %s (%s) from %s\n", img.Id, img.Tag, remote) + fmt.Fprintf(out, "Pulling image %s (%s) from %s\n", img.Id, img.Tag, remote) success := false for _, ep := range repoData.Endpoints { - if err := srv.pullImage(stdout, img.Id, "https://"+ep+"/v1", repoData.Tokens); err != nil { - fmt.Fprintf(stdout, "Error while retrieving image for tag: %s (%s); checking next endpoint\n", askedTag, err) + if err := srv.pullImage(out, img.Id, "https://"+ep+"/v1", repoData.Tokens); err != nil { + fmt.Fprintf(out, "Error while retrieving image for tag: %s (%s); checking next endpoint\n", askedTag, err) continue } if err := srv.runtime.repositories.Set(remote, img.Tag, img.Id, true); err != nil { From 6145812444fb3eda2cc362795ed0b1addb8f4847 Mon Sep 17 00:00:00 2001 From: "Guillaume J. Charmes" Date: Thu, 16 May 2013 14:33:10 -0700 Subject: [PATCH 2/6] Update tests to reflect new behavior --- api_test.go | 53 +++++++++++++++++++---------------------------------- commands.go | 2 +- 2 files changed, 20 insertions(+), 35 deletions(-) diff --git a/api_test.go b/api_test.go index 07ecc6d0bf..0827c9b5a4 100644 --- a/api_test.go +++ b/api_test.go @@ -14,6 +14,7 @@ import ( "net/http/httptest" "os" "path" + "strings" "testing" "time" ) @@ -587,45 +588,29 @@ func TestPostBuild(t *testing.T) { srv := &Server{runtime: runtime} - stdin, stdinPipe := io.Pipe() - stdout, stdoutPipe := io.Pipe() + imgs, err := runtime.graph.All() + if err != nil { + t.Fatal(err) + } + beginCount := len(imgs) - c1 := make(chan struct{}) - go func() { - defer close(c1) - r := &hijackTester{ - ResponseRecorder: httptest.NewRecorder(), - in: stdin, - out: stdoutPipe, - } - - if err := postBuild(srv, r, nil, nil); err != nil { - t.Fatal(err) - } - }() - - // Acknowledge hijack - setTimeout(t, "hijack acknowledge timed out", 2*time.Second, func() { - stdout.Read([]byte{}) - stdout.Read(make([]byte, 4096)) - }) - - setTimeout(t, "read/write assertion timed out", 2*time.Second, func() { - if err := assertPipe("from docker-ut\n", "FROM docker-ut", stdout, stdinPipe, 15); err != nil { - t.Fatal(err) - } - }) - - // Close pipes (client disconnects) - if err := closeWrap(stdin, stdinPipe, stdout, stdoutPipe); err != nil { + req, err := http.NewRequest("POST", "/build", strings.NewReader(Dockerfile)) + if err != nil { t.Fatal(err) } - // Wait for build to finish, the client disconnected, therefore, Build finished his job - setTimeout(t, "Waiting for CmdBuild timed out", 2*time.Second, func() { - <-c1 - }) + r := httptest.NewRecorder() + if err := postBuild(srv, r, req, nil); err != nil { + t.Fatal(err) + } + imgs, err = runtime.graph.All() + if err != nil { + t.Fatal(err) + } + if len(imgs) != beginCount+3 { + t.Fatalf("Expected %d images, %d found", beginCount+3, len(imgs)) + } } func TestPostImagesCreate(t *testing.T) { diff --git a/commands.go b/commands.go index cbd9d146f4..4567370d3e 100644 --- a/commands.go +++ b/commands.go @@ -117,7 +117,7 @@ func (cli *DockerCli) CmdBuild(args ...string) error { return nil } - err := cli.stream("POST", "/build", nil, os.Stdout) + err := cli.stream("POST", "/build", os.Stdin, os.Stdout) if err != nil { return err } From 08121c8f6b435779027d837c1e7fc8046bc1e165 Mon Sep 17 00:00:00 2001 From: "Guillaume J. Charmes" Date: Thu, 16 May 2013 14:33:29 -0700 Subject: [PATCH 3/6] Update Push to reflect the correct API --- registry/registry.go | 48 ++++++++++++++++++++++---------------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/registry/registry.go b/registry/registry.go index 71648d1803..ce9b4b4ac7 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -326,10 +326,11 @@ func (r *Registry) PushImageJsonIndex(remote string, imgList []*ImgData, validat if err != nil { return nil, err } - - utils.Debugf("json sent: %s\n", imgListJson) - - req, err := http.NewRequest("PUT", auth.IndexServerAddress()+"/repositories/"+remote+"/", bytes.NewReader(imgListJson)) + var suffix string + if validate { + suffix = "images" + } + req, err := http.NewRequest("PUT", auth.IndexServerAddress()+"/repositories/"+remote+"/"+suffix, bytes.NewReader(imgListJson)) if err != nil { return nil, err } @@ -361,29 +362,28 @@ func (r *Registry) PushImageJsonIndex(remote string, imgList []*ImgData, validat defer res.Body.Close() } - if res.StatusCode != 200 && res.StatusCode != 201 { - errBody, err := ioutil.ReadAll(res.Body) - if err != nil { - return nil, err + var tokens, endpoints []string + if !validate { + if res.StatusCode != 200 && res.StatusCode != 201 { + errBody, err := ioutil.ReadAll(res.Body) + if err != nil { + return nil, err + } + return nil, fmt.Errorf("Error: Status %d trying to push repository %s: %s", res.StatusCode, remote, errBody) + } + if res.Header.Get("X-Docker-Token") != "" { + tokens = res.Header["X-Docker-Token"] + utils.Debugf("Auth token: %v", tokens) + } else { + return nil, fmt.Errorf("Index response didn't contain an access token") } - return nil, fmt.Errorf("Error: Status %d trying to push repository %s: %s", res.StatusCode, remote, errBody) - } - var tokens []string - if res.Header.Get("X-Docker-Token") != "" { - tokens = res.Header["X-Docker-Token"] - utils.Debugf("Auth token: %v", tokens) - } else { - return nil, fmt.Errorf("Index response didn't contain an access token") + if res.Header.Get("X-Docker-Endpoints") != "" { + endpoints = res.Header["X-Docker-Endpoints"] + } else { + return nil, fmt.Errorf("Index response didn't contain any endpoints") + } } - - var endpoints []string - if res.Header.Get("X-Docker-Endpoints") != "" { - endpoints = res.Header["X-Docker-Endpoints"] - } else { - return nil, fmt.Errorf("Index response didn't contain any endpoints") - } - if validate { if res.StatusCode != 204 { if errBody, err := ioutil.ReadAll(res.Body); err != nil { From 0143be42a13b9f8082ceddc64a5e523e45f54d88 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Sat, 18 May 2013 14:03:53 +0000 Subject: [PATCH 4/6] add flush after each write when needed --- server.go | 36 ++++++++++++++++++------------------ utils/utils.go | 15 ++++++++++++--- 2 files changed, 30 insertions(+), 21 deletions(-) diff --git a/server.go b/server.go index f6a242606d..6576fac0db 100644 --- a/server.go +++ b/server.go @@ -98,7 +98,7 @@ func (srv *Server) ImageInsert(name, url, path string, out io.Writer) error { if err != nil { return err } - fmt.Fprintf(out, "%s\n", img.Id) + utils.FprintfFlush(out, "%s\n", img.Id) return nil } @@ -298,7 +298,7 @@ func (srv *Server) pullImage(out io.Writer, imgId, registry string, token []stri // FIXME: Launch the getRemoteImage() in goroutines for _, id := range history { if !srv.runtime.graph.Exists(id) { - fmt.Fprintf(out, "Pulling %s metadata\r\n", id) + utils.FprintfFlush(out, "Pulling %s metadata\r\n", id) imgJson, err := srv.registry.GetRemoteImageJson(id, registry, token) if err != nil { // FIXME: Keep goging in case of error? @@ -310,7 +310,7 @@ func (srv *Server) pullImage(out io.Writer, imgId, registry string, token []stri } // Get the layer - fmt.Fprintf(out, "Pulling %s fs layer\r\n", img.Id) + utils.FprintfFlush(out, "Pulling %s fs layer\r\n", img.Id) layer, contentLength, err := srv.registry.GetRemoteImageLayer(img.Id, registry, token) if err != nil { return err @@ -324,7 +324,7 @@ func (srv *Server) pullImage(out io.Writer, imgId, registry string, token []stri } func (srv *Server) pullRepository(out io.Writer, remote, askedTag string) error { - fmt.Fprintf(out, "Pulling repository %s from %s\r\n", remote, auth.IndexServerAddress()) + utils.FprintfFlush(out, "Pulling repository %s from %s\r\n", remote, auth.IndexServerAddress()) repoData, err := srv.registry.GetRepositoryData(remote) if err != nil { return err @@ -350,11 +350,11 @@ func (srv *Server) pullRepository(out io.Writer, remote, askedTag string) error if askedTag != "" && askedTag != img.Tag { continue } - fmt.Fprintf(out, "Pulling image %s (%s) from %s\n", img.Id, img.Tag, remote) + utils.FprintfFlush(out, "Pulling image %s (%s) from %s\n", img.Id, img.Tag, remote) success := false for _, ep := range repoData.Endpoints { if err := srv.pullImage(out, img.Id, "https://"+ep+"/v1", repoData.Tokens); err != nil { - fmt.Fprintf(out, "Error while retrieving image for tag: %s (%s); checking next endpoint\n", askedTag, err) + utils.FprintfFlush(out, "Error while retrieving image for tag: %s (%s); checking next endpoint\n", askedTag, err) continue } if err := srv.runtime.repositories.Set(remote, img.Tag, img.Id, true); err != nil { @@ -462,12 +462,12 @@ func (srv *Server) getImageList(localRepo map[string]string) ([]*registry.ImgDat } func (srv *Server) pushRepository(out io.Writer, name string, localRepo map[string]string) error { - fmt.Fprintf(out, "Processing checksums\n") + utils.FprintfFlush(out, "Processing checksums\n") imgList, err := srv.getImageList(localRepo) if err != nil { return err } - fmt.Fprintf(out, "Sending image list\n") + utils.FprintfFlush(out, "Sending image list\n") repoData, err := srv.registry.PushImageJsonIndex(name, imgList, false) if err != nil { @@ -476,18 +476,18 @@ func (srv *Server) pushRepository(out io.Writer, name string, localRepo map[stri // FIXME: Send only needed images for _, ep := range repoData.Endpoints { - fmt.Fprintf(out, "Pushing repository %s to %s (%d tags)\r\n", name, ep, len(localRepo)) + utils.FprintfFlush(out, "Pushing repository %s to %s (%d tags)\r\n", name, ep, len(localRepo)) // For each image within the repo, push them for _, elem := range imgList { if _, exists := repoData.ImgList[elem.Id]; exists { - fmt.Fprintf(out, "Image %s already on registry, skipping\n", name) + utils.FprintfFlush(out, "Image %s already on registry, skipping\n", name) continue } if err := srv.pushImage(out, name, elem.Id, ep, repoData.Tokens); err != nil { // FIXME: Continue on error? return err } - fmt.Fprintf(out, "Pushing tags for rev [%s] on {%s}\n", elem.Id, ep+"/users/"+name+"/"+elem.Tag) + utils.FprintfFlush(out, "Pushing tags for rev [%s] on {%s}\n", elem.Id, ep+"/users/"+name+"/"+elem.Tag) if err := srv.registry.PushRegistryTag(name, elem.Id, elem.Tag, ep, repoData.Tokens); err != nil { return err } @@ -505,7 +505,7 @@ func (srv *Server) pushImage(out io.Writer, remote, imgId, ep string, token []st if err != nil { return fmt.Errorf("Error while retreiving the path for {%s}: %s", imgId, err) } - fmt.Fprintf(out, "Pushing %s\r\n", imgId) + utils.FprintfFlush(out, "Pushing %s\r\n", imgId) // Make sure we have the image's checksum checksum, err := srv.getChecksum(imgId) @@ -520,7 +520,7 @@ func (srv *Server) pushImage(out io.Writer, remote, imgId, ep string, token []st // Send the json if err := srv.registry.PushImageJsonRegistry(imgData, jsonRaw, ep, token); err != nil { if err == registry.ErrAlreadyExists { - fmt.Fprintf(out, "Image %s already uploaded ; skipping\n", imgData.Id) + utils.FprintfFlush(out, "Image %s already uploaded ; skipping\n", imgData.Id) return nil } return err @@ -562,7 +562,7 @@ func (srv *Server) pushImage(out io.Writer, remote, imgId, ep string, token []st func (srv *Server) ImagePush(name, registry string, out io.Writer) error { img, err := srv.runtime.graph.Get(name) if err != nil { - fmt.Fprintf(out, "The push refers to a repository [%s] (len: %d)\n", name, len(srv.runtime.repositories.Repositories[name])) + utils.FprintfFlush(out, "The push refers to a repository [%s] (len: %d)\n", name, len(srv.runtime.repositories.Repositories[name])) // If it fails, try to get the repository if localRepo, exists := srv.runtime.repositories.Repositories[name]; exists { if err := srv.pushRepository(out, name, localRepo); err != nil { @@ -573,7 +573,7 @@ func (srv *Server) ImagePush(name, registry string, out io.Writer) error { return err } - fmt.Fprintf(out, "The push refers to an image: [%s]\n", name) + utils.FprintfFlush(out, "The push refers to an image: [%s]\n", name) if err := srv.pushImage(out, name, img.Id, registry, nil); err != nil { return err } @@ -589,14 +589,14 @@ func (srv *Server) ImageImport(src, repo, tag string, in io.Reader, out io.Write } else { u, err := url.Parse(src) if err != nil { - fmt.Fprintf(out, "Error: %s\n", err) + utils.FprintfFlush(out, "Error: %s\n", err) } if u.Scheme == "" { u.Scheme = "http" u.Host = src u.Path = "" } - fmt.Fprintln(out, "Downloading from", u) + utils.FprintfFlush(out, "Downloading from %s\n", u) // Download with curl (pretty progress bar) // If curl is not available, fallback to http.Get() resp, err = utils.Download(u.String(), out) @@ -615,7 +615,7 @@ func (srv *Server) ImageImport(src, repo, tag string, in io.Reader, out io.Write return err } } - fmt.Fprintln(out, img.ShortId()) + utils.FprintfFlush(out, "%s\n", img.ShortId()) return nil } diff --git a/utils/utils.go b/utils/utils.go index 88d0c87f5c..6a6a9f95b5 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -84,15 +84,15 @@ func (r *progressReader) Read(p []byte) (n int, err error) { } if r.readProgress-r.lastUpdate > updateEvery || err != nil { if r.readTotal > 0 { - fmt.Fprintf(r.output, r.template+"\r", r.readProgress, r.readTotal, fmt.Sprintf("%.0f%%", float64(r.readProgress)/float64(r.readTotal)*100)) + FprintfFlush(r.output, r.template+"\r", r.readProgress, r.readTotal, fmt.Sprintf("%.0f%%", float64(r.readProgress)/float64(r.readTotal)*100)) } else { - fmt.Fprintf(r.output, r.template+"\r", r.readProgress, "?", "n/a") + FprintfFlush(r.output, r.template+"\r", r.readProgress, "?", "n/a") } r.lastUpdate = r.readProgress } // Send newline when complete if err != nil { - fmt.Fprintf(r.output, "\n") + FprintfFlush(r.output, "\n") } return read, err @@ -530,3 +530,12 @@ func GetKernelVersion() (*KernelVersionInfo, error) { Flavor: flavor, }, nil } + + +func FprintfFlush(w io.Writer, format string, a ...interface{}) (n int, err error) { + n, err = fmt.Fprintf(w, format, a...) + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + return n, err +} \ No newline at end of file From 98b0fd173b7b59316d776534b84ecc9ab0a1da77 Mon Sep 17 00:00:00 2001 From: "Guillaume J. Charmes" Date: Mon, 20 May 2013 10:22:50 -0700 Subject: [PATCH 5/6] Make the printflfush an interface --- server.go | 42 ++++++++++++++++++++++++------------------ utils/utils.go | 19 +++++++++++-------- 2 files changed, 35 insertions(+), 26 deletions(-) diff --git a/server.go b/server.go index 6576fac0db..b07e85b44c 100644 --- a/server.go +++ b/server.go @@ -68,6 +68,7 @@ func (srv *Server) ImagesSearch(term string) ([]ApiSearch, error) { } func (srv *Server) ImageInsert(name, url, path string, out io.Writer) error { + out = &utils.WriteFlusher{W: out} img, err := srv.runtime.repositories.LookupImage(name) if err != nil { return err @@ -98,7 +99,7 @@ func (srv *Server) ImageInsert(name, url, path string, out io.Writer) error { if err != nil { return err } - utils.FprintfFlush(out, "%s\n", img.Id) + fmt.Fprintf(out, "%s\n", img.Id) return nil } @@ -289,6 +290,7 @@ func (srv *Server) ContainerTag(name, repo, tag string, force bool) error { } func (srv *Server) pullImage(out io.Writer, imgId, registry string, token []string) error { + out = &utils.WriteFlusher{W: out} history, err := srv.registry.GetRemoteHistory(imgId, registry, token) if err != nil { return err @@ -298,7 +300,7 @@ func (srv *Server) pullImage(out io.Writer, imgId, registry string, token []stri // FIXME: Launch the getRemoteImage() in goroutines for _, id := range history { if !srv.runtime.graph.Exists(id) { - utils.FprintfFlush(out, "Pulling %s metadata\r\n", id) + fmt.Fprintf(out, "Pulling %s metadata\r\n", id) imgJson, err := srv.registry.GetRemoteImageJson(id, registry, token) if err != nil { // FIXME: Keep goging in case of error? @@ -310,7 +312,7 @@ func (srv *Server) pullImage(out io.Writer, imgId, registry string, token []stri } // Get the layer - utils.FprintfFlush(out, "Pulling %s fs layer\r\n", img.Id) + fmt.Fprintf(out, "Pulling %s fs layer\r\n", img.Id) layer, contentLength, err := srv.registry.GetRemoteImageLayer(img.Id, registry, token) if err != nil { return err @@ -324,7 +326,8 @@ func (srv *Server) pullImage(out io.Writer, imgId, registry string, token []stri } func (srv *Server) pullRepository(out io.Writer, remote, askedTag string) error { - utils.FprintfFlush(out, "Pulling repository %s from %s\r\n", remote, auth.IndexServerAddress()) + out = &utils.WriteFlusher{W: out} + fmt.Fprintf(out, "Pulling repository %s from %s\r\n", remote, auth.IndexServerAddress()) repoData, err := srv.registry.GetRepositoryData(remote) if err != nil { return err @@ -350,11 +353,11 @@ func (srv *Server) pullRepository(out io.Writer, remote, askedTag string) error if askedTag != "" && askedTag != img.Tag { continue } - utils.FprintfFlush(out, "Pulling image %s (%s) from %s\n", img.Id, img.Tag, remote) + fmt.Fprintf(out, "Pulling image %s (%s) from %s\n", img.Id, img.Tag, remote) success := false for _, ep := range repoData.Endpoints { if err := srv.pullImage(out, img.Id, "https://"+ep+"/v1", repoData.Tokens); err != nil { - utils.FprintfFlush(out, "Error while retrieving image for tag: %s (%s); checking next endpoint\n", askedTag, err) + fmt.Fprintf(out, "Error while retrieving image for tag: %s (%s); checking next endpoint\n", askedTag, err) continue } if err := srv.runtime.repositories.Set(remote, img.Tag, img.Id, true); err != nil { @@ -462,12 +465,13 @@ func (srv *Server) getImageList(localRepo map[string]string) ([]*registry.ImgDat } func (srv *Server) pushRepository(out io.Writer, name string, localRepo map[string]string) error { - utils.FprintfFlush(out, "Processing checksums\n") + out = &utils.WriteFlusher{W: out} + fmt.Fprintf(out, "Processing checksums\n") imgList, err := srv.getImageList(localRepo) if err != nil { return err } - utils.FprintfFlush(out, "Sending image list\n") + fmt.Fprintf(out, "Sending image list\n") repoData, err := srv.registry.PushImageJsonIndex(name, imgList, false) if err != nil { @@ -476,18 +480,18 @@ func (srv *Server) pushRepository(out io.Writer, name string, localRepo map[stri // FIXME: Send only needed images for _, ep := range repoData.Endpoints { - utils.FprintfFlush(out, "Pushing repository %s to %s (%d tags)\r\n", name, ep, len(localRepo)) + fmt.Fprintf(out, "Pushing repository %s to %s (%d tags)\r\n", name, ep, len(localRepo)) // For each image within the repo, push them for _, elem := range imgList { if _, exists := repoData.ImgList[elem.Id]; exists { - utils.FprintfFlush(out, "Image %s already on registry, skipping\n", name) + fmt.Fprintf(out, "Image %s already on registry, skipping\n", name) continue } if err := srv.pushImage(out, name, elem.Id, ep, repoData.Tokens); err != nil { // FIXME: Continue on error? return err } - utils.FprintfFlush(out, "Pushing tags for rev [%s] on {%s}\n", elem.Id, ep+"/users/"+name+"/"+elem.Tag) + fmt.Fprintf(out, "Pushing tags for rev [%s] on {%s}\n", elem.Id, ep+"/users/"+name+"/"+elem.Tag) if err := srv.registry.PushRegistryTag(name, elem.Id, elem.Tag, ep, repoData.Tokens); err != nil { return err } @@ -501,11 +505,12 @@ func (srv *Server) pushRepository(out io.Writer, name string, localRepo map[stri } func (srv *Server) pushImage(out io.Writer, remote, imgId, ep string, token []string) error { + out = &utils.WriteFlusher{W: out} jsonRaw, err := ioutil.ReadFile(path.Join(srv.runtime.graph.Root, imgId, "json")) if err != nil { return fmt.Errorf("Error while retreiving the path for {%s}: %s", imgId, err) } - utils.FprintfFlush(out, "Pushing %s\r\n", imgId) + fmt.Fprintf(out, "Pushing %s\r\n", imgId) // Make sure we have the image's checksum checksum, err := srv.getChecksum(imgId) @@ -520,7 +525,7 @@ func (srv *Server) pushImage(out io.Writer, remote, imgId, ep string, token []st // Send the json if err := srv.registry.PushImageJsonRegistry(imgData, jsonRaw, ep, token); err != nil { if err == registry.ErrAlreadyExists { - utils.FprintfFlush(out, "Image %s already uploaded ; skipping\n", imgData.Id) + fmt.Fprintf(out, "Image %s already uploaded ; skipping\n", imgData.Id) return nil } return err @@ -560,9 +565,10 @@ func (srv *Server) pushImage(out io.Writer, remote, imgId, ep string, token []st } func (srv *Server) ImagePush(name, registry string, out io.Writer) error { + out = &utils.WriteFlusher{W: out} img, err := srv.runtime.graph.Get(name) if err != nil { - utils.FprintfFlush(out, "The push refers to a repository [%s] (len: %d)\n", name, len(srv.runtime.repositories.Repositories[name])) + fmt.Fprintf(out, "The push refers to a repository [%s] (len: %d)\n", name, len(srv.runtime.repositories.Repositories[name])) // If it fails, try to get the repository if localRepo, exists := srv.runtime.repositories.Repositories[name]; exists { if err := srv.pushRepository(out, name, localRepo); err != nil { @@ -573,7 +579,7 @@ func (srv *Server) ImagePush(name, registry string, out io.Writer) error { return err } - utils.FprintfFlush(out, "The push refers to an image: [%s]\n", name) + fmt.Fprintf(out, "The push refers to an image: [%s]\n", name) if err := srv.pushImage(out, name, img.Id, registry, nil); err != nil { return err } @@ -589,14 +595,14 @@ func (srv *Server) ImageImport(src, repo, tag string, in io.Reader, out io.Write } else { u, err := url.Parse(src) if err != nil { - utils.FprintfFlush(out, "Error: %s\n", err) + fmt.Fprintf(out, "Error: %s\n", err) } if u.Scheme == "" { u.Scheme = "http" u.Host = src u.Path = "" } - utils.FprintfFlush(out, "Downloading from %s\n", u) + fmt.Fprintf(out, "Downloading from %s\n", u) // Download with curl (pretty progress bar) // If curl is not available, fallback to http.Get() resp, err = utils.Download(u.String(), out) @@ -615,7 +621,7 @@ func (srv *Server) ImageImport(src, repo, tag string, in io.Reader, out io.Write return err } } - utils.FprintfFlush(out, "%s\n", img.ShortId()) + fmt.Fprintf(out, "%s\n", img.ShortId()) return nil } diff --git a/utils/utils.go b/utils/utils.go index 6a6a9f95b5..a2fd3bde6d 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -84,15 +84,15 @@ func (r *progressReader) Read(p []byte) (n int, err error) { } if r.readProgress-r.lastUpdate > updateEvery || err != nil { if r.readTotal > 0 { - FprintfFlush(r.output, r.template+"\r", r.readProgress, r.readTotal, fmt.Sprintf("%.0f%%", float64(r.readProgress)/float64(r.readTotal)*100)) + fmt.Fprintf(r.output, r.template+"\r", r.readProgress, r.readTotal, fmt.Sprintf("%.0f%%", float64(r.readProgress)/float64(r.readTotal)*100)) } else { - FprintfFlush(r.output, r.template+"\r", r.readProgress, "?", "n/a") + fmt.Fprintf(r.output, r.template+"\r", r.readProgress, "?", "n/a") } r.lastUpdate = r.readProgress } // Send newline when complete if err != nil { - FprintfFlush(r.output, "\n") + fmt.Fprintf(r.output, "\n") } return read, err @@ -104,7 +104,7 @@ func ProgressReader(r io.ReadCloser, size int, output io.Writer, template string if template == "" { template = "%v/%v (%v)" } - return &progressReader{r, output, size, 0, 0, template} + return &progressReader{r, &WriteFlusher{W: output}, size, 0, 0, template} } // HumanDuration returns a human-readable approximation of a duration @@ -531,11 +531,14 @@ func GetKernelVersion() (*KernelVersionInfo, error) { }, nil } +type WriteFlusher struct { + W io.Writer +} -func FprintfFlush(w io.Writer, format string, a ...interface{}) (n int, err error) { - n, err = fmt.Fprintf(w, format, a...) - if f, ok := w.(http.Flusher); ok { +func (wf *WriteFlusher) Write(b []byte) (n int, err error) { + n, err = wf.W.Write(b) + if f, ok := wf.W.(http.Flusher); ok { f.Flush() } return n, err -} \ No newline at end of file +} From ae9d7a5167da58de9a1a4beac489cf1e6adcea11 Mon Sep 17 00:00:00 2001 From: "Guillaume J. Charmes" Date: Mon, 20 May 2013 10:58:35 -0700 Subject: [PATCH 6/6] Avoid cast each write for flusher --- server.go | 12 ++++++------ utils/utils.go | 25 +++++++++++++++++++------ 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/server.go b/server.go index b07e85b44c..956a4f8d36 100644 --- a/server.go +++ b/server.go @@ -68,7 +68,7 @@ func (srv *Server) ImagesSearch(term string) ([]ApiSearch, error) { } func (srv *Server) ImageInsert(name, url, path string, out io.Writer) error { - out = &utils.WriteFlusher{W: out} + out = utils.NewWriteFlusher(out) img, err := srv.runtime.repositories.LookupImage(name) if err != nil { return err @@ -290,7 +290,7 @@ func (srv *Server) ContainerTag(name, repo, tag string, force bool) error { } func (srv *Server) pullImage(out io.Writer, imgId, registry string, token []string) error { - out = &utils.WriteFlusher{W: out} + out = utils.NewWriteFlusher(out) history, err := srv.registry.GetRemoteHistory(imgId, registry, token) if err != nil { return err @@ -326,7 +326,7 @@ func (srv *Server) pullImage(out io.Writer, imgId, registry string, token []stri } func (srv *Server) pullRepository(out io.Writer, remote, askedTag string) error { - out = &utils.WriteFlusher{W: out} + out = utils.NewWriteFlusher(out) fmt.Fprintf(out, "Pulling repository %s from %s\r\n", remote, auth.IndexServerAddress()) repoData, err := srv.registry.GetRepositoryData(remote) if err != nil { @@ -465,7 +465,7 @@ func (srv *Server) getImageList(localRepo map[string]string) ([]*registry.ImgDat } func (srv *Server) pushRepository(out io.Writer, name string, localRepo map[string]string) error { - out = &utils.WriteFlusher{W: out} + out = utils.NewWriteFlusher(out) fmt.Fprintf(out, "Processing checksums\n") imgList, err := srv.getImageList(localRepo) if err != nil { @@ -505,7 +505,7 @@ func (srv *Server) pushRepository(out io.Writer, name string, localRepo map[stri } func (srv *Server) pushImage(out io.Writer, remote, imgId, ep string, token []string) error { - out = &utils.WriteFlusher{W: out} + out = utils.NewWriteFlusher(out) jsonRaw, err := ioutil.ReadFile(path.Join(srv.runtime.graph.Root, imgId, "json")) if err != nil { return fmt.Errorf("Error while retreiving the path for {%s}: %s", imgId, err) @@ -565,7 +565,7 @@ func (srv *Server) pushImage(out io.Writer, remote, imgId, ep string, token []st } func (srv *Server) ImagePush(name, registry string, out io.Writer) error { - out = &utils.WriteFlusher{W: out} + out = utils.NewWriteFlusher(out) img, err := srv.runtime.graph.Get(name) if err != nil { fmt.Fprintf(out, "The push refers to a repository [%s] (len: %d)\n", name, len(srv.runtime.repositories.Repositories[name])) diff --git a/utils/utils.go b/utils/utils.go index a2fd3bde6d..150eae8570 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -104,7 +104,7 @@ func ProgressReader(r io.ReadCloser, size int, output io.Writer, template string if template == "" { template = "%v/%v (%v)" } - return &progressReader{r, &WriteFlusher{W: output}, size, 0, 0, template} + return &progressReader{r, NewWriteFlusher(output), size, 0, 0, template} } // HumanDuration returns a human-readable approximation of a duration @@ -531,14 +531,27 @@ func GetKernelVersion() (*KernelVersionInfo, error) { }, nil } +type NopFlusher struct{} + +func (f *NopFlusher) Flush() {} + type WriteFlusher struct { - W io.Writer + w io.Writer + flusher http.Flusher } func (wf *WriteFlusher) Write(b []byte) (n int, err error) { - n, err = wf.W.Write(b) - if f, ok := wf.W.(http.Flusher); ok { - f.Flush() - } + n, err = wf.w.Write(b) + wf.flusher.Flush() return n, err } + +func NewWriteFlusher(w io.Writer) *WriteFlusher { + var flusher http.Flusher + if f, ok := w.(http.Flusher); ok { + flusher = f + } else { + flusher = &NopFlusher{} + } + return &WriteFlusher{w: w, flusher: flusher} +}