1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

move some io related utils to pkg/ioutils

Docker-DCO-1.1-Signed-off-by: Cristian Staretu <cristian.staretu@gmail.com> (github: unclejack)
This commit is contained in:
unclejack 2014-08-12 19:10:43 +03:00
parent 426fbee810
commit 76212635b5
12 changed files with 164 additions and 141 deletions

View file

@ -18,6 +18,7 @@ import (
"github.com/docker/docker/vendor/src/code.google.com/p/go/src/pkg/archive/tar"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/log"
"github.com/docker/docker/pkg/system"
"github.com/docker/docker/utils"
@ -107,7 +108,7 @@ func CompressStream(dest io.WriteCloser, compression Compression) (io.WriteClose
switch compression {
case Uncompressed:
return utils.NopWriteCloser(dest), nil
return ioutils.NopWriteCloser(dest), nil
case Gzip:
return gzip.NewWriter(dest), nil
case Bzip2, Xz:

View file

@ -8,6 +8,7 @@ import (
"time"
"github.com/docker/docker/engine"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/jsonlog"
"github.com/docker/docker/pkg/log"
"github.com/docker/docker/utils"
@ -195,7 +196,7 @@ func (daemon *Daemon) Attach(container *Container, stdin io.ReadCloser, stdinClo
if cStdout, err := container.StdoutPipe(); err != nil {
log.Errorf("attach: stdout pipe: %s", err)
} else {
io.Copy(&utils.NopWriter{}, cStdout)
io.Copy(&ioutils.NopWriter{}, cStdout)
}
}()
}
@ -234,7 +235,7 @@ func (daemon *Daemon) Attach(container *Container, stdin io.ReadCloser, stdinClo
if cStderr, err := container.StderrPipe(); err != nil {
log.Errorf("attach: stdout pipe: %s", err)
} else {
io.Copy(&utils.NopWriter{}, cStderr)
io.Copy(&ioutils.NopWriter{}, cStderr)
}
}()
}

View file

@ -24,6 +24,7 @@ import (
"github.com/docker/docker/links"
"github.com/docker/docker/nat"
"github.com/docker/docker/pkg/broadcastwriter"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/log"
"github.com/docker/docker/pkg/networkfs/etchosts"
"github.com/docker/docker/pkg/networkfs/resolvconf"
@ -366,25 +367,25 @@ func (streamConfig *StreamConfig) StdinPipe() (io.WriteCloser, error) {
func (streamConfig *StreamConfig) StdoutPipe() (io.ReadCloser, error) {
reader, writer := io.Pipe()
streamConfig.stdout.AddWriter(writer, "")
return utils.NewBufReader(reader), nil
return ioutils.NewBufReader(reader), nil
}
func (streamConfig *StreamConfig) StderrPipe() (io.ReadCloser, error) {
reader, writer := io.Pipe()
streamConfig.stderr.AddWriter(writer, "")
return utils.NewBufReader(reader), nil
return ioutils.NewBufReader(reader), nil
}
func (streamConfig *StreamConfig) StdoutLogPipe() io.ReadCloser {
reader, writer := io.Pipe()
streamConfig.stdout.AddWriter(writer, "stdout")
return utils.NewBufReader(reader)
return ioutils.NewBufReader(reader)
}
func (streamConfig *StreamConfig) StderrLogPipe() io.ReadCloser {
reader, writer := io.Pipe()
streamConfig.stderr.AddWriter(writer, "stderr")
return utils.NewBufReader(reader)
return ioutils.NewBufReader(reader)
}
func (container *Container) buildHostnameFile() error {
@ -655,7 +656,7 @@ func (container *Container) ExportRw() (archive.Archive, error) {
container.Unmount()
return nil, err
}
return utils.NewReadCloserWrapper(archive, func() error {
return ioutils.NewReadCloserWrapper(archive, func() error {
err := archive.Close()
container.Unmount()
return err
@ -673,7 +674,7 @@ func (container *Container) Export() (archive.Archive, error) {
container.Unmount()
return nil, err
}
return utils.NewReadCloserWrapper(archive, func() error {
return ioutils.NewReadCloserWrapper(archive, func() error {
err := archive.Close()
container.Unmount()
return err
@ -809,7 +810,7 @@ func (container *Container) Copy(resource string) (io.ReadCloser, error) {
container.Unmount()
return nil, err
}
return utils.NewReadCloserWrapper(archive, func() error {
return ioutils.NewReadCloserWrapper(archive, func() error {
err := archive.Close()
container.Unmount()
return err

View file

@ -28,6 +28,7 @@ import (
"github.com/docker/docker/image"
"github.com/docker/docker/pkg/broadcastwriter"
"github.com/docker/docker/pkg/graphdb"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/log"
"github.com/docker/docker/pkg/namesgenerator"
"github.com/docker/docker/pkg/networkfs/resolvconf"
@ -201,7 +202,7 @@ func (daemon *Daemon) register(container *Container, updateSuffixarray bool) err
if container.Config.OpenStdin {
container.stdin, container.stdinPipe = io.Pipe()
} else {
container.stdinPipe = utils.NopWriteCloser(ioutil.Discard) // Silently drop stdin
container.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard) // Silently drop stdin
}
// done
daemon.containers.Add(container.ID, container)
@ -965,7 +966,7 @@ func (daemon *Daemon) Diff(container *Container) (archive.Archive, error) {
if err != nil {
return nil, err
}
return utils.NewReadCloserWrapper(archive, func() error {
return ioutils.NewReadCloserWrapper(archive, func() error {
err := archive.Close()
daemon.driver.Put(container.ID)
return err

View file

@ -10,6 +10,7 @@ import (
"sync"
"time"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/utils"
)
@ -123,7 +124,7 @@ func (eng *Engine) Job(name string, args ...string) *Job {
env: &Env{},
}
if eng.Logging {
job.Stderr.Add(utils.NopWriteCloser(eng.Stderr))
job.Stderr.Add(ioutils.NopWriteCloser(eng.Stderr))
}
// Catchall is shadowed by specific Register.

View file

@ -11,6 +11,7 @@ import (
"github.com/docker/docker/archive"
"github.com/docker/docker/daemon/graphdriver"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/log"
"github.com/docker/docker/runconfig"
"github.com/docker/docker/utils"
@ -198,7 +199,7 @@ func (img *Image) TarLayer() (arch archive.Archive, err error) {
if err != nil {
return nil, err
}
return utils.NewReadCloserWrapper(archive, func() error {
return ioutils.NewReadCloserWrapper(archive, func() error {
err := archive.Close()
driver.Put(img.ID)
return err
@ -218,7 +219,7 @@ func (img *Image) TarLayer() (arch archive.Archive, err error) {
if err != nil {
return nil, err
}
return utils.NewReadCloserWrapper(archive, func() error {
return ioutils.NewReadCloserWrapper(archive, func() error {
err := archive.Close()
driver.Put(img.ID)
return err

View file

@ -20,6 +20,7 @@ import (
"github.com/docker/docker/engine"
"github.com/docker/docker/image"
"github.com/docker/docker/nat"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/log"
"github.com/docker/docker/reexec"
"github.com/docker/docker/runconfig"
@ -141,7 +142,7 @@ func setupBaseImage() {
if err := job.Run(); err != nil || img.Get("Id") != unitTestImageID {
// Retrieve the Image
job = eng.Job("pull", unitTestImageName)
job.Stdout.Add(utils.NopWriteCloser(os.Stdout))
job.Stdout.Add(ioutils.NopWriteCloser(os.Stdout))
if err := job.Run(); err != nil {
log.Fatalf("Unable to pull the test image: %s", err)
}

82
pkg/ioutils/readers.go Normal file
View file

@ -0,0 +1,82 @@
package ioutils
import (
"bytes"
"io"
"sync"
)
type readCloserWrapper struct {
io.Reader
closer func() error
}
func (r *readCloserWrapper) Close() error {
return r.closer()
}
func NewReadCloserWrapper(r io.Reader, closer func() error) io.ReadCloser {
return &readCloserWrapper{
Reader: r,
closer: closer,
}
}
type bufReader struct {
sync.Mutex
buf *bytes.Buffer
reader io.Reader
err error
wait sync.Cond
}
func NewBufReader(r io.Reader) *bufReader {
reader := &bufReader{
buf: &bytes.Buffer{},
reader: r,
}
reader.wait.L = &reader.Mutex
go reader.drain()
return reader
}
func (r *bufReader) drain() {
buf := make([]byte, 1024)
for {
n, err := r.reader.Read(buf)
r.Lock()
if err != nil {
r.err = err
} else {
r.buf.Write(buf[0:n])
}
r.wait.Signal()
r.Unlock()
if err != nil {
break
}
}
}
func (r *bufReader) Read(p []byte) (n int, err error) {
r.Lock()
defer r.Unlock()
for {
n, err = r.buf.Read(p)
if n > 0 {
return n, err
}
if r.err != nil {
return 0, r.err
}
r.wait.Wait()
}
}
func (r *bufReader) Close() error {
closer, ok := r.reader.(io.ReadCloser)
if !ok {
return nil
}
return closer.Close()
}

View file

@ -0,0 +1,34 @@
package ioutils
import (
"bytes"
"io"
"io/ioutil"
"testing"
)
func TestBufReader(t *testing.T) {
reader, writer := io.Pipe()
bufreader := NewBufReader(reader)
// Write everything down to a Pipe
// Usually, a pipe should block but because of the buffered reader,
// the writes will go through
done := make(chan bool)
go func() {
writer.Write([]byte("hello world"))
writer.Close()
done <- true
}()
// Drain the reader *after* everything has been written, just to verify
// it is indeed buffering
<-done
output, err := ioutil.ReadAll(bufreader)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(output, []byte("hello world")) {
t.Error(string(output))
}
}

23
pkg/ioutils/writers.go Normal file
View file

@ -0,0 +1,23 @@
package ioutils
import "io"
type NopWriter struct{}
func (*NopWriter) Write(buf []byte) (int, error) {
return len(buf), nil
}
type nopWriteCloser struct {
io.Writer
}
func (w *nopWriteCloser) Close() error { return nil }
func NopWriteCloser(w io.Writer) io.WriteCloser {
return &nopWriteCloser{w}
}
type NopFlusher struct{}
func (f *NopFlusher) Flush() {}

View file

@ -20,6 +20,7 @@ import (
"syscall"
"github.com/docker/docker/dockerversion"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/log"
)
@ -157,81 +158,6 @@ func DockerInitPath(localCopy string) string {
return ""
}
type NopWriter struct{}
func (*NopWriter) Write(buf []byte) (int, error) {
return len(buf), nil
}
type nopWriteCloser struct {
io.Writer
}
func (w *nopWriteCloser) Close() error { return nil }
func NopWriteCloser(w io.Writer) io.WriteCloser {
return &nopWriteCloser{w}
}
type bufReader struct {
sync.Mutex
buf *bytes.Buffer
reader io.Reader
err error
wait sync.Cond
}
func NewBufReader(r io.Reader) *bufReader {
reader := &bufReader{
buf: &bytes.Buffer{},
reader: r,
}
reader.wait.L = &reader.Mutex
go reader.drain()
return reader
}
func (r *bufReader) drain() {
buf := make([]byte, 1024)
for {
n, err := r.reader.Read(buf)
r.Lock()
if err != nil {
r.err = err
} else {
r.buf.Write(buf[0:n])
}
r.wait.Signal()
r.Unlock()
if err != nil {
break
}
}
}
func (r *bufReader) Read(p []byte) (n int, err error) {
r.Lock()
defer r.Unlock()
for {
n, err = r.buf.Read(p)
if n > 0 {
return n, err
}
if r.err != nil {
return 0, r.err
}
r.wait.Wait()
}
}
func (r *bufReader) Close() error {
closer, ok := r.reader.(io.ReadCloser)
if !ok {
return nil
}
return closer.Close()
}
func GetTotalUsedFds() int {
if fds, err := ioutil.ReadDir(fmt.Sprintf("/proc/%d/fd", os.Getpid())); err != nil {
log.Errorf("Error opening /proc/%d/fd: %s", os.Getpid(), err)
@ -340,10 +266,6 @@ func CopyDirectory(source, dest string) error {
return nil
}
type NopFlusher struct{}
func (f *NopFlusher) Flush() {}
type WriteFlusher struct {
sync.Mutex
w io.Writer
@ -370,7 +292,7 @@ func NewWriteFlusher(w io.Writer) *WriteFlusher {
if f, ok := w.(http.Flusher); ok {
flusher = f
} else {
flusher = &NopFlusher{}
flusher = &ioutils.NopFlusher{}
}
return &WriteFlusher{w: w, flusher: flusher}
}
@ -527,22 +449,6 @@ func CopyFile(src, dst string) (int64, error) {
return io.Copy(df, sf)
}
type readCloserWrapper struct {
io.Reader
closer func() error
}
func (r *readCloserWrapper) Close() error {
return r.closer()
}
func NewReadCloserWrapper(r io.Reader, closer func() error) io.ReadCloser {
return &readCloserWrapper{
Reader: r,
closer: closer,
}
}
// ReplaceOrAppendValues returns the defaults with the overrides either
// replaced by env key or appended to the list
func ReplaceOrAppendEnvValues(defaults, overrides []string) []string {

View file

@ -1,39 +1,10 @@
package utils
import (
"bytes"
"io"
"io/ioutil"
"os"
"testing"
)
func TestBufReader(t *testing.T) {
reader, writer := io.Pipe()
bufreader := NewBufReader(reader)
// Write everything down to a Pipe
// Usually, a pipe should block but because of the buffered reader,
// the writes will go through
done := make(chan bool)
go func() {
writer.Write([]byte("hello world"))
writer.Close()
done <- true
}()
// Drain the reader *after* everything has been written, just to verify
// it is indeed buffering
<-done
output, err := ioutil.ReadAll(bufreader)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(output, []byte("hello world")) {
t.Error(string(output))
}
}
func TestCheckLocalDns(t *testing.T) {
for resolv, result := range map[string]bool{`# Dynamic
nameserver 10.0.2.3