diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go index 5a8045088f..2b1e91d063 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/daemon/logger/jsonfilelog/jsonfilelog.go @@ -49,6 +49,9 @@ func New(info logger.Info) (logger.Logger, error) { if err != nil { return nil, err } + if capval <= 0 { + return nil, fmt.Errorf("max-size should be a positive numbler") + } } var maxFiles = 1 if maxFileString, ok := info.Config["max-file"]; ok { @@ -62,6 +65,18 @@ func New(info logger.Info) (logger.Logger, error) { } } + var compress bool + if compressString, ok := info.Config["compress"]; ok { + var err error + compress, err = strconv.ParseBool(compressString) + if err != nil { + return nil, err + } + if compress && (maxFiles == 1 || capval == -1) { + return nil, fmt.Errorf("compress cannot be true when max-file is less than 2 or max-size is not set") + } + } + attrs, err := info.ExtraAttributes(nil) if err != nil { return nil, err @@ -95,7 +110,7 @@ func New(info logger.Info) (logger.Logger, error) { return b, nil } - writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, marshalFunc, decodeFunc, 0640) + writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, compress, marshalFunc, decodeFunc, 0640) if err != nil { return nil, err } @@ -139,6 +154,7 @@ func ValidateLogOpt(cfg map[string]string) error { switch key { case "max-file": case "max-size": + case "compress": case "labels": case "env": case "env-regex": diff --git a/daemon/logger/jsonfilelog/jsonfilelog_test.go b/daemon/logger/jsonfilelog/jsonfilelog_test.go index 0174d88c0d..2becd694b0 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog_test.go +++ b/daemon/logger/jsonfilelog/jsonfilelog_test.go @@ -2,6 +2,7 @@ package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelo import ( "bytes" + "compress/gzip" "encoding/json" "io/ioutil" "os" @@ -142,7 +143,7 @@ func TestJSONFileLoggerWithOpts(t *testing.T) { } defer os.RemoveAll(tmp) filename := filepath.Join(tmp, "container.log") - config := map[string]string{"max-file": "2", "max-size": "1k"} + config := map[string]string{"max-file": "3", "max-size": "1k", "compress": "true"} l, err := New(logger.Info{ ContainerID: cid, LogPath: filename, @@ -152,21 +153,55 @@ func TestJSONFileLoggerWithOpts(t *testing.T) { t.Fatal(err) } defer l.Close() - for i := 0; i < 20; i++ { + for i := 0; i < 36; i++ { if err := l.Log(&logger.Message{Line: []byte("line" + strconv.Itoa(i)), Source: "src1"}); err != nil { t.Fatal(err) } } + res, err := ioutil.ReadFile(filename) if err != nil { t.Fatal(err) } + penUlt, err := ioutil.ReadFile(filename + ".1") + if err != nil { + if !os.IsNotExist(err) { + t.Fatal(err) + } + + file, err := os.Open(filename + ".1.gz") + defer file.Close() + if err != nil { + t.Fatal(err) + } + zipReader, err := gzip.NewReader(file) + defer zipReader.Close() + if err != nil { + t.Fatal(err) + } + penUlt, err = ioutil.ReadAll(zipReader) + if err != nil { + t.Fatal(err) + } + } + + file, err := os.Open(filename + ".2.gz") + defer file.Close() + if err != nil { + t.Fatal(err) + } + zipReader, err := gzip.NewReader(file) + defer zipReader.Close() + if err != nil { + t.Fatal(err) + } + antepenult, err := ioutil.ReadAll(zipReader) if err != nil { t.Fatal(err) } - expectedPenultimate := `{"log":"line0\n","stream":"src1","time":"0001-01-01T00:00:00Z"} + expectedAntepenultimate := `{"log":"line0\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line1\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line2\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line3\n","stream":"src1","time":"0001-01-01T00:00:00Z"} @@ -183,10 +218,27 @@ func TestJSONFileLoggerWithOpts(t *testing.T) { {"log":"line14\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line15\n","stream":"src1","time":"0001-01-01T00:00:00Z"} ` - expected := `{"log":"line16\n","stream":"src1","time":"0001-01-01T00:00:00Z"} + expectedPenultimate := `{"log":"line16\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line17\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line18\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line19\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line20\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line21\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line22\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line23\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line24\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line25\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line26\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line27\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line28\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line29\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line30\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line31\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +` + expected := `{"log":"line32\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line33\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line34\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line35\n","stream":"src1","time":"0001-01-01T00:00:00Z"} ` if string(res) != expected { @@ -195,7 +247,9 @@ func TestJSONFileLoggerWithOpts(t *testing.T) { if string(penUlt) != expectedPenultimate { t.Fatalf("Wrong log content: %q, expected %q", penUlt, expectedPenultimate) } - + if string(antepenult) != expectedAntepenultimate { + t.Fatalf("Wrong log content: %q, expected %q", antepenult, expectedAntepenultimate) + } } func TestJSONFileLoggerWithLabelsEnv(t *testing.T) { diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go index e646afc23d..b4148ce645 100644 --- a/daemon/logger/loggerutils/logfile.go +++ b/daemon/logger/loggerutils/logfile.go @@ -2,17 +2,21 @@ package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutil import ( "bytes" + "compress/gzip" "context" + "encoding/json" "fmt" "io" "os" "strconv" + "strings" "sync" "time" "github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger/loggerutils/multireader" "github.com/docker/docker/pkg/filenotify" + "github.com/docker/docker/pkg/pools" "github.com/docker/docker/pkg/pubsub" "github.com/docker/docker/pkg/tailfile" "github.com/fsnotify/fsnotify" @@ -20,24 +24,81 @@ import ( "github.com/sirupsen/logrus" ) +const tmpLogfileSuffix = ".tmp" + +// rotateFileMetadata is a metadata of the gzip header of the compressed log file +type rotateFileMetadata struct { + LastTime time.Time `json:"lastTime,omitempty"` +} + +// refCounter is a counter of logfile being referenced +type refCounter struct { + mu sync.Mutex + counter map[string]int +} + +// Reference increase the reference counter for specified logfile +func (rc *refCounter) GetReference(fileName string, openRefFile func(fileName string, exists bool) (*os.File, error)) (*os.File, error) { + rc.mu.Lock() + defer rc.mu.Unlock() + + var ( + file *os.File + err error + ) + _, ok := rc.counter[fileName] + file, err = openRefFile(fileName, ok) + if err != nil { + return nil, err + } + + if ok { + rc.counter[fileName]++ + } else if file != nil { + rc.counter[file.Name()] = 1 + } + + return file, nil +} + +// Dereference reduce the reference counter for specified logfile +func (rc *refCounter) Dereference(fileName string) error { + rc.mu.Lock() + defer rc.mu.Unlock() + + rc.counter[fileName]-- + if rc.counter[fileName] <= 0 { + delete(rc.counter, fileName) + err := os.Remove(fileName) + if err != nil { + return err + } + } + return nil +} + // LogFile is Logger implementation for default Docker logging. type LogFile struct { - f *os.File // store for closing - closed bool - mu sync.RWMutex - capacity int64 //maximum size of each file - currentSize int64 // current size of the latest file - maxFiles int //maximum number of files - notifyRotate *pubsub.Publisher - marshal logger.MarshalFunc - createDecoder makeDecoderFunc - perms os.FileMode + mu sync.RWMutex // protects the logfile access + f *os.File // store for closing + closed bool + rotateMu sync.Mutex // blocks the next rotation until the current rotation is completed + capacity int64 // maximum size of each file + currentSize int64 // current size of the latest file + maxFiles int // maximum number of files + compress bool // whether old versions of log files are compressed + lastTimestamp time.Time // timestamp of the last log + filesRefCounter refCounter // keep reference-counted of decompressed files + notifyRotate *pubsub.Publisher + marshal logger.MarshalFunc + createDecoder makeDecoderFunc + perms os.FileMode } type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error) //NewLogFile creates new LogFile -func NewLogFile(logPath string, capacity int64, maxFiles int, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode) (*LogFile, error) { +func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode) (*LogFile, error) { log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms) if err != nil { return nil, err @@ -49,14 +110,16 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, marshaller logger. } return &LogFile{ - f: log, - capacity: capacity, - currentSize: size, - maxFiles: maxFiles, - notifyRotate: pubsub.NewPublisher(0, 1), - marshal: marshaller, - createDecoder: decodeFunc, - perms: perms, + f: log, + capacity: capacity, + currentSize: size, + maxFiles: maxFiles, + compress: compress, + filesRefCounter: refCounter{counter: make(map[string]int)}, + notifyRotate: pubsub.NewPublisher(0, 1), + marshal: marshaller, + createDecoder: decodeFunc, + perms: perms, }, nil } @@ -84,6 +147,7 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error { n, err := w.f.Write(b) if err == nil { w.currentSize += int64(n) + w.lastTimestamp = msg.Timestamp } w.mu.Unlock() return err @@ -95,43 +159,108 @@ func (w *LogFile) checkCapacityAndRotate() error { } if w.currentSize >= w.capacity { - name := w.f.Name() + w.rotateMu.Lock() + fname := w.f.Name() if err := w.f.Close(); err != nil { + w.rotateMu.Unlock() return errors.Wrap(err, "error closing file") } - if err := rotate(name, w.maxFiles); err != nil { + if err := rotate(fname, w.maxFiles, w.compress); err != nil { + w.rotateMu.Unlock() return err } - file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms) + file, err := os.OpenFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms) if err != nil { + w.rotateMu.Unlock() return err } w.f = file w.currentSize = 0 w.notifyRotate.Publish(struct{}{}) + + if w.maxFiles <= 1 || !w.compress { + w.rotateMu.Unlock() + return nil + } + + go func() { + compressFile(fname+".1", w.lastTimestamp) + w.rotateMu.Unlock() + }() } return nil } -func rotate(name string, maxFiles int) error { +func rotate(name string, maxFiles int, compress bool) error { if maxFiles < 2 { return nil } + + var extension string + if compress { + extension = ".gz" + } for i := maxFiles - 1; i > 1; i-- { - toPath := name + "." + strconv.Itoa(i) - fromPath := name + "." + strconv.Itoa(i-1) + toPath := name + "." + strconv.Itoa(i) + extension + fromPath := name + "." + strconv.Itoa(i-1) + extension if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) { - return errors.Wrap(err, "error rotating old log entries") + return err } } if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) { - return errors.Wrap(err, "error rotating current log") + return err } + return nil } +func compressFile(fileName string, lastTimestamp time.Time) { + file, err := os.Open(fileName) + if err != nil { + logrus.Errorf("Failed to open log file: %v", err) + return + } + defer func() { + file.Close() + err := os.Remove(fileName) + if err != nil { + logrus.Errorf("Failed to remove source log file: %v", err) + } + }() + + outFile, err := os.OpenFile(fileName+".gz", os.O_CREATE|os.O_RDWR, 0640) + if err != nil { + logrus.Errorf("Failed to open or create gzip log file: %v", err) + return + } + defer func() { + outFile.Close() + if err != nil { + os.Remove(fileName + ".gz") + } + }() + + compressWriter := gzip.NewWriter(outFile) + defer compressWriter.Close() + + // Add the last log entry timestramp to the gzip header + extra := rotateFileMetadata{} + extra.LastTime = lastTimestamp + compressWriter.Header.Extra, err = json.Marshal(&extra) + if err != nil { + // Here log the error only and don't return since this is just an optimization. + logrus.Warningf("Failed to marshal JSON: %v", err) + } + + _, err = pools.Copy(compressWriter, file) + if err != nil { + logrus.WithError(err).WithField("module", "container.logs").WithField("file", fileName).Error("Error compressing log file") + return + } +} + // MaxFiles return maximum number of files func (w *LogFile) MaxFiles() int { return w.maxFiles @@ -154,18 +283,6 @@ func (w *LogFile) Close() error { // ReadLogs decodes entries from log files and sends them the passed in watcher func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) { w.mu.RLock() - files, err := w.openRotatedFiles() - if err != nil { - w.mu.RUnlock() - watcher.Err <- err - return - } - defer func() { - for _, f := range files { - f.Close() - } - }() - currentFile, err := os.Open(w.f.Name()) if err != nil { w.mu.RUnlock() @@ -175,14 +292,20 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) defer currentFile.Close() currentChunk, err := newSectionReader(currentFile) - w.mu.RUnlock() - if err != nil { + w.mu.RUnlock() watcher.Err <- err return } if config.Tail != 0 { + files, err := w.openRotatedFiles(config) + if err != nil { + w.mu.RUnlock() + watcher.Err <- err + return + } + w.mu.RUnlock() seekers := make([]io.ReadSeeker, 0, len(files)+1) for _, f := range files { seekers = append(seekers, f) @@ -193,9 +316,20 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) if len(seekers) > 0 { tailFile(multireader.MultiReadSeeker(seekers...), watcher, w.createDecoder, config) } + for _, f := range files { + f.Close() + fileName := f.Name() + if strings.HasSuffix(fileName, tmpLogfileSuffix) { + err := w.filesRefCounter.Dereference(fileName) + if err != nil { + logrus.Errorf("Failed to dereference the log file %q: %v", fileName, err) + } + } + } + + w.mu.RLock() } - w.mu.RLock() if !config.Follow || w.closed { w.mu.RUnlock() return @@ -207,13 +341,22 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until) } -func (w *LogFile) openRotatedFiles() (files []*os.File, err error) { +func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) { + w.rotateMu.Lock() + defer w.rotateMu.Unlock() + defer func() { if err == nil { return } for _, f := range files { f.Close() + if strings.HasSuffix(f.Name(), tmpLogfileSuffix) { + err := os.Remove(f.Name()) + if err != nil && !os.IsNotExist(err) { + logrus.Warningf("Failed to remove the logfile %q: %v", f.Name, err) + } + } } }() @@ -223,6 +366,28 @@ func (w *LogFile) openRotatedFiles() (files []*os.File, err error) { if !os.IsNotExist(err) { return nil, err } + + fileName := fmt.Sprintf("%s.%d.gz", w.f.Name(), i-1) + decompressedFileName := fileName + tmpLogfileSuffix + tmpFile, err := w.filesRefCounter.GetReference(decompressedFileName, func(refFileName string, exists bool) (*os.File, error) { + if exists { + return os.Open(refFileName) + } + return decompressfile(fileName, refFileName, config.Since) + }) + + if err != nil { + if !os.IsNotExist(err) { + return nil, err + } + continue + } + if tmpFile == nil { + // The log before `config.Since` does not need to read + break + } + + files = append(files, tmpFile) continue } files = append(files, f) @@ -231,6 +396,44 @@ func (w *LogFile) openRotatedFiles() (files []*os.File, err error) { return files, nil } +func decompressfile(fileName, destFileName string, since time.Time) (*os.File, error) { + cf, err := os.Open(fileName) + if err != nil { + return nil, err + } + defer cf.Close() + + rc, err := gzip.NewReader(cf) + if err != nil { + return nil, err + } + defer rc.Close() + + // Extract the last log entry timestramp from the gzip header + extra := &rotateFileMetadata{} + err = json.Unmarshal(rc.Header.Extra, extra) + if err == nil && extra.LastTime.Before(since) { + return nil, nil + } + + rs, err := os.OpenFile(destFileName, os.O_CREATE|os.O_RDWR, 0640) + if err != nil { + return nil, err + } + + _, err = pools.Copy(rs, rc) + if err != nil { + rs.Close() + rErr := os.Remove(rs.Name()) + if rErr != nil && os.IsNotExist(rErr) { + logrus.Errorf("Failed to remove the logfile %q: %v", rs.Name(), rErr) + } + return nil, err + } + + return rs, nil +} + func newSectionReader(f *os.File) (*io.SectionReader, error) { // seek to the end to get the size // we'll leave this at the end of the file since section reader does not advance the reader