From 512ac778bfe9743848c71318c9e6785a78307155 Mon Sep 17 00:00:00 2001 From: Maximiliano Maccanti Date: Thu, 20 Dec 2018 18:41:47 +0000 Subject: [PATCH 1/3] Add two configurable options to awslogs driver Add awslogs-force-flush-interval-seconds and awslogs-max-buffered-events configurable options to aswlogs driver to replace hardcoded values of repsectively 5 seconds and 4K. Signed-off-by: Maximiliano Maccanti --- daemon/logger/awslogs/cloudwatchlogs.go | 79 +++++++++++++++----- daemon/logger/awslogs/cloudwatchlogs_test.go | 74 ++++++++++++++++-- 2 files changed, 127 insertions(+), 26 deletions(-) diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index cec5e96831..2b1514246b 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -39,7 +39,11 @@ const ( datetimeFormatKey = "awslogs-datetime-format" multilinePatternKey = "awslogs-multiline-pattern" credentialsEndpointKey = "awslogs-credentials-endpoint" - batchPublishFrequency = 5 * time.Second + forceFlushIntervalKey = "awslogs-force-flush-interval-seconds" + maxBufferedEventsKey = "awslogs-max-buffered-events" + + defaultForceFlushInterval = 5 * time.Second + defaultMaxBufferedEvents = 4096 // See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html perEventBytes = 26 @@ -64,16 +68,17 @@ const ( ) type logStream struct { - logStreamName string - logGroupName string - logCreateGroup bool - logNonBlocking bool - multilinePattern *regexp.Regexp - client api - messages chan *logger.Message - lock sync.RWMutex - closed bool - sequenceToken *string + logStreamName string + logGroupName string + logCreateGroup bool + logNonBlocking bool + forceFlushInterval time.Duration + multilinePattern *regexp.Regexp + client api + messages chan *logger.Message + lock sync.RWMutex + closed bool + sequenceToken *string } var _ logger.SizedLogger = &logStream{} @@ -138,6 +143,23 @@ func New(info logger.Info) (logger.Logger, error) { logNonBlocking := info.Config["mode"] == "non-blocking" + forceFlushInterval := defaultForceFlushInterval + if info.Config[forceFlushIntervalKey] != "" { + forceFlushIntervalAsInt, err := strconv.Atoi(info.Config[forceFlushIntervalKey]) + if err != nil { + return nil, err + } + forceFlushInterval = time.Duration(forceFlushIntervalAsInt) * time.Second + } + + maxBufferedEvents := int(defaultMaxBufferedEvents) + if info.Config[maxBufferedEventsKey] != "" { + maxBufferedEvents, err = strconv.Atoi(info.Config[maxBufferedEventsKey]) + if err != nil { + return nil, err + } + } + if info.Config[logStreamKey] != "" { logStreamName = info.Config[logStreamKey] } @@ -153,13 +175,14 @@ func New(info logger.Info) (logger.Logger, error) { } containerStream := &logStream{ - logStreamName: logStreamName, - logGroupName: logGroupName, - logCreateGroup: logCreateGroup, - logNonBlocking: logNonBlocking, - multilinePattern: multilinePattern, - client: client, - messages: make(chan *logger.Message, 4096), + logStreamName: logStreamName, + logGroupName: logGroupName, + logCreateGroup: logCreateGroup, + logNonBlocking: logNonBlocking, + forceFlushInterval: forceFlushInterval, + multilinePattern: multilinePattern, + client: client, + messages: make(chan *logger.Message, maxBufferedEvents), } creationDone := make(chan bool) @@ -471,7 +494,11 @@ var newTicker = func(freq time.Duration) *time.Ticker { func (l *logStream) collectBatch(created chan bool) { // Wait for the logstream/group to be created <-created - ticker := newTicker(batchPublishFrequency) + flushInterval := l.forceFlushInterval + if flushInterval <= 0 { + flushInterval = defaultForceFlushInterval + } + ticker := newTicker(flushInterval) var eventBuffer []byte var eventBufferTimestamp int64 var batch = newEventBatch() @@ -481,7 +508,7 @@ func (l *logStream) collectBatch(created chan bool) { // If event buffer is older than batch publish frequency flush the event buffer if eventBufferTimestamp > 0 && len(eventBuffer) > 0 { eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp - eventBufferExpired := eventBufferAge >= int64(batchPublishFrequency)/int64(time.Millisecond) + eventBufferExpired := eventBufferAge >= int64(flushInterval)/int64(time.Millisecond) eventBufferNegative := eventBufferAge < 0 if eventBufferExpired || eventBufferNegative { l.processEvent(batch, eventBuffer, eventBufferTimestamp) @@ -672,6 +699,8 @@ func ValidateLogOpt(cfg map[string]string) error { case datetimeFormatKey: case multilinePatternKey: case credentialsEndpointKey: + case forceFlushIntervalKey: + case maxBufferedEventsKey: default: return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name) } @@ -684,6 +713,16 @@ func ValidateLogOpt(cfg map[string]string) error { return fmt.Errorf("must specify valid value for log opt '%s': %v", logCreateGroupKey, err) } } + if cfg[forceFlushIntervalKey] != "" { + if value, err := strconv.Atoi(cfg[forceFlushIntervalKey]); err != nil || value <= 0 { + return fmt.Errorf("must specify a positive integer for log opt '%s': %v", forceFlushIntervalKey, cfg[forceFlushIntervalKey]) + } + } + if cfg[maxBufferedEventsKey] != "" { + if value, err := strconv.Atoi(cfg[maxBufferedEventsKey]); err != nil || value <= 0 { + return fmt.Errorf("must specify a positive integer for log opt '%s': %v", maxBufferedEventsKey, cfg[maxBufferedEventsKey]) + } + } _, datetimeFormatKeyExists := cfg[datetimeFormatKey] _, multilinePatternKeyExists := cfg[multilinePatternKey] if datetimeFormatKeyExists && multilinePatternKeyExists { diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go index fdae99c76d..738a2a6189 100644 --- a/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -762,10 +762,10 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) { Timestamp: time.Now().Add(time.Second), }) - // Fire ticker batchPublishFrequency seconds later - ticks <- time.Now().Add(batchPublishFrequency + time.Second) + // Fire ticker defaultForceFlushInterval seconds later + ticks <- time.Now().Add(defaultForceFlushInterval + time.Second) - // Verify single multiline event is flushed after maximum event buffer age (batchPublishFrequency) + // Verify single multiline event is flushed after maximum event buffer age (defaultForceFlushInterval) argument := <-mockClient.putLogEventsArgument assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput") assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event") @@ -777,8 +777,8 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) { Timestamp: time.Now().Add(time.Second), }) - // Fire ticker another batchPublishFrequency seconds later - ticks <- time.Now().Add(2*batchPublishFrequency + time.Second) + // Fire ticker another defaultForceFlushInterval seconds later + ticks <- time.Now().Add(2*defaultForceFlushInterval + time.Second) // Verify the event buffer is truly flushed - we should only receive a single event argument = <-mockClient.putLogEventsArgument @@ -880,7 +880,7 @@ func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) { }) // Fire ticker - ticks <- time.Now().Add(batchPublishFrequency) + ticks <- time.Now().Add(defaultForceFlushInterval) // Verify multiline events // We expect a maximum sized event with no new line characters and a @@ -1419,6 +1419,68 @@ func TestValidateLogOptionsDatetimeFormatAndMultilinePattern(t *testing.T) { assert.Check(t, is.Equal(err.Error(), conflictingLogOptionsError), "Received invalid error") } +func TestValidateLogOptionsForceFlushIntervalSeconds(t *testing.T) { + cfg := map[string]string{ + forceFlushIntervalKey: "0", + logGroupKey: groupName, + } + nonPositiveIntegerLogOptionsError := "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': 0" + + err := ValidateLogOpt(cfg) + assert.Check(t, err != nil, "Expected an error") + assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") + + cfg[forceFlushIntervalKey] = "-1" + nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': -1" + + err = ValidateLogOpt(cfg) + assert.Check(t, err != nil, "Expected an error") + assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") + + cfg[forceFlushIntervalKey] = "a" + nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': a" + + err = ValidateLogOpt(cfg) + assert.Check(t, err != nil, "Expected an error") + assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") + + cfg[forceFlushIntervalKey] = "10" + + err = ValidateLogOpt(cfg) + assert.Check(t, err == nil, "Unexpected error") +} + +func TestValidateLogOptionsMaxBufferedEvents(t *testing.T) { + cfg := map[string]string{ + maxBufferedEventsKey: "0", + logGroupKey: groupName, + } + nonPositiveIntegerLogOptionsError := "must specify a positive integer for log opt 'awslogs-max-buffered-events': 0" + + err := ValidateLogOpt(cfg) + assert.Check(t, err != nil, "Expected an error") + assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") + + cfg[maxBufferedEventsKey] = "-1" + nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-max-buffered-events': -1" + + err = ValidateLogOpt(cfg) + assert.Check(t, err != nil, "Expected an error") + assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") + + cfg[maxBufferedEventsKey] = "a" + nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-max-buffered-events': a" + + err = ValidateLogOpt(cfg) + assert.Check(t, err != nil, "Expected an error") + assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") + + cfg[maxBufferedEventsKey] = "10" + + err = ValidateLogOpt(cfg) + assert.Check(t, err == nil, "Unexpected error") +} + func TestCreateTagSuccess(t *testing.T) { mockClient := newMockClient() info := logger.Info{ From 687cbfa73980af4f258ceb0694c03b1341b9c900 Mon Sep 17 00:00:00 2001 From: Maximiliano Maccanti Date: Fri, 21 Dec 2018 20:27:48 +0000 Subject: [PATCH 2/3] Split StreamConfig from New, Utest table driven Signed-off-by: Maximiliano Maccanti --- daemon/logger/awslogs/cloudwatchlogs.go | 124 +++++++++------ daemon/logger/awslogs/cloudwatchlogs_test.go | 152 +++++++++++++------ 2 files changed, 179 insertions(+), 97 deletions(-) diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index 2b1514246b..f9cf1a9840 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -81,6 +81,16 @@ type logStream struct { sequenceToken *string } +type logStreamConfig struct { + logStreamName string + logGroupName string + logCreateGroup bool + logNonBlocking bool + forceFlushInterval time.Duration + maxBufferedEvents int + multilinePattern *regexp.Regexp +} + var _ logger.SizedLogger = &logStream{} type api interface { @@ -128,6 +138,70 @@ type eventBatch struct { // AWS_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, the shared credentials // file (~/.aws/credentials), and the EC2 Instance Metadata Service. func New(info logger.Info) (logger.Logger, error) { + containerStreamConfig, err := newStreamConfig(info) + if err != nil { + return nil, err + } + client, err := newAWSLogsClient(info) + if err != nil { + return nil, err + } + + containerStream := &logStream{ + logStreamName: containerStreamConfig.logStreamName, + logGroupName: containerStreamConfig.logGroupName, + logCreateGroup: containerStreamConfig.logCreateGroup, + logNonBlocking: containerStreamConfig.logNonBlocking, + forceFlushInterval: containerStreamConfig.forceFlushInterval, + multilinePattern: containerStreamConfig.multilinePattern, + client: client, + messages: make(chan *logger.Message, containerStreamConfig.maxBufferedEvents), + } + + creationDone := make(chan bool) + if containerStream.logNonBlocking { + go func() { + backoff := 1 + maxBackoff := 32 + for { + // If logger is closed we are done + containerStream.lock.RLock() + if containerStream.closed { + containerStream.lock.RUnlock() + break + } + containerStream.lock.RUnlock() + err := containerStream.create() + if err == nil { + break + } + + time.Sleep(time.Duration(backoff) * time.Second) + if backoff < maxBackoff { + backoff *= 2 + } + logrus. + WithError(err). + WithField("container-id", info.ContainerID). + WithField("container-name", info.ContainerName). + Error("Error while trying to initialize awslogs. Retrying in: ", backoff, " seconds") + } + close(creationDone) + }() + } else { + if err = containerStream.create(); err != nil { + return nil, err + } + close(creationDone) + } + go containerStream.collectBatch(creationDone) + + return containerStream, nil +} + +// Parses most of the awslogs- options and prepares a config object to be used for newing the actual stream +// It has been formed out to ease Utest of the New above +func newStreamConfig(info logger.Info) (*logStreamConfig, error) { logGroupName := info.Config[logGroupKey] logStreamName, err := loggerutils.ParseLogTag(info, "{{.FullID}}") if err != nil { @@ -169,61 +243,17 @@ func New(info logger.Info) (logger.Logger, error) { return nil, err } - client, err := newAWSLogsClient(info) - if err != nil { - return nil, err - } - - containerStream := &logStream{ + containerStreamConfig := &logStreamConfig{ logStreamName: logStreamName, logGroupName: logGroupName, logCreateGroup: logCreateGroup, logNonBlocking: logNonBlocking, forceFlushInterval: forceFlushInterval, + maxBufferedEvents: maxBufferedEvents, multilinePattern: multilinePattern, - client: client, - messages: make(chan *logger.Message, maxBufferedEvents), } - creationDone := make(chan bool) - if logNonBlocking { - go func() { - backoff := 1 - maxBackoff := 32 - for { - // If logger is closed we are done - containerStream.lock.RLock() - if containerStream.closed { - containerStream.lock.RUnlock() - break - } - containerStream.lock.RUnlock() - err := containerStream.create() - if err == nil { - break - } - - time.Sleep(time.Duration(backoff) * time.Second) - if backoff < maxBackoff { - backoff *= 2 - } - logrus. - WithError(err). - WithField("container-id", info.ContainerID). - WithField("container-name", info.ContainerName). - Error("Error while trying to initialize awslogs. Retrying in: ", backoff, " seconds") - } - close(creationDone) - }() - } else { - if err = containerStream.create(); err != nil { - return nil, err - } - close(creationDone) - } - go containerStream.collectBatch(creationDone) - - return containerStream, nil + return containerStreamConfig, nil } // Parses awslogs-multiline-pattern and awslogs-datetime-format options diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go index 738a2a6189..1cf8acf56b 100644 --- a/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -10,6 +10,7 @@ import ( "reflect" "regexp" "runtime" + "strconv" "strings" "testing" "time" @@ -59,6 +60,59 @@ func testEventBatch(events []wrappedEvent) *eventBatch { return batch } +func TestNewStreamConfig(t *testing.T) { + tests := []struct { + logStreamName string + logGroupName string + logCreateGroup string + logNonBlocking string + forceFlushInterval string + maxBufferedEvents string + datetimeFormat string + multilinePattern string + shouldErr bool + testName string + }{ + {"", groupName, "", "", "", "", "", "", false, "defaults"}, + {"", groupName, "invalid create group", "", "", "", "", "", true, "invalid create group"}, + {"", groupName, "", "", "invalid flush interval", "", "", "", true, "invalid flush interval"}, + {"", groupName, "", "", "", "invalid max buffered events", "", "", true, "invalid max buffered events"}, + {"", groupName, "", "", "", "", "", "n{1001}", true, "invalid multiline pattern"}, + {"", groupName, "", "", "15", "", "", "", true, "flush interval at 15"}, + {"", groupName, "", "", "", "1024", "", "", true, "max buffered events at 1024"}, + } + + for _, tc := range tests { + t.Run(tc.testName, func(t *testing.T) { + cfg := map[string]string{ + logGroupKey: tc.logGroupName, + logCreateGroupKey: tc.logCreateGroup, + "mode": tc.logNonBlocking, + forceFlushIntervalKey: tc.forceFlushInterval, + maxBufferedEventsKey: tc.maxBufferedEvents, + logStreamKey: tc.logStreamName, + datetimeFormatKey: tc.datetimeFormat, + multilinePatternKey: tc.multilinePattern, + } + + info := logger.Info{ + Config: cfg, + } + logStreamConfig, err := newStreamConfig(info) + if tc.shouldErr { + assert.Check(t, err != nil, "Expected an error") + } else { + assert.Check(t, err == nil, "Unexpected error") + assert.Check(t, logStreamConfig.logGroupName == tc.logGroupName, "Unexpected logGroupName") + if tc.forceFlushInterval != "" { + forceFlushIntervalAsInt, _ := strconv.Atoi(info.Config[forceFlushIntervalKey]) + assert.Check(t, logStreamConfig.forceFlushInterval == time.Duration(forceFlushIntervalAsInt)*time.Second, "Unexpected forceFlushInterval") + } + } + }) + } +} + func TestNewAWSLogsClientUserAgentHandler(t *testing.T) { info := logger.Info{ Config: map[string]string{ @@ -1420,65 +1474,63 @@ func TestValidateLogOptionsDatetimeFormatAndMultilinePattern(t *testing.T) { } func TestValidateLogOptionsForceFlushIntervalSeconds(t *testing.T) { - cfg := map[string]string{ - forceFlushIntervalKey: "0", - logGroupKey: groupName, + tests := []struct { + input string + shouldErr bool + }{ + {"0", true}, + {"-1", true}, + {"a", true}, + {"10", false}, } - nonPositiveIntegerLogOptionsError := "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': 0" - err := ValidateLogOpt(cfg) - assert.Check(t, err != nil, "Expected an error") - assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") + for _, tc := range tests { + t.Run(tc.input, func(t *testing.T) { + cfg := map[string]string{ + forceFlushIntervalKey: tc.input, + logGroupKey: groupName, + } - cfg[forceFlushIntervalKey] = "-1" - nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': -1" - - err = ValidateLogOpt(cfg) - assert.Check(t, err != nil, "Expected an error") - assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") - - cfg[forceFlushIntervalKey] = "a" - nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': a" - - err = ValidateLogOpt(cfg) - assert.Check(t, err != nil, "Expected an error") - assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") - - cfg[forceFlushIntervalKey] = "10" - - err = ValidateLogOpt(cfg) - assert.Check(t, err == nil, "Unexpected error") + err := ValidateLogOpt(cfg) + if tc.shouldErr { + expectedErr := "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': " + tc.input + assert.Check(t, err != nil, "Expected an error") + assert.Check(t, is.Equal(err.Error(), expectedErr), "Received invalid error") + } else { + assert.Check(t, err == nil, "Unexpected error") + } + }) + } } func TestValidateLogOptionsMaxBufferedEvents(t *testing.T) { - cfg := map[string]string{ - maxBufferedEventsKey: "0", - logGroupKey: groupName, + tests := []struct { + input string + shouldErr bool + }{ + {"0", true}, + {"-1", true}, + {"a", true}, + {"10", false}, } - nonPositiveIntegerLogOptionsError := "must specify a positive integer for log opt 'awslogs-max-buffered-events': 0" - err := ValidateLogOpt(cfg) - assert.Check(t, err != nil, "Expected an error") - assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") + for _, tc := range tests { + t.Run(tc.input, func(t *testing.T) { + cfg := map[string]string{ + maxBufferedEventsKey: tc.input, + logGroupKey: groupName, + } - cfg[maxBufferedEventsKey] = "-1" - nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-max-buffered-events': -1" - - err = ValidateLogOpt(cfg) - assert.Check(t, err != nil, "Expected an error") - assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") - - cfg[maxBufferedEventsKey] = "a" - nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-max-buffered-events': a" - - err = ValidateLogOpt(cfg) - assert.Check(t, err != nil, "Expected an error") - assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") - - cfg[maxBufferedEventsKey] = "10" - - err = ValidateLogOpt(cfg) - assert.Check(t, err == nil, "Unexpected error") + err := ValidateLogOpt(cfg) + if tc.shouldErr { + expectedErr := "must specify a positive integer for log opt 'awslogs-max-buffered-events': " + tc.input + assert.Check(t, err != nil, "Expected an error") + assert.Check(t, is.Equal(err.Error(), expectedErr), "Received invalid error") + } else { + assert.Check(t, err == nil, "Unexpected error") + } + }) + } } func TestCreateTagSuccess(t *testing.T) { From ad8a8e8a9e88980194a648ed165403d9f26fa904 Mon Sep 17 00:00:00 2001 From: Maximiliano Maccanti Date: Fri, 21 Dec 2018 22:24:40 +0000 Subject: [PATCH 3/3] NewStreamConfig UTest fixes Signed-off-by: Maximiliano Maccanti --- daemon/logger/awslogs/cloudwatchlogs_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go index 1cf8acf56b..3414dd51f8 100644 --- a/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -78,8 +78,8 @@ func TestNewStreamConfig(t *testing.T) { {"", groupName, "", "", "invalid flush interval", "", "", "", true, "invalid flush interval"}, {"", groupName, "", "", "", "invalid max buffered events", "", "", true, "invalid max buffered events"}, {"", groupName, "", "", "", "", "", "n{1001}", true, "invalid multiline pattern"}, - {"", groupName, "", "", "15", "", "", "", true, "flush interval at 15"}, - {"", groupName, "", "", "", "1024", "", "", true, "max buffered events at 1024"}, + {"", groupName, "", "", "15", "", "", "", false, "flush interval at 15"}, + {"", groupName, "", "", "", "1024", "", "", false, "max buffered events at 1024"}, } for _, tc := range tests { @@ -108,6 +108,10 @@ func TestNewStreamConfig(t *testing.T) { forceFlushIntervalAsInt, _ := strconv.Atoi(info.Config[forceFlushIntervalKey]) assert.Check(t, logStreamConfig.forceFlushInterval == time.Duration(forceFlushIntervalAsInt)*time.Second, "Unexpected forceFlushInterval") } + if tc.maxBufferedEvents != "" { + maxBufferedEvents, _ := strconv.Atoi(info.Config[maxBufferedEventsKey]) + assert.Check(t, logStreamConfig.maxBufferedEvents == maxBufferedEvents, "Unexpected maxBufferedEvents") + } } }) }