1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Merge pull request #13093 from tianon/writeflusher

Move WriteFlusher out of utils into ioutils
This commit is contained in:
Alexander Morozov 2015-05-09 09:39:17 -07:00
commit c5710c7318
4 changed files with 58 additions and 54 deletions

View file

@ -25,6 +25,7 @@ import (
"github.com/docker/docker/daemon" "github.com/docker/docker/daemon"
"github.com/docker/docker/daemon/networkdriver/bridge" "github.com/docker/docker/daemon/networkdriver/bridge"
"github.com/docker/docker/graph" "github.com/docker/docker/graph"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/jsonmessage" "github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/parsers" "github.com/docker/docker/pkg/parsers"
"github.com/docker/docker/pkg/parsers/filters" "github.com/docker/docker/pkg/parsers/filters"
@ -34,7 +35,6 @@ import (
"github.com/docker/docker/pkg/streamformatter" "github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/version" "github.com/docker/docker/pkg/version"
"github.com/docker/docker/runconfig" "github.com/docker/docker/runconfig"
"github.com/docker/docker/utils"
) )
type ServerConfig struct { type ServerConfig struct {
@ -442,7 +442,7 @@ func (s *Server) getEvents(version version.Version, w http.ResponseWriter, r *ht
d := s.daemon d := s.daemon
es := d.EventsService es := d.EventsService
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
enc := json.NewEncoder(utils.NewWriteFlusher(w)) enc := json.NewEncoder(ioutils.NewWriteFlusher(w))
getContainerId := func(cn string) string { getContainerId := func(cn string) string {
c, err := d.Get(cn) c, err := d.Get(cn)
@ -577,7 +577,7 @@ func (s *Server) getContainersStats(version version.Version, w http.ResponseWrit
return fmt.Errorf("Missing parameter") return fmt.Errorf("Missing parameter")
} }
return s.daemon.ContainerStats(vars["name"], boolValue(r, "stream"), utils.NewWriteFlusher(w)) return s.daemon.ContainerStats(vars["name"], boolValue(r, "stream"), ioutils.NewWriteFlusher(w))
} }
func (s *Server) getContainersLogs(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { func (s *Server) getContainersLogs(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
@ -600,7 +600,7 @@ func (s *Server) getContainersLogs(version version.Version, w http.ResponseWrite
Tail: r.Form.Get("tail"), Tail: r.Form.Get("tail"),
UseStdout: stdout, UseStdout: stdout,
UseStderr: stderr, UseStderr: stderr,
OutStream: utils.NewWriteFlusher(w), OutStream: ioutils.NewWriteFlusher(w),
} }
if err := s.daemon.ContainerLogs(vars["name"], logsConfig); err != nil { if err := s.daemon.ContainerLogs(vars["name"], logsConfig); err != nil {
@ -698,7 +698,7 @@ func (s *Server) postImagesCreate(version version.Version, w http.ResponseWriter
var ( var (
err error err error
useJSON = version.GreaterThan("1.0") useJSON = version.GreaterThan("1.0")
output = utils.NewWriteFlusher(w) output = ioutils.NewWriteFlusher(w)
) )
if useJSON { if useJSON {
@ -824,7 +824,7 @@ func (s *Server) postImagesPush(version version.Version, w http.ResponseWriter,
useJSON := version.GreaterThan("1.0") useJSON := version.GreaterThan("1.0")
name := vars["name"] name := vars["name"]
output := utils.NewWriteFlusher(w) output := ioutils.NewWriteFlusher(w)
imagePushConfig := &graph.ImagePushConfig{ imagePushConfig := &graph.ImagePushConfig{
MetaHeaders: metaHeaders, MetaHeaders: metaHeaders,
AuthConfig: authConfig, AuthConfig: authConfig,
@ -860,7 +860,7 @@ func (s *Server) getImagesGet(version version.Version, w http.ResponseWriter, r
w.Header().Set("Content-Type", "application/x-tar") w.Header().Set("Content-Type", "application/x-tar")
} }
output := utils.NewWriteFlusher(w) output := ioutils.NewWriteFlusher(w)
imageExportConfig := &graph.ImageExportConfig{Outstream: output} imageExportConfig := &graph.ImageExportConfig{Outstream: output}
if name, ok := vars["name"]; ok { if name, ok := vars["name"]; ok {
imageExportConfig.Names = []string{name} imageExportConfig.Names = []string{name}
@ -1279,7 +1279,7 @@ func (s *Server) postBuild(version version.Version, w http.ResponseWriter, r *ht
buildConfig.Pull = true buildConfig.Pull = true
} }
output := utils.NewWriteFlusher(w) output := ioutils.NewWriteFlusher(w)
buildConfig.Stdout = output buildConfig.Stdout = output
buildConfig.Context = r.Body buildConfig.Context = r.Body

View file

@ -14,6 +14,7 @@ import (
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
"github.com/docker/docker/cliconfig" "github.com/docker/docker/cliconfig"
"github.com/docker/docker/image" "github.com/docker/docker/image"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/progressreader" "github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/streamformatter" "github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid" "github.com/docker/docker/pkg/stringid"
@ -212,7 +213,7 @@ func (s *TagStore) pushRepository(r *registry.Session, out io.Writer,
repoInfo *registry.RepositoryInfo, localRepo map[string]string, repoInfo *registry.RepositoryInfo, localRepo map[string]string,
tag string, sf *streamformatter.StreamFormatter) error { tag string, sf *streamformatter.StreamFormatter) error {
logrus.Debugf("Local repo: %s", localRepo) logrus.Debugf("Local repo: %s", localRepo)
out = utils.NewWriteFlusher(out) out = ioutils.NewWriteFlusher(out)
imgList, tags, err := s.getImageList(localRepo, tag) imgList, tags, err := s.getImageList(localRepo, tag)
if err != nil { if err != nil {
return err return err
@ -246,7 +247,7 @@ func (s *TagStore) pushRepository(r *registry.Session, out io.Writer,
} }
func (s *TagStore) pushImage(r *registry.Session, out io.Writer, imgID, ep string, token []string, sf *streamformatter.StreamFormatter) (checksum string, err error) { func (s *TagStore) pushImage(r *registry.Session, out io.Writer, imgID, ep string, token []string, sf *streamformatter.StreamFormatter) (checksum string, err error) {
out = utils.NewWriteFlusher(out) out = ioutils.NewWriteFlusher(out)
jsonRaw, err := ioutil.ReadFile(filepath.Join(s.graph.Root, imgID, "json")) jsonRaw, err := ioutil.ReadFile(filepath.Join(s.graph.Root, imgID, "json"))
if err != nil { if err != nil {
return "", fmt.Errorf("Cannot retrieve the path for {%s}: %s", imgID, err) return "", fmt.Errorf("Cannot retrieve the path for {%s}: %s", imgID, err)

View file

@ -0,0 +1,47 @@
package ioutils
import (
"io"
"net/http"
"sync"
)
type WriteFlusher struct {
sync.Mutex
w io.Writer
flusher http.Flusher
flushed bool
}
func (wf *WriteFlusher) Write(b []byte) (n int, err error) {
wf.Lock()
defer wf.Unlock()
n, err = wf.w.Write(b)
wf.flushed = true
wf.flusher.Flush()
return n, err
}
// Flush the stream immediately.
func (wf *WriteFlusher) Flush() {
wf.Lock()
defer wf.Unlock()
wf.flushed = true
wf.flusher.Flush()
}
func (wf *WriteFlusher) Flushed() bool {
wf.Lock()
defer wf.Unlock()
return wf.flushed
}
func NewWriteFlusher(w io.Writer) *WriteFlusher {
var flusher http.Flusher
if f, ok := w.(http.Flusher); ok {
flusher = f
} else {
flusher = &NopFlusher{}
}
return &WriteFlusher{w: w, flusher: flusher}
}

View file

@ -7,18 +7,15 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net/http"
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"runtime" "runtime"
"strings" "strings"
"sync"
"github.com/docker/docker/autogen/dockerversion" "github.com/docker/docker/autogen/dockerversion"
"github.com/docker/docker/pkg/archive" "github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/fileutils" "github.com/docker/docker/pkg/fileutils"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/stringid" "github.com/docker/docker/pkg/stringid"
) )
@ -123,47 +120,6 @@ func DockerInitPath(localCopy string) string {
return "" return ""
} }
// FIXME: move to httputils? ioutils?
type WriteFlusher struct {
sync.Mutex
w io.Writer
flusher http.Flusher
flushed bool
}
func (wf *WriteFlusher) Write(b []byte) (n int, err error) {
wf.Lock()
defer wf.Unlock()
n, err = wf.w.Write(b)
wf.flushed = true
wf.flusher.Flush()
return n, err
}
// Flush the stream immediately.
func (wf *WriteFlusher) Flush() {
wf.Lock()
defer wf.Unlock()
wf.flushed = true
wf.flusher.Flush()
}
func (wf *WriteFlusher) Flushed() bool {
wf.Lock()
defer wf.Unlock()
return wf.flushed
}
func NewWriteFlusher(w io.Writer) *WriteFlusher {
var flusher http.Flusher
if f, ok := w.(http.Flusher); ok {
flusher = f
} else {
flusher = &ioutils.NopFlusher{}
}
return &WriteFlusher{w: w, flusher: flusher}
}
var globalTestID string var globalTestID string
// TestDirectory creates a new temporary directory and returns its path. // TestDirectory creates a new temporary directory and returns its path.