mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Merge pull request #40543 from SamWhited/upstream_logging
Upstream logging changes from Enterprise Edition
This commit is contained in:
commit
39679991f4
13 changed files with 497 additions and 2 deletions
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/docker/docker/daemon/logger"
|
||||
"github.com/docker/docker/daemon/logger/jsonfilelog"
|
||||
"github.com/docker/docker/daemon/logger/local"
|
||||
"github.com/docker/docker/daemon/logger/loggerutils/cache"
|
||||
"github.com/docker/docker/daemon/network"
|
||||
"github.com/docker/docker/errdefs"
|
||||
"github.com/docker/docker/image"
|
||||
|
@ -106,6 +107,11 @@ type Container struct {
|
|||
// Fields here are specific to Windows
|
||||
NetworkSharedContainerID string `json:"-"`
|
||||
SharedEndpointList []string `json:"-"`
|
||||
LocalLogCacheMeta localLogCacheMeta `json:",omitempty"`
|
||||
}
|
||||
|
||||
type localLogCacheMeta struct {
|
||||
HaveNotifyEnabled bool
|
||||
}
|
||||
|
||||
// NewBaseContainer creates a new container with its
|
||||
|
@ -415,6 +421,25 @@ func (container *Container) StartLogger() (logger.Logger, error) {
|
|||
}
|
||||
l = logger.NewRingLogger(l, info, bufferSize)
|
||||
}
|
||||
|
||||
if _, ok := l.(logger.LogReader); !ok {
|
||||
if cache.ShouldUseCache(cfg.Config) {
|
||||
logPath, err := container.GetRootResourcePath("container-cached.log")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !container.LocalLogCacheMeta.HaveNotifyEnabled {
|
||||
logrus.WithField("container", container.ID).WithField("driver", container.HostConfig.LogConfig.Type).Info("Configured log driver does not support reads, enabling local file cache for container logs")
|
||||
container.LocalLogCacheMeta.HaveNotifyEnabled = true
|
||||
}
|
||||
info.LogPath = logPath
|
||||
l, err = cache.WithLocalCache(l, info)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error setting up local container log cache")
|
||||
}
|
||||
}
|
||||
}
|
||||
return l, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
_ "github.com/docker/docker/daemon/logger/jsonfilelog"
|
||||
_ "github.com/docker/docker/daemon/logger/local"
|
||||
_ "github.com/docker/docker/daemon/logger/logentries"
|
||||
_ "github.com/docker/docker/daemon/logger/loggerutils/cache"
|
||||
_ "github.com/docker/docker/daemon/logger/splunk"
|
||||
_ "github.com/docker/docker/daemon/logger/syslog"
|
||||
)
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
_ "github.com/docker/docker/daemon/logger/gelf"
|
||||
_ "github.com/docker/docker/daemon/logger/jsonfilelog"
|
||||
_ "github.com/docker/docker/daemon/logger/logentries"
|
||||
_ "github.com/docker/docker/daemon/logger/loggerutils/cache"
|
||||
_ "github.com/docker/docker/daemon/logger/splunk"
|
||||
_ "github.com/docker/docker/daemon/logger/syslog"
|
||||
)
|
||||
|
|
|
@ -143,6 +143,10 @@ func ValidateLogOpts(name string, cfg map[string]string) error {
|
|||
}
|
||||
}
|
||||
|
||||
if err := validateExternal(cfg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !factory.driverRegistered(name) {
|
||||
return fmt.Errorf("logger: no log driver named '%s' is registered", name)
|
||||
}
|
||||
|
|
29
daemon/logger/log_cache_opts.go
Normal file
29
daemon/logger/log_cache_opts.go
Normal file
|
@ -0,0 +1,29 @@
|
|||
package logger
|
||||
|
||||
var externalValidators []LogOptValidator
|
||||
|
||||
// RegisterExternalValidator adds the validator to the list of external validators.
|
||||
// External validators are used by packages outside this package that need to add their own validation logic.
|
||||
// This should only be called on package initialization.
|
||||
func RegisterExternalValidator(v LogOptValidator) {
|
||||
externalValidators = append(externalValidators, v)
|
||||
}
|
||||
|
||||
// AddBuiltinLogOpts updates the list of built-in log opts. This allows other packages to supplement additional log options
|
||||
// without having to register an actual log driver. This is used by things that are more proxy log drivers and should
|
||||
// not be exposed as a usable log driver to the API.
|
||||
// This should only be called on package initialization.
|
||||
func AddBuiltinLogOpts(opts map[string]bool) {
|
||||
for k, v := range opts {
|
||||
builtInLogOpts[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
func validateExternal(cfg map[string]string) error {
|
||||
for _, v := range externalValidators {
|
||||
if err := v(cfg); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
110
daemon/logger/loggerutils/cache/local_cache.go
vendored
Normal file
110
daemon/logger/loggerutils/cache/local_cache.go
vendored
Normal file
|
@ -0,0 +1,110 @@
|
|||
package cache // import "github.com/docker/docker/daemon/logger/loggerutils/cache"
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/daemon/logger"
|
||||
"github.com/docker/docker/daemon/logger/local"
|
||||
units "github.com/docker/go-units"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
// DriverName is the name of the driver used for local log caching
|
||||
DriverName = local.Name
|
||||
|
||||
cachePrefix = "cache-"
|
||||
cacheDisabledKey = cachePrefix + "disabled"
|
||||
)
|
||||
|
||||
var builtInCacheLogOpts = map[string]bool{
|
||||
cacheDisabledKey: true,
|
||||
}
|
||||
|
||||
// WithLocalCache wraps the passed in logger with a logger caches all writes locally
|
||||
// in addition to writing to the passed in logger.
|
||||
func WithLocalCache(l logger.Logger, info logger.Info) (logger.Logger, error) {
|
||||
initLogger, err := logger.GetLogDriver(DriverName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cacher, err := initLogger(info)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error initializing local log cache driver")
|
||||
}
|
||||
|
||||
if info.Config["mode"] == container.LogModeUnset || container.LogMode(info.Config["mode"]) == container.LogModeNonBlock {
|
||||
var size int64 = -1
|
||||
if s, exists := info.Config["max-buffer-size"]; exists {
|
||||
size, err = units.RAMInBytes(s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
cacher = logger.NewRingLogger(cacher, info, size)
|
||||
}
|
||||
|
||||
return &loggerWithCache{
|
||||
l: l,
|
||||
cache: cacher,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type loggerWithCache struct {
|
||||
l logger.Logger
|
||||
cache logger.Logger
|
||||
}
|
||||
|
||||
func (l *loggerWithCache) Log(msg *logger.Message) error {
|
||||
// copy the message as the original will be reset once the call to `Log` is complete
|
||||
dup := logger.NewMessage()
|
||||
dumbCopyMessage(dup, msg)
|
||||
|
||||
if err := l.l.Log(msg); err != nil {
|
||||
return err
|
||||
}
|
||||
return l.cache.Log(dup)
|
||||
}
|
||||
|
||||
func (l *loggerWithCache) Name() string {
|
||||
return l.l.Name()
|
||||
}
|
||||
|
||||
func (l *loggerWithCache) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
|
||||
return l.cache.(logger.LogReader).ReadLogs(config)
|
||||
}
|
||||
|
||||
func (l *loggerWithCache) Close() error {
|
||||
err := l.l.Close()
|
||||
if err := l.cache.Close(); err != nil {
|
||||
logrus.WithError(err).Warn("error while shutting cache logger")
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// ShouldUseCache reads the log opts to determine if caching should be enabled
|
||||
func ShouldUseCache(cfg map[string]string) bool {
|
||||
if cfg[cacheDisabledKey] == "" {
|
||||
return true
|
||||
}
|
||||
b, err := strconv.ParseBool(cfg[cacheDisabledKey])
|
||||
if err != nil {
|
||||
// This shouldn't happen since the values are validated before hand.
|
||||
return false
|
||||
}
|
||||
return !b
|
||||
}
|
||||
|
||||
// dumbCopyMessage is a bit of a fake copy but avoids extra allocations which
|
||||
// are not necessary for this use case.
|
||||
func dumbCopyMessage(dst, src *logger.Message) {
|
||||
dst.Source = src.Source
|
||||
dst.Timestamp = src.Timestamp
|
||||
dst.PLogMetaData = src.PLogMetaData
|
||||
dst.Err = src.Err
|
||||
dst.Attrs = src.Attrs
|
||||
dst.Line = append(dst.Line[:0], src.Line...)
|
||||
}
|
81
daemon/logger/loggerutils/cache/log_cache_test.go
vendored
Normal file
81
daemon/logger/loggerutils/cache/log_cache_test.go
vendored
Normal file
|
@ -0,0 +1,81 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"time"
|
||||
|
||||
"bytes"
|
||||
|
||||
"github.com/docker/docker/daemon/logger"
|
||||
"gotest.tools/v3/assert"
|
||||
"gotest.tools/v3/assert/cmp"
|
||||
)
|
||||
|
||||
type fakeLogger struct {
|
||||
messages chan logger.Message
|
||||
close chan struct{}
|
||||
}
|
||||
|
||||
func (l *fakeLogger) Log(msg *logger.Message) error {
|
||||
select {
|
||||
case l.messages <- *msg:
|
||||
case <-l.close:
|
||||
}
|
||||
logger.PutMessage(msg)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *fakeLogger) Name() string {
|
||||
return "fake"
|
||||
}
|
||||
|
||||
func (l *fakeLogger) Close() error {
|
||||
close(l.close)
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestLog(t *testing.T) {
|
||||
cacher := &fakeLogger{make(chan logger.Message), make(chan struct{})}
|
||||
l := &loggerWithCache{
|
||||
l: &fakeLogger{make(chan logger.Message, 100), make(chan struct{})},
|
||||
cache: cacher,
|
||||
}
|
||||
defer l.Close()
|
||||
|
||||
var messages []logger.Message
|
||||
for i := 0; i < 100; i++ {
|
||||
messages = append(messages, logger.Message{
|
||||
Timestamp: time.Now(),
|
||||
Line: append(bytes.Repeat([]byte("a"), 100), '\n'),
|
||||
})
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
for _, msg := range messages {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
m := logger.NewMessage()
|
||||
dumbCopyMessage(m, &msg)
|
||||
l.Log(m)
|
||||
}
|
||||
}()
|
||||
|
||||
for _, m := range messages {
|
||||
var msg logger.Message
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
t.Fatal("timed out waiting for messages... this is probably a test implementation error")
|
||||
case msg = <-cacher.messages:
|
||||
assert.Assert(t, cmp.DeepEqual(msg, m))
|
||||
}
|
||||
}
|
||||
}
|
40
daemon/logger/loggerutils/cache/validate.go
vendored
Normal file
40
daemon/logger/loggerutils/cache/validate.go
vendored
Normal file
|
@ -0,0 +1,40 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/docker/docker/daemon/logger"
|
||||
"github.com/docker/docker/daemon/logger/local"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func init() {
|
||||
for k, v := range local.LogOptKeys {
|
||||
builtInCacheLogOpts[cachePrefix+k] = v
|
||||
}
|
||||
logger.AddBuiltinLogOpts(builtInCacheLogOpts)
|
||||
logger.RegisterExternalValidator(validateLogCacheOpts)
|
||||
}
|
||||
|
||||
func validateLogCacheOpts(cfg map[string]string) error {
|
||||
if v := cfg[cacheDisabledKey]; v != "" {
|
||||
_, err := strconv.ParseBool(v)
|
||||
if err != nil {
|
||||
return errors.Errorf("invalid value for option %s: %s", cacheDisabledKey, cfg[cacheDisabledKey])
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// MergeDefaultLogConfig reads the default log opts and makes sure that any caching related keys that exist there are
|
||||
// added to dst.
|
||||
func MergeDefaultLogConfig(dst, defaults map[string]string) {
|
||||
for k, v := range defaults {
|
||||
if !builtInCacheLogOpts[k] {
|
||||
continue
|
||||
}
|
||||
if _, exists := dst[k]; !exists {
|
||||
dst[k] = v
|
||||
}
|
||||
}
|
||||
}
|
|
@ -11,6 +11,7 @@ import (
|
|||
timetypes "github.com/docker/docker/api/types/time"
|
||||
"github.com/docker/docker/container"
|
||||
"github.com/docker/docker/daemon/logger"
|
||||
logcache "github.com/docker/docker/daemon/logger/loggerutils/cache"
|
||||
"github.com/docker/docker/errdefs"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
@ -190,6 +191,8 @@ func (daemon *Daemon) mergeAndVerifyLogConfig(cfg *containertypes.LogConfig) err
|
|||
}
|
||||
}
|
||||
|
||||
logcache.MergeDefaultLogConfig(cfg.Config, daemon.defaultLogConfig.Config)
|
||||
|
||||
return logger.ValidateLogOpts(cfg.Type, cfg.Config)
|
||||
}
|
||||
|
||||
|
@ -204,6 +207,7 @@ func (daemon *Daemon) setupDefaultLogConfig() error {
|
|||
Type: config.LogConfig.Type,
|
||||
Config: config.LogConfig.Config,
|
||||
}
|
||||
|
||||
logrus.Debugf("Using default logging driver %s", daemon.defaultLogConfig.Type)
|
||||
return nil
|
||||
}
|
||||
|
|
86
integration/plugin/logging/cmd/discard/driver.go
Normal file
86
integration/plugin/logging/cmd/discard/driver.go
Normal file
|
@ -0,0 +1,86 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"sync"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
type startLoggingRequest struct {
|
||||
File string
|
||||
}
|
||||
|
||||
type capabilitiesResponse struct {
|
||||
Cap struct {
|
||||
ReadLogs bool
|
||||
}
|
||||
}
|
||||
|
||||
type driver struct {
|
||||
mu sync.Mutex
|
||||
logs map[string]io.Closer
|
||||
}
|
||||
|
||||
type stopLoggingRequest struct {
|
||||
File string
|
||||
}
|
||||
|
||||
func handle(mux *http.ServeMux) {
|
||||
d := &driver{logs: make(map[string]io.Closer)}
|
||||
mux.HandleFunc("/LogDriver.StartLogging", func(w http.ResponseWriter, r *http.Request) {
|
||||
var req startLoggingRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
f, err := os.OpenFile(req.File, syscall.O_RDONLY, 0700)
|
||||
if err != nil {
|
||||
respond(err, w)
|
||||
}
|
||||
|
||||
d.mu.Lock()
|
||||
d.logs[req.File] = f
|
||||
d.mu.Unlock()
|
||||
|
||||
go io.Copy(ioutil.Discard, f)
|
||||
respond(err, w)
|
||||
})
|
||||
|
||||
mux.HandleFunc("/LogDriver.StopLogging", func(w http.ResponseWriter, r *http.Request) {
|
||||
var req stopLoggingRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
d.mu.Lock()
|
||||
if f := d.logs[req.File]; f != nil {
|
||||
f.Close()
|
||||
}
|
||||
d.mu.Unlock()
|
||||
respond(nil, w)
|
||||
})
|
||||
|
||||
mux.HandleFunc("/LogDriver.Capabilities", func(w http.ResponseWriter, r *http.Request) {
|
||||
json.NewEncoder(w).Encode(&capabilitiesResponse{
|
||||
Cap: struct{ ReadLogs bool }{ReadLogs: false},
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
type response struct {
|
||||
Err string
|
||||
}
|
||||
|
||||
func respond(err error, w io.Writer) {
|
||||
var res response
|
||||
if err != nil {
|
||||
res.Err = err.Error()
|
||||
}
|
||||
json.NewEncoder(w).Encode(&res)
|
||||
}
|
22
integration/plugin/logging/cmd/discard/main.go
Normal file
22
integration/plugin/logging/cmd/discard/main.go
Normal file
|
@ -0,0 +1,22 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"net"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func main() {
|
||||
l, err := net.Listen("unix", "/run/docker/plugins/plugin.sock")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
handle(mux)
|
||||
|
||||
server := http.Server{
|
||||
Addr: l.Addr().String(),
|
||||
Handler: mux,
|
||||
}
|
||||
server.Serve(l)
|
||||
}
|
1
integration/plugin/logging/cmd/discard/main_test.go
Normal file
1
integration/plugin/logging/cmd/discard/main_test.go
Normal file
|
@ -0,0 +1 @@
|
|||
package main
|
91
integration/plugin/logging/read_test.go
Normal file
91
integration/plugin/logging/read_test.go
Normal file
|
@ -0,0 +1,91 @@
|
|||
package logging
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/pkg/stdcopy"
|
||||
"github.com/docker/docker/testutil/daemon"
|
||||
"gotest.tools/v3/assert"
|
||||
)
|
||||
|
||||
// TestReadPluginNoRead tests that reads are supported even if the plugin isn't capable.
|
||||
func TestReadPluginNoRead(t *testing.T) {
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skip("no unix domain sockets on Windows")
|
||||
}
|
||||
t.Parallel()
|
||||
d := daemon.New(t)
|
||||
d.StartWithBusybox(t, "--iptables=false")
|
||||
defer d.Stop(t)
|
||||
|
||||
client, err := d.NewClient()
|
||||
assert.Assert(t, err)
|
||||
createPlugin(t, client, "test", "discard", asLogDriver)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
err = client.PluginEnable(ctx, "test", types.PluginEnableOptions{Timeout: 30})
|
||||
assert.Check(t, err)
|
||||
d.Stop(t)
|
||||
|
||||
cfg := &container.Config{
|
||||
Image: "busybox",
|
||||
Cmd: []string{"/bin/echo", "hello world"},
|
||||
}
|
||||
for desc, test := range map[string]struct {
|
||||
dOpts []string
|
||||
logsSupported bool
|
||||
}{
|
||||
"default": {logsSupported: true},
|
||||
"disabled caching": {[]string{"--log-opt=cache-disabled=true"}, false},
|
||||
"explicitly enabled caching": {[]string{"--log-opt=cache-disabled=false"}, true},
|
||||
} {
|
||||
t.Run(desc, func(t *testing.T) {
|
||||
d.Start(t, append([]string{"--iptables=false"}, test.dOpts...)...)
|
||||
defer d.Stop(t)
|
||||
c, err := client.ContainerCreate(ctx,
|
||||
cfg,
|
||||
&container.HostConfig{LogConfig: container.LogConfig{Type: "test"}},
|
||||
nil,
|
||||
"",
|
||||
)
|
||||
assert.Assert(t, err)
|
||||
defer client.ContainerRemove(ctx, c.ID, types.ContainerRemoveOptions{Force: true})
|
||||
|
||||
err = client.ContainerStart(ctx, c.ID, types.ContainerStartOptions{})
|
||||
assert.Assert(t, err)
|
||||
|
||||
logs, err := client.ContainerLogs(ctx, c.ID, types.ContainerLogsOptions{ShowStdout: true})
|
||||
if !test.logsSupported {
|
||||
assert.Assert(t, err != nil)
|
||||
return
|
||||
}
|
||||
assert.Assert(t, err)
|
||||
defer logs.Close()
|
||||
|
||||
buf := bytes.NewBuffer(nil)
|
||||
|
||||
errCh := make(chan error)
|
||||
go func() {
|
||||
_, err := stdcopy.StdCopy(buf, buf, logs)
|
||||
errCh <- err
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(60 * time.Second):
|
||||
t.Fatal("timeout waiting for IO to complete")
|
||||
case err := <-errCh:
|
||||
assert.Assert(t, err)
|
||||
}
|
||||
assert.Assert(t, strings.TrimSpace(buf.String()) == "hello world", buf.Bytes())
|
||||
})
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in a new issue