1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Merge pull request #41100 from cpuguy83/windows_fix_rename_open_file

Fix windows log file rotation with readers
This commit is contained in:
Tianon Gravi 2020-11-05 20:02:58 +00:00 committed by GitHub
commit 761066faf1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 248 additions and 102 deletions

View file

@ -7,3 +7,7 @@ import "os"
func openFile(name string, flag int, perm os.FileMode) (*os.File, error) {
return os.OpenFile(name, flag, perm)
}
func open(name string) (*os.File, error) {
return os.Open(name)
}

View file

@ -4,17 +4,19 @@ import (
"os"
"syscall"
"unsafe"
"github.com/pkg/errors"
)
func open(name string) (*os.File, error) {
return openFile(name, os.O_RDONLY, 0)
}
func openFile(name string, flag int, perm os.FileMode) (*os.File, error) {
if name == "" {
return nil, &os.PathError{Op: "open", Path: name, Err: syscall.ENOENT}
}
h, err := syscallOpen(fixLongPath(name), flag|syscall.O_CLOEXEC, syscallMode(perm))
if err != nil {
return nil, errors.Wrap(err, "error opening file")
return nil, &os.PathError{Op: "open", Path: name, Err: err}
}
return os.NewFile(uintptr(h), name), nil
}

View file

