Merge pull request #29932 from miaoyq/container-log-add-archive
add support for compressibility of log file
This commit is contained in:
commit
c4e93da8a6
|
@ -49,6 +49,9 @@ func New(info logger.Info) (logger.Logger, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if capval <= 0 {
|
||||||
|
return nil, fmt.Errorf("max-size should be a positive numbler")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
var maxFiles = 1
|
var maxFiles = 1
|
||||||
if maxFileString, ok := info.Config["max-file"]; ok {
|
if maxFileString, ok := info.Config["max-file"]; ok {
|
||||||
|
@ -62,6 +65,18 @@ func New(info logger.Info) (logger.Logger, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var compress bool
|
||||||
|
if compressString, ok := info.Config["compress"]; ok {
|
||||||
|
var err error
|
||||||
|
compress, err = strconv.ParseBool(compressString)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if compress && (maxFiles == 1 || capval == -1) {
|
||||||
|
return nil, fmt.Errorf("compress cannot be true when max-file is less than 2 or max-size is not set")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
attrs, err := info.ExtraAttributes(nil)
|
attrs, err := info.ExtraAttributes(nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -95,7 +110,7 @@ func New(info logger.Info) (logger.Logger, error) {
|
||||||
return b, nil
|
return b, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, marshalFunc, decodeFunc, 0640)
|
writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, compress, marshalFunc, decodeFunc, 0640)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -139,6 +154,7 @@ func ValidateLogOpt(cfg map[string]string) error {
|
||||||
switch key {
|
switch key {
|
||||||
case "max-file":
|
case "max-file":
|
||||||
case "max-size":
|
case "max-size":
|
||||||
|
case "compress":
|
||||||
case "labels":
|
case "labels":
|
||||||
case "env":
|
case "env":
|
||||||
case "env-regex":
|
case "env-regex":
|
||||||
|
|
|
@ -2,6 +2,7 @@ package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelo
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"compress/gzip"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
|
@ -142,7 +143,7 @@ func TestJSONFileLoggerWithOpts(t *testing.T) {
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(tmp)
|
defer os.RemoveAll(tmp)
|
||||||
filename := filepath.Join(tmp, "container.log")
|
filename := filepath.Join(tmp, "container.log")
|
||||||
config := map[string]string{"max-file": "2", "max-size": "1k"}
|
config := map[string]string{"max-file": "3", "max-size": "1k", "compress": "true"}
|
||||||
l, err := New(logger.Info{
|
l, err := New(logger.Info{
|
||||||
ContainerID: cid,
|
ContainerID: cid,
|
||||||
LogPath: filename,
|
LogPath: filename,
|
||||||
|
@ -152,21 +153,55 @@ func TestJSONFileLoggerWithOpts(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
defer l.Close()
|
defer l.Close()
|
||||||
for i := 0; i < 20; i++ {
|
for i := 0; i < 36; i++ {
|
||||||
if err := l.Log(&logger.Message{Line: []byte("line" + strconv.Itoa(i)), Source: "src1"}); err != nil {
|
if err := l.Log(&logger.Message{Line: []byte("line" + strconv.Itoa(i)), Source: "src1"}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := ioutil.ReadFile(filename)
|
res, err := ioutil.ReadFile(filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
penUlt, err := ioutil.ReadFile(filename + ".1")
|
penUlt, err := ioutil.ReadFile(filename + ".1")
|
||||||
|
if err != nil {
|
||||||
|
if !os.IsNotExist(err) {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
file, err := os.Open(filename + ".1.gz")
|
||||||
|
defer file.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
zipReader, err := gzip.NewReader(file)
|
||||||
|
defer zipReader.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
penUlt, err = ioutil.ReadAll(zipReader)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
file, err := os.Open(filename + ".2.gz")
|
||||||
|
defer file.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
zipReader, err := gzip.NewReader(file)
|
||||||
|
defer zipReader.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
antepenult, err := ioutil.ReadAll(zipReader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
expectedPenultimate := `{"log":"line0\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
expectedAntepenultimate := `{"log":"line0\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
{"log":"line1\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
{"log":"line1\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
{"log":"line2\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
{"log":"line2\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
{"log":"line3\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
{"log":"line3\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
|
@ -183,10 +218,27 @@ func TestJSONFileLoggerWithOpts(t *testing.T) {
|
||||||
{"log":"line14\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
{"log":"line14\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
{"log":"line15\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
{"log":"line15\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
`
|
`
|
||||||
expected := `{"log":"line16\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
expectedPenultimate := `{"log":"line16\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
{"log":"line17\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
{"log":"line17\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
{"log":"line18\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
{"log":"line18\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
{"log":"line19\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
{"log":"line19\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
|
{"log":"line20\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
|
{"log":"line21\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
|
{"log":"line22\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
|
{"log":"line23\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
|
{"log":"line24\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
|
{"log":"line25\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
|
{"log":"line26\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
|
{"log":"line27\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
|
{"log":"line28\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
|
{"log":"line29\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
|
{"log":"line30\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
|
{"log":"line31\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
|
`
|
||||||
|
expected := `{"log":"line32\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
|
{"log":"line33\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
|
{"log":"line34\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
|
{"log":"line35\n","stream":"src1","time":"0001-01-01T00:00:00Z"}
|
||||||
`
|
`
|
||||||
|
|
||||||
if string(res) != expected {
|
if string(res) != expected {
|
||||||
|
@ -195,7 +247,9 @@ func TestJSONFileLoggerWithOpts(t *testing.T) {
|
||||||
if string(penUlt) != expectedPenultimate {
|
if string(penUlt) != expectedPenultimate {
|
||||||
t.Fatalf("Wrong log content: %q, expected %q", penUlt, expectedPenultimate)
|
t.Fatalf("Wrong log content: %q, expected %q", penUlt, expectedPenultimate)
|
||||||
}
|
}
|
||||||
|
if string(antepenult) != expectedAntepenultimate {
|
||||||
|
t.Fatalf("Wrong log content: %q, expected %q", antepenult, expectedAntepenultimate)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestJSONFileLoggerWithLabelsEnv(t *testing.T) {
|
func TestJSONFileLoggerWithLabelsEnv(t *testing.T) {
|
||||||
|
|
|
@ -2,17 +2,21 @@ package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutil
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"compress/gzip"
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/docker/docker/daemon/logger"
|
"github.com/docker/docker/daemon/logger"
|
||||||
"github.com/docker/docker/daemon/logger/loggerutils/multireader"
|
"github.com/docker/docker/daemon/logger/loggerutils/multireader"
|
||||||
"github.com/docker/docker/pkg/filenotify"
|
"github.com/docker/docker/pkg/filenotify"
|
||||||
|
"github.com/docker/docker/pkg/pools"
|
||||||
"github.com/docker/docker/pkg/pubsub"
|
"github.com/docker/docker/pkg/pubsub"
|
||||||
"github.com/docker/docker/pkg/tailfile"
|
"github.com/docker/docker/pkg/tailfile"
|
||||||
"github.com/fsnotify/fsnotify"
|
"github.com/fsnotify/fsnotify"
|
||||||
|
@ -20,24 +24,81 @@ import (
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const tmpLogfileSuffix = ".tmp"
|
||||||
|
|
||||||
|
// rotateFileMetadata is a metadata of the gzip header of the compressed log file
|
||||||
|
type rotateFileMetadata struct {
|
||||||
|
LastTime time.Time `json:"lastTime,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// refCounter is a counter of logfile being referenced
|
||||||
|
type refCounter struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
counter map[string]int
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reference increase the reference counter for specified logfile
|
||||||
|
func (rc *refCounter) GetReference(fileName string, openRefFile func(fileName string, exists bool) (*os.File, error)) (*os.File, error) {
|
||||||
|
rc.mu.Lock()
|
||||||
|
defer rc.mu.Unlock()
|
||||||
|
|
||||||
|
var (
|
||||||
|
file *os.File
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
_, ok := rc.counter[fileName]
|
||||||
|
file, err = openRefFile(fileName, ok)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if ok {
|
||||||
|
rc.counter[fileName]++
|
||||||
|
} else if file != nil {
|
||||||
|
rc.counter[file.Name()] = 1
|
||||||
|
}
|
||||||
|
|
||||||
|
return file, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dereference reduce the reference counter for specified logfile
|
||||||
|
func (rc *refCounter) Dereference(fileName string) error {
|
||||||
|
rc.mu.Lock()
|
||||||
|
defer rc.mu.Unlock()
|
||||||
|
|
||||||
|
rc.counter[fileName]--
|
||||||
|
if rc.counter[fileName] <= 0 {
|
||||||
|
delete(rc.counter, fileName)
|
||||||
|
err := os.Remove(fileName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// LogFile is Logger implementation for default Docker logging.
|
// LogFile is Logger implementation for default Docker logging.
|
||||||
type LogFile struct {
|
type LogFile struct {
|
||||||
f *os.File // store for closing
|
mu sync.RWMutex // protects the logfile access
|
||||||
closed bool
|
f *os.File // store for closing
|
||||||
mu sync.RWMutex
|
closed bool
|
||||||
capacity int64 //maximum size of each file
|
rotateMu sync.Mutex // blocks the next rotation until the current rotation is completed
|
||||||
currentSize int64 // current size of the latest file
|
capacity int64 // maximum size of each file
|
||||||
maxFiles int //maximum number of files
|
currentSize int64 // current size of the latest file
|
||||||
notifyRotate *pubsub.Publisher
|
maxFiles int // maximum number of files
|
||||||
marshal logger.MarshalFunc
|
compress bool // whether old versions of log files are compressed
|
||||||
createDecoder makeDecoderFunc
|
lastTimestamp time.Time // timestamp of the last log
|
||||||
perms os.FileMode
|
filesRefCounter refCounter // keep reference-counted of decompressed files
|
||||||
|
notifyRotate *pubsub.Publisher
|
||||||
|
marshal logger.MarshalFunc
|
||||||
|
createDecoder makeDecoderFunc
|
||||||
|
perms os.FileMode
|
||||||
}
|
}
|
||||||
|
|
||||||
type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error)
|
type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error)
|
||||||
|
|
||||||
//NewLogFile creates new LogFile
|
//NewLogFile creates new LogFile
|
||||||
func NewLogFile(logPath string, capacity int64, maxFiles int, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode) (*LogFile, error) {
|
func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode) (*LogFile, error) {
|
||||||
log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
|
log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -49,14 +110,16 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, marshaller logger.
|
||||||
}
|
}
|
||||||
|
|
||||||
return &LogFile{
|
return &LogFile{
|
||||||
f: log,
|
f: log,
|
||||||
capacity: capacity,
|
capacity: capacity,
|
||||||
currentSize: size,
|
currentSize: size,
|
||||||
maxFiles: maxFiles,
|
maxFiles: maxFiles,
|
||||||
notifyRotate: pubsub.NewPublisher(0, 1),
|
compress: compress,
|
||||||
marshal: marshaller,
|
filesRefCounter: refCounter{counter: make(map[string]int)},
|
||||||
createDecoder: decodeFunc,
|
notifyRotate: pubsub.NewPublisher(0, 1),
|
||||||
perms: perms,
|
marshal: marshaller,
|
||||||
|
createDecoder: decodeFunc,
|
||||||
|
perms: perms,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,6 +147,7 @@ func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
|
||||||
n, err := w.f.Write(b)
|
n, err := w.f.Write(b)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
w.currentSize += int64(n)
|
w.currentSize += int64(n)
|
||||||
|
w.lastTimestamp = msg.Timestamp
|
||||||
}
|
}
|
||||||
w.mu.Unlock()
|
w.mu.Unlock()
|
||||||
return err
|
return err
|
||||||
|
@ -95,43 +159,108 @@ func (w *LogFile) checkCapacityAndRotate() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if w.currentSize >= w.capacity {
|
if w.currentSize >= w.capacity {
|
||||||
name := w.f.Name()
|
w.rotateMu.Lock()
|
||||||
|
fname := w.f.Name()
|
||||||
if err := w.f.Close(); err != nil {
|
if err := w.f.Close(); err != nil {
|
||||||
|
w.rotateMu.Unlock()
|
||||||
return errors.Wrap(err, "error closing file")
|
return errors.Wrap(err, "error closing file")
|
||||||
}
|
}
|
||||||
if err := rotate(name, w.maxFiles); err != nil {
|
if err := rotate(fname, w.maxFiles, w.compress); err != nil {
|
||||||
|
w.rotateMu.Unlock()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
|
file, err := os.OpenFile(fname, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, w.perms)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
w.rotateMu.Unlock()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
w.f = file
|
w.f = file
|
||||||
w.currentSize = 0
|
w.currentSize = 0
|
||||||
w.notifyRotate.Publish(struct{}{})
|
w.notifyRotate.Publish(struct{}{})
|
||||||
|
|
||||||
|
if w.maxFiles <= 1 || !w.compress {
|
||||||
|
w.rotateMu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
compressFile(fname+".1", w.lastTimestamp)
|
||||||
|
w.rotateMu.Unlock()
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func rotate(name string, maxFiles int) error {
|
func rotate(name string, maxFiles int, compress bool) error {
|
||||||
if maxFiles < 2 {
|
if maxFiles < 2 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var extension string
|
||||||
|
if compress {
|
||||||
|
extension = ".gz"
|
||||||
|
}
|
||||||
for i := maxFiles - 1; i > 1; i-- {
|
for i := maxFiles - 1; i > 1; i-- {
|
||||||
toPath := name + "." + strconv.Itoa(i)
|
toPath := name + "." + strconv.Itoa(i) + extension
|
||||||
fromPath := name + "." + strconv.Itoa(i-1)
|
fromPath := name + "." + strconv.Itoa(i-1) + extension
|
||||||
if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
|
if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
|
||||||
return errors.Wrap(err, "error rotating old log entries")
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
|
if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
|
||||||
return errors.Wrap(err, "error rotating current log")
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func compressFile(fileName string, lastTimestamp time.Time) {
|
||||||
|
file, err := os.Open(fileName)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("Failed to open log file: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
file.Close()
|
||||||
|
err := os.Remove(fileName)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("Failed to remove source log file: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
outFile, err := os.OpenFile(fileName+".gz", os.O_CREATE|os.O_RDWR, 0640)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("Failed to open or create gzip log file: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
outFile.Close()
|
||||||
|
if err != nil {
|
||||||
|
os.Remove(fileName + ".gz")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
compressWriter := gzip.NewWriter(outFile)
|
||||||
|
defer compressWriter.Close()
|
||||||
|
|
||||||
|
// Add the last log entry timestramp to the gzip header
|
||||||
|
extra := rotateFileMetadata{}
|
||||||
|
extra.LastTime = lastTimestamp
|
||||||
|
compressWriter.Header.Extra, err = json.Marshal(&extra)
|
||||||
|
if err != nil {
|
||||||
|
// Here log the error only and don't return since this is just an optimization.
|
||||||
|
logrus.Warningf("Failed to marshal JSON: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = pools.Copy(compressWriter, file)
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).WithField("module", "container.logs").WithField("file", fileName).Error("Error compressing log file")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// MaxFiles return maximum number of files
|
// MaxFiles return maximum number of files
|
||||||
func (w *LogFile) MaxFiles() int {
|
func (w *LogFile) MaxFiles() int {
|
||||||
return w.maxFiles
|
return w.maxFiles
|
||||||
|
@ -154,18 +283,6 @@ func (w *LogFile) Close() error {
|
||||||
// ReadLogs decodes entries from log files and sends them the passed in watcher
|
// ReadLogs decodes entries from log files and sends them the passed in watcher
|
||||||
func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
|
func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
|
||||||
w.mu.RLock()
|
w.mu.RLock()
|
||||||
files, err := w.openRotatedFiles()
|
|
||||||
if err != nil {
|
|
||||||
w.mu.RUnlock()
|
|
||||||
watcher.Err <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
for _, f := range files {
|
|
||||||
f.Close()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
currentFile, err := os.Open(w.f.Name())
|
currentFile, err := os.Open(w.f.Name())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.mu.RUnlock()
|
w.mu.RUnlock()
|
||||||
|
@ -175,14 +292,20 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
|
||||||
defer currentFile.Close()
|
defer currentFile.Close()
|
||||||
|
|
||||||
currentChunk, err := newSectionReader(currentFile)
|
currentChunk, err := newSectionReader(currentFile)
|
||||||
w.mu.RUnlock()
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
w.mu.RUnlock()
|
||||||
watcher.Err <- err
|
watcher.Err <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.Tail != 0 {
|
if config.Tail != 0 {
|
||||||
|
files, err := w.openRotatedFiles(config)
|
||||||
|
if err != nil {
|
||||||
|
w.mu.RUnlock()
|
||||||
|
watcher.Err <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.mu.RUnlock()
|
||||||
seekers := make([]io.ReadSeeker, 0, len(files)+1)
|
seekers := make([]io.ReadSeeker, 0, len(files)+1)
|
||||||
for _, f := range files {
|
for _, f := range files {
|
||||||
seekers = append(seekers, f)
|
seekers = append(seekers, f)
|
||||||
|
@ -193,9 +316,20 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
|
||||||
if len(seekers) > 0 {
|
if len(seekers) > 0 {
|
||||||
tailFile(multireader.MultiReadSeeker(seekers...), watcher, w.createDecoder, config)
|
tailFile(multireader.MultiReadSeeker(seekers...), watcher, w.createDecoder, config)
|
||||||
}
|
}
|
||||||
|
for _, f := range files {
|
||||||
|
f.Close()
|
||||||
|
fileName := f.Name()
|
||||||
|
if strings.HasSuffix(fileName, tmpLogfileSuffix) {
|
||||||
|
err := w.filesRefCounter.Dereference(fileName)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("Failed to dereference the log file %q: %v", fileName, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
w.mu.RLock()
|
||||||
}
|
}
|
||||||
|
|
||||||
w.mu.RLock()
|
|
||||||
if !config.Follow || w.closed {
|
if !config.Follow || w.closed {
|
||||||
w.mu.RUnlock()
|
w.mu.RUnlock()
|
||||||
return
|
return
|
||||||
|
@ -207,13 +341,22 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
|
||||||
followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until)
|
followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *LogFile) openRotatedFiles() (files []*os.File, err error) {
|
func (w *LogFile) openRotatedFiles(config logger.ReadConfig) (files []*os.File, err error) {
|
||||||
|
w.rotateMu.Lock()
|
||||||
|
defer w.rotateMu.Unlock()
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, f := range files {
|
for _, f := range files {
|
||||||
f.Close()
|
f.Close()
|
||||||
|
if strings.HasSuffix(f.Name(), tmpLogfileSuffix) {
|
||||||
|
err := os.Remove(f.Name())
|
||||||
|
if err != nil && !os.IsNotExist(err) {
|
||||||
|
logrus.Warningf("Failed to remove the logfile %q: %v", f.Name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -223,6 +366,28 @@ func (w *LogFile) openRotatedFiles() (files []*os.File, err error) {
|
||||||
if !os.IsNotExist(err) {
|
if !os.IsNotExist(err) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fileName := fmt.Sprintf("%s.%d.gz", w.f.Name(), i-1)
|
||||||
|
decompressedFileName := fileName + tmpLogfileSuffix
|
||||||
|
tmpFile, err := w.filesRefCounter.GetReference(decompressedFileName, func(refFileName string, exists bool) (*os.File, error) {
|
||||||
|
if exists {
|
||||||
|
return os.Open(refFileName)
|
||||||
|
}
|
||||||
|
return decompressfile(fileName, refFileName, config.Since)
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if !os.IsNotExist(err) {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if tmpFile == nil {
|
||||||
|
// The log before `config.Since` does not need to read
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
files = append(files, tmpFile)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
files = append(files, f)
|
files = append(files, f)
|
||||||
|
@ -231,6 +396,44 @@ func (w *LogFile) openRotatedFiles() (files []*os.File, err error) {
|
||||||
return files, nil
|
return files, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func decompressfile(fileName, destFileName string, since time.Time) (*os.File, error) {
|
||||||
|
cf, err := os.Open(fileName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer cf.Close()
|
||||||
|
|
||||||
|
rc, err := gzip.NewReader(cf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rc.Close()
|
||||||
|
|
||||||
|
// Extract the last log entry timestramp from the gzip header
|
||||||
|
extra := &rotateFileMetadata{}
|
||||||
|
err = json.Unmarshal(rc.Header.Extra, extra)
|
||||||
|
if err == nil && extra.LastTime.Before(since) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
rs, err := os.OpenFile(destFileName, os.O_CREATE|os.O_RDWR, 0640)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = pools.Copy(rs, rc)
|
||||||
|
if err != nil {
|
||||||
|
rs.Close()
|
||||||
|
rErr := os.Remove(rs.Name())
|
||||||
|
if rErr != nil && os.IsNotExist(rErr) {
|
||||||
|
logrus.Errorf("Failed to remove the logfile %q: %v", rs.Name(), rErr)
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rs, nil
|
||||||
|
}
|
||||||
|
|
||||||
func newSectionReader(f *os.File) (*io.SectionReader, error) {
|
func newSectionReader(f *os.File) (*io.SectionReader, error) {
|
||||||
// seek to the end to get the size
|
// seek to the end to get the size
|
||||||
// we'll leave this at the end of the file since section reader does not advance the reader
|
// we'll leave this at the end of the file since section reader does not advance the reader
|
||||||
|
|
Loading…
Reference in New Issue