diff --git a/pkg/tailfile/tailfile.go b/pkg/tailfile/tailfile.go index e835893746..c82fe603f6 100644 --- a/pkg/tailfile/tailfile.go +++ b/pkg/tailfile/tailfile.go @@ -3,7 +3,9 @@ package tailfile // import "github.com/docker/docker/pkg/tailfile" import ( + "bufio" "bytes" + "context" "errors" "io" "os" @@ -16,51 +18,205 @@ var eol = []byte("\n") // ErrNonPositiveLinesNumber is an error returned if the lines number was negative. var ErrNonPositiveLinesNumber = errors.New("The number of lines to extract from the file must be positive") -//TailFile returns last n lines of reader f (could be a nil). -func TailFile(f io.ReadSeeker, n int) ([][]byte, error) { - if n <= 0 { - return nil, ErrNonPositiveLinesNumber - } - size, err := f.Seek(0, os.SEEK_END) +//TailFile returns last n lines of the passed in file. +func TailFile(f *os.File, n int) ([][]byte, error) { + size, err := f.Seek(0, io.SeekEnd) if err != nil { return nil, err } - block := -1 - var data []byte - var cnt int - for { - var b []byte - step := int64(block * blockSize) - left := size + step // how many bytes to beginning - if left < 0 { - if _, err := f.Seek(0, os.SEEK_SET); err != nil { - return nil, err - } - b = make([]byte, blockSize+left) - if _, err := f.Read(b); err != nil { - return nil, err - } - data = append(b, data...) - break - } else { - b = make([]byte, blockSize) - if _, err := f.Seek(left, os.SEEK_SET); err != nil { - return nil, err - } - if _, err := f.Read(b); err != nil { - return nil, err - } - data = append(b, data...) - } - cnt += bytes.Count(b, eol) - if cnt > n { - break - } - block-- + + rAt := io.NewSectionReader(f, 0, size) + r, nLines, err := NewTailReader(context.Background(), rAt, n) + if err != nil { + return nil, err } - lines := bytes.Split(data, eol) - if n < len(lines) { - return lines[len(lines)-n-1 : len(lines)-1], nil + + buf := make([][]byte, 0, nLines) + scanner := bufio.NewScanner(r) + + for scanner.Scan() { + buf = append(buf, scanner.Bytes()) + } + return buf, nil +} + +// SizeReaderAt is an interface used to get a ReaderAt as well as the size of the underlying reader. +// Note that the size of the underlying reader should not change when using this interface. +type SizeReaderAt interface { + io.ReaderAt + Size() int64 +} + +// NewTailReader scopes the passed in reader to just the last N lines passed in +func NewTailReader(ctx context.Context, r SizeReaderAt, reqLines int) (io.Reader, int, error) { + return NewTailReaderWithDelimiter(ctx, r, reqLines, eol) +} + +// NewTailReaderWithDelimiter scopes the passed in reader to just the last N lines passed in +// In this case a "line" is defined by the passed in delimiter. +// +// Delimiter lengths should be generally small, no more than 12 bytes +func NewTailReaderWithDelimiter(ctx context.Context, r SizeReaderAt, reqLines int, delimiter []byte) (io.Reader, int, error) { + if reqLines < 1 { + return nil, 0, ErrNonPositiveLinesNumber + } + if len(delimiter) == 0 { + return nil, 0, errors.New("must provide a delimiter") + } + var ( + size = r.Size() + tailStart int64 + tailEnd = size + found int + ) + + if int64(len(delimiter)) >= size { + return bytes.NewReader(nil), 0, nil + } + + scanner := newScanner(r, delimiter) + for scanner.Scan(ctx) { + if err := scanner.Err(); err != nil { + return nil, 0, scanner.Err() + } + + found++ + if found == 1 { + tailEnd = scanner.End() + } + if found == reqLines { + break + } + } + + tailStart = scanner.Start(ctx) + + if found == 0 { + return bytes.NewReader(nil), 0, nil + } + + if found < reqLines && tailStart != 0 { + tailStart = 0 + } + return io.NewSectionReader(r, tailStart, tailEnd-tailStart), found, nil +} + +func newScanner(r SizeReaderAt, delim []byte) *scanner { + size := r.Size() + readSize := blockSize + if readSize > int(size) { + readSize = int(size) + } + // silly case... + if len(delim) >= readSize/2 { + readSize = len(delim)*2 + 2 + } + + return &scanner{ + r: r, + pos: size, + buf: make([]byte, readSize), + delim: delim, + } +} + +type scanner struct { + r SizeReaderAt + pos int64 + buf []byte + delim []byte + err error + idx int + done bool +} + +func (s *scanner) Start(ctx context.Context) int64 { + if s.idx > 0 { + idx := bytes.LastIndex(s.buf[:s.idx], s.delim) + if idx >= 0 { + return s.pos + int64(idx) + int64(len(s.delim)) + } + } + + // slow path + buf := make([]byte, len(s.buf)) + copy(buf, s.buf) + + readAhead := &scanner{ + r: s.r, + pos: s.pos, + delim: s.delim, + idx: s.idx, + buf: buf, + } + + if !readAhead.Scan(ctx) { + return 0 + } + return readAhead.End() +} + +func (s *scanner) End() int64 { + return s.pos + int64(s.idx) + int64(len(s.delim)) +} + +func (s *scanner) Err() error { + return s.err +} + +func (s *scanner) Scan(ctx context.Context) bool { + if s.err != nil { + return false + } + + for { + select { + case <-ctx.Done(): + s.err = ctx.Err() + return false + default: + } + + idx := s.idx - len(s.delim) + if idx < 0 { + readSize := int(s.pos) + if readSize > len(s.buf) { + readSize = len(s.buf) + } + + if readSize < len(s.delim) { + return false + } + + offset := s.pos - int64(readSize) + n, err := s.r.ReadAt(s.buf[:readSize], offset) + if err != nil && err != io.EOF { + s.err = err + return false + } + + s.pos -= int64(n) + idx = n + } + + s.idx = bytes.LastIndex(s.buf[:idx], s.delim) + if s.idx >= 0 { + return true + } + + if len(s.delim) > 1 && s.pos > 0 { + // in this case, there may be a partial delimiter at the front of the buffer, so set the position forward + // up to the maximum size partial that could be there so it can be read again in the next iteration with any + // potential remainder. + // An example where delimiter is `####`: + // [##asdfqwerty] + // ^ + // This resets the position to where the arrow is pointing. + // It could actually check if a partial exists and at the front, but that is pretty similar to the indexing + // code above though a bit more complex since each byte has to be checked (`len(delimiter)-1`) factorial). + // It's much simpler and cleaner to just re-read `len(delimiter)-1` bytes again. + s.pos += int64(len(s.delim)) - 1 + } + } - return lines[:len(lines)-1], nil } diff --git a/pkg/tailfile/tailfile_test.go b/pkg/tailfile/tailfile_test.go index c74bb02e16..4dde90f9e3 100644 --- a/pkg/tailfile/tailfile_test.go +++ b/pkg/tailfile/tailfile_test.go @@ -1,9 +1,17 @@ package tailfile // import "github.com/docker/docker/pkg/tailfile" import ( + "bufio" + "bytes" + "context" + "fmt" + "io" "io/ioutil" "os" + "strings" "testing" + + "gotest.tools/assert" ) func TestTailFile(t *testing.T) { @@ -42,7 +50,7 @@ truncated line`) if _, err := f.Write(testFile); err != nil { t.Fatal(err) } - if _, err := f.Seek(0, os.SEEK_SET); err != nil { + if _, err := f.Seek(0, io.SeekStart); err != nil { t.Fatal(err) } expected := []string{"last fourth line", "last fifth line"} @@ -50,10 +58,12 @@ truncated line`) if err != nil { t.Fatal(err) } + if len(res) != len(expected) { + t.Fatalf("\nexpected:\n%s\n\nactual:\n%s", expected, res) + } for i, l := range res { - t.Logf("%s", l) if expected[i] != string(l) { - t.Fatalf("Expected line %s, got %s", expected[i], l) + t.Fatalf("Expected line %q, got %q", expected[i], l) } } } @@ -71,7 +81,7 @@ truncated line`) if _, err := f.Write(testFile); err != nil { t.Fatal(err) } - if _, err := f.Seek(0, os.SEEK_SET); err != nil { + if _, err := f.Seek(0, io.SeekStart); err != nil { t.Fatal(err) } expected := []string{"first line", "second line"} @@ -79,8 +89,10 @@ truncated line`) if err != nil { t.Fatal(err) } + if len(expected) != len(res) { + t.Fatalf("\nexpected:\n%s\n\nactual:\n%s", expected, res) + } for i, l := range res { - t.Logf("%s", l) if expected[i] != string(l) { t.Fatalf("Expected line %s, got %s", expected[i], l) } @@ -116,11 +128,11 @@ truncated line`) if _, err := f.Write(testFile); err != nil { t.Fatal(err) } - if _, err := f.Seek(0, os.SEEK_SET); err != nil { + if _, err := f.Seek(0, io.SeekStart); err != nil { t.Fatal(err) } if _, err := TailFile(f, -1); err != ErrNonPositiveLinesNumber { - t.Fatalf("Expected ErrNonPositiveLinesNumber, got %s", err) + t.Fatalf("Expected ErrNonPositiveLinesNumber, got %v", err) } if _, err := TailFile(f, 0); err != ErrNonPositiveLinesNumber { t.Fatalf("Expected ErrNonPositiveLinesNumber, got %s", err) @@ -146,3 +158,170 @@ func BenchmarkTail(b *testing.B) { } } } + +func TestNewTailReader(t *testing.T) { + t.Parallel() + ctx := context.Background() + + for dName, delim := range map[string][]byte{ + "no delimiter": {}, + "single byte delimiter": {'\n'}, + "2 byte delimiter": []byte(";\n"), + "4 byte delimiter": []byte("####"), + "8 byte delimiter": []byte("########"), + "12 byte delimiter": []byte("############"), + } { + t.Run(dName, func(t *testing.T) { + delim := delim + t.Parallel() + + s1 := "Hello world." + s2 := "Today is a fine day." + s3 := "So long, and thanks for all the fish!" + s4 := strings.Repeat("a", blockSize/2) // same as block size + s5 := strings.Repeat("a", blockSize) // just to make sure + s6 := strings.Repeat("a", blockSize*2) // bigger than block size + s7 := strings.Repeat("a", blockSize-1) // single line same as block + + s8 := `{"log":"Don't panic!\n","stream":"stdout","time":"2018-04-04T20:28:44.7207062Z"}` + jsonTest := make([]string, 0, 20) + for i := 0; i < 20; i++ { + jsonTest = append(jsonTest, s8) + } + + for _, test := range []struct { + desc string + data []string + }{ + {desc: "one small entry", data: []string{s1}}, + {desc: "several small entries", data: []string{s1, s2, s3}}, + {desc: "various sizes", data: []string{s1, s2, s3, s4, s5, s1, s2, s3, s7, s6}}, + {desc: "multiple lines with one more than block", data: []string{s5, s5, s5, s5, s5}}, + {desc: "multiple lines much bigger than block", data: []string{s6, s6, s6, s6, s6}}, + {desc: "multiple lines same as block", data: []string{s4, s4, s4, s4, s4}}, + {desc: "single line same as block", data: []string{s7}}, + {desc: "single line half block", data: []string{s4}}, + {desc: "single line twice block", data: []string{s6}}, + {desc: "json encoded values", data: jsonTest}, + {desc: "no lines", data: []string{}}, + {desc: "same length as delimiter", data: []string{strings.Repeat("a", len(delim))}}, + } { + t.Run(test.desc, func(t *testing.T) { + test := test + t.Parallel() + + max := len(test.data) + if max > 10 { + max = 10 + } + + s := strings.Join(test.data, string(delim)) + if len(test.data) > 0 { + s += string(delim) + } + + for i := 1; i <= max; i++ { + t.Run(fmt.Sprintf("%d lines", i), func(t *testing.T) { + i := i + t.Parallel() + + r := strings.NewReader(s) + tr, lines, err := NewTailReaderWithDelimiter(ctx, r, i, delim) + if len(delim) == 0 { + assert.Assert(t, err != nil) + assert.Assert(t, lines == 0) + return + } + assert.Assert(t, err) + assert.Check(t, lines == i, "%d -- %d", lines, i) + + b, err := ioutil.ReadAll(tr) + assert.Assert(t, err) + + expectLines := test.data[len(test.data)-i:] + assert.Check(t, len(expectLines) == i) + expect := strings.Join(expectLines, string(delim)) + string(delim) + assert.Check(t, string(b) == expect, "\n%v\n%v", b, []byte(expect)) + }) + } + + t.Run("request more lines than available", func(t *testing.T) { + t.Parallel() + + r := strings.NewReader(s) + tr, lines, err := NewTailReaderWithDelimiter(ctx, r, len(test.data)*2, delim) + if len(delim) == 0 { + assert.Assert(t, err != nil) + assert.Assert(t, lines == 0) + return + } + if len(test.data) == 0 { + assert.Assert(t, err == ErrNonPositiveLinesNumber, err) + return + } + + assert.Assert(t, err) + assert.Check(t, lines == len(test.data), "%d -- %d", lines, len(test.data)) + b, err := ioutil.ReadAll(tr) + assert.Assert(t, err) + assert.Check(t, bytes.Equal(b, []byte(s)), "\n%v\n%v", b, []byte(s)) + }) + }) + } + }) + } + t.Run("truncated last line", func(t *testing.T) { + t.Run("more than available", func(t *testing.T) { + tail, nLines, err := NewTailReader(ctx, strings.NewReader("a\nb\nextra"), 3) + assert.Assert(t, err) + assert.Check(t, nLines == 2, nLines) + + rdr := bufio.NewReader(tail) + data, _, err := rdr.ReadLine() + assert.Assert(t, err) + assert.Check(t, string(data) == "a", string(data)) + + data, _, err = rdr.ReadLine() + assert.Assert(t, err) + assert.Check(t, string(data) == "b", string(data)) + + _, _, err = rdr.ReadLine() + assert.Assert(t, err == io.EOF, err) + }) + }) + t.Run("truncated last line", func(t *testing.T) { + t.Run("exact", func(t *testing.T) { + tail, nLines, err := NewTailReader(ctx, strings.NewReader("a\nb\nextra"), 2) + assert.Assert(t, err) + assert.Check(t, nLines == 2, nLines) + + rdr := bufio.NewReader(tail) + data, _, err := rdr.ReadLine() + assert.Assert(t, err) + assert.Check(t, string(data) == "a", string(data)) + + data, _, err = rdr.ReadLine() + assert.Assert(t, err) + assert.Check(t, string(data) == "b", string(data)) + + _, _, err = rdr.ReadLine() + assert.Assert(t, err == io.EOF, err) + }) + }) + + t.Run("truncated last line", func(t *testing.T) { + t.Run("one line", func(t *testing.T) { + tail, nLines, err := NewTailReader(ctx, strings.NewReader("a\nb\nextra"), 1) + assert.Assert(t, err) + assert.Check(t, nLines == 1, nLines) + + rdr := bufio.NewReader(tail) + data, _, err := rdr.ReadLine() + assert.Assert(t, err) + assert.Check(t, string(data) == "b", string(data)) + + _, _, err = rdr.ReadLine() + assert.Assert(t, err == io.EOF, err) + }) + }) +}