From 5ba6cab0a9b9e51029fd48858ba6722103356b1a Mon Sep 17 00:00:00 2001 From: Samuel Karp Date: Tue, 19 Jul 2016 11:23:45 -0700 Subject: [PATCH 1/2] awslogs: Add unit test to ensure log line order Signed-off-by: Samuel Karp --- daemon/logger/awslogs/cloudwatchlogs_test.go | 59 ++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go index 48882c4ce4..f6a0d7590c 100644 --- a/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "net/http" + "reflect" "runtime" "strings" "testing" @@ -625,3 +626,61 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) { t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:]) } } + +func TestCollectBatchWithDuplicateTimestamps(t *testing.T) { + mockClient := newMockClient() + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + sequenceToken: aws.String(sequenceToken), + messages: make(chan *logger.Message), + } + mockClient.putLogEventsResult <- &putLogEventsResult{ + successResult: &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, + } + ticks := make(chan time.Time) + newTicker = func(_ time.Duration) *time.Ticker { + return &time.Ticker{ + C: ticks, + } + } + + go stream.collectBatch() + + times := maximumLogEventsPerPut + expectedEvents := []*cloudwatchlogs.InputLogEvent{} + timestamp := time.Now() + for i := 0; i < times; i++ { + line := fmt.Sprintf("%d", i) + if i%2 == 0 { + timestamp.Add(1 * time.Nanosecond) + } + stream.Log(&logger.Message{ + Line: []byte(line), + Timestamp: timestamp, + }) + expectedEvents = append(expectedEvents, &cloudwatchlogs.InputLogEvent{ + Message: aws.String(line), + Timestamp: aws.Int64(timestamp.UnixNano() / int64(time.Millisecond)), + }) + } + + ticks <- time.Time{} + stream.Close() + + argument := <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + if len(argument.LogEvents) != times { + t.Errorf("Expected LogEvents to contain %d elements, but contains %d", times, len(argument.LogEvents)) + } + for i := 0; i < times; i++ { + if !reflect.DeepEqual(*argument.LogEvents[i], *expectedEvents[i]) { + t.Errorf("Expected event to be %v but was %v", *expectedEvents[i], *argument.LogEvents[i]) + } + } +} From 443f251cf596006fe4cb621dcab955f7da0a2a00 Mon Sep 17 00:00:00 2001 From: Samuel Karp Date: Tue, 19 Jul 2016 11:32:12 -0700 Subject: [PATCH 2/2] awslogs: Record log line insert order for sorting Fixes https://github.com/docker/docker/issues/24775 Signed-off-by: Samuel Karp --- daemon/logger/awslogs/cloudwatchlogs.go | 45 ++++++++++++++------ daemon/logger/awslogs/cloudwatchlogs_test.go | 33 ++++++++------ 2 files changed, 53 insertions(+), 25 deletions(-) diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index 8f59b27855..796fe96380 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -64,7 +64,11 @@ type regionFinder interface { Region() (string, error) } -type byTimestamp []*cloudwatchlogs.InputLogEvent +type wrappedEvent struct { + inputLogEvent *cloudwatchlogs.InputLogEvent + insertOrder int +} +type byTimestamp []wrappedEvent // init registers the awslogs driver func init() { @@ -229,7 +233,7 @@ var newTicker = func(freq time.Duration) *time.Ticker { // calculations. func (l *logStream) collectBatch() { timer := newTicker(batchPublishFrequency) - var events []*cloudwatchlogs.InputLogEvent + var events []wrappedEvent bytes := 0 for { select { @@ -258,9 +262,12 @@ func (l *logStream) collectBatch() { events = events[:0] bytes = 0 } - events = append(events, &cloudwatchlogs.InputLogEvent{ - Message: aws.String(string(line)), - Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)), + events = append(events, wrappedEvent{ + inputLogEvent: &cloudwatchlogs.InputLogEvent{ + Message: aws.String(string(line)), + Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)), + }, + insertOrder: len(events), }) bytes += (lineBytes + perEventBytes) } @@ -271,14 +278,17 @@ func (l *logStream) collectBatch() { // publishBatch calls PutLogEvents for a given set of InputLogEvents, // accounting for sequencing requirements (each request must reference the // sequence token returned by the previous request). -func (l *logStream) publishBatch(events []*cloudwatchlogs.InputLogEvent) { +func (l *logStream) publishBatch(events []wrappedEvent) { if len(events) == 0 { return } + // events in a batch must be sorted by timestamp + // see http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html sort.Sort(byTimestamp(events)) + cwEvents := unwrapEvents(events) - nextSequenceToken, err := l.putLogEvents(events, l.sequenceToken) + nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken) if err != nil { if awsErr, ok := err.(awserr.Error); ok { @@ -297,7 +307,7 @@ func (l *logStream) publishBatch(events []*cloudwatchlogs.InputLogEvent) { // sequence code is bad, grab the correct one and retry parts := strings.Split(awsErr.Message(), " ") token := parts[len(parts)-1] - nextSequenceToken, err = l.putLogEvents(events, &token) + nextSequenceToken, err = l.putLogEvents(cwEvents, &token) } } } @@ -360,11 +370,14 @@ func (slice byTimestamp) Len() int { // required by the sort.Interface interface. func (slice byTimestamp) Less(i, j int) bool { iTimestamp, jTimestamp := int64(0), int64(0) - if slice != nil && slice[i].Timestamp != nil { - iTimestamp = *slice[i].Timestamp + if slice != nil && slice[i].inputLogEvent.Timestamp != nil { + iTimestamp = *slice[i].inputLogEvent.Timestamp } - if slice != nil && slice[j].Timestamp != nil { - jTimestamp = *slice[j].Timestamp + if slice != nil && slice[j].inputLogEvent.Timestamp != nil { + jTimestamp = *slice[j].inputLogEvent.Timestamp + } + if iTimestamp == jTimestamp { + return slice[i].insertOrder < slice[j].insertOrder } return iTimestamp < jTimestamp } @@ -374,3 +387,11 @@ func (slice byTimestamp) Less(i, j int) bool { func (slice byTimestamp) Swap(i, j int) { slice[i], slice[j] = slice[j], slice[i] } + +func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent { + cwEvents := []*cloudwatchlogs.InputLogEvent{} + for _, input := range events { + cwEvents = append(cwEvents, input.inputLogEvent) + } + return cwEvents +} diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go index f6a0d7590c..8d0d9f4149 100644 --- a/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -150,10 +150,11 @@ func TestPublishBatchSuccess(t *testing.T) { NextSequenceToken: aws.String(nextSequenceToken), }, } - - events := []*cloudwatchlogs.InputLogEvent{ + events := []wrappedEvent{ { - Message: aws.String(logline), + inputLogEvent: &cloudwatchlogs.InputLogEvent{ + Message: aws.String(logline), + }, }, } @@ -177,7 +178,7 @@ func TestPublishBatchSuccess(t *testing.T) { if len(argument.LogEvents) != 1 { t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents)) } - if argument.LogEvents[0] != events[0] { + if argument.LogEvents[0] != events[0].inputLogEvent { t.Error("Expected event to equal input") } } @@ -194,9 +195,11 @@ func TestPublishBatchError(t *testing.T) { errorResult: errors.New("Error!"), } - events := []*cloudwatchlogs.InputLogEvent{ + events := []wrappedEvent{ { - Message: aws.String(logline), + inputLogEvent: &cloudwatchlogs.InputLogEvent{ + Message: aws.String(logline), + }, }, } @@ -226,9 +229,11 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) { }, } - events := []*cloudwatchlogs.InputLogEvent{ + events := []wrappedEvent{ { - Message: aws.String(logline), + inputLogEvent: &cloudwatchlogs.InputLogEvent{ + Message: aws.String(logline), + }, }, } @@ -253,7 +258,7 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) { if len(argument.LogEvents) != 1 { t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents)) } - if argument.LogEvents[0] != events[0] { + if argument.LogEvents[0] != events[0].inputLogEvent { t.Error("Expected event to equal input") } @@ -270,7 +275,7 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) { if len(argument.LogEvents) != 1 { t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents)) } - if argument.LogEvents[0] != events[0] { + if argument.LogEvents[0] != events[0].inputLogEvent { t.Error("Expected event to equal input") } } @@ -287,9 +292,11 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) { errorResult: awserr.New(dataAlreadyAcceptedCode, "use token token", nil), } - events := []*cloudwatchlogs.InputLogEvent{ + events := []wrappedEvent{ { - Message: aws.String(logline), + inputLogEvent: &cloudwatchlogs.InputLogEvent{ + Message: aws.String(logline), + }, }, } @@ -314,7 +321,7 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) { if len(argument.LogEvents) != 1 { t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents)) } - if argument.LogEvents[0] != events[0] { + if argument.LogEvents[0] != events[0].inputLogEvent { t.Error("Expected event to equal input") } }