mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
7493342926
Marshalling log messages by json-file and local drivers involved serializing the message into a shared buffer. This caused a regression resulting in log corruption with recent changes where Log may be called from multiple goroutines at the same time. Solution is to use a sync.Pool to manage the buffers used for the serialization. Also removed the MarshalFunc, which the driver had to expose to the LogFile so that it can marshal the message. This is now moved entirely to the driver. Signed-off-by: Paweł Gronowski <pawel.gronowski@docker.com>
233 lines
6.2 KiB
Go
233 lines
6.2 KiB
Go
package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"testing"
|
|
"text/tabwriter"
|
|
"time"
|
|
|
|
"github.com/docker/docker/daemon/logger"
|
|
"github.com/docker/docker/pkg/tailfile"
|
|
"gotest.tools/v3/assert"
|
|
"gotest.tools/v3/poll"
|
|
)
|
|
|
|
// testDecoder is a line-oriented Decoder used by the tests: it scans its
// reader with bufio.Scanner and emits one logger.Message per line.
type testDecoder struct {
	rdr     io.Reader
	scanner *bufio.Scanner
	// resetCount records how many times Reset was called, so tests can
	// observe follow/rotation behavior.
	resetCount int
}
|
|
|
|
func (d *testDecoder) Decode() (*logger.Message, error) {
|
|
if d.scanner == nil {
|
|
d.scanner = bufio.NewScanner(d.rdr)
|
|
}
|
|
if !d.scanner.Scan() {
|
|
return nil, d.scanner.Err()
|
|
}
|
|
// some comment
|
|
return &logger.Message{Line: d.scanner.Bytes(), Timestamp: time.Now()}, nil
|
|
}
|
|
|
|
func (d *testDecoder) Reset(rdr io.Reader) {
|
|
d.rdr = rdr
|
|
d.scanner = bufio.NewScanner(rdr)
|
|
d.resetCount++
|
|
}
|
|
|
|
func (d *testDecoder) Close() {
|
|
d.rdr = nil
|
|
d.scanner = nil
|
|
}
|
|
|
|
func TestTailFiles(t *testing.T) {
|
|
s1 := strings.NewReader("Hello.\nMy name is Inigo Montoya.\n")
|
|
s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n")
|
|
s3 := strings.NewReader("Roads?\nWhere we're going we don't need roads.\n")
|
|
|
|
files := []SizeReaderAt{s1, s2, s3}
|
|
watcher := logger.NewLogWatcher()
|
|
defer watcher.ConsumerGone()
|
|
|
|
tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
|
|
return tailfile.NewTailReader(ctx, r, lines)
|
|
}
|
|
dec := &testDecoder{}
|
|
|
|
for desc, config := range map[string]logger.ReadConfig{} {
|
|
t.Run(desc, func(t *testing.T) {
|
|
started := make(chan struct{})
|
|
go func() {
|
|
close(started)
|
|
tailFiles(files, watcher, dec, tailReader, config)
|
|
}()
|
|
<-started
|
|
})
|
|
}
|
|
|
|
config := logger.ReadConfig{Tail: 2}
|
|
started := make(chan struct{})
|
|
go func() {
|
|
close(started)
|
|
tailFiles(files, watcher, dec, tailReader, config)
|
|
}()
|
|
<-started
|
|
|
|
select {
|
|
case <-time.After(60 * time.Second):
|
|
t.Fatal("timeout waiting for tail line")
|
|
case err := <-watcher.Err:
|
|
assert.NilError(t, err)
|
|
case msg := <-watcher.Msg:
|
|
assert.Assert(t, msg != nil)
|
|
assert.Assert(t, string(msg.Line) == "Roads?", string(msg.Line))
|
|
}
|
|
|
|
select {
|
|
case <-time.After(60 * time.Second):
|
|
t.Fatal("timeout waiting for tail line")
|
|
case err := <-watcher.Err:
|
|
assert.NilError(t, err)
|
|
case msg := <-watcher.Msg:
|
|
assert.Assert(t, msg != nil)
|
|
assert.Assert(t, string(msg.Line) == "Where we're going we don't need roads.", string(msg.Line))
|
|
}
|
|
}
|
|
|
|
// dummyDecoder is a stateless no-op Decoder stub for tests that only need
// the LogFile machinery, not real message parsing.
type dummyDecoder struct{}

// Decode always returns a fresh empty message and never an error; it does
// not signal end-of-stream.
func (dummyDecoder) Decode() (*logger.Message, error) {
	return &logger.Message{}, nil
}

// Close is a no-op; there is no state to release.
func (dummyDecoder) Close() {}

