2018-02-05 16:05:59 -05:00
package awslogs // import "github.com/docker/docker/daemon/logger/awslogs"
2015-08-05 00:35:06 +00:00
import (
"errors"
2015-09-28 04:53:29 +00:00
"fmt"
"net/http"
2017-09-27 17:11:49 -07:00
"net/http/httptest"
"os"
2016-07-19 11:23:45 -07:00
"reflect"
2017-02-13 21:24:59 +13:00
"regexp"
2015-09-28 04:53:29 +00:00
"runtime"
2018-12-21 20:27:48 +00:00
"strconv"
2015-08-05 00:35:06 +00:00
"strings"
"testing"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
2015-09-28 04:53:29 +00:00
"github.com/aws/aws-sdk-go/aws/request"
2015-08-05 00:35:06 +00:00
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/docker/docker/daemon/logger"
2016-10-21 15:45:05 -07:00
"github.com/docker/docker/daemon/logger/loggerutils"
2015-11-09 19:32:46 +01:00
"github.com/docker/docker/dockerversion"
2020-02-07 14:39:24 +01:00
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
2015-08-05 00:35:06 +00:00
)
// Shared fixture values used throughout the awslogs tests.
const (
	groupName         = "groupName"
	streamName        = "streamName"
	sequenceToken     = "sequenceToken"
	nextSequenceToken = "nextSequenceToken"
	// logline is an ordinary single-line message; note the trailing carriage
	// return.
	logline = "this is a log line\r"
	// multilineLogline begins with a date and time so that a datetime-based
	// multiline pattern can treat it as the first line of a new event.
	multilineLogline = "2017-01-01 01:01:44 This is a multiline log entry\r"
)
2017-02-23 00:33:51 +13:00
// logGenerator feeds the stream multilineCount groups of messages. Each group
// is one multilineLogline message followed by lineCount logline messages, all
// carrying the zero timestamp. Errors returned by Log are ignored.
func (l *logStream) logGenerator(lineCount int, multilineCount int) {
	emit := func(text string) {
		l.Log(&logger.Message{
			Line:      []byte(text),
			Timestamp: time.Time{},
		})
	}
	for group := 0; group < multilineCount; group++ {
		emit(multilineLogline)
		for n := 0; n < lineCount; n++ {
			emit(logline)
		}
	}
}
2017-11-30 16:17:17 -08:00
// testEventBatch builds an eventBatch from events, adding each event together
// with the byte length of its message.
func testEventBatch(events []wrappedEvent) *eventBatch {
	result := newEventBatch()
	for i := range events {
		// len of a string is its length in bytes
		size := len(*events[i].inputLogEvent.Message)
		result.add(events[i], size)
	}
	return result
}
2018-12-21 20:27:48 +00:00
func TestNewStreamConfig ( t * testing . T ) {
tests := [ ] struct {
logStreamName string
logGroupName string
logCreateGroup string
2021-03-04 10:03:21 -08:00
logCreateStream string
2018-12-21 20:27:48 +00:00
logNonBlocking string
forceFlushInterval string
maxBufferedEvents string
datetimeFormat string
multilinePattern string
shouldErr bool
testName string
} {
2021-03-04 10:03:21 -08:00
{ "" , groupName , "" , "" , "" , "" , "" , "" , "" , false , "defaults" } ,
{ "" , groupName , "invalid create group" , "" , "" , "" , "" , "" , "" , true , "invalid create group" } ,
{ "" , groupName , "" , "" , "" , "invalid flush interval" , "" , "" , "" , true , "invalid flush interval" } ,
{ "" , groupName , "" , "" , "" , "" , "invalid max buffered events" , "" , "" , true , "invalid max buffered events" } ,
{ "" , groupName , "" , "" , "" , "" , "" , "" , "n{1001}" , true , "invalid multiline pattern" } ,
{ "" , groupName , "" , "" , "" , "15" , "" , "" , "" , false , "flush interval at 15" } ,
{ "" , groupName , "" , "" , "" , "" , "1024" , "" , "" , false , "max buffered events at 1024" } ,
2018-12-21 20:27:48 +00:00
}
for _ , tc := range tests {
t . Run ( tc . testName , func ( t * testing . T ) {
cfg := map [ string ] string {
logGroupKey : tc . logGroupName ,
logCreateGroupKey : tc . logCreateGroup ,
"mode" : tc . logNonBlocking ,
forceFlushIntervalKey : tc . forceFlushInterval ,
maxBufferedEventsKey : tc . maxBufferedEvents ,
logStreamKey : tc . logStreamName ,
2021-03-04 10:03:21 -08:00
logCreateStreamKey : tc . logCreateStream ,
2018-12-21 20:27:48 +00:00
datetimeFormatKey : tc . datetimeFormat ,
multilinePatternKey : tc . multilinePattern ,
}
info := logger . Info {
Config : cfg ,
}
logStreamConfig , err := newStreamConfig ( info )
if tc . shouldErr {
assert . Check ( t , err != nil , "Expected an error" )
} else {
assert . Check ( t , err == nil , "Unexpected error" )
assert . Check ( t , logStreamConfig . logGroupName == tc . logGroupName , "Unexpected logGroupName" )
if tc . forceFlushInterval != "" {
forceFlushIntervalAsInt , _ := strconv . Atoi ( info . Config [ forceFlushIntervalKey ] )
assert . Check ( t , logStreamConfig . forceFlushInterval == time . Duration ( forceFlushIntervalAsInt ) * time . Second , "Unexpected forceFlushInterval" )
}
2018-12-21 22:24:40 +00:00
if tc . maxBufferedEvents != "" {
maxBufferedEvents , _ := strconv . Atoi ( info . Config [ maxBufferedEventsKey ] )
assert . Check ( t , logStreamConfig . maxBufferedEvents == maxBufferedEvents , "Unexpected maxBufferedEvents" )
}
2018-12-21 20:27:48 +00:00
}
} )
}
}
2015-09-28 04:53:29 +00:00
func TestNewAWSLogsClientUserAgentHandler ( t * testing . T ) {
2016-11-26 13:08:34 +08:00
info := logger . Info {
2015-09-28 04:53:29 +00:00
Config : map [ string ] string {
regionKey : "us-east-1" ,
} ,
}
2016-11-26 13:08:34 +08:00
client , err := newAWSLogsClient ( info )
2018-06-29 11:27:37 -07:00
assert . NilError ( t , err )
2015-09-28 04:53:29 +00:00
realClient , ok := client . ( * cloudwatchlogs . CloudWatchLogs )
2018-06-29 11:27:37 -07:00
assert . Check ( t , ok , "Could not cast client to cloudwatchlogs.CloudWatchLogs" )
2015-09-28 04:53:29 +00:00
buildHandlerList := realClient . Handlers . Build
request := & request . Request {
HTTPRequest : & http . Request {
Header : http . Header { } ,
} ,
}
buildHandlerList . Run ( request )
2016-05-27 09:29:37 -07:00
expectedUserAgentString := fmt . Sprintf ( "Docker %s (%s) %s/%s (%s; %s; %s)" ,
dockerversion . Version , runtime . GOOS , aws . SDKName , aws . SDKVersion , runtime . Version ( ) , runtime . GOOS , runtime . GOARCH )
2015-09-28 04:53:29 +00:00
userAgent := request . HTTPRequest . Header . Get ( "User-Agent" )
if userAgent != expectedUserAgentString {
t . Errorf ( "Wrong User-Agent string, expected \"%s\" but was \"%s\"" ,
expectedUserAgentString , userAgent )
}
}
2021-09-10 09:05:20 -07:00
// TestNewAWSLogsClientLogFormatHeaderHandler verifies the value of the
// x-amzn-logs-format request header produced by the client's Build handlers
// for each configured log format.
func TestNewAWSLogsClientLogFormatHeaderHandler(t *testing.T) {
	tests := []struct {
		logFormat           string
		expectedHeaderValue string
	}{
		{
			logFormat:           jsonEmfLogFormat,
			expectedHeaderValue: "json/emf",
		},
		{
			logFormat:           "",
			expectedHeaderValue: "",
		},
	}

	for _, tc := range tests {
		t.Run(tc.logFormat, func(t *testing.T) {
			info := logger.Info{
				Config: map[string]string{
					regionKey:    "us-east-1",
					logFormatKey: tc.logFormat,
				},
			}

			client, err := newAWSLogsClient(info)
			assert.NilError(t, err)

			realClient, ok := client.(*cloudwatchlogs.CloudWatchLogs)
			// Assert rather than Check: realClient is nil when the cast fails
			// and is dereferenced immediately below.
			assert.Assert(t, ok, "Could not cast client to cloudwatchlogs.CloudWatchLogs")

			buildHandlerList := realClient.Handlers.Build
			// Named req so the imported request package is not shadowed.
			req := &request.Request{
				HTTPRequest: &http.Request{
					Header: http.Header{},
				},
			}
			buildHandlerList.Run(req)

			logFormatHeaderVal := req.HTTPRequest.Header.Get("x-amzn-logs-format")
			assert.Equal(t, tc.expectedHeaderValue, logFormatHeaderVal)
		})
	}
}
2018-06-29 11:27:37 -07:00
// TestNewAWSLogsClientAWSLogsEndpoint verifies that a configured endpoint is
// exposed on the client with an https:// scheme prepended.
func TestNewAWSLogsClientAWSLogsEndpoint(t *testing.T) {
	endpoint := "mock-endpoint"
	info := logger.Info{
		Config: map[string]string{
			regionKey:   "us-east-1",
			endpointKey: endpoint,
		},
	}

	client, err := newAWSLogsClient(info)
	assert.NilError(t, err)

	realClient, ok := client.(*cloudwatchlogs.CloudWatchLogs)
	// Assert rather than Check: realClient is nil when the cast fails and is
	// dereferenced immediately below.
	assert.Assert(t, ok, "Could not cast client to cloudwatchlogs.CloudWatchLogs")

	endpointWithScheme := realClient.Endpoint
	expectedEndpointWithScheme := "https://" + endpoint
	assert.Equal(t, endpointWithScheme, expectedEndpointWithScheme, "Wrong endpoint")
}
2015-09-28 06:40:44 +00:00
func TestNewAWSLogsClientRegionDetect ( t * testing . T ) {
2016-11-26 13:08:34 +08:00
info := logger . Info {
2015-09-28 06:40:44 +00:00
Config : map [ string ] string { } ,
}
mockMetadata := newMockMetadataClient ( )
2019-08-05 23:46:43 +02:00
newRegionFinder = func ( ) ( regionFinder , error ) {
return mockMetadata , nil
2015-09-28 06:40:44 +00:00
}
mockMetadata . regionResult <- & regionResult {
successResult : "us-east-1" ,
}
2016-11-26 13:08:34 +08:00
_ , err := newAWSLogsClient ( info )
2018-06-29 11:27:37 -07:00
assert . NilError ( t , err )
2015-09-28 06:40:44 +00:00
}
2015-08-05 00:35:06 +00:00
func TestCreateSuccess ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2015-08-05 00:35:06 +00:00
stream := & logStream {
2021-03-04 10:03:21 -08:00
client : mockClient ,
logGroupName : groupName ,
logStreamName : streamName ,
logCreateStream : true ,
2015-08-05 00:35:06 +00:00
}
2022-01-10 18:42:11 -08:00
var input * cloudwatchlogs . CreateLogStreamInput
mockClient . createLogStreamFunc = func ( i * cloudwatchlogs . CreateLogStreamInput ) ( * cloudwatchlogs . CreateLogStreamOutput , error ) {
input = i
return & cloudwatchlogs . CreateLogStreamOutput { } , nil
}
2015-08-05 00:35:06 +00:00
err := stream . create ( )
2022-01-10 21:17:31 -08:00
assert . NilError ( t , err )
assert . Equal ( t , groupName , aws . StringValue ( input . LogGroupName ) , "LogGroupName" )
assert . Equal ( t , streamName , aws . StringValue ( input . LogStreamName ) , "LogStreamName" )
2016-12-17 14:36:41 -08:00
}
2021-03-04 10:03:21 -08:00
func TestCreateStreamSkipped ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2021-03-04 10:03:21 -08:00
stream := & logStream {
2022-01-10 18:42:11 -08:00
client : mockClient ,
2021-03-04 10:03:21 -08:00
logGroupName : groupName ,
logStreamName : streamName ,
logCreateStream : false ,
}
2022-01-10 18:42:11 -08:00
mockClient . createLogStreamFunc = func ( i * cloudwatchlogs . CreateLogStreamInput ) ( * cloudwatchlogs . CreateLogStreamOutput , error ) {
t . Error ( "CreateLogStream should not be called" )
return nil , errors . New ( "should not be called" )
}
2021-03-04 10:03:21 -08:00
err := stream . create ( )
2022-01-10 21:17:31 -08:00
assert . NilError ( t , err )
2021-03-04 10:03:21 -08:00
}
2016-12-17 14:36:41 -08:00
func TestCreateLogGroupSuccess ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2016-12-17 14:36:41 -08:00
stream := & logStream {
2021-03-04 10:03:21 -08:00
client : mockClient ,
logGroupName : groupName ,
logStreamName : streamName ,
logCreateGroup : true ,
logCreateStream : true ,
2016-12-17 14:36:41 -08:00
}
2022-01-10 18:42:11 -08:00
var logGroupInput * cloudwatchlogs . CreateLogGroupInput
mockClient . createLogGroupFunc = func ( input * cloudwatchlogs . CreateLogGroupInput ) ( * cloudwatchlogs . CreateLogGroupOutput , error ) {
logGroupInput = input
return & cloudwatchlogs . CreateLogGroupOutput { } , nil
}
var logStreamInput * cloudwatchlogs . CreateLogStreamInput
createLogStreamCalls := 0
mockClient . createLogStreamFunc = func ( input * cloudwatchlogs . CreateLogStreamInput ) ( * cloudwatchlogs . CreateLogStreamOutput , error ) {
createLogStreamCalls ++
if logGroupInput == nil {
// log group not created yet
return nil , awserr . New ( resourceNotFoundCode , "should error once" , nil )
}
logStreamInput = input
return & cloudwatchlogs . CreateLogStreamOutput { } , nil
}
2016-12-17 14:36:41 -08:00
err := stream . create ( )
2022-01-10 21:17:31 -08:00
assert . NilError ( t , err )
2022-01-10 18:42:11 -08:00
if createLogStreamCalls < 2 {
t . Errorf ( "Expected CreateLogStream to be called twice, was called %d times" , createLogStreamCalls )
2016-12-17 14:36:41 -08:00
}
2022-01-10 21:17:31 -08:00
assert . Check ( t , logGroupInput != nil )
assert . Equal ( t , groupName , aws . StringValue ( logGroupInput . LogGroupName ) , "LogGroupName in LogGroupInput" )
assert . Check ( t , logStreamInput != nil )
assert . Equal ( t , groupName , aws . StringValue ( logStreamInput . LogGroupName ) , "LogGroupName in LogStreamInput" )
assert . Equal ( t , streamName , aws . StringValue ( logStreamInput . LogStreamName ) , "LogStreamName in LogStreamInput" )
2015-08-05 00:35:06 +00:00
}
func TestCreateError ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2015-08-05 00:35:06 +00:00
stream := & logStream {
2021-03-04 10:03:21 -08:00
client : mockClient ,
logCreateStream : true ,
2015-08-05 00:35:06 +00:00
}
2022-01-10 18:42:11 -08:00
mockClient . createLogStreamFunc = func ( input * cloudwatchlogs . CreateLogStreamInput ) ( * cloudwatchlogs . CreateLogStreamOutput , error ) {
return nil , errors . New ( "error" )
2015-08-05 00:35:06 +00:00
}
err := stream . create ( )
if err == nil {
t . Fatal ( "Expected non-nil err" )
}
}
func TestCreateAlreadyExists ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2015-08-05 00:35:06 +00:00
stream := & logStream {
2021-03-04 10:03:21 -08:00
client : mockClient ,
logCreateStream : true ,
2015-08-05 00:35:06 +00:00
}
2022-01-10 18:42:11 -08:00
calls := 0
mockClient . createLogStreamFunc = func ( input * cloudwatchlogs . CreateLogStreamInput ) ( * cloudwatchlogs . CreateLogStreamOutput , error ) {
calls ++
return nil , awserr . New ( resourceAlreadyExistsCode , "" , nil )
2015-08-05 00:35:06 +00:00
}
err := stream . create ( )
2018-06-29 11:27:37 -07:00
assert . NilError ( t , err )
2022-01-10 18:42:11 -08:00
assert . Equal ( t , 1 , calls )
2015-08-05 00:35:06 +00:00
}
2018-03-02 18:52:15 +00:00
func TestLogClosed ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2018-03-02 18:52:15 +00:00
stream := & logStream {
client : mockClient ,
closed : true ,
}
err := stream . Log ( & logger . Message { } )
2022-01-10 21:17:31 -08:00
assert . Check ( t , err != nil )
2018-03-02 18:52:15 +00:00
}
2019-09-20 16:02:22 -07:00
// TestLogBlocking tests that the Log method blocks appropriately when
// non-blocking behavior is not enabled. Blocking is achieved through an
// internal channel that must be drained for Log to return.
2018-03-02 18:52:15 +00:00
func TestLogBlocking ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2018-03-02 18:52:15 +00:00
stream := & logStream {
client : mockClient ,
messages : make ( chan * logger . Message ) ,
}
errorCh := make ( chan error , 1 )
started := make ( chan bool )
go func ( ) {
started <- true
err := stream . Log ( & logger . Message { } )
errorCh <- err
} ( )
2019-09-20 16:02:22 -07:00
// block until the goroutine above has started
2018-03-02 18:52:15 +00:00
<- started
select {
case err := <- errorCh :
t . Fatal ( "Expected stream.Log to block: " , err )
default :
}
2019-09-20 16:02:22 -07:00
// assuming it is blocked, we can now try to drain the internal channel and
// unblock it
2018-03-02 18:52:15 +00:00
select {
2019-09-20 16:02:22 -07:00
case <- time . After ( 10 * time . Millisecond ) :
// if we're unable to drain the channel within 10ms, something seems broken
2018-03-02 18:52:15 +00:00
t . Fatal ( "Expected to be able to read from stream.messages but was unable to" )
2019-09-20 16:02:22 -07:00
case <- stream . messages :
2018-03-02 18:52:15 +00:00
}
select {
case err := <- errorCh :
2018-06-29 11:27:37 -07:00
assert . NilError ( t , err )
2018-03-02 18:52:15 +00:00
case <- time . After ( 30 * time . Second ) :
t . Fatal ( "timed out waiting for read" )
}
}
func TestLogNonBlockingBufferEmpty ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2018-03-02 18:52:15 +00:00
stream := & logStream {
client : mockClient ,
messages : make ( chan * logger . Message , 1 ) ,
logNonBlocking : true ,
}
err := stream . Log ( & logger . Message { } )
2018-06-29 11:27:37 -07:00
assert . NilError ( t , err )
2018-03-02 18:52:15 +00:00
}
func TestLogNonBlockingBufferFull ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2018-03-02 18:52:15 +00:00
stream := & logStream {
client : mockClient ,
messages : make ( chan * logger . Message , 1 ) ,
logNonBlocking : true ,
}
stream . messages <- & logger . Message { }
2019-10-29 17:03:38 -04:00
errorCh := make ( chan error , 1 )
2018-03-02 18:52:15 +00:00
started := make ( chan bool )
go func ( ) {
started <- true
err := stream . Log ( & logger . Message { } )
errorCh <- err
} ( )
<- started
select {
case err := <- errorCh :
2022-01-10 21:17:31 -08:00
assert . Check ( t , err != nil )
2018-03-02 18:52:15 +00:00
case <- time . After ( 30 * time . Second ) :
t . Fatal ( "Expected Log call to not block" )
}
}
2015-08-05 00:35:06 +00:00
func TestPublishBatchSuccess ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2015-08-05 00:35:06 +00:00
stream := & logStream {
client : mockClient ,
logGroupName : groupName ,
logStreamName : streamName ,
sequenceToken : aws . String ( sequenceToken ) ,
}
2022-01-10 18:42:11 -08:00
var input * cloudwatchlogs . PutLogEventsInput
mockClient . putLogEventsFunc = func ( i * cloudwatchlogs . PutLogEventsInput ) ( * cloudwatchlogs . PutLogEventsOutput , error ) {
input = i
return & cloudwatchlogs . PutLogEventsOutput {
2015-08-05 00:35:06 +00:00
NextSequenceToken : aws . String ( nextSequenceToken ) ,
2022-01-10 18:42:11 -08:00
} , nil
2015-08-05 00:35:06 +00:00
}
2016-07-19 11:32:12 -07:00
events := [ ] wrappedEvent {
2015-08-05 00:35:06 +00:00
{
2016-07-19 11:32:12 -07:00
inputLogEvent : & cloudwatchlogs . InputLogEvent {
Message : aws . String ( logline ) ,
} ,
2015-08-05 00:35:06 +00:00
} ,
}
2017-11-30 16:17:17 -08:00
stream . publishBatch ( testEventBatch ( events ) )
2022-01-10 21:17:31 -08:00
assert . Equal ( t , nextSequenceToken , aws . StringValue ( stream . sequenceToken ) , "sequenceToken" )
assert . Assert ( t , input != nil )
assert . Equal ( t , sequenceToken , aws . StringValue ( input . SequenceToken ) , "input.SequenceToken" )
assert . Assert ( t , len ( input . LogEvents ) == 1 )
assert . Equal ( t , events [ 0 ] . inputLogEvent , input . LogEvents [ 0 ] )
2015-08-05 00:35:06 +00:00
}
func TestPublishBatchError ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2015-08-05 00:35:06 +00:00
stream := & logStream {
client : mockClient ,
logGroupName : groupName ,
logStreamName : streamName ,
sequenceToken : aws . String ( sequenceToken ) ,
}
2022-01-10 18:42:11 -08:00
mockClient . putLogEventsFunc = func ( input * cloudwatchlogs . PutLogEventsInput ) ( * cloudwatchlogs . PutLogEventsOutput , error ) {
return nil , errors . New ( "error" )
2015-08-05 00:35:06 +00:00
}
2016-07-19 11:32:12 -07:00
events := [ ] wrappedEvent {
2015-08-05 00:35:06 +00:00
{
2016-07-19 11:32:12 -07:00
inputLogEvent : & cloudwatchlogs . InputLogEvent {
Message : aws . String ( logline ) ,
} ,
2015-08-05 00:35:06 +00:00
} ,
}
2017-11-30 16:17:17 -08:00
stream . publishBatch ( testEventBatch ( events ) )
2022-01-10 21:17:31 -08:00
assert . Equal ( t , sequenceToken , aws . StringValue ( stream . sequenceToken ) )
2015-08-05 00:35:06 +00:00
}
func TestPublishBatchInvalidSeqSuccess ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2015-08-05 00:35:06 +00:00
stream := & logStream {
client : mockClient ,
logGroupName : groupName ,
logStreamName : streamName ,
sequenceToken : aws . String ( sequenceToken ) ,
}
2022-01-10 18:42:11 -08:00
calls := make ( [ ] * cloudwatchlogs . PutLogEventsInput , 0 )
mockClient . putLogEventsFunc = func ( input * cloudwatchlogs . PutLogEventsInput ) ( * cloudwatchlogs . PutLogEventsOutput , error ) {
calls = append ( calls , input )
if aws . StringValue ( input . SequenceToken ) != "token" {
return nil , awserr . New ( invalidSequenceTokenCode , "use token token" , nil )
}
return & cloudwatchlogs . PutLogEventsOutput {
2015-08-05 00:35:06 +00:00
NextSequenceToken : aws . String ( nextSequenceToken ) ,
2022-01-10 18:42:11 -08:00
} , nil
2015-08-05 00:35:06 +00:00
}
2016-07-19 11:32:12 -07:00
events := [ ] wrappedEvent {
2015-08-05 00:35:06 +00:00
{
2016-07-19 11:32:12 -07:00
inputLogEvent : & cloudwatchlogs . InputLogEvent {
Message : aws . String ( logline ) ,
} ,
2015-08-05 00:35:06 +00:00
} ,
}
2017-11-30 16:17:17 -08:00
stream . publishBatch ( testEventBatch ( events ) )
2022-01-10 21:17:31 -08:00
assert . Equal ( t , nextSequenceToken , aws . StringValue ( stream . sequenceToken ) )
assert . Assert ( t , len ( calls ) == 2 )
2022-01-10 18:42:11 -08:00
argument := calls [ 0 ]
2022-01-10 21:17:31 -08:00
assert . Assert ( t , argument != nil )
assert . Equal ( t , sequenceToken , aws . StringValue ( argument . SequenceToken ) )
assert . Assert ( t , len ( argument . LogEvents ) == 1 )
assert . Equal ( t , events [ 0 ] . inputLogEvent , argument . LogEvents [ 0 ] )
2015-08-05 00:35:06 +00:00
2022-01-10 18:42:11 -08:00
argument = calls [ 1 ]
2022-01-10 21:17:31 -08:00
assert . Assert ( t , argument != nil )
assert . Equal ( t , "token" , aws . StringValue ( argument . SequenceToken ) )
assert . Assert ( t , len ( argument . LogEvents ) == 1 )
assert . Equal ( t , events [ 0 ] . inputLogEvent , argument . LogEvents [ 0 ] )
2015-08-05 00:35:06 +00:00
}
func TestPublishBatchAlreadyAccepted ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2015-08-05 00:35:06 +00:00
stream := & logStream {
client : mockClient ,
logGroupName : groupName ,
logStreamName : streamName ,
sequenceToken : aws . String ( sequenceToken ) ,
}
2022-01-10 18:42:11 -08:00
calls := make ( [ ] * cloudwatchlogs . PutLogEventsInput , 0 )
mockClient . putLogEventsFunc = func ( input * cloudwatchlogs . PutLogEventsInput ) ( * cloudwatchlogs . PutLogEventsOutput , error ) {
calls = append ( calls , input )
return nil , awserr . New ( dataAlreadyAcceptedCode , "use token token" , nil )
2015-08-05 00:35:06 +00:00
}
2016-07-19 11:32:12 -07:00
events := [ ] wrappedEvent {
2015-08-05 00:35:06 +00:00
{
2016-07-19 11:32:12 -07:00
inputLogEvent : & cloudwatchlogs . InputLogEvent {
Message : aws . String ( logline ) ,
} ,
2015-08-05 00:35:06 +00:00
} ,
}
2017-11-30 16:17:17 -08:00
stream . publishBatch ( testEventBatch ( events ) )
2022-01-10 21:17:31 -08:00
assert . Assert ( t , stream . sequenceToken != nil )
assert . Equal ( t , "token" , aws . StringValue ( stream . sequenceToken ) )
assert . Assert ( t , len ( calls ) == 1 )
2022-01-10 18:42:11 -08:00
argument := calls [ 0 ]
2022-01-10 21:17:31 -08:00
assert . Assert ( t , argument != nil )
assert . Equal ( t , sequenceToken , aws . StringValue ( argument . SequenceToken ) )
assert . Assert ( t , len ( argument . LogEvents ) == 1 )
assert . Equal ( t , events [ 0 ] . inputLogEvent , argument . LogEvents [ 0 ] )
2015-08-05 00:35:06 +00:00
}
func TestCollectBatchSimple ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2015-08-05 00:35:06 +00:00
stream := & logStream {
client : mockClient ,
logGroupName : groupName ,
logStreamName : streamName ,
sequenceToken : aws . String ( sequenceToken ) ,
messages : make ( chan * logger . Message ) ,
}
2022-01-10 18:42:11 -08:00
calls := make ( [ ] * cloudwatchlogs . PutLogEventsInput , 0 )
mockClient . putLogEventsFunc = func ( input * cloudwatchlogs . PutLogEventsInput ) ( * cloudwatchlogs . PutLogEventsOutput , error ) {
calls = append ( calls , input )
return & cloudwatchlogs . PutLogEventsOutput {
2015-08-05 00:35:06 +00:00
NextSequenceToken : aws . String ( nextSequenceToken ) ,
2022-01-10 18:42:11 -08:00
} , nil
2015-08-05 00:35:06 +00:00
}
ticks := make ( chan time . Time )
newTicker = func ( _ time . Duration ) * time . Ticker {
return & time . Ticker {
C : ticks ,
}
}
2018-03-02 18:52:15 +00:00
d := make ( chan bool )
close ( d )
go stream . collectBatch ( d )
2015-08-05 00:35:06 +00:00
stream . Log ( & logger . Message {
Line : [ ] byte ( logline ) ,
Timestamp : time . Time { } ,
} )
2022-01-10 18:42:11 -08:00
ticks <- time . Time { }
2015-08-05 00:35:06 +00:00
ticks <- time . Time { }
stream . Close ( )
2022-01-10 21:17:31 -08:00
assert . Assert ( t , len ( calls ) == 1 )
2022-01-10 18:42:11 -08:00
argument := calls [ 0 ]
2022-01-10 21:17:31 -08:00
assert . Assert ( t , argument != nil )
assert . Assert ( t , len ( argument . LogEvents ) == 1 )
assert . Equal ( t , logline , aws . StringValue ( argument . LogEvents [ 0 ] . Message ) )
2015-08-05 00:35:06 +00:00
}
func TestCollectBatchTicker ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2015-08-05 00:35:06 +00:00
stream := & logStream {
client : mockClient ,
logGroupName : groupName ,
logStreamName : streamName ,
sequenceToken : aws . String ( sequenceToken ) ,
messages : make ( chan * logger . Message ) ,
}
2022-01-10 18:42:11 -08:00
calls := make ( [ ] * cloudwatchlogs . PutLogEventsInput , 0 )
called := make ( chan struct { } , 50 )
mockClient . putLogEventsFunc = func ( input * cloudwatchlogs . PutLogEventsInput ) ( * cloudwatchlogs . PutLogEventsOutput , error ) {
calls = append ( calls , input )
called <- struct { } { }
return & cloudwatchlogs . PutLogEventsOutput {
2015-08-05 00:35:06 +00:00
NextSequenceToken : aws . String ( nextSequenceToken ) ,
2022-01-10 18:42:11 -08:00
} , nil
2015-08-05 00:35:06 +00:00
}
ticks := make ( chan time . Time )
newTicker = func ( _ time . Duration ) * time . Ticker {
return & time . Ticker {
C : ticks ,
}
}
2018-03-02 18:52:15 +00:00
d := make ( chan bool )
close ( d )
go stream . collectBatch ( d )
2015-08-05 00:35:06 +00:00
stream . Log ( & logger . Message {
Line : [ ] byte ( logline + " 1" ) ,
Timestamp : time . Time { } ,
} )
stream . Log ( & logger . Message {
Line : [ ] byte ( logline + " 2" ) ,
Timestamp : time . Time { } ,
} )
ticks <- time . Time { }
// Verify first batch
2022-01-10 18:42:11 -08:00
<- called
2022-01-10 21:17:31 -08:00
assert . Assert ( t , len ( calls ) == 1 )
2022-01-10 18:42:11 -08:00
argument := calls [ 0 ]
calls = calls [ 1 : ]
2022-01-10 21:17:31 -08:00
assert . Assert ( t , argument != nil )
assert . Assert ( t , len ( argument . LogEvents ) == 2 )
assert . Equal ( t , logline + " 1" , aws . StringValue ( argument . LogEvents [ 0 ] . Message ) )
assert . Equal ( t , logline + " 2" , aws . StringValue ( argument . LogEvents [ 1 ] . Message ) )
2015-08-05 00:35:06 +00:00
stream . Log ( & logger . Message {
Line : [ ] byte ( logline + " 3" ) ,
Timestamp : time . Time { } ,
} )
ticks <- time . Time { }
2022-01-10 18:42:11 -08:00
<- called
2022-01-10 21:17:31 -08:00
assert . Assert ( t , len ( calls ) == 1 )
2022-01-10 18:42:11 -08:00
argument = calls [ 0 ]
close ( called )
2022-01-10 21:17:31 -08:00
assert . Assert ( t , argument != nil )
assert . Assert ( t , len ( argument . LogEvents ) == 1 )
assert . Equal ( t , logline + " 3" , aws . StringValue ( argument . LogEvents [ 0 ] . Message ) )
2015-08-05 00:35:06 +00:00
stream . Close ( )
}
2017-02-13 21:24:59 +13:00
func TestCollectBatchMultilinePattern ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2017-02-13 21:24:59 +13:00
multilinePattern := regexp . MustCompile ( "xxxx" )
stream := & logStream {
client : mockClient ,
logGroupName : groupName ,
logStreamName : streamName ,
multilinePattern : multilinePattern ,
sequenceToken : aws . String ( sequenceToken ) ,
messages : make ( chan * logger . Message ) ,
}
2022-01-10 18:42:11 -08:00
calls := make ( [ ] * cloudwatchlogs . PutLogEventsInput , 0 )
called := make ( chan struct { } , 50 )
mockClient . putLogEventsFunc = func ( input * cloudwatchlogs . PutLogEventsInput ) ( * cloudwatchlogs . PutLogEventsOutput , error ) {
calls = append ( calls , input )
called <- struct { } { }
return & cloudwatchlogs . PutLogEventsOutput {
2017-02-13 21:24:59 +13:00
NextSequenceToken : aws . String ( nextSequenceToken ) ,
2022-01-10 18:42:11 -08:00
} , nil
2017-02-13 21:24:59 +13:00
}
ticks := make ( chan time . Time )
newTicker = func ( _ time . Duration ) * time . Ticker {
return & time . Ticker {
C : ticks ,
}
}
2018-03-02 18:52:15 +00:00
d := make ( chan bool )
close ( d )
go stream . collectBatch ( d )
2017-02-13 21:24:59 +13:00
stream . Log ( & logger . Message {
Line : [ ] byte ( logline ) ,
Timestamp : time . Now ( ) ,
} )
stream . Log ( & logger . Message {
Line : [ ] byte ( logline ) ,
Timestamp : time . Now ( ) ,
} )
stream . Log ( & logger . Message {
Line : [ ] byte ( "xxxx " + logline ) ,
Timestamp : time . Now ( ) ,
} )
2017-05-12 21:55:18 +12:00
ticks <- time . Now ( )
2017-02-13 21:24:59 +13:00
// Verify single multiline event
2022-01-10 18:42:11 -08:00
<- called
2022-01-10 21:17:31 -08:00
assert . Assert ( t , len ( calls ) == 1 )
2022-01-10 18:42:11 -08:00
argument := calls [ 0 ]
calls = calls [ 1 : ]
2018-03-13 15:28:34 -04:00
assert . Check ( t , argument != nil , "Expected non-nil PutLogEventsInput" )
assert . Check ( t , is . Equal ( 1 , len ( argument . LogEvents ) ) , "Expected single multiline event" )
assert . Check ( t , is . Equal ( logline + "\n" + logline + "\n" , * argument . LogEvents [ 0 ] . Message ) , "Received incorrect multiline message" )
2017-02-13 21:24:59 +13:00
stream . Close ( )
// Verify single event
2022-01-10 18:42:11 -08:00
<- called
2022-01-10 21:17:31 -08:00
assert . Assert ( t , len ( calls ) == 1 )
2022-01-10 18:42:11 -08:00
argument = calls [ 0 ]
close ( called )
2018-03-13 15:28:34 -04:00
assert . Check ( t , argument != nil , "Expected non-nil PutLogEventsInput" )
assert . Check ( t , is . Equal ( 1 , len ( argument . LogEvents ) ) , "Expected single multiline event" )
assert . Check ( t , is . Equal ( "xxxx " + logline + "\n" , * argument . LogEvents [ 0 ] . Message ) , "Received incorrect multiline message" )
2017-02-13 21:24:59 +13:00
}
2017-02-23 00:33:51 +13:00
func BenchmarkCollectBatch ( b * testing . B ) {
for i := 0 ; i < b . N ; i ++ {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2017-02-23 00:33:51 +13:00
stream := & logStream {
client : mockClient ,
logGroupName : groupName ,
logStreamName : streamName ,
sequenceToken : aws . String ( sequenceToken ) ,
messages : make ( chan * logger . Message ) ,
}
2022-01-10 18:42:11 -08:00
mockClient . putLogEventsFunc = func ( input * cloudwatchlogs . PutLogEventsInput ) ( * cloudwatchlogs . PutLogEventsOutput , error ) {
return & cloudwatchlogs . PutLogEventsOutput {
2017-02-23 00:33:51 +13:00
NextSequenceToken : aws . String ( nextSequenceToken ) ,
2022-01-10 18:42:11 -08:00
} , nil
2017-02-23 00:33:51 +13:00
}
ticks := make ( chan time . Time )
newTicker = func ( _ time . Duration ) * time . Ticker {
return & time . Ticker {
C : ticks ,
}
}
2018-03-02 18:52:15 +00:00
d := make ( chan bool )
close ( d )
go stream . collectBatch ( d )
2017-02-23 00:33:51 +13:00
stream . logGenerator ( 10 , 100 )
ticks <- time . Time { }
stream . Close ( )
}
}
func BenchmarkCollectBatchMultilinePattern ( b * testing . B ) {
for i := 0 ; i < b . N ; i ++ {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2017-02-23 00:33:51 +13:00
multilinePattern := regexp . MustCompile ( ` \d { 4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[1,2][0-9]|3[0,1]) (?:[0,1][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9] ` )
stream := & logStream {
client : mockClient ,
logGroupName : groupName ,
logStreamName : streamName ,
multilinePattern : multilinePattern ,
sequenceToken : aws . String ( sequenceToken ) ,
messages : make ( chan * logger . Message ) ,
}
2022-01-10 18:42:11 -08:00
mockClient . putLogEventsFunc = func ( input * cloudwatchlogs . PutLogEventsInput ) ( * cloudwatchlogs . PutLogEventsOutput , error ) {
return & cloudwatchlogs . PutLogEventsOutput {
2017-02-23 00:33:51 +13:00
NextSequenceToken : aws . String ( nextSequenceToken ) ,
2022-01-10 18:42:11 -08:00
} , nil
2017-02-23 00:33:51 +13:00
}
ticks := make ( chan time . Time )
newTicker = func ( _ time . Duration ) * time . Ticker {
return & time . Ticker {
C : ticks ,
}
}
2018-03-02 18:52:15 +00:00
d := make ( chan bool )
close ( d )
go stream . collectBatch ( d )
2017-02-23 00:33:51 +13:00
stream . logGenerator ( 10 , 100 )
ticks <- time . Time { }
stream . Close ( )
}
}
2017-02-13 21:24:59 +13:00
func TestCollectBatchMultilinePatternMaxEventAge ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2017-02-13 21:24:59 +13:00
multilinePattern := regexp . MustCompile ( "xxxx" )
stream := & logStream {
client : mockClient ,
logGroupName : groupName ,
logStreamName : streamName ,
multilinePattern : multilinePattern ,
sequenceToken : aws . String ( sequenceToken ) ,
messages : make ( chan * logger . Message ) ,
}
2022-01-10 18:42:11 -08:00
calls := make ( [ ] * cloudwatchlogs . PutLogEventsInput , 0 )
called := make ( chan struct { } , 50 )
mockClient . putLogEventsFunc = func ( input * cloudwatchlogs . PutLogEventsInput ) ( * cloudwatchlogs . PutLogEventsOutput , error ) {
calls = append ( calls , input )
called <- struct { } { }
return & cloudwatchlogs . PutLogEventsOutput {
2017-02-13 21:24:59 +13:00
NextSequenceToken : aws . String ( nextSequenceToken ) ,
2022-01-10 18:42:11 -08:00
} , nil
2017-02-13 21:24:59 +13:00
}
ticks := make ( chan time . Time )
newTicker = func ( _ time . Duration ) * time . Ticker {
return & time . Ticker {
C : ticks ,
}
}
2018-03-02 18:52:15 +00:00
d := make ( chan bool )
close ( d )
go stream . collectBatch ( d )
2017-02-13 21:24:59 +13:00
stream . Log ( & logger . Message {
Line : [ ] byte ( logline ) ,
Timestamp : time . Now ( ) ,
} )
// Log an event 1 second later
stream . Log ( & logger . Message {
Line : [ ] byte ( logline ) ,
Timestamp : time . Now ( ) . Add ( time . Second ) ,
} )
2018-12-20 18:41:47 +00:00
// Fire ticker defaultForceFlushInterval seconds later
ticks <- time . Now ( ) . Add ( defaultForceFlushInterval + time . Second )
2017-02-13 21:24:59 +13:00
2018-12-20 18:41:47 +00:00
// Verify single multiline event is flushed after maximum event buffer age (defaultForceFlushInterval)
2022-01-10 18:42:11 -08:00
<- called
2022-01-10 21:17:31 -08:00
assert . Assert ( t , len ( calls ) == 1 )
2022-01-10 18:42:11 -08:00
argument := calls [ 0 ]
calls = calls [ 1 : ]
2018-03-13 15:28:34 -04:00
assert . Check ( t , argument != nil , "Expected non-nil PutLogEventsInput" )
assert . Check ( t , is . Equal ( 1 , len ( argument . LogEvents ) ) , "Expected single multiline event" )
assert . Check ( t , is . Equal ( logline + "\n" + logline + "\n" , * argument . LogEvents [ 0 ] . Message ) , "Received incorrect multiline message" )
2017-05-12 21:55:18 +12:00
2017-07-28 20:33:14 +12:00
// Log an event 1 second later
stream . Log ( & logger . Message {
Line : [ ] byte ( logline ) ,
Timestamp : time . Now ( ) . Add ( time . Second ) ,
} )
2018-12-20 18:41:47 +00:00
// Fire ticker another defaultForceFlushInterval seconds later
ticks <- time . Now ( ) . Add ( 2 * defaultForceFlushInterval + time . Second )
2017-07-28 20:33:14 +12:00
// Verify the event buffer is truly flushed - we should only receive a single event
2022-01-10 18:42:11 -08:00
<- called
2022-01-10 21:17:31 -08:00
assert . Assert ( t , len ( calls ) == 1 )
2022-01-10 18:42:11 -08:00
argument = calls [ 0 ]
close ( called )
2018-03-13 15:28:34 -04:00
assert . Check ( t , argument != nil , "Expected non-nil PutLogEventsInput" )
assert . Check ( t , is . Equal ( 1 , len ( argument . LogEvents ) ) , "Expected single multiline event" )
assert . Check ( t , is . Equal ( logline + "\n" , * argument . LogEvents [ 0 ] . Message ) , "Received incorrect multiline message" )
2017-05-12 21:55:18 +12:00
stream . Close ( )
}
func TestCollectBatchMultilinePatternNegativeEventAge ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2017-05-12 21:55:18 +12:00
multilinePattern := regexp . MustCompile ( "xxxx" )
stream := & logStream {
client : mockClient ,
logGroupName : groupName ,
logStreamName : streamName ,
multilinePattern : multilinePattern ,
sequenceToken : aws . String ( sequenceToken ) ,
messages : make ( chan * logger . Message ) ,
2017-02-13 21:24:59 +13:00
}
2022-01-10 18:42:11 -08:00
calls := make ( [ ] * cloudwatchlogs . PutLogEventsInput , 0 )
called := make ( chan struct { } , 50 )
mockClient . putLogEventsFunc = func ( input * cloudwatchlogs . PutLogEventsInput ) ( * cloudwatchlogs . PutLogEventsOutput , error ) {
calls = append ( calls , input )
called <- struct { } { }
return & cloudwatchlogs . PutLogEventsOutput {
2017-05-12 21:55:18 +12:00
NextSequenceToken : aws . String ( nextSequenceToken ) ,
2022-01-10 18:42:11 -08:00
} , nil
2017-02-13 21:24:59 +13:00
}
2017-05-12 21:55:18 +12:00
ticks := make ( chan time . Time )
newTicker = func ( _ time . Duration ) * time . Ticker {
return & time . Ticker {
C : ticks ,
}
2017-02-13 21:24:59 +13:00
}
2018-03-02 18:52:15 +00:00
d := make ( chan bool )
close ( d )
go stream . collectBatch ( d )
2017-05-12 21:55:18 +12:00
stream . Log ( & logger . Message {
Line : [ ] byte ( logline ) ,
Timestamp : time . Now ( ) ,
} )
// Log an event 1 second later
stream . Log ( & logger . Message {
Line : [ ] byte ( logline ) ,
Timestamp : time . Now ( ) . Add ( time . Second ) ,
} )
// Fire ticker in past to simulate negative event buffer age
ticks <- time . Now ( ) . Add ( - time . Second )
// Verify single multiline event is flushed with a negative event buffer age
2022-01-10 18:42:11 -08:00
<- called
2022-01-10 21:17:31 -08:00
assert . Assert ( t , len ( calls ) == 1 )
2022-01-10 18:42:11 -08:00
argument := calls [ 0 ]
close ( called )
2018-03-13 15:28:34 -04:00
assert . Check ( t , argument != nil , "Expected non-nil PutLogEventsInput" )
assert . Check ( t , is . Equal ( 1 , len ( argument . LogEvents ) ) , "Expected single multiline event" )
assert . Check ( t , is . Equal ( logline + "\n" + logline + "\n" , * argument . LogEvents [ 0 ] . Message ) , "Received incorrect multiline message" )
2017-05-12 21:55:18 +12:00
2017-02-13 21:24:59 +13:00
stream . Close ( )
}
2018-01-21 14:29:55 +13:00
func TestCollectBatchMultilinePatternMaxEventSize ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2018-01-21 14:29:55 +13:00
multilinePattern := regexp . MustCompile ( "xxxx" )
stream := & logStream {
client : mockClient ,
logGroupName : groupName ,
logStreamName : streamName ,
multilinePattern : multilinePattern ,
sequenceToken : aws . String ( sequenceToken ) ,
messages : make ( chan * logger . Message ) ,
}
2022-01-10 18:42:11 -08:00
calls := make ( [ ] * cloudwatchlogs . PutLogEventsInput , 0 )
called := make ( chan struct { } , 50 )
mockClient . putLogEventsFunc = func ( input * cloudwatchlogs . PutLogEventsInput ) ( * cloudwatchlogs . PutLogEventsOutput , error ) {
calls = append ( calls , input )
called <- struct { } { }
return & cloudwatchlogs . PutLogEventsOutput {
2018-01-21 14:29:55 +13:00
NextSequenceToken : aws . String ( nextSequenceToken ) ,
2022-01-10 18:42:11 -08:00
} , nil
2018-01-21 14:29:55 +13:00
}
ticks := make ( chan time . Time )
newTicker = func ( _ time . Duration ) * time . Ticker {
return & time . Ticker {
C : ticks ,
}
}
2018-03-02 18:52:15 +00:00
d := make ( chan bool )
close ( d )
go stream . collectBatch ( d )
2018-01-21 14:29:55 +13:00
// Log max event size
longline := strings . Repeat ( "A" , maximumBytesPerEvent )
stream . Log ( & logger . Message {
Line : [ ] byte ( longline ) ,
Timestamp : time . Now ( ) ,
} )
// Log short event
shortline := strings . Repeat ( "B" , 100 )
stream . Log ( & logger . Message {
Line : [ ] byte ( shortline ) ,
Timestamp : time . Now ( ) ,
} )
// Fire ticker
2018-12-20 18:41:47 +00:00
ticks <- time . Now ( ) . Add ( defaultForceFlushInterval )
2018-01-21 14:29:55 +13:00
// Verify multiline events
// We expect a maximum sized event with no new line characters and a
// second short event with a new line character at the end
2022-01-10 18:42:11 -08:00
<- called
2022-01-10 21:17:31 -08:00
assert . Assert ( t , len ( calls ) == 1 )
2022-01-10 18:42:11 -08:00
argument := calls [ 0 ]
close ( called )
2018-03-13 15:28:34 -04:00
assert . Check ( t , argument != nil , "Expected non-nil PutLogEventsInput" )
assert . Check ( t , is . Equal ( 2 , len ( argument . LogEvents ) ) , "Expected two events" )
assert . Check ( t , is . Equal ( longline , * argument . LogEvents [ 0 ] . Message ) , "Received incorrect multiline message" )
assert . Check ( t , is . Equal ( shortline + "\n" , * argument . LogEvents [ 1 ] . Message ) , "Received incorrect multiline message" )
2018-01-21 14:29:55 +13:00
stream . Close ( )
}
2015-08-05 00:35:06 +00:00
func TestCollectBatchClose ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2015-08-05 00:35:06 +00:00
stream := & logStream {
client : mockClient ,
logGroupName : groupName ,
logStreamName : streamName ,
sequenceToken : aws . String ( sequenceToken ) ,
messages : make ( chan * logger . Message ) ,
}
2022-01-10 18:42:11 -08:00
calls := make ( [ ] * cloudwatchlogs . PutLogEventsInput , 0 )
called := make ( chan struct { } , 50 )
mockClient . putLogEventsFunc = func ( input * cloudwatchlogs . PutLogEventsInput ) ( * cloudwatchlogs . PutLogEventsOutput , error ) {
calls = append ( calls , input )
called <- struct { } { }
return & cloudwatchlogs . PutLogEventsOutput {
2015-08-05 00:35:06 +00:00
NextSequenceToken : aws . String ( nextSequenceToken ) ,
2022-01-10 18:42:11 -08:00
} , nil
2015-08-05 00:35:06 +00:00
}
var ticks = make ( chan time . Time )
newTicker = func ( _ time . Duration ) * time . Ticker {
return & time . Ticker {
C : ticks ,
}
}
2018-03-02 18:52:15 +00:00
d := make ( chan bool )
close ( d )
go stream . collectBatch ( d )
2015-08-05 00:35:06 +00:00
stream . Log ( & logger . Message {
Line : [ ] byte ( logline ) ,
Timestamp : time . Time { } ,
} )
// no ticks
stream . Close ( )
2022-01-10 18:42:11 -08:00
<- called
2022-01-10 21:17:31 -08:00
assert . Assert ( t , len ( calls ) == 1 )
2022-01-10 18:42:11 -08:00
argument := calls [ 0 ]
close ( called )
2022-01-10 21:17:31 -08:00
assert . Assert ( t , argument != nil )
assert . Assert ( t , len ( argument . LogEvents ) == 1 )
assert . Equal ( t , logline , aws . StringValue ( ( argument . LogEvents [ 0 ] . Message ) ) )
2015-08-05 00:35:06 +00:00
}
2018-10-05 16:30:41 -07:00
func TestEffectiveLen ( t * testing . T ) {
tests := [ ] struct {
str string
effectiveBytes int
} {
{ "Hello" , 5 } ,
{ string ( [ ] byte { 1 , 2 , 3 , 4 } ) , 4 } ,
{ "🙃" , 4 } ,
{ string ( [ ] byte { 0xFF , 0xFF , 0xFF , 0xFF } ) , 12 } ,
{ "He\xff\xffo" , 9 } ,
{ "" , 0 } ,
}
for i , tc := range tests {
t . Run ( fmt . Sprintf ( "%d/%s" , i , tc . str ) , func ( t * testing . T ) {
assert . Equal ( t , tc . effectiveBytes , effectiveLen ( tc . str ) )
} )
}
}
// TestFindValidSplit is a table-driven check of findValidSplit: for each input
// string and effective-byte budget it compares the returned split offset and
// effective byte count with the expected values, then logs both halves of the
// string around the expected offset.
func TestFindValidSplit(t *testing.T) {
	cases := []struct {
		input      string
		budget     int
		wantOffset int
		wantBytes  int
	}{
		{"", 10, 0, 0},
		{"Hello", 6, 5, 5},
		{"Hello", 2, 2, 2},
		{"Hello", 0, 0, 0},
		{"🙃", 3, 0, 0},
		{"🙃", 4, 4, 4},
		{string([]byte{'a', 0xFF}), 2, 1, 1},
		{string([]byte{'a', 0xFF}), 4, 2, 4},
	}
	for idx, c := range cases {
		name := fmt.Sprintf("%d/%s", idx, c.input)
		t.Run(name, func(t *testing.T) {
			gotOffset, gotBytes := findValidSplit(c.input, c.budget)
			assert.Equal(t, c.wantOffset, gotOffset, "splitOffset")
			assert.Equal(t, c.wantBytes, gotBytes, "effectiveBytes")
			head, tail := c.input[:c.wantOffset], c.input[c.wantOffset:]
			t.Log(head)
			t.Log(tail)
		})
	}
}
func TestProcessEventEmoji ( t * testing . T ) {
stream := & logStream { }
batch := & eventBatch { }
bytes := [ ] byte ( strings . Repeat ( "🙃" , maximumBytesPerEvent / 4 + 1 ) )
stream . processEvent ( batch , bytes , 0 )
assert . Equal ( t , 2 , len ( batch . batch ) , "should be two events in the batch" )
assert . Equal ( t , strings . Repeat ( "🙃" , maximumBytesPerEvent / 4 ) , aws . StringValue ( batch . batch [ 0 ] . inputLogEvent . Message ) )
assert . Equal ( t , "🙃" , aws . StringValue ( batch . batch [ 1 ] . inputLogEvent . Message ) )
}
2015-08-05 00:35:06 +00:00
func TestCollectBatchLineSplit ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2015-08-05 00:35:06 +00:00
stream := & logStream {
client : mockClient ,
logGroupName : groupName ,
logStreamName : streamName ,
sequenceToken : aws . String ( sequenceToken ) ,
messages : make ( chan * logger . Message ) ,
}
2022-01-10 18:42:11 -08:00
calls := make ( [ ] * cloudwatchlogs . PutLogEventsInput , 0 )
called := make ( chan struct { } , 50 )
mockClient . putLogEventsFunc = func ( input * cloudwatchlogs . PutLogEventsInput ) ( * cloudwatchlogs . PutLogEventsOutput , error ) {
calls = append ( calls , input )
called <- struct { } { }
return & cloudwatchlogs . PutLogEventsOutput {
2015-08-05 00:35:06 +00:00
NextSequenceToken : aws . String ( nextSequenceToken ) ,
2022-01-10 18:42:11 -08:00
} , nil
2015-08-05 00:35:06 +00:00
}
var ticks = make ( chan time . Time )
newTicker = func ( _ time . Duration ) * time . Ticker {
return & time . Ticker {
C : ticks ,
}
}
2018-03-02 18:52:15 +00:00
d := make ( chan bool )
close ( d )
go stream . collectBatch ( d )
2015-08-05 00:35:06 +00:00
longline := strings . Repeat ( "A" , maximumBytesPerEvent )
stream . Log ( & logger . Message {
Line : [ ] byte ( longline + "B" ) ,
Timestamp : time . Time { } ,
} )
// no ticks
stream . Close ( )
2022-01-10 18:42:11 -08:00
<- called
2022-01-10 21:17:31 -08:00
assert . Assert ( t , len ( calls ) == 1 )
2022-01-10 18:42:11 -08:00
argument := calls [ 0 ]
close ( called )
2022-01-10 21:17:31 -08:00
assert . Assert ( t , argument != nil )
assert . Assert ( t , len ( argument . LogEvents ) == 2 )
assert . Equal ( t , longline , aws . StringValue ( argument . LogEvents [ 0 ] . Message ) )
assert . Equal ( t , "B" , aws . StringValue ( argument . LogEvents [ 1 ] . Message ) )
2015-08-05 00:35:06 +00:00
}
2018-10-05 16:30:41 -07:00
func TestCollectBatchLineSplitWithBinary ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2018-10-05 16:30:41 -07:00
stream := & logStream {
client : mockClient ,
logGroupName : groupName ,
logStreamName : streamName ,
sequenceToken : aws . String ( sequenceToken ) ,
messages : make ( chan * logger . Message ) ,
}
2022-01-10 18:42:11 -08:00
calls := make ( [ ] * cloudwatchlogs . PutLogEventsInput , 0 )
called := make ( chan struct { } , 50 )
mockClient . putLogEventsFunc = func ( input * cloudwatchlogs . PutLogEventsInput ) ( * cloudwatchlogs . PutLogEventsOutput , error ) {
calls = append ( calls , input )
called <- struct { } { }
return & cloudwatchlogs . PutLogEventsOutput {
2018-10-05 16:30:41 -07:00
NextSequenceToken : aws . String ( nextSequenceToken ) ,
2022-01-10 18:42:11 -08:00
} , nil
2018-10-05 16:30:41 -07:00
}
var ticks = make ( chan time . Time )
newTicker = func ( _ time . Duration ) * time . Ticker {
return & time . Ticker {
C : ticks ,
}
}
d := make ( chan bool )
close ( d )
go stream . collectBatch ( d )
longline := strings . Repeat ( "\xFF" , maximumBytesPerEvent / 3 ) // 0xFF is counted as the 3-byte utf8.RuneError
stream . Log ( & logger . Message {
Line : [ ] byte ( longline + "\xFD" ) ,
Timestamp : time . Time { } ,
} )
// no ticks
stream . Close ( )
2022-01-10 18:42:11 -08:00
<- called
2022-01-10 21:17:31 -08:00
assert . Assert ( t , len ( calls ) == 1 )
2022-01-10 18:42:11 -08:00
argument := calls [ 0 ]
close ( called )
2022-01-10 21:17:31 -08:00
assert . Assert ( t , argument != nil )
assert . Assert ( t , len ( argument . LogEvents ) == 2 )
assert . Equal ( t , longline , aws . StringValue ( argument . LogEvents [ 0 ] . Message ) )
assert . Equal ( t , "\xFD" , aws . StringValue ( argument . LogEvents [ 1 ] . Message ) )
2018-10-05 16:30:41 -07:00
}
2015-08-05 00:35:06 +00:00
func TestCollectBatchMaxEvents ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2015-08-05 00:35:06 +00:00
stream := & logStream {
client : mockClient ,
logGroupName : groupName ,
logStreamName : streamName ,
sequenceToken : aws . String ( sequenceToken ) ,
messages : make ( chan * logger . Message ) ,
}
2022-01-10 18:42:11 -08:00
calls := make ( [ ] * cloudwatchlogs . PutLogEventsInput , 0 )
called := make ( chan struct { } , 50 )
mockClient . putLogEventsFunc = func ( input * cloudwatchlogs . PutLogEventsInput ) ( * cloudwatchlogs . PutLogEventsOutput , error ) {
calls = append ( calls , input )
called <- struct { } { }
return & cloudwatchlogs . PutLogEventsOutput {
2015-08-05 00:35:06 +00:00
NextSequenceToken : aws . String ( nextSequenceToken ) ,
2022-01-10 18:42:11 -08:00
} , nil
2015-08-05 00:35:06 +00:00
}
var ticks = make ( chan time . Time )
newTicker = func ( _ time . Duration ) * time . Ticker {
return & time . Ticker {
C : ticks ,
}
}
2018-03-02 18:52:15 +00:00
d := make ( chan bool )
close ( d )
go stream . collectBatch ( d )
2015-08-05 00:35:06 +00:00
line := "A"
for i := 0 ; i <= maximumLogEventsPerPut ; i ++ {
stream . Log ( & logger . Message {
Line : [ ] byte ( line ) ,
Timestamp : time . Time { } ,
} )
}
// no ticks
stream . Close ( )
2022-01-10 18:42:11 -08:00
<- called
<- called
2022-01-10 21:17:31 -08:00
assert . Assert ( t , len ( calls ) == 2 )
2022-01-10 18:42:11 -08:00
argument := calls [ 0 ]
2022-01-10 21:17:31 -08:00
assert . Assert ( t , argument != nil )
assert . Check ( t , len ( argument . LogEvents ) == maximumLogEventsPerPut )
2015-08-05 00:35:06 +00:00
2022-01-10 18:42:11 -08:00
argument = calls [ 1 ]
close ( called )
2022-01-10 21:17:31 -08:00
assert . Assert ( t , argument != nil )
assert . Assert ( t , len ( argument . LogEvents ) == 1 )
2015-08-05 00:35:06 +00:00
}
func TestCollectBatchMaxTotalBytes ( t * testing . T ) {
2017-11-30 16:17:17 -08:00
expectedPuts := 2
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2015-08-05 00:35:06 +00:00
stream := & logStream {
client : mockClient ,
logGroupName : groupName ,
logStreamName : streamName ,
sequenceToken : aws . String ( sequenceToken ) ,
messages : make ( chan * logger . Message ) ,
}
2022-01-10 18:42:11 -08:00
calls := make ( [ ] * cloudwatchlogs . PutLogEventsInput , 0 )
called := make ( chan struct { } , 50 )
mockClient . putLogEventsFunc = func ( input * cloudwatchlogs . PutLogEventsInput ) ( * cloudwatchlogs . PutLogEventsOutput , error ) {
calls = append ( calls , input )
called <- struct { } { }
return & cloudwatchlogs . PutLogEventsOutput {
NextSequenceToken : aws . String ( nextSequenceToken ) ,
} , nil
2015-08-05 00:35:06 +00:00
}
2017-11-30 16:17:17 -08:00
2015-08-05 00:35:06 +00:00
var ticks = make ( chan time . Time )
newTicker = func ( _ time . Duration ) * time . Ticker {
return & time . Ticker {
C : ticks ,
}
}
2018-03-02 18:52:15 +00:00
d := make ( chan bool )
close ( d )
go stream . collectBatch ( d )
2015-08-05 00:35:06 +00:00
2017-11-30 16:17:17 -08:00
numPayloads := maximumBytesPerPut / ( maximumBytesPerEvent + perEventBytes )
// maxline is the maximum line that could be submitted after
// accounting for its overhead.
maxline := strings . Repeat ( "A" , maximumBytesPerPut - ( perEventBytes * numPayloads ) )
// This will be split and batched up to the `maximumBytesPerPut'
// (+/- `maximumBytesPerEvent'). This /should/ be aligned, but
// should also tolerate an offset within that range.
2015-08-05 00:35:06 +00:00
stream . Log ( & logger . Message {
2017-11-30 16:17:17 -08:00
Line : [ ] byte ( maxline [ : len ( maxline ) / 2 ] ) ,
Timestamp : time . Time { } ,
} )
stream . Log ( & logger . Message {
Line : [ ] byte ( maxline [ len ( maxline ) / 2 : ] ) ,
Timestamp : time . Time { } ,
} )
stream . Log ( & logger . Message {
Line : [ ] byte ( "B" ) ,
2015-08-05 00:35:06 +00:00
Timestamp : time . Time { } ,
} )
2017-11-30 16:17:17 -08:00
// no ticks, guarantee batch by size (and chan close)
2015-08-05 00:35:06 +00:00
stream . Close ( )
2022-01-10 18:42:11 -08:00
for i := 0 ; i < expectedPuts ; i ++ {
<- called
}
2022-01-10 21:17:31 -08:00
assert . Assert ( t , len ( calls ) == expectedPuts )
2022-01-10 18:42:11 -08:00
argument := calls [ 0 ]
2022-01-10 21:17:31 -08:00
assert . Assert ( t , argument != nil )
2017-11-30 16:17:17 -08:00
// Should total to the maximum allowed bytes.
eventBytes := 0
2015-08-05 00:35:06 +00:00
for _ , event := range argument . LogEvents {
2017-11-30 16:17:17 -08:00
eventBytes += len ( * event . Message )
}
eventsOverhead := len ( argument . LogEvents ) * perEventBytes
payloadTotal := eventBytes + eventsOverhead
// lowestMaxBatch allows the payload to be offset if the messages
// don't lend themselves to align with the maximum event size.
lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent
2022-01-10 21:17:31 -08:00
assert . Check ( t , payloadTotal <= maximumBytesPerPut )
assert . Check ( t , payloadTotal >= lowestMaxBatch )
2015-08-05 00:35:06 +00:00
2022-01-10 18:42:11 -08:00
argument = calls [ 1 ]
2022-01-10 21:17:31 -08:00
assert . Assert ( t , len ( argument . LogEvents ) == 1 )
2017-11-30 16:17:17 -08:00
message := * argument . LogEvents [ len ( argument . LogEvents ) - 1 ] . Message
2022-01-10 21:17:31 -08:00
assert . Equal ( t , "B" , message [ len ( message ) - 1 : ] )
2018-10-05 16:30:41 -07:00
}
func TestCollectBatchMaxTotalBytesWithBinary ( t * testing . T ) {
expectedPuts := 2
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2018-10-05 16:30:41 -07:00
stream := & logStream {
client : mockClient ,
logGroupName : groupName ,
logStreamName : streamName ,
sequenceToken : aws . String ( sequenceToken ) ,
messages : make ( chan * logger . Message ) ,
}
2022-01-10 18:42:11 -08:00
calls := make ( [ ] * cloudwatchlogs . PutLogEventsInput , 0 )
called := make ( chan struct { } , 50 )
mockClient . putLogEventsFunc = func ( input * cloudwatchlogs . PutLogEventsInput ) ( * cloudwatchlogs . PutLogEventsOutput , error ) {
calls = append ( calls , input )
called <- struct { } { }
return & cloudwatchlogs . PutLogEventsOutput {
NextSequenceToken : aws . String ( nextSequenceToken ) ,
} , nil
2018-10-05 16:30:41 -07:00
}
var ticks = make ( chan time . Time )
newTicker = func ( _ time . Duration ) * time . Ticker {
return & time . Ticker {
C : ticks ,
}
}
d := make ( chan bool )
close ( d )
go stream . collectBatch ( d )
// maxline is the maximum line that could be submitted after
// accounting for its overhead.
maxline := strings . Repeat ( "\xFF" , ( maximumBytesPerPut - perEventBytes ) / 3 ) // 0xFF is counted as the 3-byte utf8.RuneError
// This will be split and batched up to the `maximumBytesPerPut'
// (+/- `maximumBytesPerEvent'). This /should/ be aligned, but
// should also tolerate an offset within that range.
stream . Log ( & logger . Message {
Line : [ ] byte ( maxline ) ,
Timestamp : time . Time { } ,
} )
stream . Log ( & logger . Message {
Line : [ ] byte ( "B" ) ,
Timestamp : time . Time { } ,
} )
// no ticks, guarantee batch by size (and chan close)
stream . Close ( )
2022-01-10 18:42:11 -08:00
for i := 0 ; i < expectedPuts ; i ++ {
<- called
}
2022-01-10 21:17:31 -08:00
assert . Assert ( t , len ( calls ) == expectedPuts )
2022-01-10 18:42:11 -08:00
argument := calls [ 0 ]
2022-01-10 21:17:31 -08:00
assert . Assert ( t , argument != nil )
2018-10-05 16:30:41 -07:00
// Should total to the maximum allowed bytes.
eventBytes := 0
for _ , event := range argument . LogEvents {
eventBytes += effectiveLen ( * event . Message )
}
eventsOverhead := len ( argument . LogEvents ) * perEventBytes
payloadTotal := eventBytes + eventsOverhead
// lowestMaxBatch allows the payload to be offset if the messages
// don't lend themselves to align with the maximum event size.
lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent
2022-01-10 21:17:31 -08:00
assert . Check ( t , payloadTotal <= maximumBytesPerPut )
assert . Check ( t , payloadTotal >= lowestMaxBatch )
2018-10-05 16:30:41 -07:00
2022-01-10 18:42:11 -08:00
argument = calls [ 1 ]
2018-10-05 16:30:41 -07:00
message := * argument . LogEvents [ len ( argument . LogEvents ) - 1 ] . Message
2022-01-10 21:17:31 -08:00
assert . Equal ( t , "B" , message [ len ( message ) - 1 : ] )
2015-08-05 00:35:06 +00:00
}
2016-07-19 11:23:45 -07:00
func TestCollectBatchWithDuplicateTimestamps ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2016-07-19 11:23:45 -07:00
stream := & logStream {
client : mockClient ,
logGroupName : groupName ,
logStreamName : streamName ,
sequenceToken : aws . String ( sequenceToken ) ,
messages : make ( chan * logger . Message ) ,
}
2022-01-10 18:42:11 -08:00
calls := make ( [ ] * cloudwatchlogs . PutLogEventsInput , 0 )
called := make ( chan struct { } , 50 )
mockClient . putLogEventsFunc = func ( input * cloudwatchlogs . PutLogEventsInput ) ( * cloudwatchlogs . PutLogEventsOutput , error ) {
calls = append ( calls , input )
called <- struct { } { }
return & cloudwatchlogs . PutLogEventsOutput {
2016-07-19 11:23:45 -07:00
NextSequenceToken : aws . String ( nextSequenceToken ) ,
2022-01-10 18:42:11 -08:00
} , nil
2016-07-19 11:23:45 -07:00
}
ticks := make ( chan time . Time )
newTicker = func ( _ time . Duration ) * time . Ticker {
return & time . Ticker {
C : ticks ,
}
}
2018-03-02 18:52:15 +00:00
d := make ( chan bool )
close ( d )
go stream . collectBatch ( d )
2016-07-19 11:23:45 -07:00
2018-05-19 13:38:54 +02:00
var expectedEvents [ ] * cloudwatchlogs . InputLogEvent
2016-07-19 11:23:45 -07:00
times := maximumLogEventsPerPut
timestamp := time . Now ( )
for i := 0 ; i < times ; i ++ {
line := fmt . Sprintf ( "%d" , i )
if i % 2 == 0 {
timestamp . Add ( 1 * time . Nanosecond )
}
stream . Log ( & logger . Message {
Line : [ ] byte ( line ) ,
Timestamp : timestamp ,
} )
expectedEvents = append ( expectedEvents , & cloudwatchlogs . InputLogEvent {
Message : aws . String ( line ) ,
Timestamp : aws . Int64 ( timestamp . UnixNano ( ) / int64 ( time . Millisecond ) ) ,
} )
}
ticks <- time . Time { }
stream . Close ( )
2022-01-10 18:42:11 -08:00
<- called
2022-01-10 21:17:31 -08:00
assert . Assert ( t , len ( calls ) == 1 )
2022-01-10 18:42:11 -08:00
argument := calls [ 0 ]
close ( called )
2022-01-10 21:17:31 -08:00
assert . Assert ( t , argument != nil )
assert . Assert ( t , len ( argument . LogEvents ) == times )
2016-07-19 11:23:45 -07:00
for i := 0 ; i < times ; i ++ {
if ! reflect . DeepEqual ( * argument . LogEvents [ i ] , * expectedEvents [ i ] ) {
t . Errorf ( "Expected event to be %v but was %v" , * expectedEvents [ i ] , * argument . LogEvents [ i ] )
}
}
}
2016-10-21 15:45:05 -07:00
2017-02-13 21:24:59 +13:00
func TestParseLogOptionsMultilinePattern ( t * testing . T ) {
info := logger . Info {
Config : map [ string ] string {
multilinePatternKey : "^xxxx" ,
} ,
}
multilinePattern , err := parseMultilineOptions ( info )
2018-03-13 15:28:34 -04:00
assert . Check ( t , err , "Received unexpected error" )
assert . Check ( t , multilinePattern . MatchString ( "xxxx" ) , "No multiline pattern match found" )
2017-02-13 21:24:59 +13:00
}
func TestParseLogOptionsDatetimeFormat ( t * testing . T ) {
datetimeFormatTests := [ ] struct {
format string
match string
} {
{ "%d/%m/%y %a %H:%M:%S%L %Z" , "31/12/10 Mon 08:42:44.345 NZDT" } ,
{ "%Y-%m-%d %A %I:%M:%S.%f%p%z" , "2007-12-04 Monday 08:42:44.123456AM+1200" } ,
2017-05-13 22:10:51 +12:00
{ "%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b" , "Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec" } ,
{ "%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B" , "January|February|March|April|May|June|July|August|September|October|November|December" } ,
2017-02-13 21:24:59 +13:00
{ "%A|%A|%A|%A|%A|%A|%A" , "Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday" } ,
{ "%a|%a|%a|%a|%a|%a|%a" , "Mon|Tue|Wed|Thu|Fri|Sat|Sun" } ,
{ "Day of the week: %w, Day of the year: %j" , "Day of the week: 4, Day of the year: 091" } ,
}
for _ , dt := range datetimeFormatTests {
2017-05-12 21:55:18 +12:00
t . Run ( dt . match , func ( t * testing . T ) {
info := logger . Info {
Config : map [ string ] string {
datetimeFormatKey : dt . format ,
} ,
}
multilinePattern , err := parseMultilineOptions ( info )
2018-03-13 15:28:34 -04:00
assert . Check ( t , err , "Received unexpected error" )
assert . Check ( t , multilinePattern . MatchString ( dt . match ) , "No multiline pattern match found" )
2017-05-12 21:55:18 +12:00
} )
2017-02-13 21:24:59 +13:00
}
}
2017-05-12 21:55:18 +12:00
func TestValidateLogOptionsDatetimeFormatAndMultilinePattern ( t * testing . T ) {
cfg := map [ string ] string {
multilinePatternKey : "^xxxx" ,
datetimeFormatKey : "%Y-%m-%d" ,
logGroupKey : groupName ,
}
conflictingLogOptionsError := "you cannot configure log opt 'awslogs-datetime-format' and 'awslogs-multiline-pattern' at the same time"
err := ValidateLogOpt ( cfg )
2018-03-13 15:28:34 -04:00
assert . Check ( t , err != nil , "Expected an error" )
assert . Check ( t , is . Equal ( err . Error ( ) , conflictingLogOptionsError ) , "Received invalid error" )
2017-05-12 21:55:18 +12:00
}
2018-12-20 18:41:47 +00:00
func TestValidateLogOptionsForceFlushIntervalSeconds ( t * testing . T ) {
2018-12-21 20:27:48 +00:00
tests := [ ] struct {
input string
shouldErr bool
} {
{ "0" , true } ,
{ "-1" , true } ,
{ "a" , true } ,
{ "10" , false } ,
2018-12-20 18:41:47 +00:00
}
2018-12-21 20:27:48 +00:00
for _ , tc := range tests {
t . Run ( tc . input , func ( t * testing . T ) {
cfg := map [ string ] string {
forceFlushIntervalKey : tc . input ,
logGroupKey : groupName ,
}
2018-12-20 18:41:47 +00:00
2018-12-21 20:27:48 +00:00
err := ValidateLogOpt ( cfg )
if tc . shouldErr {
expectedErr := "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': " + tc . input
2019-01-03 01:19:04 +00:00
assert . Error ( t , err , expectedErr )
2018-12-21 20:27:48 +00:00
} else {
2019-01-03 01:19:04 +00:00
assert . NilError ( t , err )
2018-12-21 20:27:48 +00:00
}
} )
}
2018-12-20 18:41:47 +00:00
}
func TestValidateLogOptionsMaxBufferedEvents ( t * testing . T ) {
2018-12-21 20:27:48 +00:00
tests := [ ] struct {
input string
shouldErr bool
} {
{ "0" , true } ,
{ "-1" , true } ,
{ "a" , true } ,
{ "10" , false } ,
2018-12-20 18:41:47 +00:00
}
2018-12-21 20:27:48 +00:00
for _ , tc := range tests {
t . Run ( tc . input , func ( t * testing . T ) {
cfg := map [ string ] string {
maxBufferedEventsKey : tc . input ,
logGroupKey : groupName ,
}
2018-12-20 18:41:47 +00:00
2018-12-21 20:27:48 +00:00
err := ValidateLogOpt ( cfg )
if tc . shouldErr {
expectedErr := "must specify a positive integer for log opt 'awslogs-max-buffered-events': " + tc . input
2019-01-03 01:19:04 +00:00
assert . Error ( t , err , expectedErr )
2018-12-21 20:27:48 +00:00
} else {
2019-01-03 01:19:04 +00:00
assert . NilError ( t , err )
2018-12-21 20:27:48 +00:00
}
} )
}
2018-12-20 18:41:47 +00:00
}
2021-09-10 09:05:20 -07:00
// TestValidateLogOptionsFormat runs ValidateLogOpt across combinations of the
// log-format option with optional multiline-pattern and datetime-format
// options. An empty expected message means the configuration must validate.
func TestValidateLogOptionsFormat(t *testing.T) {
	const emfConflict = "you cannot configure log opt 'awslogs-datetime-format' or 'awslogs-multiline-pattern' when log opt 'awslogs-format' is set to 'json/emf'"

	cases := []struct {
		logFormat string
		pattern   string
		datetime  string
		wantErr   string
	}{
		{"json/emf", "", "", ""},
		{"random", "", "", "unsupported log format 'random'"},
		{"", "", "", ""},
		{"json/emf", "---", "", emfConflict},
		{"json/emf", "", "yyyy-dd-mm", emfConflict},
	}
	for idx, c := range cases {
		name := fmt.Sprintf("%d/%s", idx, c.logFormat)
		t.Run(name, func(t *testing.T) {
			cfg := map[string]string{
				logGroupKey:  groupName,
				logFormatKey: c.logFormat,
			}
			// The optional keys are only set when the case supplies a value.
			if c.pattern != "" {
				cfg[multilinePatternKey] = c.pattern
			}
			if c.datetime != "" {
				cfg[datetimeFormatKey] = c.datetime
			}

			err := ValidateLogOpt(cfg)
			if c.wantErr == "" {
				assert.NilError(t, err)
				return
			}
			assert.Error(t, err, c.wantErr)
		})
	}
}
2016-10-21 15:45:05 -07:00
func TestCreateTagSuccess ( t * testing . T ) {
2022-01-10 18:42:11 -08:00
mockClient := & mockClient { }
2016-11-26 13:08:34 +08:00
info := logger . Info {
2016-10-21 15:45:05 -07:00
ContainerName : "/test-container" ,
ContainerID : "container-abcdefghijklmnopqrstuvwxyz01234567890" ,
Config : map [ string ] string { "tag" : "{{.Name}}/{{.FullID}}" } ,
}
2016-11-26 13:08:34 +08:00
logStreamName , e := loggerutils . ParseLogTag ( info , loggerutils . DefaultTemplate )
2016-10-21 15:45:05 -07:00
if e != nil {
t . Errorf ( "Error generating tag: %q" , e )
}
stream := & logStream {
2021-03-04 10:03:21 -08:00
client : mockClient ,
logGroupName : groupName ,
logStreamName : logStreamName ,
logCreateStream : true ,
2016-10-21 15:45:05 -07:00
}
2022-01-10 18:42:11 -08:00
calls := make ( [ ] * cloudwatchlogs . CreateLogStreamInput , 0 )
mockClient . createLogStreamFunc = func ( input * cloudwatchlogs . CreateLogStreamInput ) ( * cloudwatchlogs . CreateLogStreamOutput , error ) {
calls = append ( calls , input )
return & cloudwatchlogs . CreateLogStreamOutput { } , nil
}
2016-10-21 15:45:05 -07:00
err := stream . create ( )
2018-06-29 11:27:37 -07:00
assert . NilError ( t , err )
2022-01-10 18:42:11 -08:00
assert . Equal ( t , 1 , len ( calls ) )
argument := calls [ 0 ]
2016-10-21 15:45:05 -07:00
2022-01-10 21:17:31 -08:00
assert . Equal ( t , "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890" , aws . StringValue ( argument . LogStreamName ) )
2016-10-21 15:45:05 -07:00
}
2017-05-18 23:30:00 +09:00
func BenchmarkUnwrapEvents ( b * testing . B ) {
events := make ( [ ] wrappedEvent , maximumLogEventsPerPut )
for i := 0 ; i < maximumLogEventsPerPut ; i ++ {
mes := strings . Repeat ( "0" , maximumBytesPerEvent )
events [ i ] . inputLogEvent = & cloudwatchlogs . InputLogEvent {
Message : & mes ,
}
}
b . ResetTimer ( )
for i := 0 ; i < b . N ; i ++ {
res := unwrapEvents ( events )
2018-03-13 15:28:34 -04:00
assert . Check ( b , is . Len ( res , maximumLogEventsPerPut ) )
2017-05-18 23:30:00 +09:00
}
}
2017-09-27 17:11:49 -07:00
func TestNewAWSLogsClientCredentialEndpointDetect ( t * testing . T ) {
// required for the cloudwatchlogs client
os . Setenv ( "AWS_REGION" , "us-west-2" )
defer os . Unsetenv ( "AWS_REGION" )
credsResp := ` {
"AccessKeyId" : "test-access-key-id" ,
"SecretAccessKey" : "test-secret-access-key"
} `
expectedAccessKeyID := "test-access-key-id"
expectedSecretAccessKey := "test-secret-access-key"
testServer := httptest . NewServer ( http . HandlerFunc ( func ( w http . ResponseWriter , r * http . Request ) {
w . Header ( ) . Set ( "Content-Type" , "application/json" )
fmt . Fprintln ( w , credsResp )
} ) )
defer testServer . Close ( )
// set the SDKEndpoint in the driver
newSDKEndpoint = testServer . URL
info := logger . Info {
Config : map [ string ] string { } ,
}
info . Config [ "awslogs-credentials-endpoint" ] = "/creds"
c , err := newAWSLogsClient ( info )
2018-03-13 15:28:34 -04:00
assert . Check ( t , err )
2017-09-27 17:11:49 -07:00
client := c . ( * cloudwatchlogs . CloudWatchLogs )
creds , err := client . Config . Credentials . Get ( )
2018-03-13 15:28:34 -04:00
assert . Check ( t , err )
2017-09-27 17:11:49 -07:00
2018-03-13 15:28:34 -04:00
assert . Check ( t , is . Equal ( expectedAccessKeyID , creds . AccessKeyID ) )
assert . Check ( t , is . Equal ( expectedSecretAccessKey , creds . SecretAccessKey ) )
2017-09-27 17:11:49 -07:00
}
func TestNewAWSLogsClientCredentialEnvironmentVariable ( t * testing . T ) {
// required for the cloudwatchlogs client
os . Setenv ( "AWS_REGION" , "us-west-2" )
defer os . Unsetenv ( "AWS_REGION" )
expectedAccessKeyID := "test-access-key-id"
expectedSecretAccessKey := "test-secret-access-key"
os . Setenv ( "AWS_ACCESS_KEY_ID" , expectedAccessKeyID )
defer os . Unsetenv ( "AWS_ACCESS_KEY_ID" )
os . Setenv ( "AWS_SECRET_ACCESS_KEY" , expectedSecretAccessKey )
defer os . Unsetenv ( "AWS_SECRET_ACCESS_KEY" )
info := logger . Info {
Config : map [ string ] string { } ,
}
c , err := newAWSLogsClient ( info )
2018-03-13 15:28:34 -04:00
assert . Check ( t , err )
2017-09-27 17:11:49 -07:00
client := c . ( * cloudwatchlogs . CloudWatchLogs )
creds , err := client . Config . Credentials . Get ( )
2018-03-13 15:28:34 -04:00
assert . Check ( t , err )
2017-09-27 17:11:49 -07:00
2018-03-13 15:28:34 -04:00
assert . Check ( t , is . Equal ( expectedAccessKeyID , creds . AccessKeyID ) )
assert . Check ( t , is . Equal ( expectedSecretAccessKey , creds . SecretAccessKey ) )
2017-09-27 17:11:49 -07:00
}
func TestNewAWSLogsClientCredentialSharedFile ( t * testing . T ) {
// required for the cloudwatchlogs client
os . Setenv ( "AWS_REGION" , "us-west-2" )
defer os . Unsetenv ( "AWS_REGION" )
expectedAccessKeyID := "test-access-key-id"
expectedSecretAccessKey := "test-secret-access-key"
contentStr := `
[ default ]
aws_access_key_id = "test-access-key-id"
aws_secret_access_key = "test-secret-access-key"
`
content := [ ] byte ( contentStr )
2021-08-24 18:10:50 +08:00
tmpfile , err := os . CreateTemp ( "" , "example" )
2017-09-27 17:11:49 -07:00
defer os . Remove ( tmpfile . Name ( ) ) // clean up
2018-03-13 15:28:34 -04:00
assert . Check ( t , err )
2017-09-27 17:11:49 -07:00
_ , err = tmpfile . Write ( content )
2018-03-13 15:28:34 -04:00
assert . Check ( t , err )
2017-09-27 17:11:49 -07:00
err = tmpfile . Close ( )
2018-03-13 15:28:34 -04:00
assert . Check ( t , err )
2017-09-27 17:11:49 -07:00
os . Unsetenv ( "AWS_ACCESS_KEY_ID" )
os . Unsetenv ( "AWS_SECRET_ACCESS_KEY" )
os . Setenv ( "AWS_SHARED_CREDENTIALS_FILE" , tmpfile . Name ( ) )
defer os . Unsetenv ( "AWS_SHARED_CREDENTIALS_FILE" )
info := logger . Info {
Config : map [ string ] string { } ,
}
c , err := newAWSLogsClient ( info )
2018-03-13 15:28:34 -04:00
assert . Check ( t , err )
2017-09-27 17:11:49 -07:00
client := c . ( * cloudwatchlogs . CloudWatchLogs )
creds , err := client . Config . Credentials . Get ( )
2018-03-13 15:28:34 -04:00
assert . Check ( t , err )
2017-09-27 17:11:49 -07:00
2018-03-13 15:28:34 -04:00
assert . Check ( t , is . Equal ( expectedAccessKeyID , creds . AccessKeyID ) )
assert . Check ( t , is . Equal ( expectedSecretAccessKey , creds . SecretAccessKey ) )
2017-09-27 17:11:49 -07:00
}