1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Merge pull request #24814 from samuelkarp/awslogs

awslogs: Record log line insert order for sorting
This commit is contained in:
Brian Goff 2016-08-12 15:51:06 -04:00 committed by GitHub
commit c6e3818dd1
2 changed files with 112 additions and 25 deletions

View file

@ -64,7 +64,11 @@ type regionFinder interface {
Region() (string, error) Region() (string, error)
} }
type byTimestamp []*cloudwatchlogs.InputLogEvent type wrappedEvent struct {
inputLogEvent *cloudwatchlogs.InputLogEvent
insertOrder int
}
type byTimestamp []wrappedEvent
// init registers the awslogs driver // init registers the awslogs driver
func init() { func init() {
@ -229,7 +233,7 @@ var newTicker = func(freq time.Duration) *time.Ticker {
// calculations. // calculations.
func (l *logStream) collectBatch() { func (l *logStream) collectBatch() {
timer := newTicker(batchPublishFrequency) timer := newTicker(batchPublishFrequency)
var events []*cloudwatchlogs.InputLogEvent var events []wrappedEvent
bytes := 0 bytes := 0
for { for {
select { select {
@ -258,9 +262,12 @@ func (l *logStream) collectBatch() {
events = events[:0] events = events[:0]
bytes = 0 bytes = 0
} }
events = append(events, &cloudwatchlogs.InputLogEvent{ events = append(events, wrappedEvent{
Message: aws.String(string(line)), inputLogEvent: &cloudwatchlogs.InputLogEvent{
Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)), Message: aws.String(string(line)),
Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)),
},
insertOrder: len(events),
}) })
bytes += (lineBytes + perEventBytes) bytes += (lineBytes + perEventBytes)
} }
@ -271,14 +278,17 @@ func (l *logStream) collectBatch() {
// publishBatch calls PutLogEvents for a given set of InputLogEvents, // publishBatch calls PutLogEvents for a given set of InputLogEvents,
// accounting for sequencing requirements (each request must reference the // accounting for sequencing requirements (each request must reference the
// sequence token returned by the previous request). // sequence token returned by the previous request).
func (l *logStream) publishBatch(events []*cloudwatchlogs.InputLogEvent) { func (l *logStream) publishBatch(events []wrappedEvent) {
if len(events) == 0 { if len(events) == 0 {
return return
} }
// events in a batch must be sorted by timestamp
// see http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
sort.Sort(byTimestamp(events)) sort.Sort(byTimestamp(events))
cwEvents := unwrapEvents(events)
nextSequenceToken, err := l.putLogEvents(events, l.sequenceToken) nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)
if err != nil { if err != nil {
if awsErr, ok := err.(awserr.Error); ok { if awsErr, ok := err.(awserr.Error); ok {
@ -297,7 +307,7 @@ func (l *logStream) publishBatch(events []*cloudwatchlogs.InputLogEvent) {
// sequence code is bad, grab the correct one and retry // sequence code is bad, grab the correct one and retry
parts := strings.Split(awsErr.Message(), " ") parts := strings.Split(awsErr.Message(), " ")
token := parts[len(parts)-1] token := parts[len(parts)-1]
nextSequenceToken, err = l.putLogEvents(events, &token) nextSequenceToken, err = l.putLogEvents(cwEvents, &token)
} }
} }
} }
@ -360,11 +370,14 @@ func (slice byTimestamp) Len() int {
// required by the sort.Interface interface. // required by the sort.Interface interface.
func (slice byTimestamp) Less(i, j int) bool { func (slice byTimestamp) Less(i, j int) bool {
iTimestamp, jTimestamp := int64(0), int64(0) iTimestamp, jTimestamp := int64(0), int64(0)
if slice != nil && slice[i].Timestamp != nil { if slice != nil && slice[i].inputLogEvent.Timestamp != nil {
iTimestamp = *slice[i].Timestamp iTimestamp = *slice[i].inputLogEvent.Timestamp
} }
if slice != nil && slice[j].Timestamp != nil { if slice != nil && slice[j].inputLogEvent.Timestamp != nil {
jTimestamp = *slice[j].Timestamp jTimestamp = *slice[j].inputLogEvent.Timestamp
}
if iTimestamp == jTimestamp {
return slice[i].insertOrder < slice[j].insertOrder
} }
return iTimestamp < jTimestamp return iTimestamp < jTimestamp
} }
@ -374,3 +387,11 @@ func (slice byTimestamp) Less(i, j int) bool {
func (slice byTimestamp) Swap(i, j int) { func (slice byTimestamp) Swap(i, j int) {
slice[i], slice[j] = slice[j], slice[i] slice[i], slice[j] = slice[j], slice[i]
} }
func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent {
cwEvents := []*cloudwatchlogs.InputLogEvent{}
for _, input := range events {
cwEvents = append(cwEvents, input.inputLogEvent)
}
return cwEvents
}

View file

@ -4,6 +4,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
"reflect"
"runtime" "runtime"
"strings" "strings"
"testing" "testing"
@ -149,10 +150,11 @@ func TestPublishBatchSuccess(t *testing.T) {
NextSequenceToken: aws.String(nextSequenceToken), NextSequenceToken: aws.String(nextSequenceToken),
}, },
} }
events := []wrappedEvent{
events := []*cloudwatchlogs.InputLogEvent{
{ {
Message: aws.String(logline), inputLogEvent: &cloudwatchlogs.InputLogEvent{
Message: aws.String(logline),
},
}, },
} }
@ -176,7 +178,7 @@ func TestPublishBatchSuccess(t *testing.T) {
if len(argument.LogEvents) != 1 { if len(argument.LogEvents) != 1 {
t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents)) t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
} }
if argument.LogEvents[0] != events[0] { if argument.LogEvents[0] != events[0].inputLogEvent {
t.Error("Expected event to equal input") t.Error("Expected event to equal input")
} }
} }
@ -193,9 +195,11 @@ func TestPublishBatchError(t *testing.T) {
errorResult: errors.New("Error!"), errorResult: errors.New("Error!"),
} }
events := []*cloudwatchlogs.InputLogEvent{ events := []wrappedEvent{
{ {
Message: aws.String(logline), inputLogEvent: &cloudwatchlogs.InputLogEvent{
Message: aws.String(logline),
},
}, },
} }
@ -225,9 +229,11 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
}, },
} }
events := []*cloudwatchlogs.InputLogEvent{ events := []wrappedEvent{
{ {
Message: aws.String(logline), inputLogEvent: &cloudwatchlogs.InputLogEvent{
Message: aws.String(logline),
},
}, },
} }
@ -252,7 +258,7 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
if len(argument.LogEvents) != 1 { if len(argument.LogEvents) != 1 {
t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents)) t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
} }
if argument.LogEvents[0] != events[0] { if argument.LogEvents[0] != events[0].inputLogEvent {
t.Error("Expected event to equal input") t.Error("Expected event to equal input")
} }
@ -269,7 +275,7 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
if len(argument.LogEvents) != 1 { if len(argument.LogEvents) != 1 {
t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents)) t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
} }
if argument.LogEvents[0] != events[0] { if argument.LogEvents[0] != events[0].inputLogEvent {
t.Error("Expected event to equal input") t.Error("Expected event to equal input")
} }
} }
@ -286,9 +292,11 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) {
errorResult: awserr.New(dataAlreadyAcceptedCode, "use token token", nil), errorResult: awserr.New(dataAlreadyAcceptedCode, "use token token", nil),
} }
events := []*cloudwatchlogs.InputLogEvent{ events := []wrappedEvent{
{ {
Message: aws.String(logline), inputLogEvent: &cloudwatchlogs.InputLogEvent{
Message: aws.String(logline),
},
}, },
} }
@ -313,7 +321,7 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) {
if len(argument.LogEvents) != 1 { if len(argument.LogEvents) != 1 {
t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents)) t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
} }
if argument.LogEvents[0] != events[0] { if argument.LogEvents[0] != events[0].inputLogEvent {
t.Error("Expected event to equal input") t.Error("Expected event to equal input")
} }
} }
@ -625,3 +633,61 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:]) t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
} }
} }
func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
mockClient := newMockClient()
stream := &logStream{
client: mockClient,
logGroupName: groupName,
logStreamName: streamName,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
mockClient.putLogEventsResult <- &putLogEventsResult{
successResult: &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken),
},
}
ticks := make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker {
return &time.Ticker{
C: ticks,
}
}
go stream.collectBatch()
times := maximumLogEventsPerPut
expectedEvents := []*cloudwatchlogs.InputLogEvent{}
timestamp := time.Now()
for i := 0; i < times; i++ {
line := fmt.Sprintf("%d", i)
if i%2 == 0 {
timestamp.Add(1 * time.Nanosecond)
}
stream.Log(&logger.Message{
Line: []byte(line),
Timestamp: timestamp,
})
expectedEvents = append(expectedEvents, &cloudwatchlogs.InputLogEvent{
Message: aws.String(line),
Timestamp: aws.Int64(timestamp.UnixNano() / int64(time.Millisecond)),
})
}
ticks <- time.Time{}
stream.Close()
argument := <-mockClient.putLogEventsArgument
if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput")
}
if len(argument.LogEvents) != times {
t.Errorf("Expected LogEvents to contain %d elements, but contains %d", times, len(argument.LogEvents))
}
for i := 0; i < times; i++ {
if !reflect.DeepEqual(*argument.LogEvents[i], *expectedEvents[i]) {
t.Errorf("Expected event to be %v but was %v", *expectedEvents[i], *argument.LogEvents[i])
}
}
}