From b1594c59f5e0d1ac898eacde8d91b1ba33c2b626 Mon Sep 17 00:00:00 2001 From: Shijiang Wei Date: Thu, 21 Jan 2016 00:28:10 +0800 Subject: [PATCH] use pubsub instead of filenotify to follow json logs inotify event is trigged immediately there's data written to disk. But at the time that the inotify event is received, the json line might not fully saved to disk. If the json decoder tries to decode in such case, an io.UnexpectedEOF will be trigged. We used to retry for several times to mitigate the io.UnexpectedEOF error. But there are still flaky tests caused by the partial log entries. The daemon knows exactly when there are new log entries emitted. We can use the pubsub package to notify all the log readers instead of inotify. Signed-off-by: Shijiang Wei try to fix broken test. will squash once tests pass Signed-off-by: Shijiang Wei --- daemon/logger/jsonfilelog/jsonfilelog.go | 25 ++-- daemon/logger/jsonfilelog/read.go | 148 ++++++++++------------- pkg/pubsub/publisher.go | 6 +- 3 files changed, 84 insertions(+), 95 deletions(-) diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go index 86baa316b9..e170a047fe 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/daemon/logger/jsonfilelog/jsonfilelog.go @@ -14,6 +14,7 @@ import ( "github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger/loggerutils" "github.com/docker/docker/pkg/jsonlog" + "github.com/docker/docker/pkg/pubsub" "github.com/docker/go-units" ) @@ -22,12 +23,13 @@ const Name = "json-file" // JSONFileLogger is Logger implementation for default Docker logging. type JSONFileLogger struct { - buf *bytes.Buffer - writer *loggerutils.RotateFileWriter - mu sync.Mutex - ctx logger.Context - readers map[*logger.LogWatcher]struct{} // stores the active log followers - extra []byte // json-encoded extra attributes + buf *bytes.Buffer + writer *loggerutils.RotateFileWriter + mu sync.Mutex + ctx logger.Context + readers map[*logger.LogWatcher]struct{} // stores the active log followers + extra []byte // json-encoded extra attributes + writeNotifier *pubsub.Publisher } func init() { @@ -77,10 +79,11 @@ func New(ctx logger.Context) (logger.Logger, error) { } return &JSONFileLogger{ - buf: bytes.NewBuffer(nil), - writer: writer, - readers: make(map[*logger.LogWatcher]struct{}), - extra: extra, + buf: bytes.NewBuffer(nil), + writer: writer, + readers: make(map[*logger.LogWatcher]struct{}), + extra: extra, + writeNotifier: pubsub.NewPublisher(0, 10), }, nil } @@ -104,6 +107,7 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error { l.buf.WriteByte('\n') _, err = l.writer.Write(l.buf.Bytes()) + l.writeNotifier.Publish(struct{}{}) l.buf.Reset() return err @@ -137,6 +141,7 @@ func (l *JSONFileLogger) Close() error { r.Close() delete(l.readers, r) } + l.writeNotifier.Close() l.mu.Unlock() return err } diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go index fd695c83dc..6a4780f3a2 100644 --- a/daemon/logger/jsonfilelog/read.go +++ b/daemon/logger/jsonfilelog/read.go @@ -10,14 +10,11 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/docker/daemon/logger" - "github.com/docker/docker/pkg/filenotify" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/jsonlog" "github.com/docker/docker/pkg/tailfile" ) -const maxJSONDecodeRetry = 20000 - func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) { l.Reset() if err := dec.Decode(l); err != nil { @@ -35,7 +32,6 @@ func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, erro // created by this driver. func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { logWatcher := logger.NewLogWatcher() - go l.readLogs(logWatcher, config) return logWatcher } @@ -85,7 +81,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R l.mu.Unlock() notifyRotate := l.writer.NotifyRotate() - followLogs(latestFile, logWatcher, notifyRotate, config.Since) + l.followLogs(latestFile, logWatcher, notifyRotate, config.Since) l.mu.Lock() delete(l.readers, logWatcher) @@ -121,95 +117,81 @@ func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since ti } } -func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) { - dec := json.NewDecoder(f) - l := &jsonlog.JSONLog{} +func (l *JSONFileLogger) followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) { + var ( + rotated bool - fileWatcher, err := filenotify.New() - if err != nil { - logWatcher.Err <- err - } - defer fileWatcher.Close() + dec = json.NewDecoder(f) + log = &jsonlog.JSONLog{} + writeNotify = l.writeNotifier.Subscribe() + watchClose = logWatcher.WatchClose() + ) - var retries int - for { - msg, err := decodeLogLine(dec, l) + reopenLogFile := func() error { + f.Close() + f, err := os.Open(f.Name()) if err != nil { - if err != io.EOF { - // try again because this shouldn't happen - if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry { - dec = json.NewDecoder(f) - retries++ - continue - } + return err + } + dec = json.NewDecoder(f) + rotated = true + return nil + } - // io.ErrUnexpectedEOF is returned from json.Decoder when there is - // remaining data in the parser's buffer while an io.EOF occurs. - // If the json logger writes a partial json log entry to the disk - // while at the same time the decoder tries to decode it, the race condition happens. - if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry { - reader := io.MultiReader(dec.Buffered(), f) - dec = json.NewDecoder(reader) - retries++ - continue - } - logWatcher.Err <- err - return + readToEnd := func() error { + for { + msg, err := decodeLogLine(dec, log) + if err != nil { + return err } - - logrus.WithField("logger", "json-file").Debugf("waiting for events") - if err := fileWatcher.Add(f.Name()); err != nil { - logrus.WithField("logger", "json-file").Warn("falling back to file poller") - fileWatcher.Close() - fileWatcher = filenotify.NewPollingWatcher() - if err := fileWatcher.Add(f.Name()); err != nil { - logrus.Errorf("error watching log file for modifications: %v", err) - logWatcher.Err <- err - } - } - select { - case <-fileWatcher.Events(): - dec = json.NewDecoder(f) - fileWatcher.Remove(f.Name()) - continue - case <-fileWatcher.Errors(): - fileWatcher.Remove(f.Name()) - logWatcher.Err <- err - return - case <-logWatcher.WatchClose(): - fileWatcher.Remove(f.Name()) - return - case <-notifyRotate: - f, err = os.Open(f.Name()) - if err != nil { - logWatcher.Err <- err - return - } - - dec = json.NewDecoder(f) - fileWatcher.Remove(f.Name()) - fileWatcher.Add(f.Name()) + if !since.IsZero() && msg.Timestamp.Before(since) { continue } - } - - retries = 0 // reset retries since we've succeeded - if !since.IsZero() && msg.Timestamp.Before(since) { - continue - } - select { - case logWatcher.Msg <- msg: - case <-logWatcher.WatchClose(): logWatcher.Msg <- msg - for { - msg, err := decodeLogLine(dec, l) - if err != nil { + } + } + + defer func() { + l.writeNotifier.Evict(writeNotify) + if rotated { + f.Close() + } + }() + + for { + select { + case <-watchClose: + readToEnd() + return + case <-notifyRotate: + readToEnd() + if err := reopenLogFile(); err != nil { + logWatcher.Err <- err + return + } + case _, ok := <-writeNotify: + if err := readToEnd(); err == io.EOF { + if !ok { + // The writer is closed, no new logs will be generated. return } - if !since.IsZero() && msg.Timestamp.Before(since) { - continue + + select { + case <-notifyRotate: + if err := reopenLogFile(); err != nil { + logWatcher.Err <- err + return + } + default: + dec = json.NewDecoder(f) } - logWatcher.Msg <- msg + + } else if err == io.ErrUnexpectedEOF { + dec = json.NewDecoder(io.MultiReader(dec.Buffered(), f)) + } else { + logrus.Errorf("Failed to decode json log %s: %v", f.Name(), err) + logWatcher.Err <- err + return } } } diff --git a/pkg/pubsub/publisher.go b/pkg/pubsub/publisher.go index 8529ffa322..d48d432348 100644 --- a/pkg/pubsub/publisher.go +++ b/pkg/pubsub/publisher.go @@ -54,8 +54,10 @@ func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} { // Evict removes the specified subscriber from receiving any more messages. func (p *Publisher) Evict(sub chan interface{}) { p.m.Lock() - delete(p.subscribers, sub) - close(sub) + if _, ok := p.subscribers[sub]; ok { + delete(p.subscribers, sub) + close(sub) + } p.m.Unlock() }