diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go index 385f63abea..8ef82bbc33 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/daemon/logger/jsonfilelog/jsonfilelog.go @@ -8,7 +8,6 @@ import ( "encoding/json" "fmt" "strconv" - "sync" "github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog" @@ -22,11 +21,8 @@ const Name = "json-file" // JSONFileLogger is Logger implementation for default Docker logging. type JSONFileLogger struct { - mu sync.Mutex - closed bool - writer *loggerutils.LogFile - readers map[*logger.LogWatcher]struct{} // stores the active log followers - tag string // tag values requested by the user to log + writer *loggerutils.LogFile + tag string // tag values requested by the user to log } func init() { @@ -115,18 +111,14 @@ func New(info logger.Info) (logger.Logger, error) { } return &JSONFileLogger{ - writer: writer, - readers: make(map[*logger.LogWatcher]struct{}), - tag: tag, + writer: writer, + tag: tag, }, nil } // Log converts logger.Message to jsonlog.JSONLog and serializes it to file. func (l *JSONFileLogger) Log(msg *logger.Message) error { - l.mu.Lock() - err := l.writer.WriteLogEntry(msg) - l.mu.Unlock() - return err + return l.writer.WriteLogEntry(msg) } func marshalMessage(msg *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error { @@ -169,15 +161,7 @@ func ValidateLogOpt(cfg map[string]string) error { // Close closes underlying file and signals all the readers // that the logs producer is gone. func (l *JSONFileLogger) Close() error { - l.mu.Lock() - l.closed = true - err := l.writer.Close() - for r := range l.readers { - r.ProducerGone() - delete(l.readers, r) - } - l.mu.Unlock() - return err + return l.writer.Close() } // Name returns name of this logger. diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go index 5099a0b947..17e2438018 100644 --- a/daemon/logger/jsonfilelog/read.go +++ b/daemon/logger/jsonfilelog/read.go @@ -18,24 +18,7 @@ const maxJSONDecodeRetry = 20000 // ReadLogs implements the logger's LogReader interface for the logs // created by this driver. func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { - logWatcher := logger.NewLogWatcher() - - go l.readLogs(logWatcher, config) - return logWatcher -} - -func (l *JSONFileLogger) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) { - defer close(watcher.Msg) - - l.mu.Lock() - l.readers[watcher] = struct{}{} - l.mu.Unlock() - - l.writer.ReadLogs(config, watcher) - - l.mu.Lock() - delete(l.readers, watcher) - l.mu.Unlock() + return l.writer.ReadLogs(config) } func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) { diff --git a/daemon/logger/local/local.go b/daemon/logger/local/local.go index 169d2d38b8..fe1ef083f7 100644 --- a/daemon/logger/local/local.go +++ b/daemon/logger/local/local.go @@ -4,7 +4,6 @@ import ( "encoding/binary" "io" "strconv" - "sync" "time" "github.com/docker/docker/api/types/backend" @@ -56,10 +55,7 @@ func init() { } type driver struct { - mu sync.Mutex - closed bool logfile *loggerutils.LogFile - readers map[*logger.LogWatcher]struct{} // stores the active log followers } // New creates a new local logger @@ -145,7 +141,6 @@ func newDriver(logPath string, cfg *CreateConfig) (logger.Logger, error) { } return &driver{ logfile: lf, - readers: make(map[*logger.LogWatcher]struct{}), }, nil } @@ -154,22 +149,11 @@ func (d *driver) Name() string { } func (d *driver) Log(msg *logger.Message) error { - d.mu.Lock() - err := d.logfile.WriteLogEntry(msg) - d.mu.Unlock() - return err + return d.logfile.WriteLogEntry(msg) } func (d *driver) Close() error { - d.mu.Lock() - d.closed = true - err := d.logfile.Close() - for r := range d.readers { - r.ProducerGone() - delete(d.readers, r) - } - d.mu.Unlock() - return err + return d.logfile.Close() } func messageToProto(msg *logger.Message, proto *logdriver.LogEntry, partial *logdriver.PartialLogEntryMetadata) { diff --git a/daemon/logger/local/read.go b/daemon/logger/local/read.go index d517995cff..23b461f37d 100644 --- a/daemon/logger/local/read.go +++ b/daemon/logger/local/read.go @@ -19,24 +19,7 @@ import ( const maxMsgLen int = 1e6 // 1MB. func (d *driver) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { - logWatcher := logger.NewLogWatcher() - - go d.readLogs(logWatcher, config) - return logWatcher -} - -func (d *driver) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) { - defer close(watcher.Msg) - - d.mu.Lock() - d.readers[watcher] = struct{}{} - d.mu.Unlock() - - d.logfile.ReadLogs(config, watcher) - - d.mu.Lock() - delete(d.readers, watcher) - d.mu.Unlock() + return d.logfile.ReadLogs(config) } func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) { diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go index 9319ce5818..fe07f2a8ea 100644 --- a/daemon/logger/loggerutils/logfile.go +++ b/daemon/logger/loggerutils/logfile.go @@ -87,6 +87,7 @@ type LogFile struct { lastTimestamp time.Time // timestamp of the last log filesRefCounter refCounter // keep reference-counted of decompressed files notifyReaders *pubsub.Publisher + readers map[*logger.LogWatcher]struct{} // stores the active log followers marshal logger.MarshalFunc createDecoder MakeDecoderFn getTailReader GetTailReaderFunc @@ -141,6 +142,7 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar compress: compress, filesRefCounter: refCounter{counter: make(map[string]int)}, notifyReaders: pubsub.NewPublisher(0, 1), + readers: make(map[*logger.LogWatcher]struct{}), marshal: marshaller, createDecoder: decodeFunc, perms: perms, @@ -342,6 +344,10 @@ func (w *LogFile) Close() error { if w.closed { return nil } + for r := range w.readers { + r.ProducerGone() + delete(w.readers, r) + } if err := w.f.Close(); err != nil && !errors.Is(err, os.ErrClosed) { return err } @@ -353,8 +359,29 @@ func (w *LogFile) Close() error { // // Note: Using the follow option can become inconsistent in cases with very frequent rotations and max log files is 1. // TODO: Consider a different implementation which can effectively follow logs under frequent rotations. -func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) { +func (w *LogFile) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { + watcher := logger.NewLogWatcher() + w.mu.Lock() + w.readers[watcher] = struct{}{} + w.mu.Unlock() + + // Lock before starting the reader goroutine to synchronize operations + // for race-free unit testing. The writer is locked out until the reader + // has opened the log file and set the read cursor to the current + // position. w.mu.RLock() + go w.readLogsLocked(config, watcher) + return watcher +} + +func (w *LogFile) readLogsLocked(config logger.ReadConfig, watcher *logger.LogWatcher) { + defer func() { + close(watcher.Msg) + w.mu.Lock() + delete(w.readers, watcher) + w.mu.Unlock() + }() + currentFile, err := open(w.f.Name()) if err != nil { w.mu.RUnlock() diff --git a/daemon/logger/loggerutils/logfile_test.go b/daemon/logger/loggerutils/logfile_test.go index e816d782cf..d9857bb52f 100644 --- a/daemon/logger/loggerutils/logfile_test.go +++ b/daemon/logger/loggerutils/logfile_test.go @@ -15,7 +15,6 @@ import ( "time" "github.com/docker/docker/daemon/logger" - "github.com/docker/docker/pkg/pubsub" "github.com/docker/docker/pkg/tailfile" "gotest.tools/v3/assert" "gotest.tools/v3/poll" @@ -247,45 +246,43 @@ func TestFollowLogsProducerGone(t *testing.T) { } func TestCheckCapacityAndRotate(t *testing.T) { - dir, err := os.MkdirTemp("", t.Name()) - assert.NilError(t, err) - defer os.RemoveAll(dir) + dir := t.TempDir() - f, err := os.CreateTemp(dir, "log") - assert.NilError(t, err) - - l := &LogFile{ - f: f, - capacity: 5, - maxFiles: 3, - compress: true, - notifyReaders: pubsub.NewPublisher(0, 1), - perms: 0600, - filesRefCounter: refCounter{counter: make(map[string]int)}, - getTailReader: func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) { - return tailfile.NewTailReader(ctx, r, lines) - }, - createDecoder: func(io.Reader) Decoder { - return dummyDecoder{} - }, - marshal: func(msg *logger.Message) ([]byte, error) { - return msg.Line, nil - }, + logPath := filepath.Join(dir, "log") + getTailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) { + return tailfile.NewTailReader(ctx, r, lines) } + createDecoder := func(io.Reader) Decoder { + return dummyDecoder{} + } + marshal := func(msg *logger.Message) ([]byte, error) { + return msg.Line, nil + } + l, err := NewLogFile( + logPath, + 5, // capacity + 3, // maxFiles + true, // compress + marshal, + createDecoder, + 0600, // perms + getTailReader, + ) + assert.NilError(t, err) defer l.Close() ls := dirStringer{dir} assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")})) - _, err = os.Stat(f.Name() + ".1") + _, err = os.Stat(logPath + ".1") assert.Assert(t, os.IsNotExist(err), ls) assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")})) - poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) + poll.WaitOn(t, checkFileExists(logPath+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")})) - poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) - poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) + poll.WaitOn(t, checkFileExists(logPath+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) + poll.WaitOn(t, checkFileExists(logPath+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) t.Run("closed log file", func(t *testing.T) { // Now let's simulate a failed rotation where the file was able to be closed but something else happened elsewhere @@ -293,14 +290,13 @@ func TestCheckCapacityAndRotate(t *testing.T) { // We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation. l.f.Close() assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")})) - assert.NilError(t, os.Remove(f.Name()+".2.gz")) + assert.NilError(t, os.Remove(logPath+".2.gz")) }) t.Run("with log reader", func(t *testing.T) { // Make sure rotate works with an active reader - lw := logger.NewLogWatcher() + lw := l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000}) defer lw.ConsumerGone() - go l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000}, lw) assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 0!")}), ls) // make sure the log reader is primed @@ -310,7 +306,7 @@ func TestCheckCapacityAndRotate(t *testing.T) { assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 2!")}), ls) assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 3!")}), ls) assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 4!")}), ls) - poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) + poll.WaitOn(t, checkFileExists(logPath+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second)) }) }