mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
![Jacob Vallejo](/assets/img/avatar_default.png)
The previous bytes counter was moved out of scope and was not counting the total number of bytes in the batch. This type encapsulates the counter and the batch for consideration and code ergonomics. Signed-off-by: Jacob Vallejo <jakeev@amazon.com>
689 lines
22 KiB
Go
689 lines
22 KiB
Go
// Package awslogs provides the logdriver for forwarding container logs to Amazon CloudWatch Logs
|
|
package awslogs
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"regexp"
|
|
"runtime"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go/aws"
|
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
|
"github.com/aws/aws-sdk-go/aws/credentials/endpointcreds"
|
|
"github.com/aws/aws-sdk-go/aws/ec2metadata"
|
|
"github.com/aws/aws-sdk-go/aws/request"
|
|
"github.com/aws/aws-sdk-go/aws/session"
|
|
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
|
|
"github.com/docker/docker/daemon/logger"
|
|
"github.com/docker/docker/daemon/logger/loggerutils"
|
|
"github.com/docker/docker/dockerversion"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
const (
	// name is the identifier under which the driver and its log-option
	// validator are registered, and which Name reports.
	name = "awslogs"
	// Keys of the log options understood by the driver (see ValidateLogOpt),
	// plus the environment variable consulted for the region.
	regionKey = "awslogs-region"
	regionEnvKey = "AWS_REGION"
	logGroupKey = "awslogs-group"
	logStreamKey = "awslogs-stream"
	logCreateGroupKey = "awslogs-create-group"
	tagKey = "tag"
	datetimeFormatKey = "awslogs-datetime-format"
	multilinePatternKey = "awslogs-multiline-pattern"
	credentialsEndpointKey = "awslogs-credentials-endpoint"
	// batchPublishFrequency is the interval of the ticker that drives
	// time-based publishing in collectBatch.
	batchPublishFrequency = 5 * time.Second

	// See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
	// perEventBytes is the fixed per-event byte overhead added to every
	// event when sizing a batch.
	perEventBytes = 26
	// maximumBytesPerPut and maximumLogEventsPerPut bound a single batch
	// (see eventBatch.add).
	maximumBytesPerPut = 1048576
	maximumLogEventsPerPut = 10000

	// See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
	// maximumBytesPerEvent is the largest message payload placed in one
	// event; longer lines are split by processEvent.
	maximumBytesPerEvent = 262144 - perEventBytes

	// AWS error codes that receive special handling in this driver.
	resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
	dataAlreadyAcceptedCode = "DataAlreadyAcceptedException"
	invalidSequenceTokenCode = "InvalidSequenceTokenException"
	resourceNotFoundCode = "ResourceNotFoundException"

	// credentialsEndpoint is the default value of newSDKEndpoint, which is
	// prepended to the awslogs-credentials-endpoint option value.
	credentialsEndpoint = "http://169.254.170.2"

	// userAgentHeader is the HTTP header rewritten by the Docker user-agent
	// request handler.
	userAgentHeader = "User-Agent"
)
|
|
|
|
// logStream is the awslogs logger instance for one container. Messages are
// queued by Log on the messages channel and drained by collectBatch.
type logStream struct {
	// logStreamName and logGroupName identify the destination passed to
	// CreateLogStream and PutLogEvents.
	logStreamName string
	logGroupName string
	// logCreateGroup allows create to create the log group when stream
	// creation fails with a resource-not-found error.
	logCreateGroup bool
	// multilinePattern, when non-nil, enables multiline processing in
	// collectBatch.
	multilinePattern *regexp.Regexp
	client api
	// messages carries log messages from Log to collectBatch; it is closed
	// by Close.
	messages chan *logger.Message
	// lock is held (read) by Log and (write) by Close around accesses to
	// closed and the messages channel.
	lock sync.RWMutex
	closed bool
	// sequenceToken is passed to the next PutLogEvents call and updated by
	// publishBatch after a successful publish.
	sequenceToken *string
}
|
|
|
|
// api lists the CloudWatch Logs operations this driver calls. The client
// built by newAWSLogsClient is returned as this interface.
type api interface {
	CreateLogGroup(*cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error)
	CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
	PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
}
|
|
|
|
// regionFinder reports the AWS region to use when neither the environment
// nor the log options specify one. See newRegionFinder for the default
// implementation.
type regionFinder interface {
	Region() (string, error)
}
|
|
|
|
// wrappedEvent pairs a CloudWatch Logs input event with its position in the
// batch at insertion time.
type wrappedEvent struct {
	inputLogEvent *cloudwatchlogs.InputLogEvent
	// insertOrder is the batch's event count when the event was created; it
	// breaks ties between events with equal timestamps when sorting.
	insertOrder int
}
|
|
// byTimestamp implements sort.Interface over wrapped events, ordering them by
// timestamp and then by insertion order.
type byTimestamp []wrappedEvent
|
|
|
|
// init registers the awslogs driver
|
|
func init() {
|
|
if err := logger.RegisterLogDriver(name, New); err != nil {
|
|
logrus.Fatal(err)
|
|
}
|
|
if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
|
|
logrus.Fatal(err)
|
|
}
|
|
}
|
|
|
|
// eventBatch holds the events that are batched for submission and the
// associated data about it.
//
// Warning: this type is not threadsafe and must not be used
// concurrently. This type is expected to be consumed in a single go
// routine and never concurrently.
type eventBatch struct {
	// batch is the list of events accumulated so far.
	batch []wrappedEvent
	// bytes is the running total of event sizes, including the per-event
	// overhead added by add.
	bytes int
}
|
|
|
|
// New creates an awslogs logger using the configuration passed in on the
|
|
// context. Supported context configuration variables are awslogs-region,
|
|
// awslogs-group, awslogs-stream, awslogs-create-group, awslogs-multiline-pattern
|
|
// and awslogs-datetime-format. When available, configuration is
|
|
// also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID,
|
|
// AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and
|
|
// the EC2 Instance Metadata Service.
|
|
func New(info logger.Info) (logger.Logger, error) {
|
|
logGroupName := info.Config[logGroupKey]
|
|
logStreamName, err := loggerutils.ParseLogTag(info, "{{.FullID}}")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
logCreateGroup := false
|
|
if info.Config[logCreateGroupKey] != "" {
|
|
logCreateGroup, err = strconv.ParseBool(info.Config[logCreateGroupKey])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if info.Config[logStreamKey] != "" {
|
|
logStreamName = info.Config[logStreamKey]
|
|
}
|
|
|
|
multilinePattern, err := parseMultilineOptions(info)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
client, err := newAWSLogsClient(info)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
containerStream := &logStream{
|
|
logStreamName: logStreamName,
|
|
logGroupName: logGroupName,
|
|
logCreateGroup: logCreateGroup,
|
|
multilinePattern: multilinePattern,
|
|
client: client,
|
|
messages: make(chan *logger.Message, 4096),
|
|
}
|
|
err = containerStream.create()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
go containerStream.collectBatch()
|
|
|
|
return containerStream, nil
|
|
}
|
|
|
|
// Parses awslogs-multiline-pattern and awslogs-datetime-format options
|
|
// If awslogs-datetime-format is present, convert the format from strftime
|
|
// to regexp and return.
|
|
// If awslogs-multiline-pattern is present, compile regexp and return
|
|
func parseMultilineOptions(info logger.Info) (*regexp.Regexp, error) {
|
|
dateTimeFormat := info.Config[datetimeFormatKey]
|
|
multilinePatternKey := info.Config[multilinePatternKey]
|
|
// strftime input is parsed into a regular expression
|
|
if dateTimeFormat != "" {
|
|
// %. matches each strftime format sequence and ReplaceAllStringFunc
|
|
// looks up each format sequence in the conversion table strftimeToRegex
|
|
// to replace with a defined regular expression
|
|
r := regexp.MustCompile("%.")
|
|
multilinePatternKey = r.ReplaceAllStringFunc(dateTimeFormat, func(s string) string {
|
|
return strftimeToRegex[s]
|
|
})
|
|
}
|
|
if multilinePatternKey != "" {
|
|
multilinePattern, err := regexp.Compile(multilinePatternKey)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "awslogs could not parse multiline pattern key %q", multilinePatternKey)
|
|
}
|
|
return multilinePattern, nil
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
// strftimeToRegex maps strftime format sequences to regular expressions that
// match the text the sequence would produce. It is used to translate the
// awslogs-datetime-format option into a multiline start pattern.
//
// Character classes list only the intended characters (e.g. `[12]`, not
// `[1,2]`, which would also accept a literal comma), and the day-of-year
// pattern accepts every value from 001 through 366.
var strftimeToRegex = map[string]string{
	/*weekdayShort          */ `%a`: `(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun)`,
	/*weekdayFull           */ `%A`: `(?:Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday)`,
	/*weekdayZeroIndex      */ `%w`: `[0-6]`,
	/*dayZeroPadded         */ `%d`: `(?:0[1-9]|[12][0-9]|3[01])`,
	/*monthShort            */ `%b`: `(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)`,
	/*monthFull             */ `%B`: `(?:January|February|March|April|May|June|July|August|September|October|November|December)`,
	/*monthZeroPadded       */ `%m`: `(?:0[1-9]|1[0-2])`,
	/*yearCentury           */ `%Y`: `\d{4}`,
	/*yearZeroPadded        */ `%y`: `\d{2}`,
	/*hour24ZeroPadded      */ `%H`: `(?:[01][0-9]|2[0-3])`,
	/*hour12ZeroPadded      */ `%I`: `(?:0[0-9]|1[0-2])`,
	/*AM or PM              */ `%p`: `[AP]M`,
	/*minuteZeroPadded      */ `%M`: `[0-5][0-9]`,
	/*secondZeroPadded      */ `%S`: `[0-5][0-9]`,
	/*microsecondZeroPadded */ `%f`: `\d{6}`,
	/*utcOffset             */ `%z`: `[+-]\d{4}`,
	/*tzName                */ `%Z`: `[A-Z]{1,4}T`,
	/*dayOfYearZeroPadded   */ `%j`: `(?:00[1-9]|0[1-9][0-9]|[12][0-9][0-9]|3[0-5][0-9]|36[0-6])`,
	/*milliseconds          */ `%L`: `\.\d{3}`,
}
|
|
|
|
// newRegionFinder is a variable such that the implementation
|
|
// can be swapped out for unit tests.
|
|
var newRegionFinder = func() regionFinder {
|
|
return ec2metadata.New(session.New())
|
|
}
|
|
|
|
// newSDKEndpoint is the base URL that newAWSLogsClient prepends to the
// awslogs-credentials-endpoint option value. It defaults to
// credentialsEndpoint and is a variable such that the value can be swapped
// out for unit tests.
var newSDKEndpoint = credentialsEndpoint
|
|
|
|
// newAWSLogsClient creates the service client for Amazon CloudWatch Logs.
|
|
// Customizations to the default client from the SDK include a Docker-specific
|
|
// User-Agent string and automatic region detection using the EC2 Instance
|
|
// Metadata Service when region is otherwise unspecified.
|
|
func newAWSLogsClient(info logger.Info) (api, error) {
|
|
var region *string
|
|
if os.Getenv(regionEnvKey) != "" {
|
|
region = aws.String(os.Getenv(regionEnvKey))
|
|
}
|
|
if info.Config[regionKey] != "" {
|
|
region = aws.String(info.Config[regionKey])
|
|
}
|
|
if region == nil || *region == "" {
|
|
logrus.Info("Trying to get region from EC2 Metadata")
|
|
ec2MetadataClient := newRegionFinder()
|
|
r, err := ec2MetadataClient.Region()
|
|
if err != nil {
|
|
logrus.WithFields(logrus.Fields{
|
|
"error": err,
|
|
}).Error("Could not get region from EC2 metadata, environment, or log option")
|
|
return nil, errors.New("Cannot determine region for awslogs driver")
|
|
}
|
|
region = &r
|
|
}
|
|
|
|
sess, err := session.NewSession()
|
|
if err != nil {
|
|
return nil, errors.New("Failed to create a service client session for for awslogs driver")
|
|
}
|
|
|
|
// attach region to cloudwatchlogs config
|
|
sess.Config.Region = region
|
|
|
|
if uri, ok := info.Config[credentialsEndpointKey]; ok {
|
|
logrus.Debugf("Trying to get credentials from awslogs-credentials-endpoint")
|
|
|
|
endpoint := fmt.Sprintf("%s%s", newSDKEndpoint, uri)
|
|
creds := endpointcreds.NewCredentialsClient(*sess.Config, sess.Handlers, endpoint,
|
|
func(p *endpointcreds.Provider) {
|
|
p.ExpiryWindow = 5 * time.Minute
|
|
})
|
|
|
|
// attach credentials to cloudwatchlogs config
|
|
sess.Config.Credentials = creds
|
|
}
|
|
|
|
logrus.WithFields(logrus.Fields{
|
|
"region": *region,
|
|
}).Debug("Created awslogs client")
|
|
|
|
client := cloudwatchlogs.New(sess)
|
|
|
|
client.Handlers.Build.PushBackNamed(request.NamedHandler{
|
|
Name: "DockerUserAgentHandler",
|
|
Fn: func(r *request.Request) {
|
|
currentAgent := r.HTTPRequest.Header.Get(userAgentHeader)
|
|
r.HTTPRequest.Header.Set(userAgentHeader,
|
|
fmt.Sprintf("Docker %s (%s) %s",
|
|
dockerversion.Version, runtime.GOOS, currentAgent))
|
|
},
|
|
})
|
|
return client, nil
|
|
}
|
|
|
|
// Name returns the name of the awslogs logging driver, i.e. the value of the
// package-level name constant.
func (l *logStream) Name() string {
	return name
}
|
|
|
|
// BufSize returns maximumBytesPerEvent, the largest message payload this
// driver places in a single log event.
// NOTE(review): presumably consumed by the logging framework to size its
// line buffer — confirm against the caller.
func (l *logStream) BufSize() int {
	return maximumBytesPerEvent
}
|
|
|
|
// Log submits messages for logging by an instance of the awslogs logging driver
|
|
func (l *logStream) Log(msg *logger.Message) error {
|
|
l.lock.RLock()
|
|
defer l.lock.RUnlock()
|
|
if !l.closed {
|
|
l.messages <- msg
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Close closes the instance of the awslogs logging driver
|
|
func (l *logStream) Close() error {
|
|
l.lock.Lock()
|
|
defer l.lock.Unlock()
|
|
if !l.closed {
|
|
close(l.messages)
|
|
}
|
|
l.closed = true
|
|
return nil
|
|
}
|
|
|
|
// create creates log group and log stream for the instance of the awslogs logging driver
|
|
func (l *logStream) create() error {
|
|
if err := l.createLogStream(); err != nil {
|
|
if l.logCreateGroup {
|
|
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == resourceNotFoundCode {
|
|
if err := l.createLogGroup(); err != nil {
|
|
return err
|
|
}
|
|
return l.createLogStream()
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// createLogGroup creates a log group for the instance of the awslogs logging driver
|
|
func (l *logStream) createLogGroup() error {
|
|
if _, err := l.client.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
|
|
LogGroupName: aws.String(l.logGroupName),
|
|
}); err != nil {
|
|
if awsErr, ok := err.(awserr.Error); ok {
|
|
fields := logrus.Fields{
|
|
"errorCode": awsErr.Code(),
|
|
"message": awsErr.Message(),
|
|
"origError": awsErr.OrigErr(),
|
|
"logGroupName": l.logGroupName,
|
|
"logCreateGroup": l.logCreateGroup,
|
|
}
|
|
if awsErr.Code() == resourceAlreadyExistsCode {
|
|
// Allow creation to succeed
|
|
logrus.WithFields(fields).Info("Log group already exists")
|
|
return nil
|
|
}
|
|
logrus.WithFields(fields).Error("Failed to create log group")
|
|
}
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// createLogStream creates a log stream for the instance of the awslogs logging driver
|
|
func (l *logStream) createLogStream() error {
|
|
input := &cloudwatchlogs.CreateLogStreamInput{
|
|
LogGroupName: aws.String(l.logGroupName),
|
|
LogStreamName: aws.String(l.logStreamName),
|
|
}
|
|
|
|
_, err := l.client.CreateLogStream(input)
|
|
|
|
if err != nil {
|
|
if awsErr, ok := err.(awserr.Error); ok {
|
|
fields := logrus.Fields{
|
|
"errorCode": awsErr.Code(),
|
|
"message": awsErr.Message(),
|
|
"origError": awsErr.OrigErr(),
|
|
"logGroupName": l.logGroupName,
|
|
"logStreamName": l.logStreamName,
|
|
}
|
|
if awsErr.Code() == resourceAlreadyExistsCode {
|
|
// Allow creation to succeed
|
|
logrus.WithFields(fields).Info("Log stream already exists")
|
|
return nil
|
|
}
|
|
logrus.WithFields(fields).Error("Failed to create log stream")
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
// newTicker creates the ticker that drives time-based batching. It is a
// variable, defaulting to time.NewTicker, so that unit tests can swap in
// their own implementation.
var newTicker = time.NewTicker
|
|
|
|
// collectBatch executes as a goroutine to perform batching of log events for
|
|
// submission to the log stream. If the awslogs-multiline-pattern or
|
|
// awslogs-datetime-format options have been configured, multiline processing
|
|
// is enabled, where log messages are stored in an event buffer until a multiline
|
|
// pattern match is found, at which point the messages in the event buffer are
|
|
// pushed to CloudWatch logs as a single log event. Multiline messages are processed
|
|
// according to the maximumBytesPerPut constraint, and the implementation only
|
|
// allows for messages to be buffered for a maximum of 2*batchPublishFrequency
|
|
// seconds. When events are ready to be processed for submission to CloudWatch
|
|
// Logs, the processEvents method is called. If a multiline pattern is not
|
|
// configured, log events are submitted to the processEvents method immediately.
|
|
func (l *logStream) collectBatch() {
|
|
ticker := newTicker(batchPublishFrequency)
|
|
var eventBuffer []byte
|
|
var eventBufferTimestamp int64
|
|
var batch = newEventBatch()
|
|
for {
|
|
select {
|
|
case t := <-ticker.C:
|
|
// If event buffer is older than batch publish frequency flush the event buffer
|
|
if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
|
|
eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
|
|
eventBufferExpired := eventBufferAge > int64(batchPublishFrequency)/int64(time.Millisecond)
|
|
eventBufferNegative := eventBufferAge < 0
|
|
if eventBufferExpired || eventBufferNegative {
|
|
l.processEvent(batch, eventBuffer, eventBufferTimestamp)
|
|
eventBuffer = eventBuffer[:0]
|
|
}
|
|
}
|
|
l.publishBatch(batch)
|
|
batch.reset()
|
|
case msg, more := <-l.messages:
|
|
if !more {
|
|
// Flush event buffer and release resources
|
|
l.processEvent(batch, eventBuffer, eventBufferTimestamp)
|
|
eventBuffer = eventBuffer[:0]
|
|
l.publishBatch(batch)
|
|
batch.reset()
|
|
return
|
|
}
|
|
if eventBufferTimestamp == 0 {
|
|
eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
|
|
}
|
|
unprocessedLine := msg.Line
|
|
if l.multilinePattern != nil {
|
|
if l.multilinePattern.Match(unprocessedLine) || len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent {
|
|
// This is a new log event or we will exceed max bytes per event
|
|
// so flush the current eventBuffer to events and reset timestamp
|
|
l.processEvent(batch, eventBuffer, eventBufferTimestamp)
|
|
eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
|
|
eventBuffer = eventBuffer[:0]
|
|
}
|
|
// Append new line
|
|
processedLine := append(unprocessedLine, "\n"...)
|
|
eventBuffer = append(eventBuffer, processedLine...)
|
|
logger.PutMessage(msg)
|
|
} else {
|
|
l.processEvent(batch, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond))
|
|
logger.PutMessage(msg)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// processEvent processes log events that are ready for submission to CloudWatch
|
|
// logs. Batching is performed on time- and size-bases. Time-based batching
|
|
// occurs at a 5 second interval (defined in the batchPublishFrequency const).
|
|
// Size-based batching is performed on the maximum number of events per batch
|
|
// (defined in maximumLogEventsPerPut) and the maximum number of total bytes in a
|
|
// batch (defined in maximumBytesPerPut). Log messages are split by the maximum
|
|
// bytes per event (defined in maximumBytesPerEvent). There is a fixed per-event
|
|
// byte overhead (defined in perEventBytes) which is accounted for in split- and
|
|
// batch-calculations.
|
|
func (l *logStream) processEvent(batch *eventBatch, unprocessedLine []byte, timestamp int64) {
|
|
for len(unprocessedLine) > 0 {
|
|
// Split line length so it does not exceed the maximum
|
|
lineBytes := len(unprocessedLine)
|
|
if lineBytes > maximumBytesPerEvent {
|
|
lineBytes = maximumBytesPerEvent
|
|
}
|
|
line := unprocessedLine[:lineBytes]
|
|
|
|
event := wrappedEvent{
|
|
inputLogEvent: &cloudwatchlogs.InputLogEvent{
|
|
Message: aws.String(string(line)),
|
|
Timestamp: aws.Int64(timestamp),
|
|
},
|
|
insertOrder: batch.count(),
|
|
}
|
|
|
|
added := batch.add(event, lineBytes)
|
|
if added {
|
|
unprocessedLine = unprocessedLine[lineBytes:]
|
|
} else {
|
|
l.publishBatch(batch)
|
|
batch.reset()
|
|
}
|
|
}
|
|
}
|
|
|
|
// publishBatch calls PutLogEvents for a given set of InputLogEvents,
|
|
// accounting for sequencing requirements (each request must reference the
|
|
// sequence token returned by the previous request).
|
|
func (l *logStream) publishBatch(batch *eventBatch) {
|
|
if batch.isEmpty() {
|
|
return
|
|
}
|
|
cwEvents := unwrapEvents(batch.events())
|
|
|
|
nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)
|
|
|
|
if err != nil {
|
|
if awsErr, ok := err.(awserr.Error); ok {
|
|
if awsErr.Code() == dataAlreadyAcceptedCode {
|
|
// already submitted, just grab the correct sequence token
|
|
parts := strings.Split(awsErr.Message(), " ")
|
|
nextSequenceToken = &parts[len(parts)-1]
|
|
logrus.WithFields(logrus.Fields{
|
|
"errorCode": awsErr.Code(),
|
|
"message": awsErr.Message(),
|
|
"logGroupName": l.logGroupName,
|
|
"logStreamName": l.logStreamName,
|
|
}).Info("Data already accepted, ignoring error")
|
|
err = nil
|
|
} else if awsErr.Code() == invalidSequenceTokenCode {
|
|
// sequence code is bad, grab the correct one and retry
|
|
parts := strings.Split(awsErr.Message(), " ")
|
|
token := parts[len(parts)-1]
|
|
nextSequenceToken, err = l.putLogEvents(cwEvents, &token)
|
|
}
|
|
}
|
|
}
|
|
if err != nil {
|
|
logrus.Error(err)
|
|
} else {
|
|
l.sequenceToken = nextSequenceToken
|
|
}
|
|
}
|
|
|
|
// putLogEvents wraps the PutLogEvents API
|
|
func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenceToken *string) (*string, error) {
|
|
input := &cloudwatchlogs.PutLogEventsInput{
|
|
LogEvents: events,
|
|
SequenceToken: sequenceToken,
|
|
LogGroupName: aws.String(l.logGroupName),
|
|
LogStreamName: aws.String(l.logStreamName),
|
|
}
|
|
resp, err := l.client.PutLogEvents(input)
|
|
if err != nil {
|
|
if awsErr, ok := err.(awserr.Error); ok {
|
|
logrus.WithFields(logrus.Fields{
|
|
"errorCode": awsErr.Code(),
|
|
"message": awsErr.Message(),
|
|
"origError": awsErr.OrigErr(),
|
|
"logGroupName": l.logGroupName,
|
|
"logStreamName": l.logStreamName,
|
|
}).Error("Failed to put log events")
|
|
}
|
|
return nil, err
|
|
}
|
|
return resp.NextSequenceToken, nil
|
|
}
|
|
|
|
// ValidateLogOpt looks for awslogs-specific log options awslogs-region,
|
|
// awslogs-group, awslogs-stream, awslogs-create-group, awslogs-datetime-format,
|
|
// awslogs-multiline-pattern
|
|
func ValidateLogOpt(cfg map[string]string) error {
|
|
for key := range cfg {
|
|
switch key {
|
|
case logGroupKey:
|
|
case logStreamKey:
|
|
case logCreateGroupKey:
|
|
case regionKey:
|
|
case tagKey:
|
|
case datetimeFormatKey:
|
|
case multilinePatternKey:
|
|
case credentialsEndpointKey:
|
|
default:
|
|
return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
|
|
}
|
|
}
|
|
if cfg[logGroupKey] == "" {
|
|
return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey)
|
|
}
|
|
if cfg[logCreateGroupKey] != "" {
|
|
if _, err := strconv.ParseBool(cfg[logCreateGroupKey]); err != nil {
|
|
return fmt.Errorf("must specify valid value for log opt '%s': %v", logCreateGroupKey, err)
|
|
}
|
|
}
|
|
_, datetimeFormatKeyExists := cfg[datetimeFormatKey]
|
|
_, multilinePatternKeyExists := cfg[multilinePatternKey]
|
|
if datetimeFormatKeyExists && multilinePatternKeyExists {
|
|
return fmt.Errorf("you cannot configure log opt '%s' and '%s' at the same time", datetimeFormatKey, multilinePatternKey)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Len returns the length of a byTimestamp slice. Len is required by the
// sort.Interface interface.
func (slice byTimestamp) Len() int {
	return len(slice)
}
|
|
|
|
// Less compares two values in a byTimestamp slice by Timestamp. Less is
|
|
// required by the sort.Interface interface.
|
|
func (slice byTimestamp) Less(i, j int) bool {
|
|
iTimestamp, jTimestamp := int64(0), int64(0)
|
|
if slice != nil && slice[i].inputLogEvent.Timestamp != nil {
|
|
iTimestamp = *slice[i].inputLogEvent.Timestamp
|
|
}
|
|
if slice != nil && slice[j].inputLogEvent.Timestamp != nil {
|
|
jTimestamp = *slice[j].inputLogEvent.Timestamp
|
|
}
|
|
if iTimestamp == jTimestamp {
|
|
return slice[i].insertOrder < slice[j].insertOrder
|
|
}
|
|
return iTimestamp < jTimestamp
|
|
}
|
|
|
|
// Swap swaps two values in a byTimestamp slice with each other. Swap is
// required by the sort.Interface interface.
func (slice byTimestamp) Swap(i, j int) {
	slice[i], slice[j] = slice[j], slice[i]
}
|
|
|
|
func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent {
|
|
cwEvents := make([]*cloudwatchlogs.InputLogEvent, len(events))
|
|
for i, input := range events {
|
|
cwEvents[i] = input.inputLogEvent
|
|
}
|
|
return cwEvents
|
|
}
|
|
|
|
func newEventBatch() *eventBatch {
|
|
return &eventBatch{
|
|
batch: make([]wrappedEvent, 0),
|
|
bytes: 0,
|
|
}
|
|
}
|
|
|
|
// events returns a slice of wrappedEvents sorted in order of their
|
|
// timestamps and then by their insertion order (see `byTimestamp`).
|
|
//
|
|
// Warning: this method is not threadsafe and must not be used
|
|
// concurrently.
|
|
func (b *eventBatch) events() []wrappedEvent {
|
|
sort.Sort(byTimestamp(b.batch))
|
|
return b.batch
|
|
}
|
|
|
|
// add adds an event to the batch of events accounting for the
|
|
// necessary overhead for an event to be logged. An error will be
|
|
// returned if the event cannot be added to the batch due to service
|
|
// limits.
|
|
//
|
|
// Warning: this method is not threadsafe and must not be used
|
|
// concurrently.
|
|
func (b *eventBatch) add(event wrappedEvent, size int) bool {
|
|
addBytes := size + perEventBytes
|
|
|
|
// verify we are still within service limits
|
|
switch {
|
|
case len(b.batch)+1 > maximumLogEventsPerPut:
|
|
return false
|
|
case b.bytes+addBytes > maximumBytesPerPut:
|
|
return false
|
|
}
|
|
|
|
b.bytes += addBytes
|
|
b.batch = append(b.batch, event)
|
|
|
|
return true
|
|
}
|
|
|
|
// count is the number of batched events. Warning: this method
// is not threadsafe and must not be used concurrently.
func (b *eventBatch) count() int {
	return len(b.batch)
}
|
|
|
|
// size is the total number of bytes that the batch represents, i.e. the sum
// accumulated by add (event sizes plus per-event overhead).
//
// Warning: this method is not threadsafe and must not be used
// concurrently.
func (b *eventBatch) size() int {
	return b.bytes
}
|
|
|
|
func (b *eventBatch) isEmpty() bool {
|
|
zeroEvents := b.count() == 0
|
|
zeroSize := b.size() == 0
|
|
return zeroEvents && zeroSize
|
|
}
|
|
|
|
// reset prepares the batch for reuse.
|
|
func (b *eventBatch) reset() {
|
|
b.bytes = 0
|
|
b.batch = b.batch[:0]
|
|
}
|