package logger // import "github.com/docker/docker/daemon/logger"

import (
	"errors"
	"sync"
	"sync/atomic"

	"github.com/sirupsen/logrus"
)

const (
	defaultRingMaxSize = 1e6 // 1MB
)

// RingLogger is a ring buffer that implements the Logger interface.
// This is used when lossy logging is OK.
type RingLogger struct {
	buffer    *messageRing
	l         Logger
	logInfo   Info
	closeFlag int32
}

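// ringWithReader is a RingLogger whose wrapped driver also implements
// LogReader, so log reads can pass through to that driver.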
type ringWithReader struct {
	*RingLogger
}

func (r *ringWithReader) ReadLogs(cfg ReadConfig) *LogWatcher {
	reader, ok := r.l.(LogReader)
	if !ok {
		// something is wrong if we get here
		panic("expected log reader")
	}
	return reader.ReadLogs(cfg)
}

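// newRingLogger constructs the RingLogger and starts the goroutine that
// forwards buffered messages to the wrapped driver.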
func newRingLogger(driver Logger, logInfo Info, maxSize int64) *RingLogger {
	l := &RingLogger{
		buffer:  newRing(maxSize),
		l:       driver,
		logInfo: logInfo,
	}
	go l.run()
	return l
}

// NewRingLogger creates a new Logger that is implemented as a ring buffer wrapping
// the passed-in logger.
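//
// A minimal usage sketch (assumes some concrete Logger driver and Info value
// are already in hand; the 10e6 max size is an arbitrary example, not a default):
//
//	ring := NewRingLogger(driver, info, 10e6)
//	_ = ring.Log(&Message{Line: []byte("hello")})
//	_ = ring.Close()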
func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger {
	if maxSize < 0 {
		maxSize = defaultRingMaxSize
	}
	l := newRingLogger(driver, logInfo, maxSize)
	if _, ok := driver.(LogReader); ok {
		return &ringWithReader{l}
	}
	return l
}

// Log queues messages into the ring buffer
func (r *RingLogger) Log(msg *Message) error {
	if r.closed() {
		return errClosed
	}
	return r.buffer.Enqueue(msg)
}

// Name returns the name of the underlying logger
func (r *RingLogger) Name() string {
	return r.l.Name()
}

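// closed reports whether the logger has been marked closed.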
func (r *RingLogger) closed() bool {
	return atomic.LoadInt32(&r.closeFlag) == 1
}

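// setClosed marks the logger as closed.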
func (r *RingLogger) setClosed() {
	atomic.StoreInt32(&r.closeFlag, 1)
}

// Close closes the logger
func (r *RingLogger) Close() error {
	r.setClosed()
	r.buffer.Close()
	// empty out the queue
	var logErr bool
	for _, msg := range r.buffer.Drain() {
		if logErr {
			// some error logging a previous message, so re-insert to message pool
			// and assume log driver is hosed
			PutMessage(msg)
			continue
		}

		if err := r.l.Log(msg); err != nil {
			logrus.WithField("driver", r.l.Name()).
				WithField("container", r.logInfo.ContainerID).
				WithError(err).
				Errorf("Error writing log message")
			logErr = true
		}
	}
	return r.l.Close()
}

// run consumes messages from the ring buffer and forwards them to the underlying
// logger.
// This is run in a goroutine when the RingLogger is created.
func (r *RingLogger) run() {
	for {
		if r.closed() {
			return
		}
		msg, err := r.buffer.Dequeue()
		if err != nil {
			// buffer is closed
			return
		}
		if err := r.l.Log(msg); err != nil {
			logrus.WithField("driver", r.l.Name()).
				WithField("container", r.logInfo.ContainerID).
				WithError(err).
				Errorf("Error writing log message")
		}
	}
}

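// messageRing is the mutex-guarded, size-bounded message queue backing
// RingLogger.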
type messageRing struct {
	mu sync.Mutex
	// signals callers of `Dequeue` to wake up either on `Close` or when a new `Message` is added
	wait *sync.Cond

	sizeBytes int64 // current buffer size
	maxBytes  int64 // max buffer size
	queue     []*Message
	closed    bool
}

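// newRing creates a messageRing bounded to roughly maxBytes of queued message
// payload (see Enqueue for the exact drop rule).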
func newRing(maxBytes int64) *messageRing {
	queueSize := 1000
	if maxBytes == 0 || maxBytes == 1 {
		// With 0 or 1 max byte size, the maximum size of the queue would only ever be 1
		// message long.
		queueSize = 1
	}

	r := &messageRing{queue: make([]*Message, 0, queueSize), maxBytes: maxBytes}
	r.wait = sync.NewCond(&r.mu)
	return r
}

// Enqueue adds a message to the buffer queue.
// If adding the message would exceed the buffer's max size and the queue is not
// empty, the new message is dropped.
// If the queue is empty, the message is added even if it exceeds the max size.
func (r *messageRing) Enqueue(m *Message) error {
	mSize := int64(len(m.Line))

	r.mu.Lock()
	if r.closed {
		r.mu.Unlock()
		return errClosed
	}
	if mSize+r.sizeBytes > r.maxBytes && len(r.queue) > 0 {
		r.wait.Signal()
		r.mu.Unlock()
		return nil
	}

	r.queue = append(r.queue, m)
	r.sizeBytes += mSize
	r.wait.Signal()
	r.mu.Unlock()
	return nil
}

// Dequeue pulls a message off the queue
// If there are no messages, it waits for one.
// If the buffer is closed, it will return immediately.
func (r *messageRing) Dequeue() (*Message, error) {
	r.mu.Lock()
	for len(r.queue) == 0 && !r.closed {
		r.wait.Wait()
	}

	if r.closed {
		r.mu.Unlock()
		return nil, errClosed
	}

	msg := r.queue[0]
	r.queue = r.queue[1:]
	r.sizeBytes -= int64(len(msg.Line))
	r.mu.Unlock()
	return msg, nil
}

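// errClosed is returned once the RingLogger or its messageRing has been closed.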
var errClosed = errors.New("closed")

// Close closes the buffer ensuring no new messages can be added.
// Any callers waiting to dequeue a message will be woken up.
func (r *messageRing) Close() {
	r.mu.Lock()
	if r.closed {
		r.mu.Unlock()
		return
	}

	r.closed = true
	r.wait.Broadcast()
	r.mu.Unlock()
}

// Drain drains all messages from the queue.
// This can be used after `Close()` to get any remaining messages that were in the queue.
func (r *messageRing) Drain() []*Message {
	r.mu.Lock()
	ls := make([]*Message, 0, len(r.queue))
	ls = append(ls, r.queue...)
	r.sizeBytes = 0
	r.queue = r.queue[:0]
	r.mu.Unlock()
	return ls
}