diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go index 797644e669..e8df0ecbd0 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/daemon/logger/jsonfilelog/jsonfilelog.go @@ -7,6 +7,7 @@ import ( "bytes" "encoding/json" "fmt" + "io" "strconv" "sync" @@ -15,6 +16,7 @@ import ( "github.com/docker/docker/daemon/logger/loggerutils" "github.com/docker/docker/pkg/jsonlog" "github.com/docker/go-units" + "github.com/pkg/errors" ) // Name is the name of the file that the jsonlogger logs to. @@ -22,12 +24,13 @@ const Name = "json-file" // JSONFileLogger is Logger implementation for default Docker logging. type JSONFileLogger struct { - buf *bytes.Buffer - writer *loggerutils.RotateFileWriter - mu sync.Mutex - readers map[*logger.LogWatcher]struct{} // stores the active log followers - extra []byte // json-encoded extra attributes + extra []byte // json-encoded extra attributes + + mu sync.RWMutex + buf *bytes.Buffer // avoids allocating a new buffer on each call to `Log()` closed bool + writer *loggerutils.RotateFileWriter + readers map[*logger.LogWatcher]struct{} // stores the active log followers } func init() { @@ -90,33 +93,45 @@ func New(info logger.Info) (logger.Logger, error) { // Log converts logger.Message to jsonlog.JSONLog and serializes it to file. func (l *JSONFileLogger) Log(msg *logger.Message) error { + l.mu.Lock() + err := writeMessageBuf(l.writer, msg, l.extra, l.buf) + l.buf.Reset() + l.mu.Unlock() + return err +} + +func writeMessageBuf(w io.Writer, m *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error { + if err := marshalMessage(m, extra, buf); err != nil { + logger.PutMessage(m) + return err + } + logger.PutMessage(m) + if _, err := w.Write(buf.Bytes()); err != nil { + return errors.Wrap(err, "error writing log entry") + } + return nil +} + +func marshalMessage(msg *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error { timestamp, err := jsonlog.FastTimeMarshalJSON(msg.Timestamp) if err != nil { return err } - l.mu.Lock() - logline := msg.Line + logLine := msg.Line if !msg.Partial { - logline = append(msg.Line, '\n') + logLine = append(msg.Line, '\n') } err = (&jsonlog.JSONLogs{ - Log: logline, + Log: logLine, Stream: msg.Source, Created: timestamp, - RawAttrs: l.extra, - }).MarshalJSONBuf(l.buf) - logger.PutMessage(msg) + RawAttrs: extra, + }).MarshalJSONBuf(buf) if err != nil { - l.mu.Unlock() - return err + return errors.Wrap(err, "error writing log message to buffer") } - - l.buf.WriteByte('\n') - _, err = l.writer.Write(l.buf.Bytes()) - l.buf.Reset() - l.mu.Unlock() - - return err + err = buf.WriteByte('\n') + return errors.Wrap(err, "error finalizing log buffer") } // ValidateLogOpt looks for json specific log options max-file & max-size. diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go index 312aa551d0..3fe5967241 100644 --- a/daemon/logger/jsonfilelog/read.go +++ b/daemon/logger/jsonfilelog/read.go @@ -3,7 +3,6 @@ package jsonfilelog import ( "bytes" "encoding/json" - "errors" "fmt" "io" "os" @@ -18,6 +17,7 @@ import ( "github.com/docker/docker/pkg/filenotify" "github.com/docker/docker/pkg/jsonlog" "github.com/docker/docker/pkg/tailfile" + "github.com/pkg/errors" ) const maxJSONDecodeRetry = 20000 @@ -48,10 +48,11 @@ func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) { defer close(logWatcher.Msg) - // lock so the read stream doesn't get corrupted due to rotations or other log data written while we read + // lock so the read stream doesn't get corrupted due to rotations or other log data written while we open these files // This will block writes!!! - l.mu.Lock() + l.mu.RLock() + // TODO it would be nice to move a lot of this reader implementation to the rotate logger object pth := l.writer.LogPath() var files []io.ReadSeeker for i := l.writer.MaxFiles(); i > 1; i-- { @@ -59,25 +60,36 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R if err != nil { if !os.IsNotExist(err) { logWatcher.Err <- err - break + l.mu.RUnlock() + return } continue } defer f.Close() - files = append(files, f) } latestFile, err := os.Open(pth) if err != nil { - logWatcher.Err <- err - l.mu.Unlock() + logWatcher.Err <- errors.Wrap(err, "error opening latest log file") + l.mu.RUnlock() return } defer latestFile.Close() + latestChunk, err := newSectionReader(latestFile) + + // Now we have the reader sectioned, all fd's opened, we can unlock. + // New writes/rotates will not affect seeking through these files + l.mu.RUnlock() + + if err != nil { + logWatcher.Err <- err + return + } + if config.Tail != 0 { - tailer := multireader.MultiReadSeeker(append(files, latestFile)...) + tailer := multireader.MultiReadSeeker(append(files, latestChunk)...) tailFile(tailer, logWatcher, config.Tail, config.Since) } @@ -89,19 +101,14 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R } if !config.Follow || l.closed { - l.mu.Unlock() return } - if config.Tail >= 0 { - latestFile.Seek(0, os.SEEK_END) - } - notifyRotate := l.writer.NotifyRotate() defer l.writer.NotifyRotateEvict(notifyRotate) + l.mu.Lock() l.readers[logWatcher] = struct{}{} - l.mu.Unlock() followLogs(latestFile, logWatcher, notifyRotate, config.Since) @@ -111,6 +118,16 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R l.mu.Unlock() } +func newSectionReader(f *os.File) (*io.SectionReader, error) { + // seek to the end to get the size + // we'll leave this at the end of the file since section reader does not advance the reader + size, err := f.Seek(0, os.SEEK_END) + if err != nil { + return nil, errors.Wrap(err, "error getting current file size") + } + return io.NewSectionReader(f, 0, size), nil +} + func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) { var rdr io.Reader rdr = f