diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index aafebe6274..c7642cece6 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -400,6 +400,7 @@ func (l *logStream) Name() string { return name } +// BufSize returns the maximum bytes CloudWatch can handle. func (l *logStream) BufSize() int { return maximumBytesPerEvent } diff --git a/daemon/logger/copier.go b/daemon/logger/copier.go index e24272fa6d..e2ee36c098 100644 --- a/daemon/logger/copier.go +++ b/daemon/logger/copier.go @@ -54,7 +54,12 @@ func (c *Copier) copySrc(name string, src io.Reader) { bufSize := defaultBufSize if sizedLogger, ok := c.dst.(SizedLogger); ok { - bufSize = sizedLogger.BufSize() + size := sizedLogger.BufSize() + // Loggers that wrap another loggers would have BufSize(), but cannot return the size + // when the wrapped loggers doesn't have BufSize(). + if size > 0 { + bufSize = size + } } buf := make([]byte, bufSize) diff --git a/daemon/logger/copier_test.go b/daemon/logger/copier_test.go index 94077af669..db674b32a3 100644 --- a/daemon/logger/copier_test.go +++ b/daemon/logger/copier_test.go @@ -223,10 +223,28 @@ func TestCopierSlow(t *testing.T) { } func TestCopierWithSized(t *testing.T) { + t.Run("as is", func(t *testing.T) { + testCopierWithSized(t, func(l SizedLogger) SizedLogger { + return l + }) + }) + t.Run("With RingLogger", func(t *testing.T) { + testCopierWithSized(t, func(l SizedLogger) SizedLogger { + return newRingLogger(l, Info{}, defaultRingMaxSize) + }) + }) +} + +func testCopierWithSized(t *testing.T, loggerFactory func(SizedLogger) SizedLogger) { var jsonBuf bytes.Buffer expectedMsgs := 2 - sizedLogger := &TestSizedLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)} - logbuf := bytes.NewBufferString(strings.Repeat(".", sizedLogger.BufSize()*expectedMsgs)) + sizedLogger := loggerFactory(&TestSizedLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}) + + size := sizedLogger.BufSize() + if size < 0 { + size = 100 + } + logbuf := bytes.NewBufferString(strings.Repeat(".", size*expectedMsgs)) c := NewCopier(map[string]io.Reader{"stdout": logbuf}, sizedLogger) c.Run() @@ -234,6 +252,8 @@ func TestCopierWithSized(t *testing.T) { c.Wait() c.Close() + sizedLogger.Close() + recvdMsgs := 0 dec := json.NewDecoder(&jsonBuf) for { @@ -253,7 +273,7 @@ func TestCopierWithSized(t *testing.T) { recvdMsgs++ } if recvdMsgs != expectedMsgs { - t.Fatalf("expected to receive %d messages, actually received %d", expectedMsgs, recvdMsgs) + t.Fatalf("expected to receive %d messages, actually received %d %q", expectedMsgs, recvdMsgs, jsonBuf.String()) } } diff --git a/daemon/logger/loggerutils/cache/local_cache.go b/daemon/logger/loggerutils/cache/local_cache.go index 750dc498ab..1ca19eb5db 100644 --- a/daemon/logger/loggerutils/cache/local_cache.go +++ b/daemon/logger/loggerutils/cache/local_cache.go @@ -58,6 +58,17 @@ type loggerWithCache struct { cache logger.Logger } +var _ logger.SizedLogger = &loggerWithCache{} + +// BufSize returns the buffer size of the underlying logger. +// Returns -1 if the logger doesn't match SizedLogger interface. +func (l *loggerWithCache) BufSize() int { + if sl, ok := l.l.(logger.SizedLogger); ok { + return sl.BufSize() + } + return -1 +} + func (l *loggerWithCache) Log(msg *logger.Message) error { // copy the message as the original will be reset once the call to `Log` is complete dup := logger.NewMessage() diff --git a/daemon/logger/ring.go b/daemon/logger/ring.go index c675c1e83c..b6432aed36 100644 --- a/daemon/logger/ring.go +++ b/daemon/logger/ring.go @@ -21,6 +21,8 @@ type RingLogger struct { closeFlag int32 } +var _ SizedLogger = &RingLogger{} + type ringWithReader struct { *RingLogger } @@ -57,6 +59,15 @@ func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger { return l } +// BufSize returns the buffer size of the underlying logger. +// Returns -1 if the logger doesn't match SizedLogger interface. +func (r *RingLogger) BufSize() int { + if sl, ok := r.l.(SizedLogger); ok { + return sl.BufSize() + } + return -1 +} + // Log queues messages into the ring buffer func (r *RingLogger) Log(msg *Message) error { if r.closed() {