From f69f09f44ce9fedbc9d70f11980c1fc8d7f77cec Mon Sep 17 00:00:00 2001 From: Yanqiang Miao Date: Tue, 13 Mar 2018 10:47:24 +0800 Subject: [PATCH] add compress option for 'jsonfiles' log driver This PR adds support for compressibility of log file. I added a new option conpression for the jsonfile log driver, this option allows the user to specify compression algorithm to compress the log files. By default, the log files will be not compressed. At present, only support 'gzip'. Signed-off-by: Yanqiang Miao 'docker logs' can read from compressed files Signed-off-by: Yanqiang Miao Add Metadata to the gzip header, optmize 'readlog' Signed-off-by: Yanqiang Miao --- daemon/logger/jsonfilelog/jsonfilelog.go | 18 +- daemon/logger/jsonfilelog/jsonfilelog_test.go | 64 +++- daemon/logger/loggerutils/logfile.go | 289 +++++++++++++++--- 3 files changed, 322 insertions(+), 49 deletions(-) diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go index 5a8045088f..2b1e91d063 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/daemon/logger/jsonfilelog/jsonfilelog.go @@ -49,6 +49,9 @@ func New(info logger.Info) (logger.Logger, error) { if err != nil { return nil, err } + if capval <= 0 { + return nil, fmt.Errorf("max-size should be a positive numbler") + } } var maxFiles = 1 if maxFileString, ok := info.Config["max-file"]; ok { @@ -62,6 +65,18 @@ func New(info logger.Info) (logger.Logger, error) { } } + var compress bool + if compressString, ok := info.Config["compress"]; ok { + var err error + compress, err = strconv.ParseBool(compressString) + if err != nil { + return nil, err + } + if compress && (maxFiles == 1 || capval == -1) { + return nil, fmt.Errorf("compress cannot be true when max-file is less than 2 or max-size is not set") + } + } + attrs, err := info.ExtraAttributes(nil) if err != nil { return nil, err @@ -95,7 +110,7 @@ func New(info logger.Info) (logger.Logger, error) { return b, nil } - writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, marshalFunc, decodeFunc, 0640) + writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, compress, marshalFunc, decodeFunc, 0640) if err != nil { return nil, err } @@ -139,6 +154,7 @@ func ValidateLogOpt(cfg map[string]string) error { switch key { case "max-file": case "max-size": + case "compress": case "labels": case "env": case "env-regex": diff --git a/daemon/logger/jsonfilelog/jsonfilelog_test.go b/daemon/logger/jsonfilelog/jsonfilelog_test.go index 2f74e26091..a5f3d06472 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog_test.go +++ b/daemon/logger/jsonfilelog/jsonfilelog_test.go @@ -2,6 +2,7 @@ package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelo import ( "bytes" + "compress/gzip" "encoding/json" "io/ioutil" "os" @@ -142,7 +143,7 @@ func TestJSONFileLoggerWithOpts(t *testing.T) { } defer os.RemoveAll(tmp) filename := filepath.Join(tmp, "container.log") - config := map[string]string{"max-file": "2", "max-size": "1k"} + config := map[string]string{"max-file": "3", "max-size": "1k", "compress": "true"} l, err := New(logger.Info{ ContainerID: cid, LogPath: filename, @@ -152,21 +153,55 @@ func TestJSONFileLoggerWithOpts(t *testing.T) { t.Fatal(err) } defer l.Close() - for i := 0; i < 20; i++ { + for i := 0; i < 36; i++ { if err := l.Log(&logger.Message{Line: []byte("line" + strconv.Itoa(i)), Source: "src1"}); err != nil { t.Fatal(err) } } + res, err := ioutil.ReadFile(filename) if err != nil { t.Fatal(err) } + penUlt, err := ioutil.ReadFile(filename + ".1") + if err != nil { + if !os.IsNotExist(err) { + t.Fatal(err) + } + + file, err := os.Open(filename + ".1.gz") + defer file.Close() + if err != nil { + t.Fatal(err) + } + zipReader, err := gzip.NewReader(file) + defer zipReader.Close() + if err != nil { + t.Fatal(err) + } + penUlt, err = ioutil.ReadAll(zipReader) + if err != nil { + t.Fatal(err) + } + } + + file, err := os.Open(filename + ".2.gz") + defer file.Close() + if err != nil { + t.Fatal(err) + } + zipReader, err := gzip.NewReader(file) + defer zipReader.Close() + if err != nil { + t.Fatal(err) + } + antepenult, err := ioutil.ReadAll(zipReader) if err != nil { t.Fatal(err) } - expectedPenultimate := `{"log":"line0\n","stream":"src1","time":"0001-01-01T00:00:00Z"} + expectedAntepenultimate := `{"log":"line0\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line1\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line2\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line3\n","stream":"src1","time":"0001-01-01T00:00:00Z"} @@ -183,10 +218,27 @@ func TestJSONFileLoggerWithOpts(t *testing.T) { {"log":"line14\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line15\n","stream":"src1","time":"0001-01-01T00:00:00Z"} ` - expected := `{"log":"line16\n","stream":"src1","time":"0001-01-01T00:00:00Z"} + expectedPenultimate := `{"log":"line16\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line17\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line18\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line19\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line20\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line21\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line22\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line23\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line24\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line25\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line26\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line27\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line28\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line29\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line30\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line31\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +` + expected := `{"log":"line32\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line33\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line34\n","stream":"src1","time":"0001-01-01T00:00:00Z"} +{"log":"line35\n","stream":"src1","time":"0001-01-01T00:00:00Z"} ` if string(res) != expected { @@ -195,7 +247,9 @@ func TestJSONFileLoggerWithOpts(t *testing.T) { if string(penUlt) != expectedPenultimate { t.Fatalf("Wrong log content: %q, expected %q", penUlt, expectedPenultimate) } - + if string(antepenult) != expectedAntepenultimate { + t.Fatalf("Wrong log content: %q, expected %q", antepenult, expectedAntepenultimate) + } } func TestJSONFileLoggerWithLabelsEnv(t *testing.T) { diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go index e646afc23d..b4148ce645 100644 --- a/daemon/logger/loggerutils/logfile.go +++ b/daemon/logger/loggerutils/logfile.go @@ -2,17 +2,21 @@ package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutil import ( "bytes" + "compress/gzip" "context" + "encoding/json" "fmt" "io" "os" "strconv" + "strings" "sync" "time" "github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger/loggerutils/multireader" "github.com/docker/docker/pkg/filenotify" + "github.com/docker/docker/pkg/pools" "github.com/docker/docker/pkg/pubsub" "github.com/docker/docker/pkg/tailfile" "github.com/fsnotify/fsnotify" @@ -20,24 +24,81 @@ import ( "github.com/sirupsen/logrus" ) +const tmpLogfileSuffix = ".tmp" + +// rotateFileMetadata is a metadata of the gzip header of the compressed log file +type rotateFileMetadata struct { + LastTime time.Time `json:"lastTime,omitempty"` +} + +// refCounter is a counter of logfile being referenced +type refCounter struct { + mu sync.Mutex + counter map[string]int +} + +// Reference increase the reference counter for specified logfile +func (rc *refCounter) GetReference(fileName string, openRefFile func(fileName string, exists bool) (*os.File, error)) (*os.File, error) { + rc.mu.Lock() + defer rc.mu.Unlock() + + var ( + file *os.File + err error + ) + _, ok := rc.counter[fileName] + file, err = openRefFile(fileName, ok) + if err != nil { + return nil, err + } + + if ok { + rc.counter[fileName]++ + } else if file != nil { + rc.counter[file.Name()] = 1 + } + + return file, nil +} + +// Dereference reduce the reference counter for specified logfile +func (rc *refCounter) Dereference(fileName string) error { + rc.mu.Lock() + defer rc.mu.Unlock() + + rc.counter[fileName]-- + if rc.counter[fileName] <= 0 { + delete(rc.counter, fileName) + err := os.Remove(fileName) + if err != nil { + return err + } + } + return nil +} + // LogFile is Logger implementation for default Docker logging. type LogFile struct { - f *os.File // store for closing - closed bool - mu sync.RWMutex - capacity int64 //maximum size of each file - currentSize int64 // current size of the latest file - maxFiles int //maximum number of files - notifyRotate *pubsub.Publisher - marshal logger.MarshalFunc - createDecoder makeDecoderFunc - perms os.FileMode + mu sync.RWMutex // protects the logfile access + f *os.File // store for closing + closed bool + rotateMu sync.Mutex // blocks the next rotation until the current rotation is completed + capacity int64 // maximum size of each file + currentSize int64 // current size of the latest file + maxFiles int // maximum number of files + compress bool // whether old versions of log files are compressed + lastTimestamp time.Time // timestamp of the last log + filesRefCounter refCounter // keep reference-counted of decompressed files + notifyRotate *pubsub.Publisher + marshal logger.MarshalFunc + createDecoder makeDecoderFunc + perms os.FileMode } type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error) //NewLogFile creates new LogFile -func NewLogFile(logPath string, capacity int64, maxFiles int, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode) (*LogFile, error) { +func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode) (*LogFile, error) { log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms) if err != nil { return nil, err @@ -49,14 +110,16 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, marshaller logger. } return &LogFile{ - f: log, - capacity: capacity, - currentSize: size, - maxFiles: maxFiles, - notifyRotate: pubsub.NewPublisher(0, 1), - marshal: marshaller, - createDecoder: decodeFunc, - perms: perms, + f: log, + capacity: capacity, + currentSize: size, + maxFiles: maxFiles, + compress: compress, + filesRefCounter: refCounter{counter: make(map[string]int)}, + notifyRotate: pubsub.NewPublisher(0, 1), + marshal: marshaller, + createDecoder: decodeFunc, + perms: perms, }, nil } @@ -84,6 +147,7 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error { n, err := w.f.Write(b) if err == nil { w.currentSize += int64(n) + w.lastTimestamp = msg.Timestamp } w.mu.Unlock() return err @@ -95,43 +159,108 @@ func (w *LogFile) checkCapacityAndRotate() error { } if w.currentSize >= w.capacity { - name := w.f.Name() + w.rotateMu.Lock() + fname := w.f.Name() if err := w.f.Close(); err != nil { + w.rotateMu.Unlock() return errors.Wrap(err, "error closing file") } - if err := rotate(name, w.maxFiles); err != nil { + if err := rotate(fname, w.maxFiles, w.compress); err != nil { + w.rotateMu.Unlock() return err } - file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms) + file, err := os.OpenFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms) if err != nil { + w.rotateMu.Unlock() return err } w.f = file w.currentSize = 0 w.notifyRotate.Publish(struct{}{}) + + if w.maxFiles <= 1 || !w.compress { + w.rotateMu.Unlock() + return nil + } + + go func() { + compressFile(fname+".1", w.lastTimestamp) + w.rotateMu.Unlock() + }() } return nil } -func rotate(name string, maxFiles int) error { +func rotate(name string, maxFiles int, compress bool) error { if maxFiles < 2 { return nil } + + var extension string + if compress { + extension = ".gz" + } for i := maxFiles - 1; i > 1; i-- { - toPath := name + "." + strconv.Itoa(i) - fromPath := name + "." + strconv.Itoa(i-1) + toPath := name + "." + strconv.Itoa(i) + extension + fromPath := name + "." + strconv.Itoa(i-1) + extension if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) { - return errors.Wrap(err, "error rotating old log entries") + return err } } if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) { - return errors.Wrap(err, "error rotating current log") + return err } + return nil } +func compressFile(fileName string, lastTimestamp time.Time) { + file, err := os.Open(fileName) + if err != nil { + logrus.Errorf("Failed to open log file: %v", err) + return + } + defer func() { + file.Close() + err := os.Remove(fileName) + if err != nil { + logrus.Errorf("Failed to remove source log file: %v", err) + } + }() + + outFile, err := os.OpenFile(fileName+".gz", os.O_CREATE|os.O_RDWR, 0640) + if err != nil { + logrus.Errorf("Failed to open or create gzip log file: %v", err) + return + } + defer func() { + outFile.Close() + if err != nil { + os.Remove(fileName + ".gz") + } + }() + + compressWriter := gzip.NewWriter(outFile) + defer compressWriter.Close() + + // Add the last log entry timestramp to the gzip header + extra := rotateFileMetadata{} + extra.LastTime = lastTimestamp + compressWriter.Header.Extra, err = json.Marshal(&extra) + if err != nil { + // Here log the error only and don't return since this is just an optimization. + logrus.Warningf("Failed to marshal JSON: %v", err) + } + + _, err = pools.Copy(compressWriter, file) + if err != nil { + logrus.WithError(err).WithField("module", "container.logs").WithField("file", fileName).Error("Error compressing log file") + return + } +} + // MaxFiles return maximum number of files func (w *LogFile) MaxFiles() int { return w.maxFiles @@ -154,18 +283,6 @@ func (w *LogFile) Close() error { // ReadLogs decodes entries from log files and sends them the passed in watcher func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) { w.mu.RLock() - files, err := w.openRotatedFiles() - if err != nil { - w.mu.RUnlock() - watcher.Err <- err - return - } - defer func() { - for _, f := range files { - f.Close() - } - }() - currentFile, err := os.Open(w.f.Name()) if err != nil { w.mu.RUnlock() @@ -175,14 +292,20 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) defer currentFile.Close() currentChunk, err := newSectionReader(currentFile) - w.mu.RUnlock() - if err != nil { + w.mu.RUnlock() watcher.Err <- err return } if config.Tail != 0 { + files, err := w.openRotatedFiles(config) + if err != nil { + w.mu.RUnlock() + watcher.Err <- err + return + } + w.mu.RUnlock() seekers := make([]io.ReadSeeker, 0, len(files)+1) for _, f := range files { seekers = append(seekers, f) @@ -193,9 +316,20 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) if len(seekers) > 0 { tailFile(multireader.MultiReadSeeker(seekers...), watcher, w.createDecoder, config) } + for _, f := range files { + f.Close() + fileName := f.Name() + if strings.HasSuffix(fileName, tmpLogfileSuffix) { + err := w.filesRefCounter.Dereference(fileName) + if err != nil { + logrus.Errorf("Failed to dereference the log file %q: %v", fileName, err) + } + } + } + + w.mu.RLock() } - w.mu.RLock() if !config.Follow || w.closed { w.mu.RUnlock() return @@ -207,13 +341,22 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until) } -func (w *LogFile) openRotatedFiles() (files []*os.File, err error) { +func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) { + w.rotateMu.Lock() + defer w.rotateMu.Unlock() + defer func() { if err == nil { return } for _, f := range files { f.Close() + if strings.HasSuffix(f.Name(), tmpLogfileSuffix) { + err := os.Remove(f.Name()) + if err != nil && !os.IsNotExist(err) { + logrus.Warningf("Failed to remove the logfile %q: %v", f.Name, err) + } + } } }() @@ -223,6 +366,28 @@ func (w *LogFile) openRotatedFiles() (files []*os.File, err error) { if !os.IsNotExist(err) { return nil, err } + + fileName := fmt.Sprintf("%s.%d.gz", w.f.Name(), i-1) + decompressedFileName := fileName + tmpLogfileSuffix + tmpFile, err := w.filesRefCounter.GetReference(decompressedFileName, func(refFileName string, exists bool) (*os.File, error) { + if exists { + return os.Open(refFileName) + } + return decompressfile(fileName, refFileName, config.Since) + }) + + if err != nil { + if !os.IsNotExist(err) { + return nil, err + } + continue + } + if tmpFile == nil { + // The log before `config.Since` does not need to read + break + } + + files = append(files, tmpFile) continue } files = append(files, f) @@ -231,6 +396,44 @@ func (w *LogFile) openRotatedFiles() (files []*os.File, err error) { return files, nil } +func decompressfile(fileName, destFileName string, since time.Time) (*os.File, error) { + cf, err := os.Open(fileName) + if err != nil { + return nil, err + } + defer cf.Close() + + rc, err := gzip.NewReader(cf) + if err != nil { + return nil, err + } + defer rc.Close() + + // Extract the last log entry timestramp from the gzip header + extra := &rotateFileMetadata{} + err = json.Unmarshal(rc.Header.Extra, extra) + if err == nil && extra.LastTime.Before(since) { + return nil, nil + } + + rs, err := os.OpenFile(destFileName, os.O_CREATE|os.O_RDWR, 0640) + if err != nil { + return nil, err + } + + _, err = pools.Copy(rs, rc) + if err != nil { + rs.Close() + rErr := os.Remove(rs.Name()) + if rErr != nil && os.IsNotExist(rErr) { + logrus.Errorf("Failed to remove the logfile %q: %v", rs.Name(), rErr) + } + return nil, err + } + + return rs, nil +} + func newSectionReader(f *os.File) (*io.SectionReader, error) { // seek to the end to get the size // we'll leave this at the end of the file since section reader does not advance the reader