// Reset is a no-op; the stub ignores its input entirely.
func (dummyDecoder) Reset(io.Reader) {}
|
|
|
|
// TestCheckCapacityAndRotate exercises LogFile rotation and compression:
// with a 5-byte capacity every 12-byte write overflows, so each write after
// the first should trigger a rotation, keeping at most maxFiles (3) files —
// the live log plus compressed .1.gz/.2.gz rotations.
func TestCheckCapacityAndRotate(t *testing.T) {
	dir := t.TempDir()

	logPath := filepath.Join(dir, "log")
	getTailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
		return tailfile.NewTailReader(ctx, r, lines)
	}
	createDecoder := func(io.Reader) Decoder {
		return dummyDecoder{}
	}
	l, err := NewLogFile(
		logPath,
		5, // capacity
		3, // maxFiles
		true, // compress
		createDecoder,
		0600, // perms
		getTailReader,
	)
	assert.NilError(t, err)
	defer l.Close()

	// ls renders the directory listing lazily; passed to asserts so failures
	// show the on-disk state.
	ls := dirStringer{dir}

	timestamp := time.Time{}

	// First write fits before any rotation; no .1 file should exist yet.
	assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!")))
	_, err = os.Stat(logPath + ".1")
	assert.Assert(t, os.IsNotExist(err), ls)

	// Second write overflows capacity -> rotation + async compression.
	assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!")))
	poll.WaitOn(t, checkFileExists(logPath+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))

	// Third write rotates again; both compressed generations should appear.
	assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!")))
	poll.WaitOn(t, checkFileExists(logPath+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
	poll.WaitOn(t, checkFileExists(logPath+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))

	t.Run("closed log file", func(t *testing.T) {
		// Now let's simulate a failed rotation where the file was able to be closed but something else happened elsewhere
		// down the line.
		// We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation.
		l.f.Close()
		assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!")))
		// Remove .2.gz so the next subtest can poll for it reappearing.
		assert.NilError(t, os.Remove(logPath+".2.gz"))
	})

	t.Run("with log reader", func(t *testing.T) {
		// Make sure rotate works with an active reader
		lw := l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000})
		defer lw.ConsumerGone()

		assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 0!")), ls)
		// make sure the log reader is primed
		waitForMsg(t, lw, 30*time.Second)

		assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 1!")), ls)
		assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 2!")), ls)
		assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 3!")), ls)
		assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 4!")), ls)
		poll.WaitOn(t, checkFileExists(logPath+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
	})
}
|
|
|
|
func waitForMsg(t *testing.T, lw *logger.LogWatcher, timeout time.Duration) {
|
|
t.Helper()
|
|
|
|
timer := time.NewTimer(timeout)
|
|
defer timer.Stop()
|
|
|
|
select {
|
|
case _, ok := <-lw.Msg:
|
|
assert.Assert(t, ok, "log producer gone before log message arrived")
|
|
case err := <-lw.Err:
|
|
assert.NilError(t, err)
|
|
case <-timer.C:
|
|
t.Fatal("timeout waiting for log message")
|
|
}
|
|
}
|
|
|
|
type dirStringer struct {
|
|
d string
|
|
}
|
|
|
|
func (d dirStringer) String() string {
|
|
ls, err := os.ReadDir(d.d)
|
|
if err != nil {
|
|
return ""
|
|
}
|
|
buf := bytes.NewBuffer(nil)
|
|
tw := tabwriter.NewWriter(buf, 1, 8, 1, '\t', 0)
|
|
buf.WriteString("\n")
|
|
|
|
btw := bufio.NewWriter(tw)
|
|
|
|
for _, entry := range ls {
|
|
fi, err := entry.Info()
|
|
if err != nil {
|
|
return ""
|
|
}
|
|
|
|
btw.WriteString(fmt.Sprintf("%s\t%s\t%dB\t%s\n", fi.Name(), fi.Mode(), fi.Size(), fi.ModTime()))
|
|
}
|
|
btw.Flush()
|
|
tw.Flush()
|
|
return buf.String()
|
|
}
|
|
|
|
func checkFileExists(name string) poll.Check {
|
|
return func(t poll.LogT) poll.Result {
|
|
_, err := os.Stat(name)
|
|
switch {
|
|
case err == nil:
|
|
return poll.Success()
|
|
case os.IsNotExist(err):
|
|
return poll.Continue("waiting for %s to exist", name)
|
|
default:
|
|
t.Logf("waiting for %s: %v: %s", name, err, dirStringer{filepath.Dir(name)})
|
|
return poll.Error(err)
|
|
}
|
|
}
|
|
}
|