diff --git a/daemon/attach.go b/daemon/attach.go index fb14691d24..0e12441ad1 100644 --- a/daemon/attach.go +++ b/daemon/attach.go @@ -123,7 +123,7 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *stream.Attach return logger.ErrReadLogsNotSupported{} } logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1}) - defer logs.Close() + defer logs.ConsumerGone() LogLoop: for { diff --git a/daemon/logger/adapter.go b/daemon/logger/adapter.go index 95aff9bf3b..d9370352c5 100644 --- a/daemon/logger/adapter.go +++ b/daemon/logger/adapter.go @@ -93,21 +93,12 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher { dec := logdriver.NewLogEntryDecoder(stream) for { - select { - case <-watcher.WatchClose(): - return - default: - } - var buf logdriver.LogEntry if err := dec.Decode(&buf); err != nil { if err == io.EOF { return } - select { - case watcher.Err <- errors.Wrap(err, "error decoding log message"): - case <-watcher.WatchClose(): - } + watcher.Err <- errors.Wrap(err, "error decoding log message") return } @@ -125,11 +116,10 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher { return } + // send the message unless the consumer is gone select { case watcher.Msg <- msg: - case <-watcher.WatchClose(): - // make sure the message we consumed is sent - watcher.Msg <- msg + case <-watcher.WatchConsumerGone(): return } } diff --git a/daemon/logger/adapter_test.go b/daemon/logger/adapter_test.go index f47e711c89..d14a48e477 100644 --- a/daemon/logger/adapter_test.go +++ b/daemon/logger/adapter_test.go @@ -174,7 +174,7 @@ func TestAdapterReadLogs(t *testing.T) { t.Fatal("timeout waiting for message channel to close") } - lw.Close() + lw.ProducerGone() lw = lr.ReadLogs(ReadConfig{Follow: true}) for _, x := range testMsg { diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go index d4bcc62d9a..cadb97f4ca 100644 --- a/daemon/logger/journald/read.go +++ b/daemon/logger/journald/read.go @@ -165,7 +165,7 @@ func (s *journald) Close() error { s.mu.Lock() s.closed = true for reader := range s.readers.readers { - reader.Close() + reader.ProducerGone() } s.mu.Unlock() return nil @@ -299,7 +299,7 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, // Wait until we're told to stop. select { case cursor = <-newCursor: - case <-logWatcher.WatchClose(): + case <-logWatcher.WatchConsumerGone(): // Notify the other goroutine that its work is done. C.close(pfd[1]) cursor = <-newCursor diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go index 05243000d4..bbb8eeb7ec 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/daemon/logger/jsonfilelog/jsonfilelog.go @@ -166,13 +166,14 @@ func ValidateLogOpt(cfg map[string]string) error { return nil } -// Close closes underlying file and signals all readers to stop. +// Close closes underlying file and signals all the readers +// that the logs producer is gone. func (l *JSONFileLogger) Close() error { l.mu.Lock() l.closed = true err := l.writer.Close() for r := range l.readers { - r.Close() + r.ProducerGone() delete(l.readers, r) } l.mu.Unlock() diff --git a/daemon/logger/jsonfilelog/read_test.go b/daemon/logger/jsonfilelog/read_test.go index 6ce4936e0e..cfa8694b19 100644 --- a/daemon/logger/jsonfilelog/read_test.go +++ b/daemon/logger/jsonfilelog/read_test.go @@ -50,11 +50,10 @@ func BenchmarkJSONFileLoggerReadLogs(b *testing.B) { }() lw := jsonlogger.(*JSONFileLogger).ReadLogs(logger.ReadConfig{Follow: true}) - watchClose := lw.WatchClose() for { select { case <-lw.Msg: - case <-watchClose: + case <-lw.WatchProducerGone(): return case err := <-chError: if err != nil { diff --git a/daemon/logger/local/local.go b/daemon/logger/local/local.go index 86c55784d4..ba4aa096f7 100644 --- a/daemon/logger/local/local.go +++ b/daemon/logger/local/local.go @@ -166,7 +166,7 @@ func (d *driver) Close() error { d.closed = true err := d.logfile.Close() for r := range d.readers { - r.Close() + r.ProducerGone() delete(d.readers, r) } d.mu.Unlock() diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index 912e855c7f..12e8d0054e 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -104,33 +104,50 @@ type LogWatcher struct { // For sending log messages to a reader. Msg chan *Message // For sending error messages that occur while while reading logs. - Err chan error - closeOnce sync.Once - closeNotifier chan struct{} + Err chan error + producerOnce sync.Once + producerGone chan struct{} + consumerOnce sync.Once + consumerGone chan struct{} } // NewLogWatcher returns a new LogWatcher. func NewLogWatcher() *LogWatcher { return &LogWatcher{ - Msg: make(chan *Message, logWatcherBufferSize), - Err: make(chan error, 1), - closeNotifier: make(chan struct{}), + Msg: make(chan *Message, logWatcherBufferSize), + Err: make(chan error, 1), + producerGone: make(chan struct{}), + consumerGone: make(chan struct{}), } } -// Close notifies the underlying log reader to stop. -func (w *LogWatcher) Close() { +// ProducerGone notifies the underlying log reader that +// the logs producer (a container) is gone. +func (w *LogWatcher) ProducerGone() { // only close if not already closed - w.closeOnce.Do(func() { - close(w.closeNotifier) + w.producerOnce.Do(func() { + close(w.producerGone) }) } -// WatchClose returns a channel receiver that receives notification -// when the watcher has been closed. This should only be called from -// one goroutine. -func (w *LogWatcher) WatchClose() <-chan struct{} { - return w.closeNotifier +// WatchProducerGone returns a channel receiver that receives notification +// once the logs producer (a container) is gone. +func (w *LogWatcher) WatchProducerGone() <-chan struct{} { + return w.producerGone +} + +// ConsumerGone notifies that the logs consumer is gone. +func (w *LogWatcher) ConsumerGone() { + // only close if not already closed + w.consumerOnce.Do(func() { + close(w.consumerGone) + }) +} + +// WatchConsumerGone returns a channel receiver that receives notification +// when the log watcher consumer is gone. +func (w *LogWatcher) WatchConsumerGone() <-chan struct{} { + return w.consumerGone } // Capability defines the list of capabilities that a driver can implement diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go index a53190376e..fa45154ec9 100644 --- a/daemon/logger/loggerutils/logfile.go +++ b/daemon/logger/loggerutils/logfile.go @@ -488,7 +488,7 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, createDecoder m go func() { select { case <-ctx.Done(): - case <-watcher.WatchClose(): + case <-watcher.WatchConsumerGone(): cancel() } }() @@ -546,22 +546,9 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int } defer func() { f.Close() - fileWatcher.Remove(name) fileWatcher.Close() }() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go func() { - select { - case <-logWatcher.WatchClose(): - fileWatcher.Remove(name) - cancel() - case <-ctx.Done(): - return - } - }() - var retries int handleRotate := func() error { f.Close() @@ -596,7 +583,9 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int case fsnotify.Rename, fsnotify.Remove: select { case <-notifyRotate: - case <-ctx.Done(): + case <-logWatcher.WatchProducerGone(): + return errDone + case <-logWatcher.WatchConsumerGone(): return errDone } if err := handleRotate(); err != nil { @@ -618,7 +607,9 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int return errRetry } return err - case <-ctx.Done(): + case <-logWatcher.WatchProducerGone(): + return errDone + case <-logWatcher.WatchConsumerGone(): return errDone } } @@ -664,23 +655,11 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int if !until.IsZero() && msg.Timestamp.After(until) { return } + // send the message, unless the consumer is gone select { case logWatcher.Msg <- msg: - case <-ctx.Done(): - logWatcher.Msg <- msg - for { - msg, err := decodeLogLine() - if err != nil { - return - } - if !since.IsZero() && msg.Timestamp.Before(since) { - continue - } - if !until.IsZero() && msg.Timestamp.After(until) { - return - } - logWatcher.Msg <- msg - } + case <-logWatcher.WatchConsumerGone(): + return } } } diff --git a/daemon/logger/loggerutils/logfile_test.go b/daemon/logger/loggerutils/logfile_test.go index 0e359db3f8..e3e63210fc 100644 --- a/daemon/logger/loggerutils/logfile_test.go +++ b/daemon/logger/loggerutils/logfile_test.go @@ -4,6 +4,8 @@ import ( "bufio" "context" "io" + "io/ioutil" + "os" "strings" "testing" "time" @@ -74,3 +76,128 @@ func TestTailFiles(t *testing.T) { assert.Assert(t, string(msg.Line) == "Where we're going we don't need roads.", string(msg.Line)) } } + +func TestFollowLogsConsumerGone(t *testing.T) { + lw := logger.NewLogWatcher() + + f, err := ioutil.TempFile("", t.Name()) + assert.NilError(t, err) + defer func() { + f.Close() + os.Remove(f.Name()) + }() + + makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) { + return func() (*logger.Message, error) { + return &logger.Message{}, nil + } + } + + followLogsDone := make(chan struct{}) + var since, until time.Time + go func() { + followLogs(f, lw, make(chan interface{}), makeDecoder, since, until) + close(followLogsDone) + }() + + select { + case <-lw.Msg: + case err := <-lw.Err: + assert.NilError(t, err) + case <-followLogsDone: + t.Fatal("follow logs finished unexpectedly") + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for log message") + } + + lw.ConsumerGone() + select { + case <-followLogsDone: + case <-time.After(20 * time.Second): + t.Fatal("timeout waiting for followLogs() to finish") + } +} + +func TestFollowLogsProducerGone(t *testing.T) { + lw := logger.NewLogWatcher() + + f, err := ioutil.TempFile("", t.Name()) + assert.NilError(t, err) + defer os.Remove(f.Name()) + + var sent, received, closed int + makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) { + return func() (*logger.Message, error) { + if closed == 1 { + closed++ + t.Logf("logDecode() closed after sending %d messages\n", sent) + return nil, io.EOF + } else if closed > 1 { + t.Fatal("logDecode() called after closing!") + return nil, io.EOF + } + sent++ + return &logger.Message{}, nil + } + } + var since, until time.Time + + followLogsDone := make(chan struct{}) + go func() { + followLogs(f, lw, make(chan interface{}), makeDecoder, since, until) + close(followLogsDone) + }() + + // read 1 message + select { + case <-lw.Msg: + received++ + case err := <-lw.Err: + assert.NilError(t, err) + case <-followLogsDone: + t.Fatal("followLogs() finished unexpectedly") + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for log message") + } + + // "stop" the "container" + closed = 1 + lw.ProducerGone() + + // should receive all the messages sent + readDone := make(chan struct{}) + go func() { + defer close(readDone) + for { + select { + case <-lw.Msg: + received++ + if received == sent { + return + } + case err := <-lw.Err: + assert.NilError(t, err) + } + } + }() + select { + case <-readDone: + case <-time.After(30 * time.Second): + t.Fatalf("timeout waiting for log messages to be read (sent: %d, received: %d", sent, received) + } + + t.Logf("messages sent: %d, received: %d", sent, received) + + // followLogs() should be done by now + select { + case <-followLogsDone: + case <-time.After(30 * time.Second): + t.Fatal("timeout waiting for followLogs() to finish") + } + + select { + case <-lw.WatchConsumerGone(): + t.Fatal("consumer should not have exited") + default: + } +} diff --git a/daemon/logs.go b/daemon/logs.go index 37ca4caf63..668a75c778 100644 --- a/daemon/logs.go +++ b/daemon/logs.go @@ -110,14 +110,16 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c } }() } - // set up some defers - defer logs.Close() + // signal that the log reader is gone + defer logs.ConsumerGone() // close the messages channel. closing is the only way to signal above // that we're doing with logs (other than context cancel i guess). defer close(messageChan) lg.Debug("begin logs") + defer lg.Debugf("end logs (%v)", ctx.Err()) + for { select { // i do not believe as the system is currently designed any error @@ -132,14 +134,12 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c } return case <-ctx.Done(): - lg.Debugf("logs: end stream, ctx is done: %v", ctx.Err()) return case msg, ok := <-logs.Msg: // there is some kind of pool or ring buffer in the logger that // produces these messages, and a possible future optimization // might be to use that pool and reuse message objects if !ok { - lg.Debug("end logs") return } m := msg.AsLogMessage() // just a pointer conversion, does not copy data