From e8d9a61f4c9e1f3cfdf1c889c541c9dc72a4bb40 Mon Sep 17 00:00:00 2001 From: Jamie Hannaford Date: Fri, 28 Apr 2017 13:53:00 +0200 Subject: [PATCH] Add --until flag for docker logs; closes #32807 Signed-off-by: Jamie Hannaford --- .../router/container/container_routes.go | 1 + api/swagger.yaml | 5 + api/types/client.go | 1 + client/container_logs.go | 8 ++ client/container_logs_test.go | 20 ++- daemon/logger/adapter.go | 3 + daemon/logger/journald/read.go | 42 ++++-- daemon/logger/jsonfilelog/read.go | 22 ++- daemon/logger/logger.go | 1 + daemon/logs.go | 10 ++ docs/api/version-history.md | 3 +- integration-cli/docker_api_logs_test.go | 127 ++++++++++++++++++ 12 files changed, 223 insertions(+), 20 deletions(-) diff --git a/api/server/router/container/container_routes.go b/api/server/router/container/container_routes.go index 106a7087cd..d845fdd00f 100644 --- a/api/server/router/container/container_routes.go +++ b/api/server/router/container/container_routes.go @@ -96,6 +96,7 @@ func (s *containerRouter) getContainersLogs(ctx context.Context, w http.Response Follow: httputils.BoolValue(r, "follow"), Timestamps: httputils.BoolValue(r, "timestamps"), Since: r.Form.Get("since"), + Until: r.Form.Get("until"), Tail: r.Form.Get("tail"), ShowStdout: stdout, ShowStderr: stderr, diff --git a/api/swagger.yaml b/api/swagger.yaml index b0c0575fc0..0e10b7a42a 100644 --- a/api/swagger.yaml +++ b/api/swagger.yaml @@ -4955,6 +4955,11 @@ paths: description: "Only return logs since this time, as a UNIX timestamp" type: "integer" default: 0 + - name: "until" + in: "query" + description: "Only return logs before this time, as a UNIX timestamp" + type: "integer" + default: 0 - name: "timestamps" in: "query" description: "Add timestamps to every log line" diff --git a/api/types/client.go b/api/types/client.go index db37f1fe4e..93ca428540 100644 --- a/api/types/client.go +++ b/api/types/client.go @@ -74,6 +74,7 @@ type ContainerLogsOptions struct { ShowStdout bool ShowStderr bool Since string + Until string Timestamps bool Follow bool Tail string diff --git a/client/container_logs.go b/client/container_logs.go index 0f32e9f12b..35c297c5fb 100644 --- a/client/container_logs.go +++ b/client/container_logs.go @@ -51,6 +51,14 @@ func (cli *Client) ContainerLogs(ctx context.Context, container string, options query.Set("since", ts) } + if options.Until != "" { + ts, err := timetypes.GetTimestamp(options.Until, time.Now()) + if err != nil { + return nil, err + } + query.Set("until", ts) + } + if options.Timestamps { query.Set("timestamps", "1") } diff --git a/client/container_logs_test.go b/client/container_logs_test.go index 99e31842c9..8cb7635120 100644 --- a/client/container_logs_test.go +++ b/client/container_logs_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/docker/docker/api/types" + "github.com/docker/docker/internal/testutil" "golang.org/x/net/context" ) @@ -28,9 +29,11 @@ func TestContainerLogsError(t *testing.T) { _, err = client.ContainerLogs(context.Background(), "container_id", types.ContainerLogsOptions{ Since: "2006-01-02TZ", }) - if err == nil || !strings.Contains(err.Error(), `parsing time "2006-01-02TZ"`) { - t.Fatalf("expected a 'parsing time' error, got %v", err) - } + testutil.ErrorContains(t, err, `parsing time "2006-01-02TZ"`) + _, err = client.ContainerLogs(context.Background(), "container_id", types.ContainerLogsOptions{ + Until: "2006-01-02TZ", + }) + testutil.ErrorContains(t, err, `parsing time "2006-01-02TZ"`) } func TestContainerLogs(t *testing.T) { @@ -80,6 +83,17 @@ func TestContainerLogs(t *testing.T) { "since": "invalid but valid", }, }, + { + options: types.ContainerLogsOptions{ + // An complete invalid date, timestamp or go duration will be + // passed as is + Until: "invalid but valid", + }, + expectedQueryParams: map[string]string{ + "tail": "", + "until": "invalid but valid", + }, + }, } for _, logCase := range cases { client := &Client{ diff --git a/daemon/logger/adapter.go b/daemon/logger/adapter.go index 98852e89c1..5817913cbc 100644 --- a/daemon/logger/adapter.go +++ b/daemon/logger/adapter.go @@ -122,6 +122,9 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher { if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) { continue } + if !config.Until.IsZero() && msg.Timestamp.After(config.Until) { + return + } select { case watcher.Msg <- msg: diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go index 4d9b999a50..6aff21f441 100644 --- a/daemon/logger/journald/read.go +++ b/daemon/logger/journald/read.go @@ -171,13 +171,15 @@ func (s *journald) Close() error { return nil } -func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, oldCursor *C.char) *C.char { +func (s *journald) drainJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, oldCursor *C.char, untilUnixMicro uint64) (*C.char, bool) { var msg, data, cursor *C.char var length C.size_t var stamp C.uint64_t var priority, partial C.int + var done bool - // Walk the journal from here forward until we run out of new entries. + // Walk the journal from here forward until we run out of new entries + // or we reach the until value (if provided). drain: for { // Try not to send a given entry twice. @@ -195,6 +197,12 @@ drain: if C.sd_journal_get_realtime_usec(j, &stamp) != 0 { break } + // Break if the timestamp exceeds any provided until flag. + if untilUnixMicro != 0 && untilUnixMicro < uint64(stamp) { + done = true + break + } + // Set up the time and text of the entry. timestamp := time.Unix(int64(stamp)/1000000, (int64(stamp)%1000000)*1000) line := C.GoBytes(unsafe.Pointer(msg), C.int(length)) @@ -240,10 +248,10 @@ drain: // ensure that we won't be freeing an address that's invalid cursor = nil } - return cursor + return cursor, done } -func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, pfd [2]C.int, cursor *C.char) *C.char { +func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, pfd [2]C.int, cursor *C.char, untilUnixMicro uint64) *C.char { s.mu.Lock() s.readers.readers[logWatcher] = logWatcher if s.closed { @@ -270,9 +278,10 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.Re break } - cursor = s.drainJournal(logWatcher, config, j, cursor) + var done bool + cursor, done = s.drainJournal(logWatcher, j, cursor, untilUnixMicro) - if status != 1 { + if status != 1 || done { // We were notified to stop break } @@ -304,6 +313,7 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon var cmatch, cursor *C.char var stamp C.uint64_t var sinceUnixMicro uint64 + var untilUnixMicro uint64 var pipes [2]C.int // Get a handle to the journal. @@ -343,10 +353,19 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon nano := config.Since.UnixNano() sinceUnixMicro = uint64(nano / 1000) } + // If we have an until value, convert it too + if !config.Until.IsZero() { + nano := config.Until.UnixNano() + untilUnixMicro = uint64(nano / 1000) + } if config.Tail > 0 { lines := config.Tail - // Start at the end of the journal. - if C.sd_journal_seek_tail(j) < 0 { + // If until time provided, start from there. + // Otherwise start at the end of the journal. + if untilUnixMicro != 0 && C.sd_journal_seek_realtime_usec(j, C.uint64_t(untilUnixMicro)) < 0 { + logWatcher.Err <- fmt.Errorf("error seeking provided until value") + return + } else if C.sd_journal_seek_tail(j) < 0 { logWatcher.Err <- fmt.Errorf("error seeking to end of journal") return } @@ -362,8 +381,7 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon if C.sd_journal_get_realtime_usec(j, &stamp) != 0 { break } else { - // Compare the timestamp on the entry - // to our threshold value. + // Compare the timestamp on the entry to our threshold value. if sinceUnixMicro != 0 && sinceUnixMicro > uint64(stamp) { break } @@ -392,7 +410,7 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon return } } - cursor = s.drainJournal(logWatcher, config, j, nil) + cursor, _ = s.drainJournal(logWatcher, j, nil, untilUnixMicro) if config.Follow { // Allocate a descriptor for following the journal, if we'll // need one. Do it here so that we can report if it fails. @@ -404,7 +422,7 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon if C.pipe(&pipes[0]) == C.int(-1) { logWatcher.Err <- fmt.Errorf("error opening journald close notification pipe") } else { - cursor = s.followJournal(logWatcher, config, j, pipes, cursor) + cursor = s.followJournal(logWatcher, j, pipes, cursor, untilUnixMicro) // Let followJournal handle freeing the journal context // object and closing the channel. following = true diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go index 2586c7d7f7..09eaaf00de 100644 --- a/daemon/logger/jsonfilelog/read.go +++ b/daemon/logger/jsonfilelog/read.go @@ -98,7 +98,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R if config.Tail != 0 { tailer := multireader.MultiReadSeeker(append(files, latestChunk)...) - tailFile(tailer, logWatcher, config.Tail, config.Since) + tailFile(tailer, logWatcher, config.Tail, config.Since, config.Until) } // close all the rotated files @@ -119,7 +119,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R l.readers[logWatcher] = struct{}{} l.mu.Unlock() - followLogs(latestFile, logWatcher, notifyRotate, config.Since) + followLogs(latestFile, logWatcher, notifyRotate, config) l.mu.Lock() delete(l.readers, logWatcher) @@ -136,7 +136,7 @@ func newSectionReader(f *os.File) (*io.SectionReader, error) { return io.NewSectionReader(f, 0, size), nil } -func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) { +func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since, until time.Time) { rdr := io.Reader(f) if tail > 0 { ls, err := tailfile.TailFile(f, tail) @@ -158,6 +158,9 @@ func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since ti if !since.IsZero() && msg.Timestamp.Before(since) { continue } + if !until.IsZero() && msg.Timestamp.After(until) { + return + } select { case <-logWatcher.WatchClose(): return @@ -186,7 +189,7 @@ func watchFile(name string) (filenotify.FileWatcher, error) { return fileWatcher, nil } -func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) { +func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, config logger.ReadConfig) { dec := json.NewDecoder(f) l := &jsonlog.JSONLog{} @@ -324,14 +327,22 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int continue } + since := config.Since + until := config.Until + retries = 0 // reset retries since we've succeeded if !since.IsZero() && msg.Timestamp.Before(since) { continue } + if !until.IsZero() && msg.Timestamp.After(until) { + return + } select { case logWatcher.Msg <- msg: case <-ctx.Done(): logWatcher.Msg <- msg + // This for loop is used when the logger is closed (ie, container + // stopped) but the consumer is still waiting for logs. for { msg, err := decodeLogLine(dec, l) if err != nil { @@ -340,6 +351,9 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int if !since.IsZero() && msg.Timestamp.Before(since) { continue } + if !until.IsZero() && msg.Timestamp.After(until) { + return + } logWatcher.Msg <- msg } } diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index ee91b79c98..3ea7794221 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -81,6 +81,7 @@ type Logger interface { // ReadConfig is the configuration passed into ReadLogs. type ReadConfig struct { Since time.Time + Until time.Time Tail int Follow bool } diff --git a/daemon/logs.go b/daemon/logs.go index 68c5e5aa47..babf07e36d 100644 --- a/daemon/logs.go +++ b/daemon/logs.go @@ -77,8 +77,18 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c since = time.Unix(s, n) } + var until time.Time + if config.Until != "" && config.Until != "0" { + s, n, err := timetypes.ParseTimestamps(config.Until, 0) + if err != nil { + return nil, false, err + } + until = time.Unix(s, n) + } + readConfig := logger.ReadConfig{ Since: since, + Until: until, Tail: tailLines, Follow: follow, } diff --git a/docs/api/version-history.md b/docs/api/version-history.md index 77b8545bcc..a306e71609 100644 --- a/docs/api/version-history.md +++ b/docs/api/version-history.md @@ -23,6 +23,7 @@ keywords: "API, Docker, rcli, REST, documentation" If `Error` is `null`, container removal has succeeded, otherwise the test of an error message indicating why container removal has failed is available from `Error.Message` field. +* `GET /containers/(name)/logs` now supports an additional query parameter: `until`, which returns log lines that occurred before the specified timestamp. ## v1.33 API changes @@ -93,7 +94,7 @@ keywords: "API, Docker, rcli, REST, documentation" * `POST /containers/(name)/wait` now accepts a `condition` query parameter to indicate which state change condition to wait for. Also, response headers are now returned immediately to acknowledge that the server has registered a wait callback for the client. * `POST /swarm/init` now accepts a `DataPathAddr` property to set the IP-address or network interface to use for data traffic * `POST /swarm/join` now accepts a `DataPathAddr` property to set the IP-address or network interface to use for data traffic -* `GET /events` now supports service, node and secret events which are emitted when users create, update and remove service, node and secret +* `GET /events` now supports service, node and secret events which are emitted when users create, update and remove service, node and secret * `GET /events` now supports network remove event which is emitted when users remove a swarm scoped network * `GET /events` now supports a filter type `scope` in which supported value could be swarm and local diff --git a/integration-cli/docker_api_logs_test.go b/integration-cli/docker_api_logs_test.go index 1f2a30a929..0672e328db 100644 --- a/integration-cli/docker_api_logs_test.go +++ b/integration-cli/docker_api_logs_test.go @@ -2,8 +2,12 @@ package main import ( "bufio" + "bytes" "fmt" + "io" + "io/ioutil" "net/http" + "strconv" "strings" "time" @@ -11,6 +15,7 @@ import ( "github.com/docker/docker/client" "github.com/docker/docker/integration-cli/checker" "github.com/docker/docker/integration-cli/request" + "github.com/docker/docker/pkg/stdcopy" "github.com/go-check/check" "golang.org/x/net/context" ) @@ -85,3 +90,125 @@ func (s *DockerSuite) TestLogsAPIContainerNotFound(c *check.C) { c.Assert(err, checker.IsNil) c.Assert(resp.StatusCode, checker.Equals, http.StatusNotFound) } + +func (s *DockerSuite) TestLogsAPIUntilFutureFollow(c *check.C) { + testRequires(c, DaemonIsLinux) + + name := "logsuntilfuturefollow" + dockerCmd(c, "run", "-d", "--name", name, "busybox", "/bin/sh", "-c", "while true; do date +%s; sleep 1; done") + c.Assert(waitRun(name), checker.IsNil) + + untilSecs := 5 + untilDur, err := time.ParseDuration(fmt.Sprintf("%ds", untilSecs)) + c.Assert(err, checker.IsNil) + until := daemonTime(c).Add(untilDur) + + client, err := request.NewClient() + if err != nil { + c.Fatal(err) + } + + cfg := types.ContainerLogsOptions{Until: until.Format(time.RFC3339Nano), Follow: true, ShowStdout: true, Timestamps: true} + reader, err := client.ContainerLogs(context.Background(), name, cfg) + c.Assert(err, checker.IsNil) + + type logOut struct { + out string + err error + } + + chLog := make(chan logOut) + + go func() { + bufReader := bufio.NewReader(reader) + defer reader.Close() + for i := 0; i < untilSecs; i++ { + out, _, err := bufReader.ReadLine() + if err != nil { + if err == io.EOF { + return + } + chLog <- logOut{"", err} + return + } + + chLog <- logOut{strings.TrimSpace(string(out)), err} + } + }() + + for i := 0; i < untilSecs; i++ { + select { + case l := <-chLog: + c.Assert(l.err, checker.IsNil) + i, err := strconv.ParseInt(strings.Split(l.out, " ")[1], 10, 64) + c.Assert(err, checker.IsNil) + c.Assert(time.Unix(i, 0).UnixNano(), checker.LessOrEqualThan, until.UnixNano()) + case <-time.After(20 * time.Second): + c.Fatal("timeout waiting for logs to exit") + } + } +} + +func (s *DockerSuite) TestLogsAPIUntil(c *check.C) { + name := "logsuntil" + dockerCmd(c, "run", "--name", name, "busybox", "/bin/sh", "-c", "for i in $(seq 1 3); do echo log$i; sleep 0.5; done") + + client, err := request.NewClient() + if err != nil { + c.Fatal(err) + } + + extractBody := func(c *check.C, cfg types.ContainerLogsOptions) []string { + reader, err := client.ContainerLogs(context.Background(), name, cfg) + c.Assert(err, checker.IsNil) + + actualStdout := new(bytes.Buffer) + actualStderr := ioutil.Discard + _, err = stdcopy.StdCopy(actualStdout, actualStderr, reader) + c.Assert(err, checker.IsNil) + + return strings.Split(actualStdout.String(), "\n") + } + + // Get timestamp of second log line + allLogs := extractBody(c, types.ContainerLogsOptions{Timestamps: true, ShowStdout: true}) + t, err := time.Parse(time.RFC3339Nano, strings.Split(allLogs[1], " ")[0]) + c.Assert(err, checker.IsNil) + until := t.Format(time.RFC3339Nano) + + // Get logs until the timestamp of second line, i.e. first two lines + logs := extractBody(c, types.ContainerLogsOptions{Timestamps: true, ShowStdout: true, Until: until}) + + // Ensure log lines after cut-off are excluded + logsString := strings.Join(logs, "\n") + c.Assert(logsString, checker.Not(checker.Contains), "log3", check.Commentf("unexpected log message returned, until=%v", until)) +} + +func (s *DockerSuite) TestLogsAPIUntilDefaultValue(c *check.C) { + name := "logsuntildefaultval" + dockerCmd(c, "run", "--name", name, "busybox", "/bin/sh", "-c", "for i in $(seq 1 3); do echo log$i; done") + + client, err := request.NewClient() + if err != nil { + c.Fatal(err) + } + + extractBody := func(c *check.C, cfg types.ContainerLogsOptions) []string { + reader, err := client.ContainerLogs(context.Background(), name, cfg) + c.Assert(err, checker.IsNil) + + actualStdout := new(bytes.Buffer) + actualStderr := ioutil.Discard + _, err = stdcopy.StdCopy(actualStdout, actualStderr, reader) + c.Assert(err, checker.IsNil) + + return strings.Split(actualStdout.String(), "\n") + } + + // Get timestamp of second log line + allLogs := extractBody(c, types.ContainerLogsOptions{Timestamps: true, ShowStdout: true}) + + // Test with default value specified and parameter omitted + defaultLogs := extractBody(c, types.ContainerLogsOptions{Timestamps: true, ShowStdout: true, Until: "0"}) + c.Assert(defaultLogs, checker.DeepEquals, allLogs) +}