mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
94a10150f6
This makes it so consumers of `LogFile` should pass in how to get an io.Reader to the requested number of lines to tail. This is also much more efficient when tailing a large number of lines. Signed-off-by: Brian Goff <cpuguy83@gmail.com>
97 lines
2.7 KiB
Go
97 lines
2.7 KiB
Go
package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelog"
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"io"
|
|
|
|
"github.com/docker/docker/api/types/backend"
|
|
"github.com/docker/docker/daemon/logger"
|
|
"github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog"
|
|
"github.com/docker/docker/daemon/logger/loggerutils"
|
|
"github.com/docker/docker/pkg/tailfile"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// maxJSONDecodeRetry bounds how many times the reader created by
// decodeFunc will retry after a JSON decode error before giving up
// and returning the last error to the caller.
const maxJSONDecodeRetry = 20000
|
|
|
|
// ReadLogs implements the logger's LogReader interface for the logs
|
|
// created by this driver.
|
|
func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
|
|
logWatcher := logger.NewLogWatcher()
|
|
|
|
go l.readLogs(logWatcher, config)
|
|
return logWatcher
|
|
}
|
|
|
|
func (l *JSONFileLogger) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) {
|
|
defer close(watcher.Msg)
|
|
|
|
l.mu.Lock()
|
|
l.readers[watcher] = struct{}{}
|
|
l.mu.Unlock()
|
|
|
|
l.writer.ReadLogs(config, watcher)
|
|
|
|
l.mu.Lock()
|
|
delete(l.readers, watcher)
|
|
l.mu.Unlock()
|
|
}
|
|
|
|
func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
|
|
l.Reset()
|
|
if err := dec.Decode(l); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var attrs []backend.LogAttr
|
|
if len(l.Attrs) != 0 {
|
|
attrs = make([]backend.LogAttr, 0, len(l.Attrs))
|
|
for k, v := range l.Attrs {
|
|
attrs = append(attrs, backend.LogAttr{Key: k, Value: v})
|
|
}
|
|
}
|
|
msg := &logger.Message{
|
|
Source: l.Stream,
|
|
Timestamp: l.Created,
|
|
Line: []byte(l.Log),
|
|
Attrs: attrs,
|
|
}
|
|
return msg, nil
|
|
}
|
|
|
|
// decodeFunc is used to create a decoder for the log file reader
|
|
func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
|
|
l := &jsonlog.JSONLog{}
|
|
dec := json.NewDecoder(rdr)
|
|
return func() (msg *logger.Message, err error) {
|
|
for retries := 0; retries < maxJSONDecodeRetry; retries++ {
|
|
msg, err = decodeLogLine(dec, l)
|
|
if err == nil || err == io.EOF {
|
|
break
|
|
}
|
|
|
|
logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json")
|
|
// try again, could be due to a an incomplete json object as we read
|
|
if _, ok := err.(*json.SyntaxError); ok {
|
|
dec = json.NewDecoder(rdr)
|
|
continue
|
|
}
|
|
|
|
// io.ErrUnexpectedEOF is returned from json.Decoder when there is
|
|
// remaining data in the parser's buffer while an io.EOF occurs.
|
|
// If the json logger writes a partial json log entry to the disk
|
|
// while at the same time the decoder tries to decode it, the race condition happens.
|
|
if err == io.ErrUnexpectedEOF {
|
|
reader := io.MultiReader(dec.Buffered(), rdr)
|
|
dec = json.NewDecoder(reader)
|
|
continue
|
|
}
|
|
}
|
|
return msg, err
|
|
}
|
|
}
|
|
|
|
func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) {
|
|
return tailfile.NewTailReader(ctx, r, req)
|
|
}
|