mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
b102d4637c
This fixes the case where log rotation fails on Windows while there are clients reading container logs. Evicts readers if there is an error during rotation and try rotation again. This is needed for Windows with this scenario: 1. `docker logs -f` is called 2. Log rotation occurs (log.txt -> log.txt.1, truncate and re-open log.txt) 3. Log rotation occurs again (rm log.txt.1, log.txt -> log.txt.1) On step 3, before this change, the log rotation will fail with `Access is denied`. In this case, what we have is a reader holding a file handle to the primary log file. The log file is then rotated, but the reader still has a the handle open. `FILE_SHARE_DELETE` allows this to happen... but then we try to do it again for the next rotation and it blows up. So when it blows up we force all the readers to disconnect, close the log file, and try rotation again, which will succeed based on the added tests. Signed-off-by: Brian Goff <cpuguy83@gmail.com>
369 lines
9.4 KiB
Go
369 lines
9.4 KiB
Go
package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync/atomic"
|
|
"testing"
|
|
"text/tabwriter"
|
|
"time"
|
|
|
|
"github.com/docker/docker/daemon/logger"
|
|
"github.com/docker/docker/pkg/pubsub"
|
|
"github.com/docker/docker/pkg/tailfile"
|
|
"gotest.tools/v3/assert"
|
|
"gotest.tools/v3/poll"
|
|
)
|
|
|
|
type testDecoder struct {
|
|
rdr io.Reader
|
|
scanner *bufio.Scanner
|
|
}
|
|
|
|
func (d *testDecoder) Decode() (*logger.Message, error) {
|
|
if d.scanner == nil {
|
|
d.scanner = bufio.NewScanner(d.rdr)
|
|
}
|
|
if !d.scanner.Scan() {
|
|
return nil, d.scanner.Err()
|
|
}
|
|
// some comment
|
|
return &logger.Message{Line: d.scanner.Bytes(), Timestamp: time.Now()}, nil
|
|
}
|
|
|
|
func (d *testDecoder) Reset(rdr io.Reader) {
|
|
d.rdr = rdr
|
|
d.scanner = bufio.NewScanner(rdr)
|
|
}
|
|
|
|
func (d *testDecoder) Close() {
|
|
d.rdr = nil
|
|
d.scanner = nil
|
|
}
|
|
|
|
func TestTailFiles(t *testing.T) {
|
|
s1 := strings.NewReader("Hello.\nMy name is Inigo Montoya.\n")
|
|
s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n")
|
|
s3 := strings.NewReader("Roads?\nWhere we're going we don't need roads.\n")
|
|
|
|
files := []SizeReaderAt{s1, s2, s3}
|
|
watcher := logger.NewLogWatcher()
|
|
defer watcher.ConsumerGone()
|
|
|
|
tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
|
|
return tailfile.NewTailReader(ctx, r, lines)
|
|
}
|
|
dec := &testDecoder{}
|
|
|
|
for desc, config := range map[string]logger.ReadConfig{} {
|
|
t.Run(desc, func(t *testing.T) {
|
|
started := make(chan struct{})
|
|
go func() {
|
|
close(started)
|
|
tailFiles(files, watcher, dec, tailReader, config, make(chan interface{}))
|
|
}()
|
|
<-started
|
|
})
|
|
}
|
|
|
|
config := logger.ReadConfig{Tail: 2}
|
|
started := make(chan struct{})
|
|
go func() {
|
|
close(started)
|
|
tailFiles(files, watcher, dec, tailReader, config, make(chan interface{}))
|
|
}()
|
|
<-started
|
|
|
|
select {
|
|
case <-time.After(60 * time.Second):
|
|
t.Fatal("timeout waiting for tail line")
|
|
case err := <-watcher.Err:
|
|
assert.NilError(t, err)
|
|
case msg := <-watcher.Msg:
|
|
assert.Assert(t, msg != nil)
|
|
assert.Assert(t, string(msg.Line) == "Roads?", string(msg.Line))
|
|
}
|
|
|
|
select {
|
|
case <-time.After(60 * time.Second):
|
|
t.Fatal("timeout waiting for tail line")
|
|
case err := <-watcher.Err:
|
|
assert.NilError(t, err)
|
|
case msg := <-watcher.Msg:
|
|
assert.Assert(t, msg != nil)
|
|
assert.Assert(t, string(msg.Line) == "Where we're going we don't need roads.", string(msg.Line))
|
|
}
|
|
}
|
|
|
|
type dummyDecoder struct{}
|
|
|
|
func (dummyDecoder) Decode() (*logger.Message, error) {
|
|
return &logger.Message{}, nil
|
|
}
|
|
|
|
func (dummyDecoder) Close() {}
|
|
func (dummyDecoder) Reset(io.Reader) {}
|
|
|
|
func TestFollowLogsConsumerGone(t *testing.T) {
|
|
lw := logger.NewLogWatcher()
|
|
|
|
f, err := ioutil.TempFile("", t.Name())
|
|
assert.NilError(t, err)
|
|
defer func() {
|
|
f.Close()
|
|
os.Remove(f.Name())
|
|
}()
|
|
|
|
dec := dummyDecoder{}
|
|
|
|
followLogsDone := make(chan struct{})
|
|
var since, until time.Time
|
|
go func() {
|
|
followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until)
|
|
close(followLogsDone)
|
|
}()
|
|
|
|
select {
|
|
case <-lw.Msg:
|
|
case err := <-lw.Err:
|
|
assert.NilError(t, err)
|
|
case <-followLogsDone:
|
|
t.Fatal("follow logs finished unexpectedly")
|
|
case <-time.After(10 * time.Second):
|
|
t.Fatal("timeout waiting for log message")
|
|
}
|
|
|
|
lw.ConsumerGone()
|
|
select {
|
|
case <-followLogsDone:
|
|
case <-time.After(20 * time.Second):
|
|
t.Fatal("timeout waiting for followLogs() to finish")
|
|
}
|
|
}
|
|
|
|
type dummyWrapper struct {
|
|
dummyDecoder
|
|
fn func() error
|
|
}
|
|
|
|
func (d *dummyWrapper) Decode() (*logger.Message, error) {
|
|
if err := d.fn(); err != nil {
|
|
return nil, err
|
|
}
|
|
return d.dummyDecoder.Decode()
|
|
}
|
|
|
|
func TestFollowLogsProducerGone(t *testing.T) {
|
|
lw := logger.NewLogWatcher()
|
|
defer lw.ConsumerGone()
|
|
|
|
f, err := ioutil.TempFile("", t.Name())
|
|
assert.NilError(t, err)
|
|
defer os.Remove(f.Name())
|
|
|
|
var sent, received, closed int32
|
|
dec := &dummyWrapper{fn: func() error {
|
|
switch atomic.LoadInt32(&closed) {
|
|
case 0:
|
|
atomic.AddInt32(&sent, 1)
|
|
return nil
|
|
case 1:
|
|
atomic.AddInt32(&closed, 1)
|
|
t.Logf("logDecode() closed after sending %d messages\n", sent)
|
|
return io.EOF
|
|
default:
|
|
t.Fatal("logDecode() called after closing!")
|
|
return io.EOF
|
|
}
|
|
}}
|
|
var since, until time.Time
|
|
|
|
followLogsDone := make(chan struct{})
|
|
go func() {
|
|
followLogs(f, lw, make(chan interface{}), make(chan interface{}), dec, since, until)
|
|
close(followLogsDone)
|
|
}()
|
|
|
|
// read 1 message
|
|
select {
|
|
case <-lw.Msg:
|
|
received++
|
|
case err := <-lw.Err:
|
|
assert.NilError(t, err)
|
|
case <-followLogsDone:
|
|
t.Fatal("followLogs() finished unexpectedly")
|
|
case <-time.After(10 * time.Second):
|
|
t.Fatal("timeout waiting for log message")
|
|
}
|
|
|
|
// "stop" the "container"
|
|
atomic.StoreInt32(&closed, 1)
|
|
lw.ProducerGone()
|
|
|
|
// should receive all the messages sent
|
|
readDone := make(chan struct{})
|
|
go func() {
|
|
defer close(readDone)
|
|
for {
|
|
select {
|
|
case <-lw.Msg:
|
|
received++
|
|
if received == atomic.LoadInt32(&sent) {
|
|
return
|
|
}
|
|
case err := <-lw.Err:
|
|
assert.NilError(t, err)
|
|
}
|
|
}
|
|
}()
|
|
select {
|
|
case <-readDone:
|
|
case <-time.After(30 * time.Second):
|
|
t.Fatalf("timeout waiting for log messages to be read (sent: %d, received: %d", sent, received)
|
|
}
|
|
|
|
t.Logf("messages sent: %d, received: %d", atomic.LoadInt32(&sent), received)
|
|
|
|
// followLogs() should be done by now
|
|
select {
|
|
case <-followLogsDone:
|
|
case <-time.After(30 * time.Second):
|
|
t.Fatal("timeout waiting for followLogs() to finish")
|
|
}
|
|
|
|
select {
|
|
case <-lw.WatchConsumerGone():
|
|
t.Fatal("consumer should not have exited")
|
|
default:
|
|
}
|
|
}
|
|
|
|
func TestCheckCapacityAndRotate(t *testing.T) {
|
|
dir, err := ioutil.TempDir("", t.Name())
|
|
assert.NilError(t, err)
|
|
defer os.RemoveAll(dir)
|
|
|
|
f, err := ioutil.TempFile(dir, "log")
|
|
assert.NilError(t, err)
|
|
|
|
l := &LogFile{
|
|
f: f,
|
|
capacity: 5,
|
|
maxFiles: 3,
|
|
compress: true,
|
|
notifyReaders: pubsub.NewPublisher(0, 1),
|
|
perms: 0600,
|
|
filesRefCounter: refCounter{counter: make(map[string]int)},
|
|
getTailReader: func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
|
|
return tailfile.NewTailReader(ctx, r, lines)
|
|
},
|
|
createDecoder: func(io.Reader) Decoder {
|
|
return dummyDecoder{}
|
|
},
|
|
marshal: func(msg *logger.Message) ([]byte, error) {
|
|
return msg.Line, nil
|
|
},
|
|
}
|
|
defer l.Close()
|
|
|
|
ls := dirStringer{dir}
|
|
|
|
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
|
|
_, err = os.Stat(f.Name() + ".1")
|
|
assert.Assert(t, os.IsNotExist(err), ls)
|
|
|
|
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
|
|
poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
|
|
|
|
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
|
|
poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
|
|
poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
|
|
|
|
t.Run("closed log file", func(t *testing.T) {
|
|
// Now let's simulate a failed rotation where the file was able to be closed but something else happened elsewhere
|
|
// down the line.
|
|
// We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation.
|
|
l.f.Close()
|
|
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
|
|
assert.NilError(t, os.Remove(f.Name()+".2.gz"))
|
|
})
|
|
|
|
t.Run("with log reader", func(t *testing.T) {
|
|
// Make sure rotate works with an active reader
|
|
lw := logger.NewLogWatcher()
|
|
defer lw.ConsumerGone()
|
|
go l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000}, lw)
|
|
|
|
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 0!")}), ls)
|
|
// make sure the log reader is primed
|
|
waitForMsg(t, lw, 30*time.Second)
|
|
|
|
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 1!")}), ls)
|
|
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 2!")}), ls)
|
|
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 3!")}), ls)
|
|
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 4!")}), ls)
|
|
poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
|
|
})
|
|
}
|
|
|
|
func waitForMsg(t *testing.T, lw *logger.LogWatcher, timeout time.Duration) {
|
|
t.Helper()
|
|
|
|
timer := time.NewTimer(timeout)
|
|
defer timer.Stop()
|
|
|
|
select {
|
|
case <-lw.Msg:
|
|
case <-lw.WatchProducerGone():
|
|
t.Fatal("log producer gone before log message arrived")
|
|
case err := <-lw.Err:
|
|
assert.NilError(t, err)
|
|
case <-timer.C:
|
|
t.Fatal("timeout waiting for log message")
|
|
}
|
|
}
|
|
|
|
type dirStringer struct {
|
|
d string
|
|
}
|
|
|
|
func (d dirStringer) String() string {
|
|
ls, err := ioutil.ReadDir(d.d)
|
|
if err != nil {
|
|
return ""
|
|
}
|
|
buf := bytes.NewBuffer(nil)
|
|
tw := tabwriter.NewWriter(buf, 1, 8, 1, '\t', 0)
|
|
buf.WriteString("\n")
|
|
|
|
btw := bufio.NewWriter(tw)
|
|
|
|
for _, fi := range ls {
|
|
btw.WriteString(fmt.Sprintf("%s\t%s\t%dB\t%s\n", fi.Name(), fi.Mode(), fi.Size(), fi.ModTime()))
|
|
}
|
|
btw.Flush()
|
|
tw.Flush()
|
|
return buf.String()
|
|
}
|
|
|
|
func checkFileExists(name string) poll.Check {
|
|
return func(t poll.LogT) poll.Result {
|
|
_, err := os.Stat(name)
|
|
switch {
|
|
case err == nil:
|
|
return poll.Success()
|
|
case os.IsNotExist(err):
|
|
return poll.Continue("waiting for %s to exist", name)
|
|
default:
|
|
t.Logf("waiting for %s: %v: %s", name, err, dirStringer{filepath.Dir(name)})
|
|
return poll.Error(err)
|
|
}
|
|
}
|
|
}
|