mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Split StreamConfig from New, Utest table driven
Signed-off-by: Maximiliano Maccanti <maccanti@amazon.com>
This commit is contained in:
parent
512ac778bf
commit
687cbfa739
2 changed files with 179 additions and 97 deletions
|
@ -81,6 +81,16 @@ type logStream struct {
|
|||
sequenceToken *string
|
||||
}
|
||||
|
||||
type logStreamConfig struct {
|
||||
logStreamName string
|
||||
logGroupName string
|
||||
logCreateGroup bool
|
||||
logNonBlocking bool
|
||||
forceFlushInterval time.Duration
|
||||
maxBufferedEvents int
|
||||
multilinePattern *regexp.Regexp
|
||||
}
|
||||
|
||||
var _ logger.SizedLogger = &logStream{}
|
||||
|
||||
type api interface {
|
||||
|
@ -128,6 +138,70 @@ type eventBatch struct {
|
|||
// AWS_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, the shared credentials
|
||||
// file (~/.aws/credentials), and the EC2 Instance Metadata Service.
|
||||
func New(info logger.Info) (logger.Logger, error) {
|
||||
containerStreamConfig, err := newStreamConfig(info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client, err := newAWSLogsClient(info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
containerStream := &logStream{
|
||||
logStreamName: containerStreamConfig.logStreamName,
|
||||
logGroupName: containerStreamConfig.logGroupName,
|
||||
logCreateGroup: containerStreamConfig.logCreateGroup,
|
||||
logNonBlocking: containerStreamConfig.logNonBlocking,
|
||||
forceFlushInterval: containerStreamConfig.forceFlushInterval,
|
||||
multilinePattern: containerStreamConfig.multilinePattern,
|
||||
client: client,
|
||||
messages: make(chan *logger.Message, containerStreamConfig.maxBufferedEvents),
|
||||
}
|
||||
|
||||
creationDone := make(chan bool)
|
||||
if containerStream.logNonBlocking {
|
||||
go func() {
|
||||
backoff := 1
|
||||
maxBackoff := 32
|
||||
for {
|
||||
// If logger is closed we are done
|
||||
containerStream.lock.RLock()
|
||||
if containerStream.closed {
|
||||
containerStream.lock.RUnlock()
|
||||
break
|
||||
}
|
||||
containerStream.lock.RUnlock()
|
||||
err := containerStream.create()
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
||||
time.Sleep(time.Duration(backoff) * time.Second)
|
||||
if backoff < maxBackoff {
|
||||
backoff *= 2
|
||||
}
|
||||
logrus.
|
||||
WithError(err).
|
||||
WithField("container-id", info.ContainerID).
|
||||
WithField("container-name", info.ContainerName).
|
||||
Error("Error while trying to initialize awslogs. Retrying in: ", backoff, " seconds")
|
||||
}
|
||||
close(creationDone)
|
||||
}()
|
||||
} else {
|
||||
if err = containerStream.create(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
close(creationDone)
|
||||
}
|
||||
go containerStream.collectBatch(creationDone)
|
||||
|
||||
return containerStream, nil
|
||||
}
|
||||
|
||||
// Parses most of the awslogs- options and prepares a config object to be used for newing the actual stream
|
||||
// It has been formed out to ease Utest of the New above
|
||||
func newStreamConfig(info logger.Info) (*logStreamConfig, error) {
|
||||
logGroupName := info.Config[logGroupKey]
|
||||
logStreamName, err := loggerutils.ParseLogTag(info, "{{.FullID}}")
|
||||
if err != nil {
|
||||
|
@ -169,61 +243,17 @@ func New(info logger.Info) (logger.Logger, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
client, err := newAWSLogsClient(info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
containerStream := &logStream{
|
||||
containerStreamConfig := &logStreamConfig{
|
||||
logStreamName: logStreamName,
|
||||
logGroupName: logGroupName,
|
||||
logCreateGroup: logCreateGroup,
|
||||
logNonBlocking: logNonBlocking,
|
||||
forceFlushInterval: forceFlushInterval,
|
||||
maxBufferedEvents: maxBufferedEvents,
|
||||
multilinePattern: multilinePattern,
|
||||
client: client,
|
||||
messages: make(chan *logger.Message, maxBufferedEvents),
|
||||
}
|
||||
|
||||
creationDone := make(chan bool)
|
||||
if logNonBlocking {
|
||||
go func() {
|
||||
backoff := 1
|
||||
maxBackoff := 32
|
||||
for {
|
||||
// If logger is closed we are done
|
||||
containerStream.lock.RLock()
|
||||
if containerStream.closed {
|
||||
containerStream.lock.RUnlock()
|
||||
break
|
||||
}
|
||||
containerStream.lock.RUnlock()
|
||||
err := containerStream.create()
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
||||
time.Sleep(time.Duration(backoff) * time.Second)
|
||||
if backoff < maxBackoff {
|
||||
backoff *= 2
|
||||
}
|
||||
logrus.
|
||||
WithError(err).
|
||||
WithField("container-id", info.ContainerID).
|
||||
WithField("container-name", info.ContainerName).
|
||||
Error("Error while trying to initialize awslogs. Retrying in: ", backoff, " seconds")
|
||||
}
|
||||
close(creationDone)
|
||||
}()
|
||||
} else {
|
||||
if err = containerStream.create(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
close(creationDone)
|
||||
}
|
||||
go containerStream.collectBatch(creationDone)
|
||||
|
||||
return containerStream, nil
|
||||
return containerStreamConfig, nil
|
||||
}
|
||||
|
||||
// Parses awslogs-multiline-pattern and awslogs-datetime-format options
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"reflect"
|
||||
"regexp"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -59,6 +60,59 @@ func testEventBatch(events []wrappedEvent) *eventBatch {
|
|||
return batch
|
||||
}
|
||||
|
||||
func TestNewStreamConfig(t *testing.T) {
|
||||
tests := []struct {
|
||||
logStreamName string
|
||||
logGroupName string
|
||||
logCreateGroup string
|
||||
logNonBlocking string
|
||||
forceFlushInterval string
|
||||
maxBufferedEvents string
|
||||
datetimeFormat string
|
||||
multilinePattern string
|
||||
shouldErr bool
|
||||
testName string
|
||||
}{
|
||||
{"", groupName, "", "", "", "", "", "", false, "defaults"},
|
||||
{"", groupName, "invalid create group", "", "", "", "", "", true, "invalid create group"},
|
||||
{"", groupName, "", "", "invalid flush interval", "", "", "", true, "invalid flush interval"},
|
||||
{"", groupName, "", "", "", "invalid max buffered events", "", "", true, "invalid max buffered events"},
|
||||
{"", groupName, "", "", "", "", "", "n{1001}", true, "invalid multiline pattern"},
|
||||
{"", groupName, "", "", "15", "", "", "", true, "flush interval at 15"},
|
||||
{"", groupName, "", "", "", "1024", "", "", true, "max buffered events at 1024"},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.testName, func(t *testing.T) {
|
||||
cfg := map[string]string{
|
||||
logGroupKey: tc.logGroupName,
|
||||
logCreateGroupKey: tc.logCreateGroup,
|
||||
"mode": tc.logNonBlocking,
|
||||
forceFlushIntervalKey: tc.forceFlushInterval,
|
||||
maxBufferedEventsKey: tc.maxBufferedEvents,
|
||||
logStreamKey: tc.logStreamName,
|
||||
datetimeFormatKey: tc.datetimeFormat,
|
||||
multilinePatternKey: tc.multilinePattern,
|
||||
}
|
||||
|
||||
info := logger.Info{
|
||||
Config: cfg,
|
||||
}
|
||||
logStreamConfig, err := newStreamConfig(info)
|
||||
if tc.shouldErr {
|
||||
assert.Check(t, err != nil, "Expected an error")
|
||||
} else {
|
||||
assert.Check(t, err == nil, "Unexpected error")
|
||||
assert.Check(t, logStreamConfig.logGroupName == tc.logGroupName, "Unexpected logGroupName")
|
||||
if tc.forceFlushInterval != "" {
|
||||
forceFlushIntervalAsInt, _ := strconv.Atoi(info.Config[forceFlushIntervalKey])
|
||||
assert.Check(t, logStreamConfig.forceFlushInterval == time.Duration(forceFlushIntervalAsInt)*time.Second, "Unexpected forceFlushInterval")
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewAWSLogsClientUserAgentHandler(t *testing.T) {
|
||||
info := logger.Info{
|
||||
Config: map[string]string{
|
||||
|
@ -1420,66 +1474,64 @@ func TestValidateLogOptionsDatetimeFormatAndMultilinePattern(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestValidateLogOptionsForceFlushIntervalSeconds(t *testing.T) {
|
||||
tests := []struct {
|
||||
input string
|
||||
shouldErr bool
|
||||
}{
|
||||
{"0", true},
|
||||
{"-1", true},
|
||||
{"a", true},
|
||||
{"10", false},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.input, func(t *testing.T) {
|
||||
cfg := map[string]string{
|
||||
forceFlushIntervalKey: "0",
|
||||
forceFlushIntervalKey: tc.input,
|
||||
logGroupKey: groupName,
|
||||
}
|
||||
nonPositiveIntegerLogOptionsError := "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': 0"
|
||||
|
||||
err := ValidateLogOpt(cfg)
|
||||
if tc.shouldErr {
|
||||
expectedErr := "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': " + tc.input
|
||||
assert.Check(t, err != nil, "Expected an error")
|
||||
assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error")
|
||||
|
||||
cfg[forceFlushIntervalKey] = "-1"
|
||||
nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': -1"
|
||||
|
||||
err = ValidateLogOpt(cfg)
|
||||
assert.Check(t, err != nil, "Expected an error")
|
||||
assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error")
|
||||
|
||||
cfg[forceFlushIntervalKey] = "a"
|
||||
nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': a"
|
||||
|
||||
err = ValidateLogOpt(cfg)
|
||||
assert.Check(t, err != nil, "Expected an error")
|
||||
assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error")
|
||||
|
||||
cfg[forceFlushIntervalKey] = "10"
|
||||
|
||||
err = ValidateLogOpt(cfg)
|
||||
assert.Check(t, is.Equal(err.Error(), expectedErr), "Received invalid error")
|
||||
} else {
|
||||
assert.Check(t, err == nil, "Unexpected error")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateLogOptionsMaxBufferedEvents(t *testing.T) {
|
||||
tests := []struct {
|
||||
input string
|
||||
shouldErr bool
|
||||
}{
|
||||
{"0", true},
|
||||
{"-1", true},
|
||||
{"a", true},
|
||||
{"10", false},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.input, func(t *testing.T) {
|
||||
cfg := map[string]string{
|
||||
maxBufferedEventsKey: "0",
|
||||
maxBufferedEventsKey: tc.input,
|
||||
logGroupKey: groupName,
|
||||
}
|
||||
nonPositiveIntegerLogOptionsError := "must specify a positive integer for log opt 'awslogs-max-buffered-events': 0"
|
||||
|
||||
err := ValidateLogOpt(cfg)
|
||||
if tc.shouldErr {
|
||||
expectedErr := "must specify a positive integer for log opt 'awslogs-max-buffered-events': " + tc.input
|
||||
assert.Check(t, err != nil, "Expected an error")
|
||||
assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error")
|
||||
|
||||
cfg[maxBufferedEventsKey] = "-1"
|
||||
nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-max-buffered-events': -1"
|
||||
|
||||
err = ValidateLogOpt(cfg)
|
||||
assert.Check(t, err != nil, "Expected an error")
|
||||
assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error")
|
||||
|
||||
cfg[maxBufferedEventsKey] = "a"
|
||||
nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-max-buffered-events': a"
|
||||
|
||||
err = ValidateLogOpt(cfg)
|
||||
assert.Check(t, err != nil, "Expected an error")
|
||||
assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error")
|
||||
|
||||
cfg[maxBufferedEventsKey] = "10"
|
||||
|
||||
err = ValidateLogOpt(cfg)
|
||||
assert.Check(t, is.Equal(err.Error(), expectedErr), "Received invalid error")
|
||||
} else {
|
||||
assert.Check(t, err == nil, "Unexpected error")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateTagSuccess(t *testing.T) {
|
||||
mockClient := newMockClient()
|
||||
|
|
Loading…
Add table
Reference in a new issue