Use bufio.Reader instead of bufio.Scanner for logger.Copier

When using a scanner, log lines over 64K will crash the Copier with
bufio.ErrTooLong. Subsequently, the ioutils.bufReader will grow without
bound as the logs are no longer being flushed to disk.

Signed-off-by: Burke Libbey <burke.libbey@shopify.com>
This commit is contained in:
Burke Libbey 2015-05-28 16:59:25 -04:00
parent 1f22676fcb
commit f779cfc5d8
No known key found for this signature in database
GPG Key ID: E893DEF914F22410
1 changed files with 21 additions and 7 deletions

View File

@ -2,6 +2,7 @@ package logger
import (
"bufio"
"bytes"
"io"
"sync"
"time"
@ -40,14 +41,27 @@ func (c *Copier) Run() {
func (c *Copier) copySrc(name string, src io.Reader) {
defer c.copyJobs.Done()
scanner := bufio.NewScanner(src)
for scanner.Scan() {
if err := c.dst.Log(&Message{ContainerID: c.cid, Line: scanner.Bytes(), Source: name, Timestamp: time.Now().UTC()}); err != nil {
logrus.Errorf("Failed to log msg %q for logger %s: %s", scanner.Bytes(), c.dst.Name(), err)
reader := bufio.NewReader(src)
for {
line, err := reader.ReadBytes('\n')
line = bytes.TrimSuffix(line, []byte{'\n'})
// ReadBytes can return full or partial output even when it failed.
// e.g. it can return a full entry and EOF.
if err == nil || len(line) > 0 {
if logErr := c.dst.Log(&Message{ContainerID: c.cid, Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
}
}
}
if err := scanner.Err(); err != nil {
logrus.Errorf("Error scanning log stream: %s", err)
if err != nil {
if err != io.EOF {
logrus.Errorf("Error scanning log stream: %s", err)
}
return
}
}
}