diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go index 2b482d1f38..5c4e2458fd 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/daemon/logger/jsonfilelog/jsonfilelog.go @@ -7,29 +7,20 @@ import ( "bytes" "encoding/json" "fmt" - "io" "os" "strconv" "sync" - "time" - - "gopkg.in/fsnotify.v1" "github.com/Sirupsen/logrus" "github.com/docker/docker/daemon/logger" - "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/jsonlog" "github.com/docker/docker/pkg/pubsub" - "github.com/docker/docker/pkg/tailfile" "github.com/docker/docker/pkg/timeutils" "github.com/docker/docker/pkg/units" ) -const ( - // Name is the name of the file that the jsonlogger logs to. - Name = "json-file" - maxJSONDecodeRetry = 20000 -) +// Name is the name of the file that the jsonlogger logs to. +const Name = "json-file" // JSONFileLogger is Logger implementation for default Docker logging. type JSONFileLogger struct { @@ -228,193 +219,3 @@ func (l *JSONFileLogger) Close() error { func (l *JSONFileLogger) Name() string { return Name } - -func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) { - l.Reset() - if err := dec.Decode(l); err != nil { - return nil, err - } - msg := &logger.Message{ - Source: l.Stream, - Timestamp: l.Created, - Line: []byte(l.Log), - } - return msg, nil -} - -// ReadLogs implements the logger's LogReader interface for the logs -// created by this driver. -func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { - logWatcher := logger.NewLogWatcher() - - go l.readLogs(logWatcher, config) - return logWatcher -} - -func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) { - defer close(logWatcher.Msg) - - pth := l.ctx.LogPath - var files []io.ReadSeeker - for i := l.n; i > 1; i-- { - f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1)) - if err != nil { - if !os.IsNotExist(err) { - logWatcher.Err <- err - break - } - continue - } - defer f.Close() - files = append(files, f) - } - - latestFile, err := os.Open(pth) - if err != nil { - logWatcher.Err <- err - return - } - defer latestFile.Close() - - files = append(files, latestFile) - tailer := ioutils.MultiReadSeeker(files...) - - if config.Tail != 0 { - tailFile(tailer, logWatcher, config.Tail, config.Since) - } - - if !config.Follow { - return - } - - if config.Tail >= 0 { - latestFile.Seek(0, os.SEEK_END) - } - - l.mu.Lock() - l.readers[logWatcher] = struct{}{} - l.mu.Unlock() - - notifyRotate := l.notifyRotate.Subscribe() - followLogs(latestFile, logWatcher, notifyRotate, config.Since) - - l.mu.Lock() - delete(l.readers, logWatcher) - l.mu.Unlock() - - l.notifyRotate.Evict(notifyRotate) -} - -func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) { - var rdr io.Reader = f - if tail > 0 { - ls, err := tailfile.TailFile(f, tail) - if err != nil { - logWatcher.Err <- err - return - } - rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n"))) - } - dec := json.NewDecoder(rdr) - l := &jsonlog.JSONLog{} - for { - msg, err := decodeLogLine(dec, l) - if err != nil { - if err != io.EOF { - logWatcher.Err <- err - } - return - } - if !since.IsZero() && msg.Timestamp.Before(since) { - continue - } - logWatcher.Msg <- msg - } -} - -func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) { - dec := json.NewDecoder(f) - l := &jsonlog.JSONLog{} - fileWatcher, err := fsnotify.NewWatcher() - if err != nil { - logWatcher.Err <- err - return - } - defer fileWatcher.Close() - if err := fileWatcher.Add(f.Name()); err != nil { - logWatcher.Err <- err - return - } - - var retries int - for { - msg, err := decodeLogLine(dec, l) - if err != nil { - if err != io.EOF { - // try again because this shouldn't happen - if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry { - dec = json.NewDecoder(f) - retries++ - continue - } - - // io.ErrUnexpectedEOF is returned from json.Decoder when there is - // remaining data in the parser's buffer while an io.EOF occurs. - // If the json logger writes a partial json log entry to the disk - // while at the same time the decoder tries to decode it, the race codition happens. - if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry { - reader := io.MultiReader(dec.Buffered(), f) - dec = json.NewDecoder(reader) - retries++ - continue - } - logWatcher.Err <- err - return - } - - select { - case <-fileWatcher.Events: - dec = json.NewDecoder(f) - continue - case <-fileWatcher.Errors: - logWatcher.Err <- err - return - case <-logWatcher.WatchClose(): - return - case <-notifyRotate: - fileWatcher.Remove(f.Name()) - - f, err = os.Open(f.Name()) - if err != nil { - logWatcher.Err <- err - return - } - if err := fileWatcher.Add(f.Name()); err != nil { - logWatcher.Err <- err - } - dec = json.NewDecoder(f) - continue - } - } - - retries = 0 // reset retries since we've succeeded - if !since.IsZero() && msg.Timestamp.Before(since) { - continue - } - select { - case logWatcher.Msg <- msg: - case <-logWatcher.WatchClose(): - logWatcher.Msg <- msg - for { - msg, err := decodeLogLine(dec, l) - if err != nil { - return - } - if !since.IsZero() && msg.Timestamp.Before(since) { - continue - } - logWatcher.Msg <- msg - } - } - } -} diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go new file mode 100644 index 0000000000..56772aef92 --- /dev/null +++ b/daemon/logger/jsonfilelog/read.go @@ -0,0 +1,216 @@ +package jsonfilelog + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "os" + "time" + + "github.com/Sirupsen/logrus" + "github.com/docker/docker/daemon/logger" + "github.com/docker/docker/pkg/filenotify" + "github.com/docker/docker/pkg/ioutils" + "github.com/docker/docker/pkg/jsonlog" + "github.com/docker/docker/pkg/tailfile" +) + +const maxJSONDecodeRetry = 20000 + +func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) { + l.Reset() + if err := dec.Decode(l); err != nil { + return nil, err + } + msg := &logger.Message{ + Source: l.Stream, + Timestamp: l.Created, + Line: []byte(l.Log), + } + return msg, nil +} + +// ReadLogs implements the logger's LogReader interface for the logs +// created by this driver. +func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { + logWatcher := logger.NewLogWatcher() + + go l.readLogs(logWatcher, config) + return logWatcher +} + +func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) { + defer close(logWatcher.Msg) + + pth := l.ctx.LogPath + var files []io.ReadSeeker + for i := l.n; i > 1; i-- { + f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1)) + if err != nil { + if !os.IsNotExist(err) { + logWatcher.Err <- err + break + } + continue + } + defer f.Close() + files = append(files, f) + } + + latestFile, err := os.Open(pth) + if err != nil { + logWatcher.Err <- err + return + } + defer latestFile.Close() + + files = append(files, latestFile) + tailer := ioutils.MultiReadSeeker(files...) + + if config.Tail != 0 { + tailFile(tailer, logWatcher, config.Tail, config.Since) + } + + if !config.Follow { + return + } + + if config.Tail >= 0 { + latestFile.Seek(0, os.SEEK_END) + } + + l.mu.Lock() + l.readers[logWatcher] = struct{}{} + l.mu.Unlock() + + notifyRotate := l.notifyRotate.Subscribe() + followLogs(latestFile, logWatcher, notifyRotate, config.Since) + + l.mu.Lock() + delete(l.readers, logWatcher) + l.mu.Unlock() + + l.notifyRotate.Evict(notifyRotate) +} + +func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) { + var rdr io.Reader = f + if tail > 0 { + ls, err := tailfile.TailFile(f, tail) + if err != nil { + logWatcher.Err <- err + return + } + rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n"))) + } + dec := json.NewDecoder(rdr) + l := &jsonlog.JSONLog{} + for { + msg, err := decodeLogLine(dec, l) + if err != nil { + if err != io.EOF { + logWatcher.Err <- err + } + return + } + if !since.IsZero() && msg.Timestamp.Before(since) { + continue + } + logWatcher.Msg <- msg + } +} + +func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) { + dec := json.NewDecoder(f) + l := &jsonlog.JSONLog{} + + fileWatcher, err := filenotify.New() + if err != nil { + logWatcher.Err <- err + } + defer fileWatcher.Close() + + var retries int + for { + msg, err := decodeLogLine(dec, l) + if err != nil { + if err != io.EOF { + // try again because this shouldn't happen + if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry { + dec = json.NewDecoder(f) + retries++ + continue + } + + // io.ErrUnexpectedEOF is returned from json.Decoder when there is + // remaining data in the parser's buffer while an io.EOF occurs. + // If the json logger writes a partial json log entry to the disk + // while at the same time the decoder tries to decode it, the race codition happens. + if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry { + reader := io.MultiReader(dec.Buffered(), f) + dec = json.NewDecoder(reader) + retries++ + continue + } + logWatcher.Err <- err + return + } + + logrus.WithField("logger", "json-file").Debugf("waiting for events") + if err := fileWatcher.Add(f.Name()); err != nil { + logrus.WithField("logger", "json-file").Warn("falling back to file poller") + fileWatcher.Close() + fileWatcher = filenotify.NewPollingWatcher() + if err := fileWatcher.Add(f.Name()); err != nil { + logrus.Errorf("error watching log file for modifications: %v", err) + logWatcher.Err <- err + } + } + select { + case <-fileWatcher.Events(): + dec = json.NewDecoder(f) + fileWatcher.Remove(f.Name()) + continue + case <-fileWatcher.Errors(): + fileWatcher.Remove(f.Name()) + logWatcher.Err <- err + return + case <-logWatcher.WatchClose(): + fileWatcher.Remove(f.Name()) + return + case <-notifyRotate: + f, err = os.Open(f.Name()) + if err != nil { + logWatcher.Err <- err + return + } + + dec = json.NewDecoder(f) + fileWatcher.Remove(f.Name()) + fileWatcher.Add(f.Name()) + continue + } + } + + retries = 0 // reset retries since we've succeeded + if !since.IsZero() && msg.Timestamp.Before(since) { + continue + } + select { + case logWatcher.Msg <- msg: + case <-logWatcher.WatchClose(): + logWatcher.Msg <- msg + for { + msg, err := decodeLogLine(dec, l) + if err != nil { + return + } + if !since.IsZero() && msg.Timestamp.Before(since) { + continue + } + logWatcher.Msg <- msg + } + } + } +} diff --git a/pkg/filenotify/filenotify.go b/pkg/filenotify/filenotify.go new file mode 100644 index 0000000000..e042c6bba4 --- /dev/null +++ b/pkg/filenotify/filenotify.go @@ -0,0 +1,40 @@ +// Package filenotify provides a mechanism for watching file(s) for changes. +// Generally leans on fsnotify, but provides a poll-based notifier which fsnotify does not support. +// These are wrapped up in a common interface so that either can be used interchangably in your code. +package filenotify + +import "gopkg.in/fsnotify.v1" + +// FileWatcher is an interface for implementing file notification watchers +type FileWatcher interface { + Events() <-chan fsnotify.Event + Errors() <-chan error + Add(name string) error + Remove(name string) error + Close() error +} + +// New tries to use an fs-event watcher, and falls back to the poller if there is an error +func New() (FileWatcher, error) { + if watcher, err := NewEventWatcher(); err == nil { + return watcher, nil + } + return NewPollingWatcher(), nil +} + +// NewPollingWatcher returns a poll-based file watcher +func NewPollingWatcher() FileWatcher { + return &filePoller{ + events: make(chan fsnotify.Event), + errors: make(chan error), + } +} + +// NewEventWatcher returns an fs-event based file watcher +func NewEventWatcher() (FileWatcher, error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + return &fsNotifyWatcher{watcher}, nil +} diff --git a/pkg/filenotify/fsnotify.go b/pkg/filenotify/fsnotify.go new file mode 100644 index 0000000000..4203883585 --- /dev/null +++ b/pkg/filenotify/fsnotify.go @@ -0,0 +1,18 @@ +package filenotify + +import "gopkg.in/fsnotify.v1" + +// fsNotify wraps the fsnotify package to satisfy the FileNotifer interface +type fsNotifyWatcher struct { + *fsnotify.Watcher +} + +// GetEvents returns the fsnotify event channel receiver +func (w *fsNotifyWatcher) Events() <-chan fsnotify.Event { + return w.Watcher.Events +} + +// GetErrors returns the fsnotify error channel receiver +func (w *fsNotifyWatcher) Errors() <-chan error { + return w.Watcher.Errors +} diff --git a/pkg/filenotify/poller.go b/pkg/filenotify/poller.go new file mode 100644 index 0000000000..a55266d0e5 --- /dev/null +++ b/pkg/filenotify/poller.go @@ -0,0 +1,205 @@ +package filenotify + +import ( + "errors" + "fmt" + "os" + "sync" + "time" + + "github.com/Sirupsen/logrus" + + "gopkg.in/fsnotify.v1" +) + +var ( + // errPollerClosed is returned when the poller is closed + errPollerClosed = errors.New("poller is closed") + // errNoSuchPoller is returned when trying to remove a watch that doesn't exist + errNoSuchWatch = errors.New("poller does not exist") +) + +// watchWaitTime is the time to wait between file poll loops +const watchWaitTime = 200 * time.Millisecond + +// filePoller is used to poll files for changes, especially in cases where fsnotify +// can't be run (e.g. when inotify handles are exhausted) +// filePoller satifies the FileWatcher interface +type filePoller struct { + // watches is the list of files currently being polled, close the associated channel to stop the watch + watches map[string]chan struct{} + // events is the channel to listen to for watch events + events chan fsnotify.Event + // errors is the channel to listen to for watch errors + errors chan error + // mu locks the poller for modification + mu sync.Mutex + // closed is used to specify when the poller has already closed + closed bool +} + +// Add adds a filename to the list of watches +// once added the file is polled for changes in a separate goroutine +func (w *filePoller) Add(name string) error { + w.mu.Lock() + defer w.mu.Unlock() + + if w.closed == true { + return errPollerClosed + } + + f, err := os.Open(name) + if err != nil { + return err + } + fi, err := os.Stat(name) + if err != nil { + return err + } + + if w.watches == nil { + w.watches = make(map[string]chan struct{}) + } + if _, exists := w.watches[name]; exists { + return fmt.Errorf("watch exists") + } + chClose := make(chan struct{}) + w.watches[name] = chClose + + go w.watch(f, fi, chClose) + return nil +} + +// Remove stops and removes watch with the specified name +func (w *filePoller) Remove(name string) error { + w.mu.Lock() + defer w.mu.Unlock() + return w.remove(name) +} + +func (w *filePoller) remove(name string) error { + if w.closed == true { + return errPollerClosed + } + + chClose, exists := w.watches[name] + if !exists { + return errNoSuchWatch + } + close(chClose) + delete(w.watches, name) + return nil +} + +// Events returns the event channel +// This is used for notifications on events about watched files +func (w *filePoller) Events() <-chan fsnotify.Event { + return w.events +} + +// Errors returns the errors channel +// This is used for notifications about errors on watched files +func (w *filePoller) Errors() <-chan error { + return w.errors +} + +// Close closes the poller +// All watches are stopped, removed, and the poller cannot be added to +func (w *filePoller) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + + if w.closed { + return nil + } + + w.closed = true + for name := range w.watches { + w.remove(name) + delete(w.watches, name) + } + close(w.events) + close(w.errors) + return nil +} + +// sendEvent publishes the specified event to the events channel +func (w *filePoller) sendEvent(e fsnotify.Event, chClose <-chan struct{}) error { + select { + case w.events <- e: + case <-chClose: + return fmt.Errorf("closed") + } + return nil +} + +// sendErr publishes the specified error to the errors channel +func (w *filePoller) sendErr(e error, chClose <-chan struct{}) error { + select { + case w.errors <- e: + case <-chClose: + return fmt.Errorf("closed") + } + return nil +} + +// watch is responsible for polling the specified file for changes +// upon finding changes to a file or errors, sendEvent/sendErr is called +func (w *filePoller) watch(f *os.File, lastFi os.FileInfo, chClose chan struct{}) { + for { + time.Sleep(watchWaitTime) + select { + case <-chClose: + logrus.Debugf("watch for %s closed", f.Name()) + return + default: + } + + fi, err := os.Stat(f.Name()) + if err != nil { + // if we got an error here and lastFi is not set, we can presume that nothing has changed + // This should be safe since before `watch()` is called, a stat is performed, there is any error `watch` is not called + if lastFi == nil { + continue + } + // If it doesn't exist at this point, it must have been removed + // no need to send the error here since this is a valid operation + if os.IsNotExist(err) { + if err := w.sendEvent(fsnotify.Event{Op: fsnotify.Remove, Name: f.Name()}, chClose); err != nil { + return + } + lastFi = nil + continue + } + // at this point, send the error + if err := w.sendErr(err, chClose); err != nil { + return + } + continue + } + + if lastFi == nil { + if err := w.sendEvent(fsnotify.Event{Op: fsnotify.Create, Name: fi.Name()}, chClose); err != nil { + return + } + lastFi = fi + continue + } + + if fi.Mode() != lastFi.Mode() { + if err := w.sendEvent(fsnotify.Event{Op: fsnotify.Chmod, Name: fi.Name()}, chClose); err != nil { + return + } + lastFi = fi + continue + } + + if fi.ModTime() != lastFi.ModTime() || fi.Size() != lastFi.Size() { + if err := w.sendEvent(fsnotify.Event{Op: fsnotify.Write, Name: fi.Name()}, chClose); err != nil { + return + } + lastFi = fi + continue + } + } +} diff --git a/pkg/filenotify/poller_test.go b/pkg/filenotify/poller_test.go new file mode 100644 index 0000000000..da946d249b --- /dev/null +++ b/pkg/filenotify/poller_test.go @@ -0,0 +1,133 @@ +package filenotify + +import ( + "fmt" + "io/ioutil" + "os" + "testing" + "time" + + "gopkg.in/fsnotify.v1" +) + +func TestPollerAddRemove(t *testing.T) { + w := NewPollingWatcher() + + if err := w.Add("no-such-file"); err == nil { + t.Fatal("should have gotten error when adding a non-existant file") + } + if err := w.Remove("no-such-file"); err == nil { + t.Fatal("should have gotten error when removing non-existant watch") + } + + f, err := ioutil.TempFile("", "asdf") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(f.Name()) + + if err := w.Add(f.Name()); err != nil { + t.Fatal(err) + } + + if err := w.Remove(f.Name()); err != nil { + t.Fatal(err) + } +} + +func TestPollerEvent(t *testing.T) { + w := NewPollingWatcher() + + f, err := ioutil.TempFile("", "test-poller") + if err != nil { + t.Fatal("error creating temp file") + } + defer os.RemoveAll(f.Name()) + f.Close() + + if err := w.Add(f.Name()); err != nil { + t.Fatal(err) + } + + select { + case <-w.Events(): + t.Fatal("got event before anything happened") + case <-w.Errors(): + t.Fatal("got error before anything happened") + default: + } + + if err := ioutil.WriteFile(f.Name(), []byte("hello"), 644); err != nil { + t.Fatal(err) + } + if err := assertEvent(w, fsnotify.Write); err != nil { + t.Fatal(err) + } + + if err := os.Chmod(f.Name(), 600); err != nil { + t.Fatal(err) + } + if err := assertEvent(w, fsnotify.Chmod); err != nil { + t.Fatal(err) + } + + if err := os.Remove(f.Name()); err != nil { + t.Fatal(err) + } + if err := assertEvent(w, fsnotify.Remove); err != nil { + t.Fatal(err) + } +} + +func TestPollerClose(t *testing.T) { + w := NewPollingWatcher() + if err := w.Close(); err != nil { + t.Fatal(err) + } + // test double-close + if err := w.Close(); err != nil { + t.Fatal(err) + } + + select { + case _, open := <-w.Events(): + if open { + t.Fatal("event chan should be closed") + } + default: + t.Fatal("event chan should be closed") + } + + select { + case _, open := <-w.Errors(): + if open { + t.Fatal("errors chan should be closed") + } + default: + t.Fatal("errors chan should be closed") + } + + f, err := ioutil.TempFile("", "asdf") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(f.Name()) + if err := w.Add(f.Name()); err == nil { + t.Fatal("should have gotten error adding watch for closed watcher") + } +} + +func assertEvent(w FileWatcher, eType fsnotify.Op) error { + var err error + select { + case e := <-w.Events(): + if e.Op != eType { + err = fmt.Errorf("got wrong event type, expected %q: %v", eType, e) + } + case e := <-w.Errors(): + err = fmt.Errorf("got unexpected error waiting for events %v: %v", eType, e) + case <-time.After(watchWaitTime * 3): + err = fmt.Errorf("timeout waiting for event %v", eType) + } + return err +}