From a351b38e7217af059eb2f8fc3dba14dc03836a45 Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Thu, 5 Apr 2018 12:41:35 -0400 Subject: [PATCH] Add new `local` log driver This driver uses protobuf to store log messages and has better defaults for log file handling (e.g. compression and file rotation enabled by default). Signed-off-by: Brian Goff --- container/container.go | 18 ++- daemon/logdrivers_linux.go | 1 + daemon/logger/local/config.go | 36 +++++ daemon/logger/local/doc.go | 9 ++ daemon/logger/local/local.go | 218 +++++++++++++++++++++++++++++ daemon/logger/local/local_test.go | 220 ++++++++++++++++++++++++++++++ daemon/logger/local/read.go | 174 +++++++++++++++++++++++ 7 files changed, 675 insertions(+), 1 deletion(-) create mode 100644 daemon/logger/local/config.go create mode 100644 daemon/logger/local/doc.go create mode 100644 daemon/logger/local/local.go create mode 100644 daemon/logger/local/local_test.go create mode 100644 daemon/logger/local/read.go diff --git a/container/container.go b/container/container.go index 5f31d8df12..2c9cc29fda 100644 --- a/container/container.go +++ b/container/container.go @@ -22,7 +22,9 @@ import ( "github.com/docker/docker/daemon/exec" "github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger/jsonfilelog" + "github.com/docker/docker/daemon/logger/local" "github.com/docker/docker/daemon/network" + "github.com/docker/docker/errdefs" "github.com/docker/docker/image" "github.com/docker/docker/layer" "github.com/docker/docker/pkg/containerfs" @@ -375,13 +377,27 @@ func (container *Container) StartLogger() (logger.Logger, error) { } // Set logging file for "json-logger" - if cfg.Type == jsonfilelog.Name { + // TODO(@cpuguy83): Setup here based on log driver is a little weird. + switch cfg.Type { + case jsonfilelog.Name: info.LogPath, err = container.GetRootResourcePath(fmt.Sprintf("%s-json.log", container.ID)) if err != nil { return nil, err } container.LogPath = info.LogPath + case local.Name: + // Do not set container.LogPath for the local driver + // This would expose the value to the API, which should not be done as it means + // that the log file implementation would become a stable API that cannot change. + logDir, err := container.GetRootResourcePath("local-logs") + if err != nil { + return nil, err + } + if err := os.MkdirAll(logDir, 0700); err != nil { + return nil, errdefs.System(errors.Wrap(err, "error creating local logs dir")) + } + info.LogPath = filepath.Join(logDir, "container.log") } l, err := initDriver(info) diff --git a/daemon/logdrivers_linux.go b/daemon/logdrivers_linux.go index 6ddcd2fc8d..67154a7a98 100644 --- a/daemon/logdrivers_linux.go +++ b/daemon/logdrivers_linux.go @@ -9,6 +9,7 @@ import ( _ "github.com/docker/docker/daemon/logger/gelf" _ "github.com/docker/docker/daemon/logger/journald" _ "github.com/docker/docker/daemon/logger/jsonfilelog" + _ "github.com/docker/docker/daemon/logger/local" _ "github.com/docker/docker/daemon/logger/logentries" _ "github.com/docker/docker/daemon/logger/splunk" _ "github.com/docker/docker/daemon/logger/syslog" diff --git a/daemon/logger/local/config.go b/daemon/logger/local/config.go new file mode 100644 index 0000000000..8de160aea2 --- /dev/null +++ b/daemon/logger/local/config.go @@ -0,0 +1,36 @@ +package local + +import ( + "github.com/pkg/errors" +) + +// CreateConfig is used to configure new instances of driver +type CreateConfig struct { + DisableCompression bool + MaxFileSize int64 + MaxFileCount int +} + +func newDefaultConfig() *CreateConfig { + return &CreateConfig{ + MaxFileSize: defaultMaxFileSize, + MaxFileCount: defaultMaxFileCount, + DisableCompression: !defaultCompressLogs, + } +} + +func validateConfig(cfg *CreateConfig) error { + if cfg.MaxFileSize < 0 { + return errors.New("max size should be a positive number") + } + if cfg.MaxFileCount < 0 { + return errors.New("max file count cannot be less than 0") + } + + if !cfg.DisableCompression { + if cfg.MaxFileCount <= 1 { + return errors.New("compression cannot be enabled when max file count is 1") + } + } + return nil +} diff --git a/daemon/logger/local/doc.go b/daemon/logger/local/doc.go new file mode 100644 index 0000000000..7b082a3d53 --- /dev/null +++ b/daemon/logger/local/doc.go @@ -0,0 +1,9 @@ +// Package local provides a logger implementation that stores logs on disk. +// +// Log messages are encoded as protobufs with a header and footer for each message. +// The header and footer are big-endian binary encoded uint32 values which indicate the size of the log message. +// The header and footer of each message allows you to efficiently read through a file either forwards or in +// backwards (such as is the case when tailing a file) +// +// Example log message format: [22][This is a log message.][22][28][This is another log message.][28] +package local // import "github.com/docker/docker/daemon/logger/local" diff --git a/daemon/logger/local/local.go b/daemon/logger/local/local.go new file mode 100644 index 0000000000..86c55784d4 --- /dev/null +++ b/daemon/logger/local/local.go @@ -0,0 +1,218 @@ +package local // import "github.com/docker/docker/daemon/logger/local" + +import ( + "encoding/binary" + "io" + "strconv" + "sync" + "time" + + "github.com/docker/docker/api/types/backend" + "github.com/docker/docker/api/types/plugins/logdriver" + "github.com/docker/docker/daemon/logger" + "github.com/docker/docker/daemon/logger/loggerutils" + "github.com/docker/docker/errdefs" + "github.com/docker/go-units" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +const ( + // Name is the name of the driver + Name = "local" + + encodeBinaryLen = 4 + initialBufSize = 2048 + maxDecodeRetry = 20000 + + defaultMaxFileSize int64 = 20 * 1024 * 1024 + defaultMaxFileCount = 5 + defaultCompressLogs = true +) + +// LogOptKeys are the keys names used for log opts passed in to initialize the driver. +var LogOptKeys = map[string]bool{ + "max-file": true, + "max-size": true, + "compress": true, +} + +// ValidateLogOpt looks for log driver specific options. +func ValidateLogOpt(cfg map[string]string) error { + for key := range cfg { + if !LogOptKeys[key] { + return errors.Errorf("unknown log opt '%s' for log driver %s", key, Name) + } + } + return nil +} + +func init() { + if err := logger.RegisterLogDriver(Name, New); err != nil { + logrus.Fatal(err) + } + if err := logger.RegisterLogOptValidator(Name, ValidateLogOpt); err != nil { + logrus.Fatal(err) + } +} + +type driver struct { + mu sync.Mutex + closed bool + logfile *loggerutils.LogFile + readers map[*logger.LogWatcher]struct{} // stores the active log followers +} + +// New creates a new local logger +// You must provide the `LogPath` in the passed in info argument, this is the file path that logs are written to. +func New(info logger.Info) (logger.Logger, error) { + if info.LogPath == "" { + return nil, errdefs.System(errors.New("log path is missing -- this is a bug and should not happen")) + } + + cfg := newDefaultConfig() + if capacity, ok := info.Config["max-size"]; ok { + var err error + cfg.MaxFileSize, err = units.FromHumanSize(capacity) + if err != nil { + return nil, errdefs.InvalidParameter(errors.Wrapf(err, "invalid value for max-size: %s", capacity)) + } + } + + if userMaxFileCount, ok := info.Config["max-file"]; ok { + var err error + cfg.MaxFileCount, err = strconv.Atoi(userMaxFileCount) + if err != nil { + return nil, errdefs.InvalidParameter(errors.Wrapf(err, "invalid value for max-file: %s", userMaxFileCount)) + } + } + + if userCompress, ok := info.Config["compress"]; ok { + compressLogs, err := strconv.ParseBool(userCompress) + if err != nil { + return nil, errdefs.InvalidParameter(errors.Wrap(err, "error reading compress log option")) + } + cfg.DisableCompression = !compressLogs + } + return newDriver(info.LogPath, cfg) +} + +func makeMarshaller() func(m *logger.Message) ([]byte, error) { + buf := make([]byte, initialBufSize) + + // allocate the partial log entry separately, which allows for easier re-use + proto := &logdriver.LogEntry{} + md := &logdriver.PartialLogEntryMetadata{} + + return func(m *logger.Message) ([]byte, error) { + resetProto(proto) + + messageToProto(m, proto, md) + protoSize := proto.Size() + writeLen := protoSize + (2 * encodeBinaryLen) //+ len(messageDelimiter) + + if writeLen > len(buf) { + buf = make([]byte, writeLen) + } else { + // shrink the buffer back down + if writeLen <= initialBufSize { + buf = buf[:initialBufSize] + } else { + buf = buf[:writeLen] + } + } + + binary.BigEndian.PutUint32(buf[:encodeBinaryLen], uint32(protoSize)) + n, err := proto.MarshalTo(buf[encodeBinaryLen:writeLen]) + if err != nil { + return nil, errors.Wrap(err, "error marshaling log entry") + } + if n+(encodeBinaryLen*2) != writeLen { + return nil, io.ErrShortWrite + } + binary.BigEndian.PutUint32(buf[writeLen-encodeBinaryLen:writeLen], uint32(protoSize)) + return buf[:writeLen], nil + } +} + +func newDriver(logPath string, cfg *CreateConfig) (logger.Logger, error) { + if err := validateConfig(cfg); err != nil { + return nil, errdefs.InvalidParameter(err) + } + + lf, err := loggerutils.NewLogFile(logPath, cfg.MaxFileSize, cfg.MaxFileCount, !cfg.DisableCompression, makeMarshaller(), decodeFunc, 0640, getTailReader) + if err != nil { + return nil, err + } + return &driver{ + logfile: lf, + readers: make(map[*logger.LogWatcher]struct{}), + }, nil +} + +func (d *driver) Name() string { + return Name +} + +func (d *driver) Log(msg *logger.Message) error { + d.mu.Lock() + err := d.logfile.WriteLogEntry(msg) + d.mu.Unlock() + return err +} + +func (d *driver) Close() error { + d.mu.Lock() + d.closed = true + err := d.logfile.Close() + for r := range d.readers { + r.Close() + delete(d.readers, r) + } + d.mu.Unlock() + return err +} + +func messageToProto(msg *logger.Message, proto *logdriver.LogEntry, partial *logdriver.PartialLogEntryMetadata) { + proto.Source = msg.Source + proto.TimeNano = msg.Timestamp.UnixNano() + proto.Line = append(proto.Line[:0], msg.Line...) + proto.Partial = msg.PLogMetaData != nil + if proto.Partial { + partial.Ordinal = int32(msg.PLogMetaData.Ordinal) + partial.Last = msg.PLogMetaData.Last + partial.Id = msg.PLogMetaData.ID + proto.PartialLogMetadata = partial + } else { + proto.PartialLogMetadata = nil + } +} + +func protoToMessage(proto *logdriver.LogEntry) *logger.Message { + msg := &logger.Message{ + Source: proto.Source, + Timestamp: time.Unix(0, proto.TimeNano), + } + if proto.Partial { + var md backend.PartialLogMetaData + md.Last = proto.GetPartialLogMetadata().GetLast() + md.ID = proto.GetPartialLogMetadata().GetId() + md.Ordinal = int(proto.GetPartialLogMetadata().GetOrdinal()) + msg.PLogMetaData = &md + } + msg.Line = append(msg.Line[:0], proto.Line...) + return msg +} + +func resetProto(proto *logdriver.LogEntry) { + proto.Source = "" + proto.Line = proto.Line[:0] + proto.TimeNano = 0 + proto.Partial = false + if proto.PartialLogMetadata != nil { + proto.PartialLogMetadata.Id = "" + proto.PartialLogMetadata.Last = false + proto.PartialLogMetadata.Ordinal = 0 + } + proto.PartialLogMetadata = nil +} diff --git a/daemon/logger/local/local_test.go b/daemon/logger/local/local_test.go new file mode 100644 index 0000000000..2da67bc6fb --- /dev/null +++ b/daemon/logger/local/local_test.go @@ -0,0 +1,220 @@ +package local + +import ( + "context" + "encoding/binary" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "bytes" + "fmt" + + "strings" + + "io" + + "github.com/docker/docker/api/types/backend" + "github.com/docker/docker/api/types/plugins/logdriver" + "github.com/docker/docker/daemon/logger" + protoio "github.com/gogo/protobuf/io" + "gotest.tools/assert" + is "gotest.tools/assert/cmp" +) + +func TestWriteLog(t *testing.T) { + t.Parallel() + + dir, err := ioutil.TempDir("", t.Name()) + assert.Assert(t, err) + defer os.RemoveAll(dir) + + logPath := filepath.Join(dir, "test.log") + + l, err := New(logger.Info{LogPath: logPath}) + assert.Assert(t, err) + defer l.Close() + + m1 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("message 1")} + m2 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 20 * time.Minute), Line: []byte("message 2"), PLogMetaData: &backend.PartialLogMetaData{Last: true, ID: "0001", Ordinal: 1}} + m3 := logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("message 3")} + + // copy the log message because the underying log writer resets the log message and returns it to a buffer pool + err = l.Log(copyLogMessage(&m1)) + assert.Assert(t, err) + err = l.Log(copyLogMessage(&m2)) + assert.Assert(t, err) + err = l.Log(copyLogMessage(&m3)) + assert.Assert(t, err) + + f, err := os.Open(logPath) + assert.Assert(t, err) + defer f.Close() + dec := protoio.NewUint32DelimitedReader(f, binary.BigEndian, 1e6) + + var ( + proto logdriver.LogEntry + testProto logdriver.LogEntry + partial logdriver.PartialLogEntryMetadata + ) + + lenBuf := make([]byte, encodeBinaryLen) + seekMsgLen := func() { + io.ReadFull(f, lenBuf) + } + + err = dec.ReadMsg(&proto) + assert.Assert(t, err) + messageToProto(&m1, &testProto, &partial) + assert.Check(t, is.DeepEqual(testProto, proto), "expected:\n%+v\ngot:\n%+v", testProto, proto) + seekMsgLen() + + err = dec.ReadMsg(&proto) + assert.Assert(t, err) + messageToProto(&m2, &testProto, &partial) + assert.Check(t, is.DeepEqual(testProto, proto)) + seekMsgLen() + + err = dec.ReadMsg(&proto) + assert.Assert(t, err) + messageToProto(&m3, &testProto, &partial) + assert.Check(t, is.DeepEqual(testProto, proto), "expected:\n%+v\ngot:\n%+v", testProto, proto) +} + +func TestReadLog(t *testing.T) { + t.Parallel() + + dir, err := ioutil.TempDir("", t.Name()) + assert.Assert(t, err) + defer os.RemoveAll(dir) + + logPath := filepath.Join(dir, "test.log") + l, err := New(logger.Info{LogPath: logPath}) + assert.Assert(t, err) + defer l.Close() + + m1 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("a message")} + m2 := logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 20 * time.Minute), Line: []byte("another message"), PLogMetaData: &backend.PartialLogMetaData{Ordinal: 1, Last: true}} + longMessage := []byte("a really long message " + strings.Repeat("a", initialBufSize*2)) + m3 := logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: longMessage} + m4 := logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 10 * time.Minute), Line: []byte("just one more message")} + + // copy the log message because the underlying log writer resets the log message and returns it to a buffer pool + err = l.Log(copyLogMessage(&m1)) + assert.Assert(t, err) + err = l.Log(copyLogMessage(&m2)) + assert.Assert(t, err) + err = l.Log(copyLogMessage(&m3)) + assert.Assert(t, err) + err = l.Log(copyLogMessage(&m4)) + assert.Assert(t, err) + + lr := l.(logger.LogReader) + + testMessage := func(t *testing.T, lw *logger.LogWatcher, m *logger.Message) { + t.Helper() + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + select { + case <-ctx.Done(): + assert.Assert(t, ctx.Err()) + case err := <-lw.Err: + assert.Assert(t, err) + case msg, open := <-lw.Msg: + if !open { + select { + case err := <-lw.Err: + assert.Assert(t, err) + default: + assert.Assert(t, m == nil) + return + } + } + assert.Assert(t, m != nil) + if m.PLogMetaData == nil { + // a `\n` is appended on read to make this work with the existing API's when the message is not a partial. + // make sure it's the last entry in the line, and then truncate it for the deep equal below. + assert.Check(t, msg.Line[len(msg.Line)-1] == '\n') + msg.Line = msg.Line[:len(msg.Line)-1] + } + assert.Check(t, is.DeepEqual(m, msg), fmt.Sprintf("\n%+v\n%+v", m, msg)) + } + } + + t.Run("tail exact", func(t *testing.T) { + lw := lr.ReadLogs(logger.ReadConfig{Tail: 4}) + + testMessage(t, lw, &m1) + testMessage(t, lw, &m2) + testMessage(t, lw, &m3) + testMessage(t, lw, &m4) + testMessage(t, lw, nil) // no more messages + }) + + t.Run("tail less than available", func(t *testing.T) { + lw := lr.ReadLogs(logger.ReadConfig{Tail: 2}) + + testMessage(t, lw, &m3) + testMessage(t, lw, &m4) + testMessage(t, lw, nil) // no more messages + }) + + t.Run("tail more than available", func(t *testing.T) { + lw := lr.ReadLogs(logger.ReadConfig{Tail: 100}) + + testMessage(t, lw, &m1) + testMessage(t, lw, &m2) + testMessage(t, lw, &m3) + testMessage(t, lw, &m4) + testMessage(t, lw, nil) // no more messages + }) +} + +func BenchmarkLogWrite(b *testing.B) { + f, err := ioutil.TempFile("", b.Name()) + assert.Assert(b, err) + defer os.Remove(f.Name()) + f.Close() + + local, err := New(logger.Info{LogPath: f.Name()}) + assert.Assert(b, err) + defer local.Close() + + t := time.Now().UTC() + for _, data := range [][]byte{ + []byte(""), + []byte("a short string"), + bytes.Repeat([]byte("a long string"), 100), + bytes.Repeat([]byte("a really long string"), 10000), + } { + b.Run(fmt.Sprintf("%d", len(data)), func(b *testing.B) { + entry := &logdriver.LogEntry{Line: data, Source: "stdout", TimeNano: t.UnixNano()} + b.SetBytes(int64(entry.Size() + encodeBinaryLen + encodeBinaryLen)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + msg := logger.NewMessage() + msg.Line = data + msg.Timestamp = t + msg.Source = "stdout" + if err := local.Log(msg); err != nil { + b.Fatal(err) + } + } + }) + } +} + +func copyLogMessage(src *logger.Message) *logger.Message { + dst := logger.NewMessage() + dst.Source = src.Source + dst.Timestamp = src.Timestamp + dst.Attrs = src.Attrs + dst.Err = src.Err + dst.Line = append(dst.Line, src.Line...) + if src.PLogMetaData != nil { + dst.PLogMetaData = &(*src.PLogMetaData) + } + return dst +} diff --git a/daemon/logger/local/read.go b/daemon/logger/local/read.go new file mode 100644 index 0000000000..a752de2a8d --- /dev/null +++ b/daemon/logger/local/read.go @@ -0,0 +1,174 @@ +package local + +import ( + "context" + "encoding/binary" + "io" + + "bytes" + + "github.com/docker/docker/api/types/plugins/logdriver" + "github.com/docker/docker/daemon/logger" + "github.com/docker/docker/daemon/logger/loggerutils" + "github.com/docker/docker/errdefs" + "github.com/pkg/errors" +) + +func (d *driver) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { + logWatcher := logger.NewLogWatcher() + + go d.readLogs(logWatcher, config) + return logWatcher +} + +func (d *driver) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) { + defer close(watcher.Msg) + + d.mu.Lock() + d.readers[watcher] = struct{}{} + d.mu.Unlock() + + d.logfile.ReadLogs(config, watcher) + + d.mu.Lock() + delete(d.readers, watcher) + d.mu.Unlock() +} + +func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) { + size := r.Size() + if req < 0 { + return nil, 0, errdefs.InvalidParameter(errors.Errorf("invalid number of lines to tail: %d", req)) + } + + if size < (encodeBinaryLen*2)+1 { + return bytes.NewReader(nil), 0, nil + } + + const encodeBinaryLen64 = int64(encodeBinaryLen) + var found int + + buf := make([]byte, encodeBinaryLen) + + offset := size + for { + select { + case <-ctx.Done(): + return nil, 0, ctx.Err() + default: + } + + n, err := r.ReadAt(buf, offset-encodeBinaryLen64) + if err != nil && err != io.EOF { + return nil, 0, errors.Wrap(err, "error reading log message footer") + } + + if n != encodeBinaryLen { + return nil, 0, errdefs.DataLoss(errors.New("unexpected number of bytes read from log message footer")) + } + + msgLen := binary.BigEndian.Uint32(buf) + + n, err = r.ReadAt(buf, offset-encodeBinaryLen64-encodeBinaryLen64-int64(msgLen)) + if err != nil && err != io.EOF { + return nil, 0, errors.Wrap(err, "error reading log message header") + } + + if n != encodeBinaryLen { + return nil, 0, errdefs.DataLoss(errors.New("unexpected number of bytes read from log message header")) + } + + if msgLen != binary.BigEndian.Uint32(buf) { + return nil, 0, errdefs.DataLoss(errors.Wrap(err, "log message header and footer indicate different message sizes")) + } + + found++ + offset -= int64(msgLen) + offset -= encodeBinaryLen64 * 2 + if found == req { + break + } + if offset <= 0 { + break + } + } + + return io.NewSectionReader(r, offset, size), found, nil +} + +func decodeFunc(rdr io.Reader) func() (*logger.Message, error) { + proto := &logdriver.LogEntry{} + buf := make([]byte, initialBufSize) + + return func() (*logger.Message, error) { + var ( + read int + err error + ) + + resetProto(proto) + + for i := 0; i < maxDecodeRetry; i++ { + var n int + n, err = io.ReadFull(rdr, buf[read:encodeBinaryLen]) + if err != nil { + if err != io.ErrUnexpectedEOF { + return nil, errors.Wrap(err, "error reading log message length") + } + read += n + continue + } + read += n + break + } + if err != nil { + return nil, errors.Wrapf(err, "could not read log message length: read: %d, expected: %d", read, encodeBinaryLen) + } + + msgLen := int(binary.BigEndian.Uint32(buf[:read])) + + if len(buf) < msgLen+encodeBinaryLen { + buf = make([]byte, msgLen+encodeBinaryLen) + } else { + if msgLen <= initialBufSize { + buf = buf[:initialBufSize] + } else { + buf = buf[:msgLen+encodeBinaryLen] + } + } + + return decodeLogEntry(rdr, proto, buf, msgLen) + } +} + +func decodeLogEntry(rdr io.Reader, proto *logdriver.LogEntry, buf []byte, msgLen int) (*logger.Message, error) { + var ( + read int + err error + ) + for i := 0; i < maxDecodeRetry; i++ { + var n int + n, err = io.ReadFull(rdr, buf[read:msgLen+encodeBinaryLen]) + if err != nil { + if err != io.ErrUnexpectedEOF { + return nil, errors.Wrap(err, "could not decode log entry") + } + read += n + continue + } + break + } + if err != nil { + return nil, errors.Wrapf(err, "could not decode entry: read %d, expected: %d", read, msgLen) + } + + if err := proto.Unmarshal(buf[:msgLen]); err != nil { + return nil, errors.Wrap(err, "error unmarshalling log entry") + } + + msg := protoToMessage(proto) + if msg.PLogMetaData == nil { + msg.Line = append(msg.Line, '\n') + } + return msg, nil +}