package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelog"

import (
	"context"
	"encoding/json"
	"io"

	"github.com/docker/docker/api/types/backend"
	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog"
	"github.com/docker/docker/daemon/logger/loggerutils"
	"github.com/docker/docker/pkg/tailfile"
	"github.com/sirupsen/logrus"
)

// maxJSONDecodeRetry bounds how many times decoder.Decode retries after a
// decode error before giving up and returning the last error.
const maxJSONDecodeRetry = 20000

// ReadLogs implements the logger's LogReader interface for the logs
// created by this driver.
//
// The actual reading happens in a separate goroutine; the returned watcher
// is handed to that goroutine and its Msg channel is closed when reading
// finishes.
func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
	logWatcher := logger.NewLogWatcher()

	go l.readLogs(logWatcher, config)
	return logWatcher
}

// readLogs registers watcher in l.readers, delegates the read to
// l.writer.ReadLogs, and unregisters the watcher once that call returns.
// watcher.Msg is closed on return.
func (l *JSONFileLogger) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) {
	defer close(watcher.Msg)

	l.mu.Lock()
	l.readers[watcher] = struct{}{}
	l.mu.Unlock()

	l.writer.ReadLogs(config, watcher)

	l.mu.Lock()
	delete(l.readers, watcher)
	l.mu.Unlock()
}

// decodeLogLine decodes the next JSON value from dec into l (which is reset
// first so it can be reused between calls) and converts it to a
// logger.Message. Any error from the JSON decoder is returned unchanged.
func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
	l.Reset()
	if err := dec.Decode(l); err != nil {
		return nil, err
	}

	// Leave attrs nil when the entry carries no attributes.
	var attrs []backend.LogAttr
	if len(l.Attrs) != 0 {
		attrs = make([]backend.LogAttr, 0, len(l.Attrs))
		for k, v := range l.Attrs {
			attrs = append(attrs, backend.LogAttr{Key: k, Value: v})
		}
	}
	msg := &logger.Message{
		Source:    l.Stream,
		Timestamp: l.Created,
		Line:      []byte(l.Log),
		Attrs:     attrs,
	}
	return msg, nil
}

// decoder reads log messages from a stream of JSON log entries.
//
// dec and jl are created lazily on the first call to Decode.
type decoder struct {
	rdr io.Reader
	dec *json.Decoder
	jl  *jsonlog.JSONLog
}

// Reset points the decoder at rdr. The JSON decoder is dropped so that the
// next Decode builds a fresh one on top of rdr; the scratch JSONLog, if
// already allocated, is reset and kept.
func (d *decoder) Reset(rdr io.Reader) {
	d.rdr = rdr
	d.dec = nil
	if d.jl != nil {
		d.jl.Reset()
	}
}

// Close drops every reference held by the decoder.
func (d *decoder) Close() {
	d.dec = nil
	d.rdr = nil
	d.jl = nil
}

// Decode returns the next log message from the underlying reader.
//
// io.EOF is returned as-is. Any other error is logged and the decode is
// retried, up to maxJSONDecodeRetry attempts in total, after which the last
// error is returned:
//   - on a *json.SyntaxError the JSON decoder is rebuilt on top of the
//     current reader;
//   - on io.ErrUnexpectedEOF the bytes still buffered in the JSON decoder
//     are put back in front of the underlying reader and the decoder is
//     rebuilt;
//   - on any other error the same JSON decoder is simply tried again.
func (d *decoder) Decode() (msg *logger.Message, err error) {
	if d.dec == nil {
		d.dec = json.NewDecoder(d.rdr)
	}
	if d.jl == nil {
		d.jl = &jsonlog.JSONLog{}
	}
	for retries := 0; retries < maxJSONDecodeRetry; retries++ {
		msg, err = decodeLogLine(d.dec, d.jl)
		if err == nil || err == io.EOF {
			break
		}

		logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json")
		// try again, could be due to an incomplete json object as we read
		if _, ok := err.(*json.SyntaxError); ok {
			d.dec = json.NewDecoder(d.rdr)
			continue
		}

		// io.ErrUnexpectedEOF is returned from json.Decoder when there is
		// remaining data in the parser's buffer while an io.EOF occurs.
		// If the json logger writes a partial json log entry to the disk
		// while at the same time the decoder tries to decode it, the race condition happens.
		if err == io.ErrUnexpectedEOF {
			// io.MultiReader must not be used here: once all of its inputs
			// have returned EOF it returns EOF forever, so the remainder of
			// the partially written entry could never be read on a retry.
			d.rdr = combineReaders(d.dec.Buffered(), d.rdr)
			d.dec = json.NewDecoder(d.rdr)
			continue
		}
	}
	return msg, err
}

// combineReaders returns a reader that yields the contents of pre followed
// by whatever rdr produces, without remembering an EOF from rdr.
func combineReaders(pre, rdr io.Reader) io.Reader {
	return &combinedReader{pre: pre, rdr: rdr}
}

// combinedReader is like io.MultiReader except that it does not cache EOF:
// after pre is drained every Read is forwarded to rdr, so data that shows up
// in rdr after it previously reported EOF is still delivered.
type combinedReader struct {
	pre io.Reader // consumed first; set to nil once it reports io.EOF
	rdr io.Reader // read on every call where p still has room
}

// Read fills p from pre first and then, if there is room left, from rdr.
// An io.EOF from pre is swallowed; any other error is returned immediately.
// Errors from rdr, including io.EOF, are returned together with the number
// of bytes read so far.
func (r *combinedReader) Read(p []byte) (int, error) {
	var read int
	if r.pre != nil {
		n, err := r.pre.Read(p)
		if err != nil {
			if err != io.EOF {
				return n, err
			}
			r.pre = nil
		}
		read = n
	}

	if read < len(p) {
		n, err := r.rdr.Read(p[read:])
		if n > 0 {
			read += n
		}
		if err != nil {
			return read, err
		}
	}

	return read, nil
}

// decodeFunc is used to create a decoder for the log file reader
func decodeFunc(rdr io.Reader) loggerutils.Decoder {
	return &decoder{
		rdr: rdr,
		dec: nil,
		jl:  nil,
	}
}

// getTailReader returns the result of tailfile.NewTailReader for r and req.
func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) {
	return tailfile.NewTailReader(ctx, r, req)
}