daemon/logger: remove ProducerGone from LogWatcher

Whether or not the logger has been closed is a property of the logger,
and only of concern to its log reading implementation, not log watchers.
The loggers and their reader implementations can communicate as they see
fit. A single channel per logger which is closed when the logger is
closed is plenty sufficient to broadcast the state to log readers, with
no extra bookeeping or synchronization required.

Signed-off-by: Cory Snider <csnider@mirantis.com>
This commit is contained in:
Cory Snider 2022-03-02 18:38:16 -05:00
parent ae5f664f4e
commit 906b979b88
9 changed files with 34 additions and 82 deletions

View File

@ -174,7 +174,7 @@ func TestAdapterReadLogs(t *testing.T) {
t.Fatal("timeout waiting for message channel to close")
}
lw.ProducerGone()
lw.ConsumerGone()
lw = lr.ReadLogs(ReadConfig{Follow: true})
for _, x := range testMsg {

View File

@ -8,7 +8,6 @@ package journald // import "github.com/docker/docker/daemon/logger/journald"
import (
"fmt"
"strconv"
"sync"
"unicode"
"github.com/coreos/go-systemd/v22/journal"
@ -19,9 +18,9 @@ import (
const name = "journald"
type journald struct {
mu sync.Mutex //nolint:structcheck,unused
vars map[string]string // additional variables and values to send to the journal along with the log message
readers map[*logger.LogWatcher]struct{}
vars map[string]string // additional variables and values to send to the journal along with the log message
closed chan struct{}
}
func init() {
@ -81,7 +80,7 @@ func New(info logger.Info) (logger.Logger, error) {
for k, v := range extraAttrs {
vars[k] = v
}
return &journald{vars: vars, readers: make(map[*logger.LogWatcher]struct{})}, nil
return &journald{vars: vars, closed: make(chan struct{})}, nil
}
// We don't actually accept any options, but we have to supply a callback for
@ -128,3 +127,8 @@ func (s *journald) Log(msg *logger.Message) error {
func (s *journald) Name() string {
return name
}
func (s *journald) Close() error {
close(s.closed)
return nil
}

View File

@ -116,16 +116,6 @@ import (
"github.com/sirupsen/logrus"
)
func (s *journald) Close() error {
s.mu.Lock()
for r := range s.readers {
r.ProducerGone()
delete(s.readers, r)
}
s.mu.Unlock()
return nil
}
// CErr converts error code returned from a sd_journal_* function
// (which returns -errno) to a string
func CErr(ret C.int) string {
@ -233,9 +223,7 @@ drain:
}
func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, cursor *C.char, untilUnixMicro uint64) *C.char {
s.mu.Lock()
s.readers[logWatcher] = struct{}{}
s.mu.Unlock()
defer close(logWatcher.Msg)
waitTimeout := C.uint64_t(250000) // 0.25s
@ -243,12 +231,12 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal,
status := C.sd_journal_wait(j, waitTimeout)
if status < 0 {
logWatcher.Err <- errors.New("error waiting for journal: " + CErr(status))
goto cleanup
break
}
select {
case <-logWatcher.WatchConsumerGone():
goto cleanup // won't be able to write anything anymore
case <-logWatcher.WatchProducerGone():
break // won't be able to write anything anymore
case <-s.closed:
// container is gone, drain journal
default:
// container is still alive
@ -264,11 +252,6 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal,
}
}
cleanup:
s.mu.Lock()
delete(s.readers, logWatcher)
s.mu.Unlock()
close(logWatcher.Msg)
return cursor
}

View File

@ -1,8 +0,0 @@
//go:build !linux || !cgo || static_build || !journald
// +build !linux !cgo static_build !journald
package journald // import "github.com/docker/docker/daemon/logger/journald"
func (s *journald) Close() error {
return nil
}

View File

@ -61,9 +61,10 @@ func BenchmarkJSONFileLoggerReadLogs(b *testing.B) {
lw := jsonlogger.(*JSONFileLogger).ReadLogs(logger.ReadConfig{Follow: true})
for {
select {
case <-lw.Msg:
case <-lw.WatchProducerGone():
return
case _, ok := <-lw.Msg:
if !ok {
return
}
case err := <-chError:
b.Fatal(err)
}

View File

@ -97,8 +97,6 @@ type LogWatcher struct {
Msg chan *Message
// For sending error messages that occur while reading logs.
Err chan error
producerOnce sync.Once
producerGone chan struct{}
consumerOnce sync.Once
consumerGone chan struct{}
}
@ -108,26 +106,10 @@ func NewLogWatcher() *LogWatcher {
return &LogWatcher{
Msg: make(chan *Message, logWatcherBufferSize),
Err: make(chan error, 1),
producerGone: make(chan struct{}),
consumerGone: make(chan struct{}),
}
}
// ProducerGone notifies the underlying log reader that
// the logs producer (a container) is gone.
func (w *LogWatcher) ProducerGone() {
// only close if not already closed
w.producerOnce.Do(func() {
close(w.producerGone)
})
}
// WatchProducerGone returns a channel receiver that receives notification
// once the logs producer (a container) is gone.
func (w *LogWatcher) WatchProducerGone() <-chan struct{} {
return w.producerGone
}
// ConsumerGone notifies that the logs consumer is gone.
func (w *LogWatcher) ConsumerGone() {
// only close if not already closed

View File

@ -20,6 +20,7 @@ type follow struct {
dec Decoder
fileWatcher filenotify.FileWatcher
logWatcher *logger.LogWatcher
producerGone <-chan struct{}
notifyRotate, notifyEvict chan interface{}
oldSize int64
retries int
@ -73,7 +74,7 @@ func (fl *follow) waitRead() error {
case fsnotify.Rename, fsnotify.Remove:
select {
case <-fl.notifyRotate:
case <-fl.logWatcher.WatchProducerGone():
case <-fl.producerGone:
return errDone
case <-fl.logWatcher.WatchConsumerGone():
return errDone
@ -97,7 +98,7 @@ func (fl *follow) waitRead() error {
return errRetry
}
return err
case <-fl.logWatcher.WatchProducerGone():
case <-fl.producerGone:
return errDone
case <-fl.logWatcher.WatchConsumerGone():
return errDone
@ -183,7 +184,7 @@ func (fl *follow) mainLoop(since, until time.Time) {
}
}
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate, notifyEvict chan interface{}, dec Decoder, since, until time.Time) {
func followLogs(f *os.File, logWatcher *logger.LogWatcher, producerGone <-chan struct{}, notifyRotate, notifyEvict chan interface{}, dec Decoder, since, until time.Time) {
dec.Reset(f)
name := f.Name()
@ -203,6 +204,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate, notifyE
oldSize: -1,
logWatcher: logWatcher,
fileWatcher: fileWatcher,
producerGone: producerGone,
notifyRotate: notifyRotate,
notifyEvict: notifyEvict,
dec: dec,

View File

@ -79,6 +79,7 @@ type LogFile struct {
mu sync.RWMutex // protects the logfile access
f *os.File // store for closing
closed bool
closedCh chan struct{}
rotateMu sync.Mutex // blocks the next rotation until the current rotation is completed
capacity int64 // maximum size of each file
currentSize int64 // current size of the latest file
@ -87,7 +88,6 @@ type LogFile struct {
lastTimestamp time.Time // timestamp of the last log
filesRefCounter refCounter // keep reference-counted of decompressed files
notifyReaders *pubsub.Publisher
readers map[*logger.LogWatcher]struct{} // stores the active log followers
marshal logger.MarshalFunc
createDecoder MakeDecoderFn
getTailReader GetTailReaderFunc
@ -136,13 +136,13 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar
return &LogFile{
f: log,
closedCh: make(chan struct{}),
capacity: capacity,
currentSize: size,
maxFiles: maxFiles,
compress: compress,
filesRefCounter: refCounter{counter: make(map[string]int)},
notifyReaders: pubsub.NewPublisher(0, 1),
readers: make(map[*logger.LogWatcher]struct{}),
marshal: marshaller,
createDecoder: decodeFunc,
perms: perms,
@ -344,14 +344,11 @@ func (w *LogFile) Close() error {
if w.closed {
return nil
}
for r := range w.readers {
r.ProducerGone()
delete(w.readers, r)
}
if err := w.f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
return err
}
w.closed = true
close(w.closedCh)
return nil
}
@ -361,10 +358,6 @@ func (w *LogFile) Close() error {
// TODO: Consider a different implementation which can effectively follow logs under frequent rotations.
func (w *LogFile) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
watcher := logger.NewLogWatcher()
w.mu.Lock()
w.readers[watcher] = struct{}{}
w.mu.Unlock()
// Lock before starting the reader goroutine to synchronize operations
// for race-free unit testing. The writer is locked out until the reader
// has opened the log file and set the read cursor to the current
@ -375,12 +368,7 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
}
func (w *LogFile) readLogsLocked(config logger.ReadConfig, watcher *logger.LogWatcher) {
defer func() {
close(watcher.Msg)
w.mu.Lock()
delete(w.readers, watcher)
w.mu.Unlock()
}()
defer close(watcher.Msg)
currentFile, err := open(w.f.Name())
if err != nil {
@ -464,7 +452,7 @@ func (w *LogFile) readLogsLocked(config logger.ReadConfig, watcher *logger.LogWa
})
defer w.notifyReaders.Evict(notifyRotate)
followLogs(currentFile, watcher, notifyRotate, notifyEvict, dec, config.Since, config.Until)
followLogs(currentFile, watcher, w.closedCh, notifyRotate, notifyEvict, dec, config.Since, config.Until)
}
func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) {

View File

@ -126,7 +126,7 @@ func TestFollowLogsConsumerGone(t *testing.T) {
followLogsDone := make(chan struct{})
var since, until time.Time
go func() {
followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until)
followLogs(f, lw, nil, make(chan interface{}), make(chan interface{}), dec, since, until)
close(followLogsDone)
}()
@ -186,8 +186,9 @@ func TestFollowLogsProducerGone(t *testing.T) {
var since, until time.Time
followLogsDone := make(chan struct{})
producerGone := make(chan struct{})
go func() {
followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until)
followLogs(f, lw, producerGone, make(chan interface{}), make(chan interface{}), dec, since, until)
close(followLogsDone)
}()
@ -205,7 +206,7 @@ func TestFollowLogsProducerGone(t *testing.T) {
// "stop" the "container"
atomic.StoreInt32(&closed, 1)
lw.ProducerGone()
close(producerGone)
// should receive all the messages sent
readDone := make(chan struct{})
@ -317,9 +318,8 @@ func waitForMsg(t *testing.T, lw *logger.LogWatcher, timeout time.Duration) {
defer timer.Stop()
select {
case <-lw.Msg:
case <-lw.WatchProducerGone():
t.Fatal("log producer gone before log message arrived")
case _, ok := <-lw.Msg:
assert.Assert(t, ok, "log producer gone before log message arrived")
case err := <-lw.Err:
assert.NilError(t, err)
case <-timer.C: