only close LogDriver after LogCopier is done

this prevents the copier from sending messages in the buffer to the closed
driver. If the copied took longer than the timeout to drain the buffer, this
aborts the copier read loop and return back so we can cleanup resources
properly.

Signed-off-by: Daniel Dao <dqminh@cloudflare.com>
This commit is contained in:
Daniel Dao 2016-01-15 13:42:23 +00:00
parent 012a3b6e74
commit 84e14754e1
3 changed files with 74 additions and 20 deletions

View File

@ -369,6 +369,9 @@ func (m *containerMonitor) resetContainer(lock bool) {
select {
case <-time.After(loggerCloseTimeout):
logrus.Warnf("Logger didn't exit in time: logs may be truncated")
container.LogCopier.Close()
// always waits for the LogCopier to finished before closing
<-exit
case <-exit:
}
}

View File

@ -20,14 +20,16 @@ type Copier struct {
srcs map[string]io.Reader
dst Logger
copyJobs sync.WaitGroup
closed chan struct{}
}
// NewCopier creates a new Copier
func NewCopier(cid string, srcs map[string]io.Reader, dst Logger) *Copier {
return &Copier{
cid: cid,
srcs: srcs,
dst: dst,
cid: cid,
srcs: srcs,
dst: dst,
closed: make(chan struct{}),
}
}
@ -44,24 +46,28 @@ func (c *Copier) copySrc(name string, src io.Reader) {
reader := bufio.NewReader(src)
for {
line, err := reader.ReadBytes('\n')
line = bytes.TrimSuffix(line, []byte{'\n'})
// ReadBytes can return full or partial output even when it failed.
// e.g. it can return a full entry and EOF.
if err == nil || len(line) > 0 {
if logErr := c.dst.Log(&Message{ContainerID: c.cid, Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
}
}
if err != nil {
if err != io.EOF {
logrus.Errorf("Error scanning log stream: %s", err)
}
select {
case <-c.closed:
return
}
default:
line, err := reader.ReadBytes('\n')
line = bytes.TrimSuffix(line, []byte{'\n'})
// ReadBytes can return full or partial output even when it failed.
// e.g. it can return a full entry and EOF.
if err == nil || len(line) > 0 {
if logErr := c.dst.Log(&Message{ContainerID: c.cid, Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
}
}
if err != nil {
if err != io.EOF {
logrus.Errorf("Error scanning log stream: %s", err)
}
return
}
}
}
}
@ -69,3 +75,12 @@ func (c *Copier) copySrc(name string, src io.Reader) {
func (c *Copier) Wait() {
c.copyJobs.Wait()
}
// Close closes the copier
func (c *Copier) Close() {
select {
case <-c.closed:
default:
close(c.closed)
}
}

View File

@ -10,9 +10,15 @@ import (
type TestLoggerJSON struct {
*json.Encoder
delay time.Duration
}
func (l *TestLoggerJSON) Log(m *Message) error { return l.Encode(m) }
func (l *TestLoggerJSON) Log(m *Message) error {
if l.delay > 0 {
time.Sleep(l.delay)
}
return l.Encode(m)
}
func (l *TestLoggerJSON) Close() error { return nil }
@ -94,3 +100,33 @@ func TestCopier(t *testing.T) {
}
}
}
func TestCopierSlow(t *testing.T) {
stdoutLine := "Line that thinks that it is log line from docker stdout"
var stdout bytes.Buffer
for i := 0; i < 30; i++ {
if _, err := stdout.WriteString(stdoutLine + "\n"); err != nil {
t.Fatal(err)
}
}
var jsonBuf bytes.Buffer
//encoder := &encodeCloser{Encoder: json.NewEncoder(&jsonBuf)}
jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf), delay: 100 * time.Millisecond}
cid := "a7317399f3f857173c6179d44823594f8294678dea9999662e5c625b5a1c7657"
c := NewCopier(cid, map[string]io.Reader{"stdout": &stdout}, jsonLog)
c.Run()
wait := make(chan struct{})
go func() {
c.Wait()
close(wait)
}()
<-time.After(150 * time.Millisecond)
c.Close()
select {
case <-time.After(200 * time.Millisecond):
t.Fatalf("failed to exit in time after the copier is closed")
case <-wait:
}
}