mirror of
				https://github.com/moby/moby.git
				synced 2022-11-09 12:21:53 -05:00 
			
		
		
		
	
		
			
				
	
	
		
			186 lines
		
	
	
	
		
			4.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			186 lines
		
	
	
	
		
			4.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package logger // import "github.com/docker/docker/daemon/logger"
 | 
						|
 | 
						|
import (
 | 
						|
	"bytes"
 | 
						|
	"io"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	types "github.com/docker/docker/api/types/backend"
 | 
						|
	"github.com/docker/docker/pkg/stringid"
 | 
						|
	"github.com/sirupsen/logrus"
 | 
						|
)
 | 
						|
 | 
						|
const (
 | 
						|
	// readSize is the maximum bytes read during a single read
 | 
						|
	// operation.
 | 
						|
	readSize = 2 * 1024
 | 
						|
 | 
						|
	// defaultBufSize provides a reasonable default for loggers that do
 | 
						|
	// not have an external limit to impose on log line size.
 | 
						|
	defaultBufSize = 16 * 1024
 | 
						|
)
 | 
						|
 | 
						|
// Copier can copy logs from specified sources to Logger and attach Timestamp.
 | 
						|
// Writes are concurrent, so you need implement some sync in your logger.
 | 
						|
type Copier struct {
 | 
						|
	// srcs is map of name -> reader pairs, for example "stdout", "stderr"
 | 
						|
	srcs      map[string]io.Reader
 | 
						|
	dst       Logger
 | 
						|
	copyJobs  sync.WaitGroup
 | 
						|
	closeOnce sync.Once
 | 
						|
	closed    chan struct{}
 | 
						|
}
 | 
						|
 | 
						|
// NewCopier creates a new Copier
 | 
						|
