
daemon/logger: follow LogFile without file watches

File watches have been a source of complexity and unreliability in the
LogFile follow implementation, especially when combined with file
rotation. File change events can be delivered unreliably, particularly
on Windows, and the polling fallback adds latency. Following across
rotations has never worked reliably on Windows. Without synchronization
between the log writer and readers, race conditions abound: readers can
read from the file while a log entry is only partially written, leading
to decode errors and necessitating retries.
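
As a concrete illustration of that race (a standalone sketch, not code
from this patch): encoding/json surfaces a partially written entry as
io.ErrUnexpectedEOF, which is exactly the error the old follow code had
to retry around.

package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strings"
)

func main() {
	// Only the first half of a log entry has been flushed so far.
	partial := strings.NewReader(`{"log":"hello wor`)
	var entry struct{ Log string }
	err := json.NewDecoder(partial).Decode(&entry)
	// The decoder hits EOF in the middle of a JSON value.
	fmt.Println(errors.Is(err, io.ErrUnexpectedEOF)) // true
}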

Beyond the file watches themselves, the LogFile follow implementation
was further complicated by the need to handle file truncations and, due
to a now-fixed bug in the polling file watcher implementation, evictions
to unlock the log file so it could be rotated. Log files are now always
rotated, never truncated, so these situations no longer need to be
handled by the follow code.

Rewrite the LogFile follow implementation in terms of waiting until
LogFile notifies it that a new message has been written to the log file.
The LogFile informs the follower of the file offset of the last complete
write so that the follower knows not to read past that, preventing it
from attempting to decode partial messages and making retries
unnecessary. Synchronization between LogFile and its followers is used
at critical points to prevent missed notifications of writes and races
between file rotations and the follower opening files for read.
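
Below is a condensed, hypothetical sketch of that scheme, with names
simplified from the patch (logPos and logReadState become pos and state)
and the close and consumer-gone paths omitted. The writer publishes the
offset of its last complete write through a 1-buffered state channel; a
follower either sees the offset has already advanced or enrolls a
channel to be woken. The follower's channel must have capacity 1 so the
self-notify send cannot block.

package main

import (
	"fmt"
	"sync"
)

// pos is the offset of the last complete write.
type pos struct{ size int64 }

// state travels through a 1-buffered channel:
// receive to acquire, send to release.
type state struct {
	pos  pos
	wait []chan<- pos // followers waiting for pos to change
}

// notify is the writer side: advance the offset and wake all waiters.
func notify(read chan state, n int64) {
	st := <-read
	st.pos.size += n
	for _, c := range st.wait {
		c <- st.pos
	}
	st.wait = st.wait[:0]
	read <- st
}

// next is the follower side: block until the offset advances past current.
func next(read chan state, current pos, c chan pos) pos {
	st := <-read
	if st.pos == current {
		st.wait = append(st.wait, c) // nothing new; enroll for notification
	} else {
		c <- st.pos // already advanced; "notify" ourselves
	}
	read <- st
	return <-c
}

func main() {
	read := make(chan state, 1)
	read <- state{}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		c := make(chan pos, 1) // capacity 1: self-notify must not block
		fmt.Println("follower saw offset", next(read, pos{}, c).size)
	}()
	notify(read, 42)
	wg.Wait()
}

WriteLogEntry and nextPos in the diff below are the full versions of
notify and next, with rotation counting and close/consumer-gone handling
layered on top.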

Signed-off-by: Cory Snider <csnider@mirantis.com>
This commit is contained in:
Cory Snider 2022-02-09 14:49:23 -05:00
parent 6d5bc07189
commit 01915a725e
6 changed files with 346 additions and 656 deletions


@@ -10,11 +10,8 @@ import (
"github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog"
"github.com/docker/docker/daemon/logger/loggerutils"
"github.com/docker/docker/pkg/tailfile"
"github.com/sirupsen/logrus"
)
const maxJSONDecodeRetry = 20000
// ReadLogs implements the logger's LogReader interface for the logs
// created by this driver.
func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
@@ -44,10 +41,9 @@ func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, erro
}
type decoder struct {
rdr io.Reader
dec *json.Decoder
jl *jsonlog.JSONLog
maxRetry int
rdr io.Reader
dec *json.Decoder
jl *jsonlog.JSONLog
}
func (d *decoder) Reset(rdr io.Reader) {
@@ -71,74 +67,7 @@ func (d *decoder) Decode() (msg *logger.Message, err error) {
if d.jl == nil {
d.jl = &jsonlog.JSONLog{}
}
if d.maxRetry == 0 {
// We aren't using maxJSONDecodeRetry directly so we can give a custom value for testing.
d.maxRetry = maxJSONDecodeRetry
}
for retries := 0; retries < d.maxRetry; retries++ {
msg, err = decodeLogLine(d.dec, d.jl)
if err == nil || err == io.EOF {
break
}
logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json")
// try again, could be due to an incomplete json object as we read
if _, ok := err.(*json.SyntaxError); ok {
d.dec = json.NewDecoder(d.rdr)
continue
}
// io.ErrUnexpectedEOF is returned from json.Decoder when there is
// remaining data in the parser's buffer while an io.EOF occurs.
// This race occurs when the json logger writes a partial json log entry
// to disk while the decoder is simultaneously trying to decode it.
if err == io.ErrUnexpectedEOF {
d.rdr = combineReaders(d.dec.Buffered(), d.rdr)
d.dec = json.NewDecoder(d.rdr)
continue
}
}
return msg, err
}
func combineReaders(pre, rdr io.Reader) io.Reader {
return &combinedReader{pre: pre, rdr: rdr}
}
// combinedReader is a reader which is like `io.MultiReader` except it does not cache a full EOF.
// Once `io.MultiReader` returns EOF, it is always EOF.
//
// For this use case we have an underlying reader which is a file that may reach EOF but have more data written to it later.
// As such, io.MultiReader does not work for us.
type combinedReader struct {
pre io.Reader
rdr io.Reader
}
func (r *combinedReader) Read(p []byte) (int, error) {
var read int
if r.pre != nil {
n, err := r.pre.Read(p)
if err != nil {
if err != io.EOF {
return n, err
}
r.pre = nil
}
read = n
}
if read < len(p) {
n, err := r.rdr.Read(p[read:])
if n > 0 {
read += n
}
if err != nil {
return read, err
}
}
return read, nil
return decodeLogLine(d.dec, d.jl)
}
// decodeFunc is used to create a decoder for the log file reader
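
For context on the combinedReader removed above: io.MultiReader latches
EOF, so once the underlying file reports EOF, data appended later is
never seen. A minimal standalone demonstration (not from the patch):

package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

func main() {
	var buf bytes.Buffer // stands in for a log file that grows after EOF
	r := io.MultiReader(strings.NewReader("pre"), &buf)

	p := make([]byte, 8)
	n, err := r.Read(p)
	fmt.Println(n, err) // 3 <nil>: read "pre"
	n, err = r.Read(p)
	fmt.Println(n, err) // 0 EOF: buf was empty, so MultiReader drops it

	buf.WriteString("more") // data arrives after the EOF
	n, err = r.Read(p)
	fmt.Println(n, err) // 0 EOF: still EOF; the reader was discarded
}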


@@ -3,7 +3,6 @@ package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelo
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"os"
@@ -107,45 +106,6 @@ func TestEncodeDecode(t *testing.T) {
assert.Assert(t, err == io.EOF)
}
func TestUnexpectedEOF(t *testing.T) {
buf := bytes.NewBuffer(nil)
msg1 := &logger.Message{Timestamp: time.Now(), Line: []byte("hello1")}
msg2 := &logger.Message{Timestamp: time.Now(), Line: []byte("hello2")}
err := marshalMessage(msg1, json.RawMessage{}, buf)
assert.NilError(t, err)
err = marshalMessage(msg2, json.RawMessage{}, buf)
assert.NilError(t, err)
r := &readerWithErr{
err: io.EOF,
after: buf.Len() / 4,
r: buf,
}
dec := &decoder{rdr: r, maxRetry: 1}
_, err = dec.Decode()
assert.Error(t, err, io.ErrUnexpectedEOF.Error())
// again just to check
_, err = dec.Decode()
assert.Error(t, err, io.ErrUnexpectedEOF.Error())
// reset the error
// from here all reads should succeed until we get EOF on the underlying reader
r.err = nil
msg, err := dec.Decode()
assert.NilError(t, err)
assert.Equal(t, string(msg1.Line)+"\n", string(msg.Line))
msg, err = dec.Decode()
assert.NilError(t, err)
assert.Equal(t, string(msg2.Line)+"\n", string(msg.Line))
_, err = dec.Decode()
assert.Error(t, err, io.EOF.Error())
}
func TestReadLogs(t *testing.T) {
t.Parallel()
r := loggertest.Reader{
@@ -193,6 +153,40 @@ func TestTailLogsWithRotation(t *testing.T) {
compress(false)
}
func TestFollowLogsWithRotation(t *testing.T) {
t.Parallel()
compress := func(cmprs bool) {
t.Run(fmt.Sprintf("compress=%v", cmprs), func(t *testing.T) {
t.Parallel()
(&loggertest.Reader{
Factory: func(t *testing.T, info logger.Info) func(*testing.T) logger.Logger {
// The log follower can fall behind and drop logs if there are too many
// rotations in a short time. If that were to happen, loggertest would fail the
// test. Configure the logger so that there will be only one rotation with the
// set of logs that loggertest writes.
info.Config = map[string]string{
"compress": strconv.FormatBool(cmprs),
"max-size": "4096b",
"max-file": "3",
}
dir := t.TempDir()
t.Cleanup(func() {
t.Logf("%s:\n%s", t.Name(), dirStringer{dir})
})
info.LogPath = filepath.Join(dir, info.ContainerID+".log")
return func(t *testing.T) logger.Logger {
l, err := New(info)
assert.NilError(t, err)
return l
}
},
}).TestFollow(t)
})
}
compress(true)
compress(false)
}
type dirStringer struct {
d string
}
@@ -220,22 +214,3 @@ func (d dirStringer) String() string {
tw.Flush()
return buf.String()
}
type readerWithErr struct {
err error
after int
r io.Reader
read int
}
func (r *readerWithErr) Read(p []byte) (int, error) {
if r.err != nil && r.read > r.after {
return 0, r.err
}
n, err := r.r.Read(p[:1])
if n > 0 {
r.read += n
}
return n, err
}


@@ -1,221 +1,165 @@
package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
import (
"fmt"
"io"
"os"
"time"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/pkg/filenotify"
"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
var errRetry = errors.New("retry")
var errDone = errors.New("done")
type follow struct {
file *os.File
dec Decoder
fileWatcher filenotify.FileWatcher
logWatcher *logger.LogWatcher
producerGone <-chan struct{}
draining bool
notifyRotate, notifyEvict chan interface{}
oldSize int64
retries int
LogFile *LogFile
Watcher *logger.LogWatcher
Decoder Decoder
Since, Until time.Time
log *logrus.Entry
c chan logPos
}
func (fl *follow) handleRotate() error {
name := fl.file.Name()
// Do follows the log file as it is written, starting from f at read.
func (fl *follow) Do(f *os.File, read logPos) {
fl.log = logrus.WithFields(logrus.Fields{
"module": "logger",
"file": f.Name(),
})
// Optimization: allocate the write-notifications channel only once and
// reuse it for multiple invocations of nextPos().
fl.c = make(chan logPos, 1)
fl.file.Close()
fl.fileWatcher.Remove(name)
// retry when the file doesn't exist
var err error
for retries := 0; retries <= 5; retries++ {
f, err := open(name)
if err == nil || !os.IsNotExist(err) {
fl.file = f
break
defer func() {
if err := f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
fl.log.WithError(err).Warn("error closing current log file")
}
}
if err != nil {
return err
}
if err := fl.fileWatcher.Add(name); err != nil {
return err
}
fl.dec.Reset(fl.file)
return nil
}
func (fl *follow) handleMustClose(evictErr error) {
fl.file.Close()
fl.dec.Close()
fl.logWatcher.Err <- errors.Wrap(evictErr, "log reader evicted due to errors")
logrus.WithField("file", fl.file.Name()).Error("Log reader notified that it must re-open log file, some log data may not be streamed to the client.")
}
func (fl *follow) waitRead() error {
select {
case e := <-fl.notifyEvict:
if e != nil {
err := e.(error)
fl.handleMustClose(err)
}
return errDone
case e := <-fl.fileWatcher.Events():
switch e.Op {
case fsnotify.Write:
fl.dec.Reset(fl.file)
return nil
case fsnotify.Rename, fsnotify.Remove:
select {
case <-fl.notifyRotate:
case <-fl.producerGone:
return errDone
case <-fl.logWatcher.WatchConsumerGone():
return errDone
}
if err := fl.handleRotate(); err != nil {
return err
}
return nil
}
return errRetry
case err := <-fl.fileWatcher.Errors():
logrus.Debugf("logger got error watching file: %v", err)
// Something happened, let's try and stay alive and create a new watcher
if fl.retries <= 5 {
fl.fileWatcher.Close()
fl.fileWatcher, err = watchFile(fl.file.Name())
if err != nil {
return err
}
fl.retries++
return errRetry
}
return err
case <-fl.producerGone:
// There may be messages written out which the fileWatcher has
// not yet notified us about.
if fl.draining {
return errDone
}
fl.draining = true
fl.dec.Reset(fl.file)
return nil
case <-fl.logWatcher.WatchConsumerGone():
return errDone
}
}
func (fl *follow) handleDecodeErr(err error) error {
if !errors.Is(err, io.EOF) {
return err
}
// Handle special case (#39235): max-file=1 and file was truncated
st, stErr := fl.file.Stat()
if stErr == nil {
size := st.Size()
defer func() { fl.oldSize = size }()
if size < fl.oldSize { // truncated
fl.file.Seek(0, 0)
fl.dec.Reset(fl.file)
return nil
}
} else {
logrus.WithError(stErr).Warn("logger: stat error")
}
}()
for {
err := fl.waitRead()
if err == nil {
break
}
if err == errRetry {
continue
}
return err
}
return nil
}
func (fl *follow) mainLoop(since, until time.Time) {
for {
select {
case err := <-fl.notifyEvict:
if err != nil {
fl.handleMustClose(err.(error))
}
wrote, ok := fl.nextPos(read)
if !ok {
return
default:
}
msg, err := fl.dec.Decode()
if err != nil {
if err := fl.handleDecodeErr(err); err != nil {
if err == errDone {
return
}
// we got an unrecoverable error, so return
fl.logWatcher.Err <- err
if wrote.rotation != read.rotation {
// Flush the current file before moving on to the next.
if _, err := f.Seek(read.size, io.SeekStart); err != nil {
fl.Watcher.Err <- err
return
}
// ready to try again
continue
if fl.decode(f) {
return
}
// Open the new file, which has the same name as the old
// file thanks to file rotation. Make no mistake: they
// are different files, with distinct identities.
// Atomically capture the wrote position to make
// absolutely sure that the position corresponds to the
// file we have opened; more rotations could have
// occurred since we previously received it.
if err := f.Close(); err != nil {
fl.log.WithError(err).Warn("error closing rotated log file")
}
var err error
func() {
fl.LogFile.fsopMu.RLock()
st := <-fl.LogFile.read
defer func() {
fl.LogFile.read <- st
fl.LogFile.fsopMu.RUnlock()
}()
f, err = open(f.Name())
wrote = st.pos
}()
// We tried to open the file inside a critical section
// so we shouldn't have been racing the rotation of the
// file. Any error, even fs.ErrNotExist, is exceptional.
if err != nil {
fl.Watcher.Err <- fmt.Errorf("logger: error opening log file for follow after rotation: %w", err)
return
}
if nrot := wrote.rotation - read.rotation; nrot > 1 {
fl.log.WithField("missed-rotations", nrot).
Warn("file rotations were missed while following logs; some log messages have been skipped over")
}
// Set up our read position to start from the top of the file.
read.size = 0
}
fl.retries = 0 // reset retries since we've succeeded
if !since.IsZero() && msg.Timestamp.Before(since) {
if fl.decode(io.NewSectionReader(f, read.size, wrote.size-read.size)) {
return
}
read = wrote
}
}
// nextPos waits until the write position of the LogFile being followed has
// advanced from current and returns the new position.
func (fl *follow) nextPos(current logPos) (next logPos, ok bool) {
var st logReadState
select {
case <-fl.Watcher.WatchConsumerGone():
return current, false
case st = <-fl.LogFile.read:
}
// Have any logs been written since we last checked?
if st.pos == current { // Nope.
// Add ourself to the notify list.
st.wait = append(st.wait, fl.c)
} else { // Yes.
// "Notify" ourself immediately.
fl.c <- st.pos
}
fl.LogFile.read <- st
select {
case <-fl.LogFile.closed: // No more logs will be written.
select { // Have we followed to the end?
case next = <-fl.c: // No: received a new position.
default: // Yes.
return current, false
}
case <-fl.Watcher.WatchConsumerGone():
return current, false
case next = <-fl.c:
}
return next, true
}
// decode decodes log messages from r and sends messages with timestamps between
// Since and Until to the log watcher.
//
// The return value, done, signals whether following should end due to a
// condition encountered during decode.
func (fl *follow) decode(r io.Reader) (done bool) {
fl.Decoder.Reset(r)
for {
msg, err := fl.Decoder.Decode()
if err != nil {
if errors.Is(err, io.EOF) {
return false
}
fl.Watcher.Err <- err
return true
}
if !fl.Since.IsZero() && msg.Timestamp.Before(fl.Since) {
continue
}
if !until.IsZero() && msg.Timestamp.After(until) {
return
if !fl.Until.IsZero() && msg.Timestamp.After(fl.Until) {
return true
}
// send the message, unless the consumer is gone
select {
case e := <-fl.notifyEvict:
if e != nil {
err := e.(error)
logrus.WithError(err).Debug("Reader evicted while sending log message")
fl.logWatcher.Err <- err
}
return
case fl.logWatcher.Msg <- msg:
case <-fl.logWatcher.WatchConsumerGone():
return
case fl.Watcher.Msg <- msg:
case <-fl.Watcher.WatchConsumerGone():
return true
}
}
}
func followLogs(f *os.File, logWatcher *logger.LogWatcher, producerGone <-chan struct{}, notifyRotate, notifyEvict chan interface{}, dec Decoder, since, until time.Time) {
dec.Reset(f)
name := f.Name()
fileWatcher, err := watchFile(name)
if err != nil {
logWatcher.Err <- err
return
}
defer func() {
f.Close()
dec.Close()
fileWatcher.Close()
}()
fl := &follow{
file: f,
oldSize: -1,
logWatcher: logWatcher,
fileWatcher: fileWatcher,
producerGone: producerGone,
notifyRotate: notifyRotate,
notifyEvict: notifyEvict,
dec: dec,
}
fl.mainLoop(since, until)
}
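
To make the bounded-read idea above concrete, here is a standalone
sketch (file contents and offsets are illustrative, not from the patch):
reading through io.NewSectionReader capped at the writer-published
offset means the decoder can never see a partially written tail.

package main

import (
	"fmt"
	"io"
	"os"
)

func main() {
	f, err := os.CreateTemp("", "follow-demo")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()

	// The writer has flushed one complete entry...
	f.WriteString(`{"log":"complete entry"}` + "\n")
	wrote, _ := f.Seek(0, io.SeekCurrent) // ...and published this offset...
	f.WriteString(`{"log":"parti`)        // ...before writing a partial entry.

	// The follower reads from its last position up to the published
	// offset, so the partial tail is invisible to the decoder.
	r := io.NewSectionReader(f, 0, wrote)
	b, err := io.ReadAll(r)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", b) // only the complete entry
}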


@@ -1,37 +0,0 @@
package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
import (
"io"
"os"
"testing"
"gotest.tools/v3/assert"
)
func TestHandleDecoderErr(t *testing.T) {
f, err := os.CreateTemp("", t.Name())
assert.NilError(t, err)
defer os.Remove(f.Name())
_, err = f.Write([]byte("hello"))
assert.NilError(t, err)
pos, err := f.Seek(0, io.SeekCurrent)
assert.NilError(t, err)
assert.Assert(t, pos != 0)
dec := &testDecoder{}
// Simulate "turncate" case, where the file was bigger before.
fl := &follow{file: f, dec: dec, oldSize: 100}
err = fl.handleDecodeErr(io.EOF)
assert.NilError(t, err)
// handleDecodeErr seeks to zero.
pos, err = f.Seek(0, io.SeekCurrent)
assert.NilError(t, err)
assert.Equal(t, int64(0), pos)
// Reset is called.
assert.Equal(t, 1, dec.resetCount)
}


@@ -7,16 +7,14 @@ import (
"fmt"
"io"
"io/fs"
"math"
"os"
"runtime"
"strconv"
"sync"
"time"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/pkg/filenotify"
"github.com/docker/docker/pkg/pools"
"github.com/docker/docker/pkg/pubsub"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@@ -28,22 +26,63 @@ type rotateFileMetadata struct {
// LogFile is Logger implementation for default Docker logging.
type LogFile struct {
mu sync.RWMutex // protects the logfile access
f *os.File // store for closing
closed bool
closedCh chan struct{}
rotateMu sync.Mutex // blocks the next rotation until the current rotation is completed
capacity int64 // maximum size of each file
currentSize int64 // current size of the latest file
maxFiles int // maximum number of files
compress bool // whether old versions of log files are compressed
lastTimestamp time.Time // timestamp of the last log
decompress *sharedTempFileConverter // keep reference-counted decompressed files
notifyReaders *pubsub.Publisher
mu sync.Mutex // protects the logfile access
closed chan struct{}
rotateMu sync.Mutex // blocks the next rotation until the current rotation is completed
// Lock out readers while performing a non-atomic sequence of filesystem
// operations (RLock: open, Lock: rename, delete).
//
// fsopMu should be locked for writing only while holding rotateMu.
fsopMu sync.RWMutex
// Logger configuration
capacity int64 // maximum size of each file
maxFiles int // maximum number of files
compress bool // whether old versions of log files are compressed
perms os.FileMode
// Log file codec
marshal logger.MarshalFunc
createDecoder MakeDecoderFn
getTailReader GetTailReaderFunc
perms os.FileMode
// Log reader state in a 1-buffered channel.
//
// Share memory by communicating: receive to acquire, send to release.
// The state struct is passed around by value so that use-after-send
// bugs cannot escalate to data races.
//
// A method which receives the state value takes ownership of it. The
// owner is responsible for either passing ownership along or sending
// the state back to the channel. By convention, the semantics of
// passing along ownership is expressed with function argument types.
// Methods which take a pointer *logReadState argument borrow the state,
// analogous to functions which require a lock to be held when calling.
// The caller retains ownership. Calling a method which takes a
// value logReadState argument gives ownership to the callee.
read chan logReadState
decompress *sharedTempFileConverter
pos logPos // Current log file write position.
f *os.File // Current log file for writing.
lastTimestamp time.Time // timestamp of the last log
}
type logPos struct {
// Size of the current file.
size int64
// File rotation sequence number (modulo 2**16).
rotation uint16
}
type logReadState struct {
// Current log file position.
pos logPos
// Wait list to be notified of the value of pos next time it changes.
wait []chan<- logPos
}
// MakeDecoderFn creates a decoder
@@ -92,15 +131,24 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar
return nil, err
}
pos := logPos{
size: size,
// Force a wraparound on first rotation to shake out any
// modular-arithmetic bugs.
rotation: math.MaxUint16,
}
st := make(chan logReadState, 1)
st <- logReadState{pos: pos}
return &LogFile{
f: log,
closedCh: make(chan struct{}),
read: st,
pos: pos,
closed: make(chan struct{}),
capacity: capacity,
currentSize: size,
maxFiles: maxFiles,
compress: compress,
decompress: newSharedTempFileConverter(decompress),
notifyReaders: pubsub.NewPublisher(0, 1),
marshal: marshaller,
createDecoder: decodeFunc,
perms: perms,
@@ -120,35 +168,45 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
logger.PutMessage(msg)
msg = nil // Turn use-after-put bugs into panics.
w.mu.Lock()
if w.closed {
w.mu.Unlock()
select {
case <-w.closed:
return errors.New("cannot write because the output file was closed")
default:
}
w.mu.Lock()
defer w.mu.Unlock()
if err := w.checkCapacityAndRotate(); err != nil {
w.mu.Unlock()
return errors.Wrap(err, "error rotating log file")
// Are we due for a rotation?
if w.capacity != -1 && w.pos.size >= w.capacity {
if err := w.rotate(); err != nil {
return errors.Wrap(err, "error rotating log file")
}
}
n, err := w.f.Write(b)
if err == nil {
w.currentSize += int64(n)
w.lastTimestamp = ts
if err != nil {
return errors.Wrap(err, "error writing log entry")
}
w.pos.size += int64(n)
w.lastTimestamp = ts
w.mu.Unlock()
return errors.Wrap(err, "error writing log entry")
// Notify any waiting readers that there is a new log entry to read.
st := <-w.read
defer func() { w.read <- st }()
st.pos = w.pos
for _, c := range st.wait {
c <- st.pos
}
// Optimization: retain the backing array to save a heap allocation next
// time a reader appends to the list.
if st.wait != nil {
st.wait = st.wait[:0]
}
return nil
}
func (w *LogFile) checkCapacityAndRotate() (retErr error) {
if w.capacity == -1 {
return nil
}
if w.currentSize < w.capacity {
return nil
}
func (w *LogFile) rotate() (retErr error) {
w.rotateMu.Lock()
noCompress := w.maxFiles <= 1 || !w.compress
defer func() {
@@ -167,46 +225,56 @@ func (w *LogFile) checkCapacityAndRotate() (retErr error) {
}
}
if err := rotate(fname, w.maxFiles, w.compress); err != nil {
logrus.WithError(err).Warn("Error rotating log file, log data may have been lost")
} else {
// We may have readers working their way through the current
// log file so we can't truncate it. We need to start writing
// new logs to an empty file with the same name as the current
// one so we need to rotate the current file out of the way.
if w.maxFiles < 2 {
if err := unlink(fname); err != nil && !errors.Is(err, fs.ErrNotExist) {
logrus.WithError(err).Error("Error unlinking current log file")
}
file, err := func() (*os.File, error) {
w.fsopMu.Lock()
defer w.fsopMu.Unlock()
if err := rotate(fname, w.maxFiles, w.compress); err != nil {
logrus.WithError(err).Warn("Error rotating log file, log data may have been lost")
} else {
if err := os.Rename(fname, fname+".1"); err != nil && !errors.Is(err, fs.ErrNotExist) {
logrus.WithError(err).Error("Error renaming current log file")
// We may have readers working their way through the
// current log file so we can't truncate it. We need to
// start writing new logs to an empty file with the same
// name as the current one so we need to rotate the
// current file out of the way.
if w.maxFiles < 2 {
if err := unlink(fname); err != nil && !errors.Is(err, fs.ErrNotExist) {
logrus.WithError(err).Error("Error unlinking current log file")
}
} else {
if err := os.Rename(fname, fname+".1"); err != nil && !errors.Is(err, fs.ErrNotExist) {
logrus.WithError(err).Error("Error renaming current log file")
}
}
}
}
// Notwithstanding the above, open with the truncate flag anyway in case
// rotation didn't work out as planned.
file, err := openFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
// Notwithstanding the above, open with the truncate flag anyway
// in case rotation didn't work out as planned.
return openFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
}()
if err != nil {
return err
}
w.f = file
w.currentSize = 0
w.notifyReaders.Publish(struct{}{})
w.pos = logPos{rotation: w.pos.rotation + 1}
if noCompress {
return nil
}
ts := w.lastTimestamp
go func() {
defer w.rotateMu.Unlock()
// No need to hold fsopMu as at no point will the filesystem be
// in a state which would cause problems for readers. Opening
// the uncompressed file is tried first, falling back to the
// compressed one. compressFile only deletes the uncompressed
// file once the compressed one is fully written out, so at no
// point during the compression process will a reader fail to
// open a complete copy of the file.
if err := compressFile(fname+".1", ts); err != nil {
logrus.WithError(err).Error("Error compressing log file after rotation")
}
w.rotateMu.Unlock()
}()
return nil
@@ -231,8 +299,9 @@ func rotate(name string, maxFiles int, compress bool) error {
for i := maxFiles - 1; i > 1; i-- {
toPath := name + "." + strconv.Itoa(i) + extension
fromPath := name + "." + strconv.Itoa(i-1) + extension
logrus.WithField("source", fromPath).WithField("target", toPath).Trace("Rotating log file")
if err := os.Rename(fromPath, toPath); err != nil && !errors.Is(err, fs.ErrNotExist) {
err := os.Rename(fromPath, toPath)
logrus.WithError(err).WithField("source", fromPath).WithField("target", toPath).Trace("Rotating log file")
if err != nil && !errors.Is(err, fs.ErrNotExist) {
return err
}
}
@@ -301,38 +370,49 @@ func (w *LogFile) MaxFiles() int {
func (w *LogFile) Close() error {
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
select {
case <-w.closed:
return nil
default:
}
if err := w.f.Close(); err != nil && !errors.Is(err, fs.ErrClosed) {
return err
}
w.closed = true
close(w.closedCh)
close(w.closed)
// Wait until any in-progress rotation is complete.
w.rotateMu.Lock()
w.rotateMu.Unlock() //nolint:staticcheck
return nil
}
// ReadLogs decodes entries from log files and sends them to the passed-in watcher
// ReadLogs decodes entries from log files.
//
// Note: Using the follow option can become inconsistent in cases with very frequent rotations when max log files is 1.
// TODO: Consider a different implementation which can effectively follow logs under frequent rotations.
// It is the caller's responsibility to call ConsumerGone on the LogWatcher.
func (w *LogFile) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
watcher := logger.NewLogWatcher()
// Lock before starting the reader goroutine to synchronize operations
// for race-free unit testing. The writer is locked out until the reader
// has opened the log file and set the read cursor to the current
// position.
w.mu.RLock()
go w.readLogsLocked(config, watcher)
// Lock out filesystem operations so that we can capture the read
// position and atomically open the corresponding log file, without the
// file getting rotated out from under us.
w.fsopMu.RLock()
// Capture the read position synchronously to ensure that we start
// following from the last entry logged before ReadLogs was called,
// which is required for flake-free unit testing.
st := <-w.read
pos := st.pos
w.read <- st
go w.readLogsLocked(pos, config, watcher)
return watcher
}
func (w *LogFile) readLogsLocked(config logger.ReadConfig, watcher *logger.LogWatcher) {
// readLogsLocked is the bulk of the implementation of ReadLogs.
//
// w.fsopMu must be locked for reading when calling this method.
// w.fsopMu.RUnlock() is called before returning.
func (w *LogFile) readLogsLocked(currentPos logPos, config logger.ReadConfig, watcher *logger.LogWatcher) {
defer close(watcher.Msg)
currentFile, err := open(w.f.Name())
if err != nil {
w.mu.RUnlock()
watcher.Err <- err
return
}
@@ -341,25 +421,13 @@ func (w *LogFile) readLogsLocked(config logger.ReadConfig, watcher *logger.LogWa
dec := w.createDecoder(nil)
defer dec.Close()
currentChunk, err := newSectionReader(currentFile)
if err != nil {
w.mu.RUnlock()
watcher.Err <- err
return
}
notifyEvict := w.notifyReaders.SubscribeTopicWithBuffer(func(i interface{}) bool {
_, ok := i.(error)
return ok
}, 1)
defer w.notifyReaders.Evict(notifyEvict)
currentChunk := io.NewSectionReader(currentFile, 0, currentPos.size)
if config.Tail != 0 {
// TODO(@cpuguy83): Instead of opening every file, only get the files which
// are needed to tail.
// This is especially costly when compression is enabled.
files, err := w.openRotatedFiles(config)
w.mu.RUnlock()
if err != nil {
watcher.Err <- err
return
@@ -392,34 +460,35 @@ func (w *LogFile) readLogsLocked(config logger.ReadConfig, watcher *logger.LogWa
readers = append(readers, currentChunk)
}
ok := tailFiles(readers, watcher, dec, w.getTailReader, config, notifyEvict)
ok := tailFiles(readers, watcher, dec, w.getTailReader, config)
closeFiles()
if !ok {
return
}
w.mu.RLock()
} else {
w.fsopMu.RUnlock()
}
w.mu.RUnlock()
if !config.Follow {
return
}
notifyRotate := w.notifyReaders.SubscribeTopic(func(i interface{}) bool {
_, ok := i.(struct{})
return ok
})
defer w.notifyReaders.Evict(notifyRotate)
followLogs(currentFile, watcher, w.closedCh, notifyRotate, notifyEvict, dec, config.Since, config.Until)
(&follow{
LogFile: w,
Watcher: watcher,
Decoder: dec,
Since: config.Since,
Until: config.Until,
}).Do(currentFile, currentPos)
}
// openRotatedFiles returns a slice of files open for reading, in order from oldest to newest.
// openRotatedFiles returns a slice of files open for reading, in order from
// oldest to newest, and calls w.fsopMu.RUnlock() before returning.
//
// This method must only be called with w.fsopMu locked for reading.
func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []readAtCloser, err error) {
w.rotateMu.Lock()
defer w.rotateMu.Unlock()
defer func() {
w.fsopMu.RUnlock()
if err == nil {
return
}
@@ -491,19 +560,7 @@ func decompress(dst io.WriteSeeker, src io.ReadSeeker) error {
return rc.Close()
}
func newSectionReader(f *os.File) (*io.SectionReader, error) {
// seek to the end to get the size
// we'll leave this at the end of the file since section reader does not advance the reader
size, err := f.Seek(0, io.SeekEnd)
if err != nil {
return nil, errors.Wrap(err, "error getting current file size")
}
return io.NewSectionReader(f, 0, size), nil
}
func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig, notifyEvict <-chan interface{}) (cont bool) {
nLines := config.Tail
func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig) (cont bool) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -511,12 +568,6 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge
// TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here.
go func() {
select {
case err := <-notifyEvict:
if err != nil {
watcher.Err <- err.(error)
cont = false
cancel()
}
case <-ctx.Done():
case <-watcher.WatchConsumerGone():
cont = false
@@ -527,6 +578,7 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge
readers := make([]io.Reader, 0, len(files))
if config.Tail > 0 {
nLines := config.Tail
for i := len(files) - 1; i >= 0 && nLines > 0; i-- {
tail, n, err := getTailReader(ctx, files[i], nLines)
if err != nil {
@@ -566,41 +618,3 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge
}
}
}
func watchFile(name string) (filenotify.FileWatcher, error) {
var fileWatcher filenotify.FileWatcher
if runtime.GOOS == "windows" {
// FileWatcher on Windows is based on syscall notifications, which have an issue because of file caching.
// It is based on ReadDirectoryChangesW(), which doesn't detect writes to the cache; it detects writes to disk only.
// Because of the OS's lazy writing, we don't get notifications for file writes, and thereby the watcher
// doesn't work. Hence for Windows we will use a poll-based notifier.
fileWatcher = filenotify.NewPollingWatcher()
} else {
var err error
fileWatcher, err = filenotify.New()
if err != nil {
return nil, err
}
}
logger := logrus.WithFields(logrus.Fields{
"module": "logger",
"file": name,
})
if err := fileWatcher.Add(name); err != nil {
// we will retry using file poller.
logger.WithError(err).Warnf("falling back to file poller")
fileWatcher.Close()
fileWatcher = filenotify.NewPollingWatcher()
if err := fileWatcher.Add(name); err != nil {
fileWatcher.Close()
logger.WithError(err).Debugf("error watching log file for modifications")
return nil, err
}
}
return fileWatcher, nil
}
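
Stepping back from the diff: the new fsopMu field implements a
reader/writer split over non-atomic filesystem sequences, with rotation
holding the write lock around its rename/unlink steps and readers
holding the read lock just long enough to open a file. A generic sketch
of the pattern (the rotator type and its methods are illustrative, not
from the patch):

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

type rotator struct {
	fsopMu sync.RWMutex // Lock: rename/delete; RLock: open
	name   string
}

// rotate renames the live file out of the way under the write lock so
// that no reader can open it mid-rename.
func (r *rotator) rotate() error {
	r.fsopMu.Lock()
	defer r.fsopMu.Unlock()
	return os.Rename(r.name, r.name+".1")
}

// openForRead opens the current file under the read lock, so the open
// cannot race a rename.
func (r *rotator) openForRead() (*os.File, error) {
	r.fsopMu.RLock()
	defer r.fsopMu.RUnlock()
	return os.Open(r.name)
}

func main() {
	dir, err := os.MkdirTemp("", "rot")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	r := &rotator{name: filepath.Join(dir, "c.log")}
	if err := os.WriteFile(r.name, []byte("hello\n"), 0o644); err != nil {
		panic(err)
	}

	f, err := r.openForRead()
	fmt.Println("open before rotate:", err)
	f.Close()
	fmt.Println("rotate:", r.rotate())
}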


@@ -9,7 +9,6 @@ import (
"os"
"path/filepath"
"strings"
"sync/atomic"
"testing"
"text/tabwriter"
"time"
@@ -67,7 +66,7 @@ func TestTailFiles(t *testing.T) {
started := make(chan struct{})
go func() {
close(started)
tailFiles(files, watcher, dec, tailReader, config, make(chan interface{}))
tailFiles(files, watcher, dec, tailReader, config)
}()
<-started
})
@@ -77,7 +76,7 @@
started := make(chan struct{})
go func() {
close(started)
tailFiles(files, watcher, dec, tailReader, config, make(chan interface{}))
tailFiles(files, watcher, dec, tailReader, config)
}()
<-started
@@ -111,140 +110,6 @@ func (dummyDecoder) Decode() (*logger.Message, error) {
func (dummyDecoder) Close() {}
func (dummyDecoder) Reset(io.Reader) {}
func TestFollowLogsConsumerGone(t *testing.T) {
lw := logger.NewLogWatcher()
f, err := os.CreateTemp("", t.Name())
assert.NilError(t, err)
defer func() {
f.Close()
os.Remove(f.Name())
}()
dec := dummyDecoder{}
followLogsDone := make(chan struct{})
var since, until time.Time
go func() {
followLogs(f, lw, nil, make(chan interface{}), make(chan interface{}), dec, since, until)
close(followLogsDone)
}()
select {
case <-lw.Msg:
case err := <-lw.Err:
assert.NilError(t, err)
case <-followLogsDone:
t.Fatal("follow logs finished unexpectedly")
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for log message")
}
lw.ConsumerGone()
select {
case <-followLogsDone:
case <-time.After(20 * time.Second):
t.Fatal("timeout waiting for followLogs() to finish")
}
}
type dummyWrapper struct {
dummyDecoder
fn func() error
}
func (d *dummyWrapper) Decode() (*logger.Message, error) {
if err := d.fn(); err != nil {
return nil, err
}
return d.dummyDecoder.Decode()
}
func TestFollowLogsProducerGone(t *testing.T) {
lw := logger.NewLogWatcher()
defer lw.ConsumerGone()
f, err := os.CreateTemp("", t.Name())
assert.NilError(t, err)
defer os.Remove(f.Name())
var sent, received, closed int32
dec := &dummyWrapper{fn: func() error {
switch atomic.LoadInt32(&closed) {
case 0:
atomic.AddInt32(&sent, 1)
return nil
case 1:
atomic.AddInt32(&closed, 1)
t.Logf("logDecode() closed after sending %d messages\n", sent)
return io.EOF
default:
return io.EOF
}
}}
var since, until time.Time
followLogsDone := make(chan struct{})
producerGone := make(chan struct{})
go func() {
followLogs(f, lw, producerGone, make(chan interface{}), make(chan interface{}), dec, since, until)
close(followLogsDone)
}()
// read 1 message
select {
case <-lw.Msg:
received++
case err := <-lw.Err:
assert.NilError(t, err)
case <-followLogsDone:
t.Fatal("followLogs() finished unexpectedly")
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for log message")
}
// "stop" the "container"
atomic.StoreInt32(&closed, 1)
close(producerGone)
// should receive all the messages sent
readDone := make(chan struct{})
go func() {
defer close(readDone)
for {
select {
case <-lw.Msg:
received++
if received == atomic.LoadInt32(&sent) {
return
}
case err := <-lw.Err:
assert.NilError(t, err)
}
}
}()
select {
case <-readDone:
case <-time.After(30 * time.Second):
t.Fatalf("timeout waiting for log messages to be read (sent: %d, received: %d", sent, received)
}
t.Logf("messages sent: %d, received: %d", atomic.LoadInt32(&sent), received)
// followLogs() should be done by now
select {
case <-followLogsDone:
case <-time.After(30 * time.Second):
t.Fatal("timeout waiting for followLogs() to finish")
}
select {
case <-lw.WatchConsumerGone():
t.Fatal("consumer should not have exited")
default:
}
}
func TestCheckCapacityAndRotate(t *testing.T) {
dir := t.TempDir()