1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/daemon/logger/loggerutils/logfile.go
Kir Kolyshkin 916eabd459 daemon.ContainerLogs(): fix resource leak on follow
When daemon.ContainerLogs() is called with options.follow=true
(as in "docker logs --follow"), the "loggerutils.followLogs()"
function never returns (even then the logs consumer is gone).
As a result, all the resources associated with it (including
an opened file descriptor for the log file being read, two FDs
for a pipe, and two FDs for inotify watch) are never released.

If this is repeated (such as by running "docker logs --follow"
and pressing Ctrl-C a few times), this results in DoS caused by
either hitting the limit of inotify watches, or the limit of
opened files. The only cure is daemon restart.

Apparently, what happens is:

1. logs producer (a container) is gone, calling (*LogWatcher).Close()
for all its readers (daemon/logger/jsonfilelog/jsonfilelog.go:175).

2. WatchClose() is properly handled by a dedicated goroutine in
followLogs(), cancelling the context.

3. Upon receiving the ctx.Done(), the code in followLogs()
(daemon/logger/loggerutils/logfile.go#L626-L638) keeps to
send messages _synchronously_ (which is OK for now).

4. Logs consumer is gone (Ctrl-C is pressed on a terminal running
"docker logs --follow"). Method (*LogWatcher).Close() is properly
called (see daemon/logs.go:114). Since it was called before and
due to to once.Do(), nothing happens (which is kinda good, as
otherwise it will panic on closing a closed channel).

5. A goroutine (see item 3 above) keeps sending log messages
synchronously to the logWatcher.Msg channel. Since the
channel reader is gone, the channel send operation blocks forever,
and resource cleanup set up in defer statements at the beginning
of followLogs() never happens.

Alas, the fix is somewhat complicated:

1. Distinguish between close from logs producer and logs consumer.
To that effect,
 - yet another channel is added to LogWatcher();
 - {Watch,}Close() are renamed to {Watch,}ProducerGone();
 - {Watch,}ConsumerGone() are added;

*NOTE* that ProducerGone()/WatchProducerGone() pair is ONLY needed
in order to stop ConsumerLogs(follow=true) when a container is stopped;
otherwise we're not interested in it. In other words, we're only
using it in followLogs().

2. Code that was doing (logWatcher*).Close() is modified to either call
ProducerGone() or ConsumerGone(), depending on the context.

3. Code that was waiting for WatchClose() is modified to wait for
either ConsumerGone() or ProducerGone(), or both, depending on the
context.

4. followLogs() are modified accordingly:
 - context cancellation is happening on WatchProducerGone(),
and once it's received the FileWatcher is closed and waitRead()
returns errDone on EOF (i.e. log rotation handling logic is disabled);
 - due to this, code that was writing synchronously to logWatcher.Msg
can be and is removed as the code above it handles this case;
 - function returns once ConsumerGone is received, freeing all the
resources -- this is the bugfix itself.

While at it,

1. Let's also remove the ctx usage to simplify the code a bit.
It was introduced by commit a69a59ffc7 ("Decouple removing the
fileWatcher from reading") in order to fix a bug. The bug was actually
a deadlock in fsnotify, and the fix was just a workaround. Since then
the fsnofify bug has been fixed, and a new fsnotify was vendored in.
For more details, please see
https://github.com/moby/moby/pull/27782#issuecomment-416794490

2. Since `(*filePoller).Close()` is fixed to remove all the files
being watched, there is no need to explicitly call
fileWatcher.Remove(name) anymore, so get rid of the extra code.

Should fix https://github.com/moby/moby/issues/37391

Signed-off-by: Kir Kolyshkin <kolyshkin@gmail.com>
2018-09-06 11:47:42 -07:00

714 lines
18 KiB
Go

package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"os"
"runtime"
"strconv"
"strings"
"sync"
"time"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/pkg/filenotify"
"github.com/docker/docker/pkg/pools"
"github.com/docker/docker/pkg/pubsub"
"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const tmpLogfileSuffix = ".tmp"
// rotateFileMetadata is a metadata of the gzip header of the compressed log file
type rotateFileMetadata struct {
LastTime time.Time `json:"lastTime,omitempty"`
}
// refCounter is a counter of logfile being referenced
type refCounter struct {
mu sync.Mutex
counter map[string]int
}
// Reference increase the reference counter for specified logfile
func (rc *refCounter) GetReference(fileName string, openRefFile func(fileName string, exists bool) (*os.File, error)) (*os.File, error) {
rc.mu.Lock()
defer rc.mu.Unlock()
var (
file *os.File
err error
)
_, ok := rc.counter[fileName]
file, err = openRefFile(fileName, ok)
if err != nil {
return nil, err
}
if ok {
rc.counter[fileName]++
} else if file != nil {
rc.counter[file.Name()] = 1
}
return file, nil
}
// Dereference reduce the reference counter for specified logfile
func (rc *refCounter) Dereference(fileName string) error {
rc.mu.Lock()
defer rc.mu.Unlock()
rc.counter[fileName]--
if rc.counter[fileName] <= 0 {
delete(rc.counter, fileName)
err := os.Remove(fileName)
if err != nil {
return err
}
}
return nil
}
// LogFile is Logger implementation for default Docker logging.
type LogFile struct {
mu sync.RWMutex // protects the logfile access
f *os.File // store for closing
closed bool
rotateMu sync.Mutex // blocks the next rotation until the current rotation is completed
capacity int64 // maximum size of each file
currentSize int64 // current size of the latest file
maxFiles int // maximum number of files
compress bool // whether old versions of log files are compressed
lastTimestamp time.Time // timestamp of the last log
filesRefCounter refCounter // keep reference-counted of decompressed files
notifyRotate *pubsub.Publisher
marshal logger.MarshalFunc
createDecoder makeDecoderFunc
getTailReader GetTailReaderFunc
perms os.FileMode
}
type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error)
// SizeReaderAt defines a ReaderAt that also reports its size.
// This is used for tailing log files.
type SizeReaderAt interface {
io.ReaderAt
Size() int64
}
// GetTailReaderFunc is used to truncate a reader to only read as much as is required
// in order to get the passed in number of log lines.
// It returns the sectioned reader, the number of lines that the section reader
// contains, and any error that occurs.
type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr io.Reader, nLines int, err error)
// NewLogFile creates new LogFile
func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) {
log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
if err != nil {
return nil, err
}
size, err := log.Seek(0, os.SEEK_END)
if err != nil {
return nil, err
}
return &LogFile{
f: log,
capacity: capacity,
currentSize: size,
maxFiles: maxFiles,
compress: compress,
filesRefCounter: refCounter{counter: make(map[string]int)},
notifyRotate: pubsub.NewPublisher(0, 1),
marshal: marshaller,
createDecoder: decodeFunc,
perms: perms,
getTailReader: getTailReader,
}, nil
}
// WriteLogEntry writes the provided log message to the current log file.
// This may trigger a rotation event if the max file/capacity limits are hit.
func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
b, err := w.marshal(msg)
if err != nil {
return errors.Wrap(err, "error marshalling log message")
}
logger.PutMessage(msg)
w.mu.Lock()
if w.closed {
w.mu.Unlock()
return errors.New("cannot write because the output file was closed")
}
if err := w.checkCapacityAndRotate(); err != nil {
w.mu.Unlock()
return err
}
n, err := w.f.Write(b)
if err == nil {
w.currentSize += int64(n)
w.lastTimestamp = msg.Timestamp
}
w.mu.Unlock()
return err
}
func (w *LogFile) checkCapacityAndRotate() error {
if w.capacity == -1 {
return nil
}
if w.currentSize >= w.capacity {
w.rotateMu.Lock()
fname := w.f.Name()
if err := w.f.Close(); err != nil {
w.rotateMu.Unlock()
return errors.Wrap(err, "error closing file")
}
if err := rotate(fname, w.maxFiles, w.compress); err != nil {
w.rotateMu.Unlock()
return err
}
file, err := os.OpenFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
if err != nil {
w.rotateMu.Unlock()
return err
}
w.f = file
w.currentSize = 0
w.notifyRotate.Publish(struct{}{})
if w.maxFiles <= 1 || !w.compress {
w.rotateMu.Unlock()
return nil
}
go func() {
compressFile(fname+".1", w.lastTimestamp)
w.rotateMu.Unlock()
}()
}
return nil
}
func rotate(name string, maxFiles int, compress bool) error {
if maxFiles < 2 {
return nil
}
var extension string
if compress {
extension = ".gz"
}
lastFile := fmt.Sprintf("%s.%d%s", name, maxFiles-1, extension)
err := os.Remove(lastFile)
if err != nil && !os.IsNotExist(err) {
return errors.Wrap(err, "error removing oldest log file")
}
for i := maxFiles - 1; i > 1; i-- {
toPath := name + "." + strconv.Itoa(i) + extension
fromPath := name + "." + strconv.Itoa(i-1) + extension
if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
return err
}
}
if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
return err
}
return nil
}
func compressFile(fileName string, lastTimestamp time.Time) {
file, err := os.Open(fileName)
if err != nil {
logrus.Errorf("Failed to open log file: %v", err)
return
}
defer func() {
file.Close()
err := os.Remove(fileName)
if err != nil {
logrus.Errorf("Failed to remove source log file: %v", err)
}
}()
outFile, err := os.OpenFile(fileName+".gz", os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0640)
if err != nil {
logrus.Errorf("Failed to open or create gzip log file: %v", err)
return
}
defer func() {
outFile.Close()
if err != nil {
os.Remove(fileName + ".gz")
}
}()
compressWriter := gzip.NewWriter(outFile)
defer compressWriter.Close()
// Add the last log entry timestramp to the gzip header
extra := rotateFileMetadata{}
extra.LastTime = lastTimestamp
compressWriter.Header.Extra, err = json.Marshal(&extra)
if err != nil {
// Here log the error only and don't return since this is just an optimization.
logrus.Warningf("Failed to marshal gzip header as JSON: %v", err)
}
_, err = pools.Copy(compressWriter, file)
if err != nil {
logrus.WithError(err).WithField("module", "container.logs").WithField("file", fileName).Error("Error compressing log file")
return
}
}
// MaxFiles return maximum number of files
func (w *LogFile) MaxFiles() int {
return w.maxFiles
}
// Close closes underlying file and signals all readers to stop.
func (w *LogFile) Close() error {
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
return nil
}
if err := w.f.Close(); err != nil {
return err
}
w.closed = true
return nil
}
// ReadLogs decodes entries from log files and sends them the passed in watcher
//
// Note: Using the follow option can become inconsistent in cases with very frequent rotations and max log files is 1.
// TODO: Consider a different implementation which can effectively follow logs under frequent rotations.
func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
w.mu.RLock()
currentFile, err := os.Open(w.f.Name())
if err != nil {
w.mu.RUnlock()
watcher.Err <- err
return
}
defer currentFile.Close()
currentChunk, err := newSectionReader(currentFile)
if err != nil {
w.mu.RUnlock()
watcher.Err <- err
return
}
if config.Tail != 0 {
// TODO(@cpuguy83): Instead of opening every file, only get the files which
// are needed to tail.
// This is especially costly when compression is enabled.
files, err := w.openRotatedFiles(config)
w.mu.RUnlock()
if err != nil {
watcher.Err <- err
return
}
closeFiles := func() {
for _, f := range files {
f.Close()
fileName := f.Name()
if strings.HasSuffix(fileName, tmpLogfileSuffix) {
err := w.filesRefCounter.Dereference(fileName)
if err != nil {
logrus.Errorf("Failed to dereference the log file %q: %v", fileName, err)
}
}
}
}
readers := make([]SizeReaderAt, 0, len(files)+1)
for _, f := range files {
stat, err := f.Stat()
if err != nil {
watcher.Err <- errors.Wrap(err, "error reading size of rotated file")
closeFiles()
return
}
readers = append(readers, io.NewSectionReader(f, 0, stat.Size()))
}
if currentChunk.Size() > 0 {
readers = append(readers, currentChunk)
}
tailFiles(readers, watcher, w.createDecoder, w.getTailReader, config)
closeFiles()
w.mu.RLock()
}
if !config.Follow || w.closed {
w.mu.RUnlock()
return
}
w.mu.RUnlock()
notifyRotate := w.notifyRotate.Subscribe()
defer w.notifyRotate.Evict(notifyRotate)
followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until)
}
func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) {
w.rotateMu.Lock()
defer w.rotateMu.Unlock()
defer func() {
if err == nil {
return
}
for _, f := range files {
f.Close()
if strings.HasSuffix(f.Name(), tmpLogfileSuffix) {
err := os.Remove(f.Name())
if err != nil && !os.IsNotExist(err) {
logrus.Warnf("Failed to remove logfile: %v", err)
}
}
}
}()
for i := w.maxFiles; i > 1; i-- {
f, err := os.Open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
if err != nil {
if !os.IsNotExist(err) {
return nil, errors.Wrap(err, "error opening rotated log file")
}
fileName := fmt.Sprintf("%s.%d.gz", w.f.Name(), i-1)
decompressedFileName := fileName + tmpLogfileSuffix
tmpFile, err := w.filesRefCounter.GetReference(decompressedFileName, func(refFileName string, exists bool) (*os.File, error) {
if exists {
return os.Open(refFileName)
}
return decompressfile(fileName, refFileName, config.Since)
})
if err != nil {
if !os.IsNotExist(errors.Cause(err)) {
return nil, errors.Wrap(err, "error getting reference to decompressed log file")
}
continue
}
if tmpFile == nil {
// The log before `config.Since` does not need to read
break
}
files = append(files, tmpFile)
continue
}
files = append(files, f)
}
return files, nil
}
func decompressfile(fileName, destFileName string, since time.Time) (*os.File, error) {
cf, err := os.Open(fileName)
if err != nil {
return nil, errors.Wrap(err, "error opening file for decompression")
}
defer cf.Close()
rc, err := gzip.NewReader(cf)
if err != nil {
return nil, errors.Wrap(err, "error making gzip reader for compressed log file")
}
defer rc.Close()
// Extract the last log entry timestramp from the gzip header
extra := &rotateFileMetadata{}
err = json.Unmarshal(rc.Header.Extra, extra)
if err == nil && extra.LastTime.Before(since) {
return nil, nil
}
rs, err := os.OpenFile(destFileName, os.O_CREATE|os.O_RDWR, 0640)
if err != nil {
return nil, errors.Wrap(err, "error creating file for copying decompressed log stream")
}
_, err = pools.Copy(rs, rc)
if err != nil {
rs.Close()
rErr := os.Remove(rs.Name())
if rErr != nil && !os.IsNotExist(rErr) {
logrus.Errorf("Failed to remove logfile: %v", rErr)
}
return nil, errors.Wrap(err, "error while copying decompressed log stream to file")
}
return rs, nil
}
func newSectionReader(f *os.File) (*io.SectionReader, error) {
// seek to the end to get the size
// we'll leave this at the end of the file since section reader does not advance the reader
size, err := f.Seek(0, os.SEEK_END)
if err != nil {
return nil, errors.Wrap(err, "error getting current file size")
}
return io.NewSectionReader(f, 0, size), nil
}
func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, getTailReader GetTailReaderFunc, config logger.ReadConfig) {
nLines := config.Tail
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here.
go func() {
select {
case <-ctx.Done():
case <-watcher.WatchConsumerGone():
cancel()
}
}()
readers := make([]io.Reader, 0, len(files))
if config.Tail > 0 {
for i := len(files) - 1; i >= 0 && nLines > 0; i-- {
tail, n, err := getTailReader(ctx, files[i], nLines)
if err != nil {
watcher.Err <- errors.Wrap(err, "error finding file position to start log tailing")
return
}
nLines -= n
readers = append([]io.Reader{tail}, readers...)
}
} else {
for _, r := range files {
readers = append(readers, &wrappedReaderAt{ReaderAt: r})
}
}
rdr := io.MultiReader(readers...)
decodeLogLine := createDecoder(rdr)
for {
msg, err := decodeLogLine()
if err != nil {
if errors.Cause(err) != io.EOF {
watcher.Err <- err
}
return
}
if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
continue
}
if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
return
}
select {
case <-ctx.Done():
return
case watcher.Msg <- msg:
}
}
}
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, createDecoder makeDecoderFunc, since, until time.Time) {
decodeLogLine := createDecoder(f)
name := f.Name()
fileWatcher, err := watchFile(name)
if err != nil {
logWatcher.Err <- err
return
}
defer func() {
f.Close()
fileWatcher.Close()
}()
var retries int
handleRotate := func() error {
f.Close()
fileWatcher.Remove(name)
// retry when the file doesn't exist
for retries := 0; retries <= 5; retries++ {
f, err = os.Open(name)
if err == nil || !os.IsNotExist(err) {
break
}
}
if err != nil {
return err
}
if err := fileWatcher.Add(name); err != nil {
return err
}
decodeLogLine = createDecoder(f)
return nil
}
errRetry := errors.New("retry")
errDone := errors.New("done")
waitRead := func() error {
select {
case e := <-fileWatcher.Events():
switch e.Op {
case fsnotify.Write:
decodeLogLine = createDecoder(f)
return nil
case fsnotify.Rename, fsnotify.Remove:
select {
case <-notifyRotate:
case <-logWatcher.WatchProducerGone():
return errDone
case <-logWatcher.WatchConsumerGone():
return errDone
}
if err := handleRotate(); err != nil {
return err
}
return nil
}
return errRetry
case err := <-fileWatcher.Errors():
logrus.Debugf("logger got error watching file: %v", err)
// Something happened, let's try and stay alive and create a new watcher
if retries <= 5 {
fileWatcher.Close()
fileWatcher, err = watchFile(name)
if err != nil {
return err
}
retries++
return errRetry
}
return err
case <-logWatcher.WatchProducerGone():
return errDone
case <-logWatcher.WatchConsumerGone():
return errDone
}
}
handleDecodeErr := func(err error) error {
if errors.Cause(err) != io.EOF {
return err
}
for {
err := waitRead()
if err == nil {
break
}
if err == errRetry {
continue
}
return err
}
return nil
}
// main loop
for {
msg, err := decodeLogLine()
if err != nil {
if err := handleDecodeErr(err); err != nil {
if err == errDone {
return
}
// we got an unrecoverable error, so return
logWatcher.Err <- err
return
}
// ready to try again
continue
}
retries = 0 // reset retries since we've succeeded
if !since.IsZero() && msg.Timestamp.Before(since) {
continue
}
if !until.IsZero() && msg.Timestamp.After(until) {
return
}
// send the message, unless the consumer is gone
select {
case logWatcher.Msg <- msg:
case <-logWatcher.WatchConsumerGone():
return
}
}
}
func watchFile(name string) (filenotify.FileWatcher, error) {
var fileWatcher filenotify.FileWatcher
if runtime.GOOS == "windows" {
// FileWatcher on Windows files is based on the syscall notifications which has an issue because of file caching.
// It is based on ReadDirectoryChangesW() which doesn't detect writes to the cache. It detects writes to disk only.
// Because of the OS lazy writing, we don't get notifications for file writes and thereby the watcher
// doesn't work. Hence for Windows we will use poll based notifier.
fileWatcher = filenotify.NewPollingWatcher()
} else {
var err error
fileWatcher, err = filenotify.New()
if err != nil {
return nil, err
}
}
logger := logrus.WithFields(logrus.Fields{
"module": "logger",
"file": name,
})
if err := fileWatcher.Add(name); err != nil {
// we will retry using file poller.
logger.WithError(err).Warnf("falling back to file poller")
fileWatcher.Close()
fileWatcher = filenotify.NewPollingWatcher()
if err := fileWatcher.Add(name); err != nil {
fileWatcher.Close()
logger.WithError(err).Debugf("error watching log file for modifications")
return nil, err
}
}
return fileWatcher, nil
}
type wrappedReaderAt struct {
io.ReaderAt
pos int64
}
func (r *wrappedReaderAt) Read(p []byte) (int, error) {
n, err := r.ReaderAt.ReadAt(p, r.pos)
r.pos += int64(n)
return n, err
}