func NewCopier(srcs map[string]io.Reader, dst Logger) *Copier {
 | 
						|
	return &Copier{
 | 
						|
		srcs:   srcs,
 | 
						|
		dst:    dst,
 | 
						|
		closed: make(chan struct{}),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Run starts logs copying
 | 
						|
func (c *Copier) Run() {
 | 
						|
	for src, w := range c.srcs {
 | 
						|
		c.copyJobs.Add(1)
 | 
						|
		go c.copySrc(src, w)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (c *Copier) copySrc(name string, src io.Reader) {
 | 
						|
	defer c.copyJobs.Done()
 | 
						|
 | 
						|
	bufSize := defaultBufSize
 | 
						|
	if sizedLogger, ok := c.dst.(SizedLogger); ok {
 | 
						|
		bufSize = sizedLogger.BufSize()
 | 
						|
	}
 | 
						|
	buf := make([]byte, bufSize)
 | 
						|
 | 
						|
	n := 0
 | 
						|
	eof := false
 | 
						|
	var partialid string
 | 
						|
	var partialTS time.Time
 | 
						|
	var ordinal int
 | 
						|
	firstPartial := true
 | 
						|
	hasMorePartial := false
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-c.closed:
 | 
						|
			return
 | 
						|
		default:
 | 
						|
			// Work out how much more data we are okay with reading this time.
 | 
						|
			upto := n + readSize
 | 
						|
			if upto > cap(buf) {
 | 
						|
				upto = cap(buf)
 | 
						|
			}
 | 
						|
			// Try to read that data.
 | 
						|
			if upto > n {
 | 
						|
				read, err := src.Read(buf[n:upto])
 | 
						|
				if err != nil {
 | 
						|
					if err != io.EOF {
 | 
						|
						logReadsFailedCount.Inc(1)
 | 
						|
						logrus.Errorf("Error scanning log stream: %s", err)
 | 
						|
						return
 | 
						|
					}
 | 
						|
					eof = true
 | 
						|
				}
 | 
						|
				n += read
 | 
						|
			}
 | 
						|
			// If we have no data to log, and there's no more coming, we're done.
 | 
						|
			if n == 0 && eof {
 | 
						|
				return
 | 
						|
			}
 | 
						|
			// Break up the data that we've buffered up into lines, and log each in turn.
 | 
						|
			p := 0
 | 
						|
 | 
						|
			for q := bytes.IndexByte(buf[p:n], '\n'); q >= 0; q = bytes.IndexByte(buf[p:n], '\n') {
 | 
						|
				select {
 | 
						|
				case <-c.closed:
 | 
						|
					return
 | 
						|
				default:
 | 
						|
					msg := NewMessage()
 | 
						|
					msg.Source = name
 | 
						|
					msg.Line = append(msg.Line, buf[p:p+q]...)
 | 
						|
 | 
						|
					if hasMorePartial {
 | 
						|
						msg.PLogMetaData = &types.PartialLogMetaData{ID: partialid, Ordinal: ordinal, Last: true}
 | 
						|
 | 
						|
						// reset
 | 
						|
						partialid = ""
 | 
						|
						ordinal = 0
 | 
						|
						firstPartial = true
 | 
						|
						hasMorePartial = false
 | 
						|
					}
 | 
						|
					if msg.PLogMetaData == nil {
 | 
						|
						msg.Timestamp = time.Now().UTC()
 | 
						|
					} else {
 | 
						|
						msg.Timestamp = partialTS
 | 
						|
					}
 | 
						|
 | 
						|
					if logErr := c.dst.Log(msg); logErr != nil {
 | 
						|
						logWritesFailedCount.Inc(1)
 | 
						|
						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
 | 
						|
					}
 | 
						|
				}
 | 
						|
				p += q + 1
 | 
						|
			}
 | 
						|
			// If there's no more coming, or the buffer is full but
 | 
						|
			// has no newlines, log whatever we haven't logged yet,
 | 
						|
			// noting that it's a partial log line.
 | 
						|
			if eof || (p == 0 && n == len(buf)) {
 | 
						|
				if p < n {
 | 
						|
					msg := NewMessage()
 | 
						|
					msg.Source = name
 | 
						|
					msg.Line = append(msg.Line, buf[p:n]...)
 | 
						|
 | 
						|
					// Generate unique partialID for first partial. Use it across partials.
 | 
						|
					// Record timestamp for first partial. Use it across partials.
 | 
						|
					// Initialize Ordinal for first partial. Increment it across partials.
 | 
						|
					if firstPartial {
 | 
						|
						msg.Timestamp = time.Now().UTC()
 | 
						|
						partialTS = msg.Timestamp
 | 
						|
						partialid = stringid.GenerateRandomID()
 | 
						|
						ordinal = 1
 | 
						|
						firstPartial = false
 | 
						|
						totalPartialLogs.Inc(1)
 | 
						|
					} else {
 | 
						|
						msg.Timestamp = partialTS
 | 
						|
					}
 | 
						|
					msg.PLogMetaData = &types.PartialLogMetaData{ID: partialid, Ordinal: ordinal, Last: false}
 | 
						|
					ordinal++
 | 
						|
					hasMorePartial = true
 | 
						|
 | 
						|
					if logErr := c.dst.Log(msg); logErr != nil {
 | 
						|
						logWritesFailedCount.Inc(1)
 | 
						|
						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
 | 
						|
					}
 | 
						|
					p = 0
 | 
						|
					n = 0
 | 
						|
				}
 | 
						|
				if eof {
 | 
						|
					return
 | 
						|
				}
 | 
						|
			}
 | 
						|
			// Move any unlogged data to the front of the buffer in preparation for another read.
 | 
						|
			if p > 0 {
 | 
						|
				copy(buf[0:], buf[p:n])
 | 
						|
				n -= p
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Wait waits until all copying is done
 | 
						|
func (c *Copier) Wait() {
 | 
						|
	c.copyJobs.Wait()
 | 
						|
}
 | 
						|
 | 
						|
// Close closes the copier
 | 
						|
func (c *Copier) Close() {
 | 
						|
	c.closeOnce.Do(func() {
 | 
						|
		close(c.closed)
 | 
						|
	})
 | 
						|
}
 |