moby--moby/daemon/logger/copier.go
Kazuyoshi Kato bb11365e96 Handle long log messages correctly on SizedLogger
Loggers that implement BufSize() (e.g. awslogs) use the method to
tell Copier about the maximum log line length. However, loggerWithCache
and RingBuffer hide the method by wrapping loggers.

As a result, Copier uses its default 16KB limit, which breaks log
lines > 16KB even when the destinations can handle them.

This change implements BufSize() on loggerWithCache and RingBuffer to
make sure these logger wrappers don't hide the method on the
underlying loggers.

Fixes #41794.

Signed-off-by: Kazuyoshi Kato <katokazu@amazon.com>
2021-01-20 16:44:06 -08:00
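
The fix described above boils down to forwarding BufSize() through the
wrapping loggers. A minimal sketch of the idea (wrappedLogger is a
hypothetical stand-in for wrappers like loggerWithCache, not the actual
moby code):

	// wrappedLogger embeds the destination Logger and forwards BufSize()
	// instead of hiding it.
	type wrappedLogger struct {
		Logger
	}

	// BufSize returns the wrapped logger's limit, or -1 when the wrapped
	// logger is not a SizedLogger. Copier treats any value <= 0 as "no
	// limit known" and falls back to defaultBufSize (see copySrc below).
	func (w *wrappedLogger) BufSize() int {
		if sl, ok := w.Logger.(SizedLogger); ok {
			return sl.BufSize()
		}
		return -1
	}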


package logger // import "github.com/docker/docker/daemon/logger"

import (
	"bytes"
	"io"
	"sync"
	"time"

	types "github.com/docker/docker/api/types/backend"
	"github.com/docker/docker/pkg/stringid"
	"github.com/sirupsen/logrus"
)

const (
	// readSize is the maximum bytes read during a single read
	// operation.
	readSize = 2 * 1024

	// defaultBufSize provides a reasonable default for loggers that do
	// not have an external limit to impose on log line size.
	defaultBufSize = 16 * 1024
)

// Copier can copy logs from specified sources to Logger and attach Timestamp.
// Writes are concurrent, so you need to implement some sync in your logger.
type Copier struct {
	// srcs is a map of name -> reader pairs, for example "stdout", "stderr"
	srcs      map[string]io.Reader
	dst       Logger
	copyJobs  sync.WaitGroup
	closeOnce sync.Once
	closed    chan struct{}
}

// NewCopier creates a new Copier
func NewCopier(srcs map[string]io.Reader, dst Logger) *Copier {
	return &Copier{
		srcs:   srcs,
		dst:    dst,
		closed: make(chan struct{}),
	}
}

// Run starts copying logs from the sources
func (c *Copier) Run() {
	for src, w := range c.srcs {
		c.copyJobs.Add(1)
		go c.copySrc(src, w)
	}
}

func (c *Copier) copySrc(name string, src io.Reader) {
	defer c.copyJobs.Done()

	bufSize := defaultBufSize
	if sizedLogger, ok := c.dst.(SizedLogger); ok {
		size := sizedLogger.BufSize()
		// Loggers that wrap another logger implement BufSize() but cannot
		// return the size when the wrapped logger doesn't have one, so only
		// accept a positive value.
		if size > 0 {
			bufSize = size
		}
	}
	buf := make([]byte, bufSize)

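	// Partial-line state: a line longer than buf is flushed in pieces that
	// share one ID and timestamp and carry an increasing ordinal; the piece
	// that finally ends in a newline is marked Last (see below).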
	n := 0
	eof := false
	var partialid string
	var partialTS time.Time
	var ordinal int
	firstPartial := true
	hasMorePartial := false

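	// Each pass through the loop reads up to readSize more bytes, flushes any
	// complete (or over-long) lines, and re-checks c.closed so Close can stop
	// the copy promptly.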
	for {
		select {
		case <-c.closed:
			return
		default:
			// Work out how much more data we are okay with reading this time.
			upto := n + readSize
			if upto > cap(buf) {
				upto = cap(buf)
			}
			// Try to read that data.
			if upto > n {
				read, err := src.Read(buf[n:upto])
				if err != nil {
					if err != io.EOF {
						logReadsFailedCount.Inc(1)
						logrus.Errorf("Error scanning log stream: %s", err)
						return
					}
					eof = true
				}
				n += read
			}
			// If we have no data to log, and there's no more coming, we're done.
			if n == 0 && eof {
				return
			}
			// Break up the data that we've buffered up into lines, and log each in turn.
			p := 0
			for q := bytes.IndexByte(buf[p:n], '\n'); q >= 0; q = bytes.IndexByte(buf[p:n], '\n') {
				select {
				case <-c.closed:
					return
				default:
					msg := NewMessage()
					msg.Source = name
					msg.Line = append(msg.Line, buf[p:p+q]...)

					if hasMorePartial {
						msg.PLogMetaData = &types.PartialLogMetaData{ID: partialid, Ordinal: ordinal, Last: true}

						// reset
						partialid = ""
						ordinal = 0
						firstPartial = true
						hasMorePartial = false
					}

					if msg.PLogMetaData == nil {
						msg.Timestamp = time.Now().UTC()
					} else {
						msg.Timestamp = partialTS
					}

					if logErr := c.dst.Log(msg); logErr != nil {
						logWritesFailedCount.Inc(1)
						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
					}
				}
				p += q + 1
			}
			// If there's no more coming, or the buffer is full but
			// has no newlines, log whatever we haven't logged yet,
			// noting that it's a partial log line.
			if eof || (p == 0 && n == len(buf)) {
				if p < n {
					msg := NewMessage()
					msg.Source = name
					msg.Line = append(msg.Line, buf[p:n]...)

					// Generate a unique partial ID and record a timestamp for the
					// first partial, then reuse both across the rest of the chain,
					// incrementing only the ordinal.
					if firstPartial {
						msg.Timestamp = time.Now().UTC()
						partialTS = msg.Timestamp
						partialid = stringid.GenerateRandomID()
						ordinal = 1
						firstPartial = false
						totalPartialLogs.Inc(1)
					} else {
						msg.Timestamp = partialTS
					}
					msg.PLogMetaData = &types.PartialLogMetaData{ID: partialid, Ordinal: ordinal, Last: false}
					ordinal++
					hasMorePartial = true

					if logErr := c.dst.Log(msg); logErr != nil {
						logWritesFailedCount.Inc(1)
						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
					}
					p = 0
					n = 0
				}
				if eof {
					return
				}
			}
			// Move any unlogged data to the front of the buffer in preparation for another read.
			if p > 0 {
				copy(buf[0:], buf[p:n])
				n -= p
			}
		}
	}
}

// Wait waits until all copying is done
func (c *Copier) Wait() {
	c.copyJobs.Wait()
}

// Close closes the copier
func (c *Copier) Close() {
	c.closeOnce.Do(func() {
		close(c.closed)
	})
}