2018-02-05 16:05:59 -05:00
|
|
|
package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelog"
|
2015-10-08 16:55:28 -04:00
|
|
|
|
|
|
|
import (
|
2018-04-05 12:39:28 -04:00
|
|
|
"context"
|
2015-10-08 16:55:28 -04:00
|
|
|
"encoding/json"
|
|
|
|
"io"
|
2016-07-11 16:31:42 -04:00
|
|
|
|
2017-07-18 22:01:20 -04:00
|
|
|
"github.com/docker/docker/api/types/backend"
|
2015-10-08 16:55:28 -04:00
|
|
|
"github.com/docker/docker/daemon/logger"
|
2017-09-25 15:57:45 -04:00
|
|
|
"github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog"
|
2018-04-05 12:39:28 -04:00
|
|
|
"github.com/docker/docker/daemon/logger/loggerutils"
|
|
|
|
"github.com/docker/docker/pkg/tailfile"
|
|
|
|
"github.com/sirupsen/logrus"
|
2015-10-08 16:55:28 -04:00
|
|
|
)
|
|
|
|
|
2016-02-23 21:07:38 -05:00
|
|
|
// maxJSONDecodeRetry is the upper bound on the number of decode attempts the
// closure returned by decodeFunc makes for a single call before it gives up
// and returns the last error it saw.
const maxJSONDecodeRetry = 20000
|
|
|
|
|
2015-10-08 16:55:28 -04:00
|
|
|
// ReadLogs implements the logger's LogReader interface for the logs
|
|
|
|
// created by this driver.
|
|
|
|
func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
|
|
|
|
logWatcher := logger.NewLogWatcher()
|
2016-02-23 21:07:38 -05:00
|
|
|
|
2015-10-08 16:55:28 -04:00
|
|
|
go l.readLogs(logWatcher, config)
|
|
|
|
return logWatcher
|
|
|
|
}
|
|
|
|
|
2017-07-18 11:54:03 -04:00
|
|
|
func (l *JSONFileLogger) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) {
|
|
|
|
defer close(watcher.Msg)
|
2017-03-15 11:41:12 -04:00
|
|
|
|
2017-06-30 13:13:32 -04:00
|
|
|
l.mu.Lock()
|
2017-07-18 11:54:03 -04:00
|
|
|
l.readers[watcher] = struct{}{}
|
2015-10-08 16:55:28 -04:00
|
|
|
l.mu.Unlock()
|
|
|
|
|
2017-07-18 11:54:03 -04:00
|
|
|
l.writer.ReadLogs(config, watcher)
|
2015-10-08 16:55:28 -04:00
|
|
|
|
|
|
|
l.mu.Lock()
|
2017-07-18 11:54:03 -04:00
|
|
|
delete(l.readers, watcher)
|
2015-10-08 16:55:28 -04:00
|
|
|
l.mu.Unlock()
|
|
|
|
}
|
|
|
|
|
2017-07-18 11:54:03 -04:00
|
|
|
func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
|
|
|
|
l.Reset()
|
|
|
|
if err := dec.Decode(l); err != nil {
|
2016-07-11 16:31:42 -04:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2017-07-18 11:54:03 -04:00
|
|
|
var attrs []backend.LogAttr
|
|
|
|
if len(l.Attrs) != 0 {
|
|
|
|
attrs = make([]backend.LogAttr, 0, len(l.Attrs))
|
|
|
|
for k, v := range l.Attrs {
|
|
|
|
attrs = append(attrs, backend.LogAttr{Key: k, Value: v})
|
2016-07-11 16:31:42 -04:00
|
|
|
}
|
|
|
|
}
|
2017-07-18 11:54:03 -04:00
|
|
|
msg := &logger.Message{
|
|
|
|
Source: l.Stream,
|
|
|
|
Timestamp: l.Created,
|
|
|
|
Line: []byte(l.Log),
|
|
|
|
Attrs: attrs,
|
|
|
|
}
|
|
|
|
return msg, nil
|
2016-07-11 16:31:42 -04:00
|
|
|
}
|
|
|
|
|
2017-07-18 11:54:03 -04:00
|
|
|
// decodeFunc is used to create a decoder for the log file reader
|
|
|
|
func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
|
2016-02-23 21:07:38 -05:00
|
|
|
l := &jsonlog.JSONLog{}
|
2017-07-18 11:54:03 -04:00
|
|
|
dec := json.NewDecoder(rdr)
|
|
|
|
return func() (msg *logger.Message, err error) {
|
|
|
|
for retries := 0; retries < maxJSONDecodeRetry; retries++ {
|
|
|
|
msg, err = decodeLogLine(dec, l)
|
2018-04-05 12:39:28 -04:00
|
|
|
if err == nil || err == io.EOF {
|
2016-07-11 16:31:42 -04:00
|
|
|
break
|
|
|
|
}
|
2015-10-08 16:55:28 -04:00
|
|
|
|
2018-04-05 12:39:28 -04:00
|
|
|
logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json")
|
2017-07-18 11:54:03 -04:00
|
|
|
// try again, could be due to a an incomplete json object as we read
|
|
|
|
if _, ok := err.(*json.SyntaxError); ok {
|
|
|
|
dec = json.NewDecoder(rdr)
|
|
|
|
continue
|
2015-10-08 16:55:28 -04:00
|
|
|
}
|
2017-04-28 07:53:00 -04:00
|
|
|
|
2017-07-18 11:54:03 -04:00
|
|
|
// io.ErrUnexpectedEOF is returned from json.Decoder when there is
|
|
|
|
// remaining data in the parser's buffer while an io.EOF occurs.
|
|
|
|
// If the json logger writes a partial json log entry to the disk
|
|
|
|
// while at the same time the decoder tries to decode it, the race condition happens.
|
|
|
|
if err == io.ErrUnexpectedEOF {
|
|
|
|
reader := io.MultiReader(dec.Buffered(), rdr)
|
|
|
|
dec = json.NewDecoder(reader)
|
2018-04-05 12:39:28 -04:00
|
|
|
continue
|
2015-10-08 16:55:28 -04:00
|
|
|
}
|
|
|
|
}
|
2017-07-18 11:54:03 -04:00
|
|
|
return msg, err
|
2015-10-08 16:55:28 -04:00
|
|
|
}
|
|
|
|
}
|
2018-04-05 12:39:28 -04:00
|
|
|
|
|
|
|
func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) {
|
|
|
|
return tailfile.NewTailReader(ctx, r, req)
|
|
|
|
}
|