From 84e14754e1ef3b089442398a31c5c5813fa9a1b6 Mon Sep 17 00:00:00 2001 From: Daniel Dao Date: Fri, 15 Jan 2016 13:42:23 +0000 Subject: [PATCH] only close LogDriver after LogCopier is done this prevents the copier from sending messages in the buffer to the closed driver. If the copied took longer than the timeout to drain the buffer, this aborts the copier read loop and return back so we can cleanup resources properly. Signed-off-by: Daniel Dao --- container/monitor.go | 3 ++ daemon/logger/copier.go | 53 +++++++++++++++++++++++------------- daemon/logger/copier_test.go | 38 +++++++++++++++++++++++++- 3 files changed, 74 insertions(+), 20 deletions(-) diff --git a/container/monitor.go b/container/monitor.go index 2f3368f405..09b447d947 100644 --- a/container/monitor.go +++ b/container/monitor.go @@ -369,6 +369,9 @@ func (m *containerMonitor) resetContainer(lock bool) { select { case <-time.After(loggerCloseTimeout): logrus.Warnf("Logger didn't exit in time: logs may be truncated") + container.LogCopier.Close() + // always waits for the LogCopier to finished before closing + <-exit case <-exit: } } diff --git a/daemon/logger/copier.go b/daemon/logger/copier.go index 95cd9eb355..436c0a8f9c 100644 --- a/daemon/logger/copier.go +++ b/daemon/logger/copier.go @@ -20,14 +20,16 @@ type Copier struct { srcs map[string]io.Reader dst Logger copyJobs sync.WaitGroup + closed chan struct{} } // NewCopier creates a new Copier func NewCopier(cid string, srcs map[string]io.Reader, dst Logger) *Copier { return &Copier{ - cid: cid, - srcs: srcs, - dst: dst, + cid: cid, + srcs: srcs, + dst: dst, + closed: make(chan struct{}), } } @@ -44,24 +46,28 @@ func (c *Copier) copySrc(name string, src io.Reader) { reader := bufio.NewReader(src) for { - line, err := reader.ReadBytes('\n') - line = bytes.TrimSuffix(line, []byte{'\n'}) - - // ReadBytes can return full or partial output even when it failed. - // e.g. it can return a full entry and EOF. - if err == nil || len(line) > 0 { - if logErr := c.dst.Log(&Message{ContainerID: c.cid, Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil { - logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr) - } - } - - if err != nil { - if err != io.EOF { - logrus.Errorf("Error scanning log stream: %s", err) - } + select { + case <-c.closed: return - } + default: + line, err := reader.ReadBytes('\n') + line = bytes.TrimSuffix(line, []byte{'\n'}) + // ReadBytes can return full or partial output even when it failed. + // e.g. it can return a full entry and EOF. + if err == nil || len(line) > 0 { + if logErr := c.dst.Log(&Message{ContainerID: c.cid, Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil { + logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr) + } + } + + if err != nil { + if err != io.EOF { + logrus.Errorf("Error scanning log stream: %s", err) + } + return + } + } } } @@ -69,3 +75,12 @@ func (c *Copier) copySrc(name string, src io.Reader) { func (c *Copier) Wait() { c.copyJobs.Wait() } + +// Close closes the copier +func (c *Copier) Close() { + select { + case <-c.closed: + default: + close(c.closed) + } +} diff --git a/daemon/logger/copier_test.go b/daemon/logger/copier_test.go index f8980b3615..30239f06bc 100644 --- a/daemon/logger/copier_test.go +++ b/daemon/logger/copier_test.go @@ -10,9 +10,15 @@ import ( type TestLoggerJSON struct { *json.Encoder + delay time.Duration } -func (l *TestLoggerJSON) Log(m *Message) error { return l.Encode(m) } +func (l *TestLoggerJSON) Log(m *Message) error { + if l.delay > 0 { + time.Sleep(l.delay) + } + return l.Encode(m) +} func (l *TestLoggerJSON) Close() error { return nil } @@ -94,3 +100,33 @@ func TestCopier(t *testing.T) { } } } + +func TestCopierSlow(t *testing.T) { + stdoutLine := "Line that thinks that it is log line from docker stdout" + var stdout bytes.Buffer + for i := 0; i < 30; i++ { + if _, err := stdout.WriteString(stdoutLine + "\n"); err != nil { + t.Fatal(err) + } + } + + var jsonBuf bytes.Buffer + //encoder := &encodeCloser{Encoder: json.NewEncoder(&jsonBuf)} + jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf), delay: 100 * time.Millisecond} + + cid := "a7317399f3f857173c6179d44823594f8294678dea9999662e5c625b5a1c7657" + c := NewCopier(cid, map[string]io.Reader{"stdout": &stdout}, jsonLog) + c.Run() + wait := make(chan struct{}) + go func() { + c.Wait() + close(wait) + }() + <-time.After(150 * time.Millisecond) + c.Close() + select { + case <-time.After(200 * time.Millisecond): + t.Fatalf("failed to exit in time after the copier is closed") + case <-wait: + } +}