1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/daemon/logger/adapter.go
Anusha Ragunathan 0b4b0a7b5d Improve partial message support in logger
Docker daemon has a 16K buffer for log messages. If a message length
exceeds 16K, it should be split by the logger and merged at the
endpoint.

This change adds `PartialLogMetaData` struct for enhanced partial support
- LastPartial (bool) : indicates if this is the last of all partials.
- ID (string)        : unique 32 bit ID. ID is same across all partials.
- Ordinal (int starts at 1) : indicates the position of msg in the series of partials.
Also, the timestamps across partials are the same.

Signed-off-by: Anusha Ragunathan <anusha.ragunathan@docker.com>
2018-04-11 13:26:28 -07:00

139 lines
3 KiB
Go

package logger // import "github.com/docker/docker/daemon/logger"
import (
"io"
"os"
"path/filepath"
"sync"
"time"
"github.com/docker/docker/api/types/plugins/logdriver"
"github.com/docker/docker/pkg/plugingetter"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// pluginAdapter wraps a logging plugin so that it can be used as a Logger.
// One adapter exists per logger instance.
type pluginAdapter struct {
	// driverName is the value reported by Name and the name used when
	// releasing the plugin reference in Close.
	driverName string
	// id is the last element of the /run/docker/logging/<id> path handed to
	// the plugin's StopLogging call in Close.
	id string
	// plugin is the plugin-side API (StopLogging, ReadLogs).
	plugin logPlugin
	// fifoPath is the filesystem path removed by Close.
	fifoPath string
	capabilities Capability
	// logInfo is forwarded to the plugin when logs are read back.
	logInfo Info

	// mu serializes access to the log stream and to buf.
	mu sync.Mutex
	// enc encodes each entry produced by Log.
	enc logdriver.LogEntryEncoder
	// stream is closed by Close; presumably it is the sink enc writes to
	// (confirm where the adapter is constructed).
	stream io.WriteCloser
	// buf is a single entry reused by every Log call to reduce allocations.
	// It must only be touched while mu is held.
	buf logdriver.LogEntry
}
// Log copies msg into the adapter's shared entry, encodes that entry with
// a.enc, and resets the entry again, all while holding a.mu. Once the lock is
// released msg is handed to PutMessage, whether or not encoding succeeded.
//
// It returns the error reported by the encoder, if any.
func (a *pluginAdapter) Log(msg *Message) error {
	// These values depend only on msg, so they can be computed before the
	// lock is taken.
	nanos := msg.Timestamp.UnixNano()
	partial := msg.PLogMetaData != nil

	a.mu.Lock()
	entry := &a.buf
	entry.Line = msg.Line
	entry.TimeNano = nanos
	entry.Partial = partial
	entry.Source = msg.Source
	encodeErr := a.enc.Encode(entry)
	// Clear the shared entry so nothing from msg is still referenced by it
	// after msg has been passed to PutMessage.
	entry.Reset()
	a.mu.Unlock()

	PutMessage(msg)
	return encodeErr
}
// Name returns the driver name stored in the adapter.
func (a *pluginAdapter) Name() string {
	return a.driverName
}
// Close tells the plugin to stop logging for this adapter and then tears down
// the local side, all while holding a.mu.
//
// If the plugin's StopLogging call fails, that error is returned immediately
// and nothing else is cleaned up. Failures while closing the stream or
// removing the fifo are only logged, and Close still returns nil.
func (a *pluginAdapter) Close() error {
	a.mu.Lock()
	defer a.mu.Unlock()

	loggingPath := filepath.Join("/", "run", "docker", "logging", a.id)
	if stopErr := a.plugin.StopLogging(loggingPath); stopErr != nil {
		return stopErr
	}

	closeErr := a.stream.Close()
	if closeErr != nil {
		logrus.WithError(closeErr).Error("error closing plugin fifo")
	}

	switch rmErr := os.Remove(a.fifoPath); {
	case rmErr == nil, os.IsNotExist(rmErr):
		// Removed, or it was already gone: nothing to report.
	default:
		logrus.WithError(rmErr).Error("error cleaning up plugin fifo")
	}

	// pluginGetter may be nil, especially for unit tests.
	if pluginGetter == nil {
		return nil
	}
	pluginGetter.Get(a.driverName, extName, plugingetter.Release)
	return nil
}
// pluginAdapterWithRead is a pluginAdapter that additionally provides
// ReadLogs. Every other method comes from the embedded *pluginAdapter.
type pluginAdapterWithRead struct {
	*pluginAdapter
}
// ReadLogs requests a log stream from the plugin for config and relays each
// decoded entry to the returned LogWatcher from a background goroutine.
//
// The goroutine closes watcher.Msg when it exits, which happens when:
//   - the plugin's ReadLogs call fails (reported on watcher.Err),
//   - the stream reaches io.EOF,
//   - an entry cannot be decoded (reported on watcher.Err),
//   - an entry is timestamped after a non-zero config.Until, or
//   - the watcher is closed.
//
// Entries timestamped before a non-zero config.Since are skipped.
//
// No send in the goroutine blocks unconditionally: each one either yields to
// watcher.WatchClose() or is non-blocking, so closing the watcher always lets
// the goroutine finish and release the plugin stream.
func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
	watcher := NewLogWatcher()

	go func() {
		defer close(watcher.Msg)

		stream, err := a.plugin.ReadLogs(a.logInfo, config)
		if err != nil {
			// Same pattern as the decode-error path below: do not get stuck
			// reporting an error on a watcher that has been closed.
			select {
			case watcher.Err <- errors.Wrap(err, "error getting log reader"):
			case <-watcher.WatchClose():
			}
			return
		}
		defer stream.Close()

		dec := logdriver.NewLogEntryDecoder(stream)
		for {
			// Stop before decoding another entry if the watcher is closed.
			select {
			case <-watcher.WatchClose():
				return
			default:
			}

			var buf logdriver.LogEntry
			if err := dec.Decode(&buf); err != nil {
				if err == io.EOF {
					return
				}
				select {
				case watcher.Err <- errors.Wrap(err, "error decoding log message"):
				case <-watcher.WatchClose():
				}
				return
			}

			msg := &Message{
				Timestamp: time.Unix(0, buf.TimeNano),
				Line:      buf.Line,
				Source:    buf.Source,
			}

			// plugin should handle this, but check just in case
			if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
				continue
			}
			if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
				return
			}

			select {
			case watcher.Msg <- msg:
			case <-watcher.WatchClose():
				// This entry was already consumed from the stream, so try to
				// hand it over, but only if that can happen immediately. An
				// unconditional send here could block forever once the
				// consumer has stopped receiving, leaking this goroutine and
				// the plugin stream.
				select {
				case watcher.Msg <- msg:
				default:
				}
				return
			}
		}
	}()

	return watcher
}