mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
3f4fccb65f
This reduces allocs and bytes used per log entry significantly as well as some improvement to time per log operation. Each log driver, however, must put messages back in the pool once they are finished with the message. Signed-off-by: Brian Goff <cpuguy83@gmail.com>
135 lines
2.9 KiB
Go
135 lines
2.9 KiB
Go
package logger
|
|
|
|
import (
|
|
"bytes"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
)
|
|
|
|
const (
|
|
bufSize = 16 * 1024
|
|
readSize = 2 * 1024
|
|
)
|
|
|
|
// Copier can copy logs from specified sources to Logger and attach Timestamp.
|
|
// Writes are concurrent, so you need implement some sync in your logger.
|
|
type Copier struct {
|
|
// srcs is map of name -> reader pairs, for example "stdout", "stderr"
|
|
srcs map[string]io.Reader
|
|
dst Logger
|
|
copyJobs sync.WaitGroup
|
|
closeOnce sync.Once
|
|
closed chan struct{}
|
|
}
|
|
|
|
// NewCopier creates a new Copier
|
|
func NewCopier(srcs map[string]io.Reader, dst Logger) *Copier {
|
|
return &Copier{
|
|
srcs: srcs,
|
|
dst: dst,
|
|
closed: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Run starts logs copying
|
|
func (c *Copier) Run() {
|
|
for src, w := range c.srcs {
|
|
c.copyJobs.Add(1)
|
|
go c.copySrc(src, w)
|
|
}
|
|
}
|
|
|
|
func (c *Copier) copySrc(name string, src io.Reader) {
|
|
defer c.copyJobs.Done()
|
|
buf := make([]byte, bufSize)
|
|
n := 0
|
|
eof := false
|
|
|
|
for {
|
|
select {
|
|
case <-c.closed:
|
|
return
|
|
default:
|
|
// Work out how much more data we are okay with reading this time.
|
|
upto := n + readSize
|
|
if upto > cap(buf) {
|
|
upto = cap(buf)
|
|
}
|
|
// Try to read that data.
|
|
if upto > n {
|
|
read, err := src.Read(buf[n:upto])
|
|
if err != nil {
|
|
if err != io.EOF {
|
|
logrus.Errorf("Error scanning log stream: %s", err)
|
|
return
|
|
}
|
|
eof = true
|
|
}
|
|
n += read
|
|
}
|
|
// If we have no data to log, and there's no more coming, we're done.
|
|
if n == 0 && eof {
|
|
return
|
|
}
|
|
// Break up the data that we've buffered up into lines, and log each in turn.
|
|
p := 0
|
|
for q := bytes.IndexByte(buf[p:n], '\n'); q >= 0; q = bytes.IndexByte(buf[p:n], '\n') {
|
|
select {
|
|
case <-c.closed:
|
|
return
|
|
default:
|
|
msg := NewMessage()
|
|
msg.Source = name
|
|
msg.Timestamp = time.Now().UTC()
|
|
msg.Line = append(msg.Line, buf[p:p+q]...)
|
|
|
|
if logErr := c.dst.Log(msg); logErr != nil {
|
|
logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
|
|
}
|
|
}
|
|
p += q + 1
|
|
}
|
|
// If there's no more coming, or the buffer is full but
|
|
// has no newlines, log whatever we haven't logged yet,
|
|
// noting that it's a partial log line.
|
|
if eof || (p == 0 && n == len(buf)) {
|
|
if p < n {
|
|
msg := NewMessage()
|
|
msg.Source = name
|
|
msg.Timestamp = time.Now().UTC()
|
|
msg.Line = append(msg.Line, buf[p:n]...)
|
|
msg.Partial = true
|
|
|
|
if logErr := c.dst.Log(msg); logErr != nil {
|
|
logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
|
|
}
|
|
p = 0
|
|
n = 0
|
|
}
|
|
if eof {
|
|
return
|
|
}
|
|
}
|
|
// Move any unlogged data to the front of the buffer in preparation for another read.
|
|
if p > 0 {
|
|
copy(buf[0:], buf[p:n])
|
|
n -= p
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Wait waits until all copying is done
|
|
func (c *Copier) Wait() {
|
|
c.copyJobs.Wait()
|
|
}
|
|
|
|
// Close closes the copier
|
|
func (c *Copier) Close() {
|
|
c.closeOnce.Do(func() {
|
|
close(c.closed)
|
|
})
|
|
}
|