1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

awslogs: replace channel-based mocks

Signed-off-by: Samuel Karp <skarp@amazon.com>
This commit is contained in:
Samuel Karp 2022-01-10 18:42:11 -08:00
parent ea96e160e4
commit f0e450992c
No known key found for this signature in database
GPG key ID: 7F8CDFDD70CC3D44
2 changed files with 341 additions and 208 deletions

View file

@ -227,41 +227,50 @@ func TestNewAWSLogsClientRegionDetect(t *testing.T) {
} }
func TestCreateSuccess(t *testing.T) { func TestCreateSuccess(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
logGroupName: groupName, logGroupName: groupName,
logStreamName: streamName, logStreamName: streamName,
logCreateStream: true, logCreateStream: true,
} }
mockClient.createLogStreamResult <- &createLogStreamResult{} var input *cloudwatchlogs.CreateLogStreamInput
mockClient.createLogStreamFunc = func(i *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) {
input = i
return &cloudwatchlogs.CreateLogStreamOutput{}, nil
}
err := stream.create() err := stream.create()
if err != nil { if err != nil {
t.Errorf("Received unexpected err: %v\n", err) t.Errorf("Received unexpected err: %v\n", err)
} }
argument := <-mockClient.createLogStreamArgument if input.LogGroupName == nil {
if argument.LogGroupName == nil {
t.Fatal("Expected non-nil LogGroupName") t.Fatal("Expected non-nil LogGroupName")
} }
if *argument.LogGroupName != groupName { if *input.LogGroupName != groupName {
t.Errorf("Expected LogGroupName to be %s", groupName) t.Errorf("Expected LogGroupName to be %s", groupName)
} }
if argument.LogStreamName == nil { if input.LogStreamName == nil {
t.Fatal("Expected non-nil LogStreamName") t.Fatal("Expected non-nil LogStreamName")
} }
if *argument.LogStreamName != streamName { if *input.LogStreamName != streamName {
t.Errorf("Expected LogStreamName to be %s", streamName) t.Errorf("Expected LogStreamName to be %s", streamName)
} }
} }
func TestCreateStreamSkipped(t *testing.T) { func TestCreateStreamSkipped(t *testing.T) {
mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient,
logGroupName: groupName, logGroupName: groupName,
logStreamName: streamName, logStreamName: streamName,
logCreateStream: false, logCreateStream: false,
} }
mockClient.createLogStreamFunc = func(i *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) {
t.Error("CreateLogStream should not be called")
return nil, errors.New("should not be called")
}
err := stream.create() err := stream.create()
@ -271,7 +280,7 @@ func TestCreateStreamSkipped(t *testing.T) {
} }
func TestCreateLogGroupSuccess(t *testing.T) { func TestCreateLogGroupSuccess(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
logGroupName: groupName, logGroupName: groupName,
@ -279,37 +288,62 @@ func TestCreateLogGroupSuccess(t *testing.T) {
logCreateGroup: true, logCreateGroup: true,
logCreateStream: true, logCreateStream: true,
} }
mockClient.createLogGroupResult <- &createLogGroupResult{} var logGroupInput *cloudwatchlogs.CreateLogGroupInput
mockClient.createLogStreamResult <- &createLogStreamResult{} mockClient.createLogGroupFunc = func(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) {
logGroupInput = input
return &cloudwatchlogs.CreateLogGroupOutput{}, nil
}
var logStreamInput *cloudwatchlogs.CreateLogStreamInput
createLogStreamCalls := 0
mockClient.createLogStreamFunc = func(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) {
createLogStreamCalls++
if logGroupInput == nil {
// log group not created yet
return nil, awserr.New(resourceNotFoundCode, "should error once", nil)
}
logStreamInput = input
return &cloudwatchlogs.CreateLogStreamOutput{}, nil
}
err := stream.create() err := stream.create()
if err != nil { if err != nil {
t.Errorf("Received unexpected err: %v\n", err) t.Errorf("Received unexpected err: %v\n", err)
} }
argument := <-mockClient.createLogStreamArgument if createLogStreamCalls < 2 {
if argument.LogGroupName == nil { t.Errorf("Expected CreateLogStream to be called twice, was called %d times", createLogStreamCalls)
t.Fatal("Expected non-nil LogGroupName")
} }
if *argument.LogGroupName != groupName { if logGroupInput == nil {
t.Errorf("Expected LogGroupName to be %s", groupName) t.Fatal("LogGroupInput should not be nil")
} }
if argument.LogStreamName == nil { if logGroupInput.LogGroupName == nil {
t.Fatal("Expected non-nil LogGroupName in CreateLogGroup")
}
if *logGroupInput.LogGroupName != groupName {
t.Errorf("Expected LogGroupName to be %s in CreateLogGroup", groupName)
}
if logStreamInput.LogGroupName == nil {
t.Fatal("Expected non-nil LogGroupName in CreateLogStream")
}
if *logStreamInput.LogGroupName != groupName {
t.Errorf("Expected LogGroupName to be %s in CreateLogStream", groupName)
}
if logStreamInput.LogStreamName == nil {
t.Fatal("Expected non-nil LogStreamName") t.Fatal("Expected non-nil LogStreamName")
} }
if *argument.LogStreamName != streamName { if *logStreamInput.LogStreamName != streamName {
t.Errorf("Expected LogStreamName to be %s", streamName) t.Errorf("Expected LogStreamName to be %s", streamName)
} }
} }
func TestCreateError(t *testing.T) { func TestCreateError(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
logCreateStream: true, logCreateStream: true,
} }
mockClient.createLogStreamResult <- &createLogStreamResult{ mockClient.createLogStreamFunc = func(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) {
errorResult: errors.New("Error"), return nil, errors.New("error")
} }
err := stream.create() err := stream.create()
@ -320,22 +354,25 @@ func TestCreateError(t *testing.T) {
} }
func TestCreateAlreadyExists(t *testing.T) { func TestCreateAlreadyExists(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
logCreateStream: true, logCreateStream: true,
} }
mockClient.createLogStreamResult <- &createLogStreamResult{ calls := 0
errorResult: awserr.New(resourceAlreadyExistsCode, "", nil), mockClient.createLogStreamFunc = func(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) {
calls++
return nil, awserr.New(resourceAlreadyExistsCode, "", nil)
} }
err := stream.create() err := stream.create()
assert.NilError(t, err) assert.NilError(t, err)
assert.Equal(t, 1, calls)
} }
func TestLogClosed(t *testing.T) { func TestLogClosed(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
closed: true, closed: true,
@ -350,7 +387,7 @@ func TestLogClosed(t *testing.T) {
// non-blocking behavior is not enabled. Blocking is achieved through an // non-blocking behavior is not enabled. Blocking is achieved through an
// internal channel that must be drained for Log to return. // internal channel that must be drained for Log to return.
func TestLogBlocking(t *testing.T) { func TestLogBlocking(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
messages: make(chan *logger.Message), messages: make(chan *logger.Message),
@ -388,7 +425,7 @@ func TestLogBlocking(t *testing.T) {
} }
func TestLogNonBlockingBufferEmpty(t *testing.T) { func TestLogNonBlockingBufferEmpty(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
messages: make(chan *logger.Message, 1), messages: make(chan *logger.Message, 1),
@ -399,7 +436,7 @@ func TestLogNonBlockingBufferEmpty(t *testing.T) {
} }
func TestLogNonBlockingBufferFull(t *testing.T) { func TestLogNonBlockingBufferFull(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
messages: make(chan *logger.Message, 1), messages: make(chan *logger.Message, 1),
@ -424,17 +461,19 @@ func TestLogNonBlockingBufferFull(t *testing.T) {
} }
} }
func TestPublishBatchSuccess(t *testing.T) { func TestPublishBatchSuccess(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
logGroupName: groupName, logGroupName: groupName,
logStreamName: streamName, logStreamName: streamName,
sequenceToken: aws.String(sequenceToken), sequenceToken: aws.String(sequenceToken),
} }
mockClient.putLogEventsResult <- &putLogEventsResult{ var input *cloudwatchlogs.PutLogEventsInput
successResult: &cloudwatchlogs.PutLogEventsOutput{ mockClient.putLogEventsFunc = func(i *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
input = i
return &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken), NextSequenceToken: aws.String(nextSequenceToken),
}, }, nil
} }
events := []wrappedEvent{ events := []wrappedEvent{
{ {
@ -451,34 +490,33 @@ func TestPublishBatchSuccess(t *testing.T) {
if *stream.sequenceToken != nextSequenceToken { if *stream.sequenceToken != nextSequenceToken {
t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken) t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
} }
argument := <-mockClient.putLogEventsArgument if input == nil {
if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput") t.Fatal("Expected non-nil PutLogEventsInput")
} }
if argument.SequenceToken == nil { if input.SequenceToken == nil {
t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken") t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken")
} }
if *argument.SequenceToken != sequenceToken { if *input.SequenceToken != sequenceToken {
t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken) t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *input.SequenceToken)
} }
if len(argument.LogEvents) != 1 { if len(input.LogEvents) != 1 {
t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents)) t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(input.LogEvents))
} }
if argument.LogEvents[0] != events[0].inputLogEvent { if input.LogEvents[0] != events[0].inputLogEvent {
t.Error("Expected event to equal input") t.Error("Expected event to equal input")
} }
} }
func TestPublishBatchError(t *testing.T) { func TestPublishBatchError(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
logGroupName: groupName, logGroupName: groupName,
logStreamName: streamName, logStreamName: streamName,
sequenceToken: aws.String(sequenceToken), sequenceToken: aws.String(sequenceToken),
} }
mockClient.putLogEventsResult <- &putLogEventsResult{ mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
errorResult: errors.New("Error"), return nil, errors.New("error")
} }
events := []wrappedEvent{ events := []wrappedEvent{
@ -499,20 +537,22 @@ func TestPublishBatchError(t *testing.T) {
} }
func TestPublishBatchInvalidSeqSuccess(t *testing.T) { func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
mockClient := newMockClientBuffered(2) mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
logGroupName: groupName, logGroupName: groupName,
logStreamName: streamName, logStreamName: streamName,
sequenceToken: aws.String(sequenceToken), sequenceToken: aws.String(sequenceToken),
} }
mockClient.putLogEventsResult <- &putLogEventsResult{ calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
errorResult: awserr.New(invalidSequenceTokenCode, "use token token", nil), mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
calls = append(calls, input)
if aws.StringValue(input.SequenceToken) != "token" {
return nil, awserr.New(invalidSequenceTokenCode, "use token token", nil)
} }
mockClient.putLogEventsResult <- &putLogEventsResult{ return &cloudwatchlogs.PutLogEventsOutput{
successResult: &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken), NextSequenceToken: aws.String(nextSequenceToken),
}, }, nil
} }
events := []wrappedEvent{ events := []wrappedEvent{
@ -531,7 +571,10 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken) t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken)
} }
argument := <-mockClient.putLogEventsArgument if len(calls) != 2 {
t.Fatalf("Expected two calls to PutLogEvents, was %d: %v", len(calls), calls)
}
argument := calls[0]
if argument == nil { if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput") t.Fatal("Expected non-nil PutLogEventsInput")
} }
@ -548,7 +591,7 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
t.Error("Expected event to equal input") t.Error("Expected event to equal input")
} }
argument = <-mockClient.putLogEventsArgument argument = calls[1]
if argument == nil { if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput") t.Fatal("Expected non-nil PutLogEventsInput")
} }
@ -567,15 +610,17 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
} }
func TestPublishBatchAlreadyAccepted(t *testing.T) { func TestPublishBatchAlreadyAccepted(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
logGroupName: groupName, logGroupName: groupName,
logStreamName: streamName, logStreamName: streamName,
sequenceToken: aws.String(sequenceToken), sequenceToken: aws.String(sequenceToken),
} }
mockClient.putLogEventsResult <- &putLogEventsResult{ calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
errorResult: awserr.New(dataAlreadyAcceptedCode, "use token token", nil), mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
calls = append(calls, input)
return nil, awserr.New(dataAlreadyAcceptedCode, "use token token", nil)
} }
events := []wrappedEvent{ events := []wrappedEvent{
@ -594,7 +639,10 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) {
t.Errorf("Expected sequenceToken to be %s, but was %s", "token", *stream.sequenceToken) t.Errorf("Expected sequenceToken to be %s, but was %s", "token", *stream.sequenceToken)
} }
argument := <-mockClient.putLogEventsArgument if len(calls) != 1 {
t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls)
}
argument := calls[0]
if argument == nil { if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput") t.Fatal("Expected non-nil PutLogEventsInput")
} }
@ -613,7 +661,7 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) {
} }
func TestCollectBatchSimple(t *testing.T) { func TestCollectBatchSimple(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
logGroupName: groupName, logGroupName: groupName,
@ -621,10 +669,12 @@ func TestCollectBatchSimple(t *testing.T) {
sequenceToken: aws.String(sequenceToken), sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message), messages: make(chan *logger.Message),
} }
mockClient.putLogEventsResult <- &putLogEventsResult{ calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
successResult: &cloudwatchlogs.PutLogEventsOutput{ mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
calls = append(calls, input)
return &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken), NextSequenceToken: aws.String(nextSequenceToken),
}, }, nil
} }
ticks := make(chan time.Time) ticks := make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker { newTicker = func(_ time.Duration) *time.Ticker {
@ -641,10 +691,14 @@ func TestCollectBatchSimple(t *testing.T) {
Timestamp: time.Time{}, Timestamp: time.Time{},
}) })
ticks <- time.Time{}
ticks <- time.Time{} ticks <- time.Time{}
stream.Close() stream.Close()
argument := <-mockClient.putLogEventsArgument if len(calls) != 1 {
t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls)
}
argument := calls[0]
if argument == nil { if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput") t.Fatal("Expected non-nil PutLogEventsInput")
} }
@ -657,7 +711,7 @@ func TestCollectBatchSimple(t *testing.T) {
} }
func TestCollectBatchTicker(t *testing.T) { func TestCollectBatchTicker(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
logGroupName: groupName, logGroupName: groupName,
@ -665,10 +719,14 @@ func TestCollectBatchTicker(t *testing.T) {
sequenceToken: aws.String(sequenceToken), sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message), messages: make(chan *logger.Message),
} }
mockClient.putLogEventsResult <- &putLogEventsResult{ calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
successResult: &cloudwatchlogs.PutLogEventsOutput{ called := make(chan struct{}, 50)
mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
calls = append(calls, input)
called <- struct{}{}
return &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken), NextSequenceToken: aws.String(nextSequenceToken),
}, }, nil
} }
ticks := make(chan time.Time) ticks := make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker { newTicker = func(_ time.Duration) *time.Ticker {
@ -691,9 +749,13 @@ func TestCollectBatchTicker(t *testing.T) {
}) })
ticks <- time.Time{} ticks <- time.Time{}
// Verify first batch // Verify first batch
argument := <-mockClient.putLogEventsArgument <-called
if len(calls) != 1 {
t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls)
}
argument := calls[0]
calls = calls[1:]
if argument == nil { if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput") t.Fatal("Expected non-nil PutLogEventsInput")
} }
@ -713,7 +775,12 @@ func TestCollectBatchTicker(t *testing.T) {
}) })
ticks <- time.Time{} ticks <- time.Time{}
argument = <-mockClient.putLogEventsArgument <-called
if len(calls) != 1 {
t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls)
}
argument = calls[0]
close(called)
if argument == nil { if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput") t.Fatal("Expected non-nil PutLogEventsInput")
} }
@ -729,7 +796,7 @@ func TestCollectBatchTicker(t *testing.T) {
} }
func TestCollectBatchMultilinePattern(t *testing.T) { func TestCollectBatchMultilinePattern(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
multilinePattern := regexp.MustCompile("xxxx") multilinePattern := regexp.MustCompile("xxxx")
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
@ -739,10 +806,14 @@ func TestCollectBatchMultilinePattern(t *testing.T) {
sequenceToken: aws.String(sequenceToken), sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message), messages: make(chan *logger.Message),
} }
mockClient.putLogEventsResult <- &putLogEventsResult{ calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
successResult: &cloudwatchlogs.PutLogEventsOutput{ called := make(chan struct{}, 50)
mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
calls = append(calls, input)
called <- struct{}{}
return &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken), NextSequenceToken: aws.String(nextSequenceToken),
}, }, nil
} }
ticks := make(chan time.Time) ticks := make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker { newTicker = func(_ time.Duration) *time.Ticker {
@ -771,7 +842,12 @@ func TestCollectBatchMultilinePattern(t *testing.T) {
ticks <- time.Now() ticks <- time.Now()
// Verify single multiline event // Verify single multiline event
argument := <-mockClient.putLogEventsArgument <-called
if len(calls) != 1 {
t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls)
}
argument := calls[0]
calls = calls[1:]
assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput") assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput")
assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event") assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event")
assert.Check(t, is.Equal(logline+"\n"+logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message") assert.Check(t, is.Equal(logline+"\n"+logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message")
@ -779,7 +855,12 @@ func TestCollectBatchMultilinePattern(t *testing.T) {
stream.Close() stream.Close()
// Verify single event // Verify single event
argument = <-mockClient.putLogEventsArgument <-called
if len(calls) != 1 {
t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls)
}
argument = calls[0]
close(called)
assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput") assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput")
assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event") assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event")
assert.Check(t, is.Equal("xxxx "+logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message") assert.Check(t, is.Equal("xxxx "+logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message")
@ -787,7 +868,7 @@ func TestCollectBatchMultilinePattern(t *testing.T) {
func BenchmarkCollectBatch(b *testing.B) { func BenchmarkCollectBatch(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
mockClient := newMockClient() mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
logGroupName: groupName, logGroupName: groupName,
@ -795,10 +876,10 @@ func BenchmarkCollectBatch(b *testing.B) {
sequenceToken: aws.String(sequenceToken), sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message), messages: make(chan *logger.Message),
} }
mockClient.putLogEventsResult <- &putLogEventsResult{ mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
successResult: &cloudwatchlogs.PutLogEventsOutput{ return &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken), NextSequenceToken: aws.String(nextSequenceToken),
}, }, nil
} }
ticks := make(chan time.Time) ticks := make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker { newTicker = func(_ time.Duration) *time.Ticker {
@ -818,7 +899,7 @@ func BenchmarkCollectBatch(b *testing.B) {
func BenchmarkCollectBatchMultilinePattern(b *testing.B) { func BenchmarkCollectBatchMultilinePattern(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
mockClient := newMockClient() mockClient := &mockClient{}
multilinePattern := regexp.MustCompile(`\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[1,2][0-9]|3[0,1]) (?:[0,1][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]`) multilinePattern := regexp.MustCompile(`\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[1,2][0-9]|3[0,1]) (?:[0,1][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]`)
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
@ -828,10 +909,10 @@ func BenchmarkCollectBatchMultilinePattern(b *testing.B) {
sequenceToken: aws.String(sequenceToken), sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message), messages: make(chan *logger.Message),
} }
mockClient.putLogEventsResult <- &putLogEventsResult{ mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
successResult: &cloudwatchlogs.PutLogEventsOutput{ return &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken), NextSequenceToken: aws.String(nextSequenceToken),
}, }, nil
} }
ticks := make(chan time.Time) ticks := make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker { newTicker = func(_ time.Duration) *time.Ticker {
@ -849,7 +930,7 @@ func BenchmarkCollectBatchMultilinePattern(b *testing.B) {
} }
func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) { func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
multilinePattern := regexp.MustCompile("xxxx") multilinePattern := regexp.MustCompile("xxxx")
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
@ -859,10 +940,14 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
sequenceToken: aws.String(sequenceToken), sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message), messages: make(chan *logger.Message),
} }
mockClient.putLogEventsResult <- &putLogEventsResult{ calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
successResult: &cloudwatchlogs.PutLogEventsOutput{ called := make(chan struct{}, 50)
mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
calls = append(calls, input)
called <- struct{}{}
return &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken), NextSequenceToken: aws.String(nextSequenceToken),
}, }, nil
} }
ticks := make(chan time.Time) ticks := make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker { newTicker = func(_ time.Duration) *time.Ticker {
@ -890,7 +975,12 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
ticks <- time.Now().Add(defaultForceFlushInterval + time.Second) ticks <- time.Now().Add(defaultForceFlushInterval + time.Second)
// Verify single multiline event is flushed after maximum event buffer age (defaultForceFlushInterval) // Verify single multiline event is flushed after maximum event buffer age (defaultForceFlushInterval)
argument := <-mockClient.putLogEventsArgument <-called
if len(calls) != 1 {
t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls)
}
argument := calls[0]
calls = calls[1:]
assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput") assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput")
assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event") assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event")
assert.Check(t, is.Equal(logline+"\n"+logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message") assert.Check(t, is.Equal(logline+"\n"+logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message")
@ -905,7 +995,12 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
ticks <- time.Now().Add(2*defaultForceFlushInterval + time.Second) ticks <- time.Now().Add(2*defaultForceFlushInterval + time.Second)
// Verify the event buffer is truly flushed - we should only receive a single event // Verify the event buffer is truly flushed - we should only receive a single event
argument = <-mockClient.putLogEventsArgument <-called
if len(calls) != 1 {
t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls)
}
argument = calls[0]
close(called)
assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput") assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput")
assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event") assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event")
assert.Check(t, is.Equal(logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message") assert.Check(t, is.Equal(logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message")
@ -913,7 +1008,7 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
} }
func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) { func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
multilinePattern := regexp.MustCompile("xxxx") multilinePattern := regexp.MustCompile("xxxx")
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
@ -923,10 +1018,14 @@ func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) {
sequenceToken: aws.String(sequenceToken), sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message), messages: make(chan *logger.Message),
} }
mockClient.putLogEventsResult <- &putLogEventsResult{ calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
successResult: &cloudwatchlogs.PutLogEventsOutput{ called := make(chan struct{}, 50)
mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
calls = append(calls, input)
called <- struct{}{}
return &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken), NextSequenceToken: aws.String(nextSequenceToken),
}, }, nil
} }
ticks := make(chan time.Time) ticks := make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker { newTicker = func(_ time.Duration) *time.Ticker {
@ -954,7 +1053,12 @@ func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) {
ticks <- time.Now().Add(-time.Second) ticks <- time.Now().Add(-time.Second)
// Verify single multiline event is flushed with a negative event buffer age // Verify single multiline event is flushed with a negative event buffer age
argument := <-mockClient.putLogEventsArgument <-called
if len(calls) != 1 {
t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls)
}
argument := calls[0]
close(called)
assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput") assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput")
assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event") assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event")
assert.Check(t, is.Equal(logline+"\n"+logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message") assert.Check(t, is.Equal(logline+"\n"+logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message")
@ -963,7 +1067,7 @@ func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) {
} }
func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) { func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
multilinePattern := regexp.MustCompile("xxxx") multilinePattern := regexp.MustCompile("xxxx")
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
@ -973,10 +1077,14 @@ func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) {
sequenceToken: aws.String(sequenceToken), sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message), messages: make(chan *logger.Message),
} }
mockClient.putLogEventsResult <- &putLogEventsResult{ calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
successResult: &cloudwatchlogs.PutLogEventsOutput{ called := make(chan struct{}, 50)
mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
calls = append(calls, input)
called <- struct{}{}
return &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken), NextSequenceToken: aws.String(nextSequenceToken),
}, }, nil
} }
ticks := make(chan time.Time) ticks := make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker { newTicker = func(_ time.Duration) *time.Ticker {
@ -1009,7 +1117,12 @@ func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) {
// Verify multiline events // Verify multiline events
// We expect a maximum sized event with no new line characters and a // We expect a maximum sized event with no new line characters and a
// second short event with a new line character at the end // second short event with a new line character at the end
argument := <-mockClient.putLogEventsArgument <-called
if len(calls) != 1 {
t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls)
}
argument := calls[0]
close(called)
assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput") assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput")
assert.Check(t, is.Equal(2, len(argument.LogEvents)), "Expected two events") assert.Check(t, is.Equal(2, len(argument.LogEvents)), "Expected two events")
assert.Check(t, is.Equal(longline, *argument.LogEvents[0].Message), "Received incorrect multiline message") assert.Check(t, is.Equal(longline, *argument.LogEvents[0].Message), "Received incorrect multiline message")
@ -1018,7 +1131,7 @@ func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) {
} }
func TestCollectBatchClose(t *testing.T) { func TestCollectBatchClose(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
logGroupName: groupName, logGroupName: groupName,
@ -1026,10 +1139,14 @@ func TestCollectBatchClose(t *testing.T) {
sequenceToken: aws.String(sequenceToken), sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message), messages: make(chan *logger.Message),
} }
mockClient.putLogEventsResult <- &putLogEventsResult{ calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
successResult: &cloudwatchlogs.PutLogEventsOutput{ called := make(chan struct{}, 50)
mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
calls = append(calls, input)
called <- struct{}{}
return &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken), NextSequenceToken: aws.String(nextSequenceToken),
}, }, nil
} }
var ticks = make(chan time.Time) var ticks = make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker { newTicker = func(_ time.Duration) *time.Ticker {
@ -1050,7 +1167,12 @@ func TestCollectBatchClose(t *testing.T) {
// no ticks // no ticks
stream.Close() stream.Close()
argument := <-mockClient.putLogEventsArgument <-called
if len(calls) != 1 {
t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls)
}
argument := calls[0]
close(called)
if argument == nil { if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput") t.Fatal("Expected non-nil PutLogEventsInput")
} }
@ -1119,7 +1241,7 @@ func TestProcessEventEmoji(t *testing.T) {
} }
func TestCollectBatchLineSplit(t *testing.T) { func TestCollectBatchLineSplit(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
logGroupName: groupName, logGroupName: groupName,
@ -1127,10 +1249,14 @@ func TestCollectBatchLineSplit(t *testing.T) {
sequenceToken: aws.String(sequenceToken), sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message), messages: make(chan *logger.Message),
} }
mockClient.putLogEventsResult <- &putLogEventsResult{ calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
successResult: &cloudwatchlogs.PutLogEventsOutput{ called := make(chan struct{}, 50)
mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
calls = append(calls, input)
called <- struct{}{}
return &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken), NextSequenceToken: aws.String(nextSequenceToken),
}, }, nil
} }
var ticks = make(chan time.Time) var ticks = make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker { newTicker = func(_ time.Duration) *time.Ticker {
@ -1152,7 +1278,12 @@ func TestCollectBatchLineSplit(t *testing.T) {
// no ticks // no ticks
stream.Close() stream.Close()
argument := <-mockClient.putLogEventsArgument <-called
if len(calls) != 1 {
t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls)
}
argument := calls[0]
close(called)
if argument == nil { if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput") t.Fatal("Expected non-nil PutLogEventsInput")
} }
@ -1168,7 +1299,7 @@ func TestCollectBatchLineSplit(t *testing.T) {
} }
func TestCollectBatchLineSplitWithBinary(t *testing.T) { func TestCollectBatchLineSplitWithBinary(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
logGroupName: groupName, logGroupName: groupName,
@ -1176,10 +1307,14 @@ func TestCollectBatchLineSplitWithBinary(t *testing.T) {
sequenceToken: aws.String(sequenceToken), sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message), messages: make(chan *logger.Message),
} }
mockClient.putLogEventsResult <- &putLogEventsResult{ calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
successResult: &cloudwatchlogs.PutLogEventsOutput{ called := make(chan struct{}, 50)
mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
calls = append(calls, input)
called <- struct{}{}
return &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken), NextSequenceToken: aws.String(nextSequenceToken),
}, }, nil
} }
var ticks = make(chan time.Time) var ticks = make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker { newTicker = func(_ time.Duration) *time.Ticker {
@ -1201,7 +1336,12 @@ func TestCollectBatchLineSplitWithBinary(t *testing.T) {
// no ticks // no ticks
stream.Close() stream.Close()
argument := <-mockClient.putLogEventsArgument <-called
if len(calls) != 1 {
t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls)
}
argument := calls[0]
close(called)
if argument == nil { if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput") t.Fatal("Expected non-nil PutLogEventsInput")
} }
@ -1217,7 +1357,7 @@ func TestCollectBatchLineSplitWithBinary(t *testing.T) {
} }
func TestCollectBatchMaxEvents(t *testing.T) { func TestCollectBatchMaxEvents(t *testing.T) {
mockClient := newMockClientBuffered(1) mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
logGroupName: groupName, logGroupName: groupName,
@ -1225,10 +1365,14 @@ func TestCollectBatchMaxEvents(t *testing.T) {
sequenceToken: aws.String(sequenceToken), sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message), messages: make(chan *logger.Message),
} }
mockClient.putLogEventsResult <- &putLogEventsResult{ calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
successResult: &cloudwatchlogs.PutLogEventsOutput{ called := make(chan struct{}, 50)
mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
calls = append(calls, input)
called <- struct{}{}
return &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken), NextSequenceToken: aws.String(nextSequenceToken),
}, }, nil
} }
var ticks = make(chan time.Time) var ticks = make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker { newTicker = func(_ time.Duration) *time.Ticker {
@ -1252,7 +1396,12 @@ func TestCollectBatchMaxEvents(t *testing.T) {
// no ticks // no ticks
stream.Close() stream.Close()
argument := <-mockClient.putLogEventsArgument <-called
<-called
if len(calls) != 2 {
t.Fatalf("Expected two calls to PutLogEvents, was %d: %v", len(calls), calls)
}
argument := calls[0]
if argument == nil { if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput") t.Fatal("Expected non-nil PutLogEventsInput")
} }
@ -1260,7 +1409,8 @@ func TestCollectBatchMaxEvents(t *testing.T) {
t.Errorf("Expected LogEvents to contain %d elements, but contains %d", maximumLogEventsPerPut, len(argument.LogEvents)) t.Errorf("Expected LogEvents to contain %d elements, but contains %d", maximumLogEventsPerPut, len(argument.LogEvents))
} }
argument = <-mockClient.putLogEventsArgument argument = calls[1]
close(called)
if argument == nil { if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput") t.Fatal("Expected non-nil PutLogEventsInput")
} }
@ -1271,7 +1421,7 @@ func TestCollectBatchMaxEvents(t *testing.T) {
func TestCollectBatchMaxTotalBytes(t *testing.T) { func TestCollectBatchMaxTotalBytes(t *testing.T) {
expectedPuts := 2 expectedPuts := 2
mockClient := newMockClientBuffered(expectedPuts) mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
logGroupName: groupName, logGroupName: groupName,
@ -1279,12 +1429,14 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
sequenceToken: aws.String(sequenceToken), sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message), messages: make(chan *logger.Message),
} }
for i := 0; i < expectedPuts; i++ { calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
mockClient.putLogEventsResult <- &putLogEventsResult{ called := make(chan struct{}, 50)
successResult: &cloudwatchlogs.PutLogEventsOutput{ mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
calls = append(calls, input)
called <- struct{}{}
return &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken), NextSequenceToken: aws.String(nextSequenceToken),
}, }, nil
}
} }
var ticks = make(chan time.Time) var ticks = make(chan time.Time)
@ -1321,7 +1473,13 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
// no ticks, guarantee batch by size (and chan close) // no ticks, guarantee batch by size (and chan close)
stream.Close() stream.Close()
argument := <-mockClient.putLogEventsArgument for i := 0; i < expectedPuts; i++ {
<-called
}
if len(calls) != expectedPuts {
t.Fatalf("Expected %d calls to PutLogEvents, was %d: %v", expectedPuts, len(calls), calls)
}
argument := calls[0]
if argument == nil { if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput") t.Fatal("Expected non-nil PutLogEventsInput")
} }
@ -1344,7 +1502,7 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
t.Errorf("Batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal) t.Errorf("Batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal)
} }
argument = <-mockClient.putLogEventsArgument argument = calls[1]
if len(argument.LogEvents) != 1 { if len(argument.LogEvents) != 1 {
t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents))
} }
@ -1356,7 +1514,7 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
func TestCollectBatchMaxTotalBytesWithBinary(t *testing.T) { func TestCollectBatchMaxTotalBytesWithBinary(t *testing.T) {
expectedPuts := 2 expectedPuts := 2
mockClient := newMockClientBuffered(expectedPuts) mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
logGroupName: groupName, logGroupName: groupName,
@ -1364,12 +1522,14 @@ func TestCollectBatchMaxTotalBytesWithBinary(t *testing.T) {
sequenceToken: aws.String(sequenceToken), sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message), messages: make(chan *logger.Message),
} }
for i := 0; i < expectedPuts; i++ { calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
mockClient.putLogEventsResult <- &putLogEventsResult{ called := make(chan struct{}, 50)
successResult: &cloudwatchlogs.PutLogEventsOutput{ mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
calls = append(calls, input)
called <- struct{}{}
return &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken), NextSequenceToken: aws.String(nextSequenceToken),
}, }, nil
}
} }
var ticks = make(chan time.Time) var ticks = make(chan time.Time)
@ -1401,7 +1561,13 @@ func TestCollectBatchMaxTotalBytesWithBinary(t *testing.T) {
// no ticks, guarantee batch by size (and chan close) // no ticks, guarantee batch by size (and chan close)
stream.Close() stream.Close()
argument := <-mockClient.putLogEventsArgument for i := 0; i < expectedPuts; i++ {
<-called
}
if len(calls) != expectedPuts {
t.Fatalf("Expected %d calls to PutLogEvents, was %d: %v", expectedPuts, len(calls), calls)
}
argument := calls[0]
if argument == nil { if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput") t.Fatal("Expected non-nil PutLogEventsInput")
} }
@ -1424,7 +1590,7 @@ func TestCollectBatchMaxTotalBytesWithBinary(t *testing.T) {
t.Errorf("Batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal) t.Errorf("Batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal)
} }
argument = <-mockClient.putLogEventsArgument argument = calls[1]
message := *argument.LogEvents[len(argument.LogEvents)-1].Message message := *argument.LogEvents[len(argument.LogEvents)-1].Message
if message[len(message)-1:] != "B" { if message[len(message)-1:] != "B" {
t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:]) t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
@ -1432,7 +1598,7 @@ func TestCollectBatchMaxTotalBytesWithBinary(t *testing.T) {
} }
func TestCollectBatchWithDuplicateTimestamps(t *testing.T) { func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
stream := &logStream{ stream := &logStream{
client: mockClient, client: mockClient,
logGroupName: groupName, logGroupName: groupName,
@ -1440,10 +1606,14 @@ func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
sequenceToken: aws.String(sequenceToken), sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message), messages: make(chan *logger.Message),
} }
mockClient.putLogEventsResult <- &putLogEventsResult{ calls := make([]*cloudwatchlogs.PutLogEventsInput, 0)
successResult: &cloudwatchlogs.PutLogEventsOutput{ called := make(chan struct{}, 50)
mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
calls = append(calls, input)
called <- struct{}{}
return &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken), NextSequenceToken: aws.String(nextSequenceToken),
}, }, nil
} }
ticks := make(chan time.Time) ticks := make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker { newTicker = func(_ time.Duration) *time.Ticker {
@ -1477,7 +1647,12 @@ func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
ticks <- time.Time{} ticks <- time.Time{}
stream.Close() stream.Close()
argument := <-mockClient.putLogEventsArgument <-called
if len(calls) != 1 {
t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls)
}
argument := calls[0]
close(called)
if argument == nil { if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput") t.Fatal("Expected non-nil PutLogEventsInput")
} }
@ -1639,7 +1814,7 @@ func TestValidateLogOptionsFormat(t *testing.T) {
} }
func TestCreateTagSuccess(t *testing.T) { func TestCreateTagSuccess(t *testing.T) {
mockClient := newMockClient() mockClient := &mockClient{}
info := logger.Info{ info := logger.Info{
ContainerName: "/test-container", ContainerName: "/test-container",
ContainerID: "container-abcdefghijklmnopqrstuvwxyz01234567890", ContainerID: "container-abcdefghijklmnopqrstuvwxyz01234567890",
@ -1655,12 +1830,17 @@ func TestCreateTagSuccess(t *testing.T) {
logStreamName: logStreamName, logStreamName: logStreamName,
logCreateStream: true, logCreateStream: true,
} }
mockClient.createLogStreamResult <- &createLogStreamResult{} calls := make([]*cloudwatchlogs.CreateLogStreamInput, 0)
mockClient.createLogStreamFunc = func(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) {
calls = append(calls, input)
return &cloudwatchlogs.CreateLogStreamOutput{}, nil
}
err := stream.create() err := stream.create()
assert.NilError(t, err) assert.NilError(t, err)
argument := <-mockClient.createLogStreamArgument assert.Equal(t, 1, len(calls))
argument := calls[0]
if *argument.LogStreamName != "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890" { if *argument.LogStreamName != "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890" {
t.Errorf("Expected LogStreamName to be %s", "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890") t.Errorf("Expected LogStreamName to be %s", "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890")

View file

@ -6,75 +6,29 @@ import (
"github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
) )
type mockcwlogsclient struct { type mockClient struct {
createLogGroupArgument chan *cloudwatchlogs.CreateLogGroupInput createLogGroupFunc func(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error)
createLogGroupResult chan *createLogGroupResult createLogStreamFunc func(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
createLogStreamArgument chan *cloudwatchlogs.CreateLogStreamInput putLogEventsFunc func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
createLogStreamResult chan *createLogStreamResult
putLogEventsArgument chan *cloudwatchlogs.PutLogEventsInput
putLogEventsResult chan *putLogEventsResult
} }
type createLogGroupResult struct { func (m *mockClient) CreateLogGroup(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) {
successResult *cloudwatchlogs.CreateLogGroupOutput return m.createLogGroupFunc(input)
errorResult error
} }
type createLogStreamResult struct { func (m *mockClient) CreateLogStream(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) {
successResult *cloudwatchlogs.CreateLogStreamOutput return m.createLogStreamFunc(input)
errorResult error
} }
type putLogEventsResult struct { func (m *mockClient) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
successResult *cloudwatchlogs.PutLogEventsOutput if err := checkPutLogEventsConstraints(input); err != nil {
errorResult error return nil, err
}
return m.putLogEventsFunc(input)
} }
func newMockClient() *mockcwlogsclient { func checkPutLogEventsConstraints(input *cloudwatchlogs.PutLogEventsInput) error {
return &mockcwlogsclient{ events := input.LogEvents
createLogGroupArgument: make(chan *cloudwatchlogs.CreateLogGroupInput, 1),
createLogGroupResult: make(chan *createLogGroupResult, 1),
createLogStreamArgument: make(chan *cloudwatchlogs.CreateLogStreamInput, 1),
createLogStreamResult: make(chan *createLogStreamResult, 1),
putLogEventsArgument: make(chan *cloudwatchlogs.PutLogEventsInput, 1),
putLogEventsResult: make(chan *putLogEventsResult, 1),
}
}
func newMockClientBuffered(buflen int) *mockcwlogsclient {
return &mockcwlogsclient{
createLogStreamArgument: make(chan *cloudwatchlogs.CreateLogStreamInput, buflen),
createLogStreamResult: make(chan *createLogStreamResult, buflen),
putLogEventsArgument: make(chan *cloudwatchlogs.PutLogEventsInput, buflen),
putLogEventsResult: make(chan *putLogEventsResult, buflen),
}
}
func (m *mockcwlogsclient) CreateLogGroup(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) {
m.createLogGroupArgument <- input
output := <-m.createLogGroupResult
return output.successResult, output.errorResult
}
func (m *mockcwlogsclient) CreateLogStream(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) {
m.createLogStreamArgument <- input
output := <-m.createLogStreamResult
return output.successResult, output.errorResult
}
func (m *mockcwlogsclient) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
events := make([]*cloudwatchlogs.InputLogEvent, len(input.LogEvents))
copy(events, input.LogEvents)
m.putLogEventsArgument <- &cloudwatchlogs.PutLogEventsInput{
LogEvents: events,
SequenceToken: input.SequenceToken,
LogGroupName: input.LogGroupName,
LogStreamName: input.LogStreamName,
}
// Intended mock output
output := <-m.putLogEventsResult
// Checked enforced limits in mock // Checked enforced limits in mock
totalBytes := 0 totalBytes := 0
for _, evt := range events { for _, evt := range events {
@ -84,7 +38,7 @@ func (m *mockcwlogsclient) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput)
eventBytes := len([]byte(*evt.Message)) eventBytes := len([]byte(*evt.Message))
if eventBytes > maximumBytesPerEvent { if eventBytes > maximumBytesPerEvent {
// exceeded per event message size limits // exceeded per event message size limits
return nil, fmt.Errorf("maximum bytes per event exceeded: Event too large %d, max allowed: %d", eventBytes, maximumBytesPerEvent) return fmt.Errorf("maximum bytes per event exceeded: Event too large %d, max allowed: %d", eventBytes, maximumBytesPerEvent)
} }
// total event bytes including overhead // total event bytes including overhead
totalBytes += eventBytes + perEventBytes totalBytes += eventBytes + perEventBytes
@ -92,10 +46,9 @@ func (m *mockcwlogsclient) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput)
if totalBytes > maximumBytesPerPut { if totalBytes > maximumBytesPerPut {
// exceeded per put maximum size limit // exceeded per put maximum size limit
return nil, fmt.Errorf("maximum bytes per put exceeded: Upload too large %d, max allowed: %d", totalBytes, maximumBytesPerPut) return fmt.Errorf("maximum bytes per put exceeded: Upload too large %d, max allowed: %d", totalBytes, maximumBytesPerPut)
} }
return nil
return output.successResult, output.errorResult
} }
type mockmetadataclient struct { type mockmetadataclient struct {