From 512ac778bfe9743848c71318c9e6785a78307155 Mon Sep 17 00:00:00 2001 From: Maximiliano Maccanti Date: Thu, 20 Dec 2018 18:41:47 +0000 Subject: [PATCH] Add two configurable options to awslogs driver Add awslogs-force-flush-interval-seconds and awslogs-max-buffered-events configurable options to aswlogs driver to replace hardcoded values of repsectively 5 seconds and 4K. Signed-off-by: Maximiliano Maccanti --- daemon/logger/awslogs/cloudwatchlogs.go | 79 +++++++++++++++----- daemon/logger/awslogs/cloudwatchlogs_test.go | 74 ++++++++++++++++-- 2 files changed, 127 insertions(+), 26 deletions(-) diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index cec5e96831..2b1514246b 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -39,7 +39,11 @@ const ( datetimeFormatKey = "awslogs-datetime-format" multilinePatternKey = "awslogs-multiline-pattern" credentialsEndpointKey = "awslogs-credentials-endpoint" - batchPublishFrequency = 5 * time.Second + forceFlushIntervalKey = "awslogs-force-flush-interval-seconds" + maxBufferedEventsKey = "awslogs-max-buffered-events" + + defaultForceFlushInterval = 5 * time.Second + defaultMaxBufferedEvents = 4096 // See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html perEventBytes = 26 @@ -64,16 +68,17 @@ const ( ) type logStream struct { - logStreamName string - logGroupName string - logCreateGroup bool - logNonBlocking bool - multilinePattern *regexp.Regexp - client api - messages chan *logger.Message - lock sync.RWMutex - closed bool - sequenceToken *string + logStreamName string + logGroupName string + logCreateGroup bool + logNonBlocking bool + forceFlushInterval time.Duration + multilinePattern *regexp.Regexp + client api + messages chan *logger.Message + lock sync.RWMutex + closed bool + sequenceToken *string } var _ logger.SizedLogger = &logStream{} @@ -138,6 +143,23 @@ func New(info logger.Info) (logger.Logger, error) { logNonBlocking := info.Config["mode"] == "non-blocking" + forceFlushInterval := defaultForceFlushInterval + if info.Config[forceFlushIntervalKey] != "" { + forceFlushIntervalAsInt, err := strconv.Atoi(info.Config[forceFlushIntervalKey]) + if err != nil { + return nil, err + } + forceFlushInterval = time.Duration(forceFlushIntervalAsInt) * time.Second + } + + maxBufferedEvents := int(defaultMaxBufferedEvents) + if info.Config[maxBufferedEventsKey] != "" { + maxBufferedEvents, err = strconv.Atoi(info.Config[maxBufferedEventsKey]) + if err != nil { + return nil, err + } + } + if info.Config[logStreamKey] != "" { logStreamName = info.Config[logStreamKey] } @@ -153,13 +175,14 @@ func New(info logger.Info) (logger.Logger, error) { } containerStream := &logStream{ - logStreamName: logStreamName, - logGroupName: logGroupName, - logCreateGroup: logCreateGroup, - logNonBlocking: logNonBlocking, - multilinePattern: multilinePattern, - client: client, - messages: make(chan *logger.Message, 4096), + logStreamName: logStreamName, + logGroupName: logGroupName, + logCreateGroup: logCreateGroup, + logNonBlocking: logNonBlocking, + forceFlushInterval: forceFlushInterval, + multilinePattern: multilinePattern, + client: client, + messages: make(chan *logger.Message, maxBufferedEvents), } creationDone := make(chan bool) @@ -471,7 +494,11 @@ var newTicker = func(freq time.Duration) *time.Ticker { func (l *logStream) collectBatch(created chan bool) { // Wait for the logstream/group to be created <-created - ticker := newTicker(batchPublishFrequency) + flushInterval := l.forceFlushInterval + if flushInterval <= 0 { + flushInterval = defaultForceFlushInterval + } + ticker := newTicker(flushInterval) var eventBuffer []byte var eventBufferTimestamp int64 var batch = newEventBatch() @@ -481,7 +508,7 @@ func (l *logStream) collectBatch(created chan bool) { // If event buffer is older than batch publish frequency flush the event buffer if eventBufferTimestamp > 0 && len(eventBuffer) > 0 { eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp - eventBufferExpired := eventBufferAge >= int64(batchPublishFrequency)/int64(time.Millisecond) + eventBufferExpired := eventBufferAge >= int64(flushInterval)/int64(time.Millisecond) eventBufferNegative := eventBufferAge < 0 if eventBufferExpired || eventBufferNegative { l.processEvent(batch, eventBuffer, eventBufferTimestamp) @@ -672,6 +699,8 @@ func ValidateLogOpt(cfg map[string]string) error { case datetimeFormatKey: case multilinePatternKey: case credentialsEndpointKey: + case forceFlushIntervalKey: + case maxBufferedEventsKey: default: return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name) } @@ -684,6 +713,16 @@ func ValidateLogOpt(cfg map[string]string) error { return fmt.Errorf("must specify valid value for log opt '%s': %v", logCreateGroupKey, err) } } + if cfg[forceFlushIntervalKey] != "" { + if value, err := strconv.Atoi(cfg[forceFlushIntervalKey]); err != nil || value <= 0 { + return fmt.Errorf("must specify a positive integer for log opt '%s': %v", forceFlushIntervalKey, cfg[forceFlushIntervalKey]) + } + } + if cfg[maxBufferedEventsKey] != "" { + if value, err := strconv.Atoi(cfg[maxBufferedEventsKey]); err != nil || value <= 0 { + return fmt.Errorf("must specify a positive integer for log opt '%s': %v", maxBufferedEventsKey, cfg[maxBufferedEventsKey]) + } + } _, datetimeFormatKeyExists := cfg[datetimeFormatKey] _, multilinePatternKeyExists := cfg[multilinePatternKey] if datetimeFormatKeyExists && multilinePatternKeyExists { diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go index fdae99c76d..738a2a6189 100644 --- a/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -762,10 +762,10 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) { Timestamp: time.Now().Add(time.Second), }) - // Fire ticker batchPublishFrequency seconds later - ticks <- time.Now().Add(batchPublishFrequency + time.Second) + // Fire ticker defaultForceFlushInterval seconds later + ticks <- time.Now().Add(defaultForceFlushInterval + time.Second) - // Verify single multiline event is flushed after maximum event buffer age (batchPublishFrequency) + // Verify single multiline event is flushed after maximum event buffer age (defaultForceFlushInterval) argument := <-mockClient.putLogEventsArgument assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput") assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event") @@ -777,8 +777,8 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) { Timestamp: time.Now().Add(time.Second), }) - // Fire ticker another batchPublishFrequency seconds later - ticks <- time.Now().Add(2*batchPublishFrequency + time.Second) + // Fire ticker another defaultForceFlushInterval seconds later + ticks <- time.Now().Add(2*defaultForceFlushInterval + time.Second) // Verify the event buffer is truly flushed - we should only receive a single event argument = <-mockClient.putLogEventsArgument @@ -880,7 +880,7 @@ func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) { }) // Fire ticker - ticks <- time.Now().Add(batchPublishFrequency) + ticks <- time.Now().Add(defaultForceFlushInterval) // Verify multiline events // We expect a maximum sized event with no new line characters and a @@ -1419,6 +1419,68 @@ func TestValidateLogOptionsDatetimeFormatAndMultilinePattern(t *testing.T) { assert.Check(t, is.Equal(err.Error(), conflictingLogOptionsError), "Received invalid error") } +func TestValidateLogOptionsForceFlushIntervalSeconds(t *testing.T) { + cfg := map[string]string{ + forceFlushIntervalKey: "0", + logGroupKey: groupName, + } + nonPositiveIntegerLogOptionsError := "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': 0" + + err := ValidateLogOpt(cfg) + assert.Check(t, err != nil, "Expected an error") + assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") + + cfg[forceFlushIntervalKey] = "-1" + nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': -1" + + err = ValidateLogOpt(cfg) + assert.Check(t, err != nil, "Expected an error") + assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") + + cfg[forceFlushIntervalKey] = "a" + nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': a" + + err = ValidateLogOpt(cfg) + assert.Check(t, err != nil, "Expected an error") + assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") + + cfg[forceFlushIntervalKey] = "10" + + err = ValidateLogOpt(cfg) + assert.Check(t, err == nil, "Unexpected error") +} + +func TestValidateLogOptionsMaxBufferedEvents(t *testing.T) { + cfg := map[string]string{ + maxBufferedEventsKey: "0", + logGroupKey: groupName, + } + nonPositiveIntegerLogOptionsError := "must specify a positive integer for log opt 'awslogs-max-buffered-events': 0" + + err := ValidateLogOpt(cfg) + assert.Check(t, err != nil, "Expected an error") + assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") + + cfg[maxBufferedEventsKey] = "-1" + nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-max-buffered-events': -1" + + err = ValidateLogOpt(cfg) + assert.Check(t, err != nil, "Expected an error") + assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") + + cfg[maxBufferedEventsKey] = "a" + nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-max-buffered-events': a" + + err = ValidateLogOpt(cfg) + assert.Check(t, err != nil, "Expected an error") + assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") + + cfg[maxBufferedEventsKey] = "10" + + err = ValidateLogOpt(cfg) + assert.Check(t, err == nil, "Unexpected error") +} + func TestCreateTagSuccess(t *testing.T) { mockClient := newMockClient() info := logger.Info{