diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go index e170a047fe..86baa316b9 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/daemon/logger/jsonfilelog/jsonfilelog.go @@ -14,7 +14,6 @@ import ( "github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger/loggerutils" "github.com/docker/docker/pkg/jsonlog" - "github.com/docker/docker/pkg/pubsub" "github.com/docker/go-units" ) @@ -23,13 +22,12 @@ const Name = "json-file" // JSONFileLogger is Logger implementation for default Docker logging. type JSONFileLogger struct { - buf *bytes.Buffer - writer *loggerutils.RotateFileWriter - mu sync.Mutex - ctx logger.Context - readers map[*logger.LogWatcher]struct{} // stores the active log followers - extra []byte // json-encoded extra attributes - writeNotifier *pubsub.Publisher + buf *bytes.Buffer + writer *loggerutils.RotateFileWriter + mu sync.Mutex + ctx logger.Context + readers map[*logger.LogWatcher]struct{} // stores the active log followers + extra []byte // json-encoded extra attributes } func init() { @@ -79,11 +77,10 @@ func New(ctx logger.Context) (logger.Logger, error) { } return &JSONFileLogger{ - buf: bytes.NewBuffer(nil), - writer: writer, - readers: make(map[*logger.LogWatcher]struct{}), - extra: extra, - writeNotifier: pubsub.NewPublisher(0, 10), + buf: bytes.NewBuffer(nil), + writer: writer, + readers: make(map[*logger.LogWatcher]struct{}), + extra: extra, }, nil } @@ -107,7 +104,6 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error { l.buf.WriteByte('\n') _, err = l.writer.Write(l.buf.Bytes()) - l.writeNotifier.Publish(struct{}{}) l.buf.Reset() return err @@ -141,7 +137,6 @@ func (l *JSONFileLogger) Close() error { r.Close() delete(l.readers, r) } - l.writeNotifier.Close() l.mu.Unlock() return err } diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go index 6a4780f3a2..fd695c83dc 100644 --- a/daemon/logger/jsonfilelog/read.go +++ b/daemon/logger/jsonfilelog/read.go @@ -10,11 +10,14 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/docker/daemon/logger" + "github.com/docker/docker/pkg/filenotify" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/jsonlog" "github.com/docker/docker/pkg/tailfile" ) +const maxJSONDecodeRetry = 20000 + func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) { l.Reset() if err := dec.Decode(l); err != nil { @@ -32,6 +35,7 @@ func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, erro // created by this driver. func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { logWatcher := logger.NewLogWatcher() + go l.readLogs(logWatcher, config) return logWatcher } @@ -81,7 +85,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R l.mu.Unlock() notifyRotate := l.writer.NotifyRotate() - l.followLogs(latestFile, logWatcher, notifyRotate, config.Since) + followLogs(latestFile, logWatcher, notifyRotate, config.Since) l.mu.Lock() delete(l.readers, logWatcher) @@ -117,81 +121,95 @@ func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since ti } } -func (l *JSONFileLogger) followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) { - var ( - rotated bool +func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) { + dec := json.NewDecoder(f) + l := &jsonlog.JSONLog{} - dec = json.NewDecoder(f) - log = &jsonlog.JSONLog{} - writeNotify = l.writeNotifier.Subscribe() - watchClose = logWatcher.WatchClose() - ) - - reopenLogFile := func() error { - f.Close() - f, err := os.Open(f.Name()) - if err != nil { - return err - } - dec = json.NewDecoder(f) - rotated = true - return nil + fileWatcher, err := filenotify.New() + if err != nil { + logWatcher.Err <- err } + defer fileWatcher.Close() - readToEnd := func() error { - for { - msg, err := decodeLogLine(dec, log) - if err != nil { - return err - } - if !since.IsZero() && msg.Timestamp.Before(since) { - continue - } - logWatcher.Msg <- msg - } - } - - defer func() { - l.writeNotifier.Evict(writeNotify) - if rotated { - f.Close() - } - }() - + var retries int for { - select { - case <-watchClose: - readToEnd() - return - case <-notifyRotate: - readToEnd() - if err := reopenLogFile(); err != nil { + msg, err := decodeLogLine(dec, l) + if err != nil { + if err != io.EOF { + // try again because this shouldn't happen + if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry { + dec = json.NewDecoder(f) + retries++ + continue + } + + // io.ErrUnexpectedEOF is returned from json.Decoder when there is + // remaining data in the parser's buffer while an io.EOF occurs. + // If the json logger writes a partial json log entry to the disk + // while at the same time the decoder tries to decode it, the race condition happens. + if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry { + reader := io.MultiReader(dec.Buffered(), f) + dec = json.NewDecoder(reader) + retries++ + continue + } logWatcher.Err <- err return } - case _, ok := <-writeNotify: - if err := readToEnd(); err == io.EOF { - if !ok { - // The writer is closed, no new logs will be generated. + + logrus.WithField("logger", "json-file").Debugf("waiting for events") + if err := fileWatcher.Add(f.Name()); err != nil { + logrus.WithField("logger", "json-file").Warn("falling back to file poller") + fileWatcher.Close() + fileWatcher = filenotify.NewPollingWatcher() + if err := fileWatcher.Add(f.Name()); err != nil { + logrus.Errorf("error watching log file for modifications: %v", err) + logWatcher.Err <- err + } + } + select { + case <-fileWatcher.Events(): + dec = json.NewDecoder(f) + fileWatcher.Remove(f.Name()) + continue + case <-fileWatcher.Errors(): + fileWatcher.Remove(f.Name()) + logWatcher.Err <- err + return + case <-logWatcher.WatchClose(): + fileWatcher.Remove(f.Name()) + return + case <-notifyRotate: + f, err = os.Open(f.Name()) + if err != nil { + logWatcher.Err <- err return } - select { - case <-notifyRotate: - if err := reopenLogFile(); err != nil { - logWatcher.Err <- err - return - } - default: - dec = json.NewDecoder(f) - } + dec = json.NewDecoder(f) + fileWatcher.Remove(f.Name()) + fileWatcher.Add(f.Name()) + continue + } + } - } else if err == io.ErrUnexpectedEOF { - dec = json.NewDecoder(io.MultiReader(dec.Buffered(), f)) - } else { - logrus.Errorf("Failed to decode json log %s: %v", f.Name(), err) - logWatcher.Err <- err - return + retries = 0 // reset retries since we've succeeded + if !since.IsZero() && msg.Timestamp.Before(since) { + continue + } + select { + case logWatcher.Msg <- msg: + case <-logWatcher.WatchClose(): + logWatcher.Msg <- msg + for { + msg, err := decodeLogLine(dec, l) + if err != nil { + return + } + if !since.IsZero() && msg.Timestamp.Before(since) { + continue + } + logWatcher.Msg <- msg } } } diff --git a/pkg/pubsub/publisher.go b/pkg/pubsub/publisher.go index 9d2ae42fa7..09364617e4 100644 --- a/pkg/pubsub/publisher.go +++ b/pkg/pubsub/publisher.go @@ -56,10 +56,8 @@ func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} { // Evict removes the specified subscriber from receiving any more messages. func (p *Publisher) Evict(sub chan interface{}) { p.m.Lock() - if _, ok := p.subscribers[sub]; ok { - delete(p.subscribers, sub) - close(sub) - } + delete(p.subscribers, sub) + close(sub) p.m.Unlock() }