From ab74038df93156ef27c03d6e4490bca5c3359423 Mon Sep 17 00:00:00 2001 From: Justin Menga Date: Mon, 13 Feb 2017 21:24:59 +1300 Subject: [PATCH 1/9] Add awslogs multiline support Signed-off-by: Justin Menga --- daemon/logger/awslogs/cloudwatchlogs.go | 177 +++++++++++++---- daemon/logger/awslogs/cloudwatchlogs_test.go | 189 ++++++++++++++++++- 2 files changed, 324 insertions(+), 42 deletions(-) diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index ba9455e6ac..ed62e53896 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "os" + "regexp" "runtime" "sort" "strconv" @@ -34,6 +35,8 @@ const ( logStreamKey = "awslogs-stream" logCreateGroupKey = "awslogs-create-group" tagKey = "tag" + datetimeFormatKey = "awslogs-datetime-format" + multilinePatternKey = "awslogs-multiline-pattern" batchPublishFrequency = 5 * time.Second // See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html @@ -53,14 +56,15 @@ const ( ) type logStream struct { - logStreamName string - logGroupName string - logCreateGroup bool - client api - messages chan *logger.Message - lock sync.RWMutex - closed bool - sequenceToken *string + logStreamName string + logGroupName string + logCreateGroup bool + multilinePattern *regexp.Regexp + client api + messages chan *logger.Message + lock sync.RWMutex + closed bool + sequenceToken *string } type api interface { @@ -89,9 +93,33 @@ func init() { } } +// Parses awslogs-multiline-pattern and awslogs-datetime-format options +// If awslogs-datetime-format is present, convert the format from strftime +// to regexp and return. +// If awslogs-multiline-pattern is present, compile regexp and return +func parseMultilineOptions(info logger.Info) (*regexp.Regexp, error) { + dateTimeFormat := info.Config[datetimeFormatKey] + multilinePatternKey := info.Config[multilinePatternKey] + if dateTimeFormat != "" { + r := regexp.MustCompile("%.") + multilinePatternKey = r.ReplaceAllStringFunc(dateTimeFormat, func(s string) string { + return strftimeToRegex[s] + }) + } + if multilinePatternKey != "" { + multilinePattern, err := regexp.Compile(multilinePatternKey) + if err != nil { + return nil, err + } + return multilinePattern, nil + } + return nil, nil +} + // New creates an awslogs logger using the configuration passed in on the // context. Supported context configuration variables are awslogs-region, -// awslogs-group, awslogs-stream, and awslogs-create-group. When available, configuration is +// awslogs-group, awslogs-stream, awslogs-create-group, awslogs-multiline-pattern +// and awslogs-datetime-format. When available, configuration is // also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID, // AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and // the EC2 Instance Metadata Service. @@ -112,16 +140,23 @@ func New(info logger.Info) (logger.Logger, error) { if info.Config[logStreamKey] != "" { logStreamName = info.Config[logStreamKey] } + + multilinePattern, err := parseMultilineOptions(info) + if err != nil { + return nil, err + } + client, err := newAWSLogsClient(info) if err != nil { return nil, err } containerStream := &logStream{ - logStreamName: logStreamName, - logGroupName: logGroupName, - logCreateGroup: logCreateGroup, - client: client, - messages: make(chan *logger.Message, 4096), + logStreamName: logStreamName, + logGroupName: logGroupName, + logCreateGroup: logCreateGroup, + multilinePattern: multilinePattern, + client: client, + messages: make(chan *logger.Message, 4096), } err = containerStream.create() if err != nil { @@ -309,48 +344,83 @@ var newTicker = func(freq time.Duration) *time.Ticker { func (l *logStream) collectBatch() { timer := newTicker(batchPublishFrequency) var events []wrappedEvent - bytes := 0 + var eventBuffer []byte + var eventBufferTimestamp int64 for { select { - case <-timer.C: + case t := <-timer.C: + // If event buffer is older than batch publish frequency flush the event buffer + if eventBufferTimestamp > 0 && len(eventBuffer) > 0 { + eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp + if eventBufferAge > int64(batchPublishFrequency)/int64(time.Millisecond) { + events = l.processEvent(events, eventBuffer, eventBufferTimestamp) + } + } l.publishBatch(events) events = events[:0] - bytes = 0 case msg, more := <-l.messages: if !more { + // Flush event buffer + events = l.processEvent(events, eventBuffer, eventBufferTimestamp) l.publishBatch(events) return } - unprocessedLine := msg.Line - for len(unprocessedLine) > 0 { - // Split line length so it does not exceed the maximum - lineBytes := len(unprocessedLine) - if lineBytes > maximumBytesPerEvent { - lineBytes = maximumBytesPerEvent - } - line := unprocessedLine[:lineBytes] - unprocessedLine = unprocessedLine[lineBytes:] - if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) { - // Publish an existing batch if it's already over the maximum number of events or if adding this - // event would push it over the maximum number of total bytes. - l.publishBatch(events) - events = events[:0] - bytes = 0 - } - events = append(events, wrappedEvent{ - inputLogEvent: &cloudwatchlogs.InputLogEvent{ - Message: aws.String(string(line)), - Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)), - }, - insertOrder: len(events), - }) - bytes += (lineBytes + perEventBytes) + if eventBufferTimestamp == 0 { + eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond) } + unprocessedLine := msg.Line + if l.multilinePattern != nil { + if l.multilinePattern.Match(unprocessedLine) { + // This is a new log event so flush the current eventBuffer to events + events = l.processEvent(events, eventBuffer, eventBufferTimestamp) + eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond) + eventBuffer = eventBuffer[:0] + } + eventBuffer = append(eventBuffer, unprocessedLine...) + // If we have exceeded max bytes per event flush the event buffer up to max bytes + if len(eventBuffer) > maximumBytesPerEvent { + events = l.processEvent(events, eventBuffer[:maximumBytesPerEvent], eventBufferTimestamp) + eventBuffer = eventBuffer[maximumBytesPerEvent:] + } + logger.PutMessage(msg) + continue + } + events = l.processEvent(events, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond)) logger.PutMessage(msg) } } } +// processEvent processes log events +func (l *logStream) processEvent(events []wrappedEvent, unprocessedLine []byte, timestamp int64) []wrappedEvent { + bytes := 0 + for len(unprocessedLine) > 0 { + // Split line length so it does not exceed the maximum + lineBytes := len(unprocessedLine) + if lineBytes > maximumBytesPerEvent { + lineBytes = maximumBytesPerEvent + } + line := unprocessedLine[:lineBytes] + unprocessedLine = unprocessedLine[lineBytes:] + if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) { + // Publish an existing batch if it's already over the maximum number of events or if adding this + // event would push it over the maximum number of total bytes. + l.publishBatch(events) + events = events[:0] + bytes = 0 + } + events = append(events, wrappedEvent{ + inputLogEvent: &cloudwatchlogs.InputLogEvent{ + Message: aws.String(string(line)), + Timestamp: aws.Int64(timestamp), + }, + insertOrder: len(events), + }) + bytes += (lineBytes + perEventBytes) + } + return events +} + // publishBatch calls PutLogEvents for a given set of InputLogEvents, // accounting for sequencing requirements (each request must reference the // sequence token returned by the previous request). @@ -394,6 +464,29 @@ func (l *logStream) publishBatch(events []wrappedEvent) { } } +// Maps strftime format strings to regex +var strftimeToRegex = map[string]string{ + /*weekdayShort */ `%a`: `(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun)`, + /*weekdayFull */ `%A`: `(?:Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday)`, + /*weekdayZeroIndex */ `%w`: `[0-6]`, + /*dayZeroPadded */ `%d`: `(?:0[1-9]|[1,2][0-9]|3[0,1])`, + /*monthShort */ `%b`: `(?:Jan|Feb|Mar|Apr|Jun|Jul|Aug|Sep|Oct|Nov|Dec)`, + /*monthFull */ `%B`: `(?:January|February|March|April|June|July|August|September|October|November|December)`, + /*monthZeroPadded */ `%m`: `(?:0[1-9]|1[0-2])`, + /*yearCentury */ `%Y`: `\d{4}`, + /*yearZeroPadded */ `%y`: `\d{2}`, + /*hour24ZeroPadded */ `%H`: `(?:[0,1][0-9]|2[0-3])`, + /*hour12ZeroPadded */ `%I`: `(?:0[0-9]|1[0-2])`, + /*AM or PM */ `%p`: "[A,P]M", + /*minuteZeroPadded */ `%M`: `[0-5][0-9]`, + /*secondZeroPadded */ `%S`: `[0-5][0-9]`, + /*microsecondZeroPadded */ `%f`: `\d{6}`, + /*utcOffset */ `%z`: `[+-]\d{4}`, + /*tzName */ `%Z`: `[A-Z]{1,4}T`, + /*dayOfYearZeroPadded */ `%j`: `(?:0[0-9][1-9]|[1,2][0-9][0-9]|3[0-5][0-9]|36[0-6])`, + /*milliseconds */ `%L`: `\.\d{3}`, +} + // putLogEvents wraps the PutLogEvents API func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenceToken *string) (*string, error) { input := &cloudwatchlogs.PutLogEventsInput{ @@ -428,6 +521,8 @@ func ValidateLogOpt(cfg map[string]string) error { case logCreateGroupKey: case regionKey: case tagKey: + case datetimeFormatKey: + case multilinePatternKey: default: return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name) } diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go index ac0bb09c39..48c4395a04 100644 --- a/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "reflect" + "regexp" "runtime" "strings" "testing" @@ -24,7 +25,7 @@ const ( streamName = "streamName" sequenceToken = "sequenceToken" nextSequenceToken = "nextSequenceToken" - logline = "this is a log line" + logline = "this is a log line\r" ) func TestNewAWSLogsClientUserAgentHandler(t *testing.T) { @@ -471,6 +472,127 @@ func TestCollectBatchTicker(t *testing.T) { } +func TestCollectBatchMultilinePattern(t *testing.T) { + mockClient := newMockClient() + multilinePattern := regexp.MustCompile("xxxx") + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + multilinePattern: multilinePattern, + sequenceToken: aws.String(sequenceToken), + messages: make(chan *logger.Message), + } + mockClient.putLogEventsResult <- &putLogEventsResult{ + successResult: &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, + } + ticks := make(chan time.Time) + newTicker = func(_ time.Duration) *time.Ticker { + return &time.Ticker{ + C: ticks, + } + } + + go stream.collectBatch() + + stream.Log(&logger.Message{ + Line: []byte(logline), + Timestamp: time.Now(), + }) + stream.Log(&logger.Message{ + Line: []byte(logline), + Timestamp: time.Now(), + }) + stream.Log(&logger.Message{ + Line: []byte("xxxx " + logline), + Timestamp: time.Now(), + }) + + ticks <- time.Time{} + + // Verify single multiline event + argument := <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + if len(argument.LogEvents) != 1 { + t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) + } + if *argument.LogEvents[0].Message != logline+logline { + t.Errorf("Expected message to be %s but was %s", logline+logline, *argument.LogEvents[0].Message) + } + + stream.Close() + + // Verify single event + argument = <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + if len(argument.LogEvents) != 1 { + t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) + } + if *argument.LogEvents[0].Message != "xxxx "+logline { + t.Errorf("Expected message to be %s but was %s", "xxxx "+logline, *argument.LogEvents[0].Message) + } +} + +func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) { + mockClient := newMockClient() + multilinePattern := regexp.MustCompile("xxxx") + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + multilinePattern: multilinePattern, + sequenceToken: aws.String(sequenceToken), + messages: make(chan *logger.Message), + } + mockClient.putLogEventsResult <- &putLogEventsResult{ + successResult: &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, + } + ticks := make(chan time.Time) + newTicker = func(_ time.Duration) *time.Ticker { + return &time.Ticker{ + C: ticks, + } + } + + go stream.collectBatch() + + stream.Log(&logger.Message{ + Line: []byte(logline), + Timestamp: time.Now(), + }) + + // Log an event 1 second later + stream.Log(&logger.Message{ + Line: []byte(logline), + Timestamp: time.Now().Add(time.Second), + }) + + // Fire ticker batchPublishFrequency seconds later + ticks <- time.Now().Add(batchPublishFrequency * time.Second) + + // Verify single multiline event is flushed after maximum event buffer age (batchPublishFrequency) + argument := <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + if len(argument.LogEvents) != 1 { + t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) + } + if *argument.LogEvents[0].Message != logline+logline { + t.Errorf("Expected message to be %s but was %s", logline+logline, *argument.LogEvents[0].Message) + } + + stream.Close() +} + func TestCollectBatchClose(t *testing.T) { mockClient := newMockClient() stream := &logStream{ @@ -724,6 +846,71 @@ func TestCollectBatchWithDuplicateTimestamps(t *testing.T) { } } +func TestParseLogOptionsMultilinePattern(t *testing.T) { + info := logger.Info{ + Config: map[string]string{ + multilinePatternKey: "^xxxx", + }, + } + + multilinePattern, err := parseMultilineOptions(info) + if err != nil { + t.Errorf("Received unexpected err: %v\n", err) + } + if !multilinePattern.MatchString("xxxx") { + t.Errorf("Expected multilinePattern to match string xxxx but no match found") + } +} + +func TestParseLogOptionsDatetimeFormatSupersedesMultilinePattern(t *testing.T) { + info := logger.Info{ + Config: map[string]string{ + multilinePatternKey: "^xxxx", + datetimeFormatKey: "%Y-%m-%d", + }, + } + + multilinePattern, err := parseMultilineOptions(info) + if err != nil { + t.Errorf("Received unexpected err: %v\n", err) + } + if multilinePattern.MatchString("xxxx") { + t.Errorf("Expected multilinePattern to NOT match string xxxx but match was made") + } + if !multilinePattern.MatchString("2017-01-01") { + t.Errorf("Expected multilinePattern to match string 2017-01-01 but no match found") + } +} + +func TestParseLogOptionsDatetimeFormat(t *testing.T) { + datetimeFormatTests := []struct { + format string + match string + }{ + {"%d/%m/%y %a %H:%M:%S%L %Z", "31/12/10 Mon 08:42:44.345 NZDT"}, + {"%Y-%m-%d %A %I:%M:%S.%f%p%z", "2007-12-04 Monday 08:42:44.123456AM+1200"}, + {"%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b", "Jan|Feb|Mar|Apr|Jun|Jul|Aug|Sep|Oct|Nov|Dec"}, + {"%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B", "January|February|March|April|June|July|August|September|October|November|December"}, + {"%A|%A|%A|%A|%A|%A|%A", "Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday"}, + {"%a|%a|%a|%a|%a|%a|%a", "Mon|Tue|Wed|Thu|Fri|Sat|Sun"}, + {"Day of the week: %w, Day of the year: %j", "Day of the week: 4, Day of the year: 091"}, + } + for _, dt := range datetimeFormatTests { + info := logger.Info{ + Config: map[string]string{ + datetimeFormatKey: dt.format, + }, + } + multilinePattern, err := parseMultilineOptions(info) + if err != nil { + t.Errorf("Received unexpected err: %v\n", err) + } + if !multilinePattern.MatchString(dt.match) { + t.Errorf("Expected multilinePattern %s to match string %s but no match found", dt.format, dt.match) + } + } +} + func TestCreateTagSuccess(t *testing.T) { mockClient := newMockClient() info := logger.Info{ From 84b03660dab03ac366d99ba817a5ec8523808386 Mon Sep 17 00:00:00 2001 From: Justin Menga Date: Thu, 23 Feb 2017 00:33:51 +1300 Subject: [PATCH 2/9] Add awslogs benchmarks Signed-off-by: Justin Menga --- daemon/logger/awslogs/cloudwatchlogs_test.go | 76 ++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go index 48c4395a04..c7ea484c11 100644 --- a/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -26,8 +26,25 @@ const ( sequenceToken = "sequenceToken" nextSequenceToken = "nextSequenceToken" logline = "this is a log line\r" + multilineLogline = "2017-01-01 01:01:44 This is a multiline log entry\r" ) +// Generates i multi-line events each with j lines +func (l *logStream) logGenerator(lineCount int, multilineCount int) { + for i := 0; i < multilineCount; i++ { + l.Log(&logger.Message{ + Line: []byte(multilineLogline), + Timestamp: time.Time{}, + }) + for j := 0; j < lineCount; j++ { + l.Log(&logger.Message{ + Line: []byte(logline), + Timestamp: time.Time{}, + }) + } + } +} + func TestNewAWSLogsClientUserAgentHandler(t *testing.T) { info := logger.Info{ Config: map[string]string{ @@ -539,6 +556,65 @@ func TestCollectBatchMultilinePattern(t *testing.T) { } } +func BenchmarkCollectBatch(b *testing.B) { + for i := 0; i < b.N; i++ { + mockClient := newMockClient() + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + sequenceToken: aws.String(sequenceToken), + messages: make(chan *logger.Message), + } + mockClient.putLogEventsResult <- &putLogEventsResult{ + successResult: &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, + } + ticks := make(chan time.Time) + newTicker = func(_ time.Duration) *time.Ticker { + return &time.Ticker{ + C: ticks, + } + } + + go stream.collectBatch() + stream.logGenerator(10, 100) + ticks <- time.Time{} + stream.Close() + } +} + +func BenchmarkCollectBatchMultilinePattern(b *testing.B) { + for i := 0; i < b.N; i++ { + mockClient := newMockClient() + multilinePattern := regexp.MustCompile(`\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[1,2][0-9]|3[0,1]) (?:[0,1][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]`) + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + multilinePattern: multilinePattern, + sequenceToken: aws.String(sequenceToken), + messages: make(chan *logger.Message), + } + mockClient.putLogEventsResult <- &putLogEventsResult{ + successResult: &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, + } + ticks := make(chan time.Time) + newTicker = func(_ time.Duration) *time.Ticker { + return &time.Ticker{ + C: ticks, + } + } + go stream.collectBatch() + stream.logGenerator(10, 100) + ticks <- time.Time{} + stream.Close() + } +} + func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) { mockClient := newMockClient() multilinePattern := regexp.MustCompile("xxxx") From 6f073e352247217a4cc5ee825676a9a496655455 Mon Sep 17 00:00:00 2001 From: Justin Menga Date: Thu, 23 Feb 2017 01:07:59 +1300 Subject: [PATCH 3/9] Formatting Signed-off-by: Justin Menga --- daemon/logger/awslogs/cloudwatchlogs_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go index c7ea484c11..1e733c15fd 100644 --- a/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -578,7 +578,7 @@ func BenchmarkCollectBatch(b *testing.B) { } } - go stream.collectBatch() + go stream.collectBatch() stream.logGenerator(10, 100) ticks <- time.Time{} stream.Close() From f775005a17c7ae0d3851cc86f46b7c46aafa27d8 Mon Sep 17 00:00:00 2001 From: Tibor Vass Date: Tue, 28 Mar 2017 17:10:26 -0700 Subject: [PATCH 4/9] awslogs: use github.com/pkg/errors to wrap an error Signed-off-by: Tibor Vass --- daemon/logger/awslogs/cloudwatchlogs.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index ed62e53896..b942d55272 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -3,7 +3,6 @@ package awslogs import ( "bytes" - "errors" "fmt" "os" "regexp" @@ -25,6 +24,7 @@ import ( "github.com/docker/docker/daemon/logger/loggerutils" "github.com/docker/docker/dockerversion" "github.com/docker/docker/pkg/templates" + "github.com/pkg/errors" ) const ( @@ -109,7 +109,7 @@ func parseMultilineOptions(info logger.Info) (*regexp.Regexp, error) { if multilinePatternKey != "" { multilinePattern, err := regexp.Compile(multilinePatternKey) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "awslogs could not parse multiline pattern key %q", multilinePatternKey) } return multilinePattern, nil } From 8ef9c19ecdd16881289a63e5e89de3f5c83def66 Mon Sep 17 00:00:00 2001 From: Justin Menga Date: Fri, 12 May 2017 21:55:18 +1200 Subject: [PATCH 5/9] Code review changes Signed-off-by: Justin Menga --- daemon/logger/awslogs/cloudwatchlogs.go | 118 ++++++++------- daemon/logger/awslogs/cloudwatchlogs_test.go | 144 +++++++++++-------- 2 files changed, 146 insertions(+), 116 deletions(-) diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index b942d55272..a54ddc0b73 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -93,29 +93,6 @@ func init() { } } -// Parses awslogs-multiline-pattern and awslogs-datetime-format options -// If awslogs-datetime-format is present, convert the format from strftime -// to regexp and return. -// If awslogs-multiline-pattern is present, compile regexp and return -func parseMultilineOptions(info logger.Info) (*regexp.Regexp, error) { - dateTimeFormat := info.Config[datetimeFormatKey] - multilinePatternKey := info.Config[multilinePatternKey] - if dateTimeFormat != "" { - r := regexp.MustCompile("%.") - multilinePatternKey = r.ReplaceAllStringFunc(dateTimeFormat, func(s string) string { - return strftimeToRegex[s] - }) - } - if multilinePatternKey != "" { - multilinePattern, err := regexp.Compile(multilinePatternKey) - if err != nil { - return nil, errors.Wrapf(err, "awslogs could not parse multiline pattern key %q", multilinePatternKey) - } - return multilinePattern, nil - } - return nil, nil -} - // New creates an awslogs logger using the configuration passed in on the // context. Supported context configuration variables are awslogs-region, // awslogs-group, awslogs-stream, awslogs-create-group, awslogs-multiline-pattern @@ -167,6 +144,56 @@ func New(info logger.Info) (logger.Logger, error) { return containerStream, nil } +// Parses awslogs-multiline-pattern and awslogs-datetime-format options +// If awslogs-datetime-format is present, convert the format from strftime +// to regexp and return. +// If awslogs-multiline-pattern is present, compile regexp and return +func parseMultilineOptions(info logger.Info) (*regexp.Regexp, error) { + dateTimeFormat := info.Config[datetimeFormatKey] + multilinePatternKey := info.Config[multilinePatternKey] + // strftime input is parsed into a regular expression + if dateTimeFormat != "" { + // %. matches each strftime format sequence and ReplaceAllStringFunc + // looks up each format sequence in the conversion table strftimeToRegex + // to replace with a defined regular expression + r := regexp.MustCompile("%.") + multilinePatternKey = r.ReplaceAllStringFunc(dateTimeFormat, func(s string) string { + return strftimeToRegex[s] + }) + } + if multilinePatternKey != "" { + multilinePattern, err := regexp.Compile(multilinePatternKey) + if err != nil { + return nil, errors.Wrapf(err, "awslogs could not parse multiline pattern key %q", multilinePatternKey) + } + return multilinePattern, nil + } + return nil, nil +} + +// Maps strftime format strings to regex +var strftimeToRegex = map[string]string{ + /*weekdayShort */ `%a`: `(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun)`, + /*weekdayFull */ `%A`: `(?:Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday)`, + /*weekdayZeroIndex */ `%w`: `[0-6]`, + /*dayZeroPadded */ `%d`: `(?:0[1-9]|[1,2][0-9]|3[0,1])`, + /*monthShort */ `%b`: `(?:Jan|Feb|Mar|Apr|Jun|Jul|Aug|Sep|Oct|Nov|Dec)`, + /*monthFull */ `%B`: `(?:January|February|March|April|June|July|August|September|October|November|December)`, + /*monthZeroPadded */ `%m`: `(?:0[1-9]|1[0-2])`, + /*yearCentury */ `%Y`: `\d{4}`, + /*yearZeroPadded */ `%y`: `\d{2}`, + /*hour24ZeroPadded */ `%H`: `(?:[0,1][0-9]|2[0-3])`, + /*hour12ZeroPadded */ `%I`: `(?:0[0-9]|1[0-2])`, + /*AM or PM */ `%p`: "[A,P]M", + /*minuteZeroPadded */ `%M`: `[0-5][0-9]`, + /*secondZeroPadded */ `%S`: `[0-5][0-9]`, + /*microsecondZeroPadded */ `%f`: `\d{6}`, + /*utcOffset */ `%z`: `[+-]\d{4}`, + /*tzName */ `%Z`: `[A-Z]{1,4}T`, + /*dayOfYearZeroPadded */ `%j`: `(?:0[0-9][1-9]|[1,2][0-9][0-9]|3[0-5][0-9]|36[0-6])`, + /*milliseconds */ `%L`: `\.\d{3}`, +} + func parseLogGroup(info logger.Info, groupTemplate string) (string, error) { tmpl, err := templates.NewParse("log-group", groupTemplate) if err != nil { @@ -352,7 +379,9 @@ func (l *logStream) collectBatch() { // If event buffer is older than batch publish frequency flush the event buffer if eventBufferTimestamp > 0 && len(eventBuffer) > 0 { eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp - if eventBufferAge > int64(batchPublishFrequency)/int64(time.Millisecond) { + eventBufferExpired := eventBufferAge > int64(batchPublishFrequency)/int64(time.Millisecond) + eventBufferNegative := eventBufferAge < 0 + if eventBufferExpired || eventBufferNegative { events = l.processEvent(events, eventBuffer, eventBufferTimestamp) } } @@ -376,12 +405,12 @@ func (l *logStream) collectBatch() { eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond) eventBuffer = eventBuffer[:0] } - eventBuffer = append(eventBuffer, unprocessedLine...) - // If we have exceeded max bytes per event flush the event buffer up to max bytes - if len(eventBuffer) > maximumBytesPerEvent { - events = l.processEvent(events, eventBuffer[:maximumBytesPerEvent], eventBufferTimestamp) - eventBuffer = eventBuffer[maximumBytesPerEvent:] + // If we will exceed max bytes per event flush the current event buffer before appending + if len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent { + events = l.processEvent(events, eventBuffer, eventBufferTimestamp) + eventBuffer = eventBuffer[:0] } + eventBuffer = append(eventBuffer, unprocessedLine...) logger.PutMessage(msg) continue } @@ -464,29 +493,6 @@ func (l *logStream) publishBatch(events []wrappedEvent) { } } -// Maps strftime format strings to regex -var strftimeToRegex = map[string]string{ - /*weekdayShort */ `%a`: `(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun)`, - /*weekdayFull */ `%A`: `(?:Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday)`, - /*weekdayZeroIndex */ `%w`: `[0-6]`, - /*dayZeroPadded */ `%d`: `(?:0[1-9]|[1,2][0-9]|3[0,1])`, - /*monthShort */ `%b`: `(?:Jan|Feb|Mar|Apr|Jun|Jul|Aug|Sep|Oct|Nov|Dec)`, - /*monthFull */ `%B`: `(?:January|February|March|April|June|July|August|September|October|November|December)`, - /*monthZeroPadded */ `%m`: `(?:0[1-9]|1[0-2])`, - /*yearCentury */ `%Y`: `\d{4}`, - /*yearZeroPadded */ `%y`: `\d{2}`, - /*hour24ZeroPadded */ `%H`: `(?:[0,1][0-9]|2[0-3])`, - /*hour12ZeroPadded */ `%I`: `(?:0[0-9]|1[0-2])`, - /*AM or PM */ `%p`: "[A,P]M", - /*minuteZeroPadded */ `%M`: `[0-5][0-9]`, - /*secondZeroPadded */ `%S`: `[0-5][0-9]`, - /*microsecondZeroPadded */ `%f`: `\d{6}`, - /*utcOffset */ `%z`: `[+-]\d{4}`, - /*tzName */ `%Z`: `[A-Z]{1,4}T`, - /*dayOfYearZeroPadded */ `%j`: `(?:0[0-9][1-9]|[1,2][0-9][0-9]|3[0-5][0-9]|36[0-6])`, - /*milliseconds */ `%L`: `\.\d{3}`, -} - // putLogEvents wraps the PutLogEvents API func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenceToken *string) (*string, error) { input := &cloudwatchlogs.PutLogEventsInput{ @@ -512,7 +518,8 @@ func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenc } // ValidateLogOpt looks for awslogs-specific log options awslogs-region, -// awslogs-group, awslogs-stream, awslogs-create-group +// awslogs-group, awslogs-stream, awslogs-create-group, awslogs-datetime-format, +// awslogs-multiline-pattern func ValidateLogOpt(cfg map[string]string) error { for key := range cfg { switch key { @@ -535,6 +542,11 @@ func ValidateLogOpt(cfg map[string]string) error { return fmt.Errorf("must specify valid value for log opt '%s': %v", logCreateGroupKey, err) } } + _, datetimeFormatKeyExists := cfg[datetimeFormatKey] + _, multilinePatternKeyExists := cfg[multilinePatternKey] + if datetimeFormatKeyExists && multilinePatternKeyExists { + return fmt.Errorf("you cannot configure log opt '%s' and '%s' at the same time", datetimeFormatKey, multilinePatternKey) + } return nil } diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go index 1e733c15fd..463843bc3c 100644 --- a/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -18,6 +18,7 @@ import ( "github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger/loggerutils" "github.com/docker/docker/dockerversion" + "github.com/stretchr/testify/assert" ) const ( @@ -527,33 +528,21 @@ func TestCollectBatchMultilinePattern(t *testing.T) { Timestamp: time.Now(), }) - ticks <- time.Time{} + ticks <- time.Now() // Verify single multiline event argument := <-mockClient.putLogEventsArgument - if argument == nil { - t.Fatal("Expected non-nil PutLogEventsInput") - } - if len(argument.LogEvents) != 1 { - t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) - } - if *argument.LogEvents[0].Message != logline+logline { - t.Errorf("Expected message to be %s but was %s", logline+logline, *argument.LogEvents[0].Message) - } + assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput") + assert.Equal(t, 1, len(argument.LogEvents), "Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) + assert.Equal(t, logline+logline, *argument.LogEvents[0].Message, "Expected message to be %s but was %s", logline+logline, *argument.LogEvents[0].Message) stream.Close() // Verify single event argument = <-mockClient.putLogEventsArgument - if argument == nil { - t.Fatal("Expected non-nil PutLogEventsInput") - } - if len(argument.LogEvents) != 1 { - t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) - } - if *argument.LogEvents[0].Message != "xxxx "+logline { - t.Errorf("Expected message to be %s but was %s", "xxxx "+logline, *argument.LogEvents[0].Message) - } + assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput") + assert.Equal(t, 1, len(argument.LogEvents), "Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) + assert.Equal(t, "xxxx "+logline, *argument.LogEvents[0].Message, "Expected message to be %s but was %s", "xxxx "+logline, *argument.LogEvents[0].Message) } func BenchmarkCollectBatch(b *testing.B) { @@ -656,16 +645,58 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) { // Verify single multiline event is flushed after maximum event buffer age (batchPublishFrequency) argument := <-mockClient.putLogEventsArgument - if argument == nil { - t.Fatal("Expected non-nil PutLogEventsInput") + assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput") + assert.Equal(t, 1, len(argument.LogEvents), "Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) + assert.Equal(t, logline+logline, *argument.LogEvents[0].Message, "Expected message to be %s but was %s", logline+logline, *argument.LogEvents[0].Message) + + stream.Close() +} + +func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) { + mockClient := newMockClient() + multilinePattern := regexp.MustCompile("xxxx") + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + multilinePattern: multilinePattern, + sequenceToken: aws.String(sequenceToken), + messages: make(chan *logger.Message), } - if len(argument.LogEvents) != 1 { - t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) + mockClient.putLogEventsResult <- &putLogEventsResult{ + successResult: &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, } - if *argument.LogEvents[0].Message != logline+logline { - t.Errorf("Expected message to be %s but was %s", logline+logline, *argument.LogEvents[0].Message) + ticks := make(chan time.Time) + newTicker = func(_ time.Duration) *time.Ticker { + return &time.Ticker{ + C: ticks, + } } + go stream.collectBatch() + + stream.Log(&logger.Message{ + Line: []byte(logline), + Timestamp: time.Now(), + }) + + // Log an event 1 second later + stream.Log(&logger.Message{ + Line: []byte(logline), + Timestamp: time.Now().Add(time.Second), + }) + + // Fire ticker in past to simulate negative event buffer age + ticks <- time.Now().Add(-time.Second) + + // Verify single multiline event is flushed with a negative event buffer age + argument := <-mockClient.putLogEventsArgument + assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput") + assert.Equal(t, 1, len(argument.LogEvents), "Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) + assert.Equal(t, logline+logline, *argument.LogEvents[0].Message, "Expected message to be %s but was %s", logline+logline, *argument.LogEvents[0].Message) + stream.Close() } @@ -930,32 +961,8 @@ func TestParseLogOptionsMultilinePattern(t *testing.T) { } multilinePattern, err := parseMultilineOptions(info) - if err != nil { - t.Errorf("Received unexpected err: %v\n", err) - } - if !multilinePattern.MatchString("xxxx") { - t.Errorf("Expected multilinePattern to match string xxxx but no match found") - } -} - -func TestParseLogOptionsDatetimeFormatSupersedesMultilinePattern(t *testing.T) { - info := logger.Info{ - Config: map[string]string{ - multilinePatternKey: "^xxxx", - datetimeFormatKey: "%Y-%m-%d", - }, - } - - multilinePattern, err := parseMultilineOptions(info) - if err != nil { - t.Errorf("Received unexpected err: %v\n", err) - } - if multilinePattern.MatchString("xxxx") { - t.Errorf("Expected multilinePattern to NOT match string xxxx but match was made") - } - if !multilinePattern.MatchString("2017-01-01") { - t.Errorf("Expected multilinePattern to match string 2017-01-01 but no match found") - } + assert.Nil(t, err, "Received unexpected err: %v\n", err) + assert.True(t, multilinePattern.MatchString("xxxx"), "Expected multilinePattern to match string xxxx but no match found") } func TestParseLogOptionsDatetimeFormat(t *testing.T) { @@ -972,21 +979,32 @@ func TestParseLogOptionsDatetimeFormat(t *testing.T) { {"Day of the week: %w, Day of the year: %j", "Day of the week: 4, Day of the year: 091"}, } for _, dt := range datetimeFormatTests { - info := logger.Info{ - Config: map[string]string{ - datetimeFormatKey: dt.format, - }, - } - multilinePattern, err := parseMultilineOptions(info) - if err != nil { - t.Errorf("Received unexpected err: %v\n", err) - } - if !multilinePattern.MatchString(dt.match) { - t.Errorf("Expected multilinePattern %s to match string %s but no match found", dt.format, dt.match) - } + t.Run(dt.match, func(t *testing.T) { + info := logger.Info{ + Config: map[string]string{ + datetimeFormatKey: dt.format, + }, + } + multilinePattern, err := parseMultilineOptions(info) + assert.Nil(t, err, "Received unexpected err: %v\n", err) + assert.True(t, multilinePattern.MatchString(dt.match), "Expected multilinePattern %s to match string %s but no match found", dt.format, dt.match) + }) } } +func TestValidateLogOptionsDatetimeFormatAndMultilinePattern(t *testing.T) { + cfg := map[string]string{ + multilinePatternKey: "^xxxx", + datetimeFormatKey: "%Y-%m-%d", + logGroupKey: groupName, + } + conflictingLogOptionsError := "you cannot configure log opt 'awslogs-datetime-format' and 'awslogs-multiline-pattern' at the same time" + + err := ValidateLogOpt(cfg) + assert.NotNil(t, err, "Expected an error but received nil") + assert.Equal(t, err.Error(), conflictingLogOptionsError, "Received incorrect error: %v\n", err) +} + func TestCreateTagSuccess(t *testing.T) { mockClient := newMockClient() info := logger.Info{ From 4374f16667708118c126cd71f733493b0070e807 Mon Sep 17 00:00:00 2001 From: Justin Menga Date: Sat, 13 May 2017 09:44:34 +1200 Subject: [PATCH 6/9] Append new line to multiline events Signed-off-by: Justin Menga --- daemon/logger/awslogs/cloudwatchlogs.go | 4 +++- daemon/logger/awslogs/cloudwatchlogs_test.go | 8 ++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index a54ddc0b73..6de65f8ac8 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -410,7 +410,9 @@ func (l *logStream) collectBatch() { events = l.processEvent(events, eventBuffer, eventBufferTimestamp) eventBuffer = eventBuffer[:0] } - eventBuffer = append(eventBuffer, unprocessedLine...) + // Append new line + processedLine := append(unprocessedLine, "\n"...) + eventBuffer = append(eventBuffer, processedLine...) logger.PutMessage(msg) continue } diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go index 463843bc3c..3812b0a76e 100644 --- a/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -534,7 +534,7 @@ func TestCollectBatchMultilinePattern(t *testing.T) { argument := <-mockClient.putLogEventsArgument assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput") assert.Equal(t, 1, len(argument.LogEvents), "Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) - assert.Equal(t, logline+logline, *argument.LogEvents[0].Message, "Expected message to be %s but was %s", logline+logline, *argument.LogEvents[0].Message) + assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Expected message to be %s but was %s", logline+logline, *argument.LogEvents[0].Message) stream.Close() @@ -542,7 +542,7 @@ func TestCollectBatchMultilinePattern(t *testing.T) { argument = <-mockClient.putLogEventsArgument assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput") assert.Equal(t, 1, len(argument.LogEvents), "Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) - assert.Equal(t, "xxxx "+logline, *argument.LogEvents[0].Message, "Expected message to be %s but was %s", "xxxx "+logline, *argument.LogEvents[0].Message) + assert.Equal(t, "xxxx "+logline+"\n", *argument.LogEvents[0].Message, "Expected message to be %s but was %s", "xxxx "+logline, *argument.LogEvents[0].Message) } func BenchmarkCollectBatch(b *testing.B) { @@ -647,7 +647,7 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) { argument := <-mockClient.putLogEventsArgument assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput") assert.Equal(t, 1, len(argument.LogEvents), "Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) - assert.Equal(t, logline+logline, *argument.LogEvents[0].Message, "Expected message to be %s but was %s", logline+logline, *argument.LogEvents[0].Message) + assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Expected message to be %s but was %s", logline+logline, *argument.LogEvents[0].Message) stream.Close() } @@ -695,7 +695,7 @@ func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) { argument := <-mockClient.putLogEventsArgument assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput") assert.Equal(t, 1, len(argument.LogEvents), "Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) - assert.Equal(t, logline+logline, *argument.LogEvents[0].Message, "Expected message to be %s but was %s", logline+logline, *argument.LogEvents[0].Message) + assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Expected message to be %s but was %s", logline+logline, *argument.LogEvents[0].Message) stream.Close() } From bf1c377f6012f3db714e0f848d70e776ef1226a6 Mon Sep 17 00:00:00 2001 From: Justin Menga Date: Sat, 13 May 2017 22:10:51 +1200 Subject: [PATCH 7/9] Add missing month in datetime map Signed-off-by: Justin Menga --- daemon/logger/awslogs/cloudwatchlogs.go | 4 ++-- daemon/logger/awslogs/cloudwatchlogs_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index 6de65f8ac8..304da2faf2 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -177,8 +177,8 @@ var strftimeToRegex = map[string]string{ /*weekdayFull */ `%A`: `(?:Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday)`, /*weekdayZeroIndex */ `%w`: `[0-6]`, /*dayZeroPadded */ `%d`: `(?:0[1-9]|[1,2][0-9]|3[0,1])`, - /*monthShort */ `%b`: `(?:Jan|Feb|Mar|Apr|Jun|Jul|Aug|Sep|Oct|Nov|Dec)`, - /*monthFull */ `%B`: `(?:January|February|March|April|June|July|August|September|October|November|December)`, + /*monthShort */ `%b`: `(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)`, + /*monthFull */ `%B`: `(?:January|February|March|April|May|June|July|August|September|October|November|December)`, /*monthZeroPadded */ `%m`: `(?:0[1-9]|1[0-2])`, /*yearCentury */ `%Y`: `\d{4}`, /*yearZeroPadded */ `%y`: `\d{2}`, diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go index 3812b0a76e..541b8d8c99 100644 --- a/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -972,8 +972,8 @@ func TestParseLogOptionsDatetimeFormat(t *testing.T) { }{ {"%d/%m/%y %a %H:%M:%S%L %Z", "31/12/10 Mon 08:42:44.345 NZDT"}, {"%Y-%m-%d %A %I:%M:%S.%f%p%z", "2007-12-04 Monday 08:42:44.123456AM+1200"}, - {"%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b", "Jan|Feb|Mar|Apr|Jun|Jul|Aug|Sep|Oct|Nov|Dec"}, - {"%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B", "January|February|March|April|June|July|August|September|October|November|December"}, + {"%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b|%b", "Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec"}, + {"%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B|%B", "January|February|March|April|May|June|July|August|September|October|November|December"}, {"%A|%A|%A|%A|%A|%A|%A", "Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday"}, {"%a|%a|%a|%a|%a|%a|%a", "Mon|Tue|Wed|Thu|Fri|Sat|Sun"}, {"Day of the week: %w, Day of the year: %j", "Day of the week: 4, Day of the year: 091"}, From 3a4cf2b076a8f04f54b6edff4f661f5c3b2c8157 Mon Sep 17 00:00:00 2001 From: Justin Menga Date: Mon, 15 May 2017 10:28:18 +1200 Subject: [PATCH 8/9] Code review changes Signed-off-by: Justin Menga --- daemon/logger/awslogs/cloudwatchlogs.go | 15 ++++++++--- daemon/logger/awslogs/cloudwatchlogs_test.go | 28 ++++++++++---------- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index 304da2faf2..818b7bdfc3 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -367,7 +367,14 @@ var newTicker = func(freq time.Duration) *time.Ticker { // maximumBytesPerPut). Log messages are split by the maximum bytes per event // (defined in maximumBytesPerEvent). There is a fixed per-event byte overhead // (defined in perEventBytes) which is accounted for in split- and batch- -// calculations. +// calculations. If the awslogs-multiline-pattern or awslogs-datetime-format +// options have been configured, multiline processing is enabled, where +// log messages are stored in an event buffer until a multiline pattern match +// is found, at which point the messages in the event buffer are pushed to +// CloudWatch logs as a single log event. Multline messages still are processed +// according to the maximumBytesPerPut constraint, and the implementation only +// allows for messages to be buffered for a maximum of 2*batchPublishFrequency +// seconds. func (l *logStream) collectBatch() { timer := newTicker(batchPublishFrequency) var events []wrappedEvent @@ -414,10 +421,10 @@ func (l *logStream) collectBatch() { processedLine := append(unprocessedLine, "\n"...) eventBuffer = append(eventBuffer, processedLine...) logger.PutMessage(msg) - continue + } else { + events = l.processEvent(events, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond)) + logger.PutMessage(msg) } - events = l.processEvent(events, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond)) - logger.PutMessage(msg) } } } diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go index 541b8d8c99..655b99c7a4 100644 --- a/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -533,16 +533,16 @@ func TestCollectBatchMultilinePattern(t *testing.T) { // Verify single multiline event argument := <-mockClient.putLogEventsArgument assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput") - assert.Equal(t, 1, len(argument.LogEvents), "Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) - assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Expected message to be %s but was %s", logline+logline, *argument.LogEvents[0].Message) + assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event") + assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message") stream.Close() // Verify single event argument = <-mockClient.putLogEventsArgument assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput") - assert.Equal(t, 1, len(argument.LogEvents), "Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) - assert.Equal(t, "xxxx "+logline+"\n", *argument.LogEvents[0].Message, "Expected message to be %s but was %s", "xxxx "+logline, *argument.LogEvents[0].Message) + assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event") + assert.Equal(t, "xxxx "+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message") } func BenchmarkCollectBatch(b *testing.B) { @@ -646,8 +646,8 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) { // Verify single multiline event is flushed after maximum event buffer age (batchPublishFrequency) argument := <-mockClient.putLogEventsArgument assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput") - assert.Equal(t, 1, len(argument.LogEvents), "Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) - assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Expected message to be %s but was %s", logline+logline, *argument.LogEvents[0].Message) + assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event") + assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message") stream.Close() } @@ -694,8 +694,8 @@ func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) { // Verify single multiline event is flushed with a negative event buffer age argument := <-mockClient.putLogEventsArgument assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput") - assert.Equal(t, 1, len(argument.LogEvents), "Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) - assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Expected message to be %s but was %s", logline+logline, *argument.LogEvents[0].Message) + assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event") + assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message") stream.Close() } @@ -961,8 +961,8 @@ func TestParseLogOptionsMultilinePattern(t *testing.T) { } multilinePattern, err := parseMultilineOptions(info) - assert.Nil(t, err, "Received unexpected err: %v\n", err) - assert.True(t, multilinePattern.MatchString("xxxx"), "Expected multilinePattern to match string xxxx but no match found") + assert.Nil(t, err, "Received unexpected error") + assert.True(t, multilinePattern.MatchString("xxxx"), "No multiline pattern match found") } func TestParseLogOptionsDatetimeFormat(t *testing.T) { @@ -986,8 +986,8 @@ func TestParseLogOptionsDatetimeFormat(t *testing.T) { }, } multilinePattern, err := parseMultilineOptions(info) - assert.Nil(t, err, "Received unexpected err: %v\n", err) - assert.True(t, multilinePattern.MatchString(dt.match), "Expected multilinePattern %s to match string %s but no match found", dt.format, dt.match) + assert.Nil(t, err, "Received unexpected error") + assert.True(t, multilinePattern.MatchString(dt.match), "No multiline pattern match found") }) } } @@ -1001,8 +1001,8 @@ func TestValidateLogOptionsDatetimeFormatAndMultilinePattern(t *testing.T) { conflictingLogOptionsError := "you cannot configure log opt 'awslogs-datetime-format' and 'awslogs-multiline-pattern' at the same time" err := ValidateLogOpt(cfg) - assert.NotNil(t, err, "Expected an error but received nil") - assert.Equal(t, err.Error(), conflictingLogOptionsError, "Received incorrect error: %v\n", err) + assert.NotNil(t, err, "Expected an error") + assert.Equal(t, err.Error(), conflictingLogOptionsError, "Received invalid error") } func TestCreateTagSuccess(t *testing.T) { From dc87490b632949394307e25dce17c8db6cf664c6 Mon Sep 17 00:00:00 2001 From: Justin Menga Date: Mon, 15 May 2017 11:08:16 +1200 Subject: [PATCH 9/9] Update comments describing new behaviour Signed-off-by: Justin Menga --- daemon/logger/awslogs/cloudwatchlogs.go | 32 +++++++++++++------------ 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index 818b7bdfc3..dc1cd5113a 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -359,22 +359,16 @@ var newTicker = func(freq time.Duration) *time.Ticker { } // collectBatch executes as a goroutine to perform batching of log events for -// submission to the log stream. Batching is performed on time- and size- -// bases. Time-based batching occurs at a 5 second interval (defined in the -// batchPublishFrequency const). Size-based batching is performed on the -// maximum number of events per batch (defined in maximumLogEventsPerPut) and -// the maximum number of total bytes in a batch (defined in -// maximumBytesPerPut). Log messages are split by the maximum bytes per event -// (defined in maximumBytesPerEvent). There is a fixed per-event byte overhead -// (defined in perEventBytes) which is accounted for in split- and batch- -// calculations. If the awslogs-multiline-pattern or awslogs-datetime-format -// options have been configured, multiline processing is enabled, where -// log messages are stored in an event buffer until a multiline pattern match -// is found, at which point the messages in the event buffer are pushed to -// CloudWatch logs as a single log event. Multline messages still are processed +// submission to the log stream. If the awslogs-multiline-pattern or +// awslogs-datetime-format options have been configured, multiline processing +// is enabled, where log messages are stored in an event buffer until a multiline +// pattern match is found, at which point the messages in the event buffer are +// pushed to CloudWatch logs as a single log event. Multline messages are processed // according to the maximumBytesPerPut constraint, and the implementation only // allows for messages to be buffered for a maximum of 2*batchPublishFrequency -// seconds. +// seconds. When events are ready to be processed for submission to CloudWatch +// Logs, the processEvents method is called. If a multiline pattern is not +// configured, log events are submitted to the processEvents method immediately. func (l *logStream) collectBatch() { timer := newTicker(batchPublishFrequency) var events []wrappedEvent @@ -429,7 +423,15 @@ func (l *logStream) collectBatch() { } } -// processEvent processes log events +// processEvent processes log events that are ready for submission to CloudWatch +// logs. Batching is performed on time- and size-bases. Time-based batching +// occurs at a 5 second interval (defined in the batchPublishFrequency const). +// Size-based batching is performed on the maximum number of events per batch +// (defined in maximumLogEventsPerPut) and the maximum number of total bytes in a +// batch (defined in maximumBytesPerPut). Log messages are split by the maximum +// bytes per event (defined in maximumBytesPerEvent). There is a fixed per-event +// byte overhead (defined in perEventBytes) which is accounted for in split- and +// batch-calculations. func (l *logStream) processEvent(events []wrappedEvent, unprocessedLine []byte, timestamp int64) []wrappedEvent { bytes := 0 for len(unprocessedLine) > 0 {