package awslogs // import "github.com/docker/docker/daemon/logger/awslogs" import ( "errors" "fmt" "net/http" "net/http/httptest" "os" "reflect" "regexp" "runtime" "strconv" "strings" "testing" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger/loggerutils" "github.com/docker/docker/dockerversion" "gotest.tools/v3/assert" is "gotest.tools/v3/assert/cmp" ) const ( groupName = "groupName" streamName = "streamName" sequenceToken = "sequenceToken" nextSequenceToken = "nextSequenceToken" logline = "this is a log line\r" multilineLogline = "2017-01-01 01:01:44 This is a multiline log entry\r" ) // Generates i multi-line events each with j lines func (l *logStream) logGenerator(lineCount int, multilineCount int) { for i := 0; i < multilineCount; i++ { l.Log(&logger.Message{ Line: []byte(multilineLogline), Timestamp: time.Time{}, }) for j := 0; j < lineCount; j++ { l.Log(&logger.Message{ Line: []byte(logline), Timestamp: time.Time{}, }) } } } func testEventBatch(events []wrappedEvent) *eventBatch { batch := newEventBatch() for _, event := range events { eventlen := len([]byte(*event.inputLogEvent.Message)) batch.add(event, eventlen) } return batch } func TestNewStreamConfig(t *testing.T) { tests := []struct { logStreamName string logGroupName string logCreateGroup string logCreateStream string logNonBlocking string forceFlushInterval string maxBufferedEvents string datetimeFormat string multilinePattern string shouldErr bool testName string }{ {"", groupName, "", "", "", "", "", "", "", false, "defaults"}, {"", groupName, "invalid create group", "", "", "", "", "", "", true, "invalid create group"}, {"", groupName, "", "", "", "invalid flush interval", "", "", "", true, "invalid flush interval"}, {"", groupName, "", "", "", "", "invalid max buffered events", "", "", true, "invalid max buffered events"}, {"", groupName, "", "", "", "", "", "", "n{1001}", true, "invalid multiline pattern"}, {"", groupName, "", "", "", "15", "", "", "", false, "flush interval at 15"}, {"", groupName, "", "", "", "", "1024", "", "", false, "max buffered events at 1024"}, } for _, tc := range tests { t.Run(tc.testName, func(t *testing.T) { cfg := map[string]string{ logGroupKey: tc.logGroupName, logCreateGroupKey: tc.logCreateGroup, "mode": tc.logNonBlocking, forceFlushIntervalKey: tc.forceFlushInterval, maxBufferedEventsKey: tc.maxBufferedEvents, logStreamKey: tc.logStreamName, logCreateStreamKey: tc.logCreateStream, datetimeFormatKey: tc.datetimeFormat, multilinePatternKey: tc.multilinePattern, } info := logger.Info{ Config: cfg, } logStreamConfig, err := newStreamConfig(info) if tc.shouldErr { assert.Check(t, err != nil, "Expected an error") } else { assert.Check(t, err == nil, "Unexpected error") assert.Check(t, logStreamConfig.logGroupName == tc.logGroupName, "Unexpected logGroupName") if tc.forceFlushInterval != "" { forceFlushIntervalAsInt, _ := strconv.Atoi(info.Config[forceFlushIntervalKey]) assert.Check(t, logStreamConfig.forceFlushInterval == time.Duration(forceFlushIntervalAsInt)*time.Second, "Unexpected forceFlushInterval") } if tc.maxBufferedEvents != "" { maxBufferedEvents, _ := strconv.Atoi(info.Config[maxBufferedEventsKey]) assert.Check(t, logStreamConfig.maxBufferedEvents == maxBufferedEvents, "Unexpected maxBufferedEvents") } } }) } } func TestNewAWSLogsClientUserAgentHandler(t *testing.T) { info := logger.Info{ Config: map[string]string{ regionKey: "us-east-1", }, } client, err := newAWSLogsClient(info) assert.NilError(t, err) realClient, ok := client.(*cloudwatchlogs.CloudWatchLogs) assert.Check(t, ok, "Could not cast client to cloudwatchlogs.CloudWatchLogs") buildHandlerList := realClient.Handlers.Build request := &request.Request{ HTTPRequest: &http.Request{ Header: http.Header{}, }, } buildHandlerList.Run(request) expectedUserAgentString := fmt.Sprintf("Docker %s (%s) %s/%s (%s; %s; %s)", dockerversion.Version, runtime.GOOS, aws.SDKName, aws.SDKVersion, runtime.Version(), runtime.GOOS, runtime.GOARCH) userAgent := request.HTTPRequest.Header.Get("User-Agent") if userAgent != expectedUserAgentString { t.Errorf("Wrong User-Agent string, expected \"%s\" but was \"%s\"", expectedUserAgentString, userAgent) } } func TestNewAWSLogsClientLogFormatHeaderHandler(t *testing.T) { tests := []struct { logFormat string expectedHeaderValue string }{ { logFormat: jsonEmfLogFormat, expectedHeaderValue: "json/emf", }, { logFormat: "", expectedHeaderValue: "", }, } for _, tc := range tests { t.Run(tc.logFormat, func(t *testing.T) { info := logger.Info{ Config: map[string]string{ regionKey: "us-east-1", logFormatKey: tc.logFormat, }, } client, err := newAWSLogsClient(info) assert.NilError(t, err) realClient, ok := client.(*cloudwatchlogs.CloudWatchLogs) assert.Check(t, ok, "Could not cast client to cloudwatchlogs.CloudWatchLogs") buildHandlerList := realClient.Handlers.Build request := &request.Request{ HTTPRequest: &http.Request{ Header: http.Header{}, }, } buildHandlerList.Run(request) logFormatHeaderVal := request.HTTPRequest.Header.Get("x-amzn-logs-format") assert.Equal(t, tc.expectedHeaderValue, logFormatHeaderVal) }) } } func TestNewAWSLogsClientAWSLogsEndpoint(t *testing.T) { endpoint := "mock-endpoint" info := logger.Info{ Config: map[string]string{ regionKey: "us-east-1", endpointKey: endpoint, }, } client, err := newAWSLogsClient(info) assert.NilError(t, err) realClient, ok := client.(*cloudwatchlogs.CloudWatchLogs) assert.Check(t, ok, "Could not cast client to cloudwatchlogs.CloudWatchLogs") endpointWithScheme := realClient.Endpoint expectedEndpointWithScheme := "https://" + endpoint assert.Equal(t, endpointWithScheme, expectedEndpointWithScheme, "Wrong endpoint") } func TestNewAWSLogsClientRegionDetect(t *testing.T) { info := logger.Info{ Config: map[string]string{}, } mockMetadata := newMockMetadataClient() newRegionFinder = func() (regionFinder, error) { return mockMetadata, nil } mockMetadata.regionResult <- ®ionResult{ successResult: "us-east-1", } _, err := newAWSLogsClient(info) assert.NilError(t, err) } func TestCreateSuccess(t *testing.T) { mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, logCreateStream: true, } var input *cloudwatchlogs.CreateLogStreamInput mockClient.createLogStreamFunc = func(i *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { input = i return &cloudwatchlogs.CreateLogStreamOutput{}, nil } err := stream.create() assert.NilError(t, err) assert.Equal(t, groupName, aws.StringValue(input.LogGroupName), "LogGroupName") assert.Equal(t, streamName, aws.StringValue(input.LogStreamName), "LogStreamName") } func TestCreateStreamSkipped(t *testing.T) { mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, logCreateStream: false, } mockClient.createLogStreamFunc = func(i *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { t.Error("CreateLogStream should not be called") return nil, errors.New("should not be called") } err := stream.create() assert.NilError(t, err) } func TestCreateLogGroupSuccess(t *testing.T) { mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, logCreateGroup: true, logCreateStream: true, } var logGroupInput *cloudwatchlogs.CreateLogGroupInput mockClient.createLogGroupFunc = func(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) { logGroupInput = input return &cloudwatchlogs.CreateLogGroupOutput{}, nil } var logStreamInput *cloudwatchlogs.CreateLogStreamInput createLogStreamCalls := 0 mockClient.createLogStreamFunc = func(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { createLogStreamCalls++ if logGroupInput == nil { // log group not created yet return nil, awserr.New(resourceNotFoundCode, "should error once", nil) } logStreamInput = input return &cloudwatchlogs.CreateLogStreamOutput{}, nil } err := stream.create() assert.NilError(t, err) if createLogStreamCalls < 2 { t.Errorf("Expected CreateLogStream to be called twice, was called %d times", createLogStreamCalls) } assert.Check(t, logGroupInput != nil) assert.Equal(t, groupName, aws.StringValue(logGroupInput.LogGroupName), "LogGroupName in LogGroupInput") assert.Check(t, logStreamInput != nil) assert.Equal(t, groupName, aws.StringValue(logStreamInput.LogGroupName), "LogGroupName in LogStreamInput") assert.Equal(t, streamName, aws.StringValue(logStreamInput.LogStreamName), "LogStreamName in LogStreamInput") } func TestCreateError(t *testing.T) { mockClient := &mockClient{} stream := &logStream{ client: mockClient, logCreateStream: true, } mockClient.createLogStreamFunc = func(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { return nil, errors.New("error") } err := stream.create() if err == nil { t.Fatal("Expected non-nil err") } } func TestCreateAlreadyExists(t *testing.T) { mockClient := &mockClient{} stream := &logStream{ client: mockClient, logCreateStream: true, } calls := 0 mockClient.createLogStreamFunc = func(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { calls++ return nil, awserr.New(resourceAlreadyExistsCode, "", nil) } err := stream.create() assert.NilError(t, err) assert.Equal(t, 1, calls) } func TestLogClosed(t *testing.T) { mockClient := &mockClient{} stream := &logStream{ client: mockClient, closed: true, } err := stream.Log(&logger.Message{}) assert.Check(t, err != nil) } // TestLogBlocking tests that the Log method blocks appropriately when // non-blocking behavior is not enabled. Blocking is achieved through an // internal channel that must be drained for Log to return. func TestLogBlocking(t *testing.T) { mockClient := &mockClient{} stream := &logStream{ client: mockClient, messages: make(chan *logger.Message), } errorCh := make(chan error, 1) started := make(chan bool) go func() { started <- true err := stream.Log(&logger.Message{}) errorCh <- err }() // block until the goroutine above has started <-started select { case err := <-errorCh: t.Fatal("Expected stream.Log to block: ", err) default: } // assuming it is blocked, we can now try to drain the internal channel and // unblock it select { case <-time.After(10 * time.Millisecond): // if we're unable to drain the channel within 10ms, something seems broken t.Fatal("Expected to be able to read from stream.messages but was unable to") case <-stream.messages: } select { case err := <-errorCh: assert.NilError(t, err) case <-time.After(30 * time.Second): t.Fatal("timed out waiting for read") } } func TestLogNonBlockingBufferEmpty(t *testing.T) { mockClient := &mockClient{} stream := &logStream{ client: mockClient, messages: make(chan *logger.Message, 1), logNonBlocking: true, } err := stream.Log(&logger.Message{}) assert.NilError(t, err) } func TestLogNonBlockingBufferFull(t *testing.T) { mockClient := &mockClient{} stream := &logStream{ client: mockClient, messages: make(chan *logger.Message, 1), logNonBlocking: true, } stream.messages <- &logger.Message{} errorCh := make(chan error, 1) started := make(chan bool) go func() { started <- true err := stream.Log(&logger.Message{}) errorCh <- err }() <-started select { case err := <-errorCh: assert.Check(t, err != nil) case <-time.After(30 * time.Second): t.Fatal("Expected Log call to not block") } } func TestPublishBatchSuccess(t *testing.T) { mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, sequenceToken: aws.String(sequenceToken), } var input *cloudwatchlogs.PutLogEventsInput mockClient.putLogEventsFunc = func(i *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { input = i return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), }, nil } events := []wrappedEvent{ { inputLogEvent: &cloudwatchlogs.InputLogEvent{ Message: aws.String(logline), }, }, } stream.publishBatch(testEventBatch(events)) assert.Equal(t, nextSequenceToken, aws.StringValue(stream.sequenceToken), "sequenceToken") assert.Assert(t, input != nil) assert.Equal(t, sequenceToken, aws.StringValue(input.SequenceToken), "input.SequenceToken") assert.Assert(t, len(input.LogEvents) == 1) assert.Equal(t, events[0].inputLogEvent, input.LogEvents[0]) } func TestPublishBatchError(t *testing.T) { mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, sequenceToken: aws.String(sequenceToken), } mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { return nil, errors.New("error") } events := []wrappedEvent{ { inputLogEvent: &cloudwatchlogs.InputLogEvent{ Message: aws.String(logline), }, }, } stream.publishBatch(testEventBatch(events)) assert.Equal(t, sequenceToken, aws.StringValue(stream.sequenceToken)) } func TestPublishBatchInvalidSeqSuccess(t *testing.T) { mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, sequenceToken: aws.String(sequenceToken), } calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { calls = append(calls, input) if aws.StringValue(input.SequenceToken) != "token" { return nil, awserr.New(invalidSequenceTokenCode, "use token token", nil) } return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), }, nil } events := []wrappedEvent{ { inputLogEvent: &cloudwatchlogs.InputLogEvent{ Message: aws.String(logline), }, }, } stream.publishBatch(testEventBatch(events)) assert.Equal(t, nextSequenceToken, aws.StringValue(stream.sequenceToken)) assert.Assert(t, len(calls) == 2) argument := calls[0] assert.Assert(t, argument != nil) assert.Equal(t, sequenceToken, aws.StringValue(argument.SequenceToken)) assert.Assert(t, len(argument.LogEvents) == 1) assert.Equal(t, events[0].inputLogEvent, argument.LogEvents[0]) argument = calls[1] assert.Assert(t, argument != nil) assert.Equal(t, "token", aws.StringValue(argument.SequenceToken)) assert.Assert(t, len(argument.LogEvents) == 1) assert.Equal(t, events[0].inputLogEvent, argument.LogEvents[0]) } func TestPublishBatchAlreadyAccepted(t *testing.T) { mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, sequenceToken: aws.String(sequenceToken), } calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { calls = append(calls, input) return nil, awserr.New(dataAlreadyAcceptedCode, "use token token", nil) } events := []wrappedEvent{ { inputLogEvent: &cloudwatchlogs.InputLogEvent{ Message: aws.String(logline), }, }, } stream.publishBatch(testEventBatch(events)) assert.Assert(t, stream.sequenceToken != nil) assert.Equal(t, "token", aws.StringValue(stream.sequenceToken)) assert.Assert(t, len(calls) == 1) argument := calls[0] assert.Assert(t, argument != nil) assert.Equal(t, sequenceToken, aws.StringValue(argument.SequenceToken)) assert.Assert(t, len(argument.LogEvents) == 1) assert.Equal(t, events[0].inputLogEvent, argument.LogEvents[0]) } func TestCollectBatchSimple(t *testing.T) { mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { calls = append(calls, input) return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), }, nil } ticks := make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { return &time.Ticker{ C: ticks, } } d := make(chan bool) close(d) go stream.collectBatch(d) stream.Log(&logger.Message{ Line: []byte(logline), Timestamp: time.Time{}, }) ticks <- time.Time{} ticks <- time.Time{} stream.Close() assert.Assert(t, len(calls) == 1) argument := calls[0] assert.Assert(t, argument != nil) assert.Assert(t, len(argument.LogEvents) == 1) assert.Equal(t, logline, aws.StringValue(argument.LogEvents[0].Message)) } func TestCollectBatchTicker(t *testing.T) { mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) called := make(chan struct{}, 50) mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { calls = append(calls, input) called <- struct{}{} return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), }, nil } ticks := make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { return &time.Ticker{ C: ticks, } } d := make(chan bool) close(d) go stream.collectBatch(d) stream.Log(&logger.Message{ Line: []byte(logline + " 1"), Timestamp: time.Time{}, }) stream.Log(&logger.Message{ Line: []byte(logline + " 2"), Timestamp: time.Time{}, }) ticks <- time.Time{} // Verify first batch <-called assert.Assert(t, len(calls) == 1) argument := calls[0] calls = calls[1:] assert.Assert(t, argument != nil) assert.Assert(t, len(argument.LogEvents) == 2) assert.Equal(t, logline+" 1", aws.StringValue(argument.LogEvents[0].Message)) assert.Equal(t, logline+" 2", aws.StringValue(argument.LogEvents[1].Message)) stream.Log(&logger.Message{ Line: []byte(logline + " 3"), Timestamp: time.Time{}, }) ticks <- time.Time{} <-called assert.Assert(t, len(calls) == 1) argument = calls[0] close(called) assert.Assert(t, argument != nil) assert.Assert(t, len(argument.LogEvents) == 1) assert.Equal(t, logline+" 3", aws.StringValue(argument.LogEvents[0].Message)) stream.Close() } func TestCollectBatchMultilinePattern(t *testing.T) { mockClient := &mockClient{} multilinePattern := regexp.MustCompile("xxxx") stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, multilinePattern: multilinePattern, sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) called := make(chan struct{}, 50) mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { calls = append(calls, input) called <- struct{}{} return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), }, nil } ticks := make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { return &time.Ticker{ C: ticks, } } d := make(chan bool) close(d) go stream.collectBatch(d) stream.Log(&logger.Message{ Line: []byte(logline), Timestamp: time.Now(), }) stream.Log(&logger.Message{ Line: []byte(logline), Timestamp: time.Now(), }) stream.Log(&logger.Message{ Line: []byte("xxxx " + logline), Timestamp: time.Now(), }) ticks <- time.Now() // Verify single multiline event <-called assert.Assert(t, len(calls) == 1) argument := calls[0] calls = calls[1:] assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput") assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event") assert.Check(t, is.Equal(logline+"\n"+logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message") stream.Close() // Verify single event <-called assert.Assert(t, len(calls) == 1) argument = calls[0] close(called) assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput") assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event") assert.Check(t, is.Equal("xxxx "+logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message") } func BenchmarkCollectBatch(b *testing.B) { for i := 0; i < b.N; i++ { mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), }, nil } ticks := make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { return &time.Ticker{ C: ticks, } } d := make(chan bool) close(d) go stream.collectBatch(d) stream.logGenerator(10, 100) ticks <- time.Time{} stream.Close() } } func BenchmarkCollectBatchMultilinePattern(b *testing.B) { for i := 0; i < b.N; i++ { mockClient := &mockClient{} multilinePattern := regexp.MustCompile(`\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[1,2][0-9]|3[0,1]) (?:[0,1][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]`) stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, multilinePattern: multilinePattern, sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), }, nil } ticks := make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { return &time.Ticker{ C: ticks, } } d := make(chan bool) close(d) go stream.collectBatch(d) stream.logGenerator(10, 100) ticks <- time.Time{} stream.Close() } } func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) { mockClient := &mockClient{} multilinePattern := regexp.MustCompile("xxxx") stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, multilinePattern: multilinePattern, sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) called := make(chan struct{}, 50) mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { calls = append(calls, input) called <- struct{}{} return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), }, nil } ticks := make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { return &time.Ticker{ C: ticks, } } d := make(chan bool) close(d) go stream.collectBatch(d) stream.Log(&logger.Message{ Line: []byte(logline), Timestamp: time.Now(), }) // Log an event 1 second later stream.Log(&logger.Message{ Line: []byte(logline), Timestamp: time.Now().Add(time.Second), }) // Fire ticker defaultForceFlushInterval seconds later ticks <- time.Now().Add(defaultForceFlushInterval + time.Second) // Verify single multiline event is flushed after maximum event buffer age (defaultForceFlushInterval) <-called assert.Assert(t, len(calls) == 1) argument := calls[0] calls = calls[1:] assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput") assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event") assert.Check(t, is.Equal(logline+"\n"+logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message") // Log an event 1 second later stream.Log(&logger.Message{ Line: []byte(logline), Timestamp: time.Now().Add(time.Second), }) // Fire ticker another defaultForceFlushInterval seconds later ticks <- time.Now().Add(2*defaultForceFlushInterval + time.Second) // Verify the event buffer is truly flushed - we should only receive a single event <-called assert.Assert(t, len(calls) == 1) argument = calls[0] close(called) assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput") assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event") assert.Check(t, is.Equal(logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message") stream.Close() } func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) { mockClient := &mockClient{} multilinePattern := regexp.MustCompile("xxxx") stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, multilinePattern: multilinePattern, sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) called := make(chan struct{}, 50) mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { calls = append(calls, input) called <- struct{}{} return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), }, nil } ticks := make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { return &time.Ticker{ C: ticks, } } d := make(chan bool) close(d) go stream.collectBatch(d) stream.Log(&logger.Message{ Line: []byte(logline), Timestamp: time.Now(), }) // Log an event 1 second later stream.Log(&logger.Message{ Line: []byte(logline), Timestamp: time.Now().Add(time.Second), }) // Fire ticker in past to simulate negative event buffer age ticks <- time.Now().Add(-time.Second) // Verify single multiline event is flushed with a negative event buffer age <-called assert.Assert(t, len(calls) == 1) argument := calls[0] close(called) assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput") assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event") assert.Check(t, is.Equal(logline+"\n"+logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message") stream.Close() } func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) { mockClient := &mockClient{} multilinePattern := regexp.MustCompile("xxxx") stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, multilinePattern: multilinePattern, sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) called := make(chan struct{}, 50) mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { calls = append(calls, input) called <- struct{}{} return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), }, nil } ticks := make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { return &time.Ticker{ C: ticks, } } d := make(chan bool) close(d) go stream.collectBatch(d) // Log max event size longline := strings.Repeat("A", maximumBytesPerEvent) stream.Log(&logger.Message{ Line: []byte(longline), Timestamp: time.Now(), }) // Log short event shortline := strings.Repeat("B", 100) stream.Log(&logger.Message{ Line: []byte(shortline), Timestamp: time.Now(), }) // Fire ticker ticks <- time.Now().Add(defaultForceFlushInterval) // Verify multiline events // We expect a maximum sized event with no new line characters and a // second short event with a new line character at the end <-called assert.Assert(t, len(calls) == 1) argument := calls[0] close(called) assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput") assert.Check(t, is.Equal(2, len(argument.LogEvents)), "Expected two events") assert.Check(t, is.Equal(longline, *argument.LogEvents[0].Message), "Received incorrect multiline message") assert.Check(t, is.Equal(shortline+"\n", *argument.LogEvents[1].Message), "Received incorrect multiline message") stream.Close() } func TestCollectBatchClose(t *testing.T) { mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) called := make(chan struct{}, 50) mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { calls = append(calls, input) called <- struct{}{} return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), }, nil } var ticks = make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { return &time.Ticker{ C: ticks, } } d := make(chan bool) close(d) go stream.collectBatch(d) stream.Log(&logger.Message{ Line: []byte(logline), Timestamp: time.Time{}, }) // no ticks stream.Close() <-called assert.Assert(t, len(calls) == 1) argument := calls[0] close(called) assert.Assert(t, argument != nil) assert.Assert(t, len(argument.LogEvents) == 1) assert.Equal(t, logline, aws.StringValue((argument.LogEvents[0].Message))) } func TestEffectiveLen(t *testing.T) { tests := []struct { str string effectiveBytes int }{ {"Hello", 5}, {string([]byte{1, 2, 3, 4}), 4}, {"🙃", 4}, {string([]byte{0xFF, 0xFF, 0xFF, 0xFF}), 12}, {"He\xff\xffo", 9}, {"", 0}, } for i, tc := range tests { t.Run(fmt.Sprintf("%d/%s", i, tc.str), func(t *testing.T) { assert.Equal(t, tc.effectiveBytes, effectiveLen(tc.str)) }) } } func TestFindValidSplit(t *testing.T) { tests := []struct { str string maxEffectiveBytes int splitOffset int effectiveBytes int }{ {"", 10, 0, 0}, {"Hello", 6, 5, 5}, {"Hello", 2, 2, 2}, {"Hello", 0, 0, 0}, {"🙃", 3, 0, 0}, {"🙃", 4, 4, 4}, {string([]byte{'a', 0xFF}), 2, 1, 1}, {string([]byte{'a', 0xFF}), 4, 2, 4}, } for i, tc := range tests { t.Run(fmt.Sprintf("%d/%s", i, tc.str), func(t *testing.T) { splitOffset, effectiveBytes := findValidSplit(tc.str, tc.maxEffectiveBytes) assert.Equal(t, tc.splitOffset, splitOffset, "splitOffset") assert.Equal(t, tc.effectiveBytes, effectiveBytes, "effectiveBytes") t.Log(tc.str[:tc.splitOffset]) t.Log(tc.str[tc.splitOffset:]) }) } } func TestProcessEventEmoji(t *testing.T) { stream := &logStream{} batch := &eventBatch{} bytes := []byte(strings.Repeat("🙃", maximumBytesPerEvent/4+1)) stream.processEvent(batch, bytes, 0) assert.Equal(t, 2, len(batch.batch), "should be two events in the batch") assert.Equal(t, strings.Repeat("🙃", maximumBytesPerEvent/4), aws.StringValue(batch.batch[0].inputLogEvent.Message)) assert.Equal(t, "🙃", aws.StringValue(batch.batch[1].inputLogEvent.Message)) } func TestCollectBatchLineSplit(t *testing.T) { mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) called := make(chan struct{}, 50) mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { calls = append(calls, input) called <- struct{}{} return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), }, nil } var ticks = make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { return &time.Ticker{ C: ticks, } } d := make(chan bool) close(d) go stream.collectBatch(d) longline := strings.Repeat("A", maximumBytesPerEvent) stream.Log(&logger.Message{ Line: []byte(longline + "B"), Timestamp: time.Time{}, }) // no ticks stream.Close() <-called assert.Assert(t, len(calls) == 1) argument := calls[0] close(called) assert.Assert(t, argument != nil) assert.Assert(t, len(argument.LogEvents) == 2) assert.Equal(t, longline, aws.StringValue(argument.LogEvents[0].Message)) assert.Equal(t, "B", aws.StringValue(argument.LogEvents[1].Message)) } func TestCollectBatchLineSplitWithBinary(t *testing.T) { mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) called := make(chan struct{}, 50) mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { calls = append(calls, input) called <- struct{}{} return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), }, nil } var ticks = make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { return &time.Ticker{ C: ticks, } } d := make(chan bool) close(d) go stream.collectBatch(d) longline := strings.Repeat("\xFF", maximumBytesPerEvent/3) // 0xFF is counted as the 3-byte utf8.RuneError stream.Log(&logger.Message{ Line: []byte(longline + "\xFD"), Timestamp: time.Time{}, }) // no ticks stream.Close() <-called assert.Assert(t, len(calls) == 1) argument := calls[0] close(called) assert.Assert(t, argument != nil) assert.Assert(t, len(argument.LogEvents) == 2) assert.Equal(t, longline, aws.StringValue(argument.LogEvents[0].Message)) assert.Equal(t, "\xFD", aws.StringValue(argument.LogEvents[1].Message)) } func TestCollectBatchMaxEvents(t *testing.T) { mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) called := make(chan struct{}, 50) mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { calls = append(calls, input) called <- struct{}{} return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), }, nil } var ticks = make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { return &time.Ticker{ C: ticks, } } d := make(chan bool) close(d) go stream.collectBatch(d) line := "A" for i := 0; i <= maximumLogEventsPerPut; i++ { stream.Log(&logger.Message{ Line: []byte(line), Timestamp: time.Time{}, }) } // no ticks stream.Close() <-called <-called assert.Assert(t, len(calls) == 2) argument := calls[0] assert.Assert(t, argument != nil) assert.Check(t, len(argument.LogEvents) == maximumLogEventsPerPut) argument = calls[1] close(called) assert.Assert(t, argument != nil) assert.Assert(t, len(argument.LogEvents) == 1) } func TestCollectBatchMaxTotalBytes(t *testing.T) { expectedPuts := 2 mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) called := make(chan struct{}, 50) mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { calls = append(calls, input) called <- struct{}{} return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), }, nil } var ticks = make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { return &time.Ticker{ C: ticks, } } d := make(chan bool) close(d) go stream.collectBatch(d) numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes) // maxline is the maximum line that could be submitted after // accounting for its overhead. maxline := strings.Repeat("A", maximumBytesPerPut-(perEventBytes*numPayloads)) // This will be split and batched up to the `maximumBytesPerPut' // (+/- `maximumBytesPerEvent'). This /should/ be aligned, but // should also tolerate an offset within that range. stream.Log(&logger.Message{ Line: []byte(maxline[:len(maxline)/2]), Timestamp: time.Time{}, }) stream.Log(&logger.Message{ Line: []byte(maxline[len(maxline)/2:]), Timestamp: time.Time{}, }) stream.Log(&logger.Message{ Line: []byte("B"), Timestamp: time.Time{}, }) // no ticks, guarantee batch by size (and chan close) stream.Close() for i := 0; i < expectedPuts; i++ { <-called } assert.Assert(t, len(calls) == expectedPuts) argument := calls[0] assert.Assert(t, argument != nil) // Should total to the maximum allowed bytes. eventBytes := 0 for _, event := range argument.LogEvents { eventBytes += len(*event.Message) } eventsOverhead := len(argument.LogEvents) * perEventBytes payloadTotal := eventBytes + eventsOverhead // lowestMaxBatch allows the payload to be offset if the messages // don't lend themselves to align with the maximum event size. lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent assert.Check(t, payloadTotal <= maximumBytesPerPut) assert.Check(t, payloadTotal >= lowestMaxBatch) argument = calls[1] assert.Assert(t, len(argument.LogEvents) == 1) message := *argument.LogEvents[len(argument.LogEvents)-1].Message assert.Equal(t, "B", message[len(message)-1:]) } func TestCollectBatchMaxTotalBytesWithBinary(t *testing.T) { expectedPuts := 2 mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) called := make(chan struct{}, 50) mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { calls = append(calls, input) called <- struct{}{} return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), }, nil } var ticks = make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { return &time.Ticker{ C: ticks, } } d := make(chan bool) close(d) go stream.collectBatch(d) // maxline is the maximum line that could be submitted after // accounting for its overhead. maxline := strings.Repeat("\xFF", (maximumBytesPerPut-perEventBytes)/3) // 0xFF is counted as the 3-byte utf8.RuneError // This will be split and batched up to the `maximumBytesPerPut' // (+/- `maximumBytesPerEvent'). This /should/ be aligned, but // should also tolerate an offset within that range. stream.Log(&logger.Message{ Line: []byte(maxline), Timestamp: time.Time{}, }) stream.Log(&logger.Message{ Line: []byte("B"), Timestamp: time.Time{}, }) // no ticks, guarantee batch by size (and chan close) stream.Close() for i := 0; i < expectedPuts; i++ { <-called } assert.Assert(t, len(calls) == expectedPuts) argument := calls[0] assert.Assert(t, argument != nil) // Should total to the maximum allowed bytes. eventBytes := 0 for _, event := range argument.LogEvents { eventBytes += effectiveLen(*event.Message) } eventsOverhead := len(argument.LogEvents) * perEventBytes payloadTotal := eventBytes + eventsOverhead // lowestMaxBatch allows the payload to be offset if the messages // don't lend themselves to align with the maximum event size. lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent assert.Check(t, payloadTotal <= maximumBytesPerPut) assert.Check(t, payloadTotal >= lowestMaxBatch) argument = calls[1] message := *argument.LogEvents[len(argument.LogEvents)-1].Message assert.Equal(t, "B", message[len(message)-1:]) } func TestCollectBatchWithDuplicateTimestamps(t *testing.T) { mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) called := make(chan struct{}, 50) mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { calls = append(calls, input) called <- struct{}{} return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), }, nil } ticks := make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { return &time.Ticker{ C: ticks, } } d := make(chan bool) close(d) go stream.collectBatch(d) var expectedEvents []*cloudwatchlogs.InputLogEvent times := maximumLogEventsPerPut timestamp := time.Now() for i := 0; i < times; i++ { line := fmt.Sprintf("%d", i) if i%2 == 0 { timestamp.Add(1 * time.Nanosecond) } stream.Log(&logger.Message{ Line: []byte(line), Timestamp: timestamp, }) expectedEvents = append(expectedEvents, &cloudwatchlogs.InputLogEvent{ Message: aws.String(line), Timestamp: aws.Int64(timestamp.UnixNano() / int64(time.Millisecond)), }) } ticks <- time.Time{} stream.Close() <-called assert.Assert(t, len(calls) == 1) argument := calls[0] close(called) assert.Assert(t, argument != nil) assert.Assert(t, len(argument.LogEvents) == times) for i := 0; i < times; i++ { if !reflect.DeepEqual(*argument.LogEvents[i], *expectedEvents[i]) { t.Errorf("Expected event to be %v but was %v", *expectedEvents[i], *argument.LogEvents[i]) } } } func TestParseLogOptionsMultilinePattern(t *testing.T) { info := logger.Info{ Config: map[string]string{ multilinePatternKey: "^xxxx", }, } multilinePattern, err := parseMultilineOptions(info) assert.Check(t, err, "Received unexpected error") assert.Check(t, multilinePattern.MatchString("xxxx"), "No multiline pattern match found") } func TestParseLogOptionsDatetimeFormat(t *testing.T) { datetimeFormatTests := []struct { format string match string }{ {"%d/%m/%y %a %H:%M:%S%L %Z", "31/12/10 Mon 08:42:44.345 NZDT"}, {"%Y-%m-%d %A %I:%M:%S.%f%p%z", "2007-12-04 Monday 08:42:44.123456AM+1200"}, {"%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b", "Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec"}, {"%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B", "January|February|March|April|May|June|July|August|September|October|November|December"}, {"%A|%A|%A|%A|%A|%A|%A", "Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday"}, {"%a|%a|%a|%a|%a|%a|%a", "Mon|Tue|Wed|Thu|Fri|Sat|Sun"}, {"Day of the week: %w, Day of the year: %j", "Day of the week: 4, Day of the year: 091"}, } for _, dt := range datetimeFormatTests { t.Run(dt.match, func(t *testing.T) { info := logger.Info{ Config: map[string]string{ datetimeFormatKey: dt.format, }, } multilinePattern, err := parseMultilineOptions(info) assert.Check(t, err, "Received unexpected error") assert.Check(t, multilinePattern.MatchString(dt.match), "No multiline pattern match found") }) } } func TestValidateLogOptionsDatetimeFormatAndMultilinePattern(t *testing.T) { cfg := map[string]string{ multilinePatternKey: "^xxxx", datetimeFormatKey: "%Y-%m-%d", logGroupKey: groupName, } conflictingLogOptionsError := "you cannot configure log opt 'awslogs-datetime-format' and 'awslogs-multiline-pattern' at the same time" err := ValidateLogOpt(cfg) assert.Check(t, err != nil, "Expected an error") assert.Check(t, is.Equal(err.Error(), conflictingLogOptionsError), "Received invalid error") } func TestValidateLogOptionsForceFlushIntervalSeconds(t *testing.T) { tests := []struct { input string shouldErr bool }{ {"0", true}, {"-1", true}, {"a", true}, {"10", false}, } for _, tc := range tests { t.Run(tc.input, func(t *testing.T) { cfg := map[string]string{ forceFlushIntervalKey: tc.input, logGroupKey: groupName, } err := ValidateLogOpt(cfg) if tc.shouldErr { expectedErr := "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': " + tc.input assert.Error(t, err, expectedErr) } else { assert.NilError(t, err) } }) } } func TestValidateLogOptionsMaxBufferedEvents(t *testing.T) { tests := []struct { input string shouldErr bool }{ {"0", true}, {"-1", true}, {"a", true}, {"10", false}, } for _, tc := range tests { t.Run(tc.input, func(t *testing.T) { cfg := map[string]string{ maxBufferedEventsKey: tc.input, logGroupKey: groupName, } err := ValidateLogOpt(cfg) if tc.shouldErr { expectedErr := "must specify a positive integer for log opt 'awslogs-max-buffered-events': " + tc.input assert.Error(t, err, expectedErr) } else { assert.NilError(t, err) } }) } } func TestValidateLogOptionsFormat(t *testing.T) { tests := []struct { format string multiLinePattern string datetimeFormat string expErrMsg string }{ {"json/emf", "", "", ""}, {"random", "", "", "unsupported log format 'random'"}, {"", "", "", ""}, {"json/emf", "---", "", "you cannot configure log opt 'awslogs-datetime-format' or 'awslogs-multiline-pattern' when log opt 'awslogs-format' is set to 'json/emf'"}, {"json/emf", "", "yyyy-dd-mm", "you cannot configure log opt 'awslogs-datetime-format' or 'awslogs-multiline-pattern' when log opt 'awslogs-format' is set to 'json/emf'"}, } for i, tc := range tests { t.Run(fmt.Sprintf("%d/%s", i, tc.format), func(t *testing.T) { cfg := map[string]string{ logGroupKey: groupName, logFormatKey: tc.format, } if tc.multiLinePattern != "" { cfg[multilinePatternKey] = tc.multiLinePattern } if tc.datetimeFormat != "" { cfg[datetimeFormatKey] = tc.datetimeFormat } err := ValidateLogOpt(cfg) if tc.expErrMsg != "" { assert.Error(t, err, tc.expErrMsg) } else { assert.NilError(t, err) } }) } } func TestCreateTagSuccess(t *testing.T) { mockClient := &mockClient{} info := logger.Info{ ContainerName: "/test-container", ContainerID: "container-abcdefghijklmnopqrstuvwxyz01234567890", Config: map[string]string{"tag": "{{.Name}}/{{.FullID}}"}, } logStreamName, e := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate) if e != nil { t.Errorf("Error generating tag: %q", e) } stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: logStreamName, logCreateStream: true, } calls := make([]*cloudwatchlogs.CreateLogStreamInput, 0) mockClient.createLogStreamFunc = func(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { calls = append(calls, input) return &cloudwatchlogs.CreateLogStreamOutput{}, nil } err := stream.create() assert.NilError(t, err) assert.Equal(t, 1, len(calls)) argument := calls[0] assert.Equal(t, "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890", aws.StringValue(argument.LogStreamName)) } func BenchmarkUnwrapEvents(b *testing.B) { events := make([]wrappedEvent, maximumLogEventsPerPut) for i := 0; i < maximumLogEventsPerPut; i++ { mes := strings.Repeat("0", maximumBytesPerEvent) events[i].inputLogEvent = &cloudwatchlogs.InputLogEvent{ Message: &mes, } } b.ResetTimer() for i := 0; i < b.N; i++ { res := unwrapEvents(events) assert.Check(b, is.Len(res, maximumLogEventsPerPut)) } } func TestNewAWSLogsClientCredentialEndpointDetect(t *testing.T) { // required for the cloudwatchlogs client t.Setenv("AWS_REGION", "us-west-2") credsResp := `{ "AccessKeyId" : "test-access-key-id", "SecretAccessKey": "test-secret-access-key" }` expectedAccessKeyID := "test-access-key-id" expectedSecretAccessKey := "test-secret-access-key" testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") fmt.Fprintln(w, credsResp) })) defer testServer.Close() // set the SDKEndpoint in the driver newSDKEndpoint = testServer.URL info := logger.Info{ Config: map[string]string{}, } info.Config["awslogs-credentials-endpoint"] = "/creds" c, err := newAWSLogsClient(info) assert.Check(t, err) client := c.(*cloudwatchlogs.CloudWatchLogs) creds, err := client.Config.Credentials.Get() assert.Check(t, err) assert.Check(t, is.Equal(expectedAccessKeyID, creds.AccessKeyID)) assert.Check(t, is.Equal(expectedSecretAccessKey, creds.SecretAccessKey)) } func TestNewAWSLogsClientCredentialEnvironmentVariable(t *testing.T) { // required for the cloudwatchlogs client t.Setenv("AWS_REGION", "us-west-2") expectedAccessKeyID := "test-access-key-id" expectedSecretAccessKey := "test-secret-access-key" t.Setenv("AWS_ACCESS_KEY_ID", expectedAccessKeyID) t.Setenv("AWS_SECRET_ACCESS_KEY", expectedSecretAccessKey) info := logger.Info{ Config: map[string]string{}, } c, err := newAWSLogsClient(info) assert.Check(t, err) client := c.(*cloudwatchlogs.CloudWatchLogs) creds, err := client.Config.Credentials.Get() assert.Check(t, err) assert.Check(t, is.Equal(expectedAccessKeyID, creds.AccessKeyID)) assert.Check(t, is.Equal(expectedSecretAccessKey, creds.SecretAccessKey)) } func TestNewAWSLogsClientCredentialSharedFile(t *testing.T) { // required for the cloudwatchlogs client t.Setenv("AWS_REGION", "us-west-2") expectedAccessKeyID := "test-access-key-id" expectedSecretAccessKey := "test-secret-access-key" contentStr := ` [default] aws_access_key_id = "test-access-key-id" aws_secret_access_key = "test-secret-access-key" ` content := []byte(contentStr) tmpfile, err := os.CreateTemp("", "example") defer os.Remove(tmpfile.Name()) // clean up assert.Check(t, err) _, err = tmpfile.Write(content) assert.Check(t, err) err = tmpfile.Close() assert.Check(t, err) os.Unsetenv("AWS_ACCESS_KEY_ID") os.Unsetenv("AWS_SECRET_ACCESS_KEY") t.Setenv("AWS_SHARED_CREDENTIALS_FILE", tmpfile.Name()) info := logger.Info{ Config: map[string]string{}, } c, err := newAWSLogsClient(info) assert.Check(t, err) client := c.(*cloudwatchlogs.CloudWatchLogs) creds, err := client.Config.Credentials.Get() assert.Check(t, err) assert.Check(t, is.Equal(expectedAccessKeyID, creds.AccessKeyID)) assert.Check(t, is.Equal(expectedSecretAccessKey, creds.SecretAccessKey)) }