From 76212635b594d4472dd8d5b2e071fac416addbe7 Mon Sep 17 00:00:00 2001 From: unclejack Date: Tue, 12 Aug 2014 19:10:43 +0300 Subject: [PATCH] move some io related utils to pkg/ioutils Docker-DCO-1.1-Signed-off-by: Cristian Staretu (github: unclejack) --- archive/archive.go | 3 +- daemon/attach.go | 5 +- daemon/container.go | 15 +++--- daemon/daemon.go | 5 +- engine/engine.go | 3 +- image/image.go | 5 +- integration/runtime_test.go | 3 +- pkg/ioutils/readers.go | 82 +++++++++++++++++++++++++++++++ pkg/ioutils/readers_test.go | 34 +++++++++++++ pkg/ioutils/writers.go | 23 +++++++++ utils/utils.go | 98 +------------------------------------ utils/utils_test.go | 29 ----------- 12 files changed, 164 insertions(+), 141 deletions(-) create mode 100644 pkg/ioutils/readers.go create mode 100644 pkg/ioutils/readers_test.go create mode 100644 pkg/ioutils/writers.go diff --git a/archive/archive.go b/archive/archive.go index 7d9f7fb974..e73b1b8e39 100644 --- a/archive/archive.go +++ b/archive/archive.go @@ -18,6 +18,7 @@ import ( "github.com/docker/docker/vendor/src/code.google.com/p/go/src/pkg/archive/tar" + "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/log" "github.com/docker/docker/pkg/system" "github.com/docker/docker/utils" @@ -107,7 +108,7 @@ func CompressStream(dest io.WriteCloser, compression Compression) (io.WriteClose switch compression { case Uncompressed: - return utils.NopWriteCloser(dest), nil + return ioutils.NopWriteCloser(dest), nil case Gzip: return gzip.NewWriter(dest), nil case Bzip2, Xz: diff --git a/daemon/attach.go b/daemon/attach.go index 7a4eb0bc99..c015ee7258 100644 --- a/daemon/attach.go +++ b/daemon/attach.go @@ -8,6 +8,7 @@ import ( "time" "github.com/docker/docker/engine" + "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/jsonlog" "github.com/docker/docker/pkg/log" "github.com/docker/docker/utils" @@ -195,7 +196,7 @@ func (daemon *Daemon) Attach(container *Container, stdin io.ReadCloser, stdinClo if cStdout, err := container.StdoutPipe(); err != nil { log.Errorf("attach: stdout pipe: %s", err) } else { - io.Copy(&utils.NopWriter{}, cStdout) + io.Copy(&ioutils.NopWriter{}, cStdout) } }() } @@ -234,7 +235,7 @@ func (daemon *Daemon) Attach(container *Container, stdin io.ReadCloser, stdinClo if cStderr, err := container.StderrPipe(); err != nil { log.Errorf("attach: stdout pipe: %s", err) } else { - io.Copy(&utils.NopWriter{}, cStderr) + io.Copy(&ioutils.NopWriter{}, cStderr) } }() } diff --git a/daemon/container.go b/daemon/container.go index fa97553d8c..2ac6cb26b6 100644 --- a/daemon/container.go +++ b/daemon/container.go @@ -24,6 +24,7 @@ import ( "github.com/docker/docker/links" "github.com/docker/docker/nat" "github.com/docker/docker/pkg/broadcastwriter" + "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/log" "github.com/docker/docker/pkg/networkfs/etchosts" "github.com/docker/docker/pkg/networkfs/resolvconf" @@ -366,25 +367,25 @@ func (streamConfig *StreamConfig) StdinPipe() (io.WriteCloser, error) { func (streamConfig *StreamConfig) StdoutPipe() (io.ReadCloser, error) { reader, writer := io.Pipe() streamConfig.stdout.AddWriter(writer, "") - return utils.NewBufReader(reader), nil + return ioutils.NewBufReader(reader), nil } func (streamConfig *StreamConfig) StderrPipe() (io.ReadCloser, error) { reader, writer := io.Pipe() streamConfig.stderr.AddWriter(writer, "") - return utils.NewBufReader(reader), nil + return ioutils.NewBufReader(reader), nil } func (streamConfig *StreamConfig) StdoutLogPipe() io.ReadCloser { reader, writer := io.Pipe() streamConfig.stdout.AddWriter(writer, "stdout") - return utils.NewBufReader(reader) + return ioutils.NewBufReader(reader) } func (streamConfig *StreamConfig) StderrLogPipe() io.ReadCloser { reader, writer := io.Pipe() streamConfig.stderr.AddWriter(writer, "stderr") - return utils.NewBufReader(reader) + return ioutils.NewBufReader(reader) } func (container *Container) buildHostnameFile() error { @@ -655,7 +656,7 @@ func (container *Container) ExportRw() (archive.Archive, error) { container.Unmount() return nil, err } - return utils.NewReadCloserWrapper(archive, func() error { + return ioutils.NewReadCloserWrapper(archive, func() error { err := archive.Close() container.Unmount() return err @@ -673,7 +674,7 @@ func (container *Container) Export() (archive.Archive, error) { container.Unmount() return nil, err } - return utils.NewReadCloserWrapper(archive, func() error { + return ioutils.NewReadCloserWrapper(archive, func() error { err := archive.Close() container.Unmount() return err @@ -809,7 +810,7 @@ func (container *Container) Copy(resource string) (io.ReadCloser, error) { container.Unmount() return nil, err } - return utils.NewReadCloserWrapper(archive, func() error { + return ioutils.NewReadCloserWrapper(archive, func() error { err := archive.Close() container.Unmount() return err diff --git a/daemon/daemon.go b/daemon/daemon.go index 196d561acc..fab28110c6 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -28,6 +28,7 @@ import ( "github.com/docker/docker/image" "github.com/docker/docker/pkg/broadcastwriter" "github.com/docker/docker/pkg/graphdb" + "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/log" "github.com/docker/docker/pkg/namesgenerator" "github.com/docker/docker/pkg/networkfs/resolvconf" @@ -201,7 +202,7 @@ func (daemon *Daemon) register(container *Container, updateSuffixarray bool) err if container.Config.OpenStdin { container.stdin, container.stdinPipe = io.Pipe() } else { - container.stdinPipe = utils.NopWriteCloser(ioutil.Discard) // Silently drop stdin + container.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard) // Silently drop stdin } // done daemon.containers.Add(container.ID, container) @@ -965,7 +966,7 @@ func (daemon *Daemon) Diff(container *Container) (archive.Archive, error) { if err != nil { return nil, err } - return utils.NewReadCloserWrapper(archive, func() error { + return ioutils.NewReadCloserWrapper(archive, func() error { err := archive.Close() daemon.driver.Put(container.ID) return err diff --git a/engine/engine.go b/engine/engine.go index 4550df9d1d..e35acbbaf3 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/utils" ) @@ -123,7 +124,7 @@ func (eng *Engine) Job(name string, args ...string) *Job { env: &Env{}, } if eng.Logging { - job.Stderr.Add(utils.NopWriteCloser(eng.Stderr)) + job.Stderr.Add(ioutils.NopWriteCloser(eng.Stderr)) } // Catchall is shadowed by specific Register. diff --git a/image/image.go b/image/image.go index 702782c892..468d01293d 100644 --- a/image/image.go +++ b/image/image.go @@ -11,6 +11,7 @@ import ( "github.com/docker/docker/archive" "github.com/docker/docker/daemon/graphdriver" + "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/log" "github.com/docker/docker/runconfig" "github.com/docker/docker/utils" @@ -198,7 +199,7 @@ func (img *Image) TarLayer() (arch archive.Archive, err error) { if err != nil { return nil, err } - return utils.NewReadCloserWrapper(archive, func() error { + return ioutils.NewReadCloserWrapper(archive, func() error { err := archive.Close() driver.Put(img.ID) return err @@ -218,7 +219,7 @@ func (img *Image) TarLayer() (arch archive.Archive, err error) { if err != nil { return nil, err } - return utils.NewReadCloserWrapper(archive, func() error { + return ioutils.NewReadCloserWrapper(archive, func() error { err := archive.Close() driver.Put(img.ID) return err diff --git a/integration/runtime_test.go b/integration/runtime_test.go index d6427adf5d..12ff409f19 100644 --- a/integration/runtime_test.go +++ b/integration/runtime_test.go @@ -20,6 +20,7 @@ import ( "github.com/docker/docker/engine" "github.com/docker/docker/image" "github.com/docker/docker/nat" + "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/log" "github.com/docker/docker/reexec" "github.com/docker/docker/runconfig" @@ -141,7 +142,7 @@ func setupBaseImage() { if err := job.Run(); err != nil || img.Get("Id") != unitTestImageID { // Retrieve the Image job = eng.Job("pull", unitTestImageName) - job.Stdout.Add(utils.NopWriteCloser(os.Stdout)) + job.Stdout.Add(ioutils.NopWriteCloser(os.Stdout)) if err := job.Run(); err != nil { log.Fatalf("Unable to pull the test image: %s", err) } diff --git a/pkg/ioutils/readers.go b/pkg/ioutils/readers.go new file mode 100644 index 0000000000..8488034bd8 --- /dev/null +++ b/pkg/ioutils/readers.go @@ -0,0 +1,82 @@ +package ioutils + +import ( + "bytes" + "io" + "sync" +) + +type readCloserWrapper struct { + io.Reader + closer func() error +} + +func (r *readCloserWrapper) Close() error { + return r.closer() +} + +func NewReadCloserWrapper(r io.Reader, closer func() error) io.ReadCloser { + return &readCloserWrapper{ + Reader: r, + closer: closer, + } +} + +type bufReader struct { + sync.Mutex + buf *bytes.Buffer + reader io.Reader + err error + wait sync.Cond +} + +func NewBufReader(r io.Reader) *bufReader { + reader := &bufReader{ + buf: &bytes.Buffer{}, + reader: r, + } + reader.wait.L = &reader.Mutex + go reader.drain() + return reader +} + +func (r *bufReader) drain() { + buf := make([]byte, 1024) + for { + n, err := r.reader.Read(buf) + r.Lock() + if err != nil { + r.err = err + } else { + r.buf.Write(buf[0:n]) + } + r.wait.Signal() + r.Unlock() + if err != nil { + break + } + } +} + +func (r *bufReader) Read(p []byte) (n int, err error) { + r.Lock() + defer r.Unlock() + for { + n, err = r.buf.Read(p) + if n > 0 { + return n, err + } + if r.err != nil { + return 0, r.err + } + r.wait.Wait() + } +} + +func (r *bufReader) Close() error { + closer, ok := r.reader.(io.ReadCloser) + if !ok { + return nil + } + return closer.Close() +} diff --git a/pkg/ioutils/readers_test.go b/pkg/ioutils/readers_test.go new file mode 100644 index 0000000000..a7a2dad176 --- /dev/null +++ b/pkg/ioutils/readers_test.go @@ -0,0 +1,34 @@ +package ioutils + +import ( + "bytes" + "io" + "io/ioutil" + "testing" +) + +func TestBufReader(t *testing.T) { + reader, writer := io.Pipe() + bufreader := NewBufReader(reader) + + // Write everything down to a Pipe + // Usually, a pipe should block but because of the buffered reader, + // the writes will go through + done := make(chan bool) + go func() { + writer.Write([]byte("hello world")) + writer.Close() + done <- true + }() + + // Drain the reader *after* everything has been written, just to verify + // it is indeed buffering + <-done + output, err := ioutil.ReadAll(bufreader) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(output, []byte("hello world")) { + t.Error(string(output)) + } +} diff --git a/pkg/ioutils/writers.go b/pkg/ioutils/writers.go new file mode 100644 index 0000000000..de7bd0259e --- /dev/null +++ b/pkg/ioutils/writers.go @@ -0,0 +1,23 @@ +package ioutils + +import "io" + +type NopWriter struct{} + +func (*NopWriter) Write(buf []byte) (int, error) { + return len(buf), nil +} + +type nopWriteCloser struct { + io.Writer +} + +func (w *nopWriteCloser) Close() error { return nil } + +func NopWriteCloser(w io.Writer) io.WriteCloser { + return &nopWriteCloser{w} +} + +type NopFlusher struct{} + +func (f *NopFlusher) Flush() {} diff --git a/utils/utils.go b/utils/utils.go index b6ae91afec..f9b41c0bbd 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -20,6 +20,7 @@ import ( "syscall" "github.com/docker/docker/dockerversion" + "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/log" ) @@ -157,81 +158,6 @@ func DockerInitPath(localCopy string) string { return "" } -type NopWriter struct{} - -func (*NopWriter) Write(buf []byte) (int, error) { - return len(buf), nil -} - -type nopWriteCloser struct { - io.Writer -} - -func (w *nopWriteCloser) Close() error { return nil } - -func NopWriteCloser(w io.Writer) io.WriteCloser { - return &nopWriteCloser{w} -} - -type bufReader struct { - sync.Mutex - buf *bytes.Buffer - reader io.Reader - err error - wait sync.Cond -} - -func NewBufReader(r io.Reader) *bufReader { - reader := &bufReader{ - buf: &bytes.Buffer{}, - reader: r, - } - reader.wait.L = &reader.Mutex - go reader.drain() - return reader -} - -func (r *bufReader) drain() { - buf := make([]byte, 1024) - for { - n, err := r.reader.Read(buf) - r.Lock() - if err != nil { - r.err = err - } else { - r.buf.Write(buf[0:n]) - } - r.wait.Signal() - r.Unlock() - if err != nil { - break - } - } -} - -func (r *bufReader) Read(p []byte) (n int, err error) { - r.Lock() - defer r.Unlock() - for { - n, err = r.buf.Read(p) - if n > 0 { - return n, err - } - if r.err != nil { - return 0, r.err - } - r.wait.Wait() - } -} - -func (r *bufReader) Close() error { - closer, ok := r.reader.(io.ReadCloser) - if !ok { - return nil - } - return closer.Close() -} - func GetTotalUsedFds() int { if fds, err := ioutil.ReadDir(fmt.Sprintf("/proc/%d/fd", os.Getpid())); err != nil { log.Errorf("Error opening /proc/%d/fd: %s", os.Getpid(), err) @@ -340,10 +266,6 @@ func CopyDirectory(source, dest string) error { return nil } -type NopFlusher struct{} - -func (f *NopFlusher) Flush() {} - type WriteFlusher struct { sync.Mutex w io.Writer @@ -370,7 +292,7 @@ func NewWriteFlusher(w io.Writer) *WriteFlusher { if f, ok := w.(http.Flusher); ok { flusher = f } else { - flusher = &NopFlusher{} + flusher = &ioutils.NopFlusher{} } return &WriteFlusher{w: w, flusher: flusher} } @@ -527,22 +449,6 @@ func CopyFile(src, dst string) (int64, error) { return io.Copy(df, sf) } -type readCloserWrapper struct { - io.Reader - closer func() error -} - -func (r *readCloserWrapper) Close() error { - return r.closer() -} - -func NewReadCloserWrapper(r io.Reader, closer func() error) io.ReadCloser { - return &readCloserWrapper{ - Reader: r, - closer: closer, - } -} - // ReplaceOrAppendValues returns the defaults with the overrides either // replaced by env key or appended to the list func ReplaceOrAppendEnvValues(defaults, overrides []string) []string { diff --git a/utils/utils_test.go b/utils/utils_test.go index cf11182f43..7990faa8b1 100644 --- a/utils/utils_test.go +++ b/utils/utils_test.go @@ -1,39 +1,10 @@ package utils import ( - "bytes" - "io" - "io/ioutil" "os" "testing" ) -func TestBufReader(t *testing.T) { - reader, writer := io.Pipe() - bufreader := NewBufReader(reader) - - // Write everything down to a Pipe - // Usually, a pipe should block but because of the buffered reader, - // the writes will go through - done := make(chan bool) - go func() { - writer.Write([]byte("hello world")) - writer.Close() - done <- true - }() - - // Drain the reader *after* everything has been written, just to verify - // it is indeed buffering - <-done - output, err := ioutil.ReadAll(bufreader) - if err != nil { - t.Fatal(err) - } - if !bytes.Equal(output, []byte("hello world")) { - t.Error(string(output)) - } -} - func TestCheckLocalDns(t *testing.T) { for resolv, result := range map[string]bool{`# Dynamic nameserver 10.0.2.3