From bb11365e96a4f25fe20606b30cbfb79998fadff3 Mon Sep 17 00:00:00 2001 From: Kazuyoshi Kato Date: Wed, 20 Jan 2021 13:18:41 -0800 Subject: [PATCH] Handle long log messages correctly on SizedLogger Loggers that implement BufSize() (e.g. awslogs) uses the method to tell Copier about the maximum log line length. However loggerWithCache and RingBuffer hide the method by wrapping loggers. As a result, Copier uses its default 16KB limit which breaks log lines > 16kB even the destinations can handle that. This change implements BufSize() on loggerWithCache and RingBuffer to make sure these logger wrappes don't hide the method on the underlying loggers. Fixes #41794. Signed-off-by: Kazuyoshi Kato --- daemon/logger/awslogs/cloudwatchlogs.go | 1 + daemon/logger/copier.go | 7 ++++- daemon/logger/copier_test.go | 26 ++++++++++++++++--- .../logger/loggerutils/cache/local_cache.go | 11 ++++++++ daemon/logger/ring.go | 11 ++++++++ 5 files changed, 52 insertions(+), 4 deletions(-) diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index aafebe6274..c7642cece6 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -400,6 +400,7 @@ func (l *logStream) Name() string { return name } +// BufSize returns the maximum bytes CloudWatch can handle. func (l *logStream) BufSize() int { return maximumBytesPerEvent } diff --git a/daemon/logger/copier.go b/daemon/logger/copier.go index e24272fa6d..e2ee36c098 100644 --- a/daemon/logger/copier.go +++ b/daemon/logger/copier.go @@ -54,7 +54,12 @@ func (c *Copier) copySrc(name string, src io.Reader) { bufSize := defaultBufSize if sizedLogger, ok := c.dst.(SizedLogger); ok { - bufSize = sizedLogger.BufSize() + size := sizedLogger.BufSize() + // Loggers that wrap another loggers would have BufSize(), but cannot return the size + // when the wrapped loggers doesn't have BufSize(). + if size > 0 { + bufSize = size + } } buf := make([]byte, bufSize) diff --git a/daemon/logger/copier_test.go b/daemon/logger/copier_test.go index 94077af669..db674b32a3 100644 --- a/daemon/logger/copier_test.go +++ b/daemon/logger/copier_test.go @@ -223,10 +223,28 @@ func TestCopierSlow(t *testing.T) { } func TestCopierWithSized(t *testing.T) { + t.Run("as is", func(t *testing.T) { + testCopierWithSized(t, func(l SizedLogger) SizedLogger { + return l + }) + }) + t.Run("With RingLogger", func(t *testing.T) { + testCopierWithSized(t, func(l SizedLogger) SizedLogger { + return newRingLogger(l, Info{}, defaultRingMaxSize) + }) + }) +} + +func testCopierWithSized(t *testing.T, loggerFactory func(SizedLogger) SizedLogger) { var jsonBuf bytes.Buffer expectedMsgs := 2 - sizedLogger := &TestSizedLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)} - logbuf := bytes.NewBufferString(strings.Repeat(".", sizedLogger.BufSize()*expectedMsgs)) + sizedLogger := loggerFactory(&TestSizedLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}) + + size := sizedLogger.BufSize() + if size < 0 { + size = 100 + } + logbuf := bytes.NewBufferString(strings.Repeat(".", size*expectedMsgs)) c := NewCopier(map[string]io.Reader{"stdout": logbuf}, sizedLogger) c.Run() @@ -234,6 +252,8 @@ func TestCopierWithSized(t *testing.T) { c.Wait() c.Close() + sizedLogger.Close() + recvdMsgs := 0 dec := json.NewDecoder(&jsonBuf) for { @@ -253,7 +273,7 @@ func TestCopierWithSized(t *testing.T) { recvdMsgs++ } if recvdMsgs != expectedMsgs { - t.Fatalf("expected to receive %d messages, actually received %d", expectedMsgs, recvdMsgs) + t.Fatalf("expected to receive %d messages, actually received %d %q", expectedMsgs, recvdMsgs, jsonBuf.String()) } } diff --git a/daemon/logger/loggerutils/cache/local_cache.go b/daemon/logger/loggerutils/cache/local_cache.go index 750dc498ab..1ca19eb5db 100644 --- a/daemon/logger/loggerutils/cache/local_cache.go +++ b/daemon/logger/loggerutils/cache/local_cache.go @@ -58,6 +58,17 @@ type loggerWithCache struct { cache logger.Logger } +var _ logger.SizedLogger = &loggerWithCache{} + +// BufSize returns the buffer size of the underlying logger. +// Returns -1 if the logger doesn't match SizedLogger interface. +func (l *loggerWithCache) BufSize() int { + if sl, ok := l.l.(logger.SizedLogger); ok { + return sl.BufSize() + } + return -1 +} + func (l *loggerWithCache) Log(msg *logger.Message) error { // copy the message as the original will be reset once the call to `Log` is complete dup := logger.NewMessage() diff --git a/daemon/logger/ring.go b/daemon/logger/ring.go index c675c1e83c..b6432aed36 100644 --- a/daemon/logger/ring.go +++ b/daemon/logger/ring.go @@ -21,6 +21,8 @@ type RingLogger struct { closeFlag int32 } +var _ SizedLogger = &RingLogger{} + type ringWithReader struct { *RingLogger } @@ -57,6 +59,15 @@ func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger { return l } +// BufSize returns the buffer size of the underlying logger. +// Returns -1 if the logger doesn't match SizedLogger interface. +func (r *RingLogger) BufSize() int { + if sl, ok := r.l.(SizedLogger); ok { + return sl.BufSize() + } + return -1 +} + // Log queues messages into the ring buffer func (r *RingLogger) Log(msg *Message) error { if r.closed() {