diff --git a/daemon/logger/adapter_test.go b/daemon/logger/adapter_test.go index 1ca28de6d9..51fb475b1d 100644 --- a/daemon/logger/adapter_test.go +++ b/daemon/logger/adapter_test.go @@ -174,7 +174,7 @@ func TestAdapterReadLogs(t *testing.T) { t.Fatal("timeout waiting for message channel to close") } - lw.ProducerGone() + lw.ConsumerGone() lw = lr.ReadLogs(ReadConfig{Follow: true}) for _, x := range testMsg { diff --git a/daemon/logger/journald/journald.go b/daemon/logger/journald/journald.go index dd7414e2ee..3de3ca7e39 100644 --- a/daemon/logger/journald/journald.go +++ b/daemon/logger/journald/journald.go @@ -8,7 +8,6 @@ package journald // import "github.com/docker/docker/daemon/logger/journald" import ( "fmt" "strconv" - "sync" "unicode" "github.com/coreos/go-systemd/v22/journal" @@ -19,9 +18,9 @@ import ( const name = "journald" type journald struct { - mu sync.Mutex //nolint:structcheck,unused - vars map[string]string // additional variables and values to send to the journal along with the log message - readers map[*logger.LogWatcher]struct{} + vars map[string]string // additional variables and values to send to the journal along with the log message + + closed chan struct{} } func init() { @@ -81,7 +80,7 @@ func New(info logger.Info) (logger.Logger, error) { for k, v := range extraAttrs { vars[k] = v } - return &journald{vars: vars, readers: make(map[*logger.LogWatcher]struct{})}, nil + return &journald{vars: vars, closed: make(chan struct{})}, nil } // We don't actually accept any options, but we have to supply a callback for @@ -128,3 +127,8 @@ func (s *journald) Log(msg *logger.Message) error { func (s *journald) Name() string { return name } + +func (s *journald) Close() error { + close(s.closed) + return nil +} diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go index 5a43ecf4ac..eb299e2c5e 100644 --- a/daemon/logger/journald/read.go +++ b/daemon/logger/journald/read.go @@ -116,16 +116,6 @@ import ( "github.com/sirupsen/logrus" ) -func (s *journald) Close() error { - s.mu.Lock() - for r := range s.readers { - r.ProducerGone() - delete(s.readers, r) - } - s.mu.Unlock() - return nil -} - // CErr converts error code returned from a sd_journal_* function // (which returns -errno) to a string func CErr(ret C.int) string { @@ -233,9 +223,7 @@ drain: } func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, cursor *C.char, untilUnixMicro uint64) *C.char { - s.mu.Lock() - s.readers[logWatcher] = struct{}{} - s.mu.Unlock() + defer close(logWatcher.Msg) waitTimeout := C.uint64_t(250000) // 0.25s @@ -243,12 +231,12 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, status := C.sd_journal_wait(j, waitTimeout) if status < 0 { logWatcher.Err <- errors.New("error waiting for journal: " + CErr(status)) - goto cleanup + break } select { case <-logWatcher.WatchConsumerGone(): - goto cleanup // won't be able to write anything anymore - case <-logWatcher.WatchProducerGone(): + break // won't be able to write anything anymore + case <-s.closed: // container is gone, drain journal default: // container is still alive @@ -264,11 +252,6 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, } } -cleanup: - s.mu.Lock() - delete(s.readers, logWatcher) - s.mu.Unlock() - close(logWatcher.Msg) return cursor } diff --git a/daemon/logger/journald/read_unsupported.go b/daemon/logger/journald/read_unsupported.go deleted file mode 100644 index 199d7683a6..0000000000 --- a/daemon/logger/journald/read_unsupported.go +++ /dev/null @@ -1,8 +0,0 @@ -//go:build !linux || !cgo || static_build || !journald -// +build !linux !cgo static_build !journald - -package journald // import "github.com/docker/docker/daemon/logger/journald" - -func (s *journald) Close() error { - return nil -} diff --git a/daemon/logger/jsonfilelog/read_test.go b/daemon/logger/jsonfilelog/read_test.go index 9280746316..87068a8747 100644 --- a/daemon/logger/jsonfilelog/read_test.go +++ b/daemon/logger/jsonfilelog/read_test.go @@ -61,9 +61,10 @@ func BenchmarkJSONFileLoggerReadLogs(b *testing.B) { lw := jsonlogger.(*JSONFileLogger).ReadLogs(logger.ReadConfig{Follow: true}) for { select { - case <-lw.Msg: - case <-lw.WatchProducerGone(): - return + case _, ok := <-lw.Msg: + if !ok { + return + } case err := <-chError: b.Fatal(err) } diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index 15899e07ed..05ea1b3af0 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -97,8 +97,6 @@ type LogWatcher struct { Msg chan *Message // For sending error messages that occur while reading logs. Err chan error - producerOnce sync.Once - producerGone chan struct{} consumerOnce sync.Once consumerGone chan struct{} } @@ -108,26 +106,10 @@ func NewLogWatcher() *LogWatcher { return &LogWatcher{ Msg: make(chan *Message, logWatcherBufferSize), Err: make(chan error, 1), - producerGone: make(chan struct{}), consumerGone: make(chan struct{}), } } -// ProducerGone notifies the underlying log reader that -// the logs producer (a container) is gone. -func (w *LogWatcher) ProducerGone() { - // only close if not already closed - w.producerOnce.Do(func() { - close(w.producerGone) - }) -} - -// WatchProducerGone returns a channel receiver that receives notification -// once the logs producer (a container) is gone. -func (w *LogWatcher) WatchProducerGone() <-chan struct{} { - return w.producerGone -} - // ConsumerGone notifies that the logs consumer is gone. func (w *LogWatcher) ConsumerGone() { // only close if not already closed diff --git a/daemon/logger/loggerutils/follow.go b/daemon/logger/loggerutils/follow.go index 755a483d7a..d2e63c133a 100644 --- a/daemon/logger/loggerutils/follow.go +++ b/daemon/logger/loggerutils/follow.go @@ -20,6 +20,7 @@ type follow struct { dec Decoder fileWatcher filenotify.FileWatcher logWatcher *logger.LogWatcher + producerGone <-chan struct{} notifyRotate, notifyEvict chan interface{} oldSize int64 retries int @@ -73,7 +74,7 @@ func (fl *follow) waitRead() error { case fsnotify.Rename, fsnotify.Remove: select { case <-fl.notifyRotate: - case <-fl.logWatcher.WatchProducerGone(): + case <-fl.producerGone: return errDone case <-fl.logWatcher.WatchConsumerGone(): return errDone @@ -97,7 +98,7 @@ func (fl *follow) waitRead() error { return errRetry } return err - case <-fl.logWatcher.WatchProducerGone(): + case <-fl.producerGone: return errDone case <-fl.logWatcher.WatchConsumerGone(): return errDone @@ -183,7 +184,7 @@ func (fl *follow) mainLoop(since, until time.Time) { } } -func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate, notifyEvict chan interface{}, dec Decoder, since, until time.Time) { +func followLogs(f *os.File, logWatcher *logger.LogWatcher, producerGone <-chan struct{}, notifyRotate, notifyEvict chan interface{}, dec Decoder, since, until time.Time) { dec.Reset(f) name := f.Name() @@ -203,6 +204,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate, notifyE oldSize: -1, logWatcher: logWatcher, fileWatcher: fileWatcher, + producerGone: producerGone, notifyRotate: notifyRotate, notifyEvict: notifyEvict, dec: dec, diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go index fe07f2a8ea..3f65c52685 100644 --- a/daemon/logger/loggerutils/logfile.go +++ b/daemon/logger/loggerutils/logfile.go @@ -79,6 +79,7 @@ type LogFile struct { mu sync.RWMutex // protects the logfile access f *os.File // store for closing closed bool + closedCh chan struct{} rotateMu sync.Mutex // blocks the next rotation until the current rotation is completed capacity int64 // maximum size of each file currentSize int64 // current size of the latest file @@ -87,7 +88,6 @@ type LogFile struct { lastTimestamp time.Time // timestamp of the last log filesRefCounter refCounter // keep reference-counted of decompressed files notifyReaders *pubsub.Publisher - readers map[*logger.LogWatcher]struct{} // stores the active log followers marshal logger.MarshalFunc createDecoder MakeDecoderFn getTailReader GetTailReaderFunc @@ -136,13 +136,13 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar return &LogFile{ f: log, + closedCh: make(chan struct{}), capacity: capacity, currentSize: size, maxFiles: maxFiles, compress: compress, filesRefCounter: refCounter{counter: make(map[string]int)}, notifyReaders: pubsub.NewPublisher(0, 1), - readers: make(map[*logger.LogWatcher]struct{}), marshal: marshaller, createDecoder: decodeFunc, perms: perms, @@ -344,14 +344,11 @@ func (w *LogFile) Close() error { if w.closed { return nil } - for r := range w.readers { - r.ProducerGone() - delete(w.readers, r) - } if err := w.f.Close(); err != nil && !errors.Is(err, os.ErrClosed) { return err } w.closed = true + close(w.closedCh) return nil } @@ -361,10 +358,6 @@ func (w *LogFile) Close() error { // TODO: Consider a different implementation which can effectively follow logs under frequent rotations. func (w *LogFile) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { watcher := logger.NewLogWatcher() - w.mu.Lock() - w.readers[watcher] = struct{}{} - w.mu.Unlock() - // Lock before starting the reader goroutine to synchronize operations // for race-free unit testing. The writer is locked out until the reader // has opened the log file and set the read cursor to the current @@ -375,12 +368,7 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { } func (w *LogFile) readLogsLocked(config logger.ReadConfig, watcher *logger.LogWatcher) { - defer func() { - close(watcher.Msg) - w.mu.Lock() - delete(w.readers, watcher) - w.mu.Unlock() - }() + defer close(watcher.Msg) currentFile, err := open(w.f.Name()) if err != nil { @@ -464,7 +452,7 @@ func (w *LogFile) readLogsLocked(config logger.ReadConfig, watcher *logger.LogWa }) defer w.notifyReaders.Evict(notifyRotate) - followLogs(currentFile, watcher, notifyRotate, notifyEvict, dec, config.Since, config.Until) + followLogs(currentFile, watcher, w.closedCh, notifyRotate, notifyEvict, dec, config.Since, config.Until) } func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) { diff --git a/daemon/logger/loggerutils/logfile_test.go b/daemon/logger/loggerutils/logfile_test.go index d9857bb52f..ecd0c87bf3 100644 --- a/daemon/logger/loggerutils/logfile_test.go +++ b/daemon/logger/loggerutils/logfile_test.go @@ -126,7 +126,7 @@ func TestFollowLogsConsumerGone(t *testing.T) { followLogsDone := make(chan struct{}) var since, until time.Time go func() { - followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until) + followLogs(f, lw, nil, make(chan interface{}), make(chan interface{}), dec, since, until) close(followLogsDone) }() @@ -186,8 +186,9 @@ func TestFollowLogsProducerGone(t *testing.T) { var since, until time.Time followLogsDone := make(chan struct{}) + producerGone := make(chan struct{}) go func() { - followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until) + followLogs(f, lw, producerGone, make(chan interface{}), make(chan interface{}), dec, since, until) close(followLogsDone) }() @@ -205,7 +206,7 @@ func TestFollowLogsProducerGone(t *testing.T) { // "stop" the "container" atomic.StoreInt32(&closed, 1) - lw.ProducerGone() + close(producerGone) // should receive all the messages sent readDone := make(chan struct{}) @@ -317,9 +318,8 @@ func waitForMsg(t *testing.T, lw *logger.LogWatcher, timeout time.Duration) { defer timer.Stop() select { - case <-lw.Msg: - case <-lw.WatchProducerGone(): - t.Fatal("log producer gone before log message arrived") + case _, ok := <-lw.Msg: + assert.Assert(t, ok, "log producer gone before log message arrived") case err := <-lw.Err: assert.NilError(t, err) case <-timer.C: