mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
f845d76d04
This should test that: (1) all the messages produced are delivered (i.e. not lost); (2) followLogs() exits. Loosely based on the test having the same name by Brian Goff, see https://gist.github.com/cpuguy83/e538793de18c762608358ee0eaddc197 -- Signed-off-by: Kir Kolyshkin <kolyshkin@gmail.com>
203 lines
4.7 KiB
Go
203 lines
4.7 KiB
Go
package loggerutils
|
|
|
|
import (
	"bufio"
	"context"
	"io"
	"io/ioutil"
	"os"
	"strings"
	"sync/atomic"
	"testing"
	"time"

	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/pkg/tailfile"
	"gotest.tools/assert"
)
|
|
|
|
func TestTailFiles(t *testing.T) {
|
|
s1 := strings.NewReader("Hello.\nMy name is Inigo Montoya.\n")
|
|
s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n")
|
|
s3 := strings.NewReader("Roads?\nWhere we're going we don't need roads.\n")
|
|
|
|
files := []SizeReaderAt{s1, s2, s3}
|
|
watcher := logger.NewLogWatcher()
|
|
createDecoder := func(r io.Reader) func() (*logger.Message, error) {
|
|
scanner := bufio.NewScanner(r)
|
|
return func() (*logger.Message, error) {
|
|
if !scanner.Scan() {
|
|
return nil, scanner.Err()
|
|
}
|
|
// some comment
|
|
return &logger.Message{Line: scanner.Bytes(), Timestamp: time.Now()}, nil
|
|
}
|
|
}
|
|
|
|
tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
|
|
return tailfile.NewTailReader(ctx, r, lines)
|
|
}
|
|
|
|
for desc, config := range map[string]logger.ReadConfig{} {
|
|
t.Run(desc, func(t *testing.T) {
|
|
started := make(chan struct{})
|
|
go func() {
|
|
close(started)
|
|
tailFiles(files, watcher, createDecoder, tailReader, config)
|
|
}()
|
|
<-started
|
|
})
|
|
}
|
|
|
|
config := logger.ReadConfig{Tail: 2}
|
|
started := make(chan struct{})
|
|
go func() {
|
|
close(started)
|
|
tailFiles(files, watcher, createDecoder, tailReader, config)
|
|
}()
|
|
<-started
|
|
|
|
select {
|
|
case <-time.After(60 * time.Second):
|
|
t.Fatal("timeout waiting for tail line")
|
|
case err := <-watcher.Err:
|
|
assert.Assert(t, err)
|
|
case msg := <-watcher.Msg:
|
|
assert.Assert(t, msg != nil)
|
|
assert.Assert(t, string(msg.Line) == "Roads?", string(msg.Line))
|
|
}
|
|
|
|
select {
|
|
case <-time.After(60 * time.Second):
|
|
t.Fatal("timeout waiting for tail line")
|
|
case err := <-watcher.Err:
|
|
assert.Assert(t, err)
|
|
case msg := <-watcher.Msg:
|
|
assert.Assert(t, msg != nil)
|
|
assert.Assert(t, string(msg.Line) == "Where we're going we don't need roads.", string(msg.Line))
|
|
}
|
|
}
|
|
|
|
func TestFollowLogsConsumerGone(t *testing.T) {
|
|
lw := logger.NewLogWatcher()
|
|
|
|
f, err := ioutil.TempFile("", t.Name())
|
|
assert.NilError(t, err)
|
|
defer func() {
|
|
f.Close()
|
|
os.Remove(f.Name())
|
|
}()
|
|
|
|
makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) {
|
|
return func() (*logger.Message, error) {
|
|
return &logger.Message{}, nil
|
|
}
|
|
}
|
|
|
|
followLogsDone := make(chan struct{})
|
|
var since, until time.Time
|
|
go func() {
|
|
followLogs(f, lw, make(chan interface{}), makeDecoder, since, until)
|
|
close(followLogsDone)
|
|
}()
|
|
|
|
select {
|
|
case <-lw.Msg:
|
|
case err := <-lw.Err:
|
|
assert.NilError(t, err)
|
|
case <-followLogsDone:
|
|
t.Fatal("follow logs finished unexpectedly")
|
|
case <-time.After(10 * time.Second):
|
|
t.Fatal("timeout waiting for log message")
|
|
}
|
|
|
|
lw.ConsumerGone()
|
|
select {
|
|
case <-followLogsDone:
|
|
case <-time.After(20 * time.Second):
|
|
t.Fatal("timeout waiting for followLogs() to finish")
|
|
}
|
|
}
|
|
|
|
func TestFollowLogsProducerGone(t *testing.T) {
|
|
lw := logger.NewLogWatcher()
|
|
|
|
f, err := ioutil.TempFile("", t.Name())
|
|
assert.NilError(t, err)
|
|
defer os.Remove(f.Name())
|
|
|
|
var sent, received, closed int
|
|
makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) {
|
|
return func() (*logger.Message, error) {
|
|
if closed == 1 {
|
|
closed++
|
|
t.Logf("logDecode() closed after sending %d messages\n", sent)
|
|
return nil, io.EOF
|
|
} else if closed > 1 {
|
|
t.Fatal("logDecode() called after closing!")
|
|
return nil, io.EOF
|
|
}
|
|
sent++
|
|
return &logger.Message{}, nil
|
|
}
|
|
}
|
|
var since, until time.Time
|
|
|
|
followLogsDone := make(chan struct{})
|
|
go func() {
|
|
followLogs(f, lw, make(chan interface{}), makeDecoder, since, until)
|
|
close(followLogsDone)
|
|
}()
|
|
|
|
// read 1 message
|
|
select {
|
|
case <-lw.Msg:
|
|
received++
|
|
case err := <-lw.Err:
|
|
assert.NilError(t, err)
|
|
case <-followLogsDone:
|
|
t.Fatal("followLogs() finished unexpectedly")
|
|
case <-time.After(10 * time.Second):
|
|
t.Fatal("timeout waiting for log message")
|
|
}
|
|
|
|
// "stop" the "container"
|
|
closed = 1
|
|
lw.ProducerGone()
|
|
|
|
// should receive all the messages sent
|
|
readDone := make(chan struct{})
|
|
go func() {
|
|
defer close(readDone)
|
|
for {
|
|
select {
|
|
case <-lw.Msg:
|
|
received++
|
|
if received == sent {
|
|
return
|
|
}
|
|
case err := <-lw.Err:
|
|
assert.NilError(t, err)
|
|
}
|
|
}
|
|
}()
|
|
select {
|
|
case <-readDone:
|
|
case <-time.After(30 * time.Second):
|
|
t.Fatalf("timeout waiting for log messages to be read (sent: %d, received: %d", sent, received)
|
|
}
|
|
|
|
t.Logf("messages sent: %d, received: %d", sent, received)
|
|
|
|
// followLogs() should be done by now
|
|
select {
|
|
case <-followLogsDone:
|
|
case <-time.After(30 * time.Second):
|
|
t.Fatal("timeout waiting for followLogs() to finish")
|
|
}
|
|
|
|
select {
|
|
case <-lw.WatchConsumerGone():
|
|
t.Fatal("consumer should not have exited")
|
|
default:
|
|
}
|
|
}
|