1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/daemon/logger/journald/journald.go
Kir Kolyshkin 1cc7b3881d journald/read: simplify/fix followJournal()
TL;DR: simplify the code, fix --follow hanging indefinitely

Do the following to simplify the followJournal() code:

1. Use Go-native select instead of C-native polling.

2. Use Watch{Producer,Consumer}Gone(), eliminating the need
to have journald.closed variable, and an extra goroutine.

3. Use sd_journal_wait(). In the words of its own man page:

> A synchronous alternative for using sd_journal_get_fd(),
> sd_journal_get_events(), sd_journal_get_timeout() and
> sd_journal_process() is sd_journal_wait().

Unfortunately, the logic is still not as simple as it
could be; the reason being, once the container has exited,
journald might still be writing some logs from its internal
buffers onto journal file(s), and there is no way to
figure out whether it's done so we are guaranteed to
read all of it back. This bug can be reproduced with
something like

> $ ID=$(docker run -d busybox seq 1 150000); docker logs --follow $ID
> ...
> 128123
> $

(The last expected output line should be `150000`).

To avoid exiting from followJournal() early, add the
following logic: once the container is gone, keep trying
to drain the journal until there's no new data for at
least `waitTimeout` time period.

Should fix https://github.com/docker/for-linux/issues/575

Signed-off-by: Kir Kolyshkin <kolyshkin@gmail.com>
(cherry picked from commit f091febc94)
Signed-off-by: Kir Kolyshkin <kolyshkin@gmail.com>
2019-08-09 16:47:34 -07:00

123 lines
3.1 KiB
Go

// +build linux
// Package journald provides the log driver for forwarding server logs
// to endpoints that receive the systemd format.
package journald // import "github.com/docker/docker/daemon/logger/journald"
import (
"fmt"
"sync"
"unicode"
"github.com/coreos/go-systemd/journal"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/loggerutils"
"github.com/sirupsen/logrus"
)
// name is the identifier under which init registers this driver and its
// option validator, and the value returned by the Name method.
const name = "journald"
// journald is the log driver state created by New; its Log method sends
// container log messages to the journal.
type journald struct {
// NOTE(review): mu is not used anywhere in this file; presumably it guards
// readers in the log-reading code — confirm.
mu sync.Mutex
vars map[string]string // additional variables and values to send to the journal along with the log message
// readers is initialised to an empty set by New. NOTE(review): presumably
// the set of active log watchers maintained by the reading code — confirm.
readers map[*logger.LogWatcher]struct{}
}
// init registers the journald driver constructor (New) and its log-option
// validator (validateLogOpt) under the driver name. A failure of either
// registration is reported through logrus.Fatal.
func init() {
	err := logger.RegisterLogDriver(name, New)
	if err != nil {
		logrus.Fatal(err)
	}

	err = logger.RegisterLogOptValidator(name, validateLogOpt)
	if err != nil {
		logrus.Fatal(err)
	}
}
// sanitizeKeyMod returns s rewritten so that it can be used as a journald
// field name. Journald field names must be composed of uppercase letters,
// numbers, and underscores, and must not start with an underscore.
//
// The mapping is applied rune by rune:
//   - 'a'..'z' is upper-cased;
//   - 'A'..'Z' and '0'..'9' are kept as they are;
//   - every other rune (including any non-ASCII rune) becomes '_'.
//
// Underscores that would land at the very start of the result are dropped,
// so the result may be shorter than s and is empty when s holds nothing but
// runes that map to '_'.
func sanitizeKeyMod(s string) string {
	// Every rune that survives the mapping is ASCII, so a byte buffer is
	// sufficient; it also avoids re-allocating a string on each iteration.
	out := make([]byte, 0, len(s))
	for _, r := range s {
		switch {
		case 'a' <= r && r <= 'z':
			r = unicode.ToUpper(r)
		case ('A' <= r && r <= 'Z') || ('0' <= r && r <= '9'):
			// Already valid in a field name.
		default:
			r = '_'
		}
		// Skip underscores while nothing has been emitted yet: a field
		// name must not begin with '_'.
		if r == '_' && len(out) == 0 {
			continue
		}
		out = append(out, byte(r))
	}
	return string(out)
}
// New builds a journald logger for the container described by info.
//
// It returns an error when journal.Enabled reports false, when the log tag
// cannot be parsed, or when the extra attributes cannot be collected.
//
// Each journal entry written by the returned logger carries a fixed set of
// fields derived from info (container ID, name, tag and image name), plus the
// extra attributes, whose keys are passed through sanitizeKeyMod. Extra
// attributes are merged last, so one that shares a key with a built-in field
// replaces it.
func New(info logger.Info) (logger.Logger, error) {
	if !journal.Enabled() {
		return nil, fmt.Errorf("journald is not enabled on this host")
	}

	// The parsed tag is used both as CONTAINER_TAG and as SYSLOG_IDENTIFIER.
	tag, err := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
	if err != nil {
		return nil, err
	}

	fields := make(map[string]string)
	// The short ID is the first 12 bytes of the full ID; slicing panics if
	// ContainerID is shorter than that.
	fields["CONTAINER_ID"] = info.ContainerID[:12]
	fields["CONTAINER_ID_FULL"] = info.ContainerID
	fields["CONTAINER_NAME"] = info.Name()
	fields["CONTAINER_TAG"] = tag
	fields["IMAGE_NAME"] = info.ImageName()
	fields["SYSLOG_IDENTIFIER"] = tag

	extra, err := info.ExtraAttributes(sanitizeKeyMod)
	if err != nil {
		return nil, err
	}
	for key, value := range extra {
		fields[key] = value
	}

	driver := &journald{
		vars:    fields,
		readers: make(map[*logger.LogWatcher]struct{}),
	}
	return driver, nil
}
// validateLogOpt is the option-validation callback registered for this
// driver. It accepts the keys "labels", "env", "env-regex" and "tag" and
// returns an error naming the offending key as soon as it meets any other
// one. Option values are not inspected; a nil or empty cfg is valid.
func validateLogOpt(cfg map[string]string) error {
	for opt := range cfg {
		switch opt {
		case "labels", "env", "env-regex", "tag":
			// Recognised option: move on to the next key.
			continue
		}
		return fmt.Errorf("unknown log opt '%s' for journald log driver", opt)
	}
	return nil
}
// Log sends msg to the journal together with the driver's fields.
//
// The fields are copied into a fresh map for every call so that the
// per-message CONTAINER_PARTIAL_MESSAGE marker never leaks into s.vars. The
// marker is set when msg carries partial-log metadata that is not flagged as
// the last piece. Messages whose source is "stderr" are sent with
// journal.PriErr, everything else with journal.PriInfo. The error from
// journal.Send is returned unchanged.
func (s *journald) Log(msg *logger.Message) error {
	fields := make(map[string]string)
	for key, value := range s.vars {
		fields[key] = value
	}
	if meta := msg.PLogMetaData; meta != nil && !meta.Last {
		fields["CONTAINER_PARTIAL_MESSAGE"] = "true"
	}

	priority := journal.PriInfo
	if msg.Source == "stderr" {
		priority = journal.PriErr
	}

	// Everything needed from msg is read before it is handed to
	// logger.PutMessage; msg must not be touched after that call
	// (presumably it is recycled for reuse — confirm against logger).
	text := string(msg.Line)
	logger.PutMessage(msg)

	return journal.Send(text, priority, fields)
}
// Name returns the name under which this log driver is registered.
func (s *journald) Name() string {
	return name
}