diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go index 86baa316b9..e170a047fe 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/daemon/logger/jsonfilelog/jsonfilelog.go @@ -14,6 +14,7 @@ import ( "github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger/loggerutils" "github.com/docker/docker/pkg/jsonlog" + "github.com/docker/docker/pkg/pubsub" "github.com/docker/go-units" ) @@ -22,12 +23,13 @@ const Name = "json-file" // JSONFileLogger is Logger implementation for default Docker logging. type JSONFileLogger struct { - buf *bytes.Buffer - writer *loggerutils.RotateFileWriter - mu sync.Mutex - ctx logger.Context - readers map[*logger.LogWatcher]struct{} // stores the active log followers - extra []byte // json-encoded extra attributes + buf *bytes.Buffer + writer *loggerutils.RotateFileWriter + mu sync.Mutex + ctx logger.Context + readers map[*logger.LogWatcher]struct{} // stores the active log followers + extra []byte // json-encoded extra attributes + writeNotifier *pubsub.Publisher } func init() { @@ -77,10 +79,11 @@ func New(ctx logger.Context) (logger.Logger, error) { } return &JSONFileLogger{ - buf: bytes.NewBuffer(nil), - writer: writer, - readers: make(map[*logger.LogWatcher]struct{}), - extra: extra, + buf: bytes.NewBuffer(nil), + writer: writer, + readers: make(map[*logger.LogWatcher]struct{}), + extra: extra, + writeNotifier: pubsub.NewPublisher(0, 10), }, nil } @@ -104,6 +107,7 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error { l.buf.WriteByte('\n') _, err = l.writer.Write(l.buf.Bytes()) + l.writeNotifier.Publish(struct{}{}) l.buf.Reset() return err @@ -137,6 +141,7 @@ func (l *JSONFileLogger) Close() error { r.Close() delete(l.readers, r) } + l.writeNotifier.Close() l.mu.Unlock() return err } diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go index fd695c83dc..6a4780f3a2 100644 --- a/daemon/logger/jsonfilelog/read.go +++ b/daemon/logger/jsonfilelog/read.go @@ -10,14 +10,11 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/docker/daemon/logger" - "github.com/docker/docker/pkg/filenotify" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/jsonlog" "github.com/docker/docker/pkg/tailfile" ) -const maxJSONDecodeRetry = 20000 - func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) { l.Reset() if err := dec.Decode(l); err != nil { @@ -35,7 +32,6 @@ func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, erro // created by this driver. func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { logWatcher := logger.NewLogWatcher() - go l.readLogs(logWatcher, config) return logWatcher } @@ -85,7 +81,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R l.mu.Unlock() notifyRotate := l.writer.NotifyRotate() - followLogs(latestFile, logWatcher, notifyRotate, config.Since) + l.followLogs(latestFile, logWatcher, notifyRotate, config.Since) l.mu.Lock() delete(l.readers, logWatcher) @@ -121,95 +117,81 @@ func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since ti } } -func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) { - dec := json.NewDecoder(f) - l := &jsonlog.JSONLog{} +func (l *JSONFileLogger) followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) { + var ( + rotated bool - fileWatcher, err := filenotify.New() - if err != nil { - logWatcher.Err <- err - } - defer fileWatcher.Close() + dec = json.NewDecoder(f) + log = &jsonlog.JSONLog{} + writeNotify = l.writeNotifier.Subscribe() + watchClose = logWatcher.WatchClose() + ) - var retries int - for { - msg, err := decodeLogLine(dec, l) + reopenLogFile := func() error { + f.Close() + f, err := os.Open(f.Name()) if err != nil { - if err != io.EOF { - // try again because this shouldn't happen - if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry { - dec = json.NewDecoder(f) - retries++ - continue - } + return err + } + dec = json.NewDecoder(f) + rotated = true + return nil + } - // io.ErrUnexpectedEOF is returned from json.Decoder when there is - // remaining data in the parser's buffer while an io.EOF occurs. - // If the json logger writes a partial json log entry to the disk - // while at the same time the decoder tries to decode it, the race condition happens. - if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry { - reader := io.MultiReader(dec.Buffered(), f) - dec = json.NewDecoder(reader) - retries++ - continue - } - logWatcher.Err <- err - return + readToEnd := func() error { + for { + msg, err := decodeLogLine(dec, log) + if err != nil { + return err } - - logrus.WithField("logger", "json-file").Debugf("waiting for events") - if err := fileWatcher.Add(f.Name()); err != nil { - logrus.WithField("logger", "json-file").Warn("falling back to file poller") - fileWatcher.Close() - fileWatcher = filenotify.NewPollingWatcher() - if err := fileWatcher.Add(f.Name()); err != nil { - logrus.Errorf("error watching log file for modifications: %v", err) - logWatcher.Err <- err - } - } - select { - case <-fileWatcher.Events(): - dec = json.NewDecoder(f) - fileWatcher.Remove(f.Name()) - continue - case <-fileWatcher.Errors(): - fileWatcher.Remove(f.Name()) - logWatcher.Err <- err - return - case <-logWatcher.WatchClose(): - fileWatcher.Remove(f.Name()) - return - case <-notifyRotate: - f, err = os.Open(f.Name()) - if err != nil { - logWatcher.Err <- err - return - } - - dec = json.NewDecoder(f) - fileWatcher.Remove(f.Name()) - fileWatcher.Add(f.Name()) + if !since.IsZero() && msg.Timestamp.Before(since) { continue } - } - - retries = 0 // reset retries since we've succeeded - if !since.IsZero() && msg.Timestamp.Before(since) { - continue - } - select { - case logWatcher.Msg <- msg: - case <-logWatcher.WatchClose(): logWatcher.Msg <- msg - for { - msg, err := decodeLogLine(dec, l) - if err != nil { + } + } + + defer func() { + l.writeNotifier.Evict(writeNotify) + if rotated { + f.Close() + } + }() + + for { + select { + case <-watchClose: + readToEnd() + return + case <-notifyRotate: + readToEnd() + if err := reopenLogFile(); err != nil { + logWatcher.Err <- err + return + } + case _, ok := <-writeNotify: + if err := readToEnd(); err == io.EOF { + if !ok { + // The writer is closed, no new logs will be generated. return } - if !since.IsZero() && msg.Timestamp.Before(since) { - continue + + select { + case <-notifyRotate: + if err := reopenLogFile(); err != nil { + logWatcher.Err <- err + return + } + default: + dec = json.NewDecoder(f) } - logWatcher.Msg <- msg + + } else if err == io.ErrUnexpectedEOF { + dec = json.NewDecoder(io.MultiReader(dec.Buffered(), f)) + } else { + logrus.Errorf("Failed to decode json log %s: %v", f.Name(), err) + logWatcher.Err <- err + return } } } diff --git a/pkg/pubsub/publisher.go b/pkg/pubsub/publisher.go index 8529ffa322..d48d432348 100644 --- a/pkg/pubsub/publisher.go +++ b/pkg/pubsub/publisher.go @@ -54,8 +54,10 @@ func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} { // Evict removes the specified subscriber from receiving any more messages. func (p *Publisher) Evict(sub chan interface{}) { p.m.Lock() - delete(p.subscribers, sub) - close(sub) + if _, ok := p.subscribers[sub]; ok { + delete(p.subscribers, sub) + close(sub) + } p.m.Unlock() }