Merge pull request #39987 from devonestes/fix-splunk-ack

Add ability to handle index acknowledgment with splunk log driver
This commit is contained in:
Sebastiaan van Stijn 2019-10-14 03:02:35 +02:00 committed by GitHub
commit 133eddaee8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 21 additions and 0 deletions

View File

@ -24,6 +24,7 @@ import (
"github.com/docker/docker/daemon/logger/loggerutils" "github.com/docker/docker/daemon/logger/loggerutils"
"github.com/docker/docker/pkg/pools" "github.com/docker/docker/pkg/pools"
"github.com/docker/docker/pkg/urlutil" "github.com/docker/docker/pkg/urlutil"
"github.com/google/uuid"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -41,6 +42,7 @@ const (
splunkVerifyConnectionKey = "splunk-verify-connection" splunkVerifyConnectionKey = "splunk-verify-connection"
splunkGzipCompressionKey = "splunk-gzip" splunkGzipCompressionKey = "splunk-gzip"
splunkGzipCompressionLevelKey = "splunk-gzip-level" splunkGzipCompressionLevelKey = "splunk-gzip-level"
splunkIndexAcknowledgment = "splunk-index-acknowledgment"
envKey = "env" envKey = "env"
envRegexKey = "env-regex" envRegexKey = "env-regex"
labelsKey = "labels" labelsKey = "labels"
@ -91,6 +93,7 @@ type splunkLogger struct {
postMessagesFrequency time.Duration postMessagesFrequency time.Duration
postMessagesBatchSize int postMessagesBatchSize int
bufferMaximum int bufferMaximum int
indexAck bool
// For synchronization between background worker and logger. // For synchronization between background worker and logger.
// We use channel to send messages to worker go routine. // We use channel to send messages to worker go routine.
@ -217,6 +220,14 @@ func New(info logger.Info) (logger.Logger, error) {
} }
} }
indexAck := false
if indexAckStr, ok := info.Config[splunkIndexAcknowledgment]; ok {
indexAck, err = strconv.ParseBool(indexAckStr)
if err != nil {
return nil, err
}
}
transport := &http.Transport{ transport := &http.Transport{
TLSClientConfig: tlsConfig, TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment, Proxy: http.ProxyFromEnvironment,
@ -269,6 +280,7 @@ func New(info logger.Info) (logger.Logger, error) {
postMessagesFrequency: postMessagesFrequency, postMessagesFrequency: postMessagesFrequency,
postMessagesBatchSize: postMessagesBatchSize, postMessagesBatchSize: postMessagesBatchSize,
bufferMaximum: bufferMaximum, bufferMaximum: bufferMaximum,
indexAck: indexAck,
} }
// By default we verify connection, but we allow use to skip that // By default we verify connection, but we allow use to skip that
@ -505,6 +517,14 @@ func (l *splunkLogger) tryPostMessages(ctx context.Context, messages []*splunkMe
if l.gzipCompression { if l.gzipCompression {
req.Header.Set("Content-Encoding", "gzip") req.Header.Set("Content-Encoding", "gzip")
} }
// Set the correct header if index acknowledgment is enabled
if l.indexAck {
requestChannel, err := uuid.NewRandom()
if err != nil {
return err
}
req.Header.Set("X-Splunk-Request-Channel", requestChannel.String())
}
resp, err := l.client.Do(req) resp, err := l.client.Do(req)
if err != nil { if err != nil {
return err return err
@ -563,6 +583,7 @@ func ValidateLogOpt(cfg map[string]string) error {
case splunkVerifyConnectionKey: case splunkVerifyConnectionKey:
case splunkGzipCompressionKey: case splunkGzipCompressionKey:
case splunkGzipCompressionLevelKey: case splunkGzipCompressionLevelKey:
case splunkIndexAcknowledgment:
case envKey: case envKey:
case envRegexKey: case envRegexKey:
case labelsKey: case labelsKey: