1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

The flag ForceStopAsyncSend was added to fluent logger lib in v1.9.0

* When async is enabled, this option defines the interval (ms) at which the connection
to the fluentd-address is re-established. This option is useful if the address
may resolve to one or more IP addresses, e.g. a Consul service address.

While the change in #42979 resolves the issue where a Docker container can be stuck
if the fluentd-address is unavailable, this functionality adds an additional benefit
in that a new and healthy fluentd-address can be resolved, allowing logs to flow once again.

This adds a `fluentd-async-reconnect-interval` log-opt for the fluentd logging driver.

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
Signed-off-by: Conor Evans <coevans@tcd.ie>

Co-authored-by: Sebastiaan van Stijn <github@gone.nl>
Co-authored-by: Conor Evans <coevans@tcd.ie>
This commit is contained in:
Conor Evans 2021-12-22 15:38:11 +01:00
parent 3500d7e472
commit 5cbc08ce57
No known key found for this signature in database
GPG key ID: 1A9AF5C5F61D5AE9
2 changed files with 63 additions and 20 deletions

View file

@ -48,14 +48,18 @@ const (
defaultMaxRetries = math.MaxInt32
defaultRetryWait = 1000
addressKey = "fluentd-address"
asyncKey = "fluentd-async"
asyncConnectKey = "fluentd-async-connect" // deprecated option (use fluent-async instead)
bufferLimitKey = "fluentd-buffer-limit"
maxRetriesKey = "fluentd-max-retries"
requestAckKey = "fluentd-request-ack"
retryWaitKey = "fluentd-retry-wait"
subSecondPrecisionKey = "fluentd-sub-second-precision"
minReconnectInterval = 100 * time.Millisecond
maxReconnectInterval = 10 * time.Second
addressKey = "fluentd-address"
asyncKey = "fluentd-async"
asyncConnectKey = "fluentd-async-connect" // deprecated option (use fluent-async instead)
asyncReconnectIntervalKey = "fluentd-async-reconnect-interval"
bufferLimitKey = "fluentd-buffer-limit"
maxRetriesKey = "fluentd-max-retries"
requestAckKey = "fluentd-request-ack"
retryWaitKey = "fluentd-retry-wait"
subSecondPrecisionKey = "fluentd-sub-second-precision"
)
func init() {
@ -147,6 +151,7 @@ func ValidateLogOpt(cfg map[string]string) error {
case addressKey:
case asyncKey:
case asyncConnectKey:
case asyncReconnectIntervalKey:
case bufferLimitKey:
case maxRetriesKey:
case requestAckKey:
@ -216,6 +221,19 @@ func parseConfig(cfg map[string]string) (fluent.Config, error) {
}
}
asyncReconnectInterval := 0
if cfg[asyncReconnectIntervalKey] != "" {
interval, err := time.ParseDuration(cfg[asyncReconnectIntervalKey])
if err != nil {
return config, errors.Wrapf(err, "invalid value for %s", asyncReconnectIntervalKey)
}
if interval != 0 && (interval < minReconnectInterval || interval > maxReconnectInterval) {
return config, errors.Errorf("invalid value for %s: value (%q) must be between %s and %s",
asyncReconnectIntervalKey, interval, minReconnectInterval, maxReconnectInterval)
}
asyncReconnectInterval = int(interval.Milliseconds())
}
subSecondPrecision := false
if cfg[subSecondPrecisionKey] != "" {
if subSecondPrecision, err = strconv.ParseBool(cfg[subSecondPrecisionKey]); err != nil {
@ -231,18 +249,19 @@ func parseConfig(cfg map[string]string) (fluent.Config, error) {
}
config = fluent.Config{
FluentPort: loc.port,
FluentHost: loc.host,
FluentNetwork: loc.protocol,
FluentSocketPath: loc.path,
BufferLimit: bufferLimit,
RetryWait: retryWait,
MaxRetry: maxRetries,
Async: async,
AsyncConnect: asyncConnect,
SubSecondPrecision: subSecondPrecision,
RequestAck: requestAck,
ForceStopAsyncSend: async || asyncConnect,
FluentPort: loc.port,
FluentHost: loc.host,
FluentNetwork: loc.protocol,
FluentSocketPath: loc.path,
BufferLimit: bufferLimit,
RetryWait: retryWait,
MaxRetry: maxRetries,
Async: async,
AsyncConnect: asyncConnect,
AsyncReconnectInterval: asyncReconnectInterval,
SubSecondPrecision: subSecondPrecision,
RequestAck: requestAck,
ForceStopAsyncSend: async || asyncConnect,
}
return config, nil

View file

@ -0,0 +1,24 @@
package fluentd // import "github.com/docker/docker/daemon/logger/fluentd"
import (
"testing"
"gotest.tools/v3/assert"
)
func TestValidateLogOptReconnectInterval(t *testing.T) {
invalidIntervals := []string{"-1", "1", "-1s", "99ms", "11s"}
for _, v := range invalidIntervals {
t.Run("invalid "+v, func(t *testing.T) {
err := ValidateLogOpt(map[string]string{asyncReconnectIntervalKey: v})
assert.ErrorContains(t, err, "invalid value for fluentd-async-reconnect-interval:")
})
}
validIntervals := []string{"100ms", "10s"}
for _, v := range validIntervals {
t.Run("valid "+v, func(t *testing.T) {
err := ValidateLogOpt(map[string]string{asyncReconnectIntervalKey: v})
assert.NilError(t, err)
})
}
}