From e3ba3dd5b828307c3970d0036b019eca405e4a2c Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Wed, 6 May 2015 21:09:27 -0400 Subject: [PATCH] Make sure log pipes are closed Pipes are still not closed (and goroutines leaked) if neither pipe is used. Signed-off-by: Brian Goff --- daemon/logs.go | 43 ++++++++++++--------- integration-cli/docker_cli_logs_test.go | 51 +++++++++++++++++++++++++ pkg/jsonlog/jsonlog.go | 11 +++--- 3 files changed, 80 insertions(+), 25 deletions(-) diff --git a/daemon/logs.go b/daemon/logs.go index 384dca4e89..d002865b13 100644 --- a/daemon/logs.go +++ b/daemon/logs.go @@ -5,9 +5,10 @@ import ( "encoding/json" "fmt" "io" + "net" "os" "strconv" - "sync" + "syscall" "time" "github.com/Sirupsen/logrus" @@ -132,9 +133,10 @@ func (daemon *Daemon) ContainerLogs(name string, config *ContainerLogsConfig) er } } } + if config.Follow && container.IsRunning() { - errors := make(chan error, 2) - wg := sync.WaitGroup{} + chErr := make(chan error) + var stdoutPipe, stderrPipe io.ReadCloser // write an empty chunk of data (this is to ensure that the // HTTP Response is sent immediatly, even if the container has @@ -142,33 +144,36 @@ func (daemon *Daemon) ContainerLogs(name string, config *ContainerLogsConfig) er outStream.Write(nil) if config.UseStdout { - wg.Add(1) - stdoutPipe := container.StdoutLogPipe() - defer stdoutPipe.Close() + stdoutPipe = container.StdoutLogPipe() go func() { - errors <- jsonlog.WriteLog(stdoutPipe, outStream, format, config.Since) - wg.Done() + logrus.Debug("logs: stdout stream begin") + chErr <- jsonlog.WriteLog(stdoutPipe, outStream, format, config.Since) + logrus.Debug("logs: stdout stream end") }() } if config.UseStderr { - wg.Add(1) - stderrPipe := container.StderrLogPipe() - defer stderrPipe.Close() + stderrPipe = container.StderrLogPipe() go func() { - errors <- jsonlog.WriteLog(stderrPipe, errStream, format, config.Since) - wg.Done() + logrus.Debug("logs: stderr stream begin") + chErr <- jsonlog.WriteLog(stderrPipe, errStream, format, config.Since) + logrus.Debug("logs: stderr stream end") }() } - wg.Wait() - close(errors) + err = <-chErr + if stdoutPipe != nil { + stdoutPipe.Close() + } + if stderrPipe != nil { + stderrPipe.Close() + } + <-chErr // wait for 2nd goroutine to exit, otherwise bad things will happen - for err := range errors { - if err != nil { - logrus.Errorf("%s", err) + if err != nil && err != io.EOF && err != io.ErrClosedPipe { + if e, ok := err.(*net.OpError); ok && e.Err != syscall.EPIPE { + logrus.Errorf("error streaming logs: %v", err) } } - } return nil } diff --git a/integration-cli/docker_cli_logs_test.go b/integration-cli/docker_cli_logs_test.go index 49146868fd..47c18322ea 100644 --- a/integration-cli/docker_cli_logs_test.go +++ b/integration-cli/docker_cli_logs_test.go @@ -1,7 +1,9 @@ package main import ( + "encoding/json" "fmt" + "io" "os/exec" "regexp" "strconv" @@ -393,3 +395,52 @@ func (s *DockerSuite) TestLogsFollowSlowStdoutConsumer(c *check.C) { } } + +func (s *DockerSuite) TestLogsFollowGoroutinesWithStdout(c *check.C) { + out, _ := dockerCmd(c, "run", "-d", "busybox", "/bin/sh", "-c", "while true; do echo hello; sleep 2; done") + id := strings.TrimSpace(out) + c.Assert(waitRun(id), check.IsNil) + + type info struct { + NGoroutines int + } + getNGoroutines := func() int { + var i info + status, b, err := sockRequest("GET", "/info", nil) + c.Assert(err, check.IsNil) + c.Assert(status, check.Equals, 200) + c.Assert(json.Unmarshal(b, &i), check.IsNil) + return i.NGoroutines + } + + nroutines := getNGoroutines() + + cmd := exec.Command(dockerBinary, "logs", "-f", id) + r, w := io.Pipe() + cmd.Stdout = w + c.Assert(cmd.Start(), check.IsNil) + + // Make sure pipe is written to + chErr := make(chan error) + go func() { + b := make([]byte, 1) + _, err := r.Read(b) + chErr <- err + }() + c.Assert(<-chErr, check.IsNil) + c.Assert(cmd.Process.Kill(), check.IsNil) + + // NGoroutines is not updated right away, so we need to wait before failing + t := time.After(5 * time.Second) + for { + select { + case <-t: + c.Assert(nroutines, check.Equals, getNGoroutines()) + default: + if nroutines == getNGoroutines() { + return + } + time.Sleep(100 * time.Millisecond) + } + } +} diff --git a/pkg/jsonlog/jsonlog.go b/pkg/jsonlog/jsonlog.go index ce2da4a301..85afb3b503 100644 --- a/pkg/jsonlog/jsonlog.go +++ b/pkg/jsonlog/jsonlog.go @@ -5,8 +5,6 @@ import ( "fmt" "io" "time" - - "github.com/Sirupsen/logrus" ) type JSONLog struct { @@ -37,15 +35,16 @@ func WriteLog(src io.Reader, dst io.Writer, format string, since time.Time) erro l := &JSONLog{} for { l.Reset() - if err := dec.Decode(l); err == io.EOF { - return nil - } else if err != nil { - logrus.Printf("Error streaming logs: %s", err) + if err := dec.Decode(l); err != nil { + if err == io.EOF { + return nil + } return err } if !since.IsZero() && l.Created.Before(since) { continue } + line, err := l.Format(format) if err != nil { return err