1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/daemon/logger/jsonfilelog/read.go
Brian Goff e2209185ed Fix log readers can block writes indefinitely
Before this patch, a log reader is able to block all log writes
indefinitely (and other operations) by simply opening the log stream and
not consuming all the messages.

The reason for this is we protect the read stream from corruption by
ensuring there are no new writes while the log stream is consumed (and
caught up with the live entries).

We can get around this issue because log files are append only, so we
can limit reads to only the section of the file that was written to when
the log stream was first requested.

Now logs are only blocked until all files are opened, rather than
streamed to the client.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2017-07-05 14:04:52 -04:00

341 lines
7.7 KiB
Go

package jsonfilelog
import (
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"time"
"github.com/fsnotify/fsnotify"
"golang.org/x/net/context"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/jsonfilelog/multireader"
"github.com/docker/docker/pkg/filenotify"
"github.com/docker/docker/pkg/jsonlog"
"github.com/docker/docker/pkg/tailfile"
"github.com/pkg/errors"
)
// maxJSONDecodeRetry bounds how many consecutive times followLogs will rebuild
// its decoder and try again after a json syntax error or an unexpected EOF
// before giving up and reporting the error. The counter it is compared against
// is reset whenever a log line decodes successfully.
const maxJSONDecodeRetry = 20000
// decodeLogLine reads the next JSON log entry from dec into the scratch value
// l and converts it into a logger.Message.
//
// l is reset before decoding so fields from a previous entry do not carry
// over. Any error returned by the decoder (including io.EOF) is passed back
// unchanged together with a nil message.
func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
	l.Reset()

	err := dec.Decode(l)
	if err != nil {
		return nil, err
	}

	return &logger.Message{
		Source:    l.Stream,
		Timestamp: l.Created,
		Line:      []byte(l.Log),
		Attrs:     l.Attrs,
	}, nil
}
// ReadLogs implements the logger's LogReader interface for the logs
// created by this driver.
//
// The returned watcher is handed back immediately; the actual reading is
// performed by readLogs in a separate goroutine.
func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
	watcher := logger.NewLogWatcher()
	go l.readLogs(watcher, config)
	return watcher
}
// readLogs feeds logWatcher with the log entries selected by config.
//
// It opens every rotated log file plus the current one while holding the read
// lock, then releases the lock before any data is streamed. If config.Tail is
// non-zero the already-written portion of the logs is replayed through
// tailFile. If config.Follow is set and the logger is not closed, the current
// log file is then followed via followLogs until the watcher is closed or an
// unrecoverable error occurs.
//
// logWatcher.Msg is closed when this function returns; failures are reported
// on logWatcher.Err.
func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
	defer close(logWatcher.Msg)

	// lock so the read stream doesn't get corrupted due to rotations or other log data written while we open these files
	// This will block writes!!!
	l.mu.RLock()

	// TODO it would be nice to move a lot of this reader implementation to the rotate logger object
	pth := l.writer.LogPath()
	var files []io.ReadSeeker

	// closeRotated closes every rotated file that is still open. It clears
	// the slice afterwards so that calling it again (explicitly and then via
	// defer) never closes the same descriptor twice.
	closeRotated := func() {
		for _, f := range files {
			if err := f.(io.Closer).Close(); err != nil {
				logrus.WithField("logger", "json-file").Warnf("error closing tailed log file: %v", err)
			}
		}
		files = nil
	}
	defer closeRotated()

	// Walk from the oldest rotated file (highest suffix) to the newest so the
	// readers end up in chronological order.
	for i := l.writer.MaxFiles(); i > 1; i-- {
		f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
		if err != nil {
			if !os.IsNotExist(err) {
				// Release the lock before reporting so writers are never
				// held up by the error channel.
				l.mu.RUnlock()
				logWatcher.Err <- err
				return
			}
			// A missing rotated file just means fewer rotations happened.
			continue
		}
		files = append(files, f)
	}

	latestFile, err := os.Open(pth)
	if err != nil {
		l.mu.RUnlock()
		logWatcher.Err <- errors.Wrap(err, "error opening latest log file")
		return
	}
	// followLogs may close (and reopen) its own handle; this deferred close
	// covers every path that never reaches it.
	defer latestFile.Close()

	latestChunk, err := newSectionReader(latestFile)

	// Now we have the reader sectioned, all fd's opened, we can unlock.
	// New writes/rotates will not affect seeking through these files
	l.mu.RUnlock()

	if err != nil {
		logWatcher.Err <- err
		return
	}

	if config.Tail != 0 {
		tailer := multireader.MultiReadSeeker(append(files, latestChunk)...)
		tailFile(tailer, logWatcher, config.Tail, config.Since)
	}

	// close all the rotated files now: they are not needed while following
	// and following can last a long time.
	closeRotated()

	if !config.Follow {
		return
	}

	// Check the closed flag and register the watcher under one critical
	// section. Reading l.closed without the lock is a data race with whoever
	// flips it, and a close landing between an unlocked check and the
	// registration would leave this reader registered on a closed logger.
	// NOTE(review): assumes l.closed is written while holding l.mu - confirm
	// against the Close implementation.
	l.mu.Lock()
	if l.closed {
		l.mu.Unlock()
		return
	}
	l.readers[logWatcher] = struct{}{}
	l.mu.Unlock()

	notifyRotate := l.writer.NotifyRotate()
	defer l.writer.NotifyRotateEvict(notifyRotate)

	followLogs(latestFile, logWatcher, notifyRotate, config.Since)

	l.mu.Lock()
	delete(l.readers, logWatcher)
	l.mu.Unlock()
}
// newSectionReader returns a reader limited to the bytes of f that exist at
// the time of the call, i.e. the range [0, current size).
//
// The size is obtained by seeking f to its end. The offset is intentionally
// left there: the section reader does not advance f's own offset, so a caller
// that later reads from f directly continues from where this snapshot ends.
func newSectionReader(f *os.File) (*io.SectionReader, error) {
	// io.SeekEnd replaces the deprecated os.SEEK_END constant.
	size, err := f.Seek(0, io.SeekEnd)
	if err != nil {
		return nil, errors.Wrap(err, "error getting current file size")
	}
	return io.NewSectionReader(f, 0, size), nil
}
// tailFile decodes log entries from f and delivers them on logWatcher.Msg.
//
// When tail is positive, only the lines returned by tailfile.TailFile(f, tail)
// are decoded; otherwise everything readable from f is decoded. Entries with a
// timestamp before since are dropped unless since is the zero time. Decoding
// stops at io.EOF, when the watcher is closed, or on the first other error,
// which is reported on logWatcher.Err.
func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
	var src io.Reader = f
	if tail > 0 {
		lines, err := tailfile.TailFile(f, tail)
		if err != nil {
			logWatcher.Err <- err
			return
		}
		src = bytes.NewBuffer(bytes.Join(lines, []byte("\n")))
	}

	decoder := json.NewDecoder(src)
	entry := &jsonlog.JSONLog{}
	filterBySince := !since.IsZero()

	for {
		msg, err := decodeLogLine(decoder, entry)
		if err == io.EOF {
			// Normal end of the tailed data.
			return
		}
		if err != nil {
			logWatcher.Err <- err
			return
		}

		if filterBySince && msg.Timestamp.Before(since) {
			continue
		}

		select {
		case logWatcher.Msg <- msg:
		case <-logWatcher.WatchClose():
			return
		}
	}
}
// watchFile returns a file watcher that is already watching name.
//
// The watcher from filenotify.New is tried first. If adding name to it fails,
// that watcher is closed, a warning is logged and a polling watcher is used
// instead. An error is returned if no watcher could be created or if the
// polling watcher cannot watch name either.
func watchFile(name string) (filenotify.FileWatcher, error) {
	watcher, err := filenotify.New()
	if err != nil {
		return nil, err
	}

	addErr := watcher.Add(name)
	if addErr == nil {
		return watcher, nil
	}

	logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", addErr)
	watcher.Close()

	watcher = filenotify.NewPollingWatcher()
	if pollErr := watcher.Add(name); pollErr != nil {
		watcher.Close()
		logrus.Debugf("error watching log file for modifications: %v", pollErr)
		return nil, pollErr
	}
	return watcher, nil
}
// followLogs keeps decoding log entries from f, starting at f's current
// offset, and delivers them on logWatcher.Msg until the watcher is closed or
// an unrecoverable error occurs.
//
// When the decoder reaches EOF it blocks on file-watcher events for f's path:
// a write resumes decoding, a rename/remove waits for a value on notifyRotate
// and then reopens the path. Entries older than since are skipped unless since
// is the zero time. Unrecoverable errors are reported on logWatcher.Err.
//
// f is closed before this function returns, except when the initial watcher
// setup fails.
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
	dec := json.NewDecoder(f)
	l := &jsonlog.JSONLog{}

	name := f.Name()
	fileWatcher, err := watchFile(name)
	if err != nil {
		logWatcher.Err <- err
		return
	}
	defer func() {
		f.Close()
		fileWatcher.Remove(name)
		fileWatcher.Close()
	}()

	// ctx is cancelled either when this function returns or when the log
	// watcher is closed by its consumer; every blocking wait below selects on
	// it.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		select {
		case <-logWatcher.WatchClose():
			fileWatcher.Remove(name)
			cancel()
		case <-ctx.Done():
			return
		}
	}()

	// retries counts consecutive recoverable failures (watcher errors and
	// decode errors); it is reset after every successfully decoded entry.
	var retries int

	// handleRotate swaps f and dec over to the file that now lives at name.
	handleRotate := func() error {
		f.Close()
		fileWatcher.Remove(name)

		// retry when the file doesn't exist
		// (the loop counter is deliberately distinct from the outer retries
		// counter so it cannot be confused with, or shadow, it)
		for attempt := 0; attempt <= 5; attempt++ {
			f, err = os.Open(name)
			if err == nil || !os.IsNotExist(err) {
				break
			}
		}
		if err != nil {
			return err
		}
		if err := fileWatcher.Add(name); err != nil {
			return err
		}
		dec = json.NewDecoder(f)
		return nil
	}

	// Sentinels used only inside this function: errRetry asks the caller to
	// wait again, errDone signals that the context was cancelled.
	errRetry := errors.New("retry")
	errDone := errors.New("done")

	// waitRead blocks until there is a reason to try decoding again.
	waitRead := func() error {
		select {
		case e := <-fileWatcher.Events():
			switch e.Op {
			case fsnotify.Write:
				// More data was appended; continue with a fresh decoder over f.
				dec = json.NewDecoder(f)
				return nil
			case fsnotify.Rename, fsnotify.Remove:
				// Wait for the rotation notification before reopening the
				// path, unless we are shutting down.
				select {
				case <-notifyRotate:
				case <-ctx.Done():
					return errDone
				}
				if err := handleRotate(); err != nil {
					return err
				}
				return nil
			}
			// Any other event is not interesting; wait for the next one.
			return errRetry
		case err := <-fileWatcher.Errors():
			// Debugf, not Debug: the message contains a format verb.
			logrus.Debugf("logger got error watching file: %v", err)
			// Something happened, let's try and stay alive and create a new watcher
			if retries <= 5 {
				fileWatcher.Close()
				fileWatcher, err = watchFile(name)
				if err != nil {
					return err
				}
				retries++
				return errRetry
			}
			return err
		case <-ctx.Done():
			return errDone
		}
	}

	// handleDecodeErr decides whether a decode error is recoverable. A nil
	// return means "try decoding again".
	handleDecodeErr := func(err error) error {
		if err == io.EOF {
			// Caught up with the file: wait until there is more to read.
			for {
				err := waitRead()
				if err == nil {
					break
				}
				if err == errRetry {
					continue
				}
				return err
			}
			return nil
		}
		// try again because this shouldn't happen
		if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
			dec = json.NewDecoder(f)
			retries++
			return nil
		}
		// io.ErrUnexpectedEOF is returned from json.Decoder when there is
		// remaining data in the parser's buffer while an io.EOF occurs.
		// If the json logger writes a partial json log entry to the disk
		// while at the same time the decoder tries to decode it, the race condition happens.
		if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
			// Keep the partially read bytes and append whatever arrives next.
			reader := io.MultiReader(dec.Buffered(), f)
			dec = json.NewDecoder(reader)
			retries++
			return nil
		}
		return err
	}

	// main loop
	for {
		msg, err := decodeLogLine(dec, l)
		if err != nil {
			if err := handleDecodeErr(err); err != nil {
				if err == errDone {
					return
				}
				// we got an unrecoverable error, so return
				logWatcher.Err <- err
				return
			}
			// ready to try again
			continue
		}

		retries = 0 // reset retries since we've succeeded
		if !since.IsZero() && msg.Timestamp.Before(since) {
			continue
		}
		select {
		case logWatcher.Msg <- msg:
		case <-ctx.Done():
			// Shutting down with a message in hand: deliver it, then drain
			// whatever can still be decoded before returning. These sends are
			// blocking and stop at the first decode error of any kind.
			logWatcher.Msg <- msg
			for {
				msg, err := decodeLogLine(dec, l)
				if err != nil {
					return
				}
				if !since.IsZero() && msg.Timestamp.Before(since) {
					continue
				}
				logWatcher.Msg <- msg
			}
		}
	}
}