diff --git a/daemon/logger/fluentd/fluentd.go b/daemon/logger/fluentd/fluentd.go index 997c01c652..30a23293bc 100644 --- a/daemon/logger/fluentd/fluentd.go +++ b/daemon/logger/fluentd/fluentd.go @@ -48,14 +48,18 @@ const ( defaultMaxRetries = math.MaxInt32 defaultRetryWait = 1000 - addressKey = "fluentd-address" - asyncKey = "fluentd-async" - asyncConnectKey = "fluentd-async-connect" // deprecated option (use fluent-async instead) - bufferLimitKey = "fluentd-buffer-limit" - maxRetriesKey = "fluentd-max-retries" - requestAckKey = "fluentd-request-ack" - retryWaitKey = "fluentd-retry-wait" - subSecondPrecisionKey = "fluentd-sub-second-precision" + minReconnectInterval = 100 * time.Millisecond + maxReconnectInterval = 10 * time.Second + + addressKey = "fluentd-address" + asyncKey = "fluentd-async" + asyncConnectKey = "fluentd-async-connect" // deprecated option (use fluent-async instead) + asyncReconnectIntervalKey = "fluentd-async-reconnect-interval" + bufferLimitKey = "fluentd-buffer-limit" + maxRetriesKey = "fluentd-max-retries" + requestAckKey = "fluentd-request-ack" + retryWaitKey = "fluentd-retry-wait" + subSecondPrecisionKey = "fluentd-sub-second-precision" ) func init() { @@ -147,6 +151,7 @@ func ValidateLogOpt(cfg map[string]string) error { case addressKey: case asyncKey: case asyncConnectKey: + case asyncReconnectIntervalKey: case bufferLimitKey: case maxRetriesKey: case requestAckKey: @@ -216,6 +221,19 @@ func parseConfig(cfg map[string]string) (fluent.Config, error) { } } + asyncReconnectInterval := 0 + if cfg[asyncReconnectIntervalKey] != "" { + interval, err := time.ParseDuration(cfg[asyncReconnectIntervalKey]) + if err != nil { + return config, errors.Wrapf(err, "invalid value for %s", asyncReconnectIntervalKey) + } + if interval != 0 && (interval < minReconnectInterval || interval > maxReconnectInterval) { + return config, errors.Errorf("invalid value for %s: value (%q) must be between %s and %s", + asyncReconnectIntervalKey, interval, minReconnectInterval, maxReconnectInterval) + } + asyncReconnectInterval = int(interval.Milliseconds()) + } + subSecondPrecision := false if cfg[subSecondPrecisionKey] != "" { if subSecondPrecision, err = strconv.ParseBool(cfg[subSecondPrecisionKey]); err != nil { @@ -231,18 +249,19 @@ func parseConfig(cfg map[string]string) (fluent.Config, error) { } config = fluent.Config{ - FluentPort: loc.port, - FluentHost: loc.host, - FluentNetwork: loc.protocol, - FluentSocketPath: loc.path, - BufferLimit: bufferLimit, - RetryWait: retryWait, - MaxRetry: maxRetries, - Async: async, - AsyncConnect: asyncConnect, - SubSecondPrecision: subSecondPrecision, - RequestAck: requestAck, - ForceStopAsyncSend: async || asyncConnect, + FluentPort: loc.port, + FluentHost: loc.host, + FluentNetwork: loc.protocol, + FluentSocketPath: loc.path, + BufferLimit: bufferLimit, + RetryWait: retryWait, + MaxRetry: maxRetries, + Async: async, + AsyncConnect: asyncConnect, + AsyncReconnectInterval: asyncReconnectInterval, + SubSecondPrecision: subSecondPrecision, + RequestAck: requestAck, + ForceStopAsyncSend: async || asyncConnect, } return config, nil diff --git a/daemon/logger/fluentd/fluentd_test.go b/daemon/logger/fluentd/fluentd_test.go new file mode 100644 index 0000000000..e67d63249b --- /dev/null +++ b/daemon/logger/fluentd/fluentd_test.go @@ -0,0 +1,24 @@ +package fluentd // import "github.com/docker/docker/daemon/logger/fluentd" +import ( + "testing" + + "gotest.tools/v3/assert" +) + +func TestValidateLogOptReconnectInterval(t *testing.T) { + invalidIntervals := []string{"-1", "1", "-1s", "99ms", "11s"} + for _, v := range invalidIntervals { + t.Run("invalid "+v, func(t *testing.T) { + err := ValidateLogOpt(map[string]string{asyncReconnectIntervalKey: v}) + assert.ErrorContains(t, err, "invalid value for fluentd-async-reconnect-interval:") + }) + } + + validIntervals := []string{"100ms", "10s"} + for _, v := range validIntervals { + t.Run("valid "+v, func(t *testing.T) { + err := ValidateLogOpt(map[string]string{asyncReconnectIntervalKey: v}) + assert.NilError(t, err) + }) + } +} diff --git a/vendor.conf b/vendor.conf index 0f10ed68f9..c5e8d1b518 100644 --- a/vendor.conf +++ b/vendor.conf @@ -103,7 +103,7 @@ github.com/godbus/dbus/v5 c88335c0b1d28a30e7fc76d526a0 github.com/Graylog2/go-gelf 1550ee647df0510058c9d67a45c56f18911d80b8 # v2 branch # fluent-logger-golang deps -github.com/fluent/fluent-logger-golang 0b652e850a9140d0b1db6390d8925d0601e952db # v1.8.0 +github.com/fluent/fluent-logger-golang 5538e904aeb515c10a624da620581bdf420d4b8a # v1.9.0 github.com/philhofer/fwd bb6d471dc95d4fe11e432687f8b70ff496cf3136 # v1.0.0 github.com/tinylib/msgp af6442a0fcf6e2a1b824f70dd0c734f01e817751 # v1.1.0 diff --git a/vendor/github.com/fluent/fluent-logger-golang/README.md b/vendor/github.com/fluent/fluent-logger-golang/README.md index 554619a31c..d0b6b23161 100644 --- a/vendor/github.com/fluent/fluent-logger-golang/README.md +++ b/vendor/github.com/fluent/fluent-logger-golang/README.md @@ -132,6 +132,11 @@ When Async is enabled, if this is callback is provided, it will be called on eve takes two arguments - a `[]byte` of the message that was to be sent and an `error`. If the `error` is not nil this means the delivery of the message was unsuccessful. +### AsyncReconnectInterval +When async is enabled, this option defines the interval (ms) at which the connection +to the fluentd-address is re-established. This option is useful if the address +may resolve to one or more IP addresses, e.g. a Consul service address. + ### SubSecondPrecision Enable time encoding as EventTime, which contains sub-second precision values. The messages encoded with this option can be received only by Fluentd v0.14 or later. diff --git a/vendor/github.com/fluent/fluent-logger-golang/fluent/fluent.go b/vendor/github.com/fluent/fluent-logger-golang/fluent/fluent.go index c2e1b25955..72169911ae 100644 --- a/vendor/github.com/fluent/fluent-logger-golang/fluent/fluent.go +++ b/vendor/github.com/fluent/fluent-logger-golang/fluent/fluent.go @@ -65,6 +65,11 @@ type Config struct { AsyncConnect bool `json:"async_connect"` MarshalAsJSON bool `json:"marshal_as_json"` + // AsyncReconnectInterval defines the interval (ms) at which the connection + // to the fluentd-address is re-established. This option is useful if the address + // may resolve to one or more IP addresses, e.g. a Consul service address. + AsyncReconnectInterval int `json:"async_reconnect_interval"` + // Sub-second precision timestamps are only possible for those using fluentd // v0.14+ and serializing their messages with msgpack. SubSecondPrecision bool `json:"sub_second_precision"` @@ -108,6 +113,9 @@ type Fluent struct { closed bool wg sync.WaitGroup + // time at which the most recent connection to fluentd-address was established. + latestReconnectTime time.Time + muconn sync.RWMutex conn net.Conn } @@ -447,6 +455,10 @@ func (f *Fluent) connect(ctx context.Context) (err error) { err = NewErrUnknownNetwork(f.Config.FluentNetwork) } + if err == nil { + f.latestReconnectTime = time.Now() + } + return err } @@ -508,6 +520,15 @@ func (f *Fluent) run(ctx context.Context) { return } + if f.AsyncReconnectInterval > 0 { + if time.Since(f.latestReconnectTime) > time.Duration(f.AsyncReconnectInterval)*time.Millisecond { + f.muconn.Lock() + f.close() + f.connectWithRetry(ctx) + f.muconn.Unlock() + } + } + err := f.writeWithRetry(ctx, entry) if err != nil && err != errIsClosing { fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339)) diff --git a/vendor/github.com/fluent/fluent-logger-golang/fluent/version.go b/vendor/github.com/fluent/fluent-logger-golang/fluent/version.go index 83e8932492..e65eb951b6 100644 --- a/vendor/github.com/fluent/fluent-logger-golang/fluent/version.go +++ b/vendor/github.com/fluent/fluent-logger-golang/fluent/version.go @@ -1,3 +1,3 @@ package fluent -const Version = "1.4.0" +const Version = "1.9.0"