diff --git a/vendor.conf b/vendor.conf index 33e2dc8445..d651f64408 100644 --- a/vendor.conf +++ b/vendor.conf @@ -95,7 +95,7 @@ github.com/philhofer/fwd 98c11a7a6ec829d672b03833c3d69a7fae1ca972 github.com/tinylib/msgp 3b556c64540842d4f82967be066a7f7fffc3adad # fsnotify -github.com/fsnotify/fsnotify 4da3e2cfbabc9f751898f250b49f2439785783a1 +github.com/fsnotify/fsnotify v1.4.7 # awslogs deps github.com/aws/aws-sdk-go v1.12.66 diff --git a/vendor/github.com/fsnotify/fsnotify/kqueue.go b/vendor/github.com/fsnotify/fsnotify/kqueue.go index c2b4acb18d..86e76a3d67 100644 --- a/vendor/github.com/fsnotify/fsnotify/kqueue.go +++ b/vendor/github.com/fsnotify/fsnotify/kqueue.go @@ -22,7 +22,7 @@ import ( type Watcher struct { Events chan Event Errors chan error - done chan bool // Channel for sending a "quit message" to the reader goroutine + done chan struct{} // Channel for sending a "quit message" to the reader goroutine kq int // File descriptor (as returned by the kqueue() syscall). @@ -56,7 +56,7 @@ func NewWatcher() (*Watcher, error) { externalWatches: make(map[string]bool), Events: make(chan Event), Errors: make(chan error), - done: make(chan bool), + done: make(chan struct{}), } go w.readEvents() @@ -71,10 +71,8 @@ func (w *Watcher) Close() error { return nil } w.isClosed = true - w.mu.Unlock() // copy paths to remove while locked - w.mu.Lock() var pathsToRemove = make([]string, 0, len(w.watches)) for name := range w.watches { pathsToRemove = append(pathsToRemove, name) @@ -82,15 +80,12 @@ func (w *Watcher) Close() error { w.mu.Unlock() // unlock before calling Remove, which also locks - var err error for _, name := range pathsToRemove { - if e := w.Remove(name); e != nil && err == nil { - err = e - } + w.Remove(name) } - // Send "quit" message to the reader goroutine: - w.done <- true + // send a "quit" message to the reader goroutine + close(w.done) return nil } @@ -266,17 +261,12 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) { func (w *Watcher) readEvents() { eventBuffer := make([]unix.Kevent_t, 10) +loop: for { // See if there is a message on the "done" channel select { case <-w.done: - err := unix.Close(w.kq) - if err != nil { - w.Errors <- err - } - close(w.Events) - close(w.Errors) - return + break loop default: } @@ -284,7 +274,11 @@ func (w *Watcher) readEvents() { kevents, err := read(w.kq, eventBuffer, &keventWaitTime) // EINTR is okay, the syscall was interrupted before timeout expired. if err != nil && err != unix.EINTR { - w.Errors <- err + select { + case w.Errors <- err: + case <-w.done: + break loop + } continue } @@ -319,8 +313,12 @@ func (w *Watcher) readEvents() { if path.isDir && event.Op&Write == Write && !(event.Op&Remove == Remove) { w.sendDirectoryChangeEvents(event.Name) } else { - // Send the event on the Events channel - w.Events <- event + // Send the event on the Events channel. + select { + case w.Events <- event: + case <-w.done: + break loop + } } if event.Op&Remove == Remove { @@ -352,6 +350,18 @@ func (w *Watcher) readEvents() { kevents = kevents[1:] } } + + // cleanup + err := unix.Close(w.kq) + if err != nil { + // only way the previous loop breaks is if w.done was closed so we need to async send to w.Errors. + select { + case w.Errors <- err: + default: + } + } + close(w.Events) + close(w.Errors) } // newEvent returns an platform-independent Event based on kqueue Fflags. @@ -407,7 +417,11 @@ func (w *Watcher) sendDirectoryChangeEvents(dirPath string) { // Get all files files, err := ioutil.ReadDir(dirPath) if err != nil { - w.Errors <- err + select { + case w.Errors <- err: + case <-w.done: + return + } } // Search for new files @@ -428,7 +442,11 @@ func (w *Watcher) sendFileCreatedEventIfNew(filePath string, fileInfo os.FileInf w.mu.Unlock() if !doesExist { // Send create event - w.Events <- newCreateEvent(filePath) + select { + case w.Events <- newCreateEvent(filePath): + case <-w.done: + return + } } // like watchDirectoryFiles (but without doing another ReadDir)