1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Add two configurable options to awslogs driver

Add awslogs-force-flush-interval-seconds and awslogs-max-buffered-events configurable options to aswlogs driver to replace hardcoded values of repsectively 5 seconds and 4K.

Signed-off-by: Maximiliano Maccanti <maccanti@amazon.com>
This commit is contained in:
Maximiliano Maccanti 2018-12-20 18:41:47 +00:00
parent 2cb26cfe9c
commit 512ac778bf
2 changed files with 127 additions and 26 deletions

View file

@ -39,7 +39,11 @@ const (
datetimeFormatKey = "awslogs-datetime-format"
multilinePatternKey = "awslogs-multiline-pattern"
credentialsEndpointKey = "awslogs-credentials-endpoint"
batchPublishFrequency = 5 * time.Second
forceFlushIntervalKey = "awslogs-force-flush-interval-seconds"
maxBufferedEventsKey = "awslogs-max-buffered-events"
defaultForceFlushInterval = 5 * time.Second
defaultMaxBufferedEvents = 4096
// See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
perEventBytes = 26
@ -64,16 +68,17 @@ const (
)
type logStream struct {
logStreamName string
logGroupName string
logCreateGroup bool
logNonBlocking bool
multilinePattern *regexp.Regexp
client api
messages chan *logger.Message
lock sync.RWMutex
closed bool
sequenceToken *string
logStreamName string
logGroupName string
logCreateGroup bool
logNonBlocking bool
forceFlushInterval time.Duration
multilinePattern *regexp.Regexp
client api
messages chan *logger.Message
lock sync.RWMutex
closed bool
sequenceToken *string
}
var _ logger.SizedLogger = &logStream{}
@ -138,6 +143,23 @@ func New(info logger.Info) (logger.Logger, error) {
logNonBlocking := info.Config["mode"] == "non-blocking"
forceFlushInterval := defaultForceFlushInterval
if info.Config[forceFlushIntervalKey] != "" {
forceFlushIntervalAsInt, err := strconv.Atoi(info.Config[forceFlushIntervalKey])
if err != nil {
return nil, err
}
forceFlushInterval = time.Duration(forceFlushIntervalAsInt) * time.Second
}
maxBufferedEvents := int(defaultMaxBufferedEvents)
if info.Config[maxBufferedEventsKey] != "" {
maxBufferedEvents, err = strconv.Atoi(info.Config[maxBufferedEventsKey])
if err != nil {
return nil, err
}
}
if info.Config[logStreamKey] != "" {
logStreamName = info.Config[logStreamKey]
}
@ -153,13 +175,14 @@ func New(info logger.Info) (logger.Logger, error) {
}
containerStream := &logStream{
logStreamName: logStreamName,
logGroupName: logGroupName,
logCreateGroup: logCreateGroup,
logNonBlocking: logNonBlocking,
multilinePattern: multilinePattern,
client: client,
messages: make(chan *logger.Message, 4096),
logStreamName: logStreamName,
logGroupName: logGroupName,
logCreateGroup: logCreateGroup,
logNonBlocking: logNonBlocking,
forceFlushInterval: forceFlushInterval,
multilinePattern: multilinePattern,
client: client,
messages: make(chan *logger.Message, maxBufferedEvents),
}
creationDone := make(chan bool)
@ -471,7 +494,11 @@ var newTicker = func(freq time.Duration) *time.Ticker {
func (l *logStream) collectBatch(created chan bool) {
// Wait for the logstream/group to be created
<-created
ticker := newTicker(batchPublishFrequency)
flushInterval := l.forceFlushInterval
if flushInterval <= 0 {
flushInterval = defaultForceFlushInterval
}
ticker := newTicker(flushInterval)
var eventBuffer []byte
var eventBufferTimestamp int64
var batch = newEventBatch()
@ -481,7 +508,7 @@ func (l *logStream) collectBatch(created chan bool) {
// If event buffer is older than batch publish frequency flush the event buffer
if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
eventBufferExpired := eventBufferAge >= int64(batchPublishFrequency)/int64(time.Millisecond)
eventBufferExpired := eventBufferAge >= int64(flushInterval)/int64(time.Millisecond)
eventBufferNegative := eventBufferAge < 0
if eventBufferExpired || eventBufferNegative {
l.processEvent(batch, eventBuffer, eventBufferTimestamp)
@ -672,6 +699,8 @@ func ValidateLogOpt(cfg map[string]string) error {
case datetimeFormatKey:
case multilinePatternKey:
case credentialsEndpointKey:
case forceFlushIntervalKey:
case maxBufferedEventsKey:
default:
return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
}
@ -684,6 +713,16 @@ func ValidateLogOpt(cfg map[string]string) error {
return fmt.Errorf("must specify valid value for log opt '%s': %v", logCreateGroupKey, err)
}
}
if cfg[forceFlushIntervalKey] != "" {
if value, err := strconv.Atoi(cfg[forceFlushIntervalKey]); err != nil || value <= 0 {
return fmt.Errorf("must specify a positive integer for log opt '%s': %v", forceFlushIntervalKey, cfg[forceFlushIntervalKey])
}
}
if cfg[maxBufferedEventsKey] != "" {
if value, err := strconv.Atoi(cfg[maxBufferedEventsKey]); err != nil || value <= 0 {
return fmt.Errorf("must specify a positive integer for log opt '%s': %v", maxBufferedEventsKey, cfg[maxBufferedEventsKey])
}
}
_, datetimeFormatKeyExists := cfg[datetimeFormatKey]
_, multilinePatternKeyExists := cfg[multilinePatternKey]
if datetimeFormatKeyExists && multilinePatternKeyExists {

View file

@ -762,10 +762,10 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
Timestamp: time.Now().Add(time.Second),
})
// Fire ticker batchPublishFrequency seconds later
ticks <- time.Now().Add(batchPublishFrequency + time.Second)
// Fire ticker defaultForceFlushInterval seconds later
ticks <- time.Now().Add(defaultForceFlushInterval + time.Second)
// Verify single multiline event is flushed after maximum event buffer age (batchPublishFrequency)
// Verify single multiline event is flushed after maximum event buffer age (defaultForceFlushInterval)
argument := <-mockClient.putLogEventsArgument
assert.Check(t, argument != nil, "Expected non-nil PutLogEventsInput")
assert.Check(t, is.Equal(1, len(argument.LogEvents)), "Expected single multiline event")
@ -777,8 +777,8 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
Timestamp: time.Now().Add(time.Second),
})
// Fire ticker another batchPublishFrequency seconds later
ticks <- time.Now().Add(2*batchPublishFrequency + time.Second)
// Fire ticker another defaultForceFlushInterval seconds later
ticks <- time.Now().Add(2*defaultForceFlushInterval + time.Second)
// Verify the event buffer is truly flushed - we should only receive a single event
argument = <-mockClient.putLogEventsArgument
@ -880,7 +880,7 @@ func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) {
})
// Fire ticker
ticks <- time.Now().Add(batchPublishFrequency)
ticks <- time.Now().Add(defaultForceFlushInterval)
// Verify multiline events
// We expect a maximum sized event with no new line characters and a
@ -1419,6 +1419,68 @@ func TestValidateLogOptionsDatetimeFormatAndMultilinePattern(t *testing.T) {
assert.Check(t, is.Equal(err.Error(), conflictingLogOptionsError), "Received invalid error")
}
func TestValidateLogOptionsForceFlushIntervalSeconds(t *testing.T) {
cfg := map[string]string{
forceFlushIntervalKey: "0",
logGroupKey: groupName,
}
nonPositiveIntegerLogOptionsError := "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': 0"
err := ValidateLogOpt(cfg)
assert.Check(t, err != nil, "Expected an error")
assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error")
cfg[forceFlushIntervalKey] = "-1"
nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': -1"
err = ValidateLogOpt(cfg)
assert.Check(t, err != nil, "Expected an error")
assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error")
cfg[forceFlushIntervalKey] = "a"
nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-force-flush-interval-seconds': a"
err = ValidateLogOpt(cfg)
assert.Check(t, err != nil, "Expected an error")
assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error")
cfg[forceFlushIntervalKey] = "10"
err = ValidateLogOpt(cfg)
assert.Check(t, err == nil, "Unexpected error")
}
func TestValidateLogOptionsMaxBufferedEvents(t *testing.T) {
cfg := map[string]string{
maxBufferedEventsKey: "0",
logGroupKey: groupName,
}
nonPositiveIntegerLogOptionsError := "must specify a positive integer for log opt 'awslogs-max-buffered-events': 0"
err := ValidateLogOpt(cfg)
assert.Check(t, err != nil, "Expected an error")
assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error")
cfg[maxBufferedEventsKey] = "-1"
nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-max-buffered-events': -1"
err = ValidateLogOpt(cfg)
assert.Check(t, err != nil, "Expected an error")
assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error")
cfg[maxBufferedEventsKey] = "a"
nonPositiveIntegerLogOptionsError = "must specify a positive integer for log opt 'awslogs-max-buffered-events': a"
err = ValidateLogOpt(cfg)
assert.Check(t, err != nil, "Expected an error")
assert.Check(t, is.Equal(err.Error(), nonPositiveIntegerLogOptionsError), "Received invalid error")
cfg[maxBufferedEventsKey] = "10"
err = ValidateLogOpt(cfg)
assert.Check(t, err == nil, "Unexpected error")
}
func TestCreateTagSuccess(t *testing.T) {
mockClient := newMockClient()
info := logger.Info{