1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

add compress option for 'json-file' log driver

This PR adds support for compressibility of log file.
I added a new option 'compression' for the jsonfile log driver,
this option allows the user to specify compression algorithm to
compress the log files. By default, the log files are
not compressed. At present, only 'gzip' is supported.

Signed-off-by: Yanqiang Miao <miao.yanqiang@zte.com.cn>

'docker logs' can read from compressed files

Signed-off-by: Yanqiang Miao <miao.yanqiang@zte.com.cn>

Add Metadata to the gzip header, optimize 'readlog'

Signed-off-by: Yanqiang Miao <miao.yanqiang@zte.com.cn>
This commit is contained in:
Yanqiang Miao 2018-03-13 10:47:24 +08:00
parent 241c904e6f
commit f69f09f44c
3 changed files with 322 additions and 49 deletions

View file

@ -49,6 +49,9 @@ func New(info logger.Info) (logger.Logger, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if capval <= 0 {
return nil, fmt.Errorf("max-size should be a positive numbler")
}
} }
var maxFiles = 1 var maxFiles = 1
if maxFileString, ok := info.Config["max-file"]; ok { if maxFileString, ok := info.Config["max-file"]; ok {
@ -62,6 +65,18 @@ func New(info logger.Info) (logger.Logger, error) {
} }
} }
var compress bool
if compressString, ok := info.Config["compress"]; ok {
var err error
compress, err = strconv.ParseBool(compressString)
if err != nil {
return nil, err
}
if compress && (maxFiles == 1 || capval == -1) {
return nil, fmt.Errorf("compress cannot be true when max-file is less than 2 or max-size is not set")
}
}
attrs, err := info.ExtraAttributes(nil) attrs, err := info.ExtraAttributes(nil)
if err != nil { if err != nil {
return nil, err return nil, err
@ -95,7 +110,7 @@ func New(info logger.Info) (logger.Logger, error) {
return b, nil return b, nil
} }
writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, marshalFunc, decodeFunc, 0640) writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, compress, marshalFunc, decodeFunc, 0640)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -139,6 +154,7 @@ func ValidateLogOpt(cfg map[string]string) error {
switch key { switch key {
case "max-file": case "max-file":
case "max-size": case "max-size":
case "compress":
case "labels": case "labels":
case "env": case "env":
case "env-regex": case "env-regex":

View file

@ -2,6 +2,7 @@ package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelo
import ( import (
"bytes" "bytes"
"compress/gzip"
"encoding/json" "encoding/json"
"io/ioutil" "io/ioutil"
"os" "os"
@ -142,7 +143,7 @@ func TestJSONFileLoggerWithOpts(t *testing.T) {
} }
defer os.RemoveAll(tmp) defer os.RemoveAll(tmp)
filename := filepath.Join(tmp, "container.log") filename := filepath.Join(tmp, "container.log")
config := map[string]string{"max-file": "2", "max-size": "1k"} config := map[string]string{"max-file": "3", "max-size": "1k", "compress": "true"}
l, err := New(logger.Info{ l, err := New(logger.Info{
ContainerID: cid, ContainerID: cid,
LogPath: filename, LogPath: filename,
@ -152,21 +153,55 @@ func TestJSONFileLoggerWithOpts(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
defer l.Close() defer l.Close()
for i := 0; i < 20; i++ { for i := 0; i < 36; i++ {
if err := l.Log(&logger.Message{Line: []byte("line" + strconv.Itoa(i)), Source: "src1"}); err != nil { if err := l.Log(&logger.Message{Line: []byte("line" + strconv.Itoa(i)), Source: "src1"}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
res, err := ioutil.ReadFile(filename) res, err := ioutil.ReadFile(filename)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
penUlt, err := ioutil.ReadFile(filename + ".1") penUlt, err := ioutil.ReadFile(filename + ".1")
if err != nil {
if !os.IsNotExist(err) {
t.Fatal(err)
}
file, err := os.Open(filename + ".1.gz")
defer file.Close()
if err != nil {
t.Fatal(err)
}
zipReader, err := gzip.NewReader(file)
defer zipReader.Close()
if err != nil {
t.Fatal(err)
}
penUlt, err = ioutil.ReadAll(zipReader)
if err != nil {
t.Fatal(err)
}
}
file, err := os.Open(filename + ".2.gz")
defer file.Close()
if err != nil {
t.Fatal(err)
}
zipReader, err := gzip.NewReader(file)
defer zipReader.Close()
if err != nil {
t.Fatal(err)
}
antepenult, err := ioutil.ReadAll(zipReader)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
expectedPenultimate := `{"log":"line0\n","stream":"src1","time":"0001-01-01T00:00:00Z"} expectedAntepenultimate := `{"log":"line0\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line1\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line1\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line2\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line2\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line3\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line3\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
@ -183,10 +218,27 @@ func TestJSONFileLoggerWithOpts(t *testing.T) {
{"log":"line14\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line14\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line15\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line15\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
` `
expected := `{"log":"line16\n","stream":"src1","time":"0001-01-01T00:00:00Z"} expectedPenultimate := `{"log":"line16\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line17\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line17\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line18\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line18\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line19\n","stream":"src1","time":"0001-01-01T00:00:00Z"} {"log":"line19\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line20\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line21\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line22\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line23\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line24\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line25\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line26\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line27\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line28\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line29\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line30\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line31\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
`
expected := `{"log":"line32\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line33\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line34\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line35\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
` `
if string(res) != expected { if string(res) != expected {
@ -195,7 +247,9 @@ func TestJSONFileLoggerWithOpts(t *testing.T) {
if string(penUlt) != expectedPenultimate { if string(penUlt) != expectedPenultimate {
t.Fatalf("Wrong log content: %q, expected %q", penUlt, expectedPenultimate) t.Fatalf("Wrong log content: %q, expected %q", penUlt, expectedPenultimate)
} }
if string(antepenult) != expectedAntepenultimate {
t.Fatalf("Wrong log content: %q, expected %q", antepenult, expectedAntepenultimate)
}
} }
func TestJSONFileLoggerWithLabelsEnv(t *testing.T) { func TestJSONFileLoggerWithLabelsEnv(t *testing.T) {

View file

@ -2,17 +2,21 @@ package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutil
import ( import (
"bytes" "bytes"
"compress/gzip"
"context" "context"
"encoding/json"
"fmt" "fmt"
"io" "io"
"os" "os"
"strconv" "strconv"
"strings"
"sync" "sync"
"time" "time"
"github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/loggerutils/multireader" "github.com/docker/docker/daemon/logger/loggerutils/multireader"
"github.com/docker/docker/pkg/filenotify" "github.com/docker/docker/pkg/filenotify"
"github.com/docker/docker/pkg/pools"
"github.com/docker/docker/pkg/pubsub" "github.com/docker/docker/pkg/pubsub"
"github.com/docker/docker/pkg/tailfile" "github.com/docker/docker/pkg/tailfile"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
@ -20,24 +24,81 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
// tmpLogfileSuffix is appended to the name of the temporary file that holds
// the decompressed contents of a rotated .gz log file while readers use it.
const tmpLogfileSuffix = ".tmp"

// rotateFileMetadata is stored (JSON-encoded) in the Extra field of a
// compressed log file's gzip header. LastTime is the timestamp of the last
// log entry written before rotation, which lets readers skip decompressing
// files whose entries all predate a requested "since" time.
type rotateFileMetadata struct {
	LastTime time.Time `json:"lastTime,omitempty"`
}
// refCounter tracks how many readers currently hold a given log file open,
// so a shared temporary (decompressed) file can be deleted exactly once,
// when its last reader dereferences it.
type refCounter struct {
	mu      sync.Mutex
	counter map[string]int
}

// GetReference opens the file named fileName through openRefFile and bumps
// its reference count. The callback is told whether the name is already
// tracked, so it can reuse an existing file instead of recreating it.
// A nil file returned with a nil error is not tracked.
func (rc *refCounter) GetReference(fileName string, openRefFile func(fileName string, exists bool) (*os.File, error)) (*os.File, error) {
	rc.mu.Lock()
	defer rc.mu.Unlock()

	_, tracked := rc.counter[fileName]
	file, err := openRefFile(fileName, tracked)
	if err != nil {
		return nil, err
	}

	switch {
	case tracked:
		rc.counter[fileName]++
	case file != nil:
		rc.counter[file.Name()] = 1
	}
	return file, nil
}

// Dereference drops one reference to fileName and removes the file from
// disk once no references remain.
func (rc *refCounter) Dereference(fileName string) error {
	rc.mu.Lock()
	defer rc.mu.Unlock()

	rc.counter[fileName]--
	if rc.counter[fileName] > 0 {
		return nil
	}
	delete(rc.counter, fileName)
	return os.Remove(fileName)
}
// LogFile is Logger implementation for default Docker logging. // LogFile is Logger implementation for default Docker logging.
type LogFile struct { type LogFile struct {
f *os.File // store for closing mu sync.RWMutex // protects the logfile access
closed bool f *os.File // store for closing
mu sync.RWMutex closed bool
capacity int64 //maximum size of each file rotateMu sync.Mutex // blocks the next rotation until the current rotation is completed
currentSize int64 // current size of the latest file capacity int64 // maximum size of each file
maxFiles int //maximum number of files currentSize int64 // current size of the latest file
notifyRotate *pubsub.Publisher maxFiles int // maximum number of files
marshal logger.MarshalFunc compress bool // whether old versions of log files are compressed
createDecoder makeDecoderFunc lastTimestamp time.Time // timestamp of the last log
perms os.FileMode filesRefCounter refCounter // keep reference-counted of decompressed files
notifyRotate *pubsub.Publisher
marshal logger.MarshalFunc
createDecoder makeDecoderFunc
perms os.FileMode
} }
type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error) type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error)
//NewLogFile creates new LogFile //NewLogFile creates new LogFile
func NewLogFile(logPath string, capacity int64, maxFiles int, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode) (*LogFile, error) { func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode) (*LogFile, error) {
log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms) log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
if err != nil { if err != nil {
return nil, err return nil, err
@ -49,14 +110,16 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, marshaller logger.
} }
return &LogFile{ return &LogFile{
f: log, f: log,
capacity: capacity, capacity: capacity,
currentSize: size, currentSize: size,
maxFiles: maxFiles, maxFiles: maxFiles,
notifyRotate: pubsub.NewPublisher(0, 1), compress: compress,
marshal: marshaller, filesRefCounter: refCounter{counter: make(map[string]int)},
createDecoder: decodeFunc, notifyRotate: pubsub.NewPublisher(0, 1),
perms: perms, marshal: marshaller,
createDecoder: decodeFunc,
perms: perms,
}, nil }, nil
} }
@ -84,6 +147,7 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
n, err := w.f.Write(b) n, err := w.f.Write(b)
if err == nil { if err == nil {
w.currentSize += int64(n) w.currentSize += int64(n)
w.lastTimestamp = msg.Timestamp
} }
w.mu.Unlock() w.mu.Unlock()
return err return err
@ -95,43 +159,108 @@ func (w *LogFile) checkCapacityAndRotate() error {
} }
if w.currentSize >= w.capacity { if w.currentSize >= w.capacity {
name := w.f.Name() w.rotateMu.Lock()
fname := w.f.Name()
if err := w.f.Close(); err != nil { if err := w.f.Close(); err != nil {
w.rotateMu.Unlock()
return errors.Wrap(err, "error closing file") return errors.Wrap(err, "error closing file")
} }
if err := rotate(name, w.maxFiles); err != nil { if err := rotate(fname, w.maxFiles, w.compress); err != nil {
w.rotateMu.Unlock()
return err return err
} }
file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms) file, err := os.OpenFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
if err != nil { if err != nil {
w.rotateMu.Unlock()
return err return err
} }
w.f = file w.f = file
w.currentSize = 0 w.currentSize = 0
w.notifyRotate.Publish(struct{}{}) w.notifyRotate.Publish(struct{}{})
if w.maxFiles <= 1 || !w.compress {
w.rotateMu.Unlock()
return nil
}
go func() {
compressFile(fname+".1", w.lastTimestamp)
w.rotateMu.Unlock()
}()
} }
return nil return nil
} }
func rotate(name string, maxFiles int) error { func rotate(name string, maxFiles int, compress bool) error {
if maxFiles < 2 { if maxFiles < 2 {
return nil return nil
} }
var extension string
if compress {
extension = ".gz"
}
for i := maxFiles - 1; i > 1; i-- { for i := maxFiles - 1; i > 1; i-- {
toPath := name + "." + strconv.Itoa(i) toPath := name + "." + strconv.Itoa(i) + extension
fromPath := name + "." + strconv.Itoa(i-1) fromPath := name + "." + strconv.Itoa(i-1) + extension
if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) { if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
return errors.Wrap(err, "error rotating old log entries") return err
} }
} }
if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) { if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
return errors.Wrap(err, "error rotating current log") return err
} }
return nil return nil
} }
// compressFile gzip-compresses the rotated log file at fileName into
// fileName+".gz", embedding lastTimestamp in the gzip header so readers can
// skip archives whose entries all predate a requested "since" time.
//
// The uncompressed source is removed only after the archive has been fully
// written and flushed; on any failure the partial .gz is discarded and the
// source file is kept, so no log data is lost. Errors are logged, not
// returned, since this runs asynchronously after rotation.
func compressFile(fileName string, lastTimestamp time.Time) {
	file, err := os.Open(fileName)
	if err != nil {
		logrus.Errorf("Failed to open log file: %v", err)
		return
	}
	// Runs last (declared first): decide the source file's fate once the
	// final value of err is known.
	defer func() {
		file.Close()
		if err != nil {
			// Compression failed: keep the source so its entries survive.
			return
		}
		if rErr := os.Remove(fileName); rErr != nil {
			logrus.Errorf("Failed to remove source log file: %v", rErr)
		}
	}()

	outFile, err := os.OpenFile(fileName+".gz", os.O_CREATE|os.O_RDWR, 0640)
	if err != nil {
		logrus.Errorf("Failed to open or create gzip log file: %v", err)
		return
	}
	defer func() {
		outFile.Close()
		if err != nil {
			// Discard the partial archive on failure.
			os.Remove(fileName + ".gz")
		}
	}()

	compressWriter := gzip.NewWriter(outFile)
	defer func() {
		// Close flushes buffered data; a failed Close means a truncated
		// archive, so propagate it into err before the cleanup defers run.
		if cErr := compressWriter.Close(); cErr != nil && err == nil {
			err = cErr
			logrus.Errorf("Failed to finalize gzip log file: %v", cErr)
		}
	}()

	// Add the last log entry timestamp to the gzip header.
	extra := rotateFileMetadata{}
	extra.LastTime = lastTimestamp
	compressWriter.Header.Extra, err = json.Marshal(&extra)
	if err != nil {
		// Metadata is only an optimization; log and continue without it.
		logrus.Warningf("Failed to marshal JSON: %v", err)
		err = nil
	}

	_, err = pools.Copy(compressWriter, file)
	if err != nil {
		logrus.WithError(err).WithField("module", "container.logs").WithField("file", fileName).Error("Error compressing log file")
		return
	}
}
// MaxFiles return maximum number of files // MaxFiles return maximum number of files
func (w *LogFile) MaxFiles() int { func (w *LogFile) MaxFiles() int {
return w.maxFiles return w.maxFiles
@ -154,18 +283,6 @@ func (w *LogFile) Close() error {
// ReadLogs decodes entries from log files and sends them the passed in watcher // ReadLogs decodes entries from log files and sends them the passed in watcher
func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) { func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
w.mu.RLock() w.mu.RLock()
files, err := w.openRotatedFiles()
if err != nil {
w.mu.RUnlock()
watcher.Err <- err
return
}
defer func() {
for _, f := range files {
f.Close()
}
}()
currentFile, err := os.Open(w.f.Name()) currentFile, err := os.Open(w.f.Name())
if err != nil { if err != nil {
w.mu.RUnlock() w.mu.RUnlock()
@ -175,14 +292,20 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
defer currentFile.Close() defer currentFile.Close()
currentChunk, err := newSectionReader(currentFile) currentChunk, err := newSectionReader(currentFile)
w.mu.RUnlock()
if err != nil { if err != nil {
w.mu.RUnlock()
watcher.Err <- err watcher.Err <- err
return return
} }
if config.Tail != 0 { if config.Tail != 0 {
files, err := w.openRotatedFiles(config)
if err != nil {
w.mu.RUnlock()
watcher.Err <- err
return
}
w.mu.RUnlock()
seekers := make([]io.ReadSeeker, 0, len(files)+1) seekers := make([]io.ReadSeeker, 0, len(files)+1)
for _, f := range files { for _, f := range files {
seekers = append(seekers, f) seekers = append(seekers, f)
@ -193,9 +316,20 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
if len(seekers) > 0 { if len(seekers) > 0 {
tailFile(multireader.MultiReadSeeker(seekers...), watcher, w.createDecoder, config) tailFile(multireader.MultiReadSeeker(seekers...), watcher, w.createDecoder, config)
} }
for _, f := range files {
f.Close()
fileName := f.Name()
if strings.HasSuffix(fileName, tmpLogfileSuffix) {
err := w.filesRefCounter.Dereference(fileName)
if err != nil {
logrus.Errorf("Failed to dereference the log file %q: %v", fileName, err)
}
}
}
w.mu.RLock()
} }
w.mu.RLock()
if !config.Follow || w.closed { if !config.Follow || w.closed {
w.mu.RUnlock() w.mu.RUnlock()
return return
@ -207,13 +341,22 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until) followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until)
} }
func (w *LogFile) openRotatedFiles() (files []*os.File, err error) { func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) {
w.rotateMu.Lock()
defer w.rotateMu.Unlock()
defer func() { defer func() {
if err == nil { if err == nil {
return return
} }
for _, f := range files { for _, f := range files {
f.Close() f.Close()
if strings.HasSuffix(f.Name(), tmpLogfileSuffix) {
err := os.Remove(f.Name())
if err != nil && !os.IsNotExist(err) {
logrus.Warningf("Failed to remove the logfile %q: %v", f.Name, err)
}
}
} }
}() }()
@ -223,6 +366,28 @@ func (w *LogFile) openRotatedFiles() (files []*os.File, err error) {
if !os.IsNotExist(err) { if !os.IsNotExist(err) {
return nil, err return nil, err
} }
fileName := fmt.Sprintf("%s.%d.gz", w.f.Name(), i-1)
decompressedFileName := fileName + tmpLogfileSuffix
tmpFile, err := w.filesRefCounter.GetReference(decompressedFileName, func(refFileName string, exists bool) (*os.File, error) {
if exists {
return os.Open(refFileName)
}
return decompressfile(fileName, refFileName, config.Since)
})
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}
continue
}
if tmpFile == nil {
// The log before `config.Since` does not need to read
break
}
files = append(files, tmpFile)
continue continue
} }
files = append(files, f) files = append(files, f)
@ -231,6 +396,44 @@ func (w *LogFile) openRotatedFiles() (files []*os.File, err error) {
return files, nil return files, nil
} }
// decompressfile decompresses the gzip log archive at fileName into a plain
// file at destFileName and returns the opened destination file (O_RDWR).
// It returns (nil, nil) when the archive's gzip-header metadata shows its
// last entry predates since, meaning none of its entries can match the query.
// On a copy failure the partially written destination file is removed.
func decompressfile(fileName, destFileName string, since time.Time) (*os.File, error) {
	cf, err := os.Open(fileName)
	if err != nil {
		return nil, err
	}
	defer cf.Close()

	rc, err := gzip.NewReader(cf)
	if err != nil {
		return nil, err
	}
	defer rc.Close()

	// Extract the last log entry timestamp from the gzip header; if the
	// header is absent or unparseable, fall through and decompress anyway.
	extra := &rotateFileMetadata{}
	err = json.Unmarshal(rc.Header.Extra, extra)
	if err == nil && extra.LastTime.Before(since) {
		return nil, nil
	}

	rs, err := os.OpenFile(destFileName, os.O_CREATE|os.O_RDWR, 0640)
	if err != nil {
		return nil, err
	}

	if _, err = pools.Copy(rs, rc); err != nil {
		rs.Close()
		// Best-effort cleanup of the partial file; a missing file is fine.
		rErr := os.Remove(rs.Name())
		if rErr != nil && !os.IsNotExist(rErr) {
			logrus.Errorf("Failed to remove the logfile %q: %v", rs.Name(), rErr)
		}
		return nil, err
	}
	return rs, nil
}
func newSectionReader(f *os.File) (*io.SectionReader, error) { func newSectionReader(f *os.File) (*io.SectionReader, error) {
// seek to the end to get the size // seek to the end to get the size
// we'll leave this at the end of the file since section reader does not advance the reader // we'll leave this at the end of the file since section reader does not advance the reader