mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Clean up journald logger
We clean up the journald logger with these four changes. 1. Make field array static 2. Make function name more appropriate 3. Initialize the file descriptors only once 4. Avoid copying the journald cursor Point 4 is the most significant change: instead of treating the journald cursor like a Go string we use it as a raw C.char pointer. That way we avoid the copying by the C.CString and C.GoString functions. Signed-off-by: Silvan Jegen <s.jegen@gmail.com>
This commit is contained in:
parent
a70b1d74a0
commit
d359daaa48
1 changed files with 29 additions and 27 deletions
|
@ -56,7 +56,7 @@ package journald
|
||||||
//}
|
//}
|
||||||
//static int is_attribute_field(const char *msg, size_t length)
|
//static int is_attribute_field(const char *msg, size_t length)
|
||||||
//{
|
//{
|
||||||
// const struct known_field {
|
// static const struct known_field {
|
||||||
// const char *name;
|
// const char *name;
|
||||||
// size_t length;
|
// size_t length;
|
||||||
// } fields[] = {
|
// } fields[] = {
|
||||||
|
@ -101,14 +101,14 @@ package journald
|
||||||
// }
|
// }
|
||||||
// return rc;
|
// return rc;
|
||||||
//}
|
//}
|
||||||
//static int wait_for_data_or_close(sd_journal *j, int pipefd)
|
//static int wait_for_data_cancelable(sd_journal *j, int pipefd)
|
||||||
//{
|
//{
|
||||||
// struct pollfd fds[2];
|
// struct pollfd fds[2];
|
||||||
// uint64_t when = 0;
|
// uint64_t when = 0;
|
||||||
// int timeout, jevents, i;
|
// int timeout, jevents, i;
|
||||||
// struct timespec ts;
|
// struct timespec ts;
|
||||||
// uint64_t now;
|
// uint64_t now;
|
||||||
// do {
|
//
|
||||||
// memset(&fds, 0, sizeof(fds));
|
// memset(&fds, 0, sizeof(fds));
|
||||||
// fds[0].fd = pipefd;
|
// fds[0].fd = pipefd;
|
||||||
// fds[0].events = POLLHUP;
|
// fds[0].events = POLLHUP;
|
||||||
|
@ -116,6 +116,8 @@ package journald
|
||||||
// if (fds[1].fd < 0) {
|
// if (fds[1].fd < 0) {
|
||||||
// return fds[1].fd;
|
// return fds[1].fd;
|
||||||
// }
|
// }
|
||||||
|
//
|
||||||
|
// do {
|
||||||
// jevents = sd_journal_get_events(j);
|
// jevents = sd_journal_get_events(j);
|
||||||
// if (jevents < 0) {
|
// if (jevents < 0) {
|
||||||
// return jevents;
|
// return jevents;
|
||||||
|
@ -167,7 +169,7 @@ func (s *journald) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, oldCursor string) string {
|
func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, oldCursor *C.char) *C.char {
|
||||||
var msg, data, cursor *C.char
|
var msg, data, cursor *C.char
|
||||||
var length C.size_t
|
var length C.size_t
|
||||||
var stamp C.uint64_t
|
var stamp C.uint64_t
|
||||||
|
@ -177,10 +179,8 @@ func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.Rea
|
||||||
drain:
|
drain:
|
||||||
for {
|
for {
|
||||||
// Try not to send a given entry twice.
|
// Try not to send a given entry twice.
|
||||||
if oldCursor != "" {
|
if oldCursor != nil {
|
||||||
ccursor := C.CString(oldCursor)
|
for C.sd_journal_test_cursor(j, oldCursor) > 0 {
|
||||||
defer C.free(unsafe.Pointer(ccursor))
|
|
||||||
for C.sd_journal_test_cursor(j, ccursor) > 0 {
|
|
||||||
if C.sd_journal_next(j) <= 0 {
|
if C.sd_journal_next(j) <= 0 {
|
||||||
break drain
|
break drain
|
||||||
}
|
}
|
||||||
|
@ -234,25 +234,24 @@ drain:
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
retCursor := ""
|
|
||||||
if C.sd_journal_get_cursor(j, &cursor) == 0 {
|
// free(NULL) is safe
|
||||||
retCursor = C.GoString(cursor)
|
C.free(unsafe.Pointer(oldCursor))
|
||||||
C.free(unsafe.Pointer(cursor))
|
C.sd_journal_get_cursor(j, &cursor)
|
||||||
}
|
return cursor
|
||||||
return retCursor
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, pfd [2]C.int, cursor string) {
|
func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, pfd [2]C.int, cursor *C.char) *C.char {
|
||||||
s.readers.mu.Lock()
|
s.readers.mu.Lock()
|
||||||
s.readers.readers[logWatcher] = logWatcher
|
s.readers.readers[logWatcher] = logWatcher
|
||||||
s.readers.mu.Unlock()
|
s.readers.mu.Unlock()
|
||||||
go func() {
|
go func() {
|
||||||
// Keep copying journal data out until we're notified to stop
|
// Keep copying journal data out until we're notified to stop
|
||||||
// or we hit an error.
|
// or we hit an error.
|
||||||
status := C.wait_for_data_or_close(j, pfd[0])
|
status := C.wait_for_data_cancelable(j, pfd[0])
|
||||||
for status == 1 {
|
for status == 1 {
|
||||||
cursor = s.drainJournal(logWatcher, config, j, cursor)
|
cursor = s.drainJournal(logWatcher, config, j, cursor)
|
||||||
status = C.wait_for_data_or_close(j, pfd[0])
|
status = C.wait_for_data_cancelable(j, pfd[0])
|
||||||
}
|
}
|
||||||
if status < 0 {
|
if status < 0 {
|
||||||
cerrstr := C.strerror(C.int(-status))
|
cerrstr := C.strerror(C.int(-status))
|
||||||
|
@ -274,15 +273,16 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.Re
|
||||||
// Notify the other goroutine that its work is done.
|
// Notify the other goroutine that its work is done.
|
||||||
C.close(pfd[1])
|
C.close(pfd[1])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return cursor
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
|
func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
|
||||||
var j *C.sd_journal
|
var j *C.sd_journal
|
||||||
var cmatch *C.char
|
var cmatch, cursor *C.char
|
||||||
var stamp C.uint64_t
|
var stamp C.uint64_t
|
||||||
var sinceUnixMicro uint64
|
var sinceUnixMicro uint64
|
||||||
var pipes [2]C.int
|
var pipes [2]C.int
|
||||||
cursor := ""
|
|
||||||
|
|
||||||
// Get a handle to the journal.
|
// Get a handle to the journal.
|
||||||
rc := C.sd_journal_open(&j, C.int(0))
|
rc := C.sd_journal_open(&j, C.int(0))
|
||||||
|
@ -370,7 +370,7 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
cursor = s.drainJournal(logWatcher, config, j, "")
|
cursor = s.drainJournal(logWatcher, config, j, nil)
|
||||||
if config.Follow {
|
if config.Follow {
|
||||||
// Allocate a descriptor for following the journal, if we'll
|
// Allocate a descriptor for following the journal, if we'll
|
||||||
// need one. Do it here so that we can report if it fails.
|
// need one. Do it here so that we can report if it fails.
|
||||||
|
@ -382,13 +382,15 @@ func (s *journald) readLogs(logWatcher *logger.LogWatcher, config logger.ReadCon
|
||||||
if C.pipe(&pipes[0]) == C.int(-1) {
|
if C.pipe(&pipes[0]) == C.int(-1) {
|
||||||
logWatcher.Err <- fmt.Errorf("error opening journald close notification pipe")
|
logWatcher.Err <- fmt.Errorf("error opening journald close notification pipe")
|
||||||
} else {
|
} else {
|
||||||
s.followJournal(logWatcher, config, j, pipes, cursor)
|
cursor = s.followJournal(logWatcher, config, j, pipes, cursor)
|
||||||
// Let followJournal handle freeing the journal context
|
// Let followJournal handle freeing the journal context
|
||||||
// object and closing the channel.
|
// object and closing the channel.
|
||||||
following = true
|
following = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
C.free(unsafe.Pointer(cursor))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue