From 0b4b0a7b5d5de8cb575b666312fceaa2cd58e658 Mon Sep 17 00:00:00 2001 From: Anusha Ragunathan Date: Mon, 18 Dec 2017 18:26:55 -0800 Subject: [PATCH] Improve partial message support in logger Docker daemon has a 16K buffer for log messages. If a message length exceeds 16K, it should be split by the logger and merged at the endpoint. This change adds `PartialLogMetaData` struct for enhanced partial support - LastPartial (bool) : indicates if this is the last of all partials. - ID (string) : unique 32 bit ID. ID is same across all partials. - Ordinal (int starts at 1) : indicates the position of msg in the series of partials. Also, the timestamps across partials in the same. Signed-off-by: Anusha Ragunathan --- api/types/backend/backend.go | 20 +++- daemon/logger/adapter.go | 2 +- daemon/logger/copier.go | 42 ++++++- daemon/logger/copier_test.go | 135 +++++++++++++++++++++++ daemon/logger/journald/journald.go | 2 +- daemon/logger/jsonfilelog/jsonfilelog.go | 2 +- daemon/logger/logger.go | 2 +- daemon/logger/logger_test.go | 6 +- 8 files changed, 196 insertions(+), 15 deletions(-) diff --git a/api/types/backend/backend.go b/api/types/backend/backend.go index ffd556e944..ef1e669c39 100644 --- a/api/types/backend/backend.go +++ b/api/types/backend/backend.go @@ -25,17 +25,27 @@ type ContainerAttachConfig struct { MuxStreams bool } +// PartialLogMetaData provides meta data for a partial log message. Messages +// exceeding a predefined size are split into chunks with this metadata. The +// expectation is for the logger endpoints to assemble the chunks using this +// metadata. +type PartialLogMetaData struct { + Last bool //true if this message is last of a partial + ID string // identifies group of messages comprising a single record + Ordinal int // ordering of message in partial group +} + // LogMessage is datastructure that represents piece of output produced by some // container. The Line member is a slice of an array whose contents can be // changed after a log driver's Log() method returns. // changes to this struct need to be reflect in the reset method in // daemon/logger/logger.go type LogMessage struct { - Line []byte - Source string - Timestamp time.Time - Attrs []LogAttr - Partial bool + Line []byte + Source string + Timestamp time.Time + Attrs []LogAttr + PLogMetaData *PartialLogMetaData // Err is an error associated with a message. Completeness of a message // with Err is not expected, tho it may be partially complete (fields may diff --git a/daemon/logger/adapter.go b/daemon/logger/adapter.go index 71d033f0a9..5b9252d324 100644 --- a/daemon/logger/adapter.go +++ b/daemon/logger/adapter.go @@ -37,7 +37,7 @@ func (a *pluginAdapter) Log(msg *Message) error { a.buf.Line = msg.Line a.buf.TimeNano = msg.Timestamp.UnixNano() - a.buf.Partial = msg.Partial + a.buf.Partial = (msg.PLogMetaData != nil) a.buf.Source = msg.Source err := a.enc.Encode(&a.buf) diff --git a/daemon/logger/copier.go b/daemon/logger/copier.go index e38176dc74..ae86777f33 100644 --- a/daemon/logger/copier.go +++ b/daemon/logger/copier.go @@ -6,6 +6,8 @@ import ( "sync" "time" + types "github.com/docker/docker/api/types/backend" + "github.com/docker/docker/pkg/stringid" "github.com/sirupsen/logrus" ) @@ -58,6 +60,11 @@ func (c *Copier) copySrc(name string, src io.Reader) { n := 0 eof := false + var partialid string + var partialTS time.Time + var ordinal int + firstPartial := true + hasMorePartial := false for { select { @@ -87,6 +94,7 @@ func (c *Copier) copySrc(name string, src io.Reader) { } // Break up the data that we've buffered up into lines, and log each in turn. p := 0 + for q := bytes.IndexByte(buf[p:n], '\n'); q >= 0; q = bytes.IndexByte(buf[p:n], '\n') { select { case <-c.closed: @@ -94,9 +102,23 @@ func (c *Copier) copySrc(name string, src io.Reader) { default: msg := NewMessage() msg.Source = name - msg.Timestamp = time.Now().UTC() msg.Line = append(msg.Line, buf[p:p+q]...) + if hasMorePartial { + msg.PLogMetaData = &types.PartialLogMetaData{ID: partialid, Ordinal: ordinal, Last: true} + + // reset + partialid = "" + ordinal = 0 + firstPartial = true + hasMorePartial = false + } + if msg.PLogMetaData == nil { + msg.Timestamp = time.Now().UTC() + } else { + msg.Timestamp = partialTS + } + if logErr := c.dst.Log(msg); logErr != nil { logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr) } @@ -110,9 +132,23 @@ func (c *Copier) copySrc(name string, src io.Reader) { if p < n { msg := NewMessage() msg.Source = name - msg.Timestamp = time.Now().UTC() msg.Line = append(msg.Line, buf[p:n]...) - msg.Partial = true + + // Generate unique partialID for first partial. Use it across partials. + // Record timestamp for first partial. Use it across partials. + // Initialize Ordinal for first partial. Increment it across partials. + if firstPartial { + msg.Timestamp = time.Now().UTC() + partialTS = msg.Timestamp + partialid = stringid.GenerateRandomID() + ordinal = 1 + firstPartial = false + } else { + msg.Timestamp = partialTS + } + msg.PLogMetaData = &types.PartialLogMetaData{ID: partialid, Ordinal: ordinal, Last: false} + ordinal++ + hasMorePartial = true if logErr := c.dst.Log(msg); logErr != nil { logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr) diff --git a/daemon/logger/copier_test.go b/daemon/logger/copier_test.go index 62e1a56ecc..d09450bd19 100644 --- a/daemon/logger/copier_test.go +++ b/daemon/logger/copier_test.go @@ -258,6 +258,141 @@ func TestCopierWithSized(t *testing.T) { } } +func checkIdentical(t *testing.T, msg Message, expectedID string, expectedTS time.Time) { + if msg.PLogMetaData.ID != expectedID { + t.Fatalf("IDs are not he same across partials. Expected: %s Received: %s", + expectedID, msg.PLogMetaData.ID) + } + if msg.Timestamp != expectedTS { + t.Fatalf("Timestamps are not the same across partials. Expected: %v Received: %v", + expectedTS.Format(time.UnixDate), msg.Timestamp.Format(time.UnixDate)) + } +} + +// Have long lines and make sure that it comes out with PartialMetaData +func TestCopierWithPartial(t *testing.T) { + stdoutLongLine := strings.Repeat("a", defaultBufSize) + stderrLongLine := strings.Repeat("b", defaultBufSize) + stdoutTrailingLine := "stdout trailing line" + stderrTrailingLine := "stderr trailing line" + normalStr := "This is an impartial message :)" + + var stdout bytes.Buffer + var stderr bytes.Buffer + var normalMsg bytes.Buffer + + for i := 0; i < 3; i++ { + if _, err := stdout.WriteString(stdoutLongLine); err != nil { + t.Fatal(err) + } + if _, err := stderr.WriteString(stderrLongLine); err != nil { + t.Fatal(err) + } + } + + if _, err := stdout.WriteString(stdoutTrailingLine + "\n"); err != nil { + t.Fatal(err) + } + if _, err := stderr.WriteString(stderrTrailingLine + "\n"); err != nil { + t.Fatal(err) + } + if _, err := normalMsg.WriteString(normalStr + "\n"); err != nil { + t.Fatal(err) + } + + var jsonBuf bytes.Buffer + + jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)} + + c := NewCopier( + map[string]io.Reader{ + "stdout": &stdout, + "normal": &normalMsg, + "stderr": &stderr, + }, + jsonLog) + c.Run() + wait := make(chan struct{}) + go func() { + c.Wait() + close(wait) + }() + select { + case <-time.After(1 * time.Second): + t.Fatal("Copier failed to do its work in 1 second") + case <-wait: + } + + dec := json.NewDecoder(&jsonBuf) + expectedMsgs := 9 + recvMsgs := 0 + var expectedPartID1, expectedPartID2 string + var expectedTS1, expectedTS2 time.Time + + for { + var msg Message + + if err := dec.Decode(&msg); err != nil { + if err == io.EOF { + break + } + t.Fatal(err) + } + if msg.Source != "stdout" && msg.Source != "stderr" && msg.Source != "normal" { + t.Fatalf("Wrong Source: %q, should be %q or %q or %q", msg.Source, "stdout", "stderr", "normal") + } + + if msg.Source == "stdout" { + if string(msg.Line) != stdoutLongLine && string(msg.Line) != stdoutTrailingLine { + t.Fatalf("Wrong Line: %q, expected 'stdoutLongLine' or 'stdoutTrailingLine'", msg.Line) + } + + if msg.PLogMetaData.ID == "" { + t.Fatalf("Expected partial metadata. Got nothing") + } + + if msg.PLogMetaData.Ordinal == 1 { + expectedPartID1 = msg.PLogMetaData.ID + expectedTS1 = msg.Timestamp + } else { + checkIdentical(t, msg, expectedPartID1, expectedTS1) + } + if msg.PLogMetaData.Ordinal == 4 && !msg.PLogMetaData.Last { + t.Fatalf("Last is not set for last chunk") + } + } + + if msg.Source == "stderr" { + if string(msg.Line) != stderrLongLine && string(msg.Line) != stderrTrailingLine { + t.Fatalf("Wrong Line: %q, expected 'stderrLongLine' or 'stderrTrailingLine'", msg.Line) + } + + if msg.PLogMetaData.ID == "" { + t.Fatalf("Expected partial metadata. Got nothing") + } + + if msg.PLogMetaData.Ordinal == 1 { + expectedPartID2 = msg.PLogMetaData.ID + expectedTS2 = msg.Timestamp + } else { + checkIdentical(t, msg, expectedPartID2, expectedTS2) + } + if msg.PLogMetaData.Ordinal == 4 && !msg.PLogMetaData.Last { + t.Fatalf("Last is not set for last chunk") + } + } + + if msg.Source == "normal" && msg.PLogMetaData != nil { + t.Fatalf("Normal messages should not have PartialLogMetaData") + } + recvMsgs++ + } + + if expectedMsgs != recvMsgs { + t.Fatalf("Expected msgs: %d Recv msgs: %d", expectedMsgs, recvMsgs) + } +} + type BenchmarkLoggerDummy struct { } diff --git a/daemon/logger/journald/journald.go b/daemon/logger/journald/journald.go index 47dbfb920c..342e18f57f 100644 --- a/daemon/logger/journald/journald.go +++ b/daemon/logger/journald/journald.go @@ -108,7 +108,7 @@ func (s *journald) Log(msg *logger.Message) error { for k, v := range s.vars { vars[k] = v } - if msg.Partial { + if msg.PLogMetaData != nil { vars["CONTAINER_PARTIAL_MESSAGE"] = "true" } diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go index 2b1e91d063..7d0533ec84 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/daemon/logger/jsonfilelog/jsonfilelog.go @@ -132,7 +132,7 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error { func marshalMessage(msg *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error { logLine := msg.Line - if !msg.Partial { + if msg.PLogMetaData == nil || (msg.PLogMetaData != nil && msg.PLogMetaData.Last) { logLine = append(msg.Line, '\n') } err := (&jsonlog.JSONLogs{ diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index b64d77840e..912e855c7f 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -60,7 +60,7 @@ func (m *Message) reset() { m.Line = m.Line[:0] m.Source = "" m.Attrs = nil - m.Partial = false + m.PLogMetaData = nil m.Err = nil } diff --git a/daemon/logger/logger_test.go b/daemon/logger/logger_test.go index 3e29e22801..eaeec24085 100644 --- a/daemon/logger/logger_test.go +++ b/daemon/logger/logger_test.go @@ -6,9 +6,9 @@ import ( func (m *Message) copy() *Message { msg := &Message{ - Source: m.Source, - Partial: m.Partial, - Timestamp: m.Timestamp, + Source: m.Source, + PLogMetaData: m.PLogMetaData, + Timestamp: m.Timestamp, } if m.Attrs != nil {