diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go index 04370fdbc0..d91eb809bc 100644 --- a/daemon/logger/journald/read.go +++ b/daemon/logger/journald/read.go @@ -56,7 +56,7 @@ package journald //} //static int is_attribute_field(const char *msg, size_t length) //{ -// const struct known_field { +// static const struct known_field { // const char *name; // size_t length; // } fields[] = { @@ -101,21 +101,23 @@ package journald // } // return rc; //} -//static int wait_for_data_or_close(sd_journal *j, int pipefd) +//static int wait_for_data_cancelable(sd_journal *j, int pipefd) //{ // struct pollfd fds[2]; // uint64_t when = 0; // int timeout, jevents, i; // struct timespec ts; // uint64_t now; +// +// memset(&fds, 0, sizeof(fds)); +// fds[0].fd = pipefd; +// fds[0].events = POLLHUP; +// fds[1].fd = sd_journal_get_fd(j); +// if (fds[1].fd < 0) { +// return fds[1].fd; +// } +// // do { -// memset(&fds, 0, sizeof(fds)); -// fds[0].fd = pipefd; -// fds[0].events = POLLHUP; -// fds[1].fd = sd_journal_get_fd(j); -// if (fds[1].fd < 0) { -// return fds[1].fd; -// } // jevents = sd_journal_get_events(j); // if (jevents < 0) { // return jevents; @@ -167,7 +169,7 @@ func (s *journald) Close() error { return nil } -func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, oldCursor string) string { +func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, oldCursor *C.char) *C.char { var msg, data, cursor *C.char var length C.size_t var stamp C.uint64_t @@ -177,10 +179,8 @@ func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.Rea drain: for { // Try not to send a given entry twice. - if oldCursor != "" { - ccursor := C.CString(oldCursor) - defer C.free(unsafe.Pointer(ccursor)) - for C.sd_journal_test_cursor(j, ccursor) > 0 { + if oldCursor != nil { + for C.sd_journal_test_cursor(j, oldCursor) > 0 { if C.sd_journal_next(j) <= 0 { break drain } @@ -234,25 +234,24 @@ drain: break } } - retCursor := "" - if C.sd_journal_get_cursor(j, &cursor) == 0 { - retCursor = C.GoString(cursor) - C.free(unsafe.Pointer(cursor)) - } - return retCursor + + // free(NULL) is safe + C.free(unsafe.Pointer(oldCursor)) + C.sd_journal_get_cursor(j, &cursor) + return cursor } -func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, pfd [2]C.int, cursor string) { +func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, pfd [2]C.int, cursor *C.char) *C.char { s.readers.mu.Lock() s.readers.readers[logWatcher] = logWatcher s.readers.mu.Unlock() go func() { // Keep copying journal data out until we're notified to stop // or we hit an error. - status := C.wait_for_data_or_close(j, pfd[0]) + status := C.wait_for_data_cancelable(j, pfd[0]) for status == 1 { cursor = s.drainJournal(logWatcher, config, j, cursor) - status = C.wait_for_data_or_close(j, pfd[0]) + status = C.wait_for_data_cancelable(j, pfd[0]) } if status < 0 { cerrstr := C.strerror(C.int(-status)) @@ -274,15 +273,16 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.Re // Notify the other goroutine that its work is done. C.close(pfd[1]) } + + return cursor } func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) { var j *C.sd_journal - var cmatch *C.char + var cmatch, cursor *C.char var stamp C.uint64_t var sinceUnixMicro uint64 var pipes [2]C.int - cursor := "" // Get a handle to the journal. rc := C.sd_journal_open(&j, C.int(0)) @@ -370,7 +370,7 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon return } } - cursor = s.drainJournal(logWatcher, config, j, "") + cursor = s.drainJournal(logWatcher, config, j, nil) if config.Follow { // Allocate a descriptor for following the journal, if we'll // need one. Do it here so that we can report if it fails. @@ -382,13 +382,15 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon if C.pipe(&pipes[0]) == C.int(-1) { logWatcher.Err <- fmt.Errorf("error opening journald close notification pipe") } else { - s.followJournal(logWatcher, config, j, pipes, cursor) + cursor = s.followJournal(logWatcher, config, j, pipes, cursor) // Let followJournal handle freeing the journal context // object and closing the channel. following = true } } } + + C.free(unsafe.Pointer(cursor)) return }