mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Fix windows log file rotation with readers
This fixes the case where log rotation fails on Windows while there are clients reading container logs. Evicts readers if there is an error during rotation and tries rotation again. This is needed for Windows with this scenario: 1. `docker logs -f` is called 2. Log rotation occurs (log.txt -> log.txt.1, truncate and re-open log.txt) 3. Log rotation occurs again (rm log.txt.1, log.txt -> log.txt.1) On step 3, before this change, the log rotation fails with `Access is denied`. In this case, what we have is a reader holding a file handle to the primary log file. The log file is then rotated, but the reader still has the handle open. `FILE_SHARE_DELETE` allows this to happen... but then we try to do it again for the next rotation and it blows up. So when it blows up we force all the readers to disconnect, close the log file, and try rotation again, which will succeed based on the added tests. Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is contained in:
parent
bcc993b494
commit
b102d4637c
3 changed files with 168 additions and 47 deletions
|
@ -4,8 +4,6 @@ import (
|
|||
"os"
|
||||
"syscall"
|
||||
"unsafe"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func open(name string) (*os.File, error) {
|
||||
|
@ -18,7 +16,7 @@ func openFile(name string, flag int, perm os.FileMode) (*os.File, error) {
|
|||
}
|
||||
h, err := syscallOpen(fixLongPath(name), flag|syscall.O_CLOEXEC, syscallMode(perm))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error opening file")
|
||||
return nil, &os.PathError{Op: "open", Path: name, Err: err}
|
||||
}
|
||||
return os.NewFile(uintptr(h), name), nil
|
||||
}
|
||||
|
|
|
@ -87,7 +87,7 @@ type LogFile struct {
|
|||
compress bool // whether old versions of log files are compressed
|
||||
lastTimestamp time.Time // timestamp of the last log
|
||||
filesRefCounter refCounter // keep reference-counted of decompressed files
|
||||
notifyRotate *pubsub.Publisher
|
||||
notifyReaders *pubsub.Publisher
|
||||
marshal logger.MarshalFunc
|
||||
createDecoder MakeDecoderFn
|
||||
getTailReader GetTailReaderFunc
|
||||
|
@ -141,7 +141,7 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar
|
|||
maxFiles: maxFiles,
|
||||
compress: compress,
|
||||
filesRefCounter: refCounter{counter: make(map[string]int)},
|
||||
notifyRotate: pubsub.NewPublisher(0, 1),
|
||||
notifyReaders: pubsub.NewPublisher(0, 1),
|
||||
marshal: marshaller,
|
||||
createDecoder: decodeFunc,
|
||||
perms: perms,
|
||||
|
@ -167,7 +167,7 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
|
|||
|
||||
if err := w.checkCapacityAndRotate(); err != nil {
|
||||
w.mu.Unlock()
|
||||
return err
|
||||
return errors.Wrap(err, "error rotating log file")
|
||||
}
|
||||
|
||||
n, err := w.f.Write(b)
|
||||
|
@ -175,22 +175,25 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
|
|||
w.currentSize += int64(n)
|
||||
w.lastTimestamp = msg.Timestamp
|
||||
}
|
||||
|
||||
w.mu.Unlock()
|
||||
return err
|
||||
return errors.Wrap(err, "error writing log entry")
|
||||
}
|
||||
|
||||
func (w *LogFile) checkCapacityAndRotate() (retErr error) {
|
||||
if w.capacity == -1 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if w.currentSize < w.capacity {
|
||||
return nil
|
||||
}
|
||||
|
||||
w.rotateMu.Lock()
|
||||
noCompress := w.maxFiles <= 1 || !w.compress
|
||||
defer func() {
|
||||
if retErr != nil || w.maxFiles <= 1 || !w.compress {
|
||||
// If we aren't going to run the goroutine to compress the log file, then we need to unlock in this function.
|
||||
// Otherwise the lock will be released in the goroutine that handles compression.
|
||||
if retErr != nil || noCompress {
|
||||
w.rotateMu.Unlock()
|
||||
}
|
||||
}()
|
||||
|
@ -204,17 +207,33 @@ func (w *LogFile) checkCapacityAndRotate() (retErr error) {
|
|||
}
|
||||
|
||||
if err := rotate(fname, w.maxFiles, w.compress); err != nil {
|
||||
return err
|
||||
logrus.WithError(err).Warn("Error rotating log file, log data may have been lost")
|
||||
} else {
|
||||
var renameErr error
|
||||
for i := 0; i < 10; i++ {
|
||||
if renameErr = os.Rename(fname, fname+".1"); renameErr != nil && !os.IsNotExist(renameErr) {
|
||||
logrus.WithError(renameErr).WithField("file", fname).Debug("Error rotating current container log file, evicting readers and retrying")
|
||||
w.notifyReaders.Publish(renameErr)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
if renameErr != nil {
|
||||
logrus.WithError(renameErr).Error("Error renaming current log file")
|
||||
}
|
||||
}
|
||||
|
||||
file, err := openFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w.f = file
|
||||
w.currentSize = 0
|
||||
w.notifyRotate.Publish(struct{}{})
|
||||
|
||||
if w.maxFiles <= 1 || !w.compress {
|
||||
w.notifyReaders.Publish(struct{}{})
|
||||
|
||||
if noCompress {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -249,15 +268,12 @@ func rotate(name string, maxFiles int, compress bool) error {
|
|||
for i := maxFiles - 1; i > 1; i-- {
|
||||
toPath := name + "." + strconv.Itoa(i) + extension
|
||||
fromPath := name + "." + strconv.Itoa(i-1) + extension
|
||||
logrus.WithField("source", fromPath).WithField("target", toPath).Trace("Rotating log file")
|
||||
if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -272,9 +288,11 @@ func compressFile(fileName string, lastTimestamp time.Time) (retErr error) {
|
|||
}
|
||||
defer func() {
|
||||
file.Close()
|
||||
err := os.Remove(fileName)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
retErr = errors.Wrap(err, "failed to remove source log file")
|
||||
if retErr == nil {
|
||||
err := os.Remove(fileName)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
retErr = errors.Wrap(err, "failed to remove source log file")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -354,6 +372,12 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
|
|||
return
|
||||
}
|
||||
|
||||
notifyEvict := w.notifyReaders.SubscribeTopicWithBuffer(func(i interface{}) bool {
|
||||
_, ok := i.(error)
|
||||
return ok
|
||||
}, 1)
|
||||
defer w.notifyReaders.Evict(notifyEvict)
|
||||
|
||||
if config.Tail != 0 {
|
||||
// TODO(@cpuguy83): Instead of opening every file, only get the files which
|
||||
// are needed to tail.
|
||||
|
@ -392,9 +416,11 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
|
|||
readers = append(readers, currentChunk)
|
||||
}
|
||||
|
||||
tailFiles(readers, watcher, dec, w.getTailReader, config)
|
||||
ok := tailFiles(readers, watcher, dec, w.getTailReader, config, notifyEvict)
|
||||
closeFiles()
|
||||
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
w.mu.RLock()
|
||||
}
|
||||
|
||||
|
@ -404,9 +430,13 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
|
|||
}
|
||||
w.mu.RUnlock()
|
||||
|
||||
notifyRotate := w.notifyRotate.Subscribe()
|
||||
defer w.notifyRotate.Evict(notifyRotate)
|
||||
followLogs(currentFile, watcher, notifyRotate, dec, config.Since, config.Until)
|
||||
notifyRotate := w.notifyReaders.SubscribeTopic(func(i interface{}) bool {
|
||||
_, ok := i.(struct{})
|
||||
return ok
|
||||
})
|
||||
defer w.notifyReaders.Evict(notifyRotate)
|
||||
|
||||
followLogs(currentFile, watcher, notifyRotate, notifyEvict, dec, config.Since, config.Until)
|
||||
}
|
||||
|
||||
func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) {
|
||||
|
@ -512,16 +542,25 @@ func newSectionReader(f *os.File) (*io.SectionReader, error) {
|
|||
return io.NewSectionReader(f, 0, size), nil
|
||||
}
|
||||
|
||||
func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig) {
|
||||
func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig, notifyEvict <-chan interface{}) (cont bool) {
|
||||
nLines := config.Tail
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
cont = true
|
||||
// TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here.
|
||||
go func() {
|
||||
select {
|
||||
case err := <-notifyEvict:
|
||||
if err != nil {
|
||||
watcher.Err <- err.(error)
|
||||
cont = false
|
||||
cancel()
|
||||
}
|
||||
case <-ctx.Done():
|
||||
case <-watcher.WatchConsumerGone():
|
||||
cont = false
|
||||
cancel()
|
||||
}
|
||||
}()
|
||||
|
@ -569,7 +608,7 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, ge
|
|||
}
|
||||
}
|
||||
|
||||
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, dec Decoder, since, until time.Time) {
|
||||
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate, notifyEvict chan interface{}, dec Decoder, since, until time.Time) {
|
||||
dec.Reset(f)
|
||||
|
||||
name := f.Name()
|
||||
|
@ -580,6 +619,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
|
|||
}
|
||||
defer func() {
|
||||
f.Close()
|
||||
dec.Close()
|
||||
fileWatcher.Close()
|
||||
}()
|
||||
|
||||
|
@ -607,8 +647,22 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
|
|||
|
||||
errRetry := errors.New("retry")
|
||||
errDone := errors.New("done")
|
||||
|
||||
handleMustClose := func(evictErr error) {
|
||||
f.Close()
|
||||
dec.Close()
|
||||
logWatcher.Err <- errors.Wrap(err, "log reader evicted due to errors")
|
||||
logrus.WithField("file", f.Name()).Error("Log reader notified that it must re-open log file, some log data may not be streamed to the client.")
|
||||
}
|
||||
|
||||
waitRead := func() error {
|
||||
select {
|
||||
case e := <-notifyEvict:
|
||||
if e != nil {
|
||||
err := e.(error)
|
||||
handleMustClose(err)
|
||||
}
|
||||
return errDone
|
||||
case e := <-fileWatcher.Events():
|
||||
switch e.Op {
|
||||
case fsnotify.Write:
|
||||
|
@ -682,6 +736,14 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
|
|||
|
||||
// main loop
|
||||
for {
|
||||
select {
|
||||
case err := <-notifyEvict:
|
||||
if err != nil {
|
||||
handleMustClose(err.(error))
|
||||
}
|
||||
return
|
||||
default:
|
||||
}
|
||||
msg, err := dec.Decode()
|
||||
if err != nil {
|
||||
if err := handleDecodeErr(err); err != nil {
|
||||
|
@ -705,6 +767,13 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
|
|||
}
|
||||
// send the message, unless the consumer is gone
|
||||
select {
|
||||
case e := <-notifyEvict:
|
||||
if e != nil {
|
||||
err := e.(error)
|
||||
logrus.WithError(err).Debug("Reader evicted while sending log message")
|
||||
logWatcher.Err <- err
|
||||
}
|
||||
return
|
||||
case logWatcher.Msg <- msg:
|
||||
case <-logWatcher.WatchConsumerGone():
|
||||
return
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
package loggerutils
|
||||
package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
@ -10,6 +12,7 @@ import (
|
|||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"text/tabwriter"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/daemon/logger"
|
||||
|
@ -64,7 +67,7 @@ func TestTailFiles(t *testing.T) {
|
|||
started := make(chan struct{})
|
||||
go func() {
|
||||
close(started)
|
||||
tailFiles(files, watcher, dec, tailReader, config)
|
||||
tailFiles(files, watcher, dec, tailReader, config, make(chan interface{}))
|
||||
}()
|
||||
<-started
|
||||
})
|
||||
|
@ -74,7 +77,7 @@ func TestTailFiles(t *testing.T) {
|
|||
started := make(chan struct{})
|
||||
go func() {
|
||||
close(started)
|
||||
tailFiles(files, watcher, dec, tailReader, config)
|
||||
tailFiles(files, watcher, dec, tailReader, config, make(chan interface{}))
|
||||
}()
|
||||
<-started
|
||||
|
||||
|
@ -123,7 +126,7 @@ func TestFollowLogsConsumerGone(t *testing.T) {
|
|||
followLogsDone := make(chan struct{})
|
||||
var since, until time.Time
|
||||
go func() {
|
||||
followLogs(f, lw, make(chan interface{}), dec, since, until)
|
||||
followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until)
|
||||
close(followLogsDone)
|
||||
}()
|
||||
|
||||
|
@ -184,7 +187,7 @@ func TestFollowLogsProducerGone(t *testing.T) {
|
|||
|
||||
followLogsDone := make(chan struct{})
|
||||
go func() {
|
||||
followLogs(f, lw, make(chan interface{}), dec, since, until)
|
||||
followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until)
|
||||
close(followLogsDone)
|
||||
}()
|
||||
|
||||
|
@ -251,21 +254,30 @@ func TestCheckCapacityAndRotate(t *testing.T) {
|
|||
assert.NilError(t, err)
|
||||
|
||||
l := &LogFile{
|
||||
f: f,
|
||||
capacity: 5,
|
||||
maxFiles: 3,
|
||||
compress: true,
|
||||
notifyRotate: pubsub.NewPublisher(0, 1),
|
||||
perms: 0600,
|
||||
f: f,
|
||||
capacity: 5,
|
||||
maxFiles: 3,
|
||||
compress: true,
|
||||
notifyReaders: pubsub.NewPublisher(0, 1),
|
||||
perms: 0600,
|
||||
filesRefCounter: refCounter{counter: make(map[string]int)},
|
||||
getTailReader: func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
|
||||
return tailfile.NewTailReader(ctx, r, lines)
|
||||
},
|
||||
createDecoder: func(io.Reader) Decoder {
|
||||
return dummyDecoder{}
|
||||
},
|
||||
marshal: func(msg *logger.Message) ([]byte, error) {
|
||||
return msg.Line, nil
|
||||
},
|
||||
}
|
||||
defer l.Close()
|
||||
|
||||
ls := dirStringer{dir}
|
||||
|
||||
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
|
||||
_, err = os.Stat(f.Name() + ".1")
|
||||
assert.Assert(t, os.IsNotExist(err), dirStringer{dir})
|
||||
assert.Assert(t, os.IsNotExist(err), ls)
|
||||
|
||||
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
|
||||
poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
|
||||
|
@ -274,11 +286,48 @@ func TestCheckCapacityAndRotate(t *testing.T) {
|
|||
poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
|
||||
poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
|
||||
|
||||
// Now let's simulate a failed rotation where the file was able to be closed but something else happened elsewhere
|
||||
// down the line.
|
||||
// We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation.
|
||||
l.f.Close()
|
||||
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
|
||||
t.Run("closed log file", func(t *testing.T) {
|
||||
// Now let's simulate a failed rotation where the file was able to be closed but something else happened elsewhere
|
||||
// down the line.
|
||||
// We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation.
|
||||
l.f.Close()
|
||||
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
|
||||
assert.NilError(t, os.Remove(f.Name()+".2.gz"))
|
||||
})
|
||||
|
||||
t.Run("with log reader", func(t *testing.T) {
|
||||
// Make sure rotate works with an active reader
|
||||
lw := logger.NewLogWatcher()
|
||||
defer lw.ConsumerGone()
|
||||
go l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000}, lw)
|
||||
|
||||
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 0!")}), ls)
|
||||
// make sure the log reader is primed
|
||||
waitForMsg(t, lw, 30*time.Second)
|
||||
|
||||
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 1!")}), ls)
|
||||
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 2!")}), ls)
|
||||
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 3!")}), ls)
|
||||
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 4!")}), ls)
|
||||
poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
|
||||
})
|
||||
}
|
||||
|
||||
func waitForMsg(t *testing.T, lw *logger.LogWatcher, timeout time.Duration) {
|
||||
t.Helper()
|
||||
|
||||
timer := time.NewTimer(timeout)
|
||||
defer timer.Stop()
|
||||
|
||||
select {
|
||||
case <-lw.Msg:
|
||||
case <-lw.WatchProducerGone():
|
||||
t.Fatal("log producer gone before log message arrived")
|
||||
case err := <-lw.Err:
|
||||
assert.NilError(t, err)
|
||||
case <-timer.C:
|
||||
t.Fatal("timeout waiting for log message")
|
||||
}
|
||||
}
|
||||
|
||||
type dirStringer struct {
|
||||
|
@ -290,13 +339,18 @@ func (d dirStringer) String() string {
|
|||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
var s strings.Builder
|
||||
s.WriteString("\n")
|
||||
buf := bytes.NewBuffer(nil)
|
||||
tw := tabwriter.NewWriter(buf, 1, 8, 1, '\t', 0)
|
||||
buf.WriteString("\n")
|
||||
|
||||
btw := bufio.NewWriter(tw)
|
||||
|
||||
for _, fi := range ls {
|
||||
s.WriteString(fi.Name() + "\n")
|
||||
btw.WriteString(fmt.Sprintf("%s\t%s\t%dB\t%s\n", fi.Name(), fi.Mode(), fi.Size(), fi.ModTime()))
|
||||
}
|
||||
return s.String()
|
||||
btw.Flush()
|
||||
tw.Flush()
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
func checkFileExists(name string) poll.Check {
|
||||
|
|
Loading…
Add table
Reference in a new issue