diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index 78a230fe8d..8f59b27855 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -165,7 +165,8 @@ func (l *logStream) Log(msg *logger.Message) error { l.lock.RLock() defer l.lock.RUnlock() if !l.closed { - l.messages <- msg + // buffer up the data, making sure to copy the Line data + l.messages <- logger.CopyMessage(msg) } return nil } diff --git a/daemon/logger/copier.go b/daemon/logger/copier.go index 9abb59a176..ecb4ed03b6 100644 --- a/daemon/logger/copier.go +++ b/daemon/logger/copier.go @@ -1,7 +1,6 @@ package logger import ( - "bufio" "bytes" "io" "sync" @@ -10,8 +9,13 @@ import ( "github.com/Sirupsen/logrus" ) +const ( + bufSize = 16 * 1024 + readSize = 2 * 1024 +) + // Copier can copy logs from specified sources to Logger and attach Timestamp. -// Writes are concurrent, so you need implement some sync in your logger +// Writes are concurrent, so you need implement some sync in your logger. type Copier struct { // srcs is map of name -> reader pairs, for example "stdout", "stderr" srcs map[string]io.Reader @@ -39,29 +43,75 @@ func (c *Copier) Run() { func (c *Copier) copySrc(name string, src io.Reader) { defer c.copyJobs.Done() - reader := bufio.NewReader(src) + buf := make([]byte, bufSize) + n := 0 + eof := false + msg := &Message{Source: name} for { select { case <-c.closed: return default: - line, err := reader.ReadBytes('\n') - line = bytes.TrimSuffix(line, []byte{'\n'}) - - // ReadBytes can return full or partial output even when it failed. - // e.g. it can return a full entry and EOF. - if err == nil || len(line) > 0 { - if logErr := c.dst.Log(&Message{Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil { - logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr) + // Work out how much more data we are okay with reading this time. 
+ upto := n + readSize + if upto > cap(buf) { + upto = cap(buf) + } + // Try to read that data. + if upto > n { + read, err := src.Read(buf[n:upto]) + if err != nil { + if err != io.EOF { + logrus.Errorf("Error scanning log stream: %s", err) + return + } + eof = true + } + n += read + } + // If we have no data to log, and there's no more coming, we're done. + if n == 0 && eof { + return + } + // Break up the data that we've buffered up into lines, and log each in turn. + p := 0 + for q := bytes.Index(buf[p:n], []byte{'\n'}); q >= 0; q = bytes.Index(buf[p:n], []byte{'\n'}) { + msg.Line = buf[p : p+q] + msg.Timestamp = time.Now().UTC() + msg.Partial = false + select { + case <-c.closed: + return + default: + if logErr := c.dst.Log(msg); logErr != nil { + logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr) + } + } + p += q + 1 + } + // If there's no more coming, or the buffer is full but + // has no newlines, log whatever we haven't logged yet, + // noting that it's a partial log line. + if eof || (p == 0 && n == len(buf)) { + if p < n { + msg.Line = buf[p:n] + msg.Timestamp = time.Now().UTC() + msg.Partial = true + if logErr := c.dst.Log(msg); logErr != nil { + logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr) + } + p = 0 + n = 0 + } + if eof { + return } } - - if err != nil { - if err != io.EOF { - logrus.Errorf("Error scanning log stream: %s", err) - } - return + // Move any unlogged data to the front of the buffer in preparation for another read. 
+ if p > 0 { + copy(buf[0:], buf[p:n]) + n -= p } } } diff --git a/daemon/logger/journald/journald.go b/daemon/logger/journald/journald.go index 748dd8b24a..e944116f35 100644 --- a/daemon/logger/journald/journald.go +++ b/daemon/logger/journald/journald.go @@ -84,10 +84,17 @@ func validateLogOpt(cfg map[string]string) error { } func (s *journald) Log(msg *logger.Message) error { - if msg.Source == "stderr" { - return journal.Send(string(msg.Line), journal.PriErr, s.vars) + vars := map[string]string{} + for k, v := range s.vars { + vars[k] = v } - return journal.Send(string(msg.Line), journal.PriInfo, s.vars) + if msg.Partial { + vars["CONTAINER_PARTIAL_MESSAGE"] = "true" + } + if msg.Source == "stderr" { + return journal.Send(string(msg.Line), journal.PriErr, vars) + } + return journal.Send(string(msg.Line), journal.PriInfo, vars) } func (s *journald) Name() string { diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go index bc009f61cf..04370fdbc0 100644 --- a/daemon/logger/journald/read.go +++ b/daemon/logger/journald/read.go @@ -12,11 +12,15 @@ package journald // #include // #include // -//static int get_message(sd_journal *j, const char **msg, size_t *length) +//static int get_message(sd_journal *j, const char **msg, size_t *length, int *partial) //{ // int rc; +// size_t plength; // *msg = NULL; // *length = 0; +// plength = strlen("CONTAINER_PARTIAL_MESSAGE=true"); +// rc = sd_journal_get_data(j, "CONTAINER_PARTIAL_MESSAGE", (const void **) msg, length); +// *partial = ((rc == 0) && (*length == plength) && (memcmp(*msg, "CONTAINER_PARTIAL_MESSAGE=true", plength) == 0)); // rc = sd_journal_get_data(j, "MESSAGE", (const void **) msg, length); // if (rc == 0) { // if (*length > 8) { @@ -167,7 +171,7 @@ func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.Rea var msg, data, cursor *C.char var length C.size_t var stamp C.uint64_t - var priority C.int + var priority, partial C.int // Walk the journal from here forward 
until we run out of new entries. drain: @@ -183,7 +187,7 @@ drain: } } // Read and send the logged message, if there is one to read. - i := C.get_message(j, &msg, &length) + i := C.get_message(j, &msg, &length, &partial) if i != -C.ENOENT && i != -C.EADDRNOTAVAIL { // Read the entry's timestamp. if C.sd_journal_get_realtime_usec(j, &stamp) != 0 { @@ -191,7 +195,10 @@ drain: } // Set up the time and text of the entry. timestamp := time.Unix(int64(stamp)/1000000, (int64(stamp)%1000000)*1000) - line := append(C.GoBytes(unsafe.Pointer(msg), C.int(length)), "\n"...) + line := C.GoBytes(unsafe.Pointer(msg), C.int(length)) + if partial == 0 { + line = append(line, "\n"...) + } // Recover the stream name by mapping // from the journal priority back to // the stream that we would have diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go index 9faa4e02db..a429a08a4f 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/daemon/logger/jsonfilelog/jsonfilelog.go @@ -90,8 +90,12 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error { return err } l.mu.Lock() + logline := msg.Line + if !msg.Partial { + logline = append(msg.Line, '\n') + } err = (&jsonlog.JSONLogs{ - Log: append(msg.Line, '\n'), + Log: logline, Stream: msg.Source, Created: timestamp, RawAttrs: l.extra, diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index fb8c9a7dee..2e7b2265b7 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -25,12 +25,33 @@ const ( logWatcherBufferSize = 4096 ) -// Message is datastructure that represents record from some container. +// Message is datastructure that represents piece of output produced by some +// container. The Line member is a slice of an array whose contents can be +// changed after a log driver's Log() method returns. 
 type Message struct {
 	Line      []byte
 	Source    string
 	Timestamp time.Time
 	Attrs     LogAttributes
+	Partial   bool
+}
+
+// CopyMessage creates a copy of the passed-in Message which will remain
+// unchanged if the original is changed.  Log drivers which buffer Messages
+// rather than dispatching them during their Log() method should use this
+// function to obtain a Message whose Line member's contents won't change.
+func CopyMessage(msg *Message) *Message {
+	m := new(Message)
+	m.Line = make([]byte, len(msg.Line))
+	copy(m.Line, msg.Line)
+	m.Source = msg.Source
+	m.Timestamp = msg.Timestamp
+	m.Partial = msg.Partial
+	m.Attrs = make(LogAttributes)
+	for k, v := range msg.Attrs {
+		m.Attrs[k] = v
+	}
+	return m
 }
 
 // LogAttributes is used to hold the extra attributes available in the log message