mirror of
				https://github.com/moby/moby.git
				synced 2022-11-09 12:21:53 -05:00 
			
		
		
		
	remove unnescessary import aliases, brackets, and so on. Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
		
			
				
	
	
		
			139 lines
		
	
	
	
		
			2.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			139 lines
		
	
	
	
		
			2.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package logger // import "github.com/docker/docker/daemon/logger"
 | 
						|
 | 
						|
import (
 | 
						|
	"io"
 | 
						|
	"os"
 | 
						|
	"path/filepath"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/docker/docker/api/types/plugins/logdriver"
 | 
						|
	"github.com/docker/docker/pkg/plugingetter"
 | 
						|
	"github.com/pkg/errors"
 | 
						|
	"github.com/sirupsen/logrus"
 | 
						|
)
 | 
						|
 | 
						|
// pluginAdapter takes a plugin and implements the Logger interface for logger
 | 
						|
// instances
 | 
						|
type pluginAdapter struct {
 | 
						|
	driverName   string
 | 
						|
	id           string
 | 
						|
	plugin       logPlugin
 | 
						|
	fifoPath     string
 | 
						|
	capabilities Capability
 | 
						|
	logInfo      Info
 | 
						|
 | 
						|
	// synchronize access to the log stream and shared buffer
 | 
						|
	mu     sync.Mutex
 | 
						|
	enc    logdriver.LogEntryEncoder
 | 
						|
	stream io.WriteCloser
 | 
						|
	// buf is shared for each `Log()` call to reduce allocations.
 | 
						|
	// buf must be protected by mutex
 | 
						|
	buf logdriver.LogEntry
 | 
						|
}
 | 
						|
 | 
						|
func (a *pluginAdapter) Log(msg *Message) error {
 | 
						|
	a.mu.Lock()
 | 
						|
 | 
						|
	a.buf.Line = msg.Line
 | 
						|
	a.buf.TimeNano = msg.Timestamp.UnixNano()
 | 
						|
	a.buf.Partial = msg.PLogMetaData != nil
 | 
						|
	a.buf.Source = msg.Source
 | 
						|
 | 
						|
	err := a.enc.Encode(&a.buf)
 | 
						|
	a.buf.Reset()
 | 
						|
 | 
						|
	a.mu.Unlock()
 | 
						|
 | 
						|
	PutMessage(msg)
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (a *pluginAdapter) Name() string {
 | 
						|
	return a.driverName
 | 
						|
}
 | 
						|
 | 
						|
func (a *pluginAdapter) Close() error {
 | 
						|
	a.mu.Lock()
 | 
						|
	defer a.mu.Unlock()
 | 
						|
 | 
						|
	if err := a.plugin.StopLogging(filepath.Join("/", "run", "docker", "logging", a.id)); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	if err := a.stream.Close(); err != nil {
 | 
						|
		logrus.WithError(err).Error("error closing plugin fifo")
 | 
						|
	}
 | 
						|
	if err := os.Remove(a.fifoPath); err != nil && !os.IsNotExist(err) {
 | 
						|
		logrus.WithError(err).Error("error cleaning up plugin fifo")
 | 
						|
	}
 | 
						|
 | 
						|
	// may be nil, especially for unit tests
 | 
						|
	if pluginGetter != nil {
 | 
						|
		pluginGetter.Get(a.Name(), extName, plugingetter.Release)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
type pluginAdapterWithRead struct {
 | 
						|
	*pluginAdapter
 | 
						|
}
 | 
						|
 | 
						|
func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
 | 
						|
	watcher := NewLogWatcher()
 | 
						|
 | 
						|
	go func() {
 | 
						|
		defer close(watcher.Msg)
 | 
						|
		stream, err := a.plugin.ReadLogs(a.logInfo, config)
 | 
						|
		if err != nil {
 | 
						|
			watcher.Err <- errors.Wrap(err, "error getting log reader")
 | 
						|
			return
 | 
						|
		}
 | 
						|
		defer stream.Close()
 | 
						|
 | 
						|
		dec := logdriver.NewLogEntryDecoder(stream)
 | 
						|
		for {
 | 
						|
			select {
 | 
						|
			case <-watcher.WatchClose():
 | 
						|
				return
 | 
						|
			default:
 | 
						|
			}
 | 
						|
 | 
						|
			var buf logdriver.LogEntry
 | 
						|
			if err := dec.Decode(&buf); err != nil {
 | 
						|
				if err == io.EOF {
 | 
						|
					return
 | 
						|
				}
 | 
						|
				select {
 | 
						|
				case watcher.Err <- errors.Wrap(err, "error decoding log message"):
 | 
						|
				case <-watcher.WatchClose():
 | 
						|
				}
 | 
						|
				return
 | 
						|
			}
 | 
						|
 | 
						|
			msg := &Message{
 | 
						|
				Timestamp: time.Unix(0, buf.TimeNano),
 | 
						|
				Line:      buf.Line,
 | 
						|
				Source:    buf.Source,
 | 
						|
			}
 | 
						|
 | 
						|
			// plugin should handle this, but check just in case
 | 
						|
			if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
 | 
						|
				return
 | 
						|
			}
 | 
						|
 | 
						|
			select {
 | 
						|
			case watcher.Msg <- msg:
 | 
						|
			case <-watcher.WatchClose():
 | 
						|
				// make sure the message we consumed is sent
 | 
						|
				watcher.Msg <- msg
 | 
						|
				return
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	return watcher
 | 
						|
}
 |