diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go index 17e2438018..bea8ceedb3 100644 --- a/daemon/logger/jsonfilelog/read.go +++ b/daemon/logger/jsonfilelog/read.go @@ -10,11 +10,8 @@ import ( "github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog" "github.com/docker/docker/daemon/logger/loggerutils" "github.com/docker/docker/pkg/tailfile" - "github.com/sirupsen/logrus" ) -const maxJSONDecodeRetry = 20000 - // ReadLogs implements the logger's LogReader interface for the logs // created by this driver. func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { @@ -44,10 +41,9 @@ func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, erro } type decoder struct { - rdr io.Reader - dec *json.Decoder - jl *jsonlog.JSONLog - maxRetry int + rdr io.Reader + dec *json.Decoder + jl *jsonlog.JSONLog } func (d *decoder) Reset(rdr io.Reader) { @@ -71,74 +67,7 @@ func (d *decoder) Decode() (msg *logger.Message, err error) { if d.jl == nil { d.jl = &jsonlog.JSONLog{} } - if d.maxRetry == 0 { - // We aren't using maxJSONDecodeRetry directly so we can give a custom value for testing. - d.maxRetry = maxJSONDecodeRetry - } - for retries := 0; retries < d.maxRetry; retries++ { - msg, err = decodeLogLine(d.dec, d.jl) - if err == nil || err == io.EOF { - break - } - - logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json") - // try again, could be due to a an incomplete json object as we read - if _, ok := err.(*json.SyntaxError); ok { - d.dec = json.NewDecoder(d.rdr) - continue - } - - // io.ErrUnexpectedEOF is returned from json.Decoder when there is - // remaining data in the parser's buffer while an io.EOF occurs. - // If the json logger writes a partial json log entry to the disk - // while at the same time the decoder tries to decode it, the race condition happens. - if err == io.ErrUnexpectedEOF { - d.rdr = combineReaders(d.dec.Buffered(), d.rdr) - d.dec = json.NewDecoder(d.rdr) - continue - } - } - return msg, err -} - -func combineReaders(pre, rdr io.Reader) io.Reader { - return &combinedReader{pre: pre, rdr: rdr} -} - -// combinedReader is a reader which is like `io.MultiReader` where except it does not cache a full EOF. -// Once `io.MultiReader` returns EOF, it is always EOF. -// -// For this usecase we have an underlying reader which is a file which may reach EOF but have more data written to it later. -// As such, io.MultiReader does not work for us. 
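As an aside on the rationale being removed here: io.MultiReader drops a sub-reader for good once it has returned EOF, so data appended to the underlying file afterwards is never surfaced. A minimal standalone sketch of that sticky-EOF behaviour (illustrative only, not part of the driver):

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	var buf bytes.Buffer
	buf.WriteString("first entry\n")

	r := io.MultiReader(&buf)
	b, _ := io.ReadAll(r) // drains buf; MultiReader records the EOF and drops buf
	fmt.Printf("%q\n", b) // "first entry\n"

	buf.WriteString("late entry\n") // simulate the log file growing afterwards
	n, err := r.Read(make([]byte, 64))
	fmt.Println(n, err) // 0 EOF: the late data is never returned
}
```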
-type combinedReader struct { - pre io.Reader - rdr io.Reader -} - -func (r *combinedReader) Read(p []byte) (int, error) { - var read int - if r.pre != nil { - n, err := r.pre.Read(p) - if err != nil { - if err != io.EOF { - return n, err - } - r.pre = nil - } - read = n - } - - if read < len(p) { - n, err := r.rdr.Read(p[read:]) - if n > 0 { - read += n - } - if err != nil { - return read, err - } - } - - return read, nil + return decodeLogLine(d.dec, d.jl) } // decodeFunc is used to create a decoder for the log file reader diff --git a/daemon/logger/jsonfilelog/read_test.go b/daemon/logger/jsonfilelog/read_test.go index 2b01b7f56f..dd56be0ff3 100644 --- a/daemon/logger/jsonfilelog/read_test.go +++ b/daemon/logger/jsonfilelog/read_test.go @@ -3,7 +3,6 @@ package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelo import ( "bufio" "bytes" - "encoding/json" "fmt" "io" "os" @@ -107,45 +106,6 @@ func TestEncodeDecode(t *testing.T) { assert.Assert(t, err == io.EOF) } -func TestUnexpectedEOF(t *testing.T) { - buf := bytes.NewBuffer(nil) - msg1 := &logger.Message{Timestamp: time.Now(), Line: []byte("hello1")} - msg2 := &logger.Message{Timestamp: time.Now(), Line: []byte("hello2")} - - err := marshalMessage(msg1, json.RawMessage{}, buf) - assert.NilError(t, err) - err = marshalMessage(msg2, json.RawMessage{}, buf) - assert.NilError(t, err) - - r := &readerWithErr{ - err: io.EOF, - after: buf.Len() / 4, - r: buf, - } - dec := &decoder{rdr: r, maxRetry: 1} - - _, err = dec.Decode() - assert.Error(t, err, io.ErrUnexpectedEOF.Error()) - // again just to check - _, err = dec.Decode() - assert.Error(t, err, io.ErrUnexpectedEOF.Error()) - - // reset the error - // from here all reads should succeed until we get EOF on the underlying reader - r.err = nil - - msg, err := dec.Decode() - assert.NilError(t, err) - assert.Equal(t, string(msg1.Line)+"\n", string(msg.Line)) - - msg, err = dec.Decode() - assert.NilError(t, err) - assert.Equal(t, string(msg2.Line)+"\n", string(msg.Line)) - - _, err = dec.Decode() - assert.Error(t, err, io.EOF.Error()) -} - func TestReadLogs(t *testing.T) { t.Parallel() r := loggertest.Reader{ @@ -193,6 +153,40 @@ func TestTailLogsWithRotation(t *testing.T) { compress(false) } +func TestFollowLogsWithRotation(t *testing.T) { + t.Parallel() + compress := func(cmprs bool) { + t.Run(fmt.Sprintf("compress=%v", cmprs), func(t *testing.T) { + t.Parallel() + (&loggertest.Reader{ + Factory: func(t *testing.T, info logger.Info) func(*testing.T) logger.Logger { + // The log follower can fall behind and drop logs if there are too many + // rotations in a short time. If that was to happen, loggertest would fail the + // test. Configure the logger so that there will be only one rotation with the + // set of logs that loggertest writes. 
+ info.Config = map[string]string{ + "compress": strconv.FormatBool(cmprs), + "max-size": "4096b", + "max-file": "3", + } + dir := t.TempDir() + t.Cleanup(func() { + t.Logf("%s:\n%s", t.Name(), dirStringer{dir}) + }) + info.LogPath = filepath.Join(dir, info.ContainerID+".log") + return func(t *testing.T) logger.Logger { + l, err := New(info) + assert.NilError(t, err) + return l + } + }, + }).TestFollow(t) + }) + } + compress(true) + compress(false) +} + type dirStringer struct { d string } @@ -220,22 +214,3 @@ func (d dirStringer) String() string { tw.Flush() return buf.String() } - -type readerWithErr struct { - err error - after int - r io.Reader - read int -} - -func (r *readerWithErr) Read(p []byte) (int, error) { - if r.err != nil && r.read > r.after { - return 0, r.err - } - - n, err := r.r.Read(p[:1]) - if n > 0 { - r.read += n - } - return n, err -} diff --git a/daemon/logger/loggerutils/follow.go b/daemon/logger/loggerutils/follow.go index cef184cc08..b7f93a9aff 100644 --- a/daemon/logger/loggerutils/follow.go +++ b/daemon/logger/loggerutils/follow.go @@ -1,221 +1,165 @@ package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils" import ( + "fmt" "io" "os" "time" "github.com/docker/docker/daemon/logger" - "github.com/docker/docker/pkg/filenotify" - "github.com/fsnotify/fsnotify" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) -var errRetry = errors.New("retry") -var errDone = errors.New("done") - type follow struct { - file *os.File - dec Decoder - fileWatcher filenotify.FileWatcher - logWatcher *logger.LogWatcher - producerGone <-chan struct{} - draining bool - notifyRotate, notifyEvict chan interface{} - oldSize int64 - retries int + LogFile *LogFile + Watcher *logger.LogWatcher + Decoder Decoder + Since, Until time.Time + + log *logrus.Entry + c chan logPos } -func (fl *follow) handleRotate() error { - name := fl.file.Name() +// Do follows the log file as it is written, starting from f at read. +func (fl *follow) Do(f *os.File, read logPos) { + fl.log = logrus.WithFields(logrus.Fields{ + "module": "logger", + "file": f.Name(), + }) + // Optimization: allocate the write-notifications channel only once and + // reuse it for multiple invocations of nextPos(). 
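The buffer size of 1 is what keeps both notification paths non-blocking: the writer can post the new position to every registered channel without waiting on followers, and nextPos can "notify itself" when the position has already moved on. A condensed, hypothetical sketch of the scheme with simplified types (the real code uses logPos and logReadState):

```go
package main

import "fmt"

type state struct {
	pos  int          // last written position
	wait []chan<- int // followers to wake on the next change
}

func main() {
	// Reader state lives in a 1-buffered channel: receive to acquire, send to release.
	read := make(chan state, 1)
	read <- state{pos: 0}

	// Follower side of nextPos: wait until pos advances past what we have read.
	notify := make(chan int, 1)
	st := <-read
	if st.pos == 0 {
		st.wait = append(st.wait, notify) // nothing new yet; register for a wake-up
	} else {
		notify <- st.pos // already newer; "notify" ourselves; never blocks (buffer of 1)
	}
	read <- st

	// Writer side of WriteLogEntry: advance the position and wake the wait list.
	st = <-read
	st.pos = 42
	for _, c := range st.wait {
		c <- st.pos // never blocks: every follower channel has a buffer of 1
	}
	st.wait = st.wait[:0]
	read <- st

	fmt.Println(<-notify) // 42
}
```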
+ fl.c = make(chan logPos, 1) - fl.file.Close() - fl.fileWatcher.Remove(name) - - // retry when the file doesn't exist - var err error - for retries := 0; retries <= 5; retries++ { - f, err := open(name) - if err == nil || !os.IsNotExist(err) { - fl.file = f - break + defer func() { + if err := f.Close(); err != nil && !errors.Is(err, os.ErrClosed) { + fl.log.WithError(err).Warn("error closing current log file") } - } - if err != nil { - return err - } - if err := fl.fileWatcher.Add(name); err != nil { - return err - } - fl.dec.Reset(fl.file) - return nil -} - -func (fl *follow) handleMustClose(evictErr error) { - fl.file.Close() - fl.dec.Close() - fl.logWatcher.Err <- errors.Wrap(evictErr, "log reader evicted due to errors") - logrus.WithField("file", fl.file.Name()).Error("Log reader notified that it must re-open log file, some log data may not be streamed to the client.") -} - -func (fl *follow) waitRead() error { - select { - case e := <-fl.notifyEvict: - if e != nil { - err := e.(error) - fl.handleMustClose(err) - } - return errDone - case e := <-fl.fileWatcher.Events(): - switch e.Op { - case fsnotify.Write: - fl.dec.Reset(fl.file) - return nil - case fsnotify.Rename, fsnotify.Remove: - select { - case <-fl.notifyRotate: - case <-fl.producerGone: - return errDone - case <-fl.logWatcher.WatchConsumerGone(): - return errDone - } - if err := fl.handleRotate(); err != nil { - return err - } - return nil - } - return errRetry - case err := <-fl.fileWatcher.Errors(): - logrus.Debugf("logger got error watching file: %v", err) - // Something happened, let's try and stay alive and create a new watcher - if fl.retries <= 5 { - fl.fileWatcher.Close() - fl.fileWatcher, err = watchFile(fl.file.Name()) - if err != nil { - return err - } - fl.retries++ - return errRetry - } - return err - case <-fl.producerGone: - // There may be messages written out which the fileWatcher has - // not yet notified us about. - if fl.draining { - return errDone - } - fl.draining = true - fl.dec.Reset(fl.file) - return nil - case <-fl.logWatcher.WatchConsumerGone(): - return errDone - } -} - -func (fl *follow) handleDecodeErr(err error) error { - if !errors.Is(err, io.EOF) { - return err - } - - // Handle special case (#39235): max-file=1 and file was truncated - st, stErr := fl.file.Stat() - if stErr == nil { - size := st.Size() - defer func() { fl.oldSize = size }() - if size < fl.oldSize { // truncated - fl.file.Seek(0, 0) - fl.dec.Reset(fl.file) - return nil - } - } else { - logrus.WithError(stErr).Warn("logger: stat error") - } + }() for { - err := fl.waitRead() - if err == nil { - break - } - if err == errRetry { - continue - } - return err - } - return nil -} - -func (fl *follow) mainLoop(since, until time.Time) { - for { - select { - case err := <-fl.notifyEvict: - if err != nil { - fl.handleMustClose(err.(error)) - } + wrote, ok := fl.nextPos(read) + if !ok { return - default: } - msg, err := fl.dec.Decode() - if err != nil { - if err := fl.handleDecodeErr(err); err != nil { - if err == errDone { - return - } - // we got an unrecoverable error, so return - fl.logWatcher.Err <- err + + if wrote.rotation != read.rotation { + // Flush the current file before moving on to the next. + if _, err := f.Seek(read.size, io.SeekStart); err != nil { + fl.Watcher.Err <- err return } - // ready to try again - continue + if fl.decode(f) { + return + } + + // Open the new file, which has the same name as the old + // file thanks to file rotation. Make no mistake: they + // are different files, with distinct identities. 
+ // Atomically capture the wrote position to make + // absolutely sure that the position corresponds to the + // file we have opened; more rotations could have + // occurred since we previously received it. + if err := f.Close(); err != nil { + fl.log.WithError(err).Warn("error closing rotated log file") + } + var err error + func() { + fl.LogFile.fsopMu.RLock() + st := <-fl.LogFile.read + defer func() { + fl.LogFile.read <- st + fl.LogFile.fsopMu.RUnlock() + }() + f, err = open(f.Name()) + wrote = st.pos + }() + // We tried to open the file inside a critical section + // so we shouldn't have been racing the rotation of the + // file. Any error, even fs.ErrNotExist, is exceptional. + if err != nil { + fl.Watcher.Err <- fmt.Errorf("logger: error opening log file for follow after rotation: %w", err) + return + } + + if nrot := wrote.rotation - read.rotation; nrot > 1 { + fl.log.WithField("missed-rotations", nrot). + Warn("file rotations were missed while following logs; some log messages have been skipped over") + } + + // Set up our read position to start from the top of the file. + read.size = 0 + } + + if fl.decode(io.NewSectionReader(f, read.size, wrote.size-read.size)) { + return + } + read = wrote + } +} + +// nextPos waits until the write position of the LogFile being followed has +// advanced from current and returns the new position. +func (fl *follow) nextPos(current logPos) (next logPos, ok bool) { + var st logReadState + select { + case <-fl.Watcher.WatchConsumerGone(): + return current, false + case st = <-fl.LogFile.read: + } + + // Have any logs been written since we last checked? + if st.pos == current { // Nope. + // Add ourself to the notify list. + st.wait = append(st.wait, fl.c) + } else { // Yes. + // "Notify" ourself immediately. + fl.c <- st.pos + } + fl.LogFile.read <- st + + select { + case <-fl.LogFile.closed: // No more logs will be written. + select { // Have we followed to the end? + case next = <-fl.c: // No: received a new position. + default: // Yes. + return current, false + } + case <-fl.Watcher.WatchConsumerGone(): + return current, false + case next = <-fl.c: + } + return next, true +} + +// decode decodes log messages from r and sends messages with timestamps between +// Since and Until to the log watcher. +// +// The return value, done, signals whether following should end due to a +// condition encountered during decode.
+func (fl *follow) decode(r io.Reader) (done bool) { + fl.Decoder.Reset(r) + for { + msg, err := fl.Decoder.Decode() + if err != nil { + if errors.Is(err, io.EOF) { + return false + } + fl.Watcher.Err <- err + return true + } + + if !fl.Since.IsZero() && msg.Timestamp.Before(fl.Since) { continue } - if !until.IsZero() && msg.Timestamp.After(until) { - return + if !fl.Until.IsZero() && msg.Timestamp.After(fl.Until) { + return true } // send the message, unless the consumer is gone select { - case e := <-fl.notifyEvict: - if e != nil { - err := e.(error) - logrus.WithError(err).Debug("Reader evicted while sending log message") - fl.logWatcher.Err <- err - } - return - case fl.logWatcher.Msg <- msg: - case <-fl.logWatcher.WatchConsumerGone(): - return + case fl.Watcher.Msg <- msg: + case <-fl.Watcher.WatchConsumerGone(): + return true } } } - -func followLogs(f *os.File, logWatcher *logger.LogWatcher, producerGone <-chan struct{}, notifyRotate, notifyEvict chan interface{}, dec Decoder, since, until time.Time) { - dec.Reset(f) - - name := f.Name() - fileWatcher, err := watchFile(name) - if err != nil { - logWatcher.Err <- err - return - } - defer func() { - f.Close() - dec.Close() - fileWatcher.Close() - }() - - fl := &follow{ - file: f, - oldSize: -1, - logWatcher: logWatcher, - fileWatcher: fileWatcher, - producerGone: producerGone, - notifyRotate: notifyRotate, - notifyEvict: notifyEvict, - dec: dec, - } - fl.mainLoop(since, until) -} diff --git a/daemon/logger/loggerutils/follow_test.go b/daemon/logger/loggerutils/follow_test.go deleted file mode 100644 index 224cd192e9..0000000000 --- a/daemon/logger/loggerutils/follow_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils" - -import ( - "io" - "os" - "testing" - - "gotest.tools/v3/assert" -) - -func TestHandleDecoderErr(t *testing.T) { - f, err := os.CreateTemp("", t.Name()) - assert.NilError(t, err) - defer os.Remove(f.Name()) - - _, err = f.Write([]byte("hello")) - assert.NilError(t, err) - - pos, err := f.Seek(0, io.SeekCurrent) - assert.NilError(t, err) - assert.Assert(t, pos != 0) - - dec := &testDecoder{} - - // Simulate "turncate" case, where the file was bigger before. - fl := &follow{file: f, dec: dec, oldSize: 100} - err = fl.handleDecodeErr(io.EOF) - assert.NilError(t, err) - - // handleDecodeErr seeks to zero. - pos, err = f.Seek(0, io.SeekCurrent) - assert.NilError(t, err) - assert.Equal(t, int64(0), pos) - - // Reset is called. - assert.Equal(t, 1, dec.resetCount) -} diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go index a78fa419c0..d5d0e3b853 100644 --- a/daemon/logger/loggerutils/logfile.go +++ b/daemon/logger/loggerutils/logfile.go @@ -7,16 +7,14 @@ import ( "fmt" "io" "io/fs" + "math" "os" - "runtime" "strconv" "sync" "time" "github.com/docker/docker/daemon/logger" - "github.com/docker/docker/pkg/filenotify" "github.com/docker/docker/pkg/pools" - "github.com/docker/docker/pkg/pubsub" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -28,22 +26,63 @@ type rotateFileMetadata struct { // LogFile is Logger implementation for default Docker logging. 
type LogFile struct { - mu sync.RWMutex // protects the logfile access - f *os.File // store for closing - closed bool - closedCh chan struct{} - rotateMu sync.Mutex // blocks the next rotation until the current rotation is completed - capacity int64 // maximum size of each file - currentSize int64 // current size of the latest file - maxFiles int // maximum number of files - compress bool // whether old versions of log files are compressed - lastTimestamp time.Time // timestamp of the last log - decompress *sharedTempFileConverter // keep reference-counted decompressed files - notifyReaders *pubsub.Publisher + mu sync.Mutex // protects the logfile access + closed chan struct{} + rotateMu sync.Mutex // blocks the next rotation until the current rotation is completed + // Lock out readers while performing a non-atomic sequence of filesystem + // operations (RLock: open, Lock: rename, delete). + // + // fsopMu should be locked for writing only while holding rotateMu. + fsopMu sync.RWMutex + + // Logger configuration + + capacity int64 // maximum size of each file + maxFiles int // maximum number of files + compress bool // whether old versions of log files are compressed + perms os.FileMode + + // Log file codec + + marshal logger.MarshalFunc createDecoder MakeDecoderFn getTailReader GetTailReaderFunc - perms os.FileMode + + // Log reader state in a 1-buffered channel. + // + // Share memory by communicating: receive to acquire, send to release. + // The state struct is passed around by value so that use-after-send + // bugs cannot escalate to data races. + // + // A method which receives the state value takes ownership of it. The + // owner is responsible for either passing ownership along or sending + // the state back to the channel. By convention, the semantics of + // passing along ownership is expressed with function argument types. + // Methods which take a pointer *logReadState argument borrow the state, + // analogous to functions which require a lock to be held when calling. + // The caller retains ownership. Calling a method which takes a + // value logReadState argument gives ownership to the callee. + read chan logReadState + + decompress *sharedTempFileConverter + + pos logPos // Current log file write position. + f *os.File // Current log file for writing. + lastTimestamp time.Time // timestamp of the last log +} + +type logPos struct { + // Size of the current file. + size int64 + // File rotation sequence number (modulo 2**16). + rotation uint16 +} + +type logReadState struct { + // Current log file position. + pos logPos + // Wait list to be notified of the value of pos next time it changes. + wait []chan<- logPos } // MakeDecoderFn creates a decoder @@ -92,15 +131,24 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar return nil, err } + pos := logPos{ + size: size, + // Force a wraparound on first rotation to shake out any + // modular-arithmetic bugs.
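Seeding the counter at its maximum value makes the very first rotation wrap to zero, and unsigned subtraction still reports the correct number of rotations across the wrap. A tiny illustration of why the modular arithmetic holds (not part of the driver):

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	var read uint16 = math.MaxUint16 // rotation number captured before the first rotation
	wrote := read + 1                // first rotation wraps around to 0

	fmt.Println(wrote)        // 0
	fmt.Println(wrote - read) // 1: exactly one rotation observed, despite the wrap
}
```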
+ rotation: math.MaxUint16, + } + st := make(chan logReadState, 1) + st <- logReadState{pos: pos} + return &LogFile{ f: log, - closedCh: make(chan struct{}), + read: st, + pos: pos, + closed: make(chan struct{}), capacity: capacity, - currentSize: size, maxFiles: maxFiles, compress: compress, decompress: newSharedTempFileConverter(decompress), - notifyReaders: pubsub.NewPublisher(0, 1), marshal: marshaller, createDecoder: decodeFunc, perms: perms, @@ -120,35 +168,45 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error { logger.PutMessage(msg) msg = nil // Turn use-after-put bugs into panics. - w.mu.Lock() - if w.closed { - w.mu.Unlock() + select { + case <-w.closed: return errors.New("cannot write because the output file was closed") + default: } + w.mu.Lock() + defer w.mu.Unlock() - if err := w.checkCapacityAndRotate(); err != nil { - w.mu.Unlock() - return errors.Wrap(err, "error rotating log file") + // Are we due for a rotation? + if w.capacity != -1 && w.pos.size >= w.capacity { + if err := w.rotate(); err != nil { + return errors.Wrap(err, "error rotating log file") + } } n, err := w.f.Write(b) - if err == nil { - w.currentSize += int64(n) - w.lastTimestamp = ts + if err != nil { + return errors.Wrap(err, "error writing log entry") } + w.pos.size += int64(n) + w.lastTimestamp = ts - w.mu.Unlock() - return errors.Wrap(err, "error writing log entry") + // Notify any waiting readers that there is a new log entry to read. + st := <-w.read + defer func() { w.read <- st }() + st.pos = w.pos + + for _, c := range st.wait { + c <- st.pos + } + // Optimization: retain the backing array to save a heap allocation next + // time a reader appends to the list. + if st.wait != nil { + st.wait = st.wait[:0] + } + return nil } -func (w *LogFile) checkCapacityAndRotate() (retErr error) { - if w.capacity == -1 { - return nil - } - if w.currentSize < w.capacity { - return nil - } - +func (w *LogFile) rotate() (retErr error) { w.rotateMu.Lock() noCompress := w.maxFiles <= 1 || !w.compress defer func() { @@ -167,46 +225,56 @@ func (w *LogFile) checkCapacityAndRotate() (retErr error) { } } - if err := rotate(fname, w.maxFiles, w.compress); err != nil { - logrus.WithError(err).Warn("Error rotating log file, log data may have been lost") - } else { - // We may have readers working their way through the current - // log file so we can't truncate it. We need to start writing - // new logs to an empty file with the same name as the current - // one so we need to rotate the current file out of the way. - if w.maxFiles < 2 { - if err := unlink(fname); err != nil && !errors.Is(err, fs.ErrNotExist) { - logrus.WithError(err).Error("Error unlinking current log file") - } + file, err := func() (*os.File, error) { + w.fsopMu.Lock() + defer w.fsopMu.Unlock() + + if err := rotate(fname, w.maxFiles, w.compress); err != nil { + logrus.WithError(err).Warn("Error rotating log file, log data may have been lost") } else { - if err := os.Rename(fname, fname+".1"); err != nil && !errors.Is(err, fs.ErrNotExist) { - logrus.WithError(err).Error("Error renaming current log file") + // We may have readers working their way through the + // current log file so we can't truncate it. We need to + // start writing new logs to an empty file with the same + // name as the current one so we need to rotate the + // current file out of the way. 
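The reason renaming (or unlinking) is safe where truncating is not: a reader that already has the log open keeps reading the old inode after the rename, so its in-progress read is unaffected, while new writes go to a fresh file under the same name. A small illustrative sketch of that behaviour (made-up paths; relies on POSIX rename semantics):

```go
package main

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
)

func main() {
	dir, _ := os.MkdirTemp("", "rotate-demo")
	defer os.RemoveAll(dir)
	name := filepath.Join(dir, "container.log")

	os.WriteFile(name, []byte("old entries\n"), 0o600)

	reader, _ := os.Open(name) // a follower holding the current file open
	defer reader.Close()

	// Rotate: move the current file aside and start a fresh one at the same path.
	os.Rename(name, name+".1")
	os.WriteFile(name, []byte("new entries\n"), 0o600)

	b, _ := io.ReadAll(reader)
	fmt.Printf("%q\n", b) // "old entries\n": the open handle tracks the inode, not the name
}
```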
+ if w.maxFiles < 2 { + if err := unlink(fname); err != nil && !errors.Is(err, fs.ErrNotExist) { + logrus.WithError(err).Error("Error unlinking current log file") + } + } else { + if err := os.Rename(fname, fname+".1"); err != nil && !errors.Is(err, fs.ErrNotExist) { + logrus.WithError(err).Error("Error renaming current log file") + } } } - } - // Notwithstanding the above, open with the truncate flag anyway in case - // rotation didn't work out as planned. - file, err := openFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms) + // Notwithstanding the above, open with the truncate flag anyway + // in case rotation didn't work out as planned. + return openFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms) + }() if err != nil { return err } w.f = file - w.currentSize = 0 - - w.notifyReaders.Publish(struct{}{}) + w.pos = logPos{rotation: w.pos.rotation + 1} if noCompress { return nil } ts := w.lastTimestamp - go func() { + defer w.rotateMu.Unlock() + // No need to hold fsopMu as at no point will the filesystem be + // in a state which would cause problems for readers. Opening + // the uncompressed file is tried first, falling back to the + // compressed one. compressFile only deletes the uncompressed + // file once the compressed one is fully written out, so at no + // point during the compression process will a reader fail to + // open a complete copy of the file. if err := compressFile(fname+".1", ts); err != nil { logrus.WithError(err).Error("Error compressing log file after rotation") } - w.rotateMu.Unlock() }() return nil @@ -231,8 +299,9 @@ func rotate(name string, maxFiles int, compress bool) error { for i := maxFiles - 1; i > 1; i-- { toPath := name + "." + strconv.Itoa(i) + extension fromPath := name + "." + strconv.Itoa(i-1) + extension - logrus.WithField("source", fromPath).WithField("target", toPath).Trace("Rotating log file") - if err := os.Rename(fromPath, toPath); err != nil && !errors.Is(err, fs.ErrNotExist) { + err := os.Rename(fromPath, toPath) + logrus.WithError(err).WithField("source", fromPath).WithField("target", toPath).Trace("Rotating log file") + if err != nil && !errors.Is(err, fs.ErrNotExist) { return err } } @@ -301,38 +370,49 @@ func (w *LogFile) MaxFiles() int { func (w *LogFile) Close() error { w.mu.Lock() defer w.mu.Unlock() - if w.closed { + select { + case <-w.closed: return nil + default: } if err := w.f.Close(); err != nil && !errors.Is(err, fs.ErrClosed) { return err } - w.closed = true - close(w.closedCh) + close(w.closed) + // Wait until any in-progress rotation is complete. + w.rotateMu.Lock() + w.rotateMu.Unlock() //nolint:staticcheck return nil } -// ReadLogs decodes entries from log files and sends them the passed in watcher +// ReadLogs decodes entries from log files. // -// Note: Using the follow option can become inconsistent in cases with very frequent rotations and max log files is 1. -// TODO: Consider a different implementation which can effectively follow logs under frequent rotations. +// It is the caller's responsibility to call ConsumerGone on the LogWatcher. func (w *LogFile) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { watcher := logger.NewLogWatcher() - // Lock before starting the reader goroutine to synchronize operations - // for race-free unit testing. The writer is locked out until the reader - // has opened the log file and set the read cursor to the current - // position. 
- w.mu.RLock() - go w.readLogsLocked(config, watcher) + // Lock out filesystem operations so that we can capture the read + // position and atomically open the corresponding log file, without the + // file getting rotated out from under us. + w.fsopMu.RLock() + // Capture the read position synchronously to ensure that we start + // following from the last entry logged before ReadLogs was called, + // which is required for flake-free unit testing. + st := <-w.read + pos := st.pos + w.read <- st + go w.readLogsLocked(pos, config, watcher) return watcher } -func (w *LogFile) readLogsLocked(config logger.ReadConfig, watcher *logger.LogWatcher) { +// readLogsLocked is the bulk of the implementation of ReadLogs. +// +// w.fsopMu must be locked for reading when calling this method. +// w.fsopMu.RUnlock() is called before returning. +func (w *LogFile) readLogsLocked(currentPos logPos, config logger.ReadConfig, watcher *logger.LogWatcher) { defer close(watcher.Msg) currentFile, err := open(w.f.Name()) if err != nil { - w.mu.RUnlock() watcher.Err <- err return } @@ -341,25 +421,13 @@ func (w *LogFile) readLogsLocked(config logger.ReadConfig, watcher *logger.LogWa dec := w.createDecoder(nil) defer dec.Close() - currentChunk, err := newSectionReader(currentFile) - if err != nil { - w.mu.RUnlock() - watcher.Err <- err - return - } - - notifyEvict := w.notifyReaders.SubscribeTopicWithBuffer(func(i interface{}) bool { - _, ok := i.(error) - return ok - }, 1) - defer w.notifyReaders.Evict(notifyEvict) + currentChunk := io.NewSectionReader(currentFile, 0, currentPos.size) if config.Tail != 0 { // TODO(@cpuguy83): Instead of opening every file, only get the files which // are needed to tail. // This is especially costly when compression is enabled. files, err := w.openRotatedFiles(config) - w.mu.RUnlock() if err != nil { watcher.Err <- err return @@ -392,34 +460,35 @@ func (w *LogFile) readLogsLocked(config logger.ReadConfig, watcher *logger.LogWa readers = append(readers, currentChunk) } - ok := tailFiles(readers, watcher, dec, w.getTailReader, config, notifyEvict) + ok := tailFiles(readers, watcher, dec, w.getTailReader, config) closeFiles() if !ok { return } - w.mu.RLock() + } else { + w.fsopMu.RUnlock() } - w.mu.RUnlock() if !config.Follow { return } - notifyRotate := w.notifyReaders.SubscribeTopic(func(i interface{}) bool { - _, ok := i.(struct{}) - return ok - }) - defer w.notifyReaders.Evict(notifyRotate) - - followLogs(currentFile, watcher, w.closedCh, notifyRotate, notifyEvict, dec, config.Since, config.Until) + (&follow{ + LogFile: w, + Watcher: watcher, + Decoder: dec, + Since: config.Since, + Until: config.Until, + }).Do(currentFile, currentPos) } -// openRotatedFiles returns a slice of files open for reading, in order from oldest to newest. +// openRotatedFiles returns a slice of files open for reading, in order from +// oldest to newest, and calls w.fsopMu.RUnlock() before returning. +// +// This method must only be called with w.fsopMu locked for reading. 
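The locking contract here is deliberately asymmetric: the caller takes the read lock so that capturing the position and opening the rotated files cannot interleave with a rotation, and openRotatedFiles releases it as soon as the files are open, so the slower tail and decompress work does not block the writer. A minimal sketch of that hand-off convention with made-up names:

```go
package main

import (
	"fmt"
	"sync"
)

type journal struct {
	fsopMu sync.RWMutex
	names  []string
}

// openAll must be called with j.fsopMu read-locked. It releases the lock itself
// once a stable snapshot has been taken, so slow work afterwards does not block
// rotations.
func (j *journal) openAll() []string {
	snapshot := append([]string(nil), j.names...)
	j.fsopMu.RUnlock()
	// ...opening, decompressing, and reading would happen here, lock-free...
	return snapshot
}

func main() {
	j := &journal{names: []string{"app.log.2", "app.log.1", "app.log"}}
	j.fsopMu.RLock() // caller acquires; callee releases
	fmt.Println(j.openAll())
}
```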
func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []readAtCloser, err error) { - w.rotateMu.Lock() - defer w.rotateMu.Unlock() - defer func() { + w.fsopMu.RUnlock() if err == nil { return } @@ -491,19 +560,7 @@ func decompress(dst io.WriteSeeker, src io.ReadSeeker) error { return rc.Close() } -func newSectionReader(f *os.File) (*io.SectionReader, error) { - // seek to the end to get the size - // we'll leave this at the end of the file since section reader does not advance the reader - size, err := f.Seek(0, io.SeekEnd) - if err != nil { - return nil, errors.Wrap(err, "error getting current file size") - } - return io.NewSectionReader(f, 0, size), nil -} - -func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig, notifyEvict <-chan interface{}) (cont bool) { - nLines := config.Tail - +func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig) (cont bool) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -511,12 +568,6 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge // TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here. go func() { select { - case err := <-notifyEvict: - if err != nil { - watcher.Err <- err.(error) - cont = false - cancel() - } case <-ctx.Done(): case <-watcher.WatchConsumerGone(): cont = false @@ -527,6 +578,7 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge readers := make([]io.Reader, 0, len(files)) if config.Tail > 0 { + nLines := config.Tail for i := len(files) - 1; i >= 0 && nLines > 0; i-- { tail, n, err := getTailReader(ctx, files[i], nLines) if err != nil { @@ -566,41 +618,3 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge } } } - -func watchFile(name string) (filenotify.FileWatcher, error) { - var fileWatcher filenotify.FileWatcher - - if runtime.GOOS == "windows" { - // FileWatcher on Windows files is based on the syscall notifications which has an issue because of file caching. - // It is based on ReadDirectoryChangesW() which doesn't detect writes to the cache. It detects writes to disk only. - // Because of the OS lazy writing, we don't get notifications for file writes and thereby the watcher - // doesn't work. Hence for Windows we will use poll based notifier. - fileWatcher = filenotify.NewPollingWatcher() - } else { - var err error - fileWatcher, err = filenotify.New() - if err != nil { - return nil, err - } - } - - logger := logrus.WithFields(logrus.Fields{ - "module": "logger", - "file": name, - }) - - if err := fileWatcher.Add(name); err != nil { - // we will retry using file poller. 
- logger.WithError(err).Warnf("falling back to file poller") - fileWatcher.Close() - fileWatcher = filenotify.NewPollingWatcher() - - if err := fileWatcher.Add(name); err != nil { - fileWatcher.Close() - logger.WithError(err).Debugf("error watching log file for modifications") - return nil, err - } - } - - return fileWatcher, nil -} diff --git a/daemon/logger/loggerutils/logfile_test.go b/daemon/logger/loggerutils/logfile_test.go index 3c62f52abc..df3d0a1226 100644 --- a/daemon/logger/loggerutils/logfile_test.go +++ b/daemon/logger/loggerutils/logfile_test.go @@ -9,7 +9,6 @@ import ( "os" "path/filepath" "strings" - "sync/atomic" "testing" "text/tabwriter" "time" @@ -67,7 +66,7 @@ func TestTailFiles(t *testing.T) { started := make(chan struct{}) go func() { close(started) - tailFiles(files, watcher, dec, tailReader, config, make(chan interface{})) + tailFiles(files, watcher, dec, tailReader, config) }() <-started }) @@ -77,7 +76,7 @@ func TestTailFiles(t *testing.T) { started := make(chan struct{}) go func() { close(started) - tailFiles(files, watcher, dec, tailReader, config, make(chan interface{})) + tailFiles(files, watcher, dec, tailReader, config) }() <-started @@ -111,140 +110,6 @@ func (dummyDecoder) Decode() (*logger.Message, error) { func (dummyDecoder) Close() {} func (dummyDecoder) Reset(io.Reader) {} -func TestFollowLogsConsumerGone(t *testing.T) { - lw := logger.NewLogWatcher() - - f, err := os.CreateTemp("", t.Name()) - assert.NilError(t, err) - defer func() { - f.Close() - os.Remove(f.Name()) - }() - - dec := dummyDecoder{} - - followLogsDone := make(chan struct{}) - var since, until time.Time - go func() { - followLogs(f, lw, nil, make(chan interface{}), make(chan interface{}), dec, since, until) - close(followLogsDone) - }() - - select { - case <-lw.Msg: - case err := <-lw.Err: - assert.NilError(t, err) - case <-followLogsDone: - t.Fatal("follow logs finished unexpectedly") - case <-time.After(10 * time.Second): - t.Fatal("timeout waiting for log message") - } - - lw.ConsumerGone() - select { - case <-followLogsDone: - case <-time.After(20 * time.Second): - t.Fatal("timeout waiting for followLogs() to finish") - } -} - -type dummyWrapper struct { - dummyDecoder - fn func() error -} - -func (d *dummyWrapper) Decode() (*logger.Message, error) { - if err := d.fn(); err != nil { - return nil, err - } - return d.dummyDecoder.Decode() -} - -func TestFollowLogsProducerGone(t *testing.T) { - lw := logger.NewLogWatcher() - defer lw.ConsumerGone() - - f, err := os.CreateTemp("", t.Name()) - assert.NilError(t, err) - defer os.Remove(f.Name()) - - var sent, received, closed int32 - dec := &dummyWrapper{fn: func() error { - switch atomic.LoadInt32(&closed) { - case 0: - atomic.AddInt32(&sent, 1) - return nil - case 1: - atomic.AddInt32(&closed, 1) - t.Logf("logDecode() closed after sending %d messages\n", sent) - return io.EOF - default: - return io.EOF - } - }} - var since, until time.Time - - followLogsDone := make(chan struct{}) - producerGone := make(chan struct{}) - go func() { - followLogs(f, lw, producerGone, make(chan interface{}), make(chan interface{}), dec, since, until) - close(followLogsDone) - }() - - // read 1 message - select { - case <-lw.Msg: - received++ - case err := <-lw.Err: - assert.NilError(t, err) - case <-followLogsDone: - t.Fatal("followLogs() finished unexpectedly") - case <-time.After(10 * time.Second): - t.Fatal("timeout waiting for log message") - } - - // "stop" the "container" - atomic.StoreInt32(&closed, 1) - close(producerGone) - - // should receive 
all the messages sent - readDone := make(chan struct{}) - go func() { - defer close(readDone) - for { - select { - case <-lw.Msg: - received++ - if received == atomic.LoadInt32(&sent) { - return - } - case err := <-lw.Err: - assert.NilError(t, err) - } - } - }() - select { - case <-readDone: - case <-time.After(30 * time.Second): - t.Fatalf("timeout waiting for log messages to be read (sent: %d, received: %d", sent, received) - } - - t.Logf("messages sent: %d, received: %d", atomic.LoadInt32(&sent), received) - - // followLogs() should be done by now - select { - case <-followLogsDone: - case <-time.After(30 * time.Second): - t.Fatal("timeout waiting for followLogs() to finish") - } - - select { - case <-lw.WatchConsumerGone(): - t.Fatal("consumer should not have exited") - default: - } -} - func TestCheckCapacityAndRotate(t *testing.T) { dir := t.TempDir()