mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Merge pull request #17417 from cpuguy83/15815_add_log_file_poller
Fallback to file polling for jsonlog reader on err
This commit is contained in:
commit
42fa8e3128
6 changed files with 614 additions and 201 deletions
|
@ -7,29 +7,20 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"gopkg.in/fsnotify.v1"
|
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
"github.com/docker/docker/daemon/logger"
|
"github.com/docker/docker/daemon/logger"
|
||||||
"github.com/docker/docker/pkg/ioutils"
|
|
||||||
"github.com/docker/docker/pkg/jsonlog"
|
"github.com/docker/docker/pkg/jsonlog"
|
||||||
"github.com/docker/docker/pkg/pubsub"
|
"github.com/docker/docker/pkg/pubsub"
|
||||||
"github.com/docker/docker/pkg/tailfile"
|
|
||||||
"github.com/docker/docker/pkg/timeutils"
|
"github.com/docker/docker/pkg/timeutils"
|
||||||
"github.com/docker/docker/pkg/units"
|
"github.com/docker/docker/pkg/units"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
// Name is the name of the file that the jsonlogger logs to.
|
||||||
// Name is the name of the file that the jsonlogger logs to.
|
const Name = "json-file"
|
||||||
Name = "json-file"
|
|
||||||
maxJSONDecodeRetry = 20000
|
|
||||||
)
|
|
||||||
|
|
||||||
// JSONFileLogger is Logger implementation for default Docker logging.
|
// JSONFileLogger is Logger implementation for default Docker logging.
|
||||||
type JSONFileLogger struct {
|
type JSONFileLogger struct {
|
||||||
|
@ -228,193 +219,3 @@ func (l *JSONFileLogger) Close() error {
|
||||||
func (l *JSONFileLogger) Name() string {
|
func (l *JSONFileLogger) Name() string {
|
||||||
return Name
|
return Name
|
||||||
}
|
}
|
||||||
|
|
||||||
func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
|
|
||||||
l.Reset()
|
|
||||||
if err := dec.Decode(l); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
msg := &logger.Message{
|
|
||||||
Source: l.Stream,
|
|
||||||
Timestamp: l.Created,
|
|
||||||
Line: []byte(l.Log),
|
|
||||||
}
|
|
||||||
return msg, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReadLogs implements the logger's LogReader interface for the logs
|
|
||||||
// created by this driver.
|
|
||||||
func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
|
|
||||||
logWatcher := logger.NewLogWatcher()
|
|
||||||
|
|
||||||
go l.readLogs(logWatcher, config)
|
|
||||||
return logWatcher
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
|
|
||||||
defer close(logWatcher.Msg)
|
|
||||||
|
|
||||||
pth := l.ctx.LogPath
|
|
||||||
var files []io.ReadSeeker
|
|
||||||
for i := l.n; i > 1; i-- {
|
|
||||||
f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
|
|
||||||
if err != nil {
|
|
||||||
if !os.IsNotExist(err) {
|
|
||||||
logWatcher.Err <- err
|
|
||||||
break
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
files = append(files, f)
|
|
||||||
}
|
|
||||||
|
|
||||||
latestFile, err := os.Open(pth)
|
|
||||||
if err != nil {
|
|
||||||
logWatcher.Err <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer latestFile.Close()
|
|
||||||
|
|
||||||
files = append(files, latestFile)
|
|
||||||
tailer := ioutils.MultiReadSeeker(files...)
|
|
||||||
|
|
||||||
if config.Tail != 0 {
|
|
||||||
tailFile(tailer, logWatcher, config.Tail, config.Since)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !config.Follow {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.Tail >= 0 {
|
|
||||||
latestFile.Seek(0, os.SEEK_END)
|
|
||||||
}
|
|
||||||
|
|
||||||
l.mu.Lock()
|
|
||||||
l.readers[logWatcher] = struct{}{}
|
|
||||||
l.mu.Unlock()
|
|
||||||
|
|
||||||
notifyRotate := l.notifyRotate.Subscribe()
|
|
||||||
followLogs(latestFile, logWatcher, notifyRotate, config.Since)
|
|
||||||
|
|
||||||
l.mu.Lock()
|
|
||||||
delete(l.readers, logWatcher)
|
|
||||||
l.mu.Unlock()
|
|
||||||
|
|
||||||
l.notifyRotate.Evict(notifyRotate)
|
|
||||||
}
|
|
||||||
|
|
||||||
func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
|
|
||||||
var rdr io.Reader = f
|
|
||||||
if tail > 0 {
|
|
||||||
ls, err := tailfile.TailFile(f, tail)
|
|
||||||
if err != nil {
|
|
||||||
logWatcher.Err <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
|
|
||||||
}
|
|
||||||
dec := json.NewDecoder(rdr)
|
|
||||||
l := &jsonlog.JSONLog{}
|
|
||||||
for {
|
|
||||||
msg, err := decodeLogLine(dec, l)
|
|
||||||
if err != nil {
|
|
||||||
if err != io.EOF {
|
|
||||||
logWatcher.Err <- err
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !since.IsZero() && msg.Timestamp.Before(since) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
logWatcher.Msg <- msg
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
|
|
||||||
dec := json.NewDecoder(f)
|
|
||||||
l := &jsonlog.JSONLog{}
|
|
||||||
fileWatcher, err := fsnotify.NewWatcher()
|
|
||||||
if err != nil {
|
|
||||||
logWatcher.Err <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer fileWatcher.Close()
|
|
||||||
if err := fileWatcher.Add(f.Name()); err != nil {
|
|
||||||
logWatcher.Err <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var retries int
|
|
||||||
for {
|
|
||||||
msg, err := decodeLogLine(dec, l)
|
|
||||||
if err != nil {
|
|
||||||
if err != io.EOF {
|
|
||||||
// try again because this shouldn't happen
|
|
||||||
if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
|
|
||||||
dec = json.NewDecoder(f)
|
|
||||||
retries++
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// io.ErrUnexpectedEOF is returned from json.Decoder when there is
|
|
||||||
// remaining data in the parser's buffer while an io.EOF occurs.
|
|
||||||
// If the json logger writes a partial json log entry to the disk
|
|
||||||
// while at the same time the decoder tries to decode it, the race codition happens.
|
|
||||||
if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
|
|
||||||
reader := io.MultiReader(dec.Buffered(), f)
|
|
||||||
dec = json.NewDecoder(reader)
|
|
||||||
retries++
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
logWatcher.Err <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-fileWatcher.Events:
|
|
||||||
dec = json.NewDecoder(f)
|
|
||||||
continue
|
|
||||||
case <-fileWatcher.Errors:
|
|
||||||
logWatcher.Err <- err
|
|
||||||
return
|
|
||||||
case <-logWatcher.WatchClose():
|
|
||||||
return
|
|
||||||
case <-notifyRotate:
|
|
||||||
fileWatcher.Remove(f.Name())
|
|
||||||
|
|
||||||
f, err = os.Open(f.Name())
|
|
||||||
if err != nil {
|
|
||||||
logWatcher.Err <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := fileWatcher.Add(f.Name()); err != nil {
|
|
||||||
logWatcher.Err <- err
|
|
||||||
}
|
|
||||||
dec = json.NewDecoder(f)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
retries = 0 // reset retries since we've succeeded
|
|
||||||
if !since.IsZero() && msg.Timestamp.Before(since) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case logWatcher.Msg <- msg:
|
|
||||||
case <-logWatcher.WatchClose():
|
|
||||||
logWatcher.Msg <- msg
|
|
||||||
for {
|
|
||||||
msg, err := decodeLogLine(dec, l)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !since.IsZero() && msg.Timestamp.Before(since) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
logWatcher.Msg <- msg
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
216
daemon/logger/jsonfilelog/read.go
Normal file
216
daemon/logger/jsonfilelog/read.go
Normal file
|
@ -0,0 +1,216 @@
|
||||||
|
package jsonfilelog
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/Sirupsen/logrus"
|
||||||
|
"github.com/docker/docker/daemon/logger"
|
||||||
|
"github.com/docker/docker/pkg/filenotify"
|
||||||
|
"github.com/docker/docker/pkg/ioutils"
|
||||||
|
"github.com/docker/docker/pkg/jsonlog"
|
||||||
|
"github.com/docker/docker/pkg/tailfile"
|
||||||
|
)
|
||||||
|
|
||||||
|
const maxJSONDecodeRetry = 20000
|
||||||
|
|
||||||
|
func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
|
||||||
|
l.Reset()
|
||||||
|
if err := dec.Decode(l); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
msg := &logger.Message{
|
||||||
|
Source: l.Stream,
|
||||||
|
Timestamp: l.Created,
|
||||||
|
Line: []byte(l.Log),
|
||||||
|
}
|
||||||
|
return msg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadLogs implements the logger's LogReader interface for the logs
|
||||||
|
// created by this driver.
|
||||||
|
func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
|
||||||
|
logWatcher := logger.NewLogWatcher()
|
||||||
|
|
||||||
|
go l.readLogs(logWatcher, config)
|
||||||
|
return logWatcher
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
|
||||||
|
defer close(logWatcher.Msg)
|
||||||
|
|
||||||
|
pth := l.ctx.LogPath
|
||||||
|
var files []io.ReadSeeker
|
||||||
|
for i := l.n; i > 1; i-- {
|
||||||
|
f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
|
||||||
|
if err != nil {
|
||||||
|
if !os.IsNotExist(err) {
|
||||||
|
logWatcher.Err <- err
|
||||||
|
break
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
files = append(files, f)
|
||||||
|
}
|
||||||
|
|
||||||
|
latestFile, err := os.Open(pth)
|
||||||
|
if err != nil {
|
||||||
|
logWatcher.Err <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer latestFile.Close()
|
||||||
|
|
||||||
|
files = append(files, latestFile)
|
||||||
|
tailer := ioutils.MultiReadSeeker(files...)
|
||||||
|
|
||||||
|
if config.Tail != 0 {
|
||||||
|
tailFile(tailer, logWatcher, config.Tail, config.Since)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !config.Follow {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.Tail >= 0 {
|
||||||
|
latestFile.Seek(0, os.SEEK_END)
|
||||||
|
}
|
||||||
|
|
||||||
|
l.mu.Lock()
|
||||||
|
l.readers[logWatcher] = struct{}{}
|
||||||
|
l.mu.Unlock()
|
||||||
|
|
||||||
|
notifyRotate := l.notifyRotate.Subscribe()
|
||||||
|
followLogs(latestFile, logWatcher, notifyRotate, config.Since)
|
||||||
|
|
||||||
|
l.mu.Lock()
|
||||||
|
delete(l.readers, logWatcher)
|
||||||
|
l.mu.Unlock()
|
||||||
|
|
||||||
|
l.notifyRotate.Evict(notifyRotate)
|
||||||
|
}
|
||||||
|
|
||||||
|
func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
|
||||||
|
var rdr io.Reader = f
|
||||||
|
if tail > 0 {
|
||||||
|
ls, err := tailfile.TailFile(f, tail)
|
||||||
|
if err != nil {
|
||||||
|
logWatcher.Err <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
|
||||||
|
}
|
||||||
|
dec := json.NewDecoder(rdr)
|
||||||
|
l := &jsonlog.JSONLog{}
|
||||||
|
for {
|
||||||
|
msg, err := decodeLogLine(dec, l)
|
||||||
|
if err != nil {
|
||||||
|
if err != io.EOF {
|
||||||
|
logWatcher.Err <- err
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !since.IsZero() && msg.Timestamp.Before(since) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
logWatcher.Msg <- msg
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
|
||||||
|
dec := json.NewDecoder(f)
|
||||||
|
l := &jsonlog.JSONLog{}
|
||||||
|
|
||||||
|
fileWatcher, err := filenotify.New()
|
||||||
|
if err != nil {
|
||||||
|
logWatcher.Err <- err
|
||||||
|
}
|
||||||
|
defer fileWatcher.Close()
|
||||||
|
|
||||||
|
var retries int
|
||||||
|
for {
|
||||||
|
msg, err := decodeLogLine(dec, l)
|
||||||
|
if err != nil {
|
||||||
|
if err != io.EOF {
|
||||||
|
// try again because this shouldn't happen
|
||||||
|
if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
|
||||||
|
dec = json.NewDecoder(f)
|
||||||
|
retries++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// io.ErrUnexpectedEOF is returned from json.Decoder when there is
|
||||||
|
// remaining data in the parser's buffer while an io.EOF occurs.
|
||||||
|
// If the json logger writes a partial json log entry to the disk
|
||||||
|
// while at the same time the decoder tries to decode it, the race codition happens.
|
||||||
|
if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
|
||||||
|
reader := io.MultiReader(dec.Buffered(), f)
|
||||||
|
dec = json.NewDecoder(reader)
|
||||||
|
retries++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
logWatcher.Err <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
logrus.WithField("logger", "json-file").Debugf("waiting for events")
|
||||||
|
if err := fileWatcher.Add(f.Name()); err != nil {
|
||||||
|
logrus.WithField("logger", "json-file").Warn("falling back to file poller")
|
||||||
|
fileWatcher.Close()
|
||||||
|
fileWatcher = filenotify.NewPollingWatcher()
|
||||||
|
if err := fileWatcher.Add(f.Name()); err != nil {
|
||||||
|
logrus.Errorf("error watching log file for modifications: %v", err)
|
||||||
|
logWatcher.Err <- err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-fileWatcher.Events():
|
||||||
|
dec = json.NewDecoder(f)
|
||||||
|
fileWatcher.Remove(f.Name())
|
||||||
|
continue
|
||||||
|
case <-fileWatcher.Errors():
|
||||||
|
fileWatcher.Remove(f.Name())
|
||||||
|
logWatcher.Err <- err
|
||||||
|
return
|
||||||
|
case <-logWatcher.WatchClose():
|
||||||
|
fileWatcher.Remove(f.Name())
|
||||||
|
return
|
||||||
|
case <-notifyRotate:
|
||||||
|
f, err = os.Open(f.Name())
|
||||||
|
if err != nil {
|
||||||
|
logWatcher.Err <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
dec = json.NewDecoder(f)
|
||||||
|
fileWatcher.Remove(f.Name())
|
||||||
|
fileWatcher.Add(f.Name())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
retries = 0 // reset retries since we've succeeded
|
||||||
|
if !since.IsZero() && msg.Timestamp.Before(since) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case logWatcher.Msg <- msg:
|
||||||
|
case <-logWatcher.WatchClose():
|
||||||
|
logWatcher.Msg <- msg
|
||||||
|
for {
|
||||||
|
msg, err := decodeLogLine(dec, l)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !since.IsZero() && msg.Timestamp.Before(since) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
logWatcher.Msg <- msg
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
40
pkg/filenotify/filenotify.go
Normal file
40
pkg/filenotify/filenotify.go
Normal file
|
@ -0,0 +1,40 @@
|
||||||
|
// Package filenotify provides a mechanism for watching file(s) for changes.
|
||||||
|
// Generally leans on fsnotify, but provides a poll-based notifier which fsnotify does not support.
|
||||||
|
// These are wrapped up in a common interface so that either can be used interchangably in your code.
|
||||||
|
package filenotify
|
||||||
|
|
||||||
|
import "gopkg.in/fsnotify.v1"
|
||||||
|
|
||||||
|
// FileWatcher is an interface for implementing file notification watchers
|
||||||
|
type FileWatcher interface {
|
||||||
|
Events() <-chan fsnotify.Event
|
||||||
|
Errors() <-chan error
|
||||||
|
Add(name string) error
|
||||||
|
Remove(name string) error
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// New tries to use an fs-event watcher, and falls back to the poller if there is an error
|
||||||
|
func New() (FileWatcher, error) {
|
||||||
|
if watcher, err := NewEventWatcher(); err == nil {
|
||||||
|
return watcher, nil
|
||||||
|
}
|
||||||
|
return NewPollingWatcher(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPollingWatcher returns a poll-based file watcher
|
||||||
|
func NewPollingWatcher() FileWatcher {
|
||||||
|
return &filePoller{
|
||||||
|
events: make(chan fsnotify.Event),
|
||||||
|
errors: make(chan error),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewEventWatcher returns an fs-event based file watcher
|
||||||
|
func NewEventWatcher() (FileWatcher, error) {
|
||||||
|
watcher, err := fsnotify.NewWatcher()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &fsNotifyWatcher{watcher}, nil
|
||||||
|
}
|
18
pkg/filenotify/fsnotify.go
Normal file
18
pkg/filenotify/fsnotify.go
Normal file
|
@ -0,0 +1,18 @@
|
||||||
|
package filenotify
|
||||||
|
|
||||||
|
import "gopkg.in/fsnotify.v1"
|
||||||
|
|
||||||
|
// fsNotify wraps the fsnotify package to satisfy the FileNotifer interface
|
||||||
|
type fsNotifyWatcher struct {
|
||||||
|
*fsnotify.Watcher
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetEvents returns the fsnotify event channel receiver
|
||||||
|
func (w *fsNotifyWatcher) Events() <-chan fsnotify.Event {
|
||||||
|
return w.Watcher.Events
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetErrors returns the fsnotify error channel receiver
|
||||||
|
func (w *fsNotifyWatcher) Errors() <-chan error {
|
||||||
|
return w.Watcher.Errors
|
||||||
|
}
|
205
pkg/filenotify/poller.go
Normal file
205
pkg/filenotify/poller.go
Normal file
|
@ -0,0 +1,205 @@
|
||||||
|
package filenotify
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/Sirupsen/logrus"
|
||||||
|
|
||||||
|
"gopkg.in/fsnotify.v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// errPollerClosed is returned when the poller is closed
|
||||||
|
errPollerClosed = errors.New("poller is closed")
|
||||||
|
// errNoSuchPoller is returned when trying to remove a watch that doesn't exist
|
||||||
|
errNoSuchWatch = errors.New("poller does not exist")
|
||||||
|
)
|
||||||
|
|
||||||
|
// watchWaitTime is the time to wait between file poll loops
|
||||||
|
const watchWaitTime = 200 * time.Millisecond
|
||||||
|
|
||||||
|
// filePoller is used to poll files for changes, especially in cases where fsnotify
|
||||||
|
// can't be run (e.g. when inotify handles are exhausted)
|
||||||
|
// filePoller satifies the FileWatcher interface
|
||||||
|
type filePoller struct {
|
||||||
|
// watches is the list of files currently being polled, close the associated channel to stop the watch
|
||||||
|
watches map[string]chan struct{}
|
||||||
|
// events is the channel to listen to for watch events
|
||||||
|
events chan fsnotify.Event
|
||||||
|
// errors is the channel to listen to for watch errors
|
||||||
|
errors chan error
|
||||||
|
// mu locks the poller for modification
|
||||||
|
mu sync.Mutex
|
||||||
|
// closed is used to specify when the poller has already closed
|
||||||
|
closed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add adds a filename to the list of watches
|
||||||
|
// once added the file is polled for changes in a separate goroutine
|
||||||
|
func (w *filePoller) Add(name string) error {
|
||||||
|
w.mu.Lock()
|
||||||
|
defer w.mu.Unlock()
|
||||||
|
|
||||||
|
if w.closed == true {
|
||||||
|
return errPollerClosed
|
||||||
|
}
|
||||||
|
|
||||||
|
f, err := os.Open(name)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fi, err := os.Stat(name)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if w.watches == nil {
|
||||||
|
w.watches = make(map[string]chan struct{})
|
||||||
|
}
|
||||||
|
if _, exists := w.watches[name]; exists {
|
||||||
|
return fmt.Errorf("watch exists")
|
||||||
|
}
|
||||||
|
chClose := make(chan struct{})
|
||||||
|
w.watches[name] = chClose
|
||||||
|
|
||||||
|
go w.watch(f, fi, chClose)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove stops and removes watch with the specified name
|
||||||
|
func (w *filePoller) Remove(name string) error {
|
||||||
|
w.mu.Lock()
|
||||||
|
defer w.mu.Unlock()
|
||||||
|
return w.remove(name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *filePoller) remove(name string) error {
|
||||||
|
if w.closed == true {
|
||||||
|
return errPollerClosed
|
||||||
|
}
|
||||||
|
|
||||||
|
chClose, exists := w.watches[name]
|
||||||
|
if !exists {
|
||||||
|
return errNoSuchWatch
|
||||||
|
}
|
||||||
|
close(chClose)
|
||||||
|
delete(w.watches, name)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Events returns the event channel
|
||||||
|
// This is used for notifications on events about watched files
|
||||||
|
func (w *filePoller) Events() <-chan fsnotify.Event {
|
||||||
|
return w.events
|
||||||
|
}
|
||||||
|
|
||||||
|
// Errors returns the errors channel
|
||||||
|
// This is used for notifications about errors on watched files
|
||||||
|
func (w *filePoller) Errors() <-chan error {
|
||||||
|
return w.errors
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the poller
|
||||||
|
// All watches are stopped, removed, and the poller cannot be added to
|
||||||
|
func (w *filePoller) Close() error {
|
||||||
|
w.mu.Lock()
|
||||||
|
defer w.mu.Unlock()
|
||||||
|
|
||||||
|
if w.closed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
w.closed = true
|
||||||
|
for name := range w.watches {
|
||||||
|
w.remove(name)
|
||||||
|
delete(w.watches, name)
|
||||||
|
}
|
||||||
|
close(w.events)
|
||||||
|
close(w.errors)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendEvent publishes the specified event to the events channel
|
||||||
|
func (w *filePoller) sendEvent(e fsnotify.Event, chClose <-chan struct{}) error {
|
||||||
|
select {
|
||||||
|
case w.events <- e:
|
||||||
|
case <-chClose:
|
||||||
|
return fmt.Errorf("closed")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendErr publishes the specified error to the errors channel
|
||||||
|
func (w *filePoller) sendErr(e error, chClose <-chan struct{}) error {
|
||||||
|
select {
|
||||||
|
case w.errors <- e:
|
||||||
|
case <-chClose:
|
||||||
|
return fmt.Errorf("closed")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// watch is responsible for polling the specified file for changes
|
||||||
|
// upon finding changes to a file or errors, sendEvent/sendErr is called
|
||||||
|
func (w *filePoller) watch(f *os.File, lastFi os.FileInfo, chClose chan struct{}) {
|
||||||
|
for {
|
||||||
|
time.Sleep(watchWaitTime)
|
||||||
|
select {
|
||||||
|
case <-chClose:
|
||||||
|
logrus.Debugf("watch for %s closed", f.Name())
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
fi, err := os.Stat(f.Name())
|
||||||
|
if err != nil {
|
||||||
|
// if we got an error here and lastFi is not set, we can presume that nothing has changed
|
||||||
|
// This should be safe since before `watch()` is called, a stat is performed, there is any error `watch` is not called
|
||||||
|
if lastFi == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// If it doesn't exist at this point, it must have been removed
|
||||||
|
// no need to send the error here since this is a valid operation
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
if err := w.sendEvent(fsnotify.Event{Op: fsnotify.Remove, Name: f.Name()}, chClose); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
lastFi = nil
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// at this point, send the error
|
||||||
|
if err := w.sendErr(err, chClose); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if lastFi == nil {
|
||||||
|
if err := w.sendEvent(fsnotify.Event{Op: fsnotify.Create, Name: fi.Name()}, chClose); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
lastFi = fi
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if fi.Mode() != lastFi.Mode() {
|
||||||
|
if err := w.sendEvent(fsnotify.Event{Op: fsnotify.Chmod, Name: fi.Name()}, chClose); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
lastFi = fi
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if fi.ModTime() != lastFi.ModTime() || fi.Size() != lastFi.Size() {
|
||||||
|
if err := w.sendEvent(fsnotify.Event{Op: fsnotify.Write, Name: fi.Name()}, chClose); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
lastFi = fi
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
133
pkg/filenotify/poller_test.go
Normal file
133
pkg/filenotify/poller_test.go
Normal file
|
@ -0,0 +1,133 @@
|
||||||
|
package filenotify
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gopkg.in/fsnotify.v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPollerAddRemove(t *testing.T) {
|
||||||
|
w := NewPollingWatcher()
|
||||||
|
|
||||||
|
if err := w.Add("no-such-file"); err == nil {
|
||||||
|
t.Fatal("should have gotten error when adding a non-existant file")
|
||||||
|
}
|
||||||
|
if err := w.Remove("no-such-file"); err == nil {
|
||||||
|
t.Fatal("should have gotten error when removing non-existant watch")
|
||||||
|
}
|
||||||
|
|
||||||
|
f, err := ioutil.TempFile("", "asdf")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(f.Name())
|
||||||
|
|
||||||
|
if err := w.Add(f.Name()); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := w.Remove(f.Name()); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPollerEvent(t *testing.T) {
|
||||||
|
w := NewPollingWatcher()
|
||||||
|
|
||||||
|
f, err := ioutil.TempFile("", "test-poller")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("error creating temp file")
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(f.Name())
|
||||||
|
f.Close()
|
||||||
|
|
||||||
|
if err := w.Add(f.Name()); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-w.Events():
|
||||||
|
t.Fatal("got event before anything happened")
|
||||||
|
case <-w.Errors():
|
||||||
|
t.Fatal("got error before anything happened")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := ioutil.WriteFile(f.Name(), []byte("hello"), 644); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := assertEvent(w, fsnotify.Write); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.Chmod(f.Name(), 600); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := assertEvent(w, fsnotify.Chmod); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.Remove(f.Name()); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := assertEvent(w, fsnotify.Remove); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPollerClose(t *testing.T) {
|
||||||
|
w := NewPollingWatcher()
|
||||||
|
if err := w.Close(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
// test double-close
|
||||||
|
if err := w.Close(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case _, open := <-w.Events():
|
||||||
|
if open {
|
||||||
|
t.Fatal("event chan should be closed")
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
t.Fatal("event chan should be closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case _, open := <-w.Errors():
|
||||||
|
if open {
|
||||||
|
t.Fatal("errors chan should be closed")
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
t.Fatal("errors chan should be closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
f, err := ioutil.TempFile("", "asdf")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(f.Name())
|
||||||
|
if err := w.Add(f.Name()); err == nil {
|
||||||
|
t.Fatal("should have gotten error adding watch for closed watcher")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertEvent(w FileWatcher, eType fsnotify.Op) error {
|
||||||
|
var err error
|
||||||
|
select {
|
||||||
|
case e := <-w.Events():
|
||||||
|
if e.Op != eType {
|
||||||
|
err = fmt.Errorf("got wrong event type, expected %q: %v", eType, e)
|
||||||
|
}
|
||||||
|
case e := <-w.Errors():
|
||||||
|
err = fmt.Errorf("got unexpected error waiting for events %v: %v", eType, e)
|
||||||
|
case <-time.After(watchWaitTime * 3):
|
||||||
|
err = fmt.Errorf("timeout waiting for event %v", eType)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
Loading…
Reference in a new issue