Merge pull request #36078 from mixja/multiline-max-event-processing

awslogs - don't add new lines to maximum sized events
Yong Tang 2018-01-24 12:06:49 -08:00 committed by GitHub
commit a636ed5ff4
2 changed files with 67 additions and 12 deletions


@@ -410,7 +410,7 @@ func (l *logStream) collectBatch() {
 			// If event buffer is older than batch publish frequency flush the event buffer
 			if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
 				eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
-				eventBufferExpired := eventBufferAge > int64(batchPublishFrequency)/int64(time.Millisecond)
+				eventBufferExpired := eventBufferAge >= int64(batchPublishFrequency)/int64(time.Millisecond)
 				eventBufferNegative := eventBufferAge < 0
 				if eventBufferExpired || eventBufferNegative {
 					l.processEvent(batch, eventBuffer, eventBufferTimestamp)
@@ -431,21 +431,23 @@ func (l *logStream) collectBatch() {
 			if eventBufferTimestamp == 0 {
 				eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
 			}
-			unprocessedLine := msg.Line
+			line := msg.Line
 			if l.multilinePattern != nil {
-				if l.multilinePattern.Match(unprocessedLine) || len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent {
+				if l.multilinePattern.Match(line) || len(eventBuffer)+len(line) > maximumBytesPerEvent {
 					// This is a new log event or we will exceed max bytes per event
 					// so flush the current eventBuffer to events and reset timestamp
 					l.processEvent(batch, eventBuffer, eventBufferTimestamp)
 					eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
 					eventBuffer = eventBuffer[:0]
 				}
-				// Append new line
-				processedLine := append(unprocessedLine, "\n"...)
-				eventBuffer = append(eventBuffer, processedLine...)
+				// Append new line if event is less than max event size
+				if len(line) < maximumBytesPerEvent {
+					line = append(line, "\n"...)
+				}
+				eventBuffer = append(eventBuffer, line...)
 				logger.PutMessage(msg)
 			} else {
-				l.processEvent(batch, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond))
+				l.processEvent(batch, line, msg.Timestamp.UnixNano()/int64(time.Millisecond))
 				logger.PutMessage(msg)
 			}
 		}
@@ -461,14 +463,14 @@ func (l *logStream) collectBatch() {
 // bytes per event (defined in maximumBytesPerEvent). There is a fixed per-event
 // byte overhead (defined in perEventBytes) which is accounted for in split- and
 // batch-calculations.
-func (l *logStream) processEvent(batch *eventBatch, unprocessedLine []byte, timestamp int64) {
-	for len(unprocessedLine) > 0 {
+func (l *logStream) processEvent(batch *eventBatch, events []byte, timestamp int64) {
+	for len(events) > 0 {
 		// Split line length so it does not exceed the maximum
-		lineBytes := len(unprocessedLine)
+		lineBytes := len(events)
 		if lineBytes > maximumBytesPerEvent {
 			lineBytes = maximumBytesPerEvent
 		}
-		line := unprocessedLine[:lineBytes]
+		line := events[:lineBytes]
 
 		event := wrappedEvent{
 			inputLogEvent: &cloudwatchlogs.InputLogEvent{
@@ -480,7 +482,7 @@ func (l *logStream) processEvent(batch *eventBatch, unprocessedLine []byte, timestamp int64) {
 
 		added := batch.add(event, lineBytes)
 		if added {
-			unprocessedLine = unprocessedLine[lineBytes:]
+			events = events[lineBytes:]
 		} else {
 			l.publishBatch(batch)
 			batch.reset()

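Why the unconditional newline was a problem: the sketch below is not part of the diff; splitIntoEvents and maxEventBytes are illustrative stand-ins for the chunking loop in processEvent and the real maximumBytesPerEvent constant. It shows how a line that already fills an event used to gain a trailing "\n", tip one byte over the limit, and be split into a second event containing only the newline.

package main

import "fmt"

const maxEventBytes = 8 // scaled-down stand-in for maximumBytesPerEvent

// splitIntoEvents mirrors the chunking loop in processEvent: slice the
// buffer into pieces of at most maxEventBytes bytes each.
func splitIntoEvents(buf []byte) [][]byte {
	var out [][]byte
	for len(buf) > 0 {
		n := len(buf)
		if n > maxEventBytes {
			n = maxEventBytes
		}
		out = append(out, buf[:n])
		buf = buf[n:]
	}
	return out
}

func main() {
	line := []byte("AAAAAAAA") // exactly maxEventBytes long

	// Old behaviour: always append "\n", so the buffer is one byte over the
	// limit and splits into two events, the second being just "\n".
	old := append(append([]byte{}, line...), '\n')
	fmt.Printf("%d events: %q\n", len(splitIntoEvents(old)), splitIntoEvents(old))

	// New behaviour: only append "\n" when the line is below the limit,
	// so a maximum sized event stays whole.
	if len(line) < maxEventBytes {
		line = append(line, '\n')
	}
	fmt.Printf("%d events: %q\n", len(splitIntoEvents(line)), splitIntoEvents(line))
}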

@@ -726,6 +726,59 @@ func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) {
 	stream.Close()
 }
 
+func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) {
+	mockClient := newMockClient()
+	multilinePattern := regexp.MustCompile("xxxx")
+	stream := &logStream{
+		client:           mockClient,
+		logGroupName:     groupName,
+		logStreamName:    streamName,
+		multilinePattern: multilinePattern,
+		sequenceToken:    aws.String(sequenceToken),
+		messages:         make(chan *logger.Message),
+	}
+	mockClient.putLogEventsResult <- &putLogEventsResult{
+		successResult: &cloudwatchlogs.PutLogEventsOutput{
+			NextSequenceToken: aws.String(nextSequenceToken),
+		},
+	}
+	ticks := make(chan time.Time)
+	newTicker = func(_ time.Duration) *time.Ticker {
+		return &time.Ticker{
+			C: ticks,
+		}
+	}
+
+	go stream.collectBatch()
+
+	// Log max event size
+	longline := strings.Repeat("A", maximumBytesPerEvent)
+	stream.Log(&logger.Message{
+		Line:      []byte(longline),
+		Timestamp: time.Now(),
+	})
+
+	// Log short event
+	shortline := strings.Repeat("B", 100)
+	stream.Log(&logger.Message{
+		Line:      []byte(shortline),
+		Timestamp: time.Now(),
+	})
+
+	// Fire ticker
+	ticks <- time.Now().Add(batchPublishFrequency)
+
+	// Verify multiline events
+	// We expect a maximum sized event with no new line characters and a
+	// second short event with a new line character at the end
+	argument := <-mockClient.putLogEventsArgument
+	assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
+	assert.Equal(t, 2, len(argument.LogEvents), "Expected two events")
+	assert.Equal(t, longline, *argument.LogEvents[0].Message, "Received incorrect multiline message")
+	assert.Equal(t, shortline+"\n", *argument.LogEvents[1].Message, "Received incorrect multiline message")
+	stream.Close()
+}
+
 func TestCollectBatchClose(t *testing.T) {
 	mockClient := newMockClient()
 	stream := &logStream{
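The rule the new test exercises, shown in isolation: a minimal sketch assuming a scaled-down limit, with bufferLine as a hypothetical helper rather than a function in the driver. Only lines shorter than the per-event limit get a trailing newline, which is why the test expects longline to arrive unchanged and shortline to arrive with "\n" appended.

package main

import (
	"bytes"
	"fmt"
)

const maxEventBytes = 16 // scaled-down stand-in for maximumBytesPerEvent

// bufferLine applies the newline rule from collectBatch above: append "\n"
// only when the line is below the per-event size limit.
func bufferLine(line []byte) []byte {
	if len(line) < maxEventBytes {
		line = append(line, '\n')
	}
	return line
}

func main() {
	long := bytes.Repeat([]byte("A"), maxEventBytes) // analogous to longline in the test
	short := bytes.Repeat([]byte("B"), 4)            // analogous to shortline

	fmt.Printf("%q\n", bufferLine(long))  // "AAAAAAAAAAAAAAAA" with no trailing newline
	fmt.Printf("%q\n", bufferLine(short)) // "BBBB\n"
}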