diff --git a/daemon/logger/jsonfilelog/jsonlog/jsonlog.go b/daemon/logger/jsonfilelog/jsonlog/jsonlog.go index 74be8e7da0..050900197f 100644 --- a/daemon/logger/jsonfilelog/jsonlog/jsonlog.go +++ b/daemon/logger/jsonfilelog/jsonlog/jsonlog.go @@ -21,5 +21,7 @@ func (jl *JSONLog) Reset() { jl.Log = "" jl.Stream = "" jl.Created = time.Time{} - jl.Attrs = make(map[string]string) + for k := range jl.Attrs { + delete(jl.Attrs, k) + } } diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go index 12f676bb1a..cc4649903a 100644 --- a/daemon/logger/jsonfilelog/read.go +++ b/daemon/logger/jsonfilelog/read.go @@ -60,35 +60,65 @@ func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, erro return msg, nil } -// decodeFunc is used to create a decoder for the log file reader -func decodeFunc(rdr io.Reader) func() (*logger.Message, error) { - l := &jsonlog.JSONLog{} - dec := json.NewDecoder(rdr) - return func() (msg *logger.Message, err error) { - for retries := 0; retries < maxJSONDecodeRetry; retries++ { - msg, err = decodeLogLine(dec, l) - if err == nil || err == io.EOF { - break - } +type decoder struct { + rdr io.Reader + dec *json.Decoder + jl *jsonlog.JSONLog +} - logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json") - // try again, could be due to a an incomplete json object as we read - if _, ok := err.(*json.SyntaxError); ok { - dec = json.NewDecoder(rdr) - continue - } +func (d *decoder) Reset(rdr io.Reader) { + d.rdr = rdr + d.dec = nil + if d.jl != nil { + d.jl.Reset() + } +} - // io.ErrUnexpectedEOF is returned from json.Decoder when there is - // remaining data in the parser's buffer while an io.EOF occurs. - // If the json logger writes a partial json log entry to the disk - // while at the same time the decoder tries to decode it, the race condition happens. - if err == io.ErrUnexpectedEOF { - reader := io.MultiReader(dec.Buffered(), rdr) - dec = json.NewDecoder(reader) - continue - } +func (d *decoder) Close() { + d.dec = nil + d.rdr = nil + d.jl = nil +} + +func (d *decoder) Decode() (msg *logger.Message, err error) { + if d.dec == nil { + d.dec = json.NewDecoder(d.rdr) + } + if d.jl == nil { + d.jl = &jsonlog.JSONLog{} + } + for retries := 0; retries < maxJSONDecodeRetry; retries++ { + msg, err = decodeLogLine(d.dec, d.jl) + if err == nil || err == io.EOF { + break } - return msg, err + + logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json") + // try again, could be due to a an incomplete json object as we read + if _, ok := err.(*json.SyntaxError); ok { + d.dec = json.NewDecoder(d.rdr) + continue + } + + // io.ErrUnexpectedEOF is returned from json.Decoder when there is + // remaining data in the parser's buffer while an io.EOF occurs. + // If the json logger writes a partial json log entry to the disk + // while at the same time the decoder tries to decode it, the race condition happens. + if err == io.ErrUnexpectedEOF { + d.rdr = io.MultiReader(d.dec.Buffered(), d.rdr) + d.dec = json.NewDecoder(d.rdr) + continue + } + } + return msg, err +} + +// decodeFunc is used to create a decoder for the log file reader +func decodeFunc(rdr io.Reader) loggerutils.Decoder { + return &decoder{ + rdr: rdr, + dec: nil, + jl: nil, } } diff --git a/daemon/logger/jsonfilelog/read_test.go b/daemon/logger/jsonfilelog/read_test.go index c15cfd453c..19536a1ff9 100644 --- a/daemon/logger/jsonfilelog/read_test.go +++ b/daemon/logger/jsonfilelog/read_test.go @@ -75,19 +75,21 @@ func TestEncodeDecode(t *testing.T) { assert.Assert(t, marshalMessage(m2, nil, buf)) assert.Assert(t, marshalMessage(m3, nil, buf)) - decode := decodeFunc(buf) - msg, err := decode() + dec := decodeFunc(buf) + defer dec.Close() + + msg, err := dec.Decode() assert.NilError(t, err) assert.Assert(t, string(msg.Line) == "hello 1\n", string(msg.Line)) - msg, err = decode() + msg, err = dec.Decode() assert.NilError(t, err) assert.Assert(t, string(msg.Line) == "hello 2\n") - msg, err = decode() + msg, err = dec.Decode() assert.NilError(t, err) assert.Assert(t, string(msg.Line) == "hello 3\n") - _, err = decode() + _, err = dec.Decode() assert.Assert(t, err == io.EOF) } diff --git a/daemon/logger/local/local_test.go b/daemon/logger/local/local_test.go index 48df6256a7..21ea8e6a82 100644 --- a/daemon/logger/local/local_test.go +++ b/daemon/logger/local/local_test.go @@ -1,21 +1,18 @@ package local import ( + "bytes" "context" "encoding/binary" + "fmt" + "io" "io/ioutil" "os" "path/filepath" + "strings" "testing" "time" - "bytes" - "fmt" - - "strings" - - "io" - "github.com/docker/docker/api/types/backend" "github.com/docker/docker/api/types/plugins/logdriver" "github.com/docker/docker/daemon/logger" diff --git a/daemon/logger/local/read.go b/daemon/logger/local/read.go index a752de2a8d..b28aa90651 100644 --- a/daemon/logger/local/read.go +++ b/daemon/logger/local/read.go @@ -96,49 +96,79 @@ func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io return io.NewSectionReader(r, offset, size), found, nil } -func decodeFunc(rdr io.Reader) func() (*logger.Message, error) { - proto := &logdriver.LogEntry{} - buf := make([]byte, initialBufSize) +type decoder struct { + rdr io.Reader + proto *logdriver.LogEntry + buf []byte +} - return func() (*logger.Message, error) { - var ( - read int - err error - ) +func (d *decoder) Decode() (*logger.Message, error) { + if d.proto == nil { + d.proto = &logdriver.LogEntry{} + } else { + resetProto(d.proto) + } + if d.buf == nil { + d.buf = make([]byte, initialBufSize) + } + var ( + read int + err error + ) - resetProto(proto) - - for i := 0; i < maxDecodeRetry; i++ { - var n int - n, err = io.ReadFull(rdr, buf[read:encodeBinaryLen]) - if err != nil { - if err != io.ErrUnexpectedEOF { - return nil, errors.Wrap(err, "error reading log message length") - } - read += n - continue + for i := 0; i < maxDecodeRetry; i++ { + var n int + n, err = io.ReadFull(d.rdr, d.buf[read:encodeBinaryLen]) + if err != nil { + if err != io.ErrUnexpectedEOF { + return nil, errors.Wrap(err, "error reading log message length") } read += n - break + continue } - if err != nil { - return nil, errors.Wrapf(err, "could not read log message length: read: %d, expected: %d", read, encodeBinaryLen) - } - - msgLen := int(binary.BigEndian.Uint32(buf[:read])) - - if len(buf) < msgLen+encodeBinaryLen { - buf = make([]byte, msgLen+encodeBinaryLen) - } else { - if msgLen <= initialBufSize { - buf = buf[:initialBufSize] - } else { - buf = buf[:msgLen+encodeBinaryLen] - } - } - - return decodeLogEntry(rdr, proto, buf, msgLen) + read += n + break } + if err != nil { + return nil, errors.Wrapf(err, "could not read log message length: read: %d, expected: %d", read, encodeBinaryLen) + } + + msgLen := int(binary.BigEndian.Uint32(d.buf[:read])) + + if len(d.buf) < msgLen+encodeBinaryLen { + d.buf = make([]byte, msgLen+encodeBinaryLen) + } else { + if msgLen <= initialBufSize { + d.buf = d.buf[:initialBufSize] + } else { + d.buf = d.buf[:msgLen+encodeBinaryLen] + } + } + + return decodeLogEntry(d.rdr, d.proto, d.buf, msgLen) +} + +func (d *decoder) Reset(rdr io.Reader) { + d.rdr = rdr + if d.proto != nil { + resetProto(d.proto) + } + if d.buf != nil { + d.buf = d.buf[:initialBufSize] + } +} + +func (d *decoder) Close() { + d.buf = d.buf[:0] + d.buf = nil + if d.proto != nil { + resetProto(d.proto) + } + d.rdr = nil +} + +func decodeFunc(rdr io.Reader) loggerutils.Decoder { + return &decoder{rdr: rdr} } func decodeLogEntry(rdr io.Reader, proto *logdriver.LogEntry, buf []byte, msgLen int) (*logger.Message, error) { diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go index 4048d18097..88e2b5f6bb 100644 --- a/daemon/logger/loggerutils/logfile.go +++ b/daemon/logger/loggerutils/logfile.go @@ -89,12 +89,25 @@ type LogFile struct { filesRefCounter refCounter // keep reference-counted of decompressed files notifyRotate *pubsub.Publisher marshal logger.MarshalFunc - createDecoder makeDecoderFunc + createDecoder MakeDecoderFn getTailReader GetTailReaderFunc perms os.FileMode } -type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error) +// MakeDecoderFn creates a decoder +type MakeDecoderFn func(rdr io.Reader) Decoder + +// Decoder is for reading logs +// It is created by the log reader by calling the `MakeDecoderFunc` +type Decoder interface { + // Reset resets the decoder + // Reset is called for certain events, such as log rotations + Reset(io.Reader) + // Decode decodes the next log messeage from the stream + Decode() (*logger.Message, error) + // Close signals to the decoder that it can release whatever resources it was using. + Close() +} // SizeReaderAt defines a ReaderAt that also reports its size. // This is used for tailing log files. @@ -110,7 +123,7 @@ type SizeReaderAt interface { type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr io.Reader, nLines int, err error) // NewLogFile creates new LogFile -func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) { +func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc MakeDecoderFn, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) { log, err := openFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms) if err != nil { return nil, err @@ -314,6 +327,9 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) } defer currentFile.Close() + dec := w.createDecoder(nil) + defer dec.Close() + currentChunk, err := newSectionReader(currentFile) if err != nil { w.mu.RUnlock() @@ -359,7 +375,7 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) readers = append(readers, currentChunk) } - tailFiles(readers, watcher, w.createDecoder, w.getTailReader, config) + tailFiles(readers, watcher, dec, w.getTailReader, config) closeFiles() w.mu.RLock() @@ -373,7 +389,7 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) notifyRotate := w.notifyRotate.Subscribe() defer w.notifyRotate.Evict(notifyRotate) - followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until) + followLogs(currentFile, watcher, notifyRotate, dec, config.Since, config.Until) } func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) { @@ -479,7 +495,7 @@ func newSectionReader(f *os.File) (*io.SectionReader, error) { return io.NewSectionReader(f, 0, size), nil } -func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, getTailReader GetTailReaderFunc, config logger.ReadConfig) { +func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig) { nLines := config.Tail ctx, cancel := context.WithCancel(context.Background()) @@ -512,9 +528,10 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, createDecoder m } rdr := io.MultiReader(readers...) - decodeLogLine := createDecoder(rdr) + dec.Reset(rdr) + for { - msg, err := decodeLogLine() + msg, err := dec.Decode() if err != nil { if errors.Cause(err) != io.EOF { watcher.Err <- err @@ -535,8 +552,8 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, createDecoder m } } -func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, createDecoder makeDecoderFunc, since, until time.Time) { - decodeLogLine := createDecoder(f) +func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, dec Decoder, since, until time.Time) { + dec.Reset(f) name := f.Name() fileWatcher, err := watchFile(name) @@ -567,7 +584,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int if err := fileWatcher.Add(name); err != nil { return err } - decodeLogLine = createDecoder(f) + dec.Reset(f) return nil } @@ -578,7 +595,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int case e := <-fileWatcher.Events(): switch e.Op { case fsnotify.Write: - decodeLogLine = createDecoder(f) + dec.Reset(f) return nil case fsnotify.Rename, fsnotify.Remove: select { @@ -648,7 +665,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int // main loop for { - msg, err := decodeLogLine() + msg, err := dec.Decode() if err != nil { if err := handleDecodeErr(err); err != nil { if err == errDone { diff --git a/daemon/logger/loggerutils/logfile_test.go b/daemon/logger/loggerutils/logfile_test.go index 2d6c704c84..8e3ce1aa41 100644 --- a/daemon/logger/loggerutils/logfile_test.go +++ b/daemon/logger/loggerutils/logfile_test.go @@ -15,6 +15,32 @@ import ( "gotest.tools/v3/assert" ) +type testDecoder struct { + rdr io.Reader + scanner *bufio.Scanner +} + +func (d *testDecoder) Decode() (*logger.Message, error) { + if d.scanner == nil { + d.scanner = bufio.NewScanner(d.rdr) + } + if !d.scanner.Scan() { + return nil, d.scanner.Err() + } + // some comment + return &logger.Message{Line: d.scanner.Bytes(), Timestamp: time.Now()}, nil +} + +func (d *testDecoder) Reset(rdr io.Reader) { + d.rdr = rdr + d.scanner = bufio.NewScanner(rdr) +} + +func (d *testDecoder) Close() { + d.rdr = nil + d.scanner = nil +} + func TestTailFiles(t *testing.T) { s1 := strings.NewReader("Hello.\nMy name is Inigo Montoya.\n") s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n") @@ -22,27 +48,18 @@ func TestTailFiles(t *testing.T) { files := []SizeReaderAt{s1, s2, s3} watcher := logger.NewLogWatcher() - createDecoder := func(r io.Reader) func() (*logger.Message, error) { - scanner := bufio.NewScanner(r) - return func() (*logger.Message, error) { - if !scanner.Scan() { - return nil, scanner.Err() - } - // some comment - return &logger.Message{Line: scanner.Bytes(), Timestamp: time.Now()}, nil - } - } tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) { return tailfile.NewTailReader(ctx, r, lines) } + dec := &testDecoder{} for desc, config := range map[string]logger.ReadConfig{} { t.Run(desc, func(t *testing.T) { started := make(chan struct{}) go func() { close(started) - tailFiles(files, watcher, createDecoder, tailReader, config) + tailFiles(files, watcher, dec, tailReader, config) }() <-started }) @@ -52,7 +69,7 @@ func TestTailFiles(t *testing.T) { started := make(chan struct{}) go func() { close(started) - tailFiles(files, watcher, createDecoder, tailReader, config) + tailFiles(files, watcher, dec, tailReader, config) }() <-started @@ -77,6 +94,15 @@ func TestTailFiles(t *testing.T) { } } +type dummyDecoder struct{} + +func (dummyDecoder) Decode() (*logger.Message, error) { + return &logger.Message{}, nil +} + +func (dummyDecoder) Close() {} +func (dummyDecoder) Reset(io.Reader) {} + func TestFollowLogsConsumerGone(t *testing.T) { lw := logger.NewLogWatcher() @@ -87,16 +113,12 @@ func TestFollowLogsConsumerGone(t *testing.T) { os.Remove(f.Name()) }() - makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) { - return func() (*logger.Message, error) { - return &logger.Message{}, nil - } - } + dec := dummyDecoder{} followLogsDone := make(chan struct{}) var since, until time.Time go func() { - followLogs(f, lw, make(chan interface{}), makeDecoder, since, until) + followLogs(f, lw, make(chan interface{}), dec, since, until) close(followLogsDone) }() @@ -118,6 +140,18 @@ func TestFollowLogsConsumerGone(t *testing.T) { } } +type dummyWrapper struct { + dummyDecoder + fn func() error +} + +func (d *dummyWrapper) Decode() (*logger.Message, error) { + if err := d.fn(); err != nil { + return nil, err + } + return d.dummyDecoder.Decode() +} + func TestFollowLogsProducerGone(t *testing.T) { lw := logger.NewLogWatcher() @@ -126,25 +160,25 @@ func TestFollowLogsProducerGone(t *testing.T) { defer os.Remove(f.Name()) var sent, received, closed int - makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) { - return func() (*logger.Message, error) { - if closed == 1 { - closed++ - t.Logf("logDecode() closed after sending %d messages\n", sent) - return nil, io.EOF - } else if closed > 1 { - t.Fatal("logDecode() called after closing!") - return nil, io.EOF - } + dec := &dummyWrapper{fn: func() error { + switch closed { + case 0: sent++ - return &logger.Message{}, nil + return nil + case 1: + closed++ + t.Logf("logDecode() closed after sending %d messages\n", sent) + return io.EOF + default: + t.Fatal("logDecode() called after closing!") + return io.EOF } - } + }} var since, until time.Time followLogsDone := make(chan struct{}) go func() { - followLogs(f, lw, make(chan interface{}), makeDecoder, since, until) + followLogs(f, lw, make(chan interface{}), dec, since, until) close(followLogsDone) }()