diff --git a/container/monitor.go b/container/monitor.go index 2f3368f405..09b447d947 100644 --- a/container/monitor.go +++ b/container/monitor.go @@ -369,6 +369,9 @@ func (m *containerMonitor) resetContainer(lock bool) { select { case <-time.After(loggerCloseTimeout): logrus.Warnf("Logger didn't exit in time: logs may be truncated") + container.LogCopier.Close() + // always waits for the LogCopier to finished before closing + <-exit case <-exit: } } diff --git a/daemon/logger/copier.go b/daemon/logger/copier.go index 95cd9eb355..436c0a8f9c 100644 --- a/daemon/logger/copier.go +++ b/daemon/logger/copier.go @@ -20,14 +20,16 @@ type Copier struct { srcs map[string]io.Reader dst Logger copyJobs sync.WaitGroup + closed chan struct{} } // NewCopier creates a new Copier func NewCopier(cid string, srcs map[string]io.Reader, dst Logger) *Copier { return &Copier{ - cid: cid, - srcs: srcs, - dst: dst, + cid: cid, + srcs: srcs, + dst: dst, + closed: make(chan struct{}), } } @@ -44,24 +46,28 @@ func (c *Copier) copySrc(name string, src io.Reader) { reader := bufio.NewReader(src) for { - line, err := reader.ReadBytes('\n') - line = bytes.TrimSuffix(line, []byte{'\n'}) - - // ReadBytes can return full or partial output even when it failed. - // e.g. it can return a full entry and EOF. - if err == nil || len(line) > 0 { - if logErr := c.dst.Log(&Message{ContainerID: c.cid, Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil { - logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr) - } - } - - if err != nil { - if err != io.EOF { - logrus.Errorf("Error scanning log stream: %s", err) - } + select { + case <-c.closed: return - } + default: + line, err := reader.ReadBytes('\n') + line = bytes.TrimSuffix(line, []byte{'\n'}) + // ReadBytes can return full or partial output even when it failed. + // e.g. it can return a full entry and EOF. + if err == nil || len(line) > 0 { + if logErr := c.dst.Log(&Message{ContainerID: c.cid, Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil { + logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr) + } + } + + if err != nil { + if err != io.EOF { + logrus.Errorf("Error scanning log stream: %s", err) + } + return + } + } } } @@ -69,3 +75,12 @@ func (c *Copier) copySrc(name string, src io.Reader) { func (c *Copier) Wait() { c.copyJobs.Wait() } + +// Close closes the copier +func (c *Copier) Close() { + select { + case <-c.closed: + default: + close(c.closed) + } +} diff --git a/daemon/logger/copier_test.go b/daemon/logger/copier_test.go index f8980b3615..30239f06bc 100644 --- a/daemon/logger/copier_test.go +++ b/daemon/logger/copier_test.go @@ -10,9 +10,15 @@ import ( type TestLoggerJSON struct { *json.Encoder + delay time.Duration } -func (l *TestLoggerJSON) Log(m *Message) error { return l.Encode(m) } +func (l *TestLoggerJSON) Log(m *Message) error { + if l.delay > 0 { + time.Sleep(l.delay) + } + return l.Encode(m) +} func (l *TestLoggerJSON) Close() error { return nil } @@ -94,3 +100,33 @@ func TestCopier(t *testing.T) { } } } + +func TestCopierSlow(t *testing.T) { + stdoutLine := "Line that thinks that it is log line from docker stdout" + var stdout bytes.Buffer + for i := 0; i < 30; i++ { + if _, err := stdout.WriteString(stdoutLine + "\n"); err != nil { + t.Fatal(err) + } + } + + var jsonBuf bytes.Buffer + //encoder := &encodeCloser{Encoder: json.NewEncoder(&jsonBuf)} + jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf), delay: 100 * time.Millisecond} + + cid := "a7317399f3f857173c6179d44823594f8294678dea9999662e5c625b5a1c7657" + c := NewCopier(cid, map[string]io.Reader{"stdout": &stdout}, jsonLog) + c.Run() + wait := make(chan struct{}) + go func() { + c.Wait() + close(wait) + }() + <-time.After(150 * time.Millisecond) + c.Close() + select { + case <-time.After(200 * time.Millisecond): + t.Fatalf("failed to exit in time after the copier is closed") + case <-wait: + } +}