diff --git a/daemon/container.go b/daemon/container.go index 5b41438680..c7a6774601 100644 --- a/daemon/container.go +++ b/daemon/container.go @@ -29,6 +29,7 @@ import ( "github.com/dotcloud/docker/pkg/symlink" "github.com/dotcloud/docker/runconfig" "github.com/dotcloud/docker/utils" + "github.com/dotcloud/docker/utils/broadcastwriter" ) const DefaultPathEnv = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" @@ -66,8 +67,8 @@ type Container struct { ExecDriver string command *execdriver.Command - stdout *utils.WriteBroadcaster - stderr *utils.WriteBroadcaster + stdout *broadcastwriter.BroadcastWriter + stderr *broadcastwriter.BroadcastWriter stdin io.ReadCloser stdinPipe io.WriteCloser @@ -502,10 +503,10 @@ func (container *Container) cleanup() { utils.Errorf("%s: Error close stdin: %s", container.ID, err) } } - if err := container.stdout.CloseWriters(); err != nil { + if err := container.stdout.Close(); err != nil { utils.Errorf("%s: Error close stdout: %s", container.ID, err) } - if err := container.stderr.CloseWriters(); err != nil { + if err := container.stderr.Close(); err != nil { utils.Errorf("%s: Error close stderr: %s", container.ID, err) } if container.command != nil && container.command.Terminal != nil { diff --git a/daemon/daemon.go b/daemon/daemon.go index 9500526f9e..0b51715ecd 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -34,6 +34,7 @@ import ( "github.com/dotcloud/docker/pkg/truncindex" "github.com/dotcloud/docker/runconfig" "github.com/dotcloud/docker/utils" + "github.com/dotcloud/docker/utils/broadcastwriter" ) // Set the max depth to the aufs default that most @@ -169,8 +170,8 @@ func (daemon *Daemon) register(container *Container, updateSuffixarray bool, con container.daemon = daemon // Attach to stdout and stderr - container.stderr = utils.NewWriteBroadcaster() - container.stdout = utils.NewWriteBroadcaster() + container.stderr = broadcastwriter.New() + container.stdout = broadcastwriter.New() // Attach to stdin if container.Config.OpenStdin { container.stdin, container.stdinPipe = io.Pipe() @@ -255,7 +256,7 @@ func (daemon *Daemon) ensureName(container *Container) error { return nil } -func (daemon *Daemon) LogToDisk(src *utils.WriteBroadcaster, dst, stream string) error { +func (daemon *Daemon) LogToDisk(src *broadcastwriter.BroadcastWriter, dst, stream string) error { log, err := os.OpenFile(dst, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600) if err != nil { return err diff --git a/utils/broadcastwriter/broadcastwriter.go b/utils/broadcastwriter/broadcastwriter.go new file mode 100644 index 0000000000..81ca9e5bdd --- /dev/null +++ b/utils/broadcastwriter/broadcastwriter.go @@ -0,0 +1,92 @@ +package broadcastwriter + +import ( + "bytes" + "encoding/json" + "io" + "sync" + "time" + + "github.com/dotcloud/docker/utils" +) + +type BroadcastWriter struct { + sync.Mutex + buf *bytes.Buffer + streams map[string](map[io.WriteCloser]struct{}) +} + +func (w *BroadcastWriter) AddWriter(writer io.WriteCloser, stream string) { + w.Lock() + if _, ok := w.streams[stream]; !ok { + w.streams[stream] = make(map[io.WriteCloser]struct{}) + } + w.streams[stream][writer] = struct{}{} + w.Unlock() +} + +func (w *BroadcastWriter) Write(p []byte) (n int, err error) { + created := time.Now().UTC() + w.Lock() + defer w.Unlock() + if writers, ok := w.streams[""]; ok { + for sw := range writers { + if n, err := sw.Write(p); err != nil || n != len(p) { + // On error, evict the writer + delete(writers, sw) + } + } + } + w.buf.Write(p) + lines := []string{} + for { + line, err := w.buf.ReadString('\n') + if err != nil { + w.buf.Write([]byte(line)) + break + } + lines = append(lines, line) + } + + if len(lines) != 0 { + for stream, writers := range w.streams { + if stream == "" { + continue + } + var lp []byte + for _, line := range lines { + b, err := json.Marshal(&utils.JSONLog{Log: line, Stream: stream, Created: created}) + if err != nil { + utils.Errorf("Error making JSON log line: %s", err) + } + lp = append(lp, b...) + lp = append(lp, '\n') + } + for sw := range writers { + if _, err := sw.Write(lp); err != nil { + delete(writers, sw) + } + } + } + } + return len(p), nil +} + +func (w *BroadcastWriter) Close() error { + w.Lock() + defer w.Unlock() + for _, writers := range w.streams { + for w := range writers { + w.Close() + } + } + w.streams = make(map[string](map[io.WriteCloser]struct{})) + return nil +} + +func New() *BroadcastWriter { + return &BroadcastWriter{ + streams: make(map[string](map[io.WriteCloser]struct{})), + buf: bytes.NewBuffer(nil), + } +} diff --git a/utils/broadcastwriter/broadcastwriter_test.go b/utils/broadcastwriter/broadcastwriter_test.go new file mode 100644 index 0000000000..8d946e2f45 --- /dev/null +++ b/utils/broadcastwriter/broadcastwriter_test.go @@ -0,0 +1,108 @@ +package broadcastwriter + +import ( + "bytes" + "errors" + + "testing" +) + +type dummyWriter struct { + buffer bytes.Buffer + failOnWrite bool +} + +func (dw *dummyWriter) Write(p []byte) (n int, err error) { + if dw.failOnWrite { + return 0, errors.New("Fake fail") + } + return dw.buffer.Write(p) +} + +func (dw *dummyWriter) String() string { + return dw.buffer.String() +} + +func (dw *dummyWriter) Close() error { + return nil +} + +func TestBroadcastWriter(t *testing.T) { + writer := New() + + // Test 1: Both bufferA and bufferB should contain "foo" + bufferA := &dummyWriter{} + writer.AddWriter(bufferA, "") + bufferB := &dummyWriter{} + writer.AddWriter(bufferB, "") + writer.Write([]byte("foo")) + + if bufferA.String() != "foo" { + t.Errorf("Buffer contains %v", bufferA.String()) + } + + if bufferB.String() != "foo" { + t.Errorf("Buffer contains %v", bufferB.String()) + } + + // Test2: bufferA and bufferB should contain "foobar", + // while bufferC should only contain "bar" + bufferC := &dummyWriter{} + writer.AddWriter(bufferC, "") + writer.Write([]byte("bar")) + + if bufferA.String() != "foobar" { + t.Errorf("Buffer contains %v", bufferA.String()) + } + + if bufferB.String() != "foobar" { + t.Errorf("Buffer contains %v", bufferB.String()) + } + + if bufferC.String() != "bar" { + t.Errorf("Buffer contains %v", bufferC.String()) + } + + // Test3: Test eviction on failure + bufferA.failOnWrite = true + writer.Write([]byte("fail")) + if bufferA.String() != "foobar" { + t.Errorf("Buffer contains %v", bufferA.String()) + } + if bufferC.String() != "barfail" { + t.Errorf("Buffer contains %v", bufferC.String()) + } + // Even though we reset the flag, no more writes should go in there + bufferA.failOnWrite = false + writer.Write([]byte("test")) + if bufferA.String() != "foobar" { + t.Errorf("Buffer contains %v", bufferA.String()) + } + if bufferC.String() != "barfailtest" { + t.Errorf("Buffer contains %v", bufferC.String()) + } + + writer.Close() +} + +type devNullCloser int + +func (d devNullCloser) Close() error { + return nil +} + +func (d devNullCloser) Write(buf []byte) (int, error) { + return len(buf), nil +} + +// This test checks for races. It is only useful when run with the race detector. +func TestRaceBroadcastWriter(t *testing.T) { + writer := New() + c := make(chan bool) + go func() { + writer.AddWriter(devNullCloser(0), "") + c <- true + }() + writer.Write([]byte("hello")) + <-c +} diff --git a/utils/utils.go b/utils/utils.go index 333468d361..ef28aceca7 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -265,21 +265,6 @@ func (r *bufReader) Close() error { return closer.Close() } -type WriteBroadcaster struct { - sync.Mutex - buf *bytes.Buffer - streams map[string](map[io.WriteCloser]struct{}) -} - -func (w *WriteBroadcaster) AddWriter(writer io.WriteCloser, stream string) { - w.Lock() - if _, ok := w.streams[stream]; !ok { - w.streams[stream] = make(map[io.WriteCloser]struct{}) - } - w.streams[stream][writer] = struct{}{} - w.Unlock() -} - type JSONLog struct { Log string `json:"log,omitempty"` Stream string `json:"stream,omitempty"` @@ -316,77 +301,6 @@ func WriteLog(src io.Reader, dst io.WriteCloser, format string) error { } } -type LogFormatter struct { - wc io.WriteCloser - timeFormat string -} - -func (w *WriteBroadcaster) Write(p []byte) (n int, err error) { - created := time.Now().UTC() - w.Lock() - defer w.Unlock() - if writers, ok := w.streams[""]; ok { - for sw := range writers { - if n, err := sw.Write(p); err != nil || n != len(p) { - // On error, evict the writer - delete(writers, sw) - } - } - } - w.buf.Write(p) - lines := []string{} - for { - line, err := w.buf.ReadString('\n') - if err != nil { - w.buf.Write([]byte(line)) - break - } - lines = append(lines, line) - } - - if len(lines) != 0 { - for stream, writers := range w.streams { - if stream == "" { - continue - } - var lp []byte - for _, line := range lines { - b, err := json.Marshal(&JSONLog{Log: line, Stream: stream, Created: created}) - if err != nil { - Errorf("Error making JSON log line: %s", err) - } - lp = append(lp, b...) - lp = append(lp, '\n') - } - for sw := range writers { - if _, err := sw.Write(lp); err != nil { - delete(writers, sw) - } - } - } - } - return len(p), nil -} - -func (w *WriteBroadcaster) CloseWriters() error { - w.Lock() - defer w.Unlock() - for _, writers := range w.streams { - for w := range writers { - w.Close() - } - } - w.streams = make(map[string](map[io.WriteCloser]struct{})) - return nil -} - -func NewWriteBroadcaster() *WriteBroadcaster { - return &WriteBroadcaster{ - streams: make(map[string](map[io.WriteCloser]struct{})), - buf: bytes.NewBuffer(nil), - } -} - func GetTotalUsedFds() int { if fds, err := ioutil.ReadDir(fmt.Sprintf("/proc/%d/fd", os.Getpid())); err != nil { Errorf("Error opening /proc/%d/fd: %s", os.Getpid(), err) diff --git a/utils/utils_test.go b/utils/utils_test.go index 049c0e30a2..6fd09b97db 100644 --- a/utils/utils_test.go +++ b/utils/utils_test.go @@ -2,7 +2,6 @@ package utils import ( "bytes" - "errors" "io" "io/ioutil" "os" @@ -35,106 +34,6 @@ func TestBufReader(t *testing.T) { } } -type dummyWriter struct { - buffer bytes.Buffer - failOnWrite bool -} - -func (dw *dummyWriter) Write(p []byte) (n int, err error) { - if dw.failOnWrite { - return 0, errors.New("Fake fail") - } - return dw.buffer.Write(p) -} - -func (dw *dummyWriter) String() string { - return dw.buffer.String() -} - -func (dw *dummyWriter) Close() error { - return nil -} - -func TestWriteBroadcaster(t *testing.T) { - writer := NewWriteBroadcaster() - - // Test 1: Both bufferA and bufferB should contain "foo" - bufferA := &dummyWriter{} - writer.AddWriter(bufferA, "") - bufferB := &dummyWriter{} - writer.AddWriter(bufferB, "") - writer.Write([]byte("foo")) - - if bufferA.String() != "foo" { - t.Errorf("Buffer contains %v", bufferA.String()) - } - - if bufferB.String() != "foo" { - t.Errorf("Buffer contains %v", bufferB.String()) - } - - // Test2: bufferA and bufferB should contain "foobar", - // while bufferC should only contain "bar" - bufferC := &dummyWriter{} - writer.AddWriter(bufferC, "") - writer.Write([]byte("bar")) - - if bufferA.String() != "foobar" { - t.Errorf("Buffer contains %v", bufferA.String()) - } - - if bufferB.String() != "foobar" { - t.Errorf("Buffer contains %v", bufferB.String()) - } - - if bufferC.String() != "bar" { - t.Errorf("Buffer contains %v", bufferC.String()) - } - - // Test3: Test eviction on failure - bufferA.failOnWrite = true - writer.Write([]byte("fail")) - if bufferA.String() != "foobar" { - t.Errorf("Buffer contains %v", bufferA.String()) - } - if bufferC.String() != "barfail" { - t.Errorf("Buffer contains %v", bufferC.String()) - } - // Even though we reset the flag, no more writes should go in there - bufferA.failOnWrite = false - writer.Write([]byte("test")) - if bufferA.String() != "foobar" { - t.Errorf("Buffer contains %v", bufferA.String()) - } - if bufferC.String() != "barfailtest" { - t.Errorf("Buffer contains %v", bufferC.String()) - } - - writer.CloseWriters() -} - -type devNullCloser int - -func (d devNullCloser) Close() error { - return nil -} - -func (d devNullCloser) Write(buf []byte) (int, error) { - return len(buf), nil -} - -// This test checks for races. It is only useful when run with the race detector. -func TestRaceWriteBroadcaster(t *testing.T) { - writer := NewWriteBroadcaster() - c := make(chan bool) - go func() { - writer.AddWriter(devNullCloser(0), "") - c <- true - }() - writer.Write([]byte("hello")) - <-c -} - func assertKernelVersion(t *testing.T, a, b *KernelVersionInfo, result int) { if r := CompareKernelVersion(a, b); r != result { t.Fatalf("Unexpected kernel version comparison result. Found %d, expected %d", r, result)