diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index e27b380941..ba9455e6ac 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -203,7 +203,6 @@ func (l *logStream) Log(msg *logger.Message) error { l.lock.RLock() defer l.lock.RUnlock() if !l.closed { - // buffer up the data, making sure to copy the Line data l.messages <- msg } return nil @@ -347,6 +346,7 @@ func (l *logStream) collectBatch() { }) bytes += (lineBytes + perEventBytes) } + logger.PutMessage(msg) } } } diff --git a/daemon/logger/copier.go b/daemon/logger/copier.go index e8b06e58fe..65d8fb148e 100644 --- a/daemon/logger/copier.go +++ b/daemon/logger/copier.go @@ -76,15 +76,14 @@ func (c *Copier) copySrc(name string, src io.Reader) { } // Break up the data that we've buffered up into lines, and log each in turn. p := 0 - for q := bytes.Index(buf[p:n], []byte{'\n'}); q >= 0; q = bytes.Index(buf[p:n], []byte{'\n'}) { + for q := bytes.IndexByte(buf[p:n], '\n'); q >= 0; q = bytes.IndexByte(buf[p:n], '\n') { select { case <-c.closed: return default: - msg := &Message{ - Source: name, - Timestamp: time.Now().UTC(), - } + msg := NewMessage() + msg.Source = name + msg.Timestamp = time.Now().UTC() msg.Line = append(msg.Line, buf[p:p+q]...) if logErr := c.dst.Log(msg); logErr != nil { @@ -98,11 +97,9 @@ func (c *Copier) copySrc(name string, src io.Reader) { // noting that it's a partial log line. if eof || (p == 0 && n == len(buf)) { if p < n { - msg := &Message{ - Source: name, - Timestamp: time.Now().UTC(), - Partial: true, - } + msg := NewMessage() + msg.Source = name + msg.Timestamp = time.Now().UTC() msg.Line = append(msg.Line, buf[p:n]...) msg.Partial = true diff --git a/daemon/logger/copier_test.go b/daemon/logger/copier_test.go index cfd816a6eb..e6975e2d84 100644 --- a/daemon/logger/copier_test.go +++ b/daemon/logger/copier_test.go @@ -208,7 +208,7 @@ func TestCopierSlow(t *testing.T) { type BenchmarkLoggerDummy struct { } -func (l *BenchmarkLoggerDummy) Log(m *Message) error { return nil } +func (l *BenchmarkLoggerDummy) Log(m *Message) error { PutMessage(m); return nil } func (l *BenchmarkLoggerDummy) Close() error { return nil } diff --git a/daemon/logger/etwlogs/etwlogs_windows.go b/daemon/logger/etwlogs/etwlogs_windows.go index 0933874525..9b082fe737 100644 --- a/daemon/logger/etwlogs/etwlogs_windows.go +++ b/daemon/logger/etwlogs/etwlogs_windows.go @@ -76,7 +76,9 @@ func (etwLogger *etwLogs) Log(msg *logger.Message) error { logrus.Error(errorMessage) return errors.New(errorMessage) } - return callEventWriteString(createLogMessage(etwLogger, msg)) + m := createLogMessage(etwLogger, msg) + logger.PutMessage(msg) + return callEventWriteString(m) } // Close closes the logger by unregistering the ETW provider. diff --git a/daemon/logger/fluentd/fluentd.go b/daemon/logger/fluentd/fluentd.go index 8c0da26e76..4af3e0e6fa 100644 --- a/daemon/logger/fluentd/fluentd.go +++ b/daemon/logger/fluentd/fluentd.go @@ -151,9 +151,12 @@ func (f *fluentd) Log(msg *logger.Message) error { for k, v := range f.extra { data[k] = v } + + ts := msg.Timestamp + logger.PutMessage(msg) // fluent-logger-golang buffers logs from failures and disconnections, // and these are transferred again automatically. - return f.writer.PostWithTime(f.tag, msg.Timestamp, data) + return f.writer.PostWithTime(f.tag, ts, data) } func (f *fluentd) Close() error { diff --git a/daemon/logger/gcplogs/gcplogging.go b/daemon/logger/gcplogs/gcplogging.go index ff1cb39c30..3576699933 100644 --- a/daemon/logger/gcplogs/gcplogging.go +++ b/daemon/logger/gcplogs/gcplogging.go @@ -194,12 +194,16 @@ func ValidateLogOpts(cfg map[string]string) error { } func (l *gcplogs) Log(m *logger.Message) error { + data := string(m.Line) + ts := m.Timestamp + logger.PutMessage(m) + l.logger.Log(logging.Entry{ - Timestamp: m.Timestamp, + Timestamp: ts, Payload: &dockerLogEntry{ Instance: l.instance, Container: l.container, - Data: string(m.Line), + Data: data, }, }) return nil diff --git a/daemon/logger/gelf/gelf.go b/daemon/logger/gelf/gelf.go index 42b9570495..b771cab501 100644 --- a/daemon/logger/gelf/gelf.go +++ b/daemon/logger/gelf/gelf.go @@ -133,6 +133,7 @@ func (s *gelfLogger) Log(msg *logger.Message) error { Level: level, RawExtra: s.rawExtra, } + logger.PutMessage(msg) if err := s.writer.WriteMessage(&m); err != nil { return fmt.Errorf("gelf: cannot send GELF message: %v", err) diff --git a/daemon/logger/journald/journald.go b/daemon/logger/journald/journald.go index 0a16aafd94..712d301c68 100644 --- a/daemon/logger/journald/journald.go +++ b/daemon/logger/journald/journald.go @@ -105,10 +105,14 @@ func (s *journald) Log(msg *logger.Message) error { if msg.Partial { vars["CONTAINER_PARTIAL_MESSAGE"] = "true" } + + line := string(msg.Line) + logger.PutMessage(msg) + if msg.Source == "stderr" { - return journal.Send(string(msg.Line), journal.PriErr, vars) + return journal.Send(line, journal.PriErr, vars) } - return journal.Send(string(msg.Line), journal.PriInfo, vars) + return journal.Send(line, journal.PriInfo, vars) } func (s *journald) Name() string { diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go index eb25e419af..d804ed28f4 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/daemon/logger/jsonfilelog/jsonfilelog.go @@ -100,6 +100,7 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error { Created: timestamp, RawAttrs: l.extra, }).MarshalJSONBuf(l.buf) + logger.PutMessage(msg) if err != nil { l.mu.Unlock() return err diff --git a/daemon/logger/logentries/logentries.go b/daemon/logger/logentries/logentries.go index 114ddd59d4..64d6893716 100644 --- a/daemon/logger/logentries/logentries.go +++ b/daemon/logger/logentries/logentries.go @@ -61,7 +61,9 @@ func (f *logentries) Log(msg *logger.Message) error { for k, v := range f.extra { data[k] = v } - f.writer.Println(f.tag, msg.Timestamp, data) + ts := msg.Timestamp + logger.PutMessage(msg) + f.writer.Println(f.tag, ts, data) return nil } diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index dadd6f7cfc..7172663aa0 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -26,9 +26,24 @@ const ( logWatcherBufferSize = 4096 ) +var messagePool = &sync.Pool{New: func() interface{} { return &Message{Line: make([]byte, 0, 256)} }} + +// NewMessage returns a new message from the message sync.Pool +func NewMessage() *Message { + return messagePool.Get().(*Message) +} + +// PutMessage puts the specified message back n the message pool. +// The message fields are reset before putting into the pool. +func PutMessage(msg *Message) { + msg.reset() + messagePool.Put(msg) +} + // Message is datastructure that represents piece of output produced by some // container. The Line member is a slice of an array whose contents can be // changed after a log driver's Log() method returns. +// Any changes made to this struct must also be updated in the `reset` function type Message struct { Line []byte Source string @@ -37,6 +52,16 @@ type Message struct { Partial bool } +// reset sets the message back to default values +// This is used when putting a message back into the message pool. +// Any changes to the `Message` struct should be reflected here. +func (m *Message) reset() { + m.Line = m.Line[:0] + m.Source = "" + m.Attrs = nil + m.Partial = false +} + // LogAttributes is used to hold the extra attributes available in the log message // Primarily used for converting the map type to string and sorting. type LogAttributes map[string]string diff --git a/daemon/logger/ring.go b/daemon/logger/ring.go index 9748a558c5..90769d71e1 100644 --- a/daemon/logger/ring.go +++ b/daemon/logger/ring.go @@ -83,10 +83,18 @@ func (r *RingLogger) Close() error { r.setClosed() r.buffer.Close() // empty out the queue + var logErr bool for _, msg := range r.buffer.Drain() { + if logErr { + // some error logging a previous message, so re-insert to message pool + // and assume log driver is hosed + PutMessage(msg) + continue + } + if err := r.l.Log(msg); err != nil { logrus.WithField("driver", r.l.Name()).WithField("container", r.logInfo.ContainerID).Errorf("Error writing log message: %v", r.l) - break + logErr = true } } return r.l.Close() diff --git a/daemon/logger/splunk/splunk.go b/daemon/logger/splunk/splunk.go index 3ae6da71b3..5dcd508786 100644 --- a/daemon/logger/splunk/splunk.go +++ b/daemon/logger/splunk/splunk.go @@ -336,7 +336,7 @@ func (l *splunkLoggerInline) Log(msg *logger.Message) error { event.Source = msg.Source message.Event = &event - + logger.PutMessage(msg) return l.queueMessageAsync(message) } @@ -354,7 +354,7 @@ func (l *splunkLoggerJSON) Log(msg *logger.Message) error { event.Source = msg.Source message.Event = &event - + logger.PutMessage(msg) return l.queueMessageAsync(message) } @@ -362,7 +362,7 @@ func (l *splunkLoggerRaw) Log(msg *logger.Message) error { message := l.createSplunkMessage(msg) message.Event = string(append(l.prefix, msg.Line...)) - + logger.PutMessage(msg) return l.queueMessageAsync(message) } diff --git a/daemon/logger/syslog/syslog.go b/daemon/logger/syslog/syslog.go index f765b7d976..1272b8a647 100644 --- a/daemon/logger/syslog/syslog.go +++ b/daemon/logger/syslog/syslog.go @@ -132,10 +132,12 @@ func New(info logger.Info) (logger.Logger, error) { } func (s *syslogger) Log(msg *logger.Message) error { + line := string(msg.Line) + logger.PutMessage(msg) if msg.Source == "stderr" { - return s.writer.Err(string(msg.Line)) + return s.writer.Err(line) } - return s.writer.Info(string(msg.Line)) + return s.writer.Info(line) } func (s *syslogger) Close() error {