diff --git a/Dockerfile b/Dockerfile index edb50a73a2..4085767e96 100644 --- a/Dockerfile +++ b/Dockerfile @@ -333,6 +333,7 @@ RUN --mount=type=cache,sharing=locked,id=moby-dev-aptlib,target=/var/lib/apt \ python3-setuptools \ python3-wheel \ sudo \ + systemd-journal-remote \ thin-provisioning-tools \ uidmap \ vim \ diff --git a/daemon/logger/journald/internal/export/export.go b/daemon/logger/journald/internal/export/export.go new file mode 100644 index 0000000000..3de1b5602d --- /dev/null +++ b/daemon/logger/journald/internal/export/export.go @@ -0,0 +1,50 @@ +// Package export implements a serializer for the systemd Journal Export Format +// as documented at https://systemd.io/JOURNAL_EXPORT_FORMATS/ +package export // import "github.com/docker/docker/daemon/logger/journald/internal/export" + +import ( + "encoding/binary" + "fmt" + "io" + "unicode/utf8" +) + +// Returns whether s can be serialized as a field value "as they are" without +// the special binary safe serialization. +func isSerializableAsIs(s string) bool { + if !utf8.ValidString(s) { + return false + } + for _, c := range s { + if c < ' ' && c != '\t' { + return false + } + } + return true +} + +// WriteField writes the field serialized to Journal Export format to w. +// +// The variable name must consist only of uppercase characters, numbers and +// underscores. No validation or sanitization is performed. +func WriteField(w io.Writer, variable, value string) error { + if isSerializableAsIs(value) { + _, err := fmt.Fprintf(w, "%s=%s\n", variable, value) + return err + } + + if _, err := fmt.Fprintln(w, variable); err != nil { + return err + } + if err := binary.Write(w, binary.LittleEndian, uint64(len(value))); err != nil { + return err + } + _, err := fmt.Fprintln(w, value) + return err +} + +// WriteEndOfEntry terminates the journal entry. +func WriteEndOfEntry(w io.Writer) error { + _, err := fmt.Fprintln(w) + return err +} diff --git a/daemon/logger/journald/internal/export/export_test.go b/daemon/logger/journald/internal/export/export_test.go new file mode 100644 index 0000000000..0b7a4b3409 --- /dev/null +++ b/daemon/logger/journald/internal/export/export_test.go @@ -0,0 +1,27 @@ +package export_test + +import ( + "bytes" + "testing" + + "github.com/docker/docker/daemon/logger/journald/internal/export" + "gotest.tools/v3/assert" + "gotest.tools/v3/golden" +) + +func TestExportSerialization(t *testing.T) { + must := func(err error) { t.Helper(); assert.NilError(t, err) } + var buf bytes.Buffer + must(export.WriteField(&buf, "_TRANSPORT", "journal")) + must(export.WriteField(&buf, "MESSAGE", "this is a single-line message.\t🚀")) + must(export.WriteField(&buf, "EMPTY_VALUE", "")) + must(export.WriteField(&buf, "NEWLINE", "\n")) + must(export.WriteEndOfEntry(&buf)) + + must(export.WriteField(&buf, "MESSAGE", "this is a\nmulti line\nmessage")) + must(export.WriteField(&buf, "INVALID_UTF8", "a\x80b")) + must(export.WriteField(&buf, "BINDATA", "\x00\x01\x02\x03")) + must(export.WriteEndOfEntry(&buf)) + + golden.Assert(t, buf.String(), "export-serialization.golden") +} diff --git a/daemon/logger/journald/internal/export/testdata/export-serialization.golden b/daemon/logger/journald/internal/export/testdata/export-serialization.golden new file mode 100644 index 0000000000..4d1fa0eba5 Binary files /dev/null and b/daemon/logger/journald/internal/export/testdata/export-serialization.golden differ diff --git a/daemon/logger/journald/internal/fake/sender.go b/daemon/logger/journald/internal/fake/sender.go new file mode 100644 index 0000000000..0d4bf8e90e --- /dev/null +++ b/daemon/logger/journald/internal/fake/sender.go @@ -0,0 +1,149 @@ +// Package fake implements a journal writer for testing which is decoupled from +// the system's journald. +// +// The systemd project does not have any facilities to support testing of +// journal reader clients (although it has been requested: +// https://github.com/systemd/systemd/issues/14120) so we have to get creative. +// The systemd-journal-remote command reads serialized journal entries in the +// Journal Export Format and writes them to journal files. This format is +// well-documented and straightforward to generate. +package fake // import "github.com/docker/docker/daemon/logger/journald/internal/fake" + +import ( + "bytes" + "errors" + "fmt" + "os" + "os/exec" + "regexp" + "strconv" + "testing" + "time" + + "code.cloudfoundry.org/clock" + "github.com/coreos/go-systemd/v22/journal" + "gotest.tools/v3/assert" + + "github.com/docker/docker/daemon/logger/journald/internal/export" +) + +// The systemd-journal-remote command is not conventionally installed on $PATH. +// The manpage from upstream systemd lists the command as +// /usr/lib/systemd/systemd-journal-remote, but Debian installs it to +// /lib/systemd instead. +var cmdPaths = []string{ + "/usr/lib/systemd/systemd-journal-remote", + "/lib/systemd/systemd-journal-remote", + "systemd-journal-remote", // Check $PATH anyway, just in case. +} + +// ErrCommandNotFound is returned when the systemd-journal-remote command could +// not be located at the well-known paths or $PATH. +var ErrCommandNotFound = errors.New("systemd-journal-remote command not found") + +// JournalRemoteCmdPath searches for the systemd-journal-remote command in +// well-known paths and the directories named in the $PATH environment variable. +func JournalRemoteCmdPath() (string, error) { + for _, p := range cmdPaths { + if path, err := exec.LookPath(p); err == nil { + return path, nil + } + } + return "", ErrCommandNotFound +} + +// Sender fakes github.com/coreos/go-systemd/v22/journal.Send, writing journal +// entries to an arbitrary journal file without depending on a running journald +// process. +type Sender struct { + CmdName string + OutputPath string + + // Clock for timestamping sent messages. + Clock clock.Clock + // Whether to assign the event's realtime timestamp to the time + // specified by the SYSLOG_TIMESTAMP variable value. This is roughly + // analogous to journald receiving the event and assigning it a + // timestamp in zero time after the SYSLOG_TIMESTAMP value was set, + // which is higly unrealistic in practice. + AssignEventTimestampFromSyslogTimestamp bool +} + +// New constructs a new Sender which will write journal entries to outpath. The +// file name must end in '.journal' and the directory must already exist. The +// journal file will be created if it does not exist. An existing journal file +// will be appended to. +func New(outpath string) (*Sender, error) { + p, err := JournalRemoteCmdPath() + if err != nil { + return nil, err + } + sender := &Sender{ + CmdName: p, + OutputPath: outpath, + Clock: clock.NewClock(), + } + return sender, nil +} + +// NewT is like New but will skip the test if the systemd-journal-remote command +// is not available. +func NewT(t *testing.T, outpath string) *Sender { + t.Helper() + s, err := New(outpath) + if errors.Is(err, ErrCommandNotFound) { + t.Skip(err) + } + assert.NilError(t, err) + return s +} + +var validVarName = regexp.MustCompile("^[A-Z0-9][A-Z0-9_]*$") + +// Send is a drop-in replacement for +// github.com/coreos/go-systemd/v22/journal.Send. +func (s *Sender) Send(message string, priority journal.Priority, vars map[string]string) error { + var buf bytes.Buffer + // https://systemd.io/JOURNAL_EXPORT_FORMATS/ says "if you are + // generating this format you shouldn’t care about these special + // double-underscore fields," yet systemd-journal-remote treats entries + // without a __REALTIME_TIMESTAMP as invalid and discards them. + // Reported upstream: https://github.com/systemd/systemd/issues/22411 + var ts time.Time + if sts := vars["SYSLOG_TIMESTAMP"]; s.AssignEventTimestampFromSyslogTimestamp && sts != "" { + var err error + if ts, err = time.Parse(time.RFC3339Nano, sts); err != nil { + return fmt.Errorf("fake: error parsing SYSLOG_TIMESTAMP value %q: %w", ts, err) + } + } else { + ts = s.Clock.Now() + } + if err := export.WriteField(&buf, "__REALTIME_TIMESTAMP", strconv.FormatInt(ts.UnixMicro(), 10)); err != nil { + return fmt.Errorf("fake: error writing entry to systemd-journal-remote: %w", err) + } + if err := export.WriteField(&buf, "MESSAGE", message); err != nil { + return fmt.Errorf("fake: error writing entry to systemd-journal-remote: %w", err) + } + if err := export.WriteField(&buf, "PRIORITY", strconv.Itoa(int(priority))); err != nil { + return fmt.Errorf("fake: error writing entry to systemd-journal-remote: %w", err) + } + for k, v := range vars { + if !validVarName.MatchString(k) { + return fmt.Errorf("fake: invalid journal-entry variable name %q", k) + } + if err := export.WriteField(&buf, k, v); err != nil { + return fmt.Errorf("fake: error writing entry to systemd-journal-remote: %w", err) + } + } + if err := export.WriteEndOfEntry(&buf); err != nil { + return fmt.Errorf("fake: error writing entry to systemd-journal-remote: %w", err) + } + + // Invoke the command separately for each entry to ensure that the entry + // has been flushed to disk when Send returns. + cmd := exec.Command(s.CmdName, "--output", s.OutputPath, "-") + cmd.Stdin = &buf + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + return cmd.Run() +} diff --git a/daemon/logger/journald/internal/sdjournal/cursor.go b/daemon/logger/journald/internal/sdjournal/cursor.go deleted file mode 100644 index 52cc21c7c5..0000000000 --- a/daemon/logger/journald/internal/sdjournal/cursor.go +++ /dev/null @@ -1,45 +0,0 @@ -//go:build linux && cgo && !static_build && journald -// +build linux,cgo,!static_build,journald - -package sdjournal // import "github.com/docker/docker/daemon/logger/journald/internal/sdjournal" - -// #include -import "C" -import ( - "runtime" - "unsafe" -) - -// Cursor is a reference to a journal cursor. A Cursor must not be copied. -type Cursor struct { - c *C.char - noCopy noCopy //nolint:structcheck,unused // Exists only to mark values uncopyable for `go vet`. -} - -func wrapCursor(cur *C.char) *Cursor { - c := &Cursor{c: cur} - runtime.SetFinalizer(c, (*Cursor).Free) - return c -} - -func (c *Cursor) String() string { - if c.c == nil { - return "" - } - return C.GoString(c.c) -} - -// Free invalidates the cursor and frees any associated resources on the C heap. -func (c *Cursor) Free() { - if c == nil { - return - } - C.free(unsafe.Pointer(c.c)) - runtime.SetFinalizer(c, nil) - c.c = nil -} - -type noCopy struct{} - -func (*noCopy) Lock() {} -func (*noCopy) Unlock() {} diff --git a/daemon/logger/journald/internal/sdjournal/sdjournal.go b/daemon/logger/journald/internal/sdjournal/sdjournal.go index 651d51ac86..af2b532670 100644 --- a/daemon/logger/journald/internal/sdjournal/sdjournal.go +++ b/daemon/logger/journald/internal/sdjournal/sdjournal.go @@ -56,6 +56,23 @@ func Open(flags int) (*Journal, error) { return j, nil } +// OpenDir opens the journal files at the specified absolute directory path for +// reading. +// +// The returned Journal value may only be used from the same operating system +// thread which Open was called from. Using it from only a single goroutine is +// not sufficient; runtime.LockOSThread must also be used. +func OpenDir(path string, flags int) (*Journal, error) { + j := &Journal{} + cpath := C.CString(path) + defer C.free(unsafe.Pointer(cpath)) + if rc := C.sd_journal_open_directory(&j.j, cpath, C.int(flags)); rc != 0 { + return nil, fmt.Errorf("journald: error opening journal: %w", syscall.Errno(-rc)) + } + runtime.SetFinalizer(j, (*Journal).Close) + return j, nil +} + // Close closes the journal. The return value is always nil. func (j *Journal) Close() error { if j.j != nil { @@ -105,10 +122,27 @@ func (j *Journal) Next() (bool, error) { return rc > 0, nil } -// PreviousSkip sets back the read pointer by n entries. The number of entries -// must be less than or equal to 2147483647 (2**31 - 1). -func (j *Journal) PreviousSkip(n uint) (int, error) { - rc := C.sd_journal_previous_skip(j.j, C.uint64_t(n)) +// Previous sets back the read pointer to the previous entry. +func (j *Journal) Previous() (bool, error) { + rc := C.sd_journal_previous(j.j) + if rc < 0 { + return false, fmt.Errorf("journald: error setting back read pointer: %w", syscall.Errno(-rc)) + } + return rc > 0, nil +} + +// PreviousSkip sets back the read pointer by skip entries, returning the number +// of entries set back. skip must be less than or equal to 2147483647 +// (2**31 - 1). +// +// skip == 0 is a special case: PreviousSkip(0) resolves the read pointer to a +// discrete position without setting it back to a different entry. The trouble +// is, it always returns zero on recent libsystemd versions. There is no way to +// tell from the return values whether or not it successfully resolved the read +// pointer to a discrete entry. +// https://github.com/systemd/systemd/pull/5930#issuecomment-300878104 +func (j *Journal) PreviousSkip(skip uint) (int, error) { + rc := C.sd_journal_previous_skip(j.j, C.uint64_t(skip)) if rc < 0 { return 0, fmt.Errorf("journald: error setting back read pointer: %w", syscall.Errno(-rc)) } @@ -116,6 +150,9 @@ func (j *Journal) PreviousSkip(n uint) (int, error) { } // SeekHead sets the read pointer to the position before the oldest available entry. +// +// BUG: SeekHead() followed by Previous() has unexpected behavior. +// https://github.com/systemd/systemd/issues/17662 func (j *Journal) SeekHead() error { if rc := C.sd_journal_seek_head(j.j); rc != 0 { return fmt.Errorf("journald: error seeking to head of journal: %w", syscall.Errno(-rc)) @@ -124,6 +161,9 @@ func (j *Journal) SeekHead() error { } // SeekTail sets the read pointer to the position after the most recent available entry. +// +// BUG: SeekTail() followed by Next() has unexpected behavior. +// https://github.com/systemd/systemd/issues/9934 func (j *Journal) SeekTail() error { if rc := C.sd_journal_seek_tail(j.j); rc != 0 { return fmt.Errorf("journald: error seeking to tail of journal: %w", syscall.Errno(-rc)) @@ -142,24 +182,6 @@ func (j *Journal) SeekRealtime(t time.Time) error { return nil } -// Cursor returns a serialization of the journal read pointer's current position. -func (j *Journal) Cursor() (*Cursor, error) { - var c *C.char - if rc := C.sd_journal_get_cursor(j.j, &c); rc != 0 { - return nil, fmt.Errorf("journald: error getting cursor: %w", syscall.Errno(-rc)) - } - return wrapCursor(c), nil -} - -// TestCursor checks whether the current position of the journal read pointer matches c. -func (j *Journal) TestCursor(c *Cursor) (bool, error) { - rc := C.sd_journal_test_cursor(j.j, c.c) - if rc < 0 { - return false, fmt.Errorf("journald: error testing cursor: %w", syscall.Errno(-rc)) - } - return rc > 0, nil -} - // Wait blocks until the journal gets changed or timeout has elapsed. // Pass a negative timeout to wait indefinitely. func (j *Journal) Wait(timeout time.Duration) (Status, error) { @@ -235,3 +257,8 @@ func (j *Journal) SetDataThreshold(v uint) error { } return nil } + +type noCopy struct{} + +func (noCopy) Lock() {} +func (noCopy) Unlock() {} diff --git a/daemon/logger/journald/journald.go b/daemon/logger/journald/journald.go index 61f294ba97..311a87d488 100644 --- a/daemon/logger/journald/journald.go +++ b/daemon/logger/journald/journald.go @@ -6,6 +6,7 @@ package journald // import "github.com/docker/docker/daemon/logger/journald" import ( "fmt" "strconv" + "time" "unicode" "github.com/coreos/go-systemd/v22/journal" @@ -15,10 +16,38 @@ import ( const name = "journald" +// Well-known user journal fields. +// https://www.freedesktop.org/software/systemd/man/systemd.journal-fields.html +const ( + fieldSyslogIdentifier = "SYSLOG_IDENTIFIER" + fieldSyslogTimestamp = "SYSLOG_TIMESTAMP" +) + +// User journal fields used by the log driver. +const ( + fieldContainerID = "CONTAINER_ID" + fieldContainerIDFull = "CONTAINER_ID_FULL" + fieldContainerName = "CONTAINER_NAME" + fieldContainerTag = "CONTAINER_TAG" + fieldImageName = "IMAGE_NAME" + + // Fields used to serialize PLogMetaData. + + fieldPLogID = "CONTAINER_PARTIAL_ID" + fieldPLogOrdinal = "CONTAINER_PARTIAL_ORDINAL" + fieldPLogLast = "CONTAINER_PARTIAL_LAST" + fieldPartialMessage = "CONTAINER_PARTIAL_MESSAGE" +) + type journald struct { vars map[string]string // additional variables and values to send to the journal along with the log message closed chan struct{} + + // Overrides for unit tests. + + sendToJournal func(message string, priority journal.Priority, vars map[string]string) error + journalReadDir string //nolint:structcheck,unused // Referenced in read.go, which has more restrictive build constraints. } func init() { @@ -57,6 +86,10 @@ func New(info logger.Info) (logger.Logger, error) { return nil, fmt.Errorf("journald is not enabled on this host") } + return new(info) +} + +func new(info logger.Info) (*journald, error) { // parse log tag tag, err := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate) if err != nil { @@ -64,12 +97,12 @@ func New(info logger.Info) (logger.Logger, error) { } vars := map[string]string{ - "CONTAINER_ID": info.ContainerID[:12], - "CONTAINER_ID_FULL": info.ContainerID, - "CONTAINER_NAME": info.Name(), - "CONTAINER_TAG": tag, - "IMAGE_NAME": info.ImageName(), - "SYSLOG_IDENTIFIER": tag, + fieldContainerID: info.ContainerID[:12], + fieldContainerIDFull: info.ContainerID, + fieldContainerName: info.Name(), + fieldContainerTag: tag, + fieldImageName: info.ImageName(), + fieldSyslogIdentifier: tag, } extraAttrs, err := info.ExtraAttributes(sanitizeKeyMod) if err != nil { @@ -78,7 +111,7 @@ func New(info logger.Info) (logger.Logger, error) { for k, v := range extraAttrs { vars[k] = v } - return &journald{vars: vars, closed: make(chan struct{})}, nil + return &journald{vars: vars, closed: make(chan struct{}), sendToJournal: journal.Send}, nil } // We don't actually accept any options, but we have to supply a callback for @@ -103,12 +136,15 @@ func (s *journald) Log(msg *logger.Message) error { for k, v := range s.vars { vars[k] = v } + if !msg.Timestamp.IsZero() { + vars[fieldSyslogTimestamp] = msg.Timestamp.Format(time.RFC3339Nano) + } if msg.PLogMetaData != nil { - vars["CONTAINER_PARTIAL_ID"] = msg.PLogMetaData.ID - vars["CONTAINER_PARTIAL_ORDINAL"] = strconv.Itoa(msg.PLogMetaData.Ordinal) - vars["CONTAINER_PARTIAL_LAST"] = strconv.FormatBool(msg.PLogMetaData.Last) + vars[fieldPLogID] = msg.PLogMetaData.ID + vars[fieldPLogOrdinal] = strconv.Itoa(msg.PLogMetaData.Ordinal) + vars[fieldPLogLast] = strconv.FormatBool(msg.PLogMetaData.Last) if !msg.PLogMetaData.Last { - vars["CONTAINER_PARTIAL_MESSAGE"] = "true" + vars[fieldPartialMessage] = "true" } } @@ -117,9 +153,9 @@ func (s *journald) Log(msg *logger.Message) error { logger.PutMessage(msg) if source == "stderr" { - return journal.Send(line, journal.PriErr, vars) + return s.sendToJournal(line, journal.PriErr, vars) } - return journal.Send(line, journal.PriInfo, vars) + return s.sendToJournal(line, journal.PriInfo, vars) } func (s *journald) Name() string { diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go index 01f3ec18d8..f15c4ebc7a 100644 --- a/daemon/logger/journald/read.go +++ b/daemon/logger/journald/read.go @@ -17,26 +17,39 @@ import ( "github.com/docker/docker/daemon/logger/journald/internal/sdjournal" ) +// Fields which we know are not user-provided attribute fields. var wellKnownFields = map[string]bool{ - "MESSAGE": true, - "MESSAGE_ID": true, - "PRIORITY": true, - "CODE_FILE": true, - "CODE_LINE": true, - "CODE_FUNC": true, - "ERRNO": true, - "SYSLOG_FACILITY": true, - "SYSLOG_IDENTIFIER": true, - "SYSLOG_PID": true, - "CONTAINER_NAME": true, - "CONTAINER_ID": true, - "CONTAINER_ID_FULL": true, - "CONTAINER_TAG": true, + "MESSAGE": true, + "MESSAGE_ID": true, + "PRIORITY": true, + "CODE_FILE": true, + "CODE_LINE": true, + "CODE_FUNC": true, + "ERRNO": true, + "SYSLOG_FACILITY": true, + fieldSyslogIdentifier: true, + "SYSLOG_PID": true, + fieldSyslogTimestamp: true, + fieldContainerName: true, + fieldContainerID: true, + fieldContainerIDFull: true, + fieldContainerTag: true, + fieldImageName: true, + fieldPLogID: true, + fieldPLogOrdinal: true, + fieldPLogLast: true, + fieldPartialMessage: true, } -func getMessage(d map[string]string) (line []byte, partial, ok bool) { +func getMessage(d map[string]string) (line []byte, ok bool) { m, ok := d["MESSAGE"] - return []byte(m), d["CONTAINER_PARTIAL_MESSAGE"] == "true", ok + if ok { + line = []byte(m) + if d[fieldPartialMessage] != "true" { + line = append(line, "\n"...) + } + } + return line, ok } func getPriority(d map[string]string) (journal.Priority, bool) { @@ -47,106 +60,193 @@ func getPriority(d map[string]string) (journal.Priority, bool) { return -1, false } -func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *sdjournal.Journal, oldCursor *sdjournal.Cursor, until time.Time) (*sdjournal.Cursor, bool, int) { - var ( - done bool - shown int - ) +// getSource recovers the stream name from the entry data by mapping from the +// journal priority field back to the stream that we would have assigned that +// value. +func getSource(d map[string]string) string { + source := "" + if priority, ok := getPriority(d); ok { + if priority == journal.PriErr { + source = "stderr" + } else if priority == journal.PriInfo { + source = "stdout" + } + } + return source +} - // Walk the journal from here forward until we run out of new entries - // or we reach the until value (if provided). -drain: - for ok := true; ok; ok, _ = j.Next() { - // Try not to send a given entry twice. - if oldCursor != nil { - if ok, _ := j.TestCursor(oldCursor); ok { - if ok, _ := j.Next(); !ok { - break drain - } +func getAttrs(d map[string]string) []backend.LogAttr { + var attrs []backend.LogAttr + for k, v := range d { + if k[0] != '_' && !wellKnownFields[k] { + attrs = append(attrs, backend.LogAttr{Key: k, Value: v}) + } + } + return attrs +} + +// errDrainDone is the error returned by drainJournal to signal that there are +// no more log entries to send to the log watcher. +var errDrainDone = errors.New("journald drain done") + +// drainJournal reads and sends log messages from the journal. It returns the +// number of log messages sent and any error encountered. When initial != nil +// it initializes the journal read position to the position specified by config +// before reading. Otherwise it continues to read from the current position. +// +// drainJournal returns err == errDrainDone when a terminal stopping condition +// has been reached: either the watch consumer is gone or a log entry is read +// which has a timestamp after until (if until is nonzero). If the end of the +// journal is reached without encountering a terminal stopping condition, +// err == nil is returned. +func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *sdjournal.Journal, config logger.ReadConfig, initial chan struct{}) (int, error) { + if initial != nil { + defer func() { + if initial != nil { + close(initial) + } + }() + + var ( + err error + seekedToTail bool + ) + if config.Tail >= 0 { + if config.Until.IsZero() { + err = j.SeekTail() + seekedToTail = true + } else { + err = j.SeekRealtime(config.Until) + } + } else { + if config.Since.IsZero() { + err = j.SeekHead() + } else { + err = j.SeekRealtime(config.Since) } } - // Read and send the logged message, if there is one to read. - data, err := j.Data() - if errors.Is(err, sdjournal.ErrInvalidReadPointer) { - continue + if err != nil { + return 0, err } - if line, partial, ok := getMessage(data); ok { - // Read the entry's timestamp. - timestamp, err := j.Realtime() - if err != nil { - break - } - // Break if the timestamp exceeds any provided until flag. - if !until.IsZero() && until.Before(timestamp) { - done = true - break - } - // Set up the text of the entry. - if !partial { - line = append(line, "\n"...) - } - // Recover the stream name by mapping - // from the journal priority back to - // the stream that we would have - // assigned that value. - source := "" - if priority, ok := getPriority(data); ok { - if priority == journal.PriErr { - source = "stderr" - } else if priority == journal.PriInfo { - source = "stdout" - } - } - // Retrieve the values of any variables we're adding to the journal. - var attrs []backend.LogAttr - for k, v := range data { - if k[0] != '_' && !wellKnownFields[k] { - attrs = append(attrs, backend.LogAttr{Key: k, Value: v}) - } - } - - // Send the log message, unless the consumer is gone - select { - case <-logWatcher.WatchConsumerGone(): - done = true // we won't be able to write anything anymore - break drain - case logWatcher.Msg <- &logger.Message{ - Line: line, - Source: source, - Timestamp: timestamp.In(time.UTC), - Attrs: attrs, - }: - shown++ - } - // Call sd_journal_process() periodically during the processing loop - // to close any opened file descriptors for rotated (deleted) journal files. - if shown%1024 == 0 { - if _, err := j.Process(); err != nil { - // log a warning but ignore it for now - logrus.WithField("container", s.vars["CONTAINER_ID_FULL"]). - WithField("error", err). - Warn("journald: error processing journal") - } + // SeekTail() followed by Next() behaves incorrectly, so we need + // to work around the bug by ensuring the first discrete + // movement of the read pointer is Previous() or PreviousSkip(). + // PreviousSkip() is called inside the loop when config.Tail > 0 + // so the only special case requiring special handling is + // config.Tail == 0. + // https://github.com/systemd/systemd/issues/9934 + if seekedToTail && config.Tail == 0 { + // Resolve the read pointer to the last entry in the + // journal so that the call to Next() inside the loop + // advances past it. + if ok, err := j.Previous(); err != nil || !ok { + return 0, err } } } - cursor, _ := j.Cursor() - return cursor, done, shown + var sent int + for i := 0; ; i++ { + if initial != nil && i == 0 && config.Tail > 0 { + if n, err := j.PreviousSkip(uint(config.Tail)); err != nil || n == 0 { + return sent, err + } + } else if ok, err := j.Next(); err != nil || !ok { + return sent, err + } + + if initial != nil && i == 0 { + // The cursor is in position. Signal that the watcher is + // initialized. + close(initial) + initial = nil // Prevent double-closing. + } + + // Read the entry's timestamp. + timestamp, err := j.Realtime() + if err != nil { + return sent, err + } + if timestamp.Before(config.Since) { + if initial != nil && i == 0 && config.Tail > 0 { + // PreviousSkip went too far back. Seek forwards. + j.SeekRealtime(config.Since) + } + continue + } + if !config.Until.IsZero() && config.Until.Before(timestamp) { + return sent, errDrainDone + } + + // Read and send the logged message, if there is one to read. + data, err := j.Data() + if err != nil { + return sent, err + } + if line, ok := getMessage(data); ok { + // Send the log message, unless the consumer is gone + msg := &logger.Message{ + Line: line, + Source: getSource(data), + Timestamp: timestamp.In(time.UTC), + Attrs: getAttrs(data), + } + // The daemon timestamp will differ from the "trusted" + // timestamp of when the event was received by journald. + // We can efficiently seek around the journal by the + // event timestamp, and the event timestamp is what + // journalctl displays. The daemon timestamp is just an + // application-supplied field with no special + // significance; libsystemd won't help us seek to the + // entry with the closest timestamp. + /* + if sts := data["SYSLOG_TIMESTAMP"]; sts != "" { + if tv, err := time.Parse(time.RFC3339Nano, sts); err == nil { + msg.Timestamp = tv + } + } + */ + select { + case <-logWatcher.WatchConsumerGone(): + return sent, errDrainDone + case logWatcher.Msg <- msg: + sent++ + } + } + + // Call sd_journal_process() periodically during the processing loop + // to close any opened file descriptors for rotated (deleted) journal files. + if i != 0 && i%1024 == 0 { + if _, err := j.Process(); err != nil { + // log a warning but ignore it for now + logrus.WithField("container", s.vars[fieldContainerIDFull]). + WithField("error", err). + Warn("journald: error processing journal") + } + } + } } -func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *sdjournal.Journal, cursor *sdjournal.Cursor, until time.Time) *sdjournal.Cursor { -LOOP: +func (s *journald) readJournal(logWatcher *logger.LogWatcher, j *sdjournal.Journal, config logger.ReadConfig, ready chan struct{}) error { + if _, err := s.drainJournal(logWatcher, j, config, ready /* initial */); err != nil { + if err != errDrainDone { + return err + } + return nil + } + if !config.Follow { + return nil + } + for { status, err := j.Wait(250 * time.Millisecond) if err != nil { - logWatcher.Err <- err - break + return err } select { case <-logWatcher.WatchConsumerGone(): - break LOOP // won't be able to write anything anymore + return nil // won't be able to write anything anymore case <-s.closed: // container is gone, drain journal default: @@ -156,20 +256,31 @@ LOOP: continue } } - newCursor, done, recv := s.drainJournal(logWatcher, j, cursor, until) - cursor.Free() - cursor = newCursor - if done || (status == sdjournal.StatusNOP && recv == 0) { - break + n, err := s.drainJournal(logWatcher, j, config, nil /* initial */) + if err != nil { + if err != errDrainDone { + return err + } + return nil + } else if status == sdjournal.StatusNOP && n == 0 { + return nil } } - - return cursor } -func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) { +func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig, ready chan struct{}) { defer close(logWatcher.Msg) + // Make sure the ready channel is closed in the event of an early + // return. + defer func() { + select { + case <-ready: + default: + close(ready) + } + }() + // Quoting https://www.freedesktop.org/software/systemd/man/sd-journal.html: // Functions that operate on sd_journal objects are thread // agnostic — given sd_journal pointer may only be used from one @@ -183,7 +294,15 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon defer runtime.UnlockOSThread() // Get a handle to the journal. - j, err := sdjournal.Open(0) + var ( + j *sdjournal.Journal + err error + ) + if s.journalReadDir != "" { + j, err = sdjournal.OpenDir(s.journalReadDir, 0) + } else { + j, err = sdjournal.Open(0) + } if err != nil { logWatcher.Err <- err return @@ -204,61 +323,23 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon return } // Add a match to have the library do the searching for us. - if err := j.AddMatch("CONTAINER_ID_FULL", s.vars["CONTAINER_ID_FULL"]); err != nil { + if err := j.AddMatch(fieldContainerIDFull, s.vars[fieldContainerIDFull]); err != nil { logWatcher.Err <- err return } - if config.Tail >= 0 { - // If until time provided, start from there. - // Otherwise start at the end of the journal. - if !config.Until.IsZero() { - if err := j.SeekRealtime(config.Until); err != nil { - logWatcher.Err <- err - return - } - } else if err := j.SeekTail(); err != nil { - logWatcher.Err <- err - return - } - // (Try to) skip backwards by the requested number of lines... - if _, err := j.PreviousSkip(uint(config.Tail)); err == nil { - // ...but not before "since" - if !config.Since.IsZero() { - if stamp, err := j.Realtime(); err == nil && stamp.Before(config.Since) { - _ = j.SeekRealtime(config.Since) - } - } - } - } else { - // Start at the beginning of the journal. - if err := j.SeekHead(); err != nil { - logWatcher.Err <- err - return - } - // If we have a cutoff date, fast-forward to it. - if !config.Since.IsZero() { - if err := j.SeekRealtime(config.Since); err != nil { - logWatcher.Err <- err - return - } - } - if _, err := j.Next(); err != nil { - logWatcher.Err <- err - return - } + + if err := s.readJournal(logWatcher, j, config, ready); err != nil { + logWatcher.Err <- err + return } - var cursor *sdjournal.Cursor - if config.Tail != 0 { // special case for --tail 0 - cursor, _, _ = s.drainJournal(logWatcher, j, nil, config.Until) - } - if config.Follow { - cursor = s.followJournal(logWatcher, j, cursor, config.Until) - } - cursor.Free() } func (s *journald) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { logWatcher := logger.NewLogWatcher() - go s.readLogs(logWatcher, config) + ready := make(chan struct{}) + go s.readLogs(logWatcher, config, ready) + // Block until the reader is in position to read from the current config + // location to prevent race conditions in tests. + <-ready return logWatcher } diff --git a/daemon/logger/journald/read_test.go b/daemon/logger/journald/read_test.go new file mode 100644 index 0000000000..fd61322ece --- /dev/null +++ b/daemon/logger/journald/read_test.go @@ -0,0 +1,60 @@ +//go:build linux && cgo && !static_build && journald +// +build linux,cgo,!static_build,journald + +package journald // import "github.com/docker/docker/daemon/logger/journald" + +import ( + "testing" + "time" + + "github.com/coreos/go-systemd/v22/journal" + "gotest.tools/v3/assert" + + "github.com/docker/docker/daemon/logger" + "github.com/docker/docker/daemon/logger/journald/internal/fake" + "github.com/docker/docker/daemon/logger/loggertest" +) + +func TestLogRead(t *testing.T) { + r := loggertest.Reader{ + Factory: func(t *testing.T, info logger.Info) func(*testing.T) logger.Logger { + journalDir := t.TempDir() + + // Fill the journal with irrelevant events which the + // LogReader needs to filter out. + rotatedJournal := fake.NewT(t, journalDir+"/rotated.journal") + rotatedJournal.AssignEventTimestampFromSyslogTimestamp = true + l, err := new(logger.Info{ + ContainerID: "wrongone0001", + ContainerName: "fake", + }) + assert.NilError(t, err) + l.sendToJournal = rotatedJournal.Send + assert.NilError(t, l.Log(&logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("stdout of a different container in a rotated journal file")})) + assert.NilError(t, l.Log(&logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("stderr of a different container in a rotated journal file")})) + assert.NilError(t, rotatedJournal.Send("a log message from a totally different process in a rotated journal", journal.PriInfo, nil)) + + activeJournal := fake.NewT(t, journalDir+"/fake.journal") + activeJournal.AssignEventTimestampFromSyslogTimestamp = true + l, err = new(logger.Info{ + ContainerID: "wrongone0002", + ContainerName: "fake", + }) + assert.NilError(t, err) + l.sendToJournal = activeJournal.Send + assert.NilError(t, l.Log(&logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("stdout of a different container in the active journal file")})) + assert.NilError(t, l.Log(&logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("stderr of a different container in the active journal file")})) + assert.NilError(t, rotatedJournal.Send("a log message from a totally different process in the active journal", journal.PriInfo, nil)) + + return func(t *testing.T) logger.Logger { + l, err := new(info) + assert.NilError(t, err) + l.journalReadDir = journalDir + l.sendToJournal = activeJournal.Send + return l + } + }, + } + t.Run("Tail", r.TestTail) + t.Run("Follow", r.TestFollow) +} diff --git a/daemon/logger/loggertest/logreader.go b/daemon/logger/loggertest/logreader.go index c77eeeb95b..df5cf5ed62 100644 --- a/daemon/logger/loggertest/logreader.go +++ b/daemon/logger/loggertest/logreader.go @@ -10,6 +10,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "gotest.tools/v3/assert" + "gotest.tools/v3/assert/opt" "github.com/docker/docker/api/types/backend" "github.com/docker/docker/daemon/logger" @@ -25,6 +26,9 @@ type Reader struct { } var compareLog cmp.Options = []cmp.Option{ + // Not all log drivers can round-trip timestamps at full nanosecond + // precision. + opt.TimeWithThreshold(time.Millisecond), // The json-log driver does not round-trip PLogMetaData and API users do // not expect it. cmpopts.IgnoreFields(logger.Message{}, "PLogMetaData"), diff --git a/hack/test/unit b/hack/test/unit index 454e99291f..89666c9218 100755 --- a/hack/test/unit +++ b/hack/test/unit @@ -12,7 +12,7 @@ # set -eux -o pipefail -BUILDFLAGS=(-tags 'netgo libdm_no_deferred_remove') +BUILDFLAGS=(-tags 'netgo libdm_no_deferred_remove journald') TESTFLAGS+=" -test.timeout=${TIMEOUT:-5m}" TESTDIRS="${TESTDIRS:-./...}" exclude_paths='/vendor/|/integration' diff --git a/vendor.mod b/vendor.mod index 945c8447d0..a7ac76ea34 100644 --- a/vendor.mod +++ b/vendor.mod @@ -9,6 +9,7 @@ go 1.17 require ( cloud.google.com/go v0.93.3 cloud.google.com/go/logging v1.4.2 + code.cloudfoundry.org/clock v1.0.0 github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 github.com/Graylog2/go-gelf v0.0.0-20191017102106-1550ee647df0 github.com/Microsoft/go-winio v0.5.2 @@ -87,7 +88,6 @@ require ( ) require ( - code.cloudfoundry.org/clock v1.0.0 // indirect github.com/agext/levenshtein v1.2.3 // indirect github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2 // indirect github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect diff --git a/vendor/gotest.tools/v3/assert/opt/opt.go b/vendor/gotest.tools/v3/assert/opt/opt.go new file mode 100644 index 0000000000..357cdf2eba --- /dev/null +++ b/vendor/gotest.tools/v3/assert/opt/opt.go @@ -0,0 +1,118 @@ +/*Package opt provides common go-cmp.Options for use with assert.DeepEqual. + */ +package opt // import "gotest.tools/v3/assert/opt" + +import ( + "fmt" + "reflect" + "strings" + "time" + + gocmp "github.com/google/go-cmp/cmp" +) + +// DurationWithThreshold returns a gocmp.Comparer for comparing time.Duration. The +// Comparer returns true if the difference between the two Duration values is +// within the threshold and neither value is zero. +func DurationWithThreshold(threshold time.Duration) gocmp.Option { + return gocmp.Comparer(cmpDuration(threshold)) +} + +func cmpDuration(threshold time.Duration) func(x, y time.Duration) bool { + return func(x, y time.Duration) bool { + if x == 0 || y == 0 { + return false + } + delta := x - y + return delta <= threshold && delta >= -threshold + } +} + +// TimeWithThreshold returns a gocmp.Comparer for comparing time.Time. The +// Comparer returns true if the difference between the two Time values is +// within the threshold and neither value is zero. +func TimeWithThreshold(threshold time.Duration) gocmp.Option { + return gocmp.Comparer(cmpTime(threshold)) +} + +func cmpTime(threshold time.Duration) func(x, y time.Time) bool { + return func(x, y time.Time) bool { + if x.IsZero() || y.IsZero() { + return false + } + delta := x.Sub(y) + return delta <= threshold && delta >= -threshold + } +} + +// PathString is a gocmp.FilterPath filter that returns true when path.String() +// matches any of the specs. +// +// The path spec is a dot separated string where each segment is a field name. +// Slices, Arrays, and Maps are always matched against every element in the +// sequence. gocmp.Indirect, gocmp.Transform, and gocmp.TypeAssertion are always +// ignored. +// +// Note: this path filter is not type safe. Incorrect paths will be silently +// ignored. Consider using a type safe path filter for more complex paths. +func PathString(specs ...string) func(path gocmp.Path) bool { + return func(path gocmp.Path) bool { + for _, spec := range specs { + if path.String() == spec { + return true + } + } + return false + } +} + +// PathDebug is a gocmp.FilerPath filter that always returns false. It prints +// each path it receives. It can be used to debug path matching problems. +func PathDebug(path gocmp.Path) bool { + fmt.Printf("PATH string=%s gostring=%s\n", path, path.GoString()) + for _, step := range path { + fmt.Printf(" STEP %s\ttype=%s\t%s\n", + formatStepType(step), step.Type(), stepTypeFields(step)) + } + return false +} + +func formatStepType(step gocmp.PathStep) string { + return strings.Title(strings.TrimPrefix(reflect.TypeOf(step).String(), "*cmp.")) +} + +func stepTypeFields(step gocmp.PathStep) string { + switch typed := step.(type) { + case gocmp.StructField: + return fmt.Sprintf("name=%s", typed.Name()) + case gocmp.MapIndex: + return fmt.Sprintf("key=%s", typed.Key().Interface()) + case gocmp.Transform: + return fmt.Sprintf("name=%s", typed.Name()) + case gocmp.SliceIndex: + return fmt.Sprintf("name=%d", typed.Key()) + } + return "" +} + +// PathField is a gocmp.FilerPath filter that matches a struct field by name. +// PathField will match every instance of the field in a recursive or nested +// structure. +func PathField(structType interface{}, field string) func(gocmp.Path) bool { + typ := reflect.TypeOf(structType) + if typ.Kind() != reflect.Struct { + panic(fmt.Sprintf("type %s is not a struct", typ)) + } + if _, ok := typ.FieldByName(field); !ok { + panic(fmt.Sprintf("type %s does not have field %s", typ, field)) + } + + return func(path gocmp.Path) bool { + return path.Index(-2).Type() == typ && isStructField(path.Index(-1), field) + } +} + +func isStructField(step gocmp.PathStep, name string) bool { + field, ok := step.(gocmp.StructField) + return ok && field.Name() == name +} diff --git a/vendor/gotest.tools/v3/golden/golden.go b/vendor/gotest.tools/v3/golden/golden.go new file mode 100644 index 0000000000..750e6c5dd5 --- /dev/null +++ b/vendor/gotest.tools/v3/golden/golden.go @@ -0,0 +1,187 @@ +/*Package golden provides tools for comparing large mutli-line strings. + +Golden files are files in the ./testdata/ subdirectory of the package under test. +Golden files can be automatically updated to match new values by running +`go test pkgname -test.update-golden`. To ensure the update is correct +compare the diff of the old expected value to the new expected value. +*/ +package golden // import "gotest.tools/v3/golden" + +import ( + "bytes" + "flag" + "fmt" + "io/ioutil" + "os" + "path/filepath" + + "gotest.tools/v3/assert" + "gotest.tools/v3/assert/cmp" + "gotest.tools/v3/internal/format" +) + +var flagUpdate = flag.Bool("test.update-golden", false, "update golden file") + +type helperT interface { + Helper() +} + +// NormalizeCRLFToLF enables end-of-line normalization for actual values passed +// to Assert and String, as well as the values saved to golden files with +// -test.update-golden. +// +// Defaults to true. If you use the core.autocrlf=true git setting on windows +// you will need to set this to false. +// +// The value may be set to false by setting GOTESTTOOLS_GOLDEN_NormalizeCRLFToLF=false +// in the environment before running tests. +// +// The default value may change in a future major release. +var NormalizeCRLFToLF = os.Getenv("GOTESTTOOLS_GOLDEN_NormalizeCRLFToLF") != "false" + +// FlagUpdate returns true when the -test.update-golden flag has been set. +func FlagUpdate() bool { + return *flagUpdate +} + +// Open opens the file in ./testdata +func Open(t assert.TestingT, filename string) *os.File { + if ht, ok := t.(helperT); ok { + ht.Helper() + } + f, err := os.Open(Path(filename)) + assert.NilError(t, err) + return f +} + +// Get returns the contents of the file in ./testdata +func Get(t assert.TestingT, filename string) []byte { + if ht, ok := t.(helperT); ok { + ht.Helper() + } + expected, err := ioutil.ReadFile(Path(filename)) + assert.NilError(t, err) + return expected +} + +// Path returns the full path to a file in ./testdata +func Path(filename string) string { + if filepath.IsAbs(filename) { + return filename + } + return filepath.Join("testdata", filename) +} + +func removeCarriageReturn(in []byte) []byte { + if !NormalizeCRLFToLF { + return in + } + return bytes.Replace(in, []byte("\r\n"), []byte("\n"), -1) +} + +// Assert compares actual to the expected value in the golden file. +// +// Running `go test pkgname -test.update-golden` will write the value of actual +// to the golden file. +// +// This is equivalent to assert.Assert(t, String(actual, filename)) +func Assert(t assert.TestingT, actual string, filename string, msgAndArgs ...interface{}) { + if ht, ok := t.(helperT); ok { + ht.Helper() + } + assert.Assert(t, String(actual, filename), msgAndArgs...) +} + +// String compares actual to the contents of filename and returns success +// if the strings are equal. +// +// Running `go test pkgname -test.update-golden` will write the value of actual +// to the golden file. +// +// Any \r\n substrings in actual are converted to a single \n character +// before comparing it to the expected string. When updating the golden file the +// normalized version will be written to the file. This allows Windows to use +// the same golden files as other operating systems. +func String(actual string, filename string) cmp.Comparison { + return func() cmp.Result { + actualBytes := removeCarriageReturn([]byte(actual)) + result, expected := compare(actualBytes, filename) + if result != nil { + return result + } + diff := format.UnifiedDiff(format.DiffConfig{ + A: string(expected), + B: string(actualBytes), + From: "expected", + To: "actual", + }) + return cmp.ResultFailure("\n" + diff + failurePostamble(filename)) + } +} + +func failurePostamble(filename string) string { + return fmt.Sprintf(` + +You can run 'go test . -test.update-golden' to automatically update %s to the new expected value.' +`, Path(filename)) +} + +// AssertBytes compares actual to the expected value in the golden. +// +// Running `go test pkgname -test.update-golden` will write the value of actual +// to the golden file. +// +// This is equivalent to assert.Assert(t, Bytes(actual, filename)) +func AssertBytes( + t assert.TestingT, + actual []byte, + filename string, + msgAndArgs ...interface{}, +) { + if ht, ok := t.(helperT); ok { + ht.Helper() + } + assert.Assert(t, Bytes(actual, filename), msgAndArgs...) +} + +// Bytes compares actual to the contents of filename and returns success +// if the bytes are equal. +// +// Running `go test pkgname -test.update-golden` will write the value of actual +// to the golden file. +func Bytes(actual []byte, filename string) cmp.Comparison { + return func() cmp.Result { + result, expected := compare(actual, filename) + if result != nil { + return result + } + msg := fmt.Sprintf("%v (actual) != %v (expected)", actual, expected) + return cmp.ResultFailure(msg + failurePostamble(filename)) + } +} + +func compare(actual []byte, filename string) (cmp.Result, []byte) { + if err := update(filename, actual); err != nil { + return cmp.ResultFromError(err), nil + } + expected, err := ioutil.ReadFile(Path(filename)) + if err != nil { + return cmp.ResultFromError(err), nil + } + if bytes.Equal(expected, actual) { + return cmp.ResultSuccess, nil + } + return nil, expected +} + +func update(filename string, actual []byte) error { + if !*flagUpdate { + return nil + } + if dir := filepath.Dir(Path(filename)); dir != "." { + if err := os.MkdirAll(dir, 0755); err != nil { + return err + } + } + return ioutil.WriteFile(Path(filename), actual, 0644) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 516b3c7862..8811151615 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1099,8 +1099,10 @@ google.golang.org/protobuf/types/known/wrapperspb ## explicit; go 1.13 gotest.tools/v3/assert gotest.tools/v3/assert/cmp +gotest.tools/v3/assert/opt gotest.tools/v3/env gotest.tools/v3/fs +gotest.tools/v3/golden gotest.tools/v3/icmd gotest.tools/v3/internal/assert gotest.tools/v3/internal/cleanup