diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index 2b1514246b..f9cf1a9840 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -81,6 +81,16 @@ type logStream struct { sequenceToken *string } +type logStreamConfig struct { + logStreamName string + logGroupName string + logCreateGroup bool + logNonBlocking bool + forceFlushInterval time.Duration + maxBufferedEvents int + multilinePattern *regexp.Regexp +} + var _ logger.SizedLogger = &logStream{} type api interface { @@ -128,6 +138,70 @@ type eventBatch struct { // AWS_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, the shared credentials // file (~/.aws/credentials), and the EC2 Instance Metadata Service. func New(info logger.Info) (logger.Logger, error) { + containerStreamConfig, err := newStreamConfig(info) + if err != nil { + return nil, err + } + client, err := newAWSLogsClient(info) + if err != nil { + return nil, err + } + + containerStream := &logStream{ + logStreamName: containerStreamConfig.logStreamName, + logGroupName: containerStreamConfig.logGroupName, + logCreateGroup: containerStreamConfig.logCreateGroup, + logNonBlocking: containerStreamConfig.logNonBlocking, + forceFlushInterval: containerStreamConfig.forceFlushInterval, + multilinePattern: containerStreamConfig.multilinePattern, + client: client, + messages: make(chan *logger.Message, containerStreamConfig.maxBufferedEvents), + } + + creationDone := make(chan bool) + if containerStream.logNonBlocking { + go func() { + backoff := 1 + maxBackoff := 32 + for { + // If logger is closed we are done + containerStream.lock.RLock() + if containerStream.closed { + containerStream.lock.RUnlock() + break + } + containerStream.lock.RUnlock() + err := containerStream.create() + if err == nil { + break + } + + time.Sleep(time.Duration(backoff) * time.Second) + if backoff < maxBackoff { + backoff *= 2 + } + logrus. + WithError(err). + WithField("container-id", info.ContainerID). + WithField("container-name", info.ContainerName). + Error("Error while trying to initialize awslogs. Retrying in: ", backoff, " seconds") + } + close(creationDone) + }() + } else { + if err = containerStream.create(); err != nil { + return nil, err + } + close(creationDone) + } + go containerStream.collectBatch(creationDone) + + return containerStream, nil +} + +// Parses most of the awslogs- options and prepares a config object to be used for newing the actual stream +// It has been formed out to ease Utest of the New above +func newStreamConfig(info logger.Info) (*logStreamConfig, error) { logGroupName := info.Config[logGroupKey] logStreamName, err := loggerutils.ParseLogTag(info, "{{.FullID}}") if err != nil { @@ -169,61 +243,17 @@ func New(info logger.Info) (logger.Logger, error) { return nil, err } - client, err := newAWSLogsClient(info) - if err != nil { - return nil, err - } - - containerStream := &logStream{ + containerStreamConfig := &logStreamConfig{ logStreamName: logStreamName, logGroupName: logGroupName, logCreateGroup: logCreateGroup, logNonBlocking: logNonBlocking, forceFlushInterval: forceFlushInterval, + maxBufferedEvents: maxBufferedEvents, multilinePattern: multilinePattern, - client: client, - messages: make(chan *logger.Message, maxBufferedEvents), } - creationDone := make(chan bool) - if logNonBlocking { - go func() { - backoff := 1 - maxBackoff := 32 - for { - // If logger is closed we are done - containerStream.lock.RLock() - if containerStream.closed { - containerStream.lock.RUnlock() - break - } - containerStream.lock.RUnlock() - err := containerStream.create() - if err == nil { - break - } - - time.Sleep(time.Duration(backoff) * time.Second) - if backoff < maxBackoff { - backoff *= 2 - } - logrus. - WithError(err). - WithField("container-id", info.ContainerID). - WithField("container-name", info.ContainerName). - Error("Error while trying to initialize awslogs. Retrying in: ", backoff, " seconds") - } - close(creationDone) - }() - } else { - if err = containerStream.create(); err != nil { - return nil, err - } - close(creationDone) - } - go containerStream.collectBatch(creationDone) - - return containerStream, nil + return containerStreamConfig, nil } // Parses awslogs-multiline-pattern and awslogs-datetime-format options diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go index 738a2a6189..1cf8acf56b 100644 --- a/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -10,6 +10,7 @@ import ( "reflect" "regexp" "runtime" + "strconv" "strings" "testing" "time" @@ -59,6 +60,59 @@ func testEventBatch(events []wrappedEvent) *eventBatch { return batch } +func TestNewStreamConfig(t *testing.T) { + tests := []struct { + logStreamName string + logGroupName string + logCreateGroup string + logNonBlocking string + forceFlushInterval string + maxBufferedEvents string + datetimeFormat string + multilinePattern string + shouldErr bool + testName string + }{ + {"", groupName, "", "", "", "", "", "", false, "defaults"}, + {"", groupName, "invalid create group", "", "", "", "", "", true, "invalid create group"}, + {"", groupName, "", "", "invalid flush interval", "", "", "", true, "invalid flush interval"}, + {"", groupName, "", "", "", "invalid max buffered events", "", "", true, "invalid max buffered events"}, + {"", groupName, "", "", "", "", "", "n{1001}", true, "invalid multiline pattern"}, + {"", groupName, "", "", "15", "", "", "", true, "flush interval at 15"}, + {"", groupName, "", "", "", "1024", "", "", true, "max buffered events at 1024"}, + } + + for _, tc := range tests { + t.Run(tc.testName, func(t *testing.T) { + cfg := map[string]string{ + logGroupKey: tc.logGroupName, + logCreateGroupKey: tc.logCreateGroup, + "mode": tc.logNonBlocking, + forceFlushIntervalKey: tc.forceFlushInterval, + maxBufferedEventsKey: tc.maxBufferedEvents, + logStreamKey: tc.logStreamName, + datetimeFormatKey: tc.datetimeFormat, + multilinePatternKey: tc.multilinePattern, + } + + info := logger.Info{ + Config: cfg, + } + logStreamConfig, err := newStreamConfig(info) + if tc.shouldErr { + assert.Check(t, err != nil, "Expected an error") + } else { + assert.Check(t, err == nil, "Unexpected error") + assert.Check(t, logStreamConfig.logGroupName == tc.logGroupName, "Unexpected logGroupName") + if tc.forceFlushInterval != "" { + forceFlushIntervalAsInt, _ := strconv.Atoi(info.Config[forceFlushIntervalKey]) + assert.Check(t, logStreamConfig.forceFlushInterval == time.Duration(forceFlushIntervalAsInt)*time.Second, "Unexpected forceFlushInterval") + } + } + }) + } +} + func TestNewAWSLogsClientUserAgentHandler(t *testing.T) { info := logger.Info{ Config: map[string]string{ @@ -1420,65 +1474,63 @@ func TestValidateLogOptionsDatetimeFormatAndMultilinePattern(t *testing.T) { } func TestValidateLogOptionsForceFlushIntervalSeconds(t *testing.T) { - cfg := map[string]string{ - forceFlushIntervalKey: "0", - logGroupKey: groupName, + tests := []struct { + input string + shouldErr bool + }{ + {"0", true}, + {"-1", true}, + {"a", true}, + {"10", false}, } - nonPositiveIntegerLogOptionsError := "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': 0" - err := ValidateLogOpt(cfg) - assert.Check(t, err != nil, "Expected an error") - assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") + for _, tc := range tests { + t.Run(tc.input, func(t *testing.T) { + cfg := map[string]string{ + forceFlushIntervalKey: tc.input, + logGroupKey: groupName, + } - cfg[forceFlushIntervalKey] = "-1" - nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': -1" - - err = ValidateLogOpt(cfg) - assert.Check(t, err != nil, "Expected an error") - assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") - - cfg[forceFlushIntervalKey] = "a" - nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': a" - - err = ValidateLogOpt(cfg) - assert.Check(t, err != nil, "Expected an error") - assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") - - cfg[forceFlushIntervalKey] = "10" - - err = ValidateLogOpt(cfg) - assert.Check(t, err == nil, "Unexpected error") + err := ValidateLogOpt(cfg) + if tc.shouldErr { + expectedErr := "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': " + tc.input + assert.Check(t, err != nil, "Expected an error") + assert.Check(t, is.Equal(err.Error(), expectedErr), "Received invalid error") + } else { + assert.Check(t, err == nil, "Unexpected error") + } + }) + } } func TestValidateLogOptionsMaxBufferedEvents(t *testing.T) { - cfg := map[string]string{ - maxBufferedEventsKey: "0", - logGroupKey: groupName, + tests := []struct { + input string + shouldErr bool + }{ + {"0", true}, + {"-1", true}, + {"a", true}, + {"10", false}, } - nonPositiveIntegerLogOptionsError := "must specify a positive integer for log opt 'awslogs-max-buffered-events': 0" - err := ValidateLogOpt(cfg) - assert.Check(t, err != nil, "Expected an error") - assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") + for _, tc := range tests { + t.Run(tc.input, func(t *testing.T) { + cfg := map[string]string{ + maxBufferedEventsKey: tc.input, + logGroupKey: groupName, + } - cfg[maxBufferedEventsKey] = "-1" - nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-max-buffered-events': -1" - - err = ValidateLogOpt(cfg) - assert.Check(t, err != nil, "Expected an error") - assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") - - cfg[maxBufferedEventsKey] = "a" - nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-max-buffered-events': a" - - err = ValidateLogOpt(cfg) - assert.Check(t, err != nil, "Expected an error") - assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error") - - cfg[maxBufferedEventsKey] = "10" - - err = ValidateLogOpt(cfg) - assert.Check(t, err == nil, "Unexpected error") + err := ValidateLogOpt(cfg) + if tc.shouldErr { + expectedErr := "must specify a positive integer for log opt 'awslogs-max-buffered-events': " + tc.input + assert.Check(t, err != nil, "Expected an error") + assert.Check(t, is.Equal(err.Error(), expectedErr), "Received invalid error") + } else { + assert.Check(t, err == nil, "Unexpected error") + } + }) + } } func TestCreateTagSuccess(t *testing.T) {