1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/daemon/logger/awslogs/cloudwatchlogs.go
Alexander Morozov 32964def83 awslogs: fix logrus import
Signed-off-by: Alexander Morozov <lk4d4@docker.com>
2015-09-11 11:05:06 -07:00

326 lines
10 KiB
Go

// Package awslogs provides the logdriver for forwarding container logs to Amazon CloudWatch Logs
package awslogs
import (
"fmt"
"os"
"sort"
"strings"
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/docker/docker/daemon/logger"
)
const (
name = "awslogs"
regionKey = "awslogs-region"
regionEnvKey = "AWS_REGION"
logGroupKey = "awslogs-group"
logStreamKey = "awslogs-stream"
batchPublishFrequency = 5 * time.Second
// See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
perEventBytes = 26
maximumBytesPerPut = 1048576
maximumLogEventsPerPut = 10000
// See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
maximumBytesPerEvent = 262144 - perEventBytes
resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
dataAlreadyAcceptedCode = "DataAlreadyAcceptedException"
invalidSequenceTokenCode = "InvalidSequenceTokenException"
)
type logStream struct {
logStreamName string
logGroupName string
client api
messages chan *logger.Message
lock sync.RWMutex
closed bool
sequenceToken *string
}
type api interface {
CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
}
type byTimestamp []*cloudwatchlogs.InputLogEvent
// init registers the awslogs driver and sets the default region, if provided
func init() {
if os.Getenv(regionEnvKey) != "" {
aws.DefaultConfig.Region = aws.String(os.Getenv(regionEnvKey))
}
if err := logger.RegisterLogDriver(name, New); err != nil {
logrus.Fatal(err)
}
if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
logrus.Fatal(err)
}
}
// New creates an awslogs logger using the configuration passed in on the
// context. Supported context configuration variables are awslogs-region,
// awslogs-group, and awslogs-stream. When available, configuration is
// also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID,
// AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and
// the EC2 Instance Metadata Service.
func New(ctx logger.Context) (logger.Logger, error) {
logGroupName := ctx.Config[logGroupKey]
logStreamName := ctx.ContainerID
if ctx.Config[logStreamKey] != "" {
logStreamName = ctx.Config[logStreamKey]
}
config := aws.DefaultConfig
if ctx.Config[regionKey] != "" {
config = aws.DefaultConfig.Merge(&aws.Config{
Region: aws.String(ctx.Config[regionKey]),
})
}
containerStream := &logStream{
logStreamName: logStreamName,
logGroupName: logGroupName,
client: cloudwatchlogs.New(config),
messages: make(chan *logger.Message, 4096),
}
err := containerStream.create()
if err != nil {
return nil, err
}
go containerStream.collectBatch()
return containerStream, nil
}
// Name returns the name of the awslogs logging driver
func (l *logStream) Name() string {
return name
}
// Log submits messages for logging by an instance of the awslogs logging driver
func (l *logStream) Log(msg *logger.Message) error {
l.lock.RLock()
defer l.lock.RUnlock()
if !l.closed {
l.messages <- msg
}
return nil
}
// Close closes the instance of the awslogs logging driver
func (l *logStream) Close() error {
l.lock.Lock()
defer l.lock.Unlock()
if !l.closed {
close(l.messages)
}
l.closed = true
return nil
}
// create creates a log stream for the instance of the awslogs logging driver
func (l *logStream) create() error {
input := &cloudwatchlogs.CreateLogStreamInput{
LogGroupName: aws.String(l.logGroupName),
LogStreamName: aws.String(l.logStreamName),
}
_, err := l.client.CreateLogStream(input)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
fields := logrus.Fields{
"errorCode": awsErr.Code(),
"message": awsErr.Message(),
"origError": awsErr.OrigErr(),
"logGroupName": l.logGroupName,
"logStreamName": l.logStreamName,
}
if awsErr.Code() == resourceAlreadyExistsCode {
// Allow creation to succeed
logrus.WithFields(fields).Info("Log stream already exists")
return nil
}
logrus.WithFields(fields).Error("Failed to create log stream")
}
}
return err
}
// newTicker is used for time-based batching. newTicker is a variable such
// that the implementation can be swapped out for unit tests.
var newTicker = func(freq time.Duration) *time.Ticker {
return time.NewTicker(freq)
}
// collectBatch executes as a goroutine to perform batching of log events for
// submission to the log stream. Batching is performed on time- and size-
// bases. Time-based batching occurs at a 5 second interval (defined in the
// batchPublishFrequency const). Size-based batching is performed on the
// maximum number of events per batch (defined in maximumLogEventsPerPut) and
// the maximum number of total bytes in a batch (defined in
// maximumBytesPerPut). Log messages are split by the maximum bytes per event
// (defined in maximumBytesPerEvent). There is a fixed per-event byte overhead
// (defined in perEventBytes) which is accounted for in split- and batch-
// calculations.
func (l *logStream) collectBatch() {
timer := newTicker(batchPublishFrequency)
var events []*cloudwatchlogs.InputLogEvent
bytes := 0
for {
select {
case <-timer.C:
l.publishBatch(events)
events = events[:0]
bytes = 0
case msg, more := <-l.messages:
if !more {
l.publishBatch(events)
return
}
unprocessedLine := msg.Line
for len(unprocessedLine) > 0 {
// Split line length so it does not exceed the maximum
lineBytes := len(unprocessedLine)
if lineBytes > maximumBytesPerEvent {
lineBytes = maximumBytesPerEvent
}
line := unprocessedLine[:lineBytes]
unprocessedLine = unprocessedLine[lineBytes:]
if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) {
// Publish an existing batch if it's already over the maximum number of events or if adding this
// event would push it over the maximum number of total bytes.
l.publishBatch(events)
events = events[:0]
bytes = 0
}
events = append(events, &cloudwatchlogs.InputLogEvent{
Message: aws.String(string(line)),
Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)),
})
bytes += (lineBytes + perEventBytes)
}
}
}
}
// publishBatch calls PutLogEvents for a given set of InputLogEvents,
// accounting for sequencing requirements (each request must reference the
// sequence token returned by the previous request).
func (l *logStream) publishBatch(events []*cloudwatchlogs.InputLogEvent) {
if len(events) == 0 {
return
}
sort.Sort(byTimestamp(events))
nextSequenceToken, err := l.putLogEvents(events, l.sequenceToken)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == dataAlreadyAcceptedCode {
// already submitted, just grab the correct sequence token
parts := strings.Split(awsErr.Message(), " ")
nextSequenceToken = &parts[len(parts)-1]
logrus.WithFields(logrus.Fields{
"errorCode": awsErr.Code(),
"message": awsErr.Message(),
"logGroupName": l.logGroupName,
"logStreamName": l.logStreamName,
}).Info("Data already accepted, ignoring error")
err = nil
} else if awsErr.Code() == invalidSequenceTokenCode {
// sequence code is bad, grab the correct one and retry
parts := strings.Split(awsErr.Message(), " ")
token := parts[len(parts)-1]
nextSequenceToken, err = l.putLogEvents(events, &token)
}
}
}
if err != nil {
logrus.Error(err)
} else {
l.sequenceToken = nextSequenceToken
}
}
// putLogEvents wraps the PutLogEvents API
func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenceToken *string) (*string, error) {
input := &cloudwatchlogs.PutLogEventsInput{
LogEvents: events,
SequenceToken: sequenceToken,
LogGroupName: aws.String(l.logGroupName),
LogStreamName: aws.String(l.logStreamName),
}
resp, err := l.client.PutLogEvents(input)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
logrus.WithFields(logrus.Fields{
"errorCode": awsErr.Code(),
"message": awsErr.Message(),
"origError": awsErr.OrigErr(),
"logGroupName": l.logGroupName,
"logStreamName": l.logStreamName,
}).Error("Failed to put log events")
}
return nil, err
}
return resp.NextSequenceToken, nil
}
// ValidateLogOpt looks for awslogs-specific log options awslogs-region,
// awslogs-group, and awslogs-stream
func ValidateLogOpt(cfg map[string]string) error {
for key := range cfg {
switch key {
case logGroupKey:
case logStreamKey:
case regionKey:
default:
return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
}
}
if cfg[logGroupKey] == "" {
return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey)
}
if cfg[regionKey] == "" && os.Getenv(regionEnvKey) == "" {
return fmt.Errorf(
"must specify a value for environment variable '%s' or log opt '%s'",
regionEnvKey,
regionKey)
}
return nil
}
// Len returns the length of a byTimestamp slice. Len is required by the
// sort.Interface interface.
func (slice byTimestamp) Len() int {
return len(slice)
}
// Less compares two values in a byTimestamp slice by Timestamp. Less is
// required by the sort.Interface interface.
func (slice byTimestamp) Less(i, j int) bool {
iTimestamp, jTimestamp := int64(0), int64(0)
if slice != nil && slice[i].Timestamp != nil {
iTimestamp = *slice[i].Timestamp
}
if slice != nil && slice[j].Timestamp != nil {
jTimestamp = *slice[j].Timestamp
}
return iTimestamp < jTimestamp
}
// Swap swaps two values in a byTimestamp slice with each other. Swap is
// required by the sort.Interface interface.
func (slice byTimestamp) Swap(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
}