mirror of
synced 2022-11-09 12:21:53 -05:00
Managed containerd processes are executed with SysProcAttr.Pdeathsig set to syscall.SIGKILL so that the managed containerd is automatically killed along with the daemon. At least, that is the intention. In practice, the signal is sent to the process when the creating _OS thread_ dies! If a goroutine exits while locked to an OS thread, the Go runtime will terminate the thread. If that thread happens to be the same thread which the subprocess was started from, the subprocess will be signaled. Prevent the journald driver from sometimes unintentionally killing child processes by ensuring that all runtime.LockOSThread() calls are paired with runtime.UnlockOSThread(). Signed-off-by: Cory Snider <csnider@mirantis.com>
459 lines
12 KiB
459 lines
12 KiB
//go:build linux && cgo && !static_build && journald
// +build linux,cgo,!static_build,journald
package journald // import "github.com/docker/docker/daemon/logger/journald"
import (
// closedDrainTimeout is the duration of the timer readJournal arms once it
// observes that the logger has been closed, when no other drain timeout is
// already in effect.
const closedDrainTimeout = 5 * time.Second
// wellKnownFields is the set of journal field names which we know are not
// user-provided attribute fields. getAttrs leaves out every field listed
// here when it builds the attribute list for a log message.
var wellKnownFields = map[string]bool{
"MESSAGE": true,
"MESSAGE_ID": true,
"PRIORITY": true,
"CODE_FILE": true,
"CODE_LINE": true,
"CODE_FUNC": true,
"ERRNO": true,
fieldSyslogIdentifier: true,
"SYSLOG_PID": true,
fieldSyslogTimestamp: true,
fieldContainerName: true,
fieldContainerID: true,
fieldContainerIDFull: true,
fieldContainerTag: true,
fieldImageName: true,
fieldPLogID: true,
fieldPLogOrdinal: true,
fieldPLogLast: true,
fieldPartialMessage: true,
fieldLogEpoch: true,
fieldLogOrdinal: true,
// reader holds the state of a single ReadLogs request.
type reader struct {
// s is the journald logger whose entries are being read.
s *journald
// j is the journal handle. readLogs opens it and defers closing it.
j *sdjournal.Journal
// logWatcher receives the messages and errors produced by the reader.
logWatcher *logger.LogWatcher
// config is the read configuration that was passed to ReadLogs.
config logger.ReadConfig
// maxOrdinal is the largest fieldLogOrdinal value seen so far among
// entries whose fieldLogEpoch equals s.epoch.
maxOrdinal uint64
// initialized becomes true when the first call to drainJournal returns.
initialized bool
// ready is created by ReadLogs and is the channel signalReady operates on.
ready chan struct{}
func getMessage(d map[string]string) (line []byte, ok bool) {
m, ok := d["MESSAGE"]
if ok {
line = []byte(m)
if d[fieldPartialMessage] != "true" {
line = append(line, "\n"...)
return line, ok
func getPriority(d map[string]string) (journal.Priority, bool) {
if pri, ok := d["PRIORITY"]; ok {
i, err := strconv.Atoi(pri)
return journal.Priority(i), err == nil
return -1, false
// getSource recovers the stream name from the entry data by mapping from the
// journal priority field back to the stream that we would have assigned that
// value.
func getSource(d map[string]string) string {
source := ""
if priority, ok := getPriority(d); ok {
if priority == journal.PriErr {
source = "stderr"
} else if priority == journal.PriInfo {
source = "stdout"
return source
func getAttrs(d map[string]string) []backend.LogAttr {
var attrs []backend.LogAttr
for k, v := range d {
if k[0] != '_' && !wellKnownFields[k] {
attrs = append(attrs, backend.LogAttr{Key: k, Value: v})
return attrs
// errDrainDone is the error returned by drainJournal to signal that there are
// no more log entries to send to the log watcher. It is a control-flow
// sentinel rather than a failure: readJournal stops reading and returns nil
// when it receives it.
var errDrainDone = errors.New("journald drain done")
// drainJournal reads and sends log messages from the journal.
//
// On the first call (while r.initialized is false) it first positions the
// read pointer according to r.config.Tail, r.config.Since and r.config.Until.
//
// drainJournal returns errDrainDone when a terminal stopping condition has been
// reached: the watch consumer is gone, a log entry is read which has a
// timestamp after until (if until is nonzero), or the log driver is closed and
// the last message logged has been sent from the journal. If the end of the
// journal is reached without encountering a terminal stopping condition, a nil
// error is returned.
func (r *reader) drainJournal() error {
if !r.initialized {
// The flag is flipped by a deferred function, i.e. only when this call
// returns, so the loop below can still tell that it is the first pass.
defer func() {
r.initialized = true
var (
err error
seekedToTail bool
// A non-negative Tail starts from the end of the requested range (the
// journal tail, or Until when it is set); a negative Tail starts from
// the beginning (the journal head, or Since when it is set).
if r.config.Tail >= 0 {
if r.config.Until.IsZero() {
err = r.j.SeekTail()
seekedToTail = true
} else {
err = r.j.SeekRealtime(r.config.Until)
} else {
if r.config.Since.IsZero() {
err = r.j.SeekHead()
} else {
err = r.j.SeekRealtime(r.config.Since)
if err != nil {
return err
// SeekTail() followed by Next() behaves incorrectly, so we need
// to work around the bug by ensuring the first discrete
// movement of the read pointer is Previous() or PreviousSkip().
// PreviousSkip() is called inside the loop when config.Tail > 0
// so the only special case requiring special handling is
// config.Tail == 0.
// https://github.com/systemd/systemd/issues/9934
if seekedToTail && r.config.Tail == 0 {
// Resolve the read pointer to the last entry in the
// journal so that the call to Next() inside the loop
// advances past it.
// err is nil when there was simply no entry to step back to, in
// which case nil is returned.
if ok, err := r.j.Previous(); err != nil || !ok {
return err
// i counts the iterations of this call; "!r.initialized && i == 0"
// identifies the very first cursor movement of the reader.
for i := 0; ; i++ {
if !r.initialized && i == 0 && r.config.Tail > 0 {
// First movement with a positive Tail: step back Tail entries.
if n, err := r.j.PreviousSkip(uint(r.config.Tail)); err != nil || n == 0 {
return err
// Otherwise advance by one entry. A nil err with ok == false means the
// end of the journal was reached, which produces the nil return.
} else if ok, err := r.j.Next(); err != nil || !ok {
return err
if !r.initialized && i == 0 {
// The cursor is in a position which will be unaffected
// by subsequent logging.
// NOTE(review): no statement is visible in this branch; the comment
// suggests an action belongs here once the cursor is positioned
// (perhaps a call to r.signalReady) -- confirm against upstream.
// Read the entry's timestamp.
timestamp, err := r.j.Realtime()
if err != nil {
return err
// Check if the PreviousSkip went too far back. Check only the
// initial position as we are comparing wall-clock timestamps,
// which may not be monotonic. We don't want to skip over
// messages sent later in time just because the clock moved
// backwards.
// NOTE(review): no statement is visible in this branch either;
// presumably it should re-seek to r.config.Since and retry -- confirm.
if !r.initialized && i == 0 && r.config.Tail > 0 && timestamp.Before(r.config.Since) {
// An entry newer than Until ends the drain for good.
if !r.config.Until.IsZero() && r.config.Until.Before(timestamp) {
return errDrainDone
// Read and send the logged message, if there is one to read.
data, err := r.j.Data()
if err != nil {
return err
// Track the largest ordinal seen among entries of the logger's current
// epoch; readJournal compares r.maxOrdinal with the logger's counter.
// An unparsable ordinal is ignored.
if data[fieldLogEpoch] == r.s.epoch {
seq, err := strconv.ParseUint(data[fieldLogOrdinal], 10, 64)
if err == nil && seq > r.maxOrdinal {
r.maxOrdinal = seq
if line, ok := getMessage(data); ok {
// Send the log message, unless the consumer is gone
msg := &logger.Message{
Line: line,
Source: getSource(data),
Timestamp: timestamp.In(time.UTC),
Attrs: getAttrs(data),
// The daemon timestamp will differ from the "trusted"
// timestamp of when the event was received by journald.
// We can efficiently seek around the journal by the
// event timestamp, and the event timestamp is what
// journalctl displays. The daemon timestamp is just an
// application-supplied field with no special
// significance; libsystemd won't help us seek to the
// entry with the closest timestamp.
// When SYSLOG_TIMESTAMP is present and parses as RFC 3339 it replaces
// the journal timestamp on the outgoing message.
if sts := data["SYSLOG_TIMESTAMP"]; sts != "" {
if tv, err := time.Parse(time.RFC3339Nano, sts); err == nil {
msg.Timestamp = tv
// Wait until either the message is accepted or the consumer goes away.
select {
case <-r.logWatcher.WatchConsumerGone():
return errDrainDone
case r.logWatcher.Msg <- msg:
// Call sd_journal_process() periodically during the processing loop
// to close any opened file descriptors for rotated (deleted) journal files.
if i != 0 && i%1024 == 0 {
if _, err := r.j.Process(); err != nil {
// log a warning but ignore it for now
logrus.WithField("container", r.s.vars[fieldContainerIDFull]).
WithField("error", err).
Warn("journald: error processing journal")
func (r *reader) readJournal() error {
caughtUp := atomic.LoadUint64(&r.s.ordinal)
if err := r.drainJournal(); err != nil {
if err != errDrainDone {
return err
return nil
var drainTimeout <-chan time.Time
if !r.config.Follow {
if r.s.readSyncTimeout == 0 {
return nil
tmr := time.NewTimer(r.s.readSyncTimeout)
defer tmr.Stop()
drainTimeout = tmr.C
for {
status, err := r.j.Wait(250 * time.Millisecond)
if err != nil {
return err
select {
case <-r.logWatcher.WatchConsumerGone():
return nil // won't be able to write anything anymore
case <-drainTimeout:
// Container is gone but we haven't found the end of the
// logs within the timeout. Maybe it was dropped by
// journald, e.g. due to rate-limiting.
return nil
case <-r.s.closed:
// container is gone, drain journal
lastSeq := atomic.LoadUint64(&r.s.ordinal)
if r.maxOrdinal >= lastSeq {
// All caught up with the logger!
return nil
if drainTimeout == nil {
tmr := time.NewTimer(closedDrainTimeout)
defer tmr.Stop()
drainTimeout = tmr.C
// container is still alive
if status == sdjournal.StatusNOP {
// no new data -- keep waiting
err = r.drainJournal()
if err != nil {
if err != errDrainDone {
return err
return nil
if !r.config.Follow && r.s.readSyncTimeout > 0 && r.maxOrdinal >= caughtUp {
return nil
func (r *reader) readLogs() {
defer close(r.logWatcher.Msg)
// Make sure the ready channel is closed in the event of an early
// return.
defer r.signalReady()
// Quoting https://www.freedesktop.org/software/systemd/man/sd-journal.html:
// Functions that operate on sd_journal objects are thread
// agnostic — given sd_journal pointer may only be used from one
// specific thread at all times (and it has to be the very same one
// during the entire lifetime of the object), but multiple,
// independent threads may use multiple, independent objects safely.
// sdjournal.Open returns a value which wraps an sd_journal pointer so
// we need to abide by those rules.
defer runtime.UnlockOSThread()
// Get a handle to the journal.
var err error
if r.s.journalReadDir != "" {
r.j, err = sdjournal.OpenDir(r.s.journalReadDir, 0)
} else {
r.j, err = sdjournal.Open(0)
if err != nil {
r.logWatcher.Err <- err
defer r.j.Close()
if r.config.Follow {
// Initialize library inotify watches early
if err := r.j.InitializeInotify(); err != nil {
r.logWatcher.Err <- err
// Remove limits on the size of data items that we'll retrieve.
if err := r.j.SetDataThreshold(0); err != nil {
r.logWatcher.Err <- err
// Add a match to have the library do the searching for us.
if err := r.j.AddMatch(fieldContainerIDFull, r.s.vars[fieldContainerIDFull]); err != nil {
r.logWatcher.Err <- err
if err := r.readJournal(); err != nil {
r.logWatcher.Err <- err
func (r *reader) signalReady() {
select {
case <-r.ready:
// ReadLogs creates a reader for the given read configuration, starts its
// readLogs method on a new goroutine, and returns the LogWatcher through
// which that goroutine delivers messages and errors.
func (s *journald) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
r := &reader{
s: s,
logWatcher: logger.NewLogWatcher(),
config: config,
ready: make(chan struct{}),
go r.readLogs()
// Block until the reader is in position to read from the current config
// location to prevent race conditions in tests.
// NOTE(review): no receive from r.ready is visible between this comment
// and the return, so as written nothing blocks here -- confirm whether a
// wait on r.ready is missing.
return r.logWatcher
func waitUntilFlushedImpl(s *journald) error {
if s.readSyncTimeout == 0 {
return nil
ordinal := atomic.LoadUint64(&s.ordinal)
if ordinal == 0 {
return nil // No logs were sent; nothing to wait for.
flushed := make(chan error)
go func() {
defer close(flushed)
defer runtime.UnlockOSThread()
var (
j *sdjournal.Journal
err error
if s.journalReadDir != "" {
j, err = sdjournal.OpenDir(s.journalReadDir, 0)
} else {
j, err = sdjournal.Open(0)
if err != nil {
flushed <- err
defer j.Close()
if err := j.AddMatch(fieldContainerIDFull, s.vars[fieldContainerIDFull]); err != nil {
flushed <- err
if err := j.AddMatch(fieldLogEpoch, s.epoch); err != nil {
flushed <- err
if err := j.AddMatch(fieldLogOrdinal, strconv.FormatUint(ordinal, 10)); err != nil {
flushed <- err
deadline := time.Now().Add(s.readSyncTimeout)
for time.Now().Before(deadline) {
if ok, err := j.Next(); ok {
// Found it!
} else if err != nil {
flushed <- err
if _, err := j.Wait(100 * time.Millisecond); err != nil {
flushed <- err
logrus.WithField("container", s.vars[fieldContainerIDFull]).
Warn("journald: deadline exceeded waiting for logs to be committed to journal")
return <-flushed
// init assigns waitUntilFlushedImpl to the package-level waitUntilFlushed
// variable (presumably declared in another file of this package -- its
// declaration is not visible here).
func init() {
waitUntilFlushed = waitUntilFlushedImpl