From ae5f664f4e62b8235a9b37ad0f33c2f1748aaa32 Mon Sep 17 00:00:00 2001 From: Cory Snider Date: Wed, 2 Mar 2022 18:16:47 -0500 Subject: [PATCH] daemon/logger: open log reader synchronously The asynchronous startup of the log-reading goroutine made the follow-tail tests nondeterministic. The Log calls in the tests which were supposed to happen after the reader started reading would sometimes execute before the reader, throwing off the counts. Tweak the ReadLogs implementation so that the order of operations is deterministic. Signed-off-by: Cory Snider --- daemon/logger/jsonfilelog/jsonfilelog.go | 28 +++-------- daemon/logger/jsonfilelog/read.go | 19 +------ daemon/logger/local/local.go | 20 +------- daemon/logger/local/read.go | 19 +------ daemon/logger/loggerutils/logfile.go | 29 ++++++++++- daemon/logger/loggerutils/logfile_test.go | 60 +++++++++++------------ 6 files changed, 66 insertions(+), 109 deletions(-) diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go index 385f63abea..8ef82bbc33 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/daemon/logger/jsonfilelog/jsonfilelog.go @@ -8,7 +8,6 @@ import ( "encoding/json" "fmt" "strconv" - "sync" "github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog" @@ -22,11 +21,8 @@ const Name = "json-file" // JSONFileLogger is Logger implementation for default Docker logging. type JSONFileLogger struct { - mu sync.Mutex - closed bool - writer *loggerutils.LogFile - readers map[*logger.LogWatcher]struct{} // stores the active log followers - tag string // tag values requested by the user to log + writer *loggerutils.LogFile + tag string // tag values requested by the user to log } func init() { @@ -115,18 +111,14 @@ func New(info logger.Info) (logger.Logger, error) { } return &JSONFileLogger{ - writer: writer, - readers: make(map[*logger.LogWatcher]struct{}), - tag: tag, + writer: writer, + tag: tag, }, nil } // Log converts logger.Message to jsonlog.JSONLog and serializes it to file. func (l *JSONFileLogger) Log(msg *logger.Message) error { - l.mu.Lock() - err := l.writer.WriteLogEntry(msg) - l.mu.Unlock() - return err + return l.writer.WriteLogEntry(msg) } func marshalMessage(msg *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error { @@ -169,15 +161,7 @@ func ValidateLogOpt(cfg map[string]string) error { // Close closes underlying file and signals all the readers // that the logs producer is gone. func (l *JSONFileLogger) Close() error { - l.mu.Lock() - l.closed = true - err := l.writer.Close() - for r := range l.readers { - r.ProducerGone() - delete(l.readers, r) - } - l.mu.Unlock() - return err + return l.writer.Close() } // Name returns name of this logger. diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go index 5099a0b947..17e2438018 100644 --- a/daemon/logger/jsonfilelog/read.go +++ b/daemon/logger/jsonfilelog/read.go @@ -18,24 +18,7 @@ const maxJSONDecodeRetry = 20000 // ReadLogs implements the logger's LogReader interface for the logs // created by this driver. func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { - logWatcher := logger.NewLogWatcher() - - go l.readLogs(logWatcher, config) - return logWatcher -} - -func (l *JSONFileLogger) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) { - defer close(watcher.Msg) - - l.mu.Lock() - l.readers[watcher] = struct{}{} - l.mu.Unlock() - - l.writer.ReadLogs(config, watcher) - - l.mu.Lock() - delete(l.readers, watcher) - l.mu.Unlock() + return l.writer.ReadLogs(config) } func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) { diff --git a/daemon/logger/local/local.go b/daemon/logger/local/local.go index 169d2d38b8..fe1ef083f7 100644 --- a/daemon/logger/local/local.go +++ b/daemon/logger/local/local.go @@ -4,7 +4,6 @@ import ( "encoding/binary" "io" "strconv" - "sync" "time" "github.com/docker/docker/api/types/backend" @@ -56,10 +55,7 @@ func init() { } type driver struct { - mu sync.Mutex - closed bool logfile *loggerutils.LogFile - readers map[*logger.LogWatcher]struct{} // stores the active log followers } // New creates a new local logger @@ -145,7 +141,6 @@ func newDriver(logPath string, cfg *CreateConfig) (logger.Logger, error) { } return &driver{ logfile: lf, - readers: make(map[*logger.LogWatcher]struct{}), }, nil } @@ -154,22 +149,11 @@ func (d *driver) Name() string { } func (d *driver) Log(msg *logger.Message) error { - d.mu.Lock() - err := d.logfile.WriteLogEntry(msg) - d.mu.Unlock() - return err + return d.logfile.WriteLogEntry(msg) } func (d *driver) Close() error { - d.mu.Lock() - d.closed = true - err := d.logfile.Close() - for r := range d.readers { - r.ProducerGone() - delete(d.readers, r) - } - d.mu.Unlock() - return err + return d.logfile.Close() } func messageToProto(msg *logger.Message, proto *logdriver.LogEntry, partial *logdriver.PartialLogEntryMetadata) { diff --git a/daemon/logger/local/read.go b/daemon/logger/local/read.go index d517995cff..23b461f37d 100644 --- a/daemon/logger/local/read.go +++ b/daemon/logger/local/read.go @@ -19,24 +19,7 @@ import ( const maxMsgLen int = 1e6 // 1MB. func (d *driver) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { - logWatcher := logger.NewLogWatcher() - - go d.readLogs(logWatcher, config) - return logWatcher -} - -func (d *driver) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) { - defer close(watcher.Msg) - - d.mu.Lock() - d.readers[watcher] = struct{}{} - d.mu.Unlock() - - d.logfile.ReadLogs(config, watcher) - - d.mu.Lock() - delete(d.readers, watcher) - d.mu.Unlock() + return d.logfile.ReadLogs(config) } func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) { diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go index 9319ce5818..fe07f2a8ea 100644 --- a/daemon/logger/loggerutils/logfile.go +++ b/daemon/logger/loggerutils/logfile.go @@ -87,6 +87,7 @@ type LogFile struct { lastTimestamp time.Time // timestamp of the last log filesRefCounter refCounter // keep reference-counted of decompressed files notifyReaders *pubsub.Publisher + readers map[*logger.LogWatcher]struct{} // stores the active log followers marshal logger.MarshalFunc createDecoder MakeDecoderFn getTailReader GetTailReaderFunc @@ -141,6 +142,7 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar compress: compress, filesRefCounter: refCounter{counter: make(map[string]int)}, notifyReaders: pubsub.NewPublisher(0, 1), + readers: make(map[*logger.LogWatcher]struct{}), marshal: marshaller, createDecoder: decodeFunc, perms: perms, @@ -342,6 +344,10 @@ func (w *LogFile) Close() error { if w.closed { return nil } + for r := range w.readers { + r.ProducerGone() + delete(w.readers, r) + } if err := w.f.Close(); err != nil && !errors.Is(err, os.ErrClosed) { return err } @@ -353,8 +359,29 @@ func (w *LogFile) Close() error { // // Note: Using the follow option can become inconsistent in cases with very frequent rotations and max log files is 1. // TODO: Consider a different implementation which can effectively follow logs under frequent rotations. -func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) { +func (w *LogFile) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { + watcher := logger.NewLogWatcher() + w.mu.Lock() + w.readers[watcher] = struct{}{} + w.mu.Unlock() + + // Lock before starting the reader goroutine to synchronize operations + // for race-free unit testing. The writer is locked out until the reader + // has opened the log file and set the read cursor to the current + // position. w.mu.RLock() + go w.readLogsLocked(config, watcher) + return watcher +} + +func (w *LogFile) readLogsLocked(config logger.ReadConfig, watcher *logger.LogWatcher) { + defer func() { + close(watcher.Msg) + w.mu.Lock() + delete(w.readers, watcher) + w.mu.Unlock() + }() + currentFile, err := open(w.f.Name()) if err != nil { w.mu.RUnlock() diff --git a/daemon/logger/loggerutils/logfile_test.go b/daemon/logger/loggerutils/logfile_test.go index e816d782cf..d9857bb52f 100644 --- a/daemon/logger/loggerutils/logfile_test.go +++ b/daemon/logger/loggerutils/logfile_test.go @@ -15,7 +15,6 @@ import ( "time" "github.com/docker/docker/daemon/logger" - "github.com/docker/docker/pkg/pubsub" "github.com/docker/docker/pkg/tailfile" "gotest.tools/v3/assert" "gotest.tools/v3/poll" @@ -247,45 +246,43 @@ func TestFollowLogsProducerGone(t *testing.T) { } func TestCheckCapacityAndRotate(t *testing.T) { - dir, err := os.MkdirTemp("", t.Name()) - assert.NilError(t, err) - defer os.RemoveAll(dir) + dir := t.TempDir() - f, err := os.CreateTemp(dir, "log") - assert.NilError(t, err) - - l := &LogFile{ - f: f, - capacity: 5, - maxFiles: 3, - compress: true, - notifyReaders: pubsub.NewPublisher(0, 1), - perms: 0600, - filesRefCounter: refCounter{counter: make(map[string]int)}, - getTailReader: func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) { - return tailfile.NewTailReader(ctx, r, lines) - }, - createDecoder: func(io.Reader) Decoder { - return dummyDecoder{} - }, - marshal: func(msg *logger.Message) ([]byte, error) { - return msg.Line, nil - }, + logPath := filepath.Join(dir, "log") + getTailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) { + return tailfile.NewTailReader(ctx, r, lines) } + createDecoder := func(io.Reader) Decoder { + return dummyDecoder{} + } + marshal := func(msg *logger.Message) ([]byte, error) { + return msg.Line, nil + } + l, err := NewLogFile( + logPath, + 5, // capacity + 3, // maxFiles + true, // compress + marshal, + createDecoder, + 0600, // perms + getTailReader, + ) + assert.NilError(t, err) defer l.Close() ls := dirStringer{dir} assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")})) - _, err = os.Stat(f.Name() + ".1") + _, err = os.Stat(logPath + ".1") assert.Assert(t, os.IsNotExist(err), ls) assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")})) - poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) + poll.WaitOn(t, checkFileExists(logPath+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")})) - poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) - poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) + poll.WaitOn(t, checkFileExists(logPath+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) + poll.WaitOn(t, checkFileExists(logPath+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) t.Run("closed log file", func(t *testing.T) { // Now let's simulate a failed rotation where the file was able to be closed but something else happened elsewhere @@ -293,14 +290,13 @@ func TestCheckCapacityAndRotate(t *testing.T) { // We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation. l.f.Close() assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")})) - assert.NilError(t, os.Remove(f.Name()+".2.gz")) + assert.NilError(t, os.Remove(logPath+".2.gz")) }) t.Run("with log reader", func(t *testing.T) { // Make sure rotate works with an active reader - lw := logger.NewLogWatcher() + lw := l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000}) defer lw.ConsumerGone() - go l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000}, lw) assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 0!")}), ls) // make sure the log reader is primed @@ -310,7 +306,7 @@ func TestCheckCapacityAndRotate(t *testing.T) { assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 2!")}), ls) assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 3!")}), ls) assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 4!")}), ls) - poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) + poll.WaitOn(t, checkFileExists(logPath+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) }) }