diff --git a/api/server/server.go b/api/server/server.go index 7b2328fe7c..1e8d98a904 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -25,6 +25,7 @@ import ( "github.com/docker/docker/daemon" "github.com/docker/docker/daemon/networkdriver/bridge" "github.com/docker/docker/graph" + "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/jsonmessage" "github.com/docker/docker/pkg/parsers" "github.com/docker/docker/pkg/parsers/filters" @@ -34,7 +35,6 @@ import ( "github.com/docker/docker/pkg/streamformatter" "github.com/docker/docker/pkg/version" "github.com/docker/docker/runconfig" - "github.com/docker/docker/utils" ) type ServerConfig struct { @@ -442,7 +442,7 @@ func (s *Server) getEvents(version version.Version, w http.ResponseWriter, r *ht d := s.daemon es := d.EventsService w.Header().Set("Content-Type", "application/json") - enc := json.NewEncoder(utils.NewWriteFlusher(w)) + enc := json.NewEncoder(ioutils.NewWriteFlusher(w)) getContainerId := func(cn string) string { c, err := d.Get(cn) @@ -577,7 +577,7 @@ func (s *Server) getContainersStats(version version.Version, w http.ResponseWrit return fmt.Errorf("Missing parameter") } - return s.daemon.ContainerStats(vars["name"], boolValue(r, "stream"), utils.NewWriteFlusher(w)) + return s.daemon.ContainerStats(vars["name"], boolValue(r, "stream"), ioutils.NewWriteFlusher(w)) } func (s *Server) getContainersLogs(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { @@ -600,7 +600,7 @@ func (s *Server) getContainersLogs(version version.Version, w http.ResponseWrite Tail: r.Form.Get("tail"), UseStdout: stdout, UseStderr: stderr, - OutStream: utils.NewWriteFlusher(w), + OutStream: ioutils.NewWriteFlusher(w), } if err := s.daemon.ContainerLogs(vars["name"], logsConfig); err != nil { @@ -698,7 +698,7 @@ func (s *Server) postImagesCreate(version version.Version, w http.ResponseWriter var ( err error useJSON = version.GreaterThan("1.0") - output = utils.NewWriteFlusher(w) + output = ioutils.NewWriteFlusher(w) ) if useJSON { @@ -824,7 +824,7 @@ func (s *Server) postImagesPush(version version.Version, w http.ResponseWriter, useJSON := version.GreaterThan("1.0") name := vars["name"] - output := utils.NewWriteFlusher(w) + output := ioutils.NewWriteFlusher(w) imagePushConfig := &graph.ImagePushConfig{ MetaHeaders: metaHeaders, AuthConfig: authConfig, @@ -860,7 +860,7 @@ func (s *Server) getImagesGet(version version.Version, w http.ResponseWriter, r w.Header().Set("Content-Type", "application/x-tar") } - output := utils.NewWriteFlusher(w) + output := ioutils.NewWriteFlusher(w) imageExportConfig := &graph.ImageExportConfig{Outstream: output} if name, ok := vars["name"]; ok { imageExportConfig.Names = []string{name} @@ -1279,7 +1279,7 @@ func (s *Server) postBuild(version version.Version, w http.ResponseWriter, r *ht buildConfig.Pull = true } - output := utils.NewWriteFlusher(w) + output := ioutils.NewWriteFlusher(w) buildConfig.Stdout = output buildConfig.Context = r.Body diff --git a/graph/push.go b/graph/push.go index e24ee9f54e..01e3ea1709 100644 --- a/graph/push.go +++ b/graph/push.go @@ -14,6 +14,7 @@ import ( "github.com/docker/distribution/digest" "github.com/docker/docker/cliconfig" "github.com/docker/docker/image" + "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/progressreader" "github.com/docker/docker/pkg/streamformatter" "github.com/docker/docker/pkg/stringid" @@ -212,7 +213,7 @@ func (s *TagStore) pushRepository(r *registry.Session, out io.Writer, repoInfo *registry.RepositoryInfo, localRepo map[string]string, tag string, sf *streamformatter.StreamFormatter) error { logrus.Debugf("Local repo: %s", localRepo) - out = utils.NewWriteFlusher(out) + out = ioutils.NewWriteFlusher(out) imgList, tags, err := s.getImageList(localRepo, tag) if err != nil { return err @@ -246,7 +247,7 @@ func (s *TagStore) pushRepository(r *registry.Session, out io.Writer, } func (s *TagStore) pushImage(r *registry.Session, out io.Writer, imgID, ep string, token []string, sf *streamformatter.StreamFormatter) (checksum string, err error) { - out = utils.NewWriteFlusher(out) + out = ioutils.NewWriteFlusher(out) jsonRaw, err := ioutil.ReadFile(filepath.Join(s.graph.Root, imgID, "json")) if err != nil { return "", fmt.Errorf("Cannot retrieve the path for {%s}: %s", imgID, err) diff --git a/pkg/ioutils/writeflusher.go b/pkg/ioutils/writeflusher.go new file mode 100644 index 0000000000..25095474df --- /dev/null +++ b/pkg/ioutils/writeflusher.go @@ -0,0 +1,47 @@ +package ioutils + +import ( + "io" + "net/http" + "sync" +) + +type WriteFlusher struct { + sync.Mutex + w io.Writer + flusher http.Flusher + flushed bool +} + +func (wf *WriteFlusher) Write(b []byte) (n int, err error) { + wf.Lock() + defer wf.Unlock() + n, err = wf.w.Write(b) + wf.flushed = true + wf.flusher.Flush() + return n, err +} + +// Flush the stream immediately. +func (wf *WriteFlusher) Flush() { + wf.Lock() + defer wf.Unlock() + wf.flushed = true + wf.flusher.Flush() +} + +func (wf *WriteFlusher) Flushed() bool { + wf.Lock() + defer wf.Unlock() + return wf.flushed +} + +func NewWriteFlusher(w io.Writer) *WriteFlusher { + var flusher http.Flusher + if f, ok := w.(http.Flusher); ok { + flusher = f + } else { + flusher = &NopFlusher{} + } + return &WriteFlusher{w: w, flusher: flusher} +} diff --git a/utils/utils.go b/utils/utils.go index 05dfb757a3..cb1b7b34cc 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -7,18 +7,15 @@ import ( "fmt" "io" "io/ioutil" - "net/http" "os" "os/exec" "path/filepath" "runtime" "strings" - "sync" "github.com/docker/docker/autogen/dockerversion" "github.com/docker/docker/pkg/archive" "github.com/docker/docker/pkg/fileutils" - "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/stringid" ) @@ -123,47 +120,6 @@ func DockerInitPath(localCopy string) string { return "" } -// FIXME: move to httputils? ioutils? -type WriteFlusher struct { - sync.Mutex - w io.Writer - flusher http.Flusher - flushed bool -} - -func (wf *WriteFlusher) Write(b []byte) (n int, err error) { - wf.Lock() - defer wf.Unlock() - n, err = wf.w.Write(b) - wf.flushed = true - wf.flusher.Flush() - return n, err -} - -// Flush the stream immediately. -func (wf *WriteFlusher) Flush() { - wf.Lock() - defer wf.Unlock() - wf.flushed = true - wf.flusher.Flush() -} - -func (wf *WriteFlusher) Flushed() bool { - wf.Lock() - defer wf.Unlock() - return wf.flushed -} - -func NewWriteFlusher(w io.Writer) *WriteFlusher { - var flusher http.Flusher - if f, ok := w.(http.Flusher); ok { - flusher = f - } else { - flusher = &ioutils.NopFlusher{} - } - return &WriteFlusher{w: w, flusher: flusher} -} - var globalTestID string // TestDirectory creates a new temporary directory and returns its path.