From c208f1c8a8d57f9a7f48f63345e77146774aa7a6 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Mon, 2 Apr 2018 13:33:36 -0700 Subject: [PATCH] Update logger adapter test to avoid race Add synchronization around adding logs to a plugin and reading those logs. Without the follow configuration, a race occurs between go routines to add the logs into the plugin and read the logs out of the plugin. This adds a function to synchronize the action to avoid the race. Removes use of file for buffering, instead buffering whole messages so log count can be checked discretely. Signed-off-by: Derek McGowan Signed-off-by: Sebastiaan van Stijn --- daemon/logger/adapter_test.go | 123 ++++++++++++++++++++++------------ 1 file changed, 79 insertions(+), 44 deletions(-) diff --git a/daemon/logger/adapter_test.go b/daemon/logger/adapter_test.go index 94d14eaef1..e9143928b4 100644 --- a/daemon/logger/adapter_test.go +++ b/daemon/logger/adapter_test.go @@ -3,8 +3,7 @@ package logger // import "github.com/docker/docker/daemon/logger" import ( "encoding/binary" "io" - "io/ioutil" - "os" + "sync" "testing" "time" @@ -17,24 +16,57 @@ import ( // mockLoggingPlugin implements the loggingPlugin interface for testing purposes // it only supports a single log stream type mockLoggingPlugin struct { - inStream io.ReadCloser - f *os.File - closed chan struct{} - t *testing.T + io.WriteCloser + inStream io.Reader + logs []*logdriver.LogEntry + c *sync.Cond + err error +} + +func newMockLoggingPlugin() *mockLoggingPlugin { + r, w := io.Pipe() + return &mockLoggingPlugin{ + WriteCloser: w, + inStream: r, + logs: []*logdriver.LogEntry{}, + c: sync.NewCond(new(sync.Mutex)), + } } func (l *mockLoggingPlugin) StartLogging(file string, info Info) error { go func() { - io.Copy(l.f, l.inStream) - close(l.closed) + dec := protoio.NewUint32DelimitedReader(l.inStream, binary.BigEndian, 1e6) + for { + var msg logdriver.LogEntry + if err := dec.ReadMsg(&msg); err != nil { + l.c.L.Lock() + if l.err == nil { + l.err = err + } + l.c.L.Unlock() + + l.c.Broadcast() + return + + } + + l.c.L.Lock() + l.logs = append(l.logs, &msg) + l.c.L.Unlock() + l.c.Broadcast() + } + }() return nil } func (l *mockLoggingPlugin) StopLogging(file string) error { - l.inStream.Close() - l.f.Close() - os.Remove(l.f.Name()) + l.c.L.Lock() + if l.err == nil { + l.err = io.EOF + } + l.c.L.Unlock() + l.c.Broadcast() return nil } @@ -44,63 +76,60 @@ func (l *mockLoggingPlugin) Capabilities() (cap Capability, err error) { func (l *mockLoggingPlugin) ReadLogs(info Info, config ReadConfig) (io.ReadCloser, error) { r, w := io.Pipe() - f, err := os.Open(l.f.Name()) - if err != nil { - return nil, err - } + go func() { - defer f.Close() - dec := protoio.NewUint32DelimitedReader(f, binary.BigEndian, 1e6) + var idx int enc := logdriver.NewLogEntryEncoder(w) + l.c.L.Lock() + defer l.c.L.Unlock() for { - select { - case <-l.closed: + if l.err != nil { w.Close() return - default: } - var msg logdriver.LogEntry - if err := dec.ReadMsg(&msg); err != nil { - if err == io.EOF { - if !config.Follow { - w.Close() - return - } - dec = protoio.NewUint32DelimitedReader(f, binary.BigEndian, 1e6) - continue + if idx >= len(l.logs) { + if !config.Follow { + w.Close() + return } - l.t.Fatal(err) + l.c.Wait() continue } - if err := enc.Encode(&msg); err != nil { + if err := enc.Encode(l.logs[idx]); err != nil { w.CloseWithError(err) return } + idx++ } }() return r, nil } -func newMockPluginAdapter(t *testing.T) Logger { - r, w := io.Pipe() - f, err := ioutil.TempFile("", "mock-plugin-adapter") - assert.Check(t, err) +func (l *mockLoggingPlugin) waitLen(i int) { + l.c.L.Lock() + defer l.c.L.Unlock() + for len(l.logs) < i { + l.c.Wait() + } +} - enc := logdriver.NewLogEntryEncoder(w) +func (l *mockLoggingPlugin) check(t *testing.T) { + if l.err != nil && l.err != io.EOF { + t.Fatal(l.err) + } +} + +func newMockPluginAdapter(plugin *mockLoggingPlugin) Logger { + enc := logdriver.NewLogEntryEncoder(plugin) a := &pluginAdapterWithRead{ &pluginAdapter{ - plugin: &mockLoggingPlugin{ - inStream: r, - f: f, - closed: make(chan struct{}), - t: t, - }, - stream: w, + plugin: plugin, + stream: plugin, enc: enc, }, } @@ -109,7 +138,8 @@ func newMockPluginAdapter(t *testing.T) Logger { } func TestAdapterReadLogs(t *testing.T) { - l := newMockPluginAdapter(t) + plugin := newMockLoggingPlugin() + l := newMockPluginAdapter(plugin) testMsg := []Message{ {Line: []byte("Are you the keymaker?"), Timestamp: time.Now()}, @@ -120,6 +150,9 @@ func TestAdapterReadLogs(t *testing.T) { assert.Check(t, l.Log(m)) } + // Wait until messages are read into plugin + plugin.waitLen(len(testMsg)) + lr, ok := l.(LogReader) assert.Check(t, ok, "Logger does not implement LogReader") @@ -172,6 +205,8 @@ func TestAdapterReadLogs(t *testing.T) { case <-time.After(10 * time.Second): t.Fatal("timeout waiting for logger to close") } + + plugin.check(t) } func testMessageEqual(t *testing.T, a, b *Message) {