diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go index 28b521d1fa..d9bb38ca12 100644 --- a/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -227,41 +227,50 @@ func TestNewAWSLogsClientRegionDetect(t *testing.T) { } func TestCreateSuccess(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, logCreateStream: true, } - mockClient.createLogStreamResult <- &createLogStreamResult{} + var input *cloudwatchlogs.CreateLogStreamInput + mockClient.createLogStreamFunc = func(i *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + input = i + return &cloudwatchlogs.CreateLogStreamOutput{}, nil + } err := stream.create() if err != nil { t.Errorf("Received unexpected err: %v\n", err) } - argument := <-mockClient.createLogStreamArgument - if argument.LogGroupName == nil { + if input.LogGroupName == nil { t.Fatal("Expected non-nil LogGroupName") } - if *argument.LogGroupName != groupName { + if *input.LogGroupName != groupName { t.Errorf("Expected LogGroupName to be %s", groupName) } - if argument.LogStreamName == nil { + if input.LogStreamName == nil { t.Fatal("Expected non-nil LogStreamName") } - if *argument.LogStreamName != streamName { + if *input.LogStreamName != streamName { t.Errorf("Expected LogStreamName to be %s", streamName) } } func TestCreateStreamSkipped(t *testing.T) { + mockClient := &mockClient{} stream := &logStream{ + client: mockClient, logGroupName: groupName, logStreamName: streamName, logCreateStream: false, } + mockClient.createLogStreamFunc = func(i *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + t.Error("CreateLogStream should not be called") + return nil, errors.New("should not be called") + } err := stream.create() @@ -271,7 +280,7 @@ func TestCreateStreamSkipped(t *testing.T) { } func TestCreateLogGroupSuccess(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, @@ -279,37 +288,62 @@ func TestCreateLogGroupSuccess(t *testing.T) { logCreateGroup: true, logCreateStream: true, } - mockClient.createLogGroupResult <- &createLogGroupResult{} - mockClient.createLogStreamResult <- &createLogStreamResult{} + var logGroupInput *cloudwatchlogs.CreateLogGroupInput + mockClient.createLogGroupFunc = func(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) { + logGroupInput = input + return &cloudwatchlogs.CreateLogGroupOutput{}, nil + } + var logStreamInput *cloudwatchlogs.CreateLogStreamInput + createLogStreamCalls := 0 + mockClient.createLogStreamFunc = func(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + createLogStreamCalls++ + if logGroupInput == nil { + // log group not created yet + return nil, awserr.New(resourceNotFoundCode, "should error once", nil) + } + logStreamInput = input + return &cloudwatchlogs.CreateLogStreamOutput{}, nil + } err := stream.create() if err != nil { t.Errorf("Received unexpected err: %v\n", err) } - argument := <-mockClient.createLogStreamArgument - if argument.LogGroupName == nil { - t.Fatal("Expected non-nil LogGroupName") + if createLogStreamCalls < 2 { + t.Errorf("Expected CreateLogStream to be called twice, was called %d times", createLogStreamCalls) } - if *argument.LogGroupName != groupName { - t.Errorf("Expected LogGroupName to be %s", groupName) + if logGroupInput == nil { + t.Fatal("LogGroupInput should not be nil") } - if argument.LogStreamName == nil { + if logGroupInput.LogGroupName == nil { + t.Fatal("Expected non-nil LogGroupName in CreateLogGroup") + } + if *logGroupInput.LogGroupName != groupName { + t.Errorf("Expected LogGroupName to be %s in CreateLogGroup", groupName) + } + if logStreamInput.LogGroupName == nil { + t.Fatal("Expected non-nil LogGroupName in CreateLogStream") + } + if *logStreamInput.LogGroupName != groupName { + t.Errorf("Expected LogGroupName to be %s in CreateLogStream", groupName) + } + if logStreamInput.LogStreamName == nil { t.Fatal("Expected non-nil LogStreamName") } - if *argument.LogStreamName != streamName { + if *logStreamInput.LogStreamName != streamName { t.Errorf("Expected LogStreamName to be %s", streamName) } } func TestCreateError(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} stream := &logStream{ client: mockClient, logCreateStream: true, } - mockClient.createLogStreamResult <- &createLogStreamResult{ - errorResult: errors.New("Error"), + mockClient.createLogStreamFunc = func(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + return nil, errors.New("error") } err := stream.create() @@ -320,22 +354,25 @@ func TestCreateError(t *testing.T) { } func TestCreateAlreadyExists(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} stream := &logStream{ client: mockClient, logCreateStream: true, } - mockClient.createLogStreamResult <- &createLogStreamResult{ - errorResult: awserr.New(resourceAlreadyExistsCode, "", nil), + calls := 0 + mockClient.createLogStreamFunc = func(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + calls++ + return nil, awserr.New(resourceAlreadyExistsCode, "", nil) } err := stream.create() assert.NilError(t, err) + assert.Equal(t, 1, calls) } func TestLogClosed(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} stream := &logStream{ client: mockClient, closed: true, @@ -350,7 +387,7 @@ func TestLogClosed(t *testing.T) { // non-blocking behavior is not enabled. Blocking is achieved through an // internal channel that must be drained for Log to return. func TestLogBlocking(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} stream := &logStream{ client: mockClient, messages: make(chan *logger.Message), @@ -388,7 +425,7 @@ func TestLogBlocking(t *testing.T) { } func TestLogNonBlockingBufferEmpty(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} stream := &logStream{ client: mockClient, messages: make(chan *logger.Message, 1), @@ -399,7 +436,7 @@ func TestLogNonBlockingBufferEmpty(t *testing.T) { } func TestLogNonBlockingBufferFull(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} stream := &logStream{ client: mockClient, messages: make(chan *logger.Message, 1), @@ -424,17 +461,19 @@ func TestLogNonBlockingBufferFull(t *testing.T) { } } func TestPublishBatchSuccess(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, sequenceToken: aws.String(sequenceToken), } - mockClient.putLogEventsResult <- &putLogEventsResult{ - successResult: &cloudwatchlogs.PutLogEventsOutput{ + var input *cloudwatchlogs.PutLogEventsInput + mockClient.putLogEventsFunc = func(i *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + input = i + return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), - }, + }, nil } events := []wrappedEvent{ { @@ -451,34 +490,33 @@ func TestPublishBatchSuccess(t *testing.T) { if *stream.sequenceToken != nextSequenceToken { t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken) } - argument := <-mockClient.putLogEventsArgument - if argument == nil { + if input == nil { t.Fatal("Expected non-nil PutLogEventsInput") } - if argument.SequenceToken == nil { + if input.SequenceToken == nil { t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken") } - if *argument.SequenceToken != sequenceToken { - t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken) + if *input.SequenceToken != sequenceToken { + t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *input.SequenceToken) } - if len(argument.LogEvents) != 1 { - t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents)) + if len(input.LogEvents) != 1 { + t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(input.LogEvents)) } - if argument.LogEvents[0] != events[0].inputLogEvent { + if input.LogEvents[0] != events[0].inputLogEvent { t.Error("Expected event to equal input") } } func TestPublishBatchError(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, sequenceToken: aws.String(sequenceToken), } - mockClient.putLogEventsResult <- &putLogEventsResult{ - errorResult: errors.New("Error"), + mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + return nil, errors.New("error") } events := []wrappedEvent{ @@ -499,20 +537,22 @@ func TestPublishBatchError(t *testing.T) { } func TestPublishBatchInvalidSeqSuccess(t *testing.T) { - mockClient := newMockClientBuffered(2) + mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, sequenceToken: aws.String(sequenceToken), } - mockClient.putLogEventsResult <- &putLogEventsResult{ - errorResult: awserr.New(invalidSequenceTokenCode, "use token token", nil), - } - mockClient.putLogEventsResult <- &putLogEventsResult{ - successResult: &cloudwatchlogs.PutLogEventsOutput{ + calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) + mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + calls = append(calls, input) + if aws.StringValue(input.SequenceToken) != "token" { + return nil, awserr.New(invalidSequenceTokenCode, "use token token", nil) + } + return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), - }, + }, nil } events := []wrappedEvent{ @@ -531,7 +571,10 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) { t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken) } - argument := <-mockClient.putLogEventsArgument + if len(calls) != 2 { + t.Fatalf("Expected two calls to PutLogEvents, was %d: %v", len(calls), calls) + } + argument := calls[0] if argument == nil { t.Fatal("Expected non-nil PutLogEventsInput") } @@ -548,7 +591,7 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) { t.Error("Expected event to equal input") } - argument = <-mockClient.putLogEventsArgument + argument = calls[1] if argument == nil { t.Fatal("Expected non-nil PutLogEventsInput") } @@ -567,15 +610,17 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) { } func TestPublishBatchAlreadyAccepted(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, logStreamName: streamName, sequenceToken: aws.String(sequenceToken), } - mockClient.putLogEventsResult <- &putLogEventsResult{ - errorResult: awserr.New(dataAlreadyAcceptedCode, "use token token", nil), + calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) + mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + calls = append(calls, input) + return nil, awserr.New(dataAlreadyAcceptedCode, "use token token", nil) } events := []wrappedEvent{ @@ -594,7 +639,10 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) { t.Errorf("Expected sequenceToken to be %s, but was %s", "token", *stream.sequenceToken) } - argument := <-mockClient.putLogEventsArgument + if len(calls) != 1 { + t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls) + } + argument := calls[0] if argument == nil { t.Fatal("Expected non-nil PutLogEventsInput") } @@ -613,7 +661,7 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) { } func TestCollectBatchSimple(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, @@ -621,10 +669,12 @@ func TestCollectBatchSimple(t *testing.T) { sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } - mockClient.putLogEventsResult <- &putLogEventsResult{ - successResult: &cloudwatchlogs.PutLogEventsOutput{ + calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) + mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + calls = append(calls, input) + return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), - }, + }, nil } ticks := make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { @@ -641,10 +691,14 @@ func TestCollectBatchSimple(t *testing.T) { Timestamp: time.Time{}, }) + ticks <- time.Time{} ticks <- time.Time{} stream.Close() - argument := <-mockClient.putLogEventsArgument + if len(calls) != 1 { + t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls) + } + argument := calls[0] if argument == nil { t.Fatal("Expected non-nil PutLogEventsInput") } @@ -657,7 +711,7 @@ func TestCollectBatchSimple(t *testing.T) { } func TestCollectBatchTicker(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, @@ -665,10 +719,14 @@ func TestCollectBatchTicker(t *testing.T) { sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } - mockClient.putLogEventsResult <- &putLogEventsResult{ - successResult: &cloudwatchlogs.PutLogEventsOutput{ + calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) + called := make(chan struct{}, 50) + mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + calls = append(calls, input) + called <- struct{}{} + return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), - }, + }, nil } ticks := make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { @@ -691,9 +749,13 @@ func TestCollectBatchTicker(t *testing.T) { }) ticks <- time.Time{} - // Verify first batch - argument := <-mockClient.putLogEventsArgument + <-called + if len(calls) != 1 { + t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls) + } + argument := calls[0] + calls = calls[1:] if argument == nil { t.Fatal("Expected non-nil PutLogEventsInput") } @@ -713,7 +775,12 @@ func TestCollectBatchTicker(t *testing.T) { }) ticks <- time.Time{} - argument = <-mockClient.putLogEventsArgument + <-called + if len(calls) != 1 { + t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls) + } + argument = calls[0] + close(called) if argument == nil { t.Fatal("Expected non-nil PutLogEventsInput") } @@ -729,7 +796,7 @@ func TestCollectBatchTicker(t *testing.T) { } func TestCollectBatchMultilinePattern(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} multilinePattern := regexp.MustCompile("xxxx") stream := &logStream{ client: mockClient, @@ -739,10 +806,14 @@ func TestCollectBatchMultilinePattern(t *testing.T) { sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } - mockClient.putLogEventsResult <- &putLogEventsResult{ - successResult: &cloudwatchlogs.PutLogEventsOutput{ + calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) + called := make(chan struct{}, 50) + mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + calls = append(calls, input) + called <- struct{}{} + return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), - }, + }, nil } ticks := make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { @@ -771,7 +842,12 @@ func TestCollectBatchMultilinePattern(t *testing.T) { ticks <- time.Now() // Verify single multiline event - argument := <-mockClient.putLogEventsArgument + <-called + if len(calls) != 1 { + t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls) + } + argument := calls[0] + calls = calls[1:] assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput") assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event") assert.Check(t, is.Equal(logline+"\n"+logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message") @@ -779,7 +855,12 @@ func TestCollectBatchMultilinePattern(t *testing.T) { stream.Close() // Verify single event - argument = <-mockClient.putLogEventsArgument + <-called + if len(calls) != 1 { + t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls) + } + argument = calls[0] + close(called) assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput") assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event") assert.Check(t, is.Equal("xxxx "+logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message") @@ -787,7 +868,7 @@ func TestCollectBatchMultilinePattern(t *testing.T) { func BenchmarkCollectBatch(b *testing.B) { for i := 0; i < b.N; i++ { - mockClient := newMockClient() + mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, @@ -795,10 +876,10 @@ func BenchmarkCollectBatch(b *testing.B) { sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } - mockClient.putLogEventsResult <- &putLogEventsResult{ - successResult: &cloudwatchlogs.PutLogEventsOutput{ + mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), - }, + }, nil } ticks := make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { @@ -818,7 +899,7 @@ func BenchmarkCollectBatch(b *testing.B) { func BenchmarkCollectBatchMultilinePattern(b *testing.B) { for i := 0; i < b.N; i++ { - mockClient := newMockClient() + mockClient := &mockClient{} multilinePattern := regexp.MustCompile(`\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[1,2][0-9]|3[0,1]) (?:[0,1][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]`) stream := &logStream{ client: mockClient, @@ -828,10 +909,10 @@ func BenchmarkCollectBatchMultilinePattern(b *testing.B) { sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } - mockClient.putLogEventsResult <- &putLogEventsResult{ - successResult: &cloudwatchlogs.PutLogEventsOutput{ + mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), - }, + }, nil } ticks := make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { @@ -849,7 +930,7 @@ func BenchmarkCollectBatchMultilinePattern(b *testing.B) { } func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} multilinePattern := regexp.MustCompile("xxxx") stream := &logStream{ client: mockClient, @@ -859,10 +940,14 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) { sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } - mockClient.putLogEventsResult <- &putLogEventsResult{ - successResult: &cloudwatchlogs.PutLogEventsOutput{ + calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) + called := make(chan struct{}, 50) + mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + calls = append(calls, input) + called <- struct{}{} + return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), - }, + }, nil } ticks := make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { @@ -890,7 +975,12 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) { ticks <- time.Now().Add(defaultForceFlushInterval + time.Second) // Verify single multiline event is flushed after maximum event buffer age (defaultForceFlushInterval) - argument := <-mockClient.putLogEventsArgument + <-called + if len(calls) != 1 { + t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls) + } + argument := calls[0] + calls = calls[1:] assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput") assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event") assert.Check(t, is.Equal(logline+"\n"+logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message") @@ -905,7 +995,12 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) { ticks <- time.Now().Add(2*defaultForceFlushInterval + time.Second) // Verify the event buffer is truly flushed - we should only receive a single event - argument = <-mockClient.putLogEventsArgument + <-called + if len(calls) != 1 { + t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls) + } + argument = calls[0] + close(called) assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput") assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event") assert.Check(t, is.Equal(logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message") @@ -913,7 +1008,7 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) { } func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} multilinePattern := regexp.MustCompile("xxxx") stream := &logStream{ client: mockClient, @@ -923,10 +1018,14 @@ func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) { sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } - mockClient.putLogEventsResult <- &putLogEventsResult{ - successResult: &cloudwatchlogs.PutLogEventsOutput{ + calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) + called := make(chan struct{}, 50) + mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + calls = append(calls, input) + called <- struct{}{} + return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), - }, + }, nil } ticks := make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { @@ -954,7 +1053,12 @@ func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) { ticks <- time.Now().Add(-time.Second) // Verify single multiline event is flushed with a negative event buffer age - argument := <-mockClient.putLogEventsArgument + <-called + if len(calls) != 1 { + t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls) + } + argument := calls[0] + close(called) assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput") assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event") assert.Check(t, is.Equal(logline+"\n"+logline+"\n", *argument.LogEvents[0].Message), "Received incorrect multiline message") @@ -963,7 +1067,7 @@ func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) { } func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} multilinePattern := regexp.MustCompile("xxxx") stream := &logStream{ client: mockClient, @@ -973,10 +1077,14 @@ func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) { sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } - mockClient.putLogEventsResult <- &putLogEventsResult{ - successResult: &cloudwatchlogs.PutLogEventsOutput{ + calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) + called := make(chan struct{}, 50) + mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + calls = append(calls, input) + called <- struct{}{} + return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), - }, + }, nil } ticks := make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { @@ -1009,7 +1117,12 @@ func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) { // Verify multiline events // We expect a maximum sized event with no new line characters and a // second short event with a new line character at the end - argument := <-mockClient.putLogEventsArgument + <-called + if len(calls) != 1 { + t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls) + } + argument := calls[0] + close(called) assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput") assert.Check(t, is.Equal(2, len(argument.LogEvents)), "Expected two events") assert.Check(t, is.Equal(longline, *argument.LogEvents[0].Message), "Received incorrect multiline message") @@ -1018,7 +1131,7 @@ func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) { } func TestCollectBatchClose(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, @@ -1026,10 +1139,14 @@ func TestCollectBatchClose(t *testing.T) { sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } - mockClient.putLogEventsResult <- &putLogEventsResult{ - successResult: &cloudwatchlogs.PutLogEventsOutput{ + calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) + called := make(chan struct{}, 50) + mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + calls = append(calls, input) + called <- struct{}{} + return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), - }, + }, nil } var ticks = make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { @@ -1050,7 +1167,12 @@ func TestCollectBatchClose(t *testing.T) { // no ticks stream.Close() - argument := <-mockClient.putLogEventsArgument + <-called + if len(calls) != 1 { + t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls) + } + argument := calls[0] + close(called) if argument == nil { t.Fatal("Expected non-nil PutLogEventsInput") } @@ -1119,7 +1241,7 @@ func TestProcessEventEmoji(t *testing.T) { } func TestCollectBatchLineSplit(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, @@ -1127,10 +1249,14 @@ func TestCollectBatchLineSplit(t *testing.T) { sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } - mockClient.putLogEventsResult <- &putLogEventsResult{ - successResult: &cloudwatchlogs.PutLogEventsOutput{ + calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) + called := make(chan struct{}, 50) + mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + calls = append(calls, input) + called <- struct{}{} + return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), - }, + }, nil } var ticks = make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { @@ -1152,7 +1278,12 @@ func TestCollectBatchLineSplit(t *testing.T) { // no ticks stream.Close() - argument := <-mockClient.putLogEventsArgument + <-called + if len(calls) != 1 { + t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls) + } + argument := calls[0] + close(called) if argument == nil { t.Fatal("Expected non-nil PutLogEventsInput") } @@ -1168,7 +1299,7 @@ func TestCollectBatchLineSplit(t *testing.T) { } func TestCollectBatchLineSplitWithBinary(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, @@ -1176,10 +1307,14 @@ func TestCollectBatchLineSplitWithBinary(t *testing.T) { sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } - mockClient.putLogEventsResult <- &putLogEventsResult{ - successResult: &cloudwatchlogs.PutLogEventsOutput{ + calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) + called := make(chan struct{}, 50) + mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + calls = append(calls, input) + called <- struct{}{} + return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), - }, + }, nil } var ticks = make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { @@ -1201,7 +1336,12 @@ func TestCollectBatchLineSplitWithBinary(t *testing.T) { // no ticks stream.Close() - argument := <-mockClient.putLogEventsArgument + <-called + if len(calls) != 1 { + t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls) + } + argument := calls[0] + close(called) if argument == nil { t.Fatal("Expected non-nil PutLogEventsInput") } @@ -1217,7 +1357,7 @@ func TestCollectBatchLineSplitWithBinary(t *testing.T) { } func TestCollectBatchMaxEvents(t *testing.T) { - mockClient := newMockClientBuffered(1) + mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, @@ -1225,10 +1365,14 @@ func TestCollectBatchMaxEvents(t *testing.T) { sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } - mockClient.putLogEventsResult <- &putLogEventsResult{ - successResult: &cloudwatchlogs.PutLogEventsOutput{ + calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) + called := make(chan struct{}, 50) + mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + calls = append(calls, input) + called <- struct{}{} + return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), - }, + }, nil } var ticks = make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { @@ -1252,7 +1396,12 @@ func TestCollectBatchMaxEvents(t *testing.T) { // no ticks stream.Close() - argument := <-mockClient.putLogEventsArgument + <-called + <-called + if len(calls) != 2 { + t.Fatalf("Expected two calls to PutLogEvents, was %d: %v", len(calls), calls) + } + argument := calls[0] if argument == nil { t.Fatal("Expected non-nil PutLogEventsInput") } @@ -1260,7 +1409,8 @@ func TestCollectBatchMaxEvents(t *testing.T) { t.Errorf("Expected LogEvents to contain %d elements, but contains %d", maximumLogEventsPerPut, len(argument.LogEvents)) } - argument = <-mockClient.putLogEventsArgument + argument = calls[1] + close(called) if argument == nil { t.Fatal("Expected non-nil PutLogEventsInput") } @@ -1271,7 +1421,7 @@ func TestCollectBatchMaxEvents(t *testing.T) { func TestCollectBatchMaxTotalBytes(t *testing.T) { expectedPuts := 2 - mockClient := newMockClientBuffered(expectedPuts) + mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, @@ -1279,12 +1429,14 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) { sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } - for i := 0; i < expectedPuts; i++ { - mockClient.putLogEventsResult <- &putLogEventsResult{ - successResult: &cloudwatchlogs.PutLogEventsOutput{ - NextSequenceToken: aws.String(nextSequenceToken), - }, - } + calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) + called := make(chan struct{}, 50) + mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + calls = append(calls, input) + called <- struct{}{} + return &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, nil } var ticks = make(chan time.Time) @@ -1321,7 +1473,13 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) { // no ticks, guarantee batch by size (and chan close) stream.Close() - argument := <-mockClient.putLogEventsArgument + for i := 0; i < expectedPuts; i++ { + <-called + } + if len(calls) != expectedPuts { + t.Fatalf("Expected %d calls to PutLogEvents, was %d: %v", expectedPuts, len(calls), calls) + } + argument := calls[0] if argument == nil { t.Fatal("Expected non-nil PutLogEventsInput") } @@ -1344,7 +1502,7 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) { t.Errorf("Batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal) } - argument = <-mockClient.putLogEventsArgument + argument = calls[1] if len(argument.LogEvents) != 1 { t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) } @@ -1356,7 +1514,7 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) { func TestCollectBatchMaxTotalBytesWithBinary(t *testing.T) { expectedPuts := 2 - mockClient := newMockClientBuffered(expectedPuts) + mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, @@ -1364,12 +1522,14 @@ func TestCollectBatchMaxTotalBytesWithBinary(t *testing.T) { sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } - for i := 0; i < expectedPuts; i++ { - mockClient.putLogEventsResult <- &putLogEventsResult{ - successResult: &cloudwatchlogs.PutLogEventsOutput{ - NextSequenceToken: aws.String(nextSequenceToken), - }, - } + calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) + called := make(chan struct{}, 50) + mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + calls = append(calls, input) + called <- struct{}{} + return &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, nil } var ticks = make(chan time.Time) @@ -1401,7 +1561,13 @@ func TestCollectBatchMaxTotalBytesWithBinary(t *testing.T) { // no ticks, guarantee batch by size (and chan close) stream.Close() - argument := <-mockClient.putLogEventsArgument + for i := 0; i < expectedPuts; i++ { + <-called + } + if len(calls) != expectedPuts { + t.Fatalf("Expected %d calls to PutLogEvents, was %d: %v", expectedPuts, len(calls), calls) + } + argument := calls[0] if argument == nil { t.Fatal("Expected non-nil PutLogEventsInput") } @@ -1424,7 +1590,7 @@ func TestCollectBatchMaxTotalBytesWithBinary(t *testing.T) { t.Errorf("Batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal) } - argument = <-mockClient.putLogEventsArgument + argument = calls[1] message := *argument.LogEvents[len(argument.LogEvents)-1].Message if message[len(message)-1:] != "B" { t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:]) @@ -1432,7 +1598,7 @@ func TestCollectBatchMaxTotalBytesWithBinary(t *testing.T) { } func TestCollectBatchWithDuplicateTimestamps(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} stream := &logStream{ client: mockClient, logGroupName: groupName, @@ -1440,10 +1606,14 @@ func TestCollectBatchWithDuplicateTimestamps(t *testing.T) { sequenceToken: aws.String(sequenceToken), messages: make(chan *logger.Message), } - mockClient.putLogEventsResult <- &putLogEventsResult{ - successResult: &cloudwatchlogs.PutLogEventsOutput{ + calls := make([]*cloudwatchlogs.PutLogEventsInput, 0) + called := make(chan struct{}, 50) + mockClient.putLogEventsFunc = func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + calls = append(calls, input) + called <- struct{}{} + return &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: aws.String(nextSequenceToken), - }, + }, nil } ticks := make(chan time.Time) newTicker = func(_ time.Duration) *time.Ticker { @@ -1477,7 +1647,12 @@ func TestCollectBatchWithDuplicateTimestamps(t *testing.T) { ticks <- time.Time{} stream.Close() - argument := <-mockClient.putLogEventsArgument + <-called + if len(calls) != 1 { + t.Fatalf("Expected one call to PutLogEvents, was %d: %v", len(calls), calls) + } + argument := calls[0] + close(called) if argument == nil { t.Fatal("Expected non-nil PutLogEventsInput") } @@ -1639,7 +1814,7 @@ func TestValidateLogOptionsFormat(t *testing.T) { } func TestCreateTagSuccess(t *testing.T) { - mockClient := newMockClient() + mockClient := &mockClient{} info := logger.Info{ ContainerName: "/test-container", ContainerID: "container-abcdefghijklmnopqrstuvwxyz01234567890", @@ -1655,12 +1830,17 @@ func TestCreateTagSuccess(t *testing.T) { logStreamName: logStreamName, logCreateStream: true, } - mockClient.createLogStreamResult <- &createLogStreamResult{} + calls := make([]*cloudwatchlogs.CreateLogStreamInput, 0) + mockClient.createLogStreamFunc = func(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + calls = append(calls, input) + return &cloudwatchlogs.CreateLogStreamOutput{}, nil + } err := stream.create() assert.NilError(t, err) - argument := <-mockClient.createLogStreamArgument + assert.Equal(t, 1, len(calls)) + argument := calls[0] if *argument.LogStreamName != "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890" { t.Errorf("Expected LogStreamName to be %s", "test-container/container-abcdefghijklmnopqrstuvwxyz01234567890") diff --git a/daemon/logger/awslogs/cwlogsiface_mock_test.go b/daemon/logger/awslogs/cwlogsiface_mock_test.go index 155e602b8c..c974bac60c 100644 --- a/daemon/logger/awslogs/cwlogsiface_mock_test.go +++ b/daemon/logger/awslogs/cwlogsiface_mock_test.go @@ -6,75 +6,29 @@ import ( "github.com/aws/aws-sdk-go/service/cloudwatchlogs" ) -type mockcwlogsclient struct { - createLogGroupArgument chan *cloudwatchlogs.CreateLogGroupInput - createLogGroupResult chan *createLogGroupResult - createLogStreamArgument chan *cloudwatchlogs.CreateLogStreamInput - createLogStreamResult chan *createLogStreamResult - putLogEventsArgument chan *cloudwatchlogs.PutLogEventsInput - putLogEventsResult chan *putLogEventsResult +type mockClient struct { + createLogGroupFunc func(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) + createLogStreamFunc func(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) + putLogEventsFunc func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) } -type createLogGroupResult struct { - successResult *cloudwatchlogs.CreateLogGroupOutput - errorResult error +func (m *mockClient) CreateLogGroup(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) { + return m.createLogGroupFunc(input) } -type createLogStreamResult struct { - successResult *cloudwatchlogs.CreateLogStreamOutput - errorResult error +func (m *mockClient) CreateLogStream(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + return m.createLogStreamFunc(input) } -type putLogEventsResult struct { - successResult *cloudwatchlogs.PutLogEventsOutput - errorResult error -} - -func newMockClient() *mockcwlogsclient { - return &mockcwlogsclient{ - createLogGroupArgument: make(chan *cloudwatchlogs.CreateLogGroupInput, 1), - createLogGroupResult: make(chan *createLogGroupResult, 1), - createLogStreamArgument: make(chan *cloudwatchlogs.CreateLogStreamInput, 1), - createLogStreamResult: make(chan *createLogStreamResult, 1), - putLogEventsArgument: make(chan *cloudwatchlogs.PutLogEventsInput, 1), - putLogEventsResult: make(chan *putLogEventsResult, 1), +func (m *mockClient) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + if err := checkPutLogEventsConstraints(input); err != nil { + return nil, err } + return m.putLogEventsFunc(input) } -func newMockClientBuffered(buflen int) *mockcwlogsclient { - return &mockcwlogsclient{ - createLogStreamArgument: make(chan *cloudwatchlogs.CreateLogStreamInput, buflen), - createLogStreamResult: make(chan *createLogStreamResult, buflen), - putLogEventsArgument: make(chan *cloudwatchlogs.PutLogEventsInput, buflen), - putLogEventsResult: make(chan *putLogEventsResult, buflen), - } -} - -func (m *mockcwlogsclient) CreateLogGroup(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) { - m.createLogGroupArgument <- input - output := <-m.createLogGroupResult - return output.successResult, output.errorResult -} - -func (m *mockcwlogsclient) CreateLogStream(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { - m.createLogStreamArgument <- input - output := <-m.createLogStreamResult - return output.successResult, output.errorResult -} - -func (m *mockcwlogsclient) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { - events := make([]*cloudwatchlogs.InputLogEvent, len(input.LogEvents)) - copy(events, input.LogEvents) - m.putLogEventsArgument <- &cloudwatchlogs.PutLogEventsInput{ - LogEvents: events, - SequenceToken: input.SequenceToken, - LogGroupName: input.LogGroupName, - LogStreamName: input.LogStreamName, - } - - // Intended mock output - output := <-m.putLogEventsResult - +func checkPutLogEventsConstraints(input *cloudwatchlogs.PutLogEventsInput) error { + events := input.LogEvents // Checked enforced limits in mock totalBytes := 0 for _, evt := range events { @@ -84,7 +38,7 @@ func (m *mockcwlogsclient) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput) eventBytes := len([]byte(*evt.Message)) if eventBytes > maximumBytesPerEvent { // exceeded per event message size limits - return nil, fmt.Errorf("maximum bytes per event exceeded: Event too large %d, max allowed: %d", eventBytes, maximumBytesPerEvent) + return fmt.Errorf("maximum bytes per event exceeded: Event too large %d, max allowed: %d", eventBytes, maximumBytesPerEvent) } // total event bytes including overhead totalBytes += eventBytes + perEventBytes @@ -92,10 +46,9 @@ func (m *mockcwlogsclient) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput) if totalBytes > maximumBytesPerPut { // exceeded per put maximum size limit - return nil, fmt.Errorf("maximum bytes per put exceeded: Upload too large %d, max allowed: %d", totalBytes, maximumBytesPerPut) + return fmt.Errorf("maximum bytes per put exceeded: Upload too large %d, max allowed: %d", totalBytes, maximumBytesPerPut) } - - return output.successResult, output.errorResult + return nil } type mockmetadataclient struct {