1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

daemon/logger: Share buffers by sync.Pool

Marshalling log messages by json-file and local drivers involved
serializing the message into a shared buffer. This caused a regression
resulting in log corruption with recent changes where Log may be called
from multiple goroutines at the same time.

Solution is to use a sync.Pool to manage the buffers used for the
serialization. Also removed the MarshalFunc, which the driver had to
expose to the LogFile so that it can marshal the message. This is now
moved entirely to the driver.

Signed-off-by: Paweł Gronowski <pawel.gronowski@docker.com>
This commit is contained in:
Paweł Gronowski 2022-05-27 15:07:07 +02:00
parent 69adaa894d
commit 7493342926
6 changed files with 88 additions and 85 deletions

View file

@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"strconv"
"sync"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog"
@ -21,8 +22,10 @@ const Name = "json-file"
// JSONFileLogger is Logger implementation for default Docker logging.
type JSONFileLogger struct {
writer *loggerutils.LogFile
tag string // tag values requested by the user to log
writer *loggerutils.LogFile
tag string // tag values requested by the user to log
extra json.RawMessage
buffersPool sync.Pool
}
func init() {
@ -86,7 +89,7 @@ func New(info logger.Info) (logger.Logger, error) {
attrs["tag"] = tag
}
var extra []byte
var extra json.RawMessage
if len(attrs) > 0 {
var err error
extra, err = json.Marshal(attrs)
@ -95,30 +98,40 @@ func New(info logger.Info) (logger.Logger, error) {
}
}
buf := bytes.NewBuffer(nil)
marshalFunc := func(msg *logger.Message) ([]byte, error) {
if err := marshalMessage(msg, extra, buf); err != nil {
return nil, err
}
b := buf.Bytes()
buf.Reset()
return b, nil
}
writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, compress, marshalFunc, decodeFunc, 0640, getTailReader)
writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, compress, decodeFunc, 0640, getTailReader)
if err != nil {
return nil, err
}
return &JSONFileLogger{
writer: writer,
tag: tag,
writer: writer,
tag: tag,
extra: extra,
buffersPool: makePool(),
}, nil
}
func makePool() sync.Pool {
// Every buffer will have to store the same constant json structure and the message
// len(`{"log":"","stream:"stdout","time":"2000-01-01T00:00:00.000000000Z"}\n`) = 68
// So let's start with a buffer bigger than this
const initialBufSize = 128
return sync.Pool{New: func() interface{} { return bytes.NewBuffer(make([]byte, 0, initialBufSize)) }}
}
// Log converts logger.Message to jsonlog.JSONLog and serializes it to file.
func (l *JSONFileLogger) Log(msg *logger.Message) error {
return l.writer.WriteLogEntry(msg)
defer logger.PutMessage(msg)
buf := l.buffersPool.Get().(*bytes.Buffer)
buf.Reset()
defer l.buffersPool.Put(buf)
if err := marshalMessage(msg, l.extra, buf); err != nil {
return err
}
return l.writer.WriteLogEntry(msg.Timestamp, buf.Bytes())
}
func marshalMessage(msg *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {

View file

@ -4,6 +4,7 @@ import (
"encoding/binary"
"io"
"strconv"
"sync"
"time"
"github.com/docker/docker/api/types/backend"
@ -55,7 +56,8 @@ func init() {
}
type driver struct {
logfile *loggerutils.LogFile
logfile *loggerutils.LogFile
buffersPool sync.Pool
}
// New creates a new local logger
@ -92,42 +94,34 @@ func New(info logger.Info) (logger.Logger, error) {
return newDriver(info.LogPath, cfg)
}
func makeMarshaller() func(m *logger.Message) ([]byte, error) {
buf := make([]byte, initialBufSize)
func marshal(m *logger.Message, buffer *[]byte) error {
proto := logdriver.LogEntry{}
md := logdriver.PartialLogEntryMetadata{}
// allocate the partial log entry separately, which allows for easier re-use
proto := &logdriver.LogEntry{}
md := &logdriver.PartialLogEntryMetadata{}
resetProto(&proto)
return func(m *logger.Message) ([]byte, error) {
resetProto(proto)
messageToProto(m, &proto, &md)
protoSize := proto.Size()
writeLen := protoSize + (2 * encodeBinaryLen) // + len(messageDelimiter)
messageToProto(m, proto, md)
protoSize := proto.Size()
writeLen := protoSize + (2 * encodeBinaryLen) // + len(messageDelimiter)
if writeLen > len(buf) {
buf = make([]byte, writeLen)
} else {
// shrink the buffer back down
if writeLen <= initialBufSize {
buf = buf[:initialBufSize]
} else {
buf = buf[:writeLen]
}
}
binary.BigEndian.PutUint32(buf[:encodeBinaryLen], uint32(protoSize))
n, err := proto.MarshalTo(buf[encodeBinaryLen:writeLen])
if err != nil {
return nil, errors.Wrap(err, "error marshaling log entry")
}
if n+(encodeBinaryLen*2) != writeLen {
return nil, io.ErrShortWrite
}
binary.BigEndian.PutUint32(buf[writeLen-encodeBinaryLen:writeLen], uint32(protoSize))
return buf[:writeLen], nil
buf := *buffer
if writeLen > cap(buf) {
buf = make([]byte, writeLen)
} else {
buf = buf[:writeLen]
}
*buffer = buf
binary.BigEndian.PutUint32(buf[:encodeBinaryLen], uint32(protoSize))
n, err := proto.MarshalTo(buf[encodeBinaryLen:writeLen])
if err != nil {
return errors.Wrap(err, "error marshaling log entry")
}
if n+(encodeBinaryLen*2) != writeLen {
return io.ErrShortWrite
}
binary.BigEndian.PutUint32(buf[writeLen-encodeBinaryLen:writeLen], uint32(protoSize))
return nil
}
func newDriver(logPath string, cfg *CreateConfig) (logger.Logger, error) {
@ -135,12 +129,16 @@ func newDriver(logPath string, cfg *CreateConfig) (logger.Logger, error) {
return nil, errdefs.InvalidParameter(err)
}
lf, err := loggerutils.NewLogFile(logPath, cfg.MaxFileSize, cfg.MaxFileCount, !cfg.DisableCompression, makeMarshaller(), decodeFunc, 0640, getTailReader)
lf, err := loggerutils.NewLogFile(logPath, cfg.MaxFileSize, cfg.MaxFileCount, !cfg.DisableCompression, decodeFunc, 0640, getTailReader)
if err != nil {
return nil, err
}
return &driver{
logfile: lf,
buffersPool: sync.Pool{New: func() interface{} {
b := make([]byte, initialBufSize)
return &b
}},
}, nil
}
@ -149,7 +147,15 @@ func (d *driver) Name() string {
}
func (d *driver) Log(msg *logger.Message) error {
return d.logfile.WriteLogEntry(msg)
defer logger.PutMessage(msg)
buf := d.buffersPool.Get().(*[]byte)
defer d.buffersPool.Put(buf)
err := marshal(msg, buf)
if err != nil {
return errors.Wrap(err, "error marshalling logger.Message")
}
return d.logfile.WriteLogEntry(msg.Timestamp, *buf)
}
func (d *driver) Close() error {

View file

@ -11,9 +11,9 @@ import (
)
func TestDecode(t *testing.T) {
marshal := makeMarshaller()
buf := make([]byte, 0)
buf, err := marshal(&logger.Message{Line: []byte("hello")})
err := marshal(&logger.Message{Line: []byte("hello")}, &buf)
assert.NilError(t, err)
for i := 0; i < len(buf); i++ {

View file

@ -131,6 +131,3 @@ type Capability struct {
// Determines if a log driver can read back logs
ReadLogs bool
}
// MarshalFunc is a func that marshals a message into an arbitrary format
type MarshalFunc func(*Message) ([]byte, error)

View file

@ -44,7 +44,6 @@ type LogFile struct {
// Log file codec
marshal logger.MarshalFunc
createDecoder MakeDecoderFn
getTailReader GetTailReaderFunc
@ -120,7 +119,7 @@ type readAtCloser interface {
type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr io.Reader, nLines int, err error)
// NewLogFile creates new LogFile
func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc MakeDecoderFn, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) {
func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, decodeFunc MakeDecoderFn, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) {
log, err := openFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
if err != nil {
return nil, err
@ -149,7 +148,6 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar
maxFiles: maxFiles,
compress: compress,
decompress: newSharedTempFileConverter(decompress),
marshal: marshaller,
createDecoder: decodeFunc,
perms: perms,
getTailReader: getTailReader,
@ -158,16 +156,7 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar
// WriteLogEntry writes the provided log message to the current log file.
// This may trigger a rotation event if the max file/capacity limits are hit.
func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
b, err := w.marshal(msg)
if err != nil {
return errors.Wrap(err, "error marshalling log message")
}
ts := msg.Timestamp
logger.PutMessage(msg)
msg = nil // Turn use-after-put bugs into panics.
func (w *LogFile) WriteLogEntry(timestamp time.Time, marshalled []byte) error {
select {
case <-w.closed:
return errors.New("cannot write because the output file was closed")
@ -183,12 +172,12 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
}
}
n, err := w.f.Write(b)
n, err := w.f.Write(marshalled)
if err != nil {
return errors.Wrap(err, "error writing log entry")
}
w.pos.size += int64(n)
w.lastTimestamp = ts
w.lastTimestamp = timestamp
// Notify any waiting readers that there is a new log entry to read.
st := <-w.read

View file

@ -120,15 +120,11 @@ func TestCheckCapacityAndRotate(t *testing.T) {
createDecoder := func(io.Reader) Decoder {
return dummyDecoder{}
}
marshal := func(msg *logger.Message) ([]byte, error) {
return msg.Line, nil
}
l, err := NewLogFile(
logPath,
5, // capacity
3, // maxFiles
true, // compress
marshal,
createDecoder,
0600, // perms
getTailReader,
@ -138,14 +134,16 @@ func TestCheckCapacityAndRotate(t *testing.T) {
ls := dirStringer{dir}
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
timestamp := time.Time{}
assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!")))
_, err = os.Stat(logPath + ".1")
assert.Assert(t, os.IsNotExist(err), ls)
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!")))
poll.WaitOn(t, checkFileExists(logPath+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!")))
poll.WaitOn(t, checkFileExists(logPath+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
poll.WaitOn(t, checkFileExists(logPath+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
@ -154,7 +152,7 @@ func TestCheckCapacityAndRotate(t *testing.T) {
// down the line.
// We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation.
l.f.Close()
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world!")))
assert.NilError(t, os.Remove(logPath+".2.gz"))
})
@ -163,14 +161,14 @@ func TestCheckCapacityAndRotate(t *testing.T) {
lw := l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000})
defer lw.ConsumerGone()
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 0!")}), ls)
assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 0!")), ls)
// make sure the log reader is primed
waitForMsg(t, lw, 30*time.Second)
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 1!")}), ls)
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 2!")}), ls)
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 3!")}), ls)
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 4!")}), ls)
assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 1!")), ls)
assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 2!")), ls)
assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 3!")), ls)
assert.NilError(t, l.WriteLogEntry(timestamp, []byte("hello world 4!")), ls)
poll.WaitOn(t, checkFileExists(logPath+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
})
}