@ -68,7 +68,7 @@ func (rc *refCounter) Dereference(fileName string) error {
if rc.counter[fileName] <= 0 {
delete(rc.counter, fileName)
err := os.Remove(fileName)
if err != nil {
if err != nil && !os.IsNotExist(err) {
return err
}
}
@ -87,7 +87,7 @@ type LogFile struct {
compress bool // whether old versions of log files are compressed
lastTimestamp time.Time // timestamp of the last log
filesRefCounter refCounter // keep reference-counted of decompressed files
notifyRotate *pubsub.Publisher
notifyReaders *pubsub.Publisher
marshal logger.MarshalFunc
createDecoder MakeDecoderFn
getTailReader GetTailReaderFunc
@ -141,7 +141,7 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar
maxFiles: maxFiles,
compress: compress,
filesRefCounter: refCounter{counter: make(map[string]int)},
notifyRotate: pubsub.NewPublisher(0, 1),
notifyReaders: pubsub.NewPublisher(0, 1),
marshal: marshaller,
createDecoder: decodeFunc,
perms: perms,
@ -167,7 +167,7 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
if err := w.checkCapacityAndRotate(); err != nil {
w.mu.Unlock()
return err
return errors.Wrap(err, "error rotating log file")
}
n, err := w.f.Write(b)
@ -175,49 +175,77 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
w.currentSize += int64(n)
w.lastTimestamp = msg.Timestamp
}
w.mu.Unlock()
return err
return errors.Wrap(err, "error writing log entry")
}
func (w *LogFile) checkCapacityAndRotate() error {
func (w *LogFile) checkCapacityAndRotate() (retErr error) {
if w.capacity == -1 {
return nil
}
if w.currentSize >= w.capacity {
w.rotateMu.Lock()
fname := w.f.Name()
if err := w.f.Close(); err != nil {
// if there was an error during a prior rotate, the file could already be closed
if !errors.Is(err, os.ErrClosed) {
w.rotateMu.Unlock()
return errors.Wrap(err, "error closing file")
}
}
if err := rotate(fname, w.maxFiles, w.compress); err != nil {
w.rotateMu.Unlock()
return err
}
file, err := openFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
if err != nil {
w.rotateMu.Unlock()
return err
}
w.f = file
w.currentSize = 0
w.notifyRotate.Publish(struct{}{})
if w.maxFiles <= 1 || !w.compress {
w.rotateMu.Unlock()
return nil
}
go func() {
compressFile(fname+".1", w.lastTimestamp)
w.rotateMu.Unlock()
}()
if w.currentSize < w.capacity {
return nil
}
w.rotateMu.Lock()
noCompress := w.maxFiles <= 1 || !w.compress
defer func() {
// If we aren't going to run the goroutine to compress the log file, then we need to unlock in this function.
// Otherwise the lock will be released in the goroutine that handles compression.
if retErr != nil || noCompress {
w.rotateMu.Unlock()
}
}()
fname := w.f.Name()
if err := w.f.Close(); err != nil {
// if there was an error during a prior rotate, the file could already be closed
if !errors.Is(err, os.ErrClosed) {
return errors.Wrap(err, "error closing file")
}
}
if err := rotate(fname, w.maxFiles, w.compress); err != nil {
logrus.WithError(err).Warn("Error rotating log file, log data may have been lost")
} else {
var renameErr error
for i := 0; i < 10; i++ {
if renameErr = os.Rename(fname, fname+".1"); renameErr != nil && !os.IsNotExist(renameErr) {
logrus.WithError(renameErr).WithField("file", fname).Debug("Error rotating current container log file, evicting readers and retrying")
w.notifyReaders.Publish(renameErr)
time.Sleep(100 * time.Millisecond)
continue
}
break
}
if renameErr != nil {
logrus.WithError(renameErr).Error("Error renaming current log file")
}
}
file, err := openFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
if err != nil {
return err
}
w.f = file
w.currentSize = 0
w.notifyReaders.Publish(struct{}{})
if noCompress {
return nil
}
ts := w.lastTimestamp
go func() {
if err := compressFile(fname+".1", ts); err != nil {
logrus.WithError(err).Error("Error compressing log file after rotation")
}
w.rotateMu.Unlock()
}()
return nil
}
@ -240,41 +268,44 @@ func rotate(name string, maxFiles int, compress bool) error {
for i := maxFiles - 1; i > 1; i-- {
toPath := name + "." + strconv.Itoa(i) + extension
fromPath := name + "." + strconv.Itoa(i-1) + extension
logrus.WithField("source", fromPath).WithField("target", toPath).Trace("Rotating log file")
if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
return err
}
}
if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
return err
}
return nil
}
func compressFile(fileName string, lastTimestamp time.Time) {
file, err := os.Open(fileName)
func compressFile(fileName string, lastTimestamp time.Time) (retErr error) {
file, err := open(fileName)
if err != nil {
logrus.Errorf("Failed to open log file: %v", err)
return
if os.IsNotExist(err) {
logrus.WithField("file", fileName).WithError(err).Debug("Could not open log file to compress")
return nil
}
return errors.Wrap(err, "failed to open log file")
}
defer func() {
file.Close()
err := os.Remove(fileName)
if err != nil {
logrus.Errorf("Failed to remove source log file: %v", err)
if retErr == nil {
err := os.Remove(fileName)
if err != nil && !os.IsNotExist(err) {
retErr = errors.Wrap(err, "failed to remove source log file")
}
}
}()
outFile, err := openFile(fileName+".gz", os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0640)
if err != nil {
logrus.Errorf("Failed to open or create gzip log file: %v", err)
return
return errors.Wrap(err, "failed to open or create gzip log file")
}
defer func() {
outFile.Close()
if err != nil {
os.Remove(fileName + ".gz")
if retErr != nil {
if err := os.Remove(fileName + ".gz"); err != nil && !os.IsExist(err) {
logrus.WithError(err).Error("Error cleaning up after failed log compression")
}
}
}()
@ -292,9 +323,10 @@ func compressFile(fileName string, lastTimestamp time.Time) {
_, err = pools.Copy(compressWriter, file)
if err != nil {
logrus.WithError(err).WithField("module", "container.logs").WithField("file", fileName).Error("Error compressing log file")
return
return errors.Wrapf(err, "error compressing log file %s", fileName)
}
return nil
}
// MaxFiles return maximum number of files
@ -309,7 +341,7 @@ func (w *LogFile) Close() error {
if w.closed {
return nil
}
if err := w.f.Close(); err != nil {
if err := w.f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
return err
}
w.closed = true
@ -322,7 +354,7 @@ func (w *LogFile) Close() error {
// TODO: Consider a different implementation which can effectively follow logs under frequent rotations.
func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
w.mu.RLock()
currentFile, err := os.Open(w.f.Name())
currentFile, err := open(w.f.Name())
if err != nil {
w.mu.RUnlock()
watcher.Err <- err
@ -340,6 +372,12 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
return
}
notifyEvict := w.notifyReaders.SubscribeTopicWithBuffer(func(i interface{}) bool {
_, ok := i.(error)
return ok
}, 1)
defer w.notifyReaders.Evict(notifyEvict)
if config.Tail != 0 {
// TODO(@cpuguy83): Instead of opening every file, only get the files which
// are needed to tail.
@ -358,7 +396,7 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
if strings.HasSuffix(fileName, tmpLogfileSuffix) {
err := w.filesRefCounter.Dereference(fileName)
if err != nil {
logrus.Errorf("Failed to dereference the log file %q: %v", fileName, err)
logrus.WithError(err).WithField("file", fileName).Error("Failed to dereference the log file")
}
}
}
@ -378,9 +416,11 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
readers = append(readers, currentChunk)
}
tailFiles(readers, watcher, dec, w.getTailReader, config)
ok := tailFiles(readers, watcher, dec, w.getTailReader, config, notifyEvict)
closeFiles()
if !ok {
return
}
w.mu.RLock()
}
@ -390,9 +430,13 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
}
w.mu.RUnlock()
notifyRotate := w.notifyRotate.Subscribe()
defer w.notifyRotate.Evict(notifyRotate)
followLogs(currentFile, watcher, notifyRotate, dec, config.Since, config.Until)
notifyRotate := w.notifyReaders.SubscribeTopic(func(i interface{}) bool {
_, ok := i.(struct{})
return ok
})
defer w.notifyReaders.Evict(notifyRotate)
followLogs(currentFile, watcher, notifyRotate, notifyEvict, dec, config.Since, config.Until)
}
func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) {
@ -415,7 +459,7 @@ func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File,
}()
for i := w.maxFiles; i > 1; i-- {
f, err := os.Open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
f, err := open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
if err != nil {
if !os.IsNotExist(err) {
return nil, errors.Wrap(err, "error opening rotated log file")
@ -425,7 +469,7 @@ func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File,
decompressedFileName := fileName + tmpLogfileSuffix
tmpFile, err := w.filesRefCounter.GetReference(decompressedFileName, func(refFileName string, exists bool) (*os.File, error) {
if exists {
return os.Open(refFileName)
return open(refFileName)
}
return decompressfile(fileName, refFileName, config.Since)
})
@ -451,7 +495,7 @@ func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File,
}
func decompressfile(fileName, destFileName string, since time.Time) (*os.File, error) {
cf, err := os.Open(fileName)
cf, err := open(fileName)
if err != nil {
return nil, errors.Wrap(err, "error opening file for decompression")
}
@ -498,16 +542,25 @@ func newSectionReader(f *os.File) (*io.SectionReader, error) {
return io.NewSectionReader(f, 0, size), nil
}
func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig) {
func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig, notifyEvict <-chan interface{}) (cont bool) {
nLines := config.Tail
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cont = true
// TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here.
go func() {
select {
case err := <-notifyEvict:
if err != nil {
watcher.Err <- err.(error)
cont = false
cancel()
}
case <-ctx.Done():
case <-watcher.WatchConsumerGone():
cont = false
cancel()
}
}()
@ -555,7 +608,7 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge
}
}
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, dec Decoder, since, until time.Time) {
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate, notifyEvict chan interface{}, dec Decoder, since, until time.Time) {
dec.Reset(f)
name := f.Name()
@ -566,6 +619,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
}
defer func() {
f.Close()
dec.Close()
fileWatcher.Close()
}()
@ -576,7 +630,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
// retry when the file doesn't exist
for retries := 0; retries <= 5; retries++ {
f, err = os.Open(name)
f, err = open(name)
if err == nil || !os.IsNotExist(err) {
break
}
@ -593,8 +647,22 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
errRetry := errors.New("retry")
errDone := errors.New("done")
handleMustClose := func(evictErr error) {
f.Close()
dec.Close()
logWatcher.Err <- errors.Wrap(err, "log reader evicted due to errors")
logrus.WithField("file", f.Name()).Error("Log reader notified that it must re-open log file, some log data may not be streamed to the client.")
}
waitRead := func() error {
select {
case e := <-notifyEvict:
if e != nil {
err := e.(error)
handleMustClose(err)
}
return errDone
case e := <-fileWatcher.Events():
switch e.Op {
case fsnotify.Write:
@ -668,6 +736,14 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
// main loop
for {
select {
case err := <-notifyEvict:
if err != nil {
handleMustClose(err.(error))
}
return
default:
}
msg, err := dec.Decode()
if err != nil {
if err := handleDecodeErr(err); err != nil {
@ -691,6 +767,13 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
}
// send the message, unless the consumer is gone
select {
case e := <-notifyEvict:
if e != nil {
err := e.(error)
logrus.WithError(err).Debug("Reader evicted while sending log message")
logWatcher.Err <- err
}
return
case logWatcher.Msg <- msg:
case <-logWatcher.WatchConsumerGone():
return

View file

@ -1,14 +1,18 @@
package loggerutils
package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync/atomic"
"testing"
"text/tabwriter"
"time"
"github.com/docker/docker/daemon/logger"
@ -51,6 +55,7 @@ func TestTailFiles(t *testing.T) {
files := []SizeReaderAt{s1, s2, s3}
watcher := logger.NewLogWatcher()
defer watcher.ConsumerGone()
tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
return tailfile.NewTailReader(ctx, r, lines)
@ -62,7 +67,7 @@ func TestTailFiles(t *testing.T) {
started := make(chan struct{})
go func() {
close(started)
tailFiles(files, watcher, dec, tailReader, config)
tailFiles(files, watcher, dec, tailReader, config, make(chan interface{}))
}()
<-started
})
@ -72,7 +77,7 @@ func TestTailFiles(t *testing.T) {
started := make(chan struct{})
go func() {
close(started)
tailFiles(files, watcher, dec, tailReader, config)
tailFiles(files, watcher, dec, tailReader, config, make(chan interface{}))
}()
<-started
@ -121,7 +126,7 @@ func TestFollowLogsConsumerGone(t *testing.T) {
followLogsDone := make(chan struct{})
var since, until time.Time
go func() {
followLogs(f, lw, make(chan interface{}), dec, since, until)
followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until)
close(followLogsDone)
}()
@ -157,19 +162,20 @@ func (d *dummyWrapper) Decode() (*logger.Message, error) {
func TestFollowLogsProducerGone(t *testing.T) {
lw := logger.NewLogWatcher()
defer lw.ConsumerGone()
f, err := ioutil.TempFile("", t.Name())
assert.NilError(t, err)
defer os.Remove(f.Name())
var sent, received, closed int
var sent, received, closed int32
dec := &dummyWrapper{fn: func() error {
switch closed {
switch atomic.LoadInt32(&closed) {
case 0:
sent++
atomic.AddInt32(&sent, 1)
return nil
case 1:
closed++
atomic.AddInt32(&closed, 1)
t.Logf("logDecode() closed after sending %d messages\n", sent)
return io.EOF
default:
@ -181,7 +187,7 @@ func TestFollowLogsProducerGone(t *testing.T) {
followLogsDone := make(chan struct{})
go func() {
followLogs(f, lw, make(chan interface{}), dec, since, until)
followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until)
close(followLogsDone)
}()
@ -198,7 +204,7 @@ func TestFollowLogsProducerGone(t *testing.T) {
}
// "stop" the "container"
closed = 1
atomic.StoreInt32(&closed, 1)
lw.ProducerGone()
// should receive all the messages sent
@ -209,7 +215,7 @@ func TestFollowLogsProducerGone(t *testing.T) {
select {
case <-lw.Msg:
received++
if received == sent {
if received == atomic.LoadInt32(&sent) {
return
}
case err := <-lw.Err:
@ -223,7 +229,7 @@ func TestFollowLogsProducerGone(t *testing.T) {
t.Fatalf("timeout waiting for log messages to be read (sent: %d, received: %d", sent, received)
}
t.Logf("messages sent: %d, received: %d", sent, received)
t.Logf("messages sent: %d, received: %d", atomic.LoadInt32(&sent), received)
// followLogs() should be done by now
select {
@ -248,21 +254,30 @@ func TestCheckCapacityAndRotate(t *testing.T) {
assert.NilError(t, err)
l := &LogFile{
f: f,
capacity: 5,
maxFiles: 3,
compress: true,
notifyRotate: pubsub.NewPublisher(0, 1),
perms: 0600,
f: f,
capacity: 5,
maxFiles: 3,
compress: true,
notifyReaders: pubsub.NewPublisher(0, 1),
perms: 0600,
filesRefCounter: refCounter{counter: make(map[string]int)},
getTailReader: func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
return tailfile.NewTailReader(ctx, r, lines)
},
createDecoder: func(io.Reader) Decoder {
return dummyDecoder{}
},
marshal: func(msg *logger.Message) ([]byte, error) {
return msg.Line, nil
},
}
defer l.Close()
ls := dirStringer{dir}
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
_, err = os.Stat(f.Name() + ".1")
assert.Assert(t, os.IsNotExist(err), dirStringer{dir})
assert.Assert(t, os.IsNotExist(err), ls)
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
@ -271,11 +286,48 @@ func TestCheckCapacityAndRotate(t *testing.T) {
poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
// Now let's simulate a failed rotation where the file was able to be closed but something else happened elsewhere
// down the line.
// We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation.
l.f.Close()
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
t.Run("closed log file", func(t *testing.T) {
// Now let's simulate a failed rotation where the file was able to be closed but something else happened elsewhere
// down the line.
// We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation.
l.f.Close()
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
assert.NilError(t, os.Remove(f.Name()+".2.gz"))
})
t.Run("with log reader", func(t *testing.T) {
// Make sure rotate works with an active reader
lw := logger.NewLogWatcher()
defer lw.ConsumerGone()
go l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000}, lw)
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 0!")}), ls)
// make sure the log reader is primed
waitForMsg(t, lw, 30*time.Second)
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 1!")}), ls)
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 2!")}), ls)
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 3!")}), ls)
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 4!")}), ls)
poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
})
}
func waitForMsg(t *testing.T, lw *logger.LogWatcher, timeout time.Duration) {
t.Helper()
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case <-lw.Msg:
case <-lw.WatchProducerGone():
t.Fatal("log producer gone before log message arrived")
case err := <-lw.Err:
assert.NilError(t, err)
case <-timer.C:
t.Fatal("timeout waiting for log message")
}
}
type dirStringer struct {
@ -287,13 +339,18 @@ func (d dirStringer) String() string {
if err != nil {
return ""
}
var s strings.Builder
s.WriteString("\n")
buf := bytes.NewBuffer(nil)
tw := tabwriter.NewWriter(buf, 1, 8, 1, '\t', 0)
buf.WriteString("\n")
btw := bufio.NewWriter(tw)
for _, fi := range ls {
s.WriteString(fi.Name() + "\n")
btw.WriteString(fmt.Sprintf("%s\t%s\t%dB\t%s\n", fi.Name(), fi.Mode(), fi.Size(), fi.ModTime()))
}
return s.String()
btw.Flush()
tw.Flush()
return buf.String()
}
func checkFileExists(name string) poll.Check {
@ -305,7 +362,7 @@ func checkFileExists(name string) poll.Check {
case os.IsNotExist(err):
return poll.Continue("waiting for %s to exist", name)
default:
t.Logf("%s", dirStringer{filepath.Dir(name)})
t.Logf("waiting for %s: %v: %s", name, err, dirStringer{filepath.Dir(name)})
return poll.Error(err)
}
}