Prevent ContainerLogs from hanging if container doesn't run for long

Signed-off-by: Jim Minter <jminter@redhat.com>
This commit is contained in:
Jim Minter 2017-03-15 15:41:12 +00:00
parent 68e71aa3e6
commit 4fdb17c777
5 changed files with 26 additions and 17 deletions

View File

@ -18,12 +18,13 @@ import (
const name = "journald" const name = "journald"
type journald struct { type journald struct {
mu sync.Mutex
vars map[string]string // additional variables and values to send to the journal along with the log message vars map[string]string // additional variables and values to send to the journal along with the log message
readers readerList readers readerList
closed bool
} }
type readerList struct { type readerList struct {
mu sync.Mutex
readers map[*logger.LogWatcher]*logger.LogWatcher readers map[*logger.LogWatcher]*logger.LogWatcher
} }

View File

@ -161,11 +161,12 @@ import (
) )
func (s *journald) Close() error { func (s *journald) Close() error {
s.readers.mu.Lock() s.mu.Lock()
s.closed = true
for reader := range s.readers.readers { for reader := range s.readers.readers {
reader.Close() reader.Close()
} }
s.readers.mu.Unlock() s.mu.Unlock()
return nil return nil
} }
@ -245,9 +246,16 @@ drain:
} }
func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, pfd [2]C.int, cursor *C.char) *C.char { func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, pfd [2]C.int, cursor *C.char) *C.char {
s.readers.mu.Lock() s.mu.Lock()
s.readers.readers[logWatcher] = logWatcher s.readers.readers[logWatcher] = logWatcher
s.readers.mu.Unlock() if s.closed {
// the journald Logger is closed, presumably because the container has been
// reset. So we shouldn't follow, because we'll never be woken up. But we
// should make one more drainJournal call to be sure we've got all the logs.
// Close pfd[1] so that one drainJournal happens, then cleanup, then return.
C.close(pfd[1])
}
s.mu.Unlock()
newCursor := make(chan *C.char) newCursor := make(chan *C.char)
@ -274,21 +282,21 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.Re
// Clean up. // Clean up.
C.close(pfd[0]) C.close(pfd[0])
s.readers.mu.Lock() s.mu.Lock()
delete(s.readers.readers, logWatcher) delete(s.readers.readers, logWatcher)
s.readers.mu.Unlock() s.mu.Unlock()
close(logWatcher.Msg) close(logWatcher.Msg)
newCursor <- cursor newCursor <- cursor
}() }()
// Wait until we're told to stop. // Wait until we're told to stop.
select { select {
case cursor = <-newCursor:
case <-logWatcher.WatchClose(): case <-logWatcher.WatchClose():
// Notify the other goroutine that its work is done. // Notify the other goroutine that its work is done.
C.close(pfd[1]) C.close(pfd[1])
}
cursor = <-newCursor cursor = <-newCursor
}
return cursor return cursor
} }

View File

@ -27,6 +27,7 @@ type JSONFileLogger struct {
mu sync.Mutex mu sync.Mutex
readers map[*logger.LogWatcher]struct{} // stores the active log followers readers map[*logger.LogWatcher]struct{} // stores the active log followers
extra []byte // json-encoded extra attributes extra []byte // json-encoded extra attributes
closed bool
} }
func init() { func init() {
@ -142,6 +143,7 @@ func (l *JSONFileLogger) LogPath() string {
// Close closes underlying file and signals all readers to stop. // Close closes underlying file and signals all readers to stop.
func (l *JSONFileLogger) Close() error { func (l *JSONFileLogger) Close() error {
l.mu.Lock() l.mu.Lock()
l.closed = true
err := l.writer.Close() err := l.writer.Close()
for r := range l.readers { for r := range l.readers {
r.Close() r.Close()

View File

@ -88,10 +88,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
} }
} }
if !config.Follow { if !config.Follow || l.closed {
if err := latestFile.Close(); err != nil {
logrus.Errorf("Error closing file: %v", err)
}
l.mu.Unlock() l.mu.Unlock()
return return
} }
@ -100,17 +97,18 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
latestFile.Seek(0, os.SEEK_END) latestFile.Seek(0, os.SEEK_END)
} }
notifyRotate := l.writer.NotifyRotate()
defer l.writer.NotifyRotateEvict(notifyRotate)
l.readers[logWatcher] = struct{}{} l.readers[logWatcher] = struct{}{}
l.mu.Unlock() l.mu.Unlock()
notifyRotate := l.writer.NotifyRotate()
followLogs(latestFile, logWatcher, notifyRotate, config.Since) followLogs(latestFile, logWatcher, notifyRotate, config.Since)
l.mu.Lock() l.mu.Lock()
delete(l.readers, logWatcher) delete(l.readers, logWatcher)
l.mu.Unlock() l.mu.Unlock()
l.writer.NotifyRotateEvict(notifyRotate)
} }
func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) { func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {

View File

@ -62,7 +62,7 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c
return nil, logger.ErrReadLogsNotSupported return nil, logger.ErrReadLogsNotSupported
} }
follow := config.Follow && container.IsRunning() follow := config.Follow && !cLogCreated
tailLines, err := strconv.Atoi(config.Tail) tailLines, err := strconv.Atoi(config.Tail)
if err != nil { if err != nil {
tailLines = -1 tailLines = -1