
Support reads for all log drivers.

This supplements any log driver which does not support reads with a
custom read implementation that uses a local file cache.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
(cherry picked from commit d675e2bf2b75865915c7a4552e00802feeb0847f)
Signed-off-by: Madhu Venugopal <madhu@docker.com>
Brian Goff 2018-04-05 12:42:31 -04:00 committed by Sam Whited
parent d8772509d1
commit e2ceb83a53
7 changed files with 271 additions and 2 deletions
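
For orientation, "does not support reads" here means the configured log driver does not implement the daemon's logger.LogReader interface. The shape of the change, compressed into a hypothetical helper for illustration (the name ensureReadable does not appear in the commit; the real logic lives inline in Container.StartLogger in the first hunk below):

package container // illustrative placement only, not part of the diff

import (
	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/daemon/logger/loggerutils/cache"
)

// ensureReadable returns l unchanged when the driver can already serve reads,
// and otherwise tees its stream into a local file-backed logger so that
// "docker logs" keeps working.
func ensureReadable(l logger.Logger, info logger.Info) (logger.Logger, error) {
	if _, ok := l.(logger.LogReader); ok {
		return l, nil
	}
	return cache.WithLocalCache(l, info) // added by this commit
}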


@@ -23,6 +23,7 @@ import (
	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/daemon/logger/jsonfilelog"
	"github.com/docker/docker/daemon/logger/local"
	"github.com/docker/docker/daemon/logger/loggerutils/cache"
	"github.com/docker/docker/daemon/network"
	"github.com/docker/docker/errdefs"
	"github.com/docker/docker/image"
@@ -104,8 +105,13 @@ type Container struct {
	NoNewPrivileges bool
	// Fields here are specific to Windows
	NetworkSharedContainerID string   `json:"-"`
	SharedEndpointList       []string `json:"-"`
	NetworkSharedContainerID string            `json:"-"`
	SharedEndpointList       []string          `json:"-"`
	LocalLogCacheMeta        localLogCacheMeta `json:",omitempty"`
}

type localLogCacheMeta struct {
	HaveNotifyEnabled bool
}

// NewBaseContainer creates a new container with its
@@ -415,6 +421,22 @@ func (container *Container) StartLogger() (logger.Logger, error) {
		}
		l = logger.NewRingLogger(l, info, bufferSize)
	}

	if _, ok := l.(logger.LogReader); !ok {
		logPath, err := container.GetRootResourcePath("container-cached.log")
		if err != nil {
			return nil, err
		}
		info.LogPath = logPath

		if !container.LocalLogCacheMeta.HaveNotifyEnabled {
			logrus.WithField("container", container.ID).Info("Configured log driver does not support reads, enabling local file cache for container logs")
		}

		l, err = cache.WithLocalCache(l, info)
		if err != nil {
			return nil, errors.Wrap(err, "error setting up local container log cache")
		}
	}
	return l, nil
}
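
The type assertion above is made against the daemon's log-reading contract. For reference, paraphrased from the daemon/logger package of this era (not part of the diff; internal fields and close/notification plumbing are omitted, so details may differ slightly):

// LogReader is implemented by drivers that can serve log reads.
type LogReader interface {
	// ReadLogs reads logs from the underlying logging backend.
	ReadLogs(ReadConfig) *LogWatcher
}

// LogWatcher is the stream handed back to a reader: messages arrive on Msg,
// read errors on Err.
type LogWatcher struct {
	Msg chan *Message
	Err chan error
	// close/notification plumbing omitted
}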


@@ -0,0 +1,63 @@
package cache // import "github.com/docker/docker/daemon/logger/loggerutils/cache"

import (
	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/daemon/logger/local"
	"github.com/sirupsen/logrus"
)

// WithLocalCache wraps the passed in logger with a logger that caches all writes
// locally in addition to writing to the passed in logger.
func WithLocalCache(l logger.Logger, logInfo logger.Info) (logger.Logger, error) {
	localLogger, err := local.New(logInfo)
	if err != nil {
		return nil, err
	}
	return &loggerWithCache{
		l: l,
		// TODO(@cpuguy83): Should this be configurable?
		cache: logger.NewRingLogger(localLogger, logInfo, -1),
	}, nil
}

type loggerWithCache struct {
	l     logger.Logger
	cache logger.Logger
}

func (l *loggerWithCache) Log(msg *logger.Message) error {
	// copy the message since the underlying logger will return the passed in
	// message to the message pool
	dup := logger.NewMessage()
	dumbCopyMessage(dup, msg)
	if err := l.l.Log(msg); err != nil {
		return err
	}
	return l.cache.Log(dup)
}

func (l *loggerWithCache) Name() string {
	return l.l.Name()
}

func (l *loggerWithCache) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
	return l.cache.(logger.LogReader).ReadLogs(config)
}

func (l *loggerWithCache) Close() error {
	err := l.l.Close()
	if err := l.cache.Close(); err != nil {
		logrus.WithError(err).Warn("error while shutting down cache logger")
	}
	return err
}

// dumbCopyMessage is a bit of a fake copy but avoids extra allocations which
// are not necessary for this use case.
func dumbCopyMessage(dst, src *logger.Message) {
	dst.Source = src.Source
	dst.Timestamp = src.Timestamp
	dst.PLogMetaData = src.PLogMetaData
	dst.Err = src.Err
	dst.Attrs = src.Attrs
	dst.Line = src.Line
}
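
Because loggerWithCache satisfies logger.LogReader by delegating to its local copy, callers read the stream back the same way they would from any read-capable driver. A rough usage sketch, not part of the commit (readBack is a hypothetical helper; it assumes info.LogPath already points at a writable file, as StartLogger arranges above, and trims error handling on the watcher):

package example // hypothetical usage sketch

import (
	"fmt"

	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/daemon/logger/loggerutils/cache"
)

// readBack wraps a write-only driver and then reads the locally cached copy.
func readBack(writeOnly logger.Logger, info logger.Info) error {
	wrapped, err := cache.WithLocalCache(writeOnly, info)
	if err != nil {
		return err
	}

	// Tail: -1 mirrors how the daemon asks for the full history ("--tail=all").
	watcher := wrapped.(logger.LogReader).ReadLogs(logger.ReadConfig{Tail: -1})
	for msg := range watcher.Msg {
		fmt.Printf("%s %s", msg.Timestamp, msg.Line)
	}
	// A real consumer would also watch watcher.Err and eventually close the wrapper.
	return nil
}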


@@ -0,0 +1,85 @@
package main

import (
	"encoding/json"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"sync"
	"syscall"
)

type startLoggingRequest struct {
	File string
}

type capabilitiesResponse struct {
	Cap struct {
		ReadLogs bool
	}
}

type driver struct {
	mu   sync.Mutex
	logs map[string]io.Closer
}

type stopLoggingRequest struct {
	File string
}

func handle(mux *http.ServeMux) {
	d := &driver{logs: make(map[string]io.Closer)}
	mux.HandleFunc("/LogDriver.StartLogging", func(w http.ResponseWriter, r *http.Request) {
		var req startLoggingRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		f, err := os.OpenFile(req.File, syscall.O_RDONLY, 0700)
		if err != nil {
			respond(err, w)
			return
		}

		d.mu.Lock()
		d.logs[req.File] = f
		d.mu.Unlock()

		// Drain the log stream without storing it anywhere.
		go io.Copy(ioutil.Discard, f)
		respond(err, w)
	})

	mux.HandleFunc("/LogDriver.StopLogging", func(w http.ResponseWriter, r *http.Request) {
		var req stopLoggingRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		d.mu.Lock()
		if f := d.logs[req.File]; f != nil {
			f.Close()
		}
		d.mu.Unlock()
		respond(nil, w)
	})

	mux.HandleFunc("/LogDriver.Capabilities", func(w http.ResponseWriter, r *http.Request) {
		json.NewEncoder(w).Encode(&capabilitiesResponse{
			Cap: struct{ ReadLogs bool }{ReadLogs: false},
		})
	})
}

type response struct {
	Err string
}

func respond(err error, w io.Writer) {
	var res response
	if err != nil {
		res.Err = err.Error()
	}
	json.NewEncoder(w).Encode(&res)
}


@@ -0,0 +1,22 @@
package main

import (
	"net"
	"net/http"
)

func main() {
	l, err := net.Listen("unix", "/run/docker/plugins/plugin.sock")
	if err != nil {
		panic(err)
	}

	mux := http.NewServeMux()
	handle(mux)

	server := http.Server{
		Addr:    l.Addr().String(),
		Handler: mux,
	}
	server.Serve(l)
}
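
To see what the daemon sees during the capability handshake, the plugin's Capabilities endpoint can be queried directly over its socket. A standalone sketch, assuming the test driver above is running and listening on the socket path hard-coded in main (in a real installation the socket lives inside the plugin's rootfs and is managed by the daemon, so this is only useful when running the driver by hand):

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"net/http"
)

func main() {
	// Dial the plugin's unix socket regardless of the URL's host portion.
	httpc := &http.Client{
		Transport: &http.Transport{
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				return net.Dial("unix", "/run/docker/plugins/plugin.sock")
			},
		},
	}

	resp, err := httpc.Post("http://plugin/LogDriver.Capabilities", "application/json", nil)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Mirrors the capabilitiesResponse struct in the driver above.
	var caps struct {
		Cap struct{ ReadLogs bool }
	}
	if err := json.NewDecoder(resp.Body).Decode(&caps); err != nil {
		panic(err)
	}
	fmt.Println("ReadLogs:", caps.Cap.ReadLogs) // expected: false for this test plugin
}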


@@ -0,0 +1 @@
package main


@@ -19,6 +19,9 @@ func TestMain(m *testing.M) {
		fmt.Println(err)
		os.Exit(1)
	}
	if testEnv.OSType != "linux" {
		os.Exit(0)
	}
	err = environment.EnsureFrozenImagesLinux(testEnv)
	if err != nil {
		fmt.Println(err)


@@ -0,0 +1,73 @@
package logging

import (
	"bytes"
	"context"
	"strings"
	"testing"
	"time"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/pkg/stdcopy"
	"github.com/docker/docker/testutil/daemon"
	"gotest.tools/v3/assert"
)

// TestReadPluginNoRead tests that reads are supported even if the plugin isn't capable.
func TestReadPluginNoRead(t *testing.T) {
	t.Parallel()
	d := daemon.New(t)
	d.StartWithBusybox(t, "--iptables=false")
	defer d.Stop(t)

	client, err := d.NewClient()
	assert.Assert(t, err)
	createPlugin(t, client, "test", "discard", asLogDriver)

	ctx := context.Background()
	defer func() {
		err = client.PluginRemove(ctx, "test", types.PluginRemoveOptions{Force: true})
		assert.Check(t, err)
	}()

	err = client.PluginEnable(ctx, "test", types.PluginEnableOptions{Timeout: 30})
	assert.Check(t, err)

	c, err := client.ContainerCreate(ctx,
		&container.Config{
			Image: "busybox",
			Cmd:   []string{"/bin/echo", "hello world"},
		},
		&container.HostConfig{LogConfig: container.LogConfig{Type: "test"}},
		nil,
		"",
	)
	assert.Assert(t, err)

	err = client.ContainerStart(ctx, c.ID, types.ContainerStartOptions{})
	assert.Assert(t, err)

	logs, err := client.ContainerLogs(ctx, c.ID, types.ContainerLogsOptions{ShowStdout: true})
	assert.Assert(t, err)
	defer logs.Close()

	buf := bytes.NewBuffer(nil)
	errCh := make(chan error)
	go func() {
		_, err := stdcopy.StdCopy(buf, buf, logs)
		errCh <- err
	}()

	select {
	case <-time.After(60 * time.Second):
		t.Fatal("timeout waiting for IO to complete")
	case err := <-errCh:
		assert.Assert(t, err)
	}
	assert.Assert(t, strings.TrimSpace(buf.String()) == "hello world", buf.Bytes())
}