From 94a10150f64a24793216f5324a34e62be3f31a2d Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Thu, 5 Apr 2018 12:39:28 -0400 Subject: [PATCH] Decouple logfile from tailfile. This makes it so consumers of `LogFile` should pass in how to get an io.Reader to the requested number of lines to tail. This is also much more efficient when tailing a large number of lines. Signed-off-by: Brian Goff --- daemon/logger/jsonfilelog/jsonfilelog.go | 2 +- daemon/logger/jsonfilelog/jsonfilelog_test.go | 47 ++++--- daemon/logger/jsonfilelog/read.go | 14 +- daemon/logger/jsonfilelog/read_test.go | 30 +++++ daemon/logger/loggerutils/logfile.go | 121 +++++++++++++----- daemon/logger/loggerutils/logfile_test.go | 76 +++++++++++ 6 files changed, 238 insertions(+), 52 deletions(-) create mode 100644 daemon/logger/loggerutils/logfile_test.go diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go index b806a5ad17..3649bdf91c 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/daemon/logger/jsonfilelog/jsonfilelog.go @@ -110,7 +110,7 @@ func New(info logger.Info) (logger.Logger, error) { return b, nil } - writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, compress, marshalFunc, decodeFunc, 0640) + writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, compress, marshalFunc, decodeFunc, 0640, getTailReader) if err != nil { return nil, err } diff --git a/daemon/logger/jsonfilelog/jsonfilelog_test.go b/daemon/logger/jsonfilelog/jsonfilelog_test.go index 22bbcf2eb7..8e66e6455a 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog_test.go +++ b/daemon/logger/jsonfilelog/jsonfilelog_test.go @@ -4,6 +4,7 @@ import ( "bytes" "compress/gzip" "encoding/json" + "fmt" "io/ioutil" "os" "path/filepath" @@ -107,7 +108,10 @@ func BenchmarkJSONFileLoggerLog(b *testing.B) { ContainerID: "a7317399f3f857173c6179d44823594f8294678dea9999662e5c625b5a1c7657", LogPath: tmp.Join("container.log"), Config: map[string]string{ - "labels": 
"first,second", + "labels": "first,second", + "max-file": "10", + "compress": "true", + "max-size": "20m", }, ContainerLabels: map[string]string{ "first": "label_value", @@ -117,21 +121,34 @@ func BenchmarkJSONFileLoggerLog(b *testing.B) { assert.NilError(b, err) defer jsonlogger.Close() - msg := &logger.Message{ - Line: []byte("Line that thinks that it is log line from docker\n"), - Source: "stderr", - Timestamp: time.Now().UTC(), - } + t := time.Now().UTC() + for _, data := range [][]byte{ + []byte(""), + []byte("a short string"), + bytes.Repeat([]byte("a long string"), 100), + bytes.Repeat([]byte("a really long string"), 10000), + } { + b.Run(fmt.Sprintf("%d", len(data)), func(b *testing.B) { + testMsg := &logger.Message{ + Line: data, + Source: "stderr", + Timestamp: t, + } - buf := bytes.NewBuffer(nil) - assert.NilError(b, marshalMessage(msg, nil, buf)) - b.SetBytes(int64(buf.Len())) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - if err := jsonlogger.Log(msg); err != nil { - b.Fatal(err) - } + buf := bytes.NewBuffer(nil) + assert.NilError(b, marshalMessage(testMsg, nil, buf)) + b.SetBytes(int64(buf.Len())) + b.ResetTimer() + for i := 0; i < b.N; i++ { + msg := logger.NewMessage() + msg.Line = testMsg.Line + msg.Timestamp = testMsg.Timestamp + msg.Source = testMsg.Source + if err := jsonlogger.Log(msg); err != nil { + b.Fatal(err) + } + } + }) } } diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go index ab1793bb72..12f676bb1a 100644 --- a/daemon/logger/jsonfilelog/read.go +++ b/daemon/logger/jsonfilelog/read.go @@ -1,12 +1,16 @@ package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelog" import ( + "context" "encoding/json" "io" "github.com/docker/docker/api/types/backend" "github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog" + "github.com/docker/docker/daemon/logger/loggerutils" + "github.com/docker/docker/pkg/tailfile" + "github.com/sirupsen/logrus" ) 
const maxJSONDecodeRetry = 20000 @@ -63,14 +67,14 @@ func decodeFunc(rdr io.Reader) func() (*logger.Message, error) { return func() (msg *logger.Message, err error) { for retries := 0; retries < maxJSONDecodeRetry; retries++ { msg, err = decodeLogLine(dec, l) - if err == nil { + if err == nil || err == io.EOF { break } + logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json") // try again, could be due to a an incomplete json object as we read if _, ok := err.(*json.SyntaxError); ok { dec = json.NewDecoder(rdr) - retries++ continue } @@ -81,9 +85,13 @@ func decodeFunc(rdr io.Reader) func() (*logger.Message, error) { if err == io.ErrUnexpectedEOF { reader := io.MultiReader(dec.Buffered(), rdr) dec = json.NewDecoder(reader) - retries++ + continue } } return msg, err } } + +func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) { + return tailfile.NewTailReader(ctx, r, req) +} diff --git a/daemon/logger/jsonfilelog/read_test.go b/daemon/logger/jsonfilelog/read_test.go index cad8003e5e..6ce4936e0e 100644 --- a/daemon/logger/jsonfilelog/read_test.go +++ b/daemon/logger/jsonfilelog/read_test.go @@ -2,6 +2,7 @@ package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelo import ( "bytes" + "io" "testing" "time" @@ -62,3 +63,32 @@ func BenchmarkJSONFileLoggerReadLogs(b *testing.B) { } } } + +func TestEncodeDecode(t *testing.T) { + t.Parallel() + + m1 := &logger.Message{Line: []byte("hello 1"), Timestamp: time.Now(), Source: "stdout"} + m2 := &logger.Message{Line: []byte("hello 2"), Timestamp: time.Now(), Source: "stdout"} + m3 := &logger.Message{Line: []byte("hello 3"), Timestamp: time.Now(), Source: "stdout"} + + buf := bytes.NewBuffer(nil) + assert.Assert(t, marshalMessage(m1, nil, buf)) + assert.Assert(t, marshalMessage(m2, nil, buf)) + assert.Assert(t, marshalMessage(m3, nil, buf)) + + decode := decodeFunc(buf) + msg, err := decode() + assert.Assert(t, err) + 
assert.Assert(t, string(msg.Line) == "hello 1\n", string(msg.Line)) + + msg, err = decode() + assert.Assert(t, err) + assert.Assert(t, string(msg.Line) == "hello 2\n") + + msg, err = decode() + assert.Assert(t, err) + assert.Assert(t, string(msg.Line) == "hello 3\n") + + _, err = decode() + assert.Assert(t, err == io.EOF) +} diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go index fe338e53e8..2b2ac422bf 100644 --- a/daemon/logger/loggerutils/logfile.go +++ b/daemon/logger/loggerutils/logfile.go @@ -1,7 +1,6 @@ package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils" import ( - "bytes" "compress/gzip" "context" "encoding/json" @@ -14,11 +13,9 @@ import ( "time" "github.com/docker/docker/daemon/logger" - "github.com/docker/docker/daemon/logger/loggerutils/multireader" "github.com/docker/docker/pkg/filenotify" "github.com/docker/docker/pkg/pools" "github.com/docker/docker/pkg/pubsub" - "github.com/docker/docker/pkg/tailfile" "github.com/fsnotify/fsnotify" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -92,13 +89,27 @@ type LogFile struct { notifyRotate *pubsub.Publisher marshal logger.MarshalFunc createDecoder makeDecoderFunc + getTailReader GetTailReaderFunc perms os.FileMode } type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error) +// SizeReaderAt defines a ReaderAt that also reports its size. +// This is used for tailing log files. +type SizeReaderAt interface { + io.ReaderAt + Size() int64 +} + +// GetTailReaderFunc is used to truncate a reader to only read as much as is required +// in order to get the passed in number of log lines. +// It returns the sectioned reader, the number of lines that the section reader +// contains, and any error that occurs. 
+type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr io.Reader, nLines int, err error) + // NewLogFile creates new LogFile -func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode) (*LogFile, error) { +func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) { log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms) if err != nil { return nil, err @@ -120,6 +131,7 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar marshal: marshaller, createDecoder: decodeFunc, perms: perms, + getTailReader: getTailReader, }, nil } @@ -309,34 +321,46 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) } if config.Tail != 0 { + // TODO(@cpuguy83): Instead of opening every file, only get the files which + // are needed to tail. + // This is especially costly when compression is enabled. 
files, err := w.openRotatedFiles(config) + w.mu.RUnlock() if err != nil { - w.mu.RUnlock() watcher.Err <- err return } - w.mu.RUnlock() - seekers := make([]io.ReadSeeker, 0, len(files)+1) - for _, f := range files { - seekers = append(seekers, f) - } - if currentChunk.Size() > 0 { - seekers = append(seekers, currentChunk) - } - if len(seekers) > 0 { - tailFile(multireader.MultiReadSeeker(seekers...), watcher, w.createDecoder, config) - } - for _, f := range files { - f.Close() - fileName := f.Name() - if strings.HasSuffix(fileName, tmpLogfileSuffix) { - err := w.filesRefCounter.Dereference(fileName) - if err != nil { - logrus.Errorf("Failed to dereference log file %q: %v", fileName, err) + + closeFiles := func() { + for _, f := range files { + f.Close() + fileName := f.Name() + if strings.HasSuffix(fileName, tmpLogfileSuffix) { + err := w.filesRefCounter.Dereference(fileName) + if err != nil { + logrus.Errorf("Failed to dereference log file %q: %v", fileName, err) + } } } } + readers := make([]SizeReaderAt, 0, len(files)+1) + for _, f := range files { + stat, err := f.Stat() + if err != nil { + watcher.Err <- errors.Wrap(err, "error reading size of rotated file") + closeFiles() + return + } + readers = append(readers, io.NewSectionReader(f, 0, stat.Size())) + } + if currentChunk.Size() > 0 { + readers = append(readers, currentChunk) + } + + tailFiles(readers, watcher, w.createDecoder, w.getTailReader, config) + closeFiles() + w.mu.RLock() } @@ -454,19 +478,39 @@ func newSectionReader(f *os.File) (*io.SectionReader, error) { return io.NewSectionReader(f, 0, size), nil } -type decodeFunc func() (*logger.Message, error) +func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, getTailReader GetTailReaderFunc, config logger.ReadConfig) { + nLines := config.Tail -func tailFile(f io.ReadSeeker, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, config logger.ReadConfig) { - var rdr io.Reader = f - if config.Tail > 0 { - 
ls, err := tailfile.TailFile(f, config.Tail) - if err != nil { - watcher.Err <- err - return + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here. + go func() { + select { + case <-ctx.Done(): + case <-watcher.WatchClose(): + cancel() + } + }() + + readers := make([]io.Reader, 0, len(files)) + + if config.Tail > 0 { + for i := len(files) - 1; i >= 0 && nLines > 0; i-- { + tail, n, err := getTailReader(ctx, files[i], nLines) + if err != nil { + watcher.Err <- errors.Wrap(err, "error finding file position to start log tailing") + return + } + nLines -= n + readers = append([]io.Reader{tail}, readers...) + } + } else { + for _, r := range files { + readers = append(readers, &wrappedReaderAt{ReaderAt: r}) } - rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n"))) } + rdr := io.MultiReader(readers...) decodeLogLine := createDecoder(rdr) for { msg, err := decodeLogLine() @@ -483,7 +527,7 @@ func tailFile(f io.ReadSeeker, watcher *logger.LogWatcher, createDecoder makeDec return } select { - case <-watcher.WatchClose(): + case <-ctx.Done(): return case watcher.Msg <- msg: } @@ -664,3 +708,14 @@ func watchFile(name string) (filenotify.FileWatcher, error) { } return fileWatcher, nil } + +type wrappedReaderAt struct { + io.ReaderAt + pos int64 +} + +func (r *wrappedReaderAt) Read(p []byte) (int, error) { + n, err := r.ReaderAt.ReadAt(p, r.pos) + r.pos += int64(n) + return n, err +} diff --git a/daemon/logger/loggerutils/logfile_test.go b/daemon/logger/loggerutils/logfile_test.go new file mode 100644 index 0000000000..0e359db3f8 --- /dev/null +++ b/daemon/logger/loggerutils/logfile_test.go @@ -0,0 +1,76 @@ +package loggerutils + +import ( + "bufio" + "context" + "io" + "strings" + "testing" + "time" + + "github.com/docker/docker/daemon/logger" + "github.com/docker/docker/pkg/tailfile" + "gotest.tools/assert" +) + +func TestTailFiles(t *testing.T) { + s1 := 
strings.NewReader("Hello.\nMy name is Inigo Montoya.\n") + s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n") + s3 := strings.NewReader("Roads?\nWhere we're going we don't need roads.\n") + + files := []SizeReaderAt{s1, s2, s3} + watcher := logger.NewLogWatcher() + createDecoder := func(r io.Reader) func() (*logger.Message, error) { + scanner := bufio.NewScanner(r) + return func() (*logger.Message, error) { + if !scanner.Scan() { + return nil, scanner.Err() + } + // some comment + return &logger.Message{Line: scanner.Bytes(), Timestamp: time.Now()}, nil + } + } + + tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) { + return tailfile.NewTailReader(ctx, r, lines) + } + + for desc, config := range map[string]logger.ReadConfig{} { + t.Run(desc, func(t *testing.T) { + started := make(chan struct{}) + go func() { + close(started) + tailFiles(files, watcher, createDecoder, tailReader, config) + }() + <-started + }) + } + + config := logger.ReadConfig{Tail: 2} + started := make(chan struct{}) + go func() { + close(started) + tailFiles(files, watcher, createDecoder, tailReader, config) + }() + <-started + + select { + case <-time.After(60 * time.Second): + t.Fatal("timeout waiting for tail line") + case err := <-watcher.Err: + assert.Assert(t, err) + case msg := <-watcher.Msg: + assert.Assert(t, msg != nil) + assert.Assert(t, string(msg.Line) == "Roads?", string(msg.Line)) + } + + select { + case <-time.After(60 * time.Second): + t.Fatal("timeout waiting for tail line") + case err := <-watcher.Err: + assert.Assert(t, err) + case msg := <-watcher.Msg: + assert.Assert(t, msg != nil) + assert.Assert(t, string(msg.Line) == "Where we're going we don't need roads.", string(msg.Line)) + } +}