mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
d13e162a63
Loggers that implement BufSize() (e.g. awslogs) uses the method to
tell Copier about the maximum log line length. However loggerWithCache
and RingBuffer hide the method by wrapping loggers.
As a result, Copier uses its default 16KB limit which breaks log
lines > 16kB even the destinations can handle that.
This change implements BufSize() on loggerWithCache and RingBuffer to
make sure these logger wrappes don't hide the method on the underlying
loggers.
Fixes #41794.
Signed-off-by: Kazuyoshi Kato <katokazu@amazon.com>
(cherry picked from commit bb11365e96
)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
503 lines
12 KiB
Go
503 lines
12 KiB
Go
package logger // import "github.com/docker/docker/daemon/logger"
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"io"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
type TestLoggerJSON struct {
|
|
*json.Encoder
|
|
mu sync.Mutex
|
|
delay time.Duration
|
|
}
|
|
|
|
func (l *TestLoggerJSON) Log(m *Message) error {
|
|
if l.delay > 0 {
|
|
time.Sleep(l.delay)
|
|
}
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
return l.Encode(m)
|
|
}
|
|
|
|
func (l *TestLoggerJSON) Close() error { return nil }
|
|
|
|
func (l *TestLoggerJSON) Name() string { return "json" }
|
|
|
|
type TestSizedLoggerJSON struct {
|
|
*json.Encoder
|
|
mu sync.Mutex
|
|
}
|
|
|
|
func (l *TestSizedLoggerJSON) Log(m *Message) error {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
return l.Encode(m)
|
|
}
|
|
|
|
func (*TestSizedLoggerJSON) Close() error { return nil }
|
|
|
|
func (*TestSizedLoggerJSON) Name() string { return "sized-json" }
|
|
|
|
func (*TestSizedLoggerJSON) BufSize() int {
|
|
return 32 * 1024
|
|
}
|
|
|
|
func TestCopier(t *testing.T) {
|
|
stdoutLine := "Line that thinks that it is log line from docker stdout"
|
|
stderrLine := "Line that thinks that it is log line from docker stderr"
|
|
stdoutTrailingLine := "stdout trailing line"
|
|
stderrTrailingLine := "stderr trailing line"
|
|
|
|
var stdout bytes.Buffer
|
|
var stderr bytes.Buffer
|
|
for i := 0; i < 30; i++ {
|
|
if _, err := stdout.WriteString(stdoutLine + "\n"); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if _, err := stderr.WriteString(stderrLine + "\n"); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
// Test remaining lines without line-endings
|
|
if _, err := stdout.WriteString(stdoutTrailingLine); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if _, err := stderr.WriteString(stderrTrailingLine); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
var jsonBuf bytes.Buffer
|
|
|
|
jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}
|
|
|
|
c := NewCopier(
|
|
map[string]io.Reader{
|
|
"stdout": &stdout,
|
|
"stderr": &stderr,
|
|
},
|
|
jsonLog)
|
|
c.Run()
|
|
wait := make(chan struct{})
|
|
go func() {
|
|
c.Wait()
|
|
close(wait)
|
|
}()
|
|
select {
|
|
case <-time.After(1 * time.Second):
|
|
t.Fatal("Copier failed to do its work in 1 second")
|
|
case <-wait:
|
|
}
|
|
dec := json.NewDecoder(&jsonBuf)
|
|
for {
|
|
var msg Message
|
|
if err := dec.Decode(&msg); err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
t.Fatal(err)
|
|
}
|
|
if msg.Source != "stdout" && msg.Source != "stderr" {
|
|
t.Fatalf("Wrong Source: %q, should be %q or %q", msg.Source, "stdout", "stderr")
|
|
}
|
|
if msg.Source == "stdout" {
|
|
if string(msg.Line) != stdoutLine && string(msg.Line) != stdoutTrailingLine {
|
|
t.Fatalf("Wrong Line: %q, expected %q or %q", msg.Line, stdoutLine, stdoutTrailingLine)
|
|
}
|
|
}
|
|
if msg.Source == "stderr" {
|
|
if string(msg.Line) != stderrLine && string(msg.Line) != stderrTrailingLine {
|
|
t.Fatalf("Wrong Line: %q, expected %q or %q", msg.Line, stderrLine, stderrTrailingLine)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestCopierLongLines tests long lines without line breaks
|
|
func TestCopierLongLines(t *testing.T) {
|
|
// Long lines (should be split at "defaultBufSize")
|
|
stdoutLongLine := strings.Repeat("a", defaultBufSize)
|
|
stderrLongLine := strings.Repeat("b", defaultBufSize)
|
|
stdoutTrailingLine := "stdout trailing line"
|
|
stderrTrailingLine := "stderr trailing line"
|
|
|
|
var stdout bytes.Buffer
|
|
var stderr bytes.Buffer
|
|
|
|
for i := 0; i < 3; i++ {
|
|
if _, err := stdout.WriteString(stdoutLongLine); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if _, err := stderr.WriteString(stderrLongLine); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
if _, err := stdout.WriteString(stdoutTrailingLine); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if _, err := stderr.WriteString(stderrTrailingLine); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
var jsonBuf bytes.Buffer
|
|
|
|
jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}
|
|
|
|
c := NewCopier(
|
|
map[string]io.Reader{
|
|
"stdout": &stdout,
|
|
"stderr": &stderr,
|
|
},
|
|
jsonLog)
|
|
c.Run()
|
|
wait := make(chan struct{})
|
|
go func() {
|
|
c.Wait()
|
|
close(wait)
|
|
}()
|
|
select {
|
|
case <-time.After(1 * time.Second):
|
|
t.Fatal("Copier failed to do its work in 1 second")
|
|
case <-wait:
|
|
}
|
|
dec := json.NewDecoder(&jsonBuf)
|
|
for {
|
|
var msg Message
|
|
if err := dec.Decode(&msg); err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
t.Fatal(err)
|
|
}
|
|
if msg.Source != "stdout" && msg.Source != "stderr" {
|
|
t.Fatalf("Wrong Source: %q, should be %q or %q", msg.Source, "stdout", "stderr")
|
|
}
|
|
if msg.Source == "stdout" {
|
|
if string(msg.Line) != stdoutLongLine && string(msg.Line) != stdoutTrailingLine {
|
|
t.Fatalf("Wrong Line: %q, expected 'stdoutLongLine' or 'stdoutTrailingLine'", msg.Line)
|
|
}
|
|
}
|
|
if msg.Source == "stderr" {
|
|
if string(msg.Line) != stderrLongLine && string(msg.Line) != stderrTrailingLine {
|
|
t.Fatalf("Wrong Line: %q, expected 'stderrLongLine' or 'stderrTrailingLine'", msg.Line)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestCopierSlow(t *testing.T) {
|
|
stdoutLine := "Line that thinks that it is log line from docker stdout"
|
|
var stdout bytes.Buffer
|
|
for i := 0; i < 30; i++ {
|
|
if _, err := stdout.WriteString(stdoutLine + "\n"); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
var jsonBuf bytes.Buffer
|
|
// encoder := &encodeCloser{Encoder: json.NewEncoder(&jsonBuf)}
|
|
jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf), delay: 100 * time.Millisecond}
|
|
|
|
c := NewCopier(map[string]io.Reader{"stdout": &stdout}, jsonLog)
|
|
c.Run()
|
|
wait := make(chan struct{})
|
|
go func() {
|
|
c.Wait()
|
|
close(wait)
|
|
}()
|
|
<-time.After(150 * time.Millisecond)
|
|
c.Close()
|
|
select {
|
|
case <-time.After(200 * time.Millisecond):
|
|
t.Fatal("failed to exit in time after the copier is closed")
|
|
case <-wait:
|
|
}
|
|
}
|
|
|
|
func TestCopierWithSized(t *testing.T) {
|
|
t.Run("as is", func(t *testing.T) {
|
|
testCopierWithSized(t, func(l SizedLogger) SizedLogger {
|
|
return l
|
|
})
|
|
})
|
|
t.Run("With RingLogger", func(t *testing.T) {
|
|
testCopierWithSized(t, func(l SizedLogger) SizedLogger {
|
|
return newRingLogger(l, Info{}, defaultRingMaxSize)
|
|
})
|
|
})
|
|
}
|
|
|
|
func testCopierWithSized(t *testing.T, loggerFactory func(SizedLogger) SizedLogger) {
|
|
var jsonBuf bytes.Buffer
|
|
expectedMsgs := 2
|
|
sizedLogger := loggerFactory(&TestSizedLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)})
|
|
|
|
size := sizedLogger.BufSize()
|
|
if size < 0 {
|
|
size = 100
|
|
}
|
|
logbuf := bytes.NewBufferString(strings.Repeat(".", size*expectedMsgs))
|
|
c := NewCopier(map[string]io.Reader{"stdout": logbuf}, sizedLogger)
|
|
|
|
c.Run()
|
|
// Wait for Copier to finish writing to the buffered logger.
|
|
c.Wait()
|
|
c.Close()
|
|
|
|
sizedLogger.Close()
|
|
|
|
recvdMsgs := 0
|
|
dec := json.NewDecoder(&jsonBuf)
|
|
for {
|
|
var msg Message
|
|
if err := dec.Decode(&msg); err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
t.Fatal(err)
|
|
}
|
|
if msg.Source != "stdout" {
|
|
t.Fatalf("Wrong Source: %q, should be %q", msg.Source, "stdout")
|
|
}
|
|
if len(msg.Line) != sizedLogger.BufSize() {
|
|
t.Fatalf("Line was not of expected max length %d, was %d", sizedLogger.BufSize(), len(msg.Line))
|
|
}
|
|
recvdMsgs++
|
|
}
|
|
if recvdMsgs != expectedMsgs {
|
|
t.Fatalf("expected to receive %d messages, actually received %d %q", expectedMsgs, recvdMsgs, jsonBuf.String())
|
|
}
|
|
}
|
|
|
|
func checkIdentical(t *testing.T, msg Message, expectedID string, expectedTS time.Time) {
|
|
if msg.PLogMetaData.ID != expectedID {
|
|
t.Fatalf("IDs are not he same across partials. Expected: %s Received: %s",
|
|
expectedID, msg.PLogMetaData.ID)
|
|
}
|
|
if msg.Timestamp != expectedTS {
|
|
t.Fatalf("Timestamps are not the same across partials. Expected: %v Received: %v",
|
|
expectedTS.Format(time.UnixDate), msg.Timestamp.Format(time.UnixDate))
|
|
}
|
|
}
|
|
|
|
// Have long lines and make sure that it comes out with PartialMetaData
|
|
func TestCopierWithPartial(t *testing.T) {
|
|
stdoutLongLine := strings.Repeat("a", defaultBufSize)
|
|
stderrLongLine := strings.Repeat("b", defaultBufSize)
|
|
stdoutTrailingLine := "stdout trailing line"
|
|
stderrTrailingLine := "stderr trailing line"
|
|
normalStr := "This is an impartial message :)"
|
|
|
|
var stdout bytes.Buffer
|
|
var stderr bytes.Buffer
|
|
var normalMsg bytes.Buffer
|
|
|
|
for i := 0; i < 3; i++ {
|
|
if _, err := stdout.WriteString(stdoutLongLine); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if _, err := stderr.WriteString(stderrLongLine); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
if _, err := stdout.WriteString(stdoutTrailingLine + "\n"); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if _, err := stderr.WriteString(stderrTrailingLine + "\n"); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if _, err := normalMsg.WriteString(normalStr + "\n"); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
var jsonBuf bytes.Buffer
|
|
|
|
jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}
|
|
|
|
c := NewCopier(
|
|
map[string]io.Reader{
|
|
"stdout": &stdout,
|
|
"normal": &normalMsg,
|
|
"stderr": &stderr,
|
|
},
|
|
jsonLog)
|
|
c.Run()
|
|
wait := make(chan struct{})
|
|
go func() {
|
|
c.Wait()
|
|
close(wait)
|
|
}()
|
|
select {
|
|
case <-time.After(1 * time.Second):
|
|
t.Fatal("Copier failed to do its work in 1 second")
|
|
case <-wait:
|
|
}
|
|
|
|
dec := json.NewDecoder(&jsonBuf)
|
|
expectedMsgs := 9
|
|
recvMsgs := 0
|
|
var expectedPartID1, expectedPartID2 string
|
|
var expectedTS1, expectedTS2 time.Time
|
|
|
|
for {
|
|
var msg Message
|
|
|
|
if err := dec.Decode(&msg); err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
t.Fatal(err)
|
|
}
|
|
if msg.Source != "stdout" && msg.Source != "stderr" && msg.Source != "normal" {
|
|
t.Fatalf("Wrong Source: %q, should be %q or %q or %q", msg.Source, "stdout", "stderr", "normal")
|
|
}
|
|
|
|
if msg.Source == "stdout" {
|
|
if string(msg.Line) != stdoutLongLine && string(msg.Line) != stdoutTrailingLine {
|
|
t.Fatalf("Wrong Line: %q, expected 'stdoutLongLine' or 'stdoutTrailingLine'", msg.Line)
|
|
}
|
|
|
|
if msg.PLogMetaData.ID == "" {
|
|
t.Fatalf("Expected partial metadata. Got nothing")
|
|
}
|
|
|
|
if msg.PLogMetaData.Ordinal == 1 {
|
|
expectedPartID1 = msg.PLogMetaData.ID
|
|
expectedTS1 = msg.Timestamp
|
|
} else {
|
|
checkIdentical(t, msg, expectedPartID1, expectedTS1)
|
|
}
|
|
if msg.PLogMetaData.Ordinal == 4 && !msg.PLogMetaData.Last {
|
|
t.Fatalf("Last is not set for last chunk")
|
|
}
|
|
}
|
|
|
|
if msg.Source == "stderr" {
|
|
if string(msg.Line) != stderrLongLine && string(msg.Line) != stderrTrailingLine {
|
|
t.Fatalf("Wrong Line: %q, expected 'stderrLongLine' or 'stderrTrailingLine'", msg.Line)
|
|
}
|
|
|
|
if msg.PLogMetaData.ID == "" {
|
|
t.Fatalf("Expected partial metadata. Got nothing")
|
|
}
|
|
|
|
if msg.PLogMetaData.Ordinal == 1 {
|
|
expectedPartID2 = msg.PLogMetaData.ID
|
|
expectedTS2 = msg.Timestamp
|
|
} else {
|
|
checkIdentical(t, msg, expectedPartID2, expectedTS2)
|
|
}
|
|
if msg.PLogMetaData.Ordinal == 4 && !msg.PLogMetaData.Last {
|
|
t.Fatalf("Last is not set for last chunk")
|
|
}
|
|
}
|
|
|
|
if msg.Source == "normal" && msg.PLogMetaData != nil {
|
|
t.Fatalf("Normal messages should not have PartialLogMetaData")
|
|
}
|
|
recvMsgs++
|
|
}
|
|
|
|
if expectedMsgs != recvMsgs {
|
|
t.Fatalf("Expected msgs: %d Recv msgs: %d", expectedMsgs, recvMsgs)
|
|
}
|
|
}
|
|
|
|
type BenchmarkLoggerDummy struct {
|
|
}
|
|
|
|
func (l *BenchmarkLoggerDummy) Log(m *Message) error { PutMessage(m); return nil }
|
|
|
|
func (l *BenchmarkLoggerDummy) Close() error { return nil }
|
|
|
|
func (l *BenchmarkLoggerDummy) Name() string { return "dummy" }
|
|
|
|
func BenchmarkCopier64(b *testing.B) {
|
|
benchmarkCopier(b, 1<<6)
|
|
}
|
|
func BenchmarkCopier128(b *testing.B) {
|
|
benchmarkCopier(b, 1<<7)
|
|
}
|
|
func BenchmarkCopier256(b *testing.B) {
|
|
benchmarkCopier(b, 1<<8)
|
|
}
|
|
func BenchmarkCopier512(b *testing.B) {
|
|
benchmarkCopier(b, 1<<9)
|
|
}
|
|
func BenchmarkCopier1K(b *testing.B) {
|
|
benchmarkCopier(b, 1<<10)
|
|
}
|
|
func BenchmarkCopier2K(b *testing.B) {
|
|
benchmarkCopier(b, 1<<11)
|
|
}
|
|
func BenchmarkCopier4K(b *testing.B) {
|
|
benchmarkCopier(b, 1<<12)
|
|
}
|
|
func BenchmarkCopier8K(b *testing.B) {
|
|
benchmarkCopier(b, 1<<13)
|
|
}
|
|
func BenchmarkCopier16K(b *testing.B) {
|
|
benchmarkCopier(b, 1<<14)
|
|
}
|
|
func BenchmarkCopier32K(b *testing.B) {
|
|
benchmarkCopier(b, 1<<15)
|
|
}
|
|
func BenchmarkCopier64K(b *testing.B) {
|
|
benchmarkCopier(b, 1<<16)
|
|
}
|
|
func BenchmarkCopier128K(b *testing.B) {
|
|
benchmarkCopier(b, 1<<17)
|
|
}
|
|
func BenchmarkCopier256K(b *testing.B) {
|
|
benchmarkCopier(b, 1<<18)
|
|
}
|
|
|
|
func piped(b *testing.B, iterations int, delay time.Duration, buf []byte) io.Reader {
|
|
r, w, err := os.Pipe()
|
|
if err != nil {
|
|
b.Fatal(err)
|
|
return nil
|
|
}
|
|
go func() {
|
|
for i := 0; i < iterations; i++ {
|
|
time.Sleep(delay)
|
|
if n, err := w.Write(buf); err != nil || n != len(buf) {
|
|
if err != nil {
|
|
b.Error(err)
|
|
}
|
|
b.Error("short write")
|
|
}
|
|
}
|
|
w.Close()
|
|
}()
|
|
return r
|
|
}
|
|
|
|
func benchmarkCopier(b *testing.B, length int) {
|
|
b.StopTimer()
|
|
buf := []byte{'A'}
|
|
for len(buf) < length {
|
|
buf = append(buf, buf...)
|
|
}
|
|
buf = append(buf[:length-1], []byte{'\n'}...)
|
|
b.StartTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
c := NewCopier(
|
|
map[string]io.Reader{
|
|
"buffer": piped(b, 10, time.Nanosecond, buf),
|
|
},
|
|
&BenchmarkLoggerDummy{})
|
|
c.Run()
|
|
c.Wait()
|
|
c.Close()
|
|
}
|
|
}
|