package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"

import (
	"compress/gzip"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/pkg/filenotify"
	"github.com/docker/docker/pkg/pools"
	"github.com/docker/docker/pkg/pubsub"
	"github.com/fsnotify/fsnotify"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

const tmpLogfileSuffix = ".tmp"

// rotateFileMetadata is metadata stored in the gzip header of a compressed log file
type rotateFileMetadata struct {
	LastTime time.Time `json:"lastTime,omitempty"`
}

// refCounter is a reference counter for log files
type refCounter struct {
	mu      sync.Mutex
	counter map[string]int
}

// GetReference increases the reference counter for the specified log file,
// opening the file via openRefFile if it is not already tracked
func (rc *refCounter) GetReference(fileName string, openRefFile func(fileName string, exists bool) (*os.File, error)) (*os.File, error) {
	rc.mu.Lock()
	defer rc.mu.Unlock()

	var (
		file *os.File
		err  error
	)
	_, ok := rc.counter[fileName]
	file, err = openRefFile(fileName, ok)
	if err != nil {
		return nil, err
	}

	if ok {
		rc.counter[fileName]++
	} else if file != nil {
		rc.counter[file.Name()] = 1
	}

	return file, nil
}

// Dereference decreases the reference counter for the specified log file,
// removing the file once it is no longer referenced
func (rc *refCounter) Dereference(fileName string) error {
	rc.mu.Lock()
	defer rc.mu.Unlock()

	rc.counter[fileName]--
	if rc.counter[fileName] <= 0 {
		delete(rc.counter, fileName)
		err := os.Remove(fileName)
		if err != nil && !os.IsNotExist(err) {
			return err
		}
	}
	return nil
}

// LogFile is a Logger implementation for default Docker logging.
type LogFile struct {
	mu              sync.RWMutex // protects the logfile access
	f               *os.File     // store for closing
	closed          bool
	rotateMu        sync.Mutex // blocks the next rotation until the current rotation is completed
	capacity        int64      // maximum size of each file
	currentSize     int64      // current size of the latest file
	maxFiles        int        // maximum number of files
	compress        bool       // whether old versions of log files are compressed
	lastTimestamp   time.Time  // timestamp of the last log
	filesRefCounter refCounter // keeps reference counts of decompressed files
	notifyReaders   *pubsub.Publisher
	marshal         logger.MarshalFunc
	createDecoder   MakeDecoderFn
	getTailReader   GetTailReaderFunc
	perms           os.FileMode
}

// MakeDecoderFn creates a decoder
type MakeDecoderFn func(rdr io.Reader) Decoder

// Decoder is for reading logs
// It is created by the log reader by calling the `MakeDecoderFn`
type Decoder interface {
	// Reset resets the decoder
	// Reset is called for certain events, such as log rotations
	Reset(io.Reader)
	// Decode decodes the next log message from the stream
	Decode() (*logger.Message, error)
	// Close signals to the decoder that it can release whatever resources it was using.
	Close()
}

// SizeReaderAt defines a ReaderAt that also reports its size.
// This is used for tailing log files.
type SizeReaderAt interface {
	io.ReaderAt
	Size() int64
}
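// exampleJSONDecoder is an illustrative sketch only (it is hypothetical and
// not used by this package): a Decoder for a stream of JSON-encoded
// logger.Message values. Real log drivers implement Decoder for their own
// on-disk format instead.
type exampleJSONDecoder struct {
	dec *json.Decoder
}

func (d *exampleJSONDecoder) Reset(rdr io.Reader) {
	// a fresh json.Decoder drops any buffered state from the previous stream
	d.dec = json.NewDecoder(rdr)
}

func (d *exampleJSONDecoder) Decode() (*logger.Message, error) {
	msg := &logger.Message{}
	if err := d.dec.Decode(msg); err != nil {
		return nil, err // io.EOF marks the end of the current stream
	}
	return msg, nil
}

func (d *exampleJSONDecoder) Close() {
	d.dec = nil // release the underlying reader reference
}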
// GetTailReaderFunc is used to truncate a reader to only read as much as is required
// in order to get the passed in number of log lines.
// It returns the sectioned reader, the number of lines that the section reader
// contains, and any error that occurs.
type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr io.Reader, nLines int, err error)

// NewLogFile creates a new LogFile
func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc MakeDecoderFn, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) {
	log, err := openFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
	if err != nil {
		return nil, err
	}

	size, err := log.Seek(0, io.SeekEnd)
	if err != nil {
		return nil, err
	}

	return &LogFile{
		f:               log,
		capacity:        capacity,
		currentSize:     size,
		maxFiles:        maxFiles,
		compress:        compress,
		filesRefCounter: refCounter{counter: make(map[string]int)},
		notifyReaders:   pubsub.NewPublisher(0, 1),
		marshal:         marshaller,
		createDecoder:   decodeFunc,
		perms:           perms,
		getTailReader:   getTailReader,
	}, nil
}

// WriteLogEntry writes the provided log message to the current log file.
// This may trigger a rotation event if the max file/capacity limits are hit.
func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
	b, err := w.marshal(msg)
	if err != nil {
		return errors.Wrap(err, "error marshalling log message")
	}

	logger.PutMessage(msg)

	w.mu.Lock()
	if w.closed {
		w.mu.Unlock()
		return errors.New("cannot write because the output file was closed")
	}

	if err := w.checkCapacityAndRotate(); err != nil {
		w.mu.Unlock()
		return errors.Wrap(err, "error rotating log file")
	}

	n, err := w.f.Write(b)
	if err == nil {
		w.currentSize += int64(n)
		w.lastTimestamp = msg.Timestamp
	}

	w.mu.Unlock()
	return errors.Wrap(err, "error writing log entry")
}
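// exampleWriteLog is an illustrative sketch only of how a log driver might
// wire up NewLogFile and WriteLogEntry; the function is hypothetical and not
// called anywhere. The line-based marshal function here is a stand-in: real
// drivers supply their own MarshalFunc, MakeDecoderFn and GetTailReaderFunc.
func exampleWriteLog(path string, makeDecoder MakeDecoderFn, getTailReader GetTailReaderFunc) error {
	marshal := func(msg *logger.Message) ([]byte, error) {
		// copy before appending so the pooled message buffer is not mutated
		return append(append([]byte{}, msg.Line...), '\n'), nil
	}
	// rotate at 10MB, keep 5 files, compress rotated files
	lf, err := NewLogFile(path, 10<<20, 5, true, marshal, makeDecoder, 0640, getTailReader)
	if err != nil {
		return err
	}
	defer lf.Close()
	return lf.WriteLogEntry(&logger.Message{Line: []byte("hello"), Timestamp: time.Now()})
}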
func (w *LogFile) checkCapacityAndRotate() (retErr error) {
	if w.capacity == -1 {
		return nil
	}
	if w.currentSize < w.capacity {
		return nil
	}

	w.rotateMu.Lock()
	noCompress := w.maxFiles <= 1 || !w.compress
	defer func() {
		// If we aren't going to run the goroutine to compress the log file, then we need to unlock in this function.
		// Otherwise the lock will be released in the goroutine that handles compression.
		if retErr != nil || noCompress {
			w.rotateMu.Unlock()
		}
	}()

	fname := w.f.Name()
	if err := w.f.Close(); err != nil {
		// if there was an error during a prior rotate, the file could already be closed
		if !errors.Is(err, os.ErrClosed) {
			return errors.Wrap(err, "error closing file")
		}
	}

	if err := rotate(fname, w.maxFiles, w.compress); err != nil {
		logrus.WithError(err).Warn("Error rotating log file, log data may have been lost")
	} else {
		var renameErr error
		for i := 0; i < 10; i++ {
			if renameErr = os.Rename(fname, fname+".1"); renameErr != nil && !os.IsNotExist(renameErr) {
				logrus.WithError(renameErr).WithField("file", fname).Debug("Error rotating current container log file, evicting readers and retrying")
				w.notifyReaders.Publish(renameErr)
				time.Sleep(100 * time.Millisecond)
				continue
			}
			break
		}
		if renameErr != nil {
			logrus.WithError(renameErr).Error("Error renaming current log file")
		}
	}

	file, err := openFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
	if err != nil {
		return err
	}
	w.f = file
	w.currentSize = 0

	w.notifyReaders.Publish(struct{}{})

	if noCompress {
		return nil
	}

	ts := w.lastTimestamp
	go func() {
		if err := compressFile(fname+".1", ts); err != nil {
			logrus.WithError(err).Error("Error compressing log file after rotation")
		}
		w.rotateMu.Unlock()
	}()

	return nil
}

func rotate(name string, maxFiles int, compress bool) error {
	if maxFiles < 2 {
		return nil
	}

	var extension string
	if compress {
		extension = ".gz"
	}

	lastFile := fmt.Sprintf("%s.%d%s", name, maxFiles-1, extension)
	err := os.Remove(lastFile)
	if err != nil && !os.IsNotExist(err) {
		return errors.Wrap(err, "error removing oldest log file")
	}

	for i := maxFiles - 1; i > 1; i-- {
		toPath := name + "." + strconv.Itoa(i) + extension
		fromPath := name + "." + strconv.Itoa(i-1) + extension
		logrus.WithField("source", fromPath).WithField("target", toPath).Trace("Rotating log file")
		if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
			return err
		}
	}

	return nil
}

func compressFile(fileName string, lastTimestamp time.Time) (retErr error) {
	file, err := open(fileName)
	if err != nil {
		if os.IsNotExist(err) {
			logrus.WithField("file", fileName).WithError(err).Debug("Could not open log file to compress")
			return nil
		}
		return errors.Wrap(err, "failed to open log file")
	}
	defer func() {
		file.Close()
		if retErr == nil {
			err := os.Remove(fileName)
			if err != nil && !os.IsNotExist(err) {
				retErr = errors.Wrap(err, "failed to remove source log file")
			}
		}
	}()

	outFile, err := openFile(fileName+".gz", os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0640)
	if err != nil {
		return errors.Wrap(err, "failed to open or create gzip log file")
	}
	defer func() {
		outFile.Close()
		if retErr != nil {
			if err := os.Remove(fileName + ".gz"); err != nil && !os.IsNotExist(err) {
				logrus.WithError(err).Error("Error cleaning up after failed log compression")
			}
		}
	}()

	compressWriter := gzip.NewWriter(outFile)
	defer compressWriter.Close()

	// Add the last log entry timestamp to the gzip header
	extra := rotateFileMetadata{}
	extra.LastTime = lastTimestamp
	compressWriter.Header.Extra, err = json.Marshal(&extra)
	if err != nil {
		// Log the error only and don't return, since this is just an optimization.
		logrus.Warningf("Failed to marshal gzip header as JSON: %v", err)
	}

	_, err = pools.Copy(compressWriter, file)
	if err != nil {
		return errors.Wrapf(err, "error compressing log file %s", fileName)
	}

	return nil
}

// MaxFiles returns the maximum number of files
func (w *LogFile) MaxFiles() int {
	return w.maxFiles
}
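// For illustration, with maxFiles=3 and compression enabled, one rotation of
// a log file "c.log" performed by checkCapacityAndRotate and rotate above
// proceeds roughly as follows (sketch):
//
//	c.log.2.gz               removed (the oldest file is dropped)
//	c.log.1.gz -> c.log.2.gz
//	c.log      -> c.log.1    (compressed to c.log.1.gz by a background goroutine)
//	c.log                    reopened as a new, empty file for subsequent writes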
// Close closes underlying file and signals all readers to stop.
func (w *LogFile) Close() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.closed {
		return nil
	}
	if err := w.f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
		return err
	}
	w.closed = true
	return nil
}

// ReadLogs decodes entries from log files and sends them to the passed-in watcher
//
// Note: Using the follow option can become inconsistent in cases of very frequent rotations when max log files is 1.
// TODO: Consider a different implementation which can effectively follow logs under frequent rotations.
func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
	w.mu.RLock()
	currentFile, err := open(w.f.Name())
	if err != nil {
		w.mu.RUnlock()
		watcher.Err <- err
		return
	}
	defer currentFile.Close()

	dec := w.createDecoder(nil)
	defer dec.Close()

	currentChunk, err := newSectionReader(currentFile)
	if err != nil {
		w.mu.RUnlock()
		watcher.Err <- err
		return
	}

	notifyEvict := w.notifyReaders.SubscribeTopicWithBuffer(func(i interface{}) bool {
		_, ok := i.(error)
		return ok
	}, 1)
	defer w.notifyReaders.Evict(notifyEvict)

	if config.Tail != 0 {
		// TODO(@cpuguy83): Instead of opening every file, only get the files which
		// are needed to tail.
		// This is especially costly when compression is enabled.
		files, err := w.openRotatedFiles(config)
		w.mu.RUnlock()
		if err != nil {
			watcher.Err <- err
			return
		}

		closeFiles := func() {
			for _, f := range files {
				f.Close()
				fileName := f.Name()
				if strings.HasSuffix(fileName, tmpLogfileSuffix) {
					err := w.filesRefCounter.Dereference(fileName)
					if err != nil {
						logrus.WithError(err).WithField("file", fileName).Error("Failed to dereference the log file")
					}
				}
			}
		}

		readers := make([]SizeReaderAt, 0, len(files)+1)
		for _, f := range files {
			stat, err := f.Stat()
			if err != nil {
				watcher.Err <- errors.Wrap(err, "error reading size of rotated file")
				closeFiles()
				return
			}
			readers = append(readers, io.NewSectionReader(f, 0, stat.Size()))
		}
		if currentChunk.Size() > 0 {
			readers = append(readers, currentChunk)
		}

		ok := tailFiles(readers, watcher, dec, w.getTailReader, config, notifyEvict)
		closeFiles()
		if !ok {
			return
		}
		w.mu.RLock()
	}

	if !config.Follow || w.closed {
		w.mu.RUnlock()
		return
	}
	w.mu.RUnlock()

	notifyRotate := w.notifyReaders.SubscribeTopic(func(i interface{}) bool {
		_, ok := i.(struct{})
		return ok
	})
	defer w.notifyReaders.Evict(notifyRotate)

	followLogs(currentFile, watcher, notifyRotate, notifyEvict, dec, config.Since, config.Until)
}
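// A minimal consumption sketch for ReadLogs (illustrative only; LogWatcher,
// NewLogWatcher and ReadConfig come from the logger package, and "lf" is
// assumed to be a *LogFile obtained from NewLogFile):
//
//	watcher := logger.NewLogWatcher()
//	go lf.ReadLogs(logger.ReadConfig{Tail: 100, Follow: true}, watcher)
//	for {
//		select {
//		case msg := <-watcher.Msg:
//			fmt.Println(string(msg.Line))
//		case err := <-watcher.Err:
//			return err
//		}
//	}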
func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) {
	w.rotateMu.Lock()
	defer w.rotateMu.Unlock()

	defer func() {
		if err == nil {
			return
		}
		for _, f := range files {
			f.Close()
			if strings.HasSuffix(f.Name(), tmpLogfileSuffix) {
				err := os.Remove(f.Name())
				if err != nil && !os.IsNotExist(err) {
					logrus.Warnf("Failed to remove logfile: %v", err)
				}
			}
		}
	}()

	for i := w.maxFiles; i > 1; i-- {
		f, err := open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
		if err != nil {
			if !os.IsNotExist(err) {
				return nil, errors.Wrap(err, "error opening rotated log file")
			}

			fileName := fmt.Sprintf("%s.%d.gz", w.f.Name(), i-1)
			decompressedFileName := fileName + tmpLogfileSuffix
			tmpFile, err := w.filesRefCounter.GetReference(decompressedFileName, func(refFileName string, exists bool) (*os.File, error) {
				if exists {
					return open(refFileName)
				}
				return decompressfile(fileName, refFileName, config.Since)
			})

			if err != nil {
				if !errors.Is(err, os.ErrNotExist) {
					return nil, errors.Wrap(err, "error getting reference to decompressed log file")
				}
				continue
			}
			if tmpFile == nil {
				// Logs from before `config.Since` do not need to be read
				break
			}

			files = append(files, tmpFile)
			continue
		}
		files = append(files, f)
	}

	return files, nil
}

func decompressfile(fileName, destFileName string, since time.Time) (*os.File, error) {
	cf, err := open(fileName)
	if err != nil {
		return nil, errors.Wrap(err, "error opening file for decompression")
	}
	defer cf.Close()

	rc, err := gzip.NewReader(cf)
	if err != nil {
		return nil, errors.Wrap(err, "error making gzip reader for compressed log file")
	}
	defer rc.Close()

	// Extract the last log entry timestamp from the gzip header
	extra := &rotateFileMetadata{}
	err = json.Unmarshal(rc.Header.Extra, extra)
	if err == nil && extra.LastTime.Before(since) {
		return nil, nil
	}

	rs, err := openFile(destFileName, os.O_CREATE|os.O_RDWR, 0640)
	if err != nil {
		return nil, errors.Wrap(err, "error creating file for copying decompressed log stream")
	}

	_, err = pools.Copy(rs, rc)
	if err != nil {
		rs.Close()
		rErr := os.Remove(rs.Name())
		if rErr != nil && !os.IsNotExist(rErr) {
			logrus.Errorf("Failed to remove logfile: %v", rErr)
		}
		return nil, errors.Wrap(err, "error while copying decompressed log stream to file")
	}

	return rs, nil
}

func newSectionReader(f *os.File) (*io.SectionReader, error) {
	// seek to the end to get the size
	// we'll leave this at the end of the file since section reader does not advance the reader
	size, err := f.Seek(0, io.SeekEnd)
	if err != nil {
		return nil, errors.Wrap(err, "error getting current file size")
	}
	return io.NewSectionReader(f, 0, size), nil
}
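// exampleTailReader is an illustrative sketch only of a GetTailReaderFunc for
// newline-terminated formats, as consumed by tailFiles below: it reads the
// whole section and scans backwards for line breaks. It is hypothetical and
// assumes every log line, including the last, ends with '\n'; real drivers
// use a more efficient implementation (the json-file driver, for instance,
// delegates to the tailfile package).
var _ GetTailReaderFunc = exampleTailReader

func exampleTailReader(ctx context.Context, f SizeReaderAt, nLogLines int) (io.Reader, int, error) {
	buf := make([]byte, f.Size())
	if _, err := f.ReadAt(buf, 0); err != nil && err != io.EOF {
		return nil, 0, err
	}

	seen := 0       // newlines seen while scanning backwards
	off := int64(0) // offset where the tail section starts
	for i := len(buf) - 1; i >= 0; i-- {
		if buf[i] != '\n' {
			continue
		}
		seen++
		// the (nLogLines+1)-th newline from the end delimits the tail
		if seen == nLogLines+1 {
			off = int64(i + 1)
			break
		}
	}

	n := seen
	if n > nLogLines {
		n = nLogLines
	}
	return io.NewSectionReader(f, off, f.Size()-off), n, nil
}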
func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, dec Decoder, getTailReader GetTailReaderFunc, config logger.ReadConfig, notifyEvict <-chan interface{}) (cont bool) {
	nLines := config.Tail

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cont = true
	// TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here.
	go func() {
		select {
		case err := <-notifyEvict:
			if err != nil {
				watcher.Err <- err.(error)
				cont = false
				cancel()
			}
		case <-ctx.Done():
		case <-watcher.WatchConsumerGone():
			cont = false
			cancel()
		}
	}()

	readers := make([]io.Reader, 0, len(files))

	if config.Tail > 0 {
		for i := len(files) - 1; i >= 0 && nLines > 0; i-- {
			tail, n, err := getTailReader(ctx, files[i], nLines)
			if err != nil {
				watcher.Err <- errors.Wrap(err, "error finding file position to start log tailing")
				return
			}
			nLines -= n
			readers = append([]io.Reader{tail}, readers...)
		}
	} else {
		for _, r := range files {
			readers = append(readers, &wrappedReaderAt{ReaderAt: r})
		}
	}

	rdr := io.MultiReader(readers...)
	dec.Reset(rdr)

	for {
		msg, err := dec.Decode()
		if err != nil {
			if !errors.Is(err, io.EOF) {
				watcher.Err <- err
			}
			return
		}
		if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
			continue
		}
		if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
			return
		}
		select {
		case <-ctx.Done():
			return
		case watcher.Msg <- msg:
		}
	}
}

func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate, notifyEvict chan interface{}, dec Decoder, since, until time.Time) {
	dec.Reset(f)

	name := f.Name()
	fileWatcher, err := watchFile(name)
	if err != nil {
		logWatcher.Err <- err
		return
	}
	defer func() {
		f.Close()
		dec.Close()
		fileWatcher.Close()
	}()

	var retries int
	handleRotate := func() error {
		f.Close()
		fileWatcher.Remove(name)

		// retry when the file doesn't exist
		for retries := 0; retries <= 5; retries++ {
			f, err = open(name)
			if err == nil || !os.IsNotExist(err) {
				break
			}
		}
		if err != nil {
			return err
		}
		if err := fileWatcher.Add(name); err != nil {
			return err
		}
		dec.Reset(f)
		return nil
	}

	errRetry := errors.New("retry")
	errDone := errors.New("done")

	handleMustClose := func(evictErr error) {
		f.Close()
		dec.Close()
		logWatcher.Err <- errors.Wrap(evictErr, "log reader evicted due to errors")
		logrus.WithField("file", f.Name()).Error("Log reader notified that it must re-open log file, some log data may not be streamed to the client.")
	}

	waitRead := func() error {
		select {
		case e := <-notifyEvict:
			if e != nil {
				err := e.(error)
				handleMustClose(err)
			}
			return errDone
		case e := <-fileWatcher.Events():
			switch e.Op {
			case fsnotify.Write:
				dec.Reset(f)
				return nil
			case fsnotify.Rename, fsnotify.Remove:
				select {
				case <-notifyRotate:
				case <-logWatcher.WatchProducerGone():
					return errDone
				case <-logWatcher.WatchConsumerGone():
					return errDone
				}
				if err := handleRotate(); err != nil {
					return err
				}
				return nil
			}
			return errRetry
		case err := <-fileWatcher.Errors():
			logrus.Debugf("logger got error watching file: %v", err)
			// Something happened, let's try and stay alive and create a new watcher
			if retries <= 5 {
				fileWatcher.Close()
				fileWatcher, err = watchFile(name)
				if err != nil {
					return err
				}
				retries++
				return errRetry
			}
			return err
		case <-logWatcher.WatchProducerGone():
			return errDone
		case <-logWatcher.WatchConsumerGone():
			return errDone
		}
	}

	oldSize := int64(-1)
	handleDecodeErr := func(err error) error {
		if !errors.Is(err, io.EOF) {
			return err
		}

		// Handle special case (#39235): max-file=1 and file was truncated
		st, stErr := f.Stat()
		if stErr == nil {
			size := st.Size()
			defer func() { oldSize = size }()
			if size < oldSize { // truncated
				f.Seek(0, io.SeekStart)
				return nil
			}
		} else {
			logrus.WithError(stErr).Warn("logger: stat error")
		}

		for {
			err := waitRead()
			if err == nil {
				break
			}
			if err == errRetry {
				continue
			}
			return err
		}
		return nil
	}

	// main loop
	for {
		select {
		case err := <-notifyEvict:
			if err != nil {
				handleMustClose(err.(error))
			}
			return
		default:
		}
		msg, err := dec.Decode()
		if err != nil {
			if err := handleDecodeErr(err); err != nil {
				if err == errDone {
					return
				}
				// we got an unrecoverable error, so return
				logWatcher.Err <- err
				return
			}
			// ready to try again
			continue
		}

		retries = 0 // reset retries since we've succeeded
		if !since.IsZero() && msg.Timestamp.Before(since) {
			continue
		}
		if !until.IsZero() && msg.Timestamp.After(until) {
			return
		}
		// send the message, unless the consumer is gone
		select {
		case e := <-notifyEvict:
			if e != nil {
				err := e.(error)
				logrus.WithError(err).Debug("Reader evicted while sending log message")
				logWatcher.Err <- err
			}
			return
		case logWatcher.Msg <- msg:
		case <-logWatcher.WatchConsumerGone():
			return
		}
	}
}
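// For illustration, the handshake between the writer and a follower during a
// rotation works roughly as follows (sketch of the interplay between
// checkCapacityAndRotate and followLogs above):
//
//	writer:   rename c.log -> c.log.1, reopen c.log, publish struct{}{} (notifyRotate)
//	follower: observe fsnotify Rename/Remove, wait for notifyRotate,
//	          reopen c.log, dec.Reset(f), resume decoding
//
// If the writer instead publishes an error (notifyEvict), the follower closes
// its file and forwards the error to its LogWatcher.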
func watchFile(name string) (filenotify.FileWatcher, error) {
	var fileWatcher filenotify.FileWatcher

	if runtime.GOOS == "windows" {
		// FileWatcher on Windows is based on syscall notifications, which have an issue due to file caching.
		// It relies on ReadDirectoryChangesW(), which doesn't detect writes to the cache, only writes to disk.
		// Because of the OS's lazy writing, we don't get notifications for file writes, so the watcher
		// doesn't work. Hence for Windows we use a poll-based notifier.
		fileWatcher = filenotify.NewPollingWatcher()
	} else {
		var err error
		fileWatcher, err = filenotify.New()
		if err != nil {
			return nil, err
		}
	}

	logger := logrus.WithFields(logrus.Fields{
		"module": "logger",
		"file":   name,
	})

	if err := fileWatcher.Add(name); err != nil {
		// we will retry using the file poller.
		logger.WithError(err).Warnf("falling back to file poller")
		fileWatcher.Close()
		fileWatcher = filenotify.NewPollingWatcher()

		if err := fileWatcher.Add(name); err != nil {
			fileWatcher.Close()
			logger.WithError(err).Debugf("error watching log file for modifications")
			return nil, err
		}
	}

	return fileWatcher, nil
}

type wrappedReaderAt struct {
	io.ReaderAt
	pos int64
}

func (r *wrappedReaderAt) Read(p []byte) (int, error) {
	n, err := r.ReaderAt.ReadAt(p, r.pos)
	r.pos += int64(n)
	return n, err
}
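// For illustration (sketch): wrappedReaderAt adapts an io.ReaderAt into a
// sequential io.Reader, which is what lets tailFiles hand rotated-file
// sections to io.MultiReader when no tail limit is set:
//
//	r := &wrappedReaderAt{ReaderAt: io.NewSectionReader(f, 0, size)}
//	data, err := io.ReadAll(r) // reads from offset 0, advancing r.pos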