add compress option for 'jsonfiles' log driver

This PR adds support for compressibility of log file.
I added a new option conpression for the jsonfile log driver,
this option allows the user to specify compression algorithm to
compress the log files. By default, the log files will be
not compressed. At present, only support 'gzip'.

Signed-off-by: Yanqiang Miao <miao.yanqiang@zte.com.cn>

'docker logs' can read from compressed files

Signed-off-by: Yanqiang Miao <miao.yanqiang@zte.com.cn>

Add Metadata to the gzip header, optmize 'readlog'

Signed-off-by: Yanqiang Miao <miao.yanqiang@zte.com.cn>
This commit is contained in:
Yanqiang Miao 2018-03-13 10:47:24 +08:00
parent 241c904e6f
commit f69f09f44c
3 changed files with 322 additions and 49 deletions

View File

@ -49,6 +49,9 @@ func New(info logger.Info) (logger.Logger, error) {
if err != nil {
return nil, err
}
if capval <= 0 {
return nil, fmt.Errorf("max-size should be a positive numbler")
}
}
var maxFiles = 1
if maxFileString, ok := info.Config["max-file"]; ok {
@ -62,6 +65,18 @@ func New(info logger.Info) (logger.Logger, error) {
}
}
var compress bool
if compressString, ok := info.Config["compress"]; ok {
var err error
compress, err = strconv.ParseBool(compressString)
if err != nil {
return nil, err
}
if compress && (maxFiles == 1 || capval == -1) {
return nil, fmt.Errorf("compress cannot be true when max-file is less than 2 or max-size is not set")
}
}
attrs, err := info.ExtraAttributes(nil)
if err != nil {
return nil, err
@ -95,7 +110,7 @@ func New(info logger.Info) (logger.Logger, error) {
return b, nil
}
writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, marshalFunc, decodeFunc, 0640)
writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, compress, marshalFunc, decodeFunc, 0640)
if err != nil {
return nil, err
}
@ -139,6 +154,7 @@ func ValidateLogOpt(cfg map[string]string) error {
switch key {
case "max-file":
case "max-size":
case "compress":
case "labels":
case "env":
case "env-regex":

View File

@ -2,6 +2,7 @@ package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelo
import (
"bytes"
"compress/gzip"
"encoding/json"
"io/ioutil"
"os"
@ -142,7 +143,7 @@ func TestJSONFileLoggerWithOpts(t *testing.T) {
}
defer os.RemoveAll(tmp)
filename := filepath.Join(tmp, "container.log")
config := map[string]string{"max-file": "2", "max-size": "1k"}
config := map[string]string{"max-file": "3", "max-size": "1k", "compress": "true"}
l, err := New(logger.Info{
ContainerID: cid,
LogPath: filename,
@ -152,21 +153,55 @@ func TestJSONFileLoggerWithOpts(t *testing.T) {
t.Fatal(err)
}
defer l.Close()
for i := 0; i < 20; i++ {
for i := 0; i < 36; i++ {
if err := l.Log(&logger.Message{Line: []byte("line" + strconv.Itoa(i)), Source: "src1"}); err != nil {
t.Fatal(err)
}
}
res, err := ioutil.ReadFile(filename)
if err != nil {
t.Fatal(err)
}
penUlt, err := ioutil.ReadFile(filename + ".1")
if err != nil {
if !os.IsNotExist(err) {
t.Fatal(err)
}
file, err := os.Open(filename + ".1.gz")
defer file.Close()
if err != nil {
t.Fatal(err)
}
zipReader, err := gzip.NewReader(file)
defer zipReader.Close()
if err != nil {
t.Fatal(err)
}
penUlt, err = ioutil.ReadAll(zipReader)
if err != nil {
t.Fatal(err)
}
}
file, err := os.Open(filename + ".2.gz")
defer file.Close()
if err != nil {
t.Fatal(err)
}
zipReader, err := gzip.NewReader(file)
defer zipReader.Close()
if err != nil {
t.Fatal(err)
}
antepenult, err := ioutil.ReadAll(zipReader)
if err != nil {
t.Fatal(err)
}
expectedPenultimate := `{"log":"line0\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
expectedAntepenultimate := `{"log":"line0\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line1\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line2\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line3\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
@ -183,10 +218,27 @@ func TestJSONFileLoggerWithOpts(t *testing.T) {
{"log":"line14\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line15\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
`
expected := `{"log":"line16\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
expectedPenultimate := `{"log":"line16\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line17\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line18\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line19\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line20\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line21\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line22\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line23\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line24\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line25\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line26\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line27\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line28\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line29\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line30\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line31\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
`
expected := `{"log":"line32\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line33\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line34\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
{"log":"line35\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
`
if string(res) != expected {
@ -195,7 +247,9 @@ func TestJSONFileLoggerWithOpts(t *testing.T) {
if string(penUlt) != expectedPenultimate {
t.Fatalf("Wrong log content: %q, expected %q", penUlt, expectedPenultimate)
}
if string(antepenult) != expectedAntepenultimate {
t.Fatalf("Wrong log content: %q, expected %q", antepenult, expectedAntepenultimate)
}
}
func TestJSONFileLoggerWithLabelsEnv(t *testing.T) {

View File

@ -2,17 +2,21 @@ package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutil
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/loggerutils/multireader"
"github.com/docker/docker/pkg/filenotify"
"github.com/docker/docker/pkg/pools"
"github.com/docker/docker/pkg/pubsub"
"github.com/docker/docker/pkg/tailfile"
"github.com/fsnotify/fsnotify"
@ -20,24 +24,81 @@ import (
"github.com/sirupsen/logrus"
)
const tmpLogfileSuffix = ".tmp"
// rotateFileMetadata is a metadata of the gzip header of the compressed log file
type rotateFileMetadata struct {
LastTime time.Time `json:"lastTime,omitempty"`
}
// refCounter is a counter of logfile being referenced
type refCounter struct {
mu sync.Mutex
counter map[string]int
}
// Reference increase the reference counter for specified logfile
func (rc *refCounter) GetReference(fileName string, openRefFile func(fileName string, exists bool) (*os.File, error)) (*os.File, error) {
rc.mu.Lock()
defer rc.mu.Unlock()
var (
file *os.File
err error
)
_, ok := rc.counter[fileName]
file, err = openRefFile(fileName, ok)
if err != nil {
return nil, err
}
if ok {
rc.counter[fileName]++
} else if file != nil {
rc.counter[file.Name()] = 1
}
return file, nil
}
// Dereference reduce the reference counter for specified logfile
func (rc *refCounter) Dereference(fileName string) error {
rc.mu.Lock()
defer rc.mu.Unlock()
rc.counter[fileName]--
if rc.counter[fileName] <= 0 {
delete(rc.counter, fileName)
err := os.Remove(fileName)
if err != nil {
return err
}
}
return nil
}
// LogFile is Logger implementation for default Docker logging.
type LogFile struct {
f *os.File // store for closing
closed bool
mu sync.RWMutex
capacity int64 //maximum size of each file
currentSize int64 // current size of the latest file
maxFiles int //maximum number of files
notifyRotate *pubsub.Publisher
marshal logger.MarshalFunc
createDecoder makeDecoderFunc
perms os.FileMode
mu sync.RWMutex // protects the logfile access
f *os.File // store for closing
closed bool
rotateMu sync.Mutex // blocks the next rotation until the current rotation is completed
capacity int64 // maximum size of each file
currentSize int64 // current size of the latest file
maxFiles int // maximum number of files
compress bool // whether old versions of log files are compressed
lastTimestamp time.Time // timestamp of the last log
filesRefCounter refCounter // keep reference-counted of decompressed files
notifyRotate *pubsub.Publisher
marshal logger.MarshalFunc
createDecoder makeDecoderFunc
perms os.FileMode
}
type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error)
//NewLogFile creates new LogFile
func NewLogFile(logPath string, capacity int64, maxFiles int, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode) (*LogFile, error) {
func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode) (*LogFile, error) {
log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
if err != nil {
return nil, err
@ -49,14 +110,16 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, marshaller logger.
}
return &LogFile{
f: log,
capacity: capacity,
currentSize: size,
maxFiles: maxFiles,
notifyRotate: pubsub.NewPublisher(0, 1),
marshal: marshaller,
createDecoder: decodeFunc,
perms: perms,
f: log,
capacity: capacity,
currentSize: size,
maxFiles: maxFiles,
compress: compress,
filesRefCounter: refCounter{counter: make(map[string]int)},
notifyRotate: pubsub.NewPublisher(0, 1),
marshal: marshaller,
createDecoder: decodeFunc,
perms: perms,
}, nil
}
@ -84,6 +147,7 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
n, err := w.f.Write(b)
if err == nil {
w.currentSize += int64(n)
w.lastTimestamp = msg.Timestamp
}
w.mu.Unlock()
return err
@ -95,43 +159,108 @@ func (w *LogFile) checkCapacityAndRotate() error {
}
if w.currentSize >= w.capacity {
name := w.f.Name()
w.rotateMu.Lock()
fname := w.f.Name()
if err := w.f.Close(); err != nil {
w.rotateMu.Unlock()
return errors.Wrap(err, "error closing file")
}
if err := rotate(name, w.maxFiles); err != nil {
if err := rotate(fname, w.maxFiles, w.compress); err != nil {
w.rotateMu.Unlock()
return err
}
file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
file, err := os.OpenFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
if err != nil {
w.rotateMu.Unlock()
return err
}
w.f = file
w.currentSize = 0
w.notifyRotate.Publish(struct{}{})
if w.maxFiles <= 1 || !w.compress {
w.rotateMu.Unlock()
return nil
}
go func() {
compressFile(fname+".1", w.lastTimestamp)
w.rotateMu.Unlock()
}()
}
return nil
}
func rotate(name string, maxFiles int) error {
func rotate(name string, maxFiles int, compress bool) error {
if maxFiles < 2 {
return nil
}
var extension string
if compress {
extension = ".gz"
}
for i := maxFiles - 1; i > 1; i-- {
toPath := name + "." + strconv.Itoa(i)
fromPath := name + "." + strconv.Itoa(i-1)
toPath := name + "." + strconv.Itoa(i) + extension
fromPath := name + "." + strconv.Itoa(i-1) + extension
if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
return errors.Wrap(err, "error rotating old log entries")
return err
}
}
if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
return errors.Wrap(err, "error rotating current log")
return err
}
return nil
}
func compressFile(fileName string, lastTimestamp time.Time) {
file, err := os.Open(fileName)
if err != nil {
logrus.Errorf("Failed to open log file: %v", err)
return
}
defer func() {
file.Close()
err := os.Remove(fileName)
if err != nil {
logrus.Errorf("Failed to remove source log file: %v", err)
}
}()
outFile, err := os.OpenFile(fileName+".gz", os.O_CREATE|os.O_RDWR, 0640)
if err != nil {
logrus.Errorf("Failed to open or create gzip log file: %v", err)
return
}
defer func() {
outFile.Close()
if err != nil {
os.Remove(fileName + ".gz")
}
}()
compressWriter := gzip.NewWriter(outFile)
defer compressWriter.Close()
// Add the last log entry timestramp to the gzip header
extra := rotateFileMetadata{}
extra.LastTime = lastTimestamp
compressWriter.Header.Extra, err = json.Marshal(&extra)
if err != nil {
// Here log the error only and don't return since this is just an optimization.
logrus.Warningf("Failed to marshal JSON: %v", err)
}
_, err = pools.Copy(compressWriter, file)
if err != nil {
logrus.WithError(err).WithField("module", "container.logs").WithField("file", fileName).Error("Error compressing log file")
return
}
}
// MaxFiles return maximum number of files
func (w *LogFile) MaxFiles() int {
return w.maxFiles
@ -154,18 +283,6 @@ func (w *LogFile) Close() error {
// ReadLogs decodes entries from log files and sends them the passed in watcher
func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
w.mu.RLock()
files, err := w.openRotatedFiles()
if err != nil {
w.mu.RUnlock()
watcher.Err <- err
return
}
defer func() {
for _, f := range files {
f.Close()
}
}()
currentFile, err := os.Open(w.f.Name())
if err != nil {
w.mu.RUnlock()
@ -175,14 +292,20 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
defer currentFile.Close()
currentChunk, err := newSectionReader(currentFile)
w.mu.RUnlock()
if err != nil {
w.mu.RUnlock()
watcher.Err <- err
return
}
if config.Tail != 0 {
files, err := w.openRotatedFiles(config)
if err != nil {
w.mu.RUnlock()
watcher.Err <- err
return
}
w.mu.RUnlock()
seekers := make([]io.ReadSeeker, 0, len(files)+1)
for _, f := range files {
seekers = append(seekers, f)
@ -193,9 +316,20 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
if len(seekers) > 0 {
tailFile(multireader.MultiReadSeeker(seekers...), watcher, w.createDecoder, config)
}
for _, f := range files {
f.Close()
fileName := f.Name()
if strings.HasSuffix(fileName, tmpLogfileSuffix) {
err := w.filesRefCounter.Dereference(fileName)
if err != nil {
logrus.Errorf("Failed to dereference the log file %q: %v", fileName, err)
}
}
}
w.mu.RLock()
}
w.mu.RLock()
if !config.Follow || w.closed {
w.mu.RUnlock()
return
@ -207,13 +341,22 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until)
}
func (w *LogFile) openRotatedFiles() (files []*os.File, err error) {
func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) {
w.rotateMu.Lock()
defer w.rotateMu.Unlock()
defer func() {
if err == nil {
return
}
for _, f := range files {
f.Close()
if strings.HasSuffix(f.Name(), tmpLogfileSuffix) {
err := os.Remove(f.Name())
if err != nil && !os.IsNotExist(err) {
logrus.Warningf("Failed to remove the logfile %q: %v", f.Name, err)
}
}
}
}()
@ -223,6 +366,28 @@ func (w *LogFile) openRotatedFiles() (files []*os.File, err error) {
if !os.IsNotExist(err) {
return nil, err
}
fileName := fmt.Sprintf("%s.%d.gz", w.f.Name(), i-1)
decompressedFileName := fileName + tmpLogfileSuffix
tmpFile, err := w.filesRefCounter.GetReference(decompressedFileName, func(refFileName string, exists bool) (*os.File, error) {
if exists {
return os.Open(refFileName)
}
return decompressfile(fileName, refFileName, config.Since)
})
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}
continue
}
if tmpFile == nil {
// The log before `config.Since` does not need to read
break
}
files = append(files, tmpFile)
continue
}
files = append(files, f)
@ -231,6 +396,44 @@ func (w *LogFile) openRotatedFiles() (files []*os.File, err error) {
return files, nil
}
func decompressfile(fileName, destFileName string, since time.Time) (*os.File, error) {
cf, err := os.Open(fileName)
if err != nil {
return nil, err
}
defer cf.Close()
rc, err := gzip.NewReader(cf)
if err != nil {
return nil, err
}
defer rc.Close()
// Extract the last log entry timestramp from the gzip header
extra := &rotateFileMetadata{}
err = json.Unmarshal(rc.Header.Extra, extra)
if err == nil && extra.LastTime.Before(since) {
return nil, nil
}
rs, err := os.OpenFile(destFileName, os.O_CREATE|os.O_RDWR, 0640)
if err != nil {
return nil, err
}
_, err = pools.Copy(rs, rc)
if err != nil {
rs.Close()
rErr := os.Remove(rs.Name())
if rErr != nil && os.IsNotExist(rErr) {
logrus.Errorf("Failed to remove the logfile %q: %v", rs.Name(), rErr)
}
return nil, err
}
return rs, nil
}
func newSectionReader(f *os.File) (*io.SectionReader, error) {
// seek to the end to get the size
// we'll leave this at the end of the file since section reader does not advance the reader