add scheme to fluentd-address

Signed-off-by: Akira Koyasu <mail@akirakoyasu.net>

- add scheme to fluentd-address
- define a new type `location`
- use `errors.Wrapf`
This commit is contained in:
Akira Koyasu 2016-08-28 23:24:56 +09:00
parent 1f9beceacc
commit cb176c848e
1 changed files with 63 additions and 14 deletions

View File

@ -6,6 +6,7 @@ import (
"fmt"
"math"
"net"
"net/url"
"strconv"
"strings"
"time"
@ -13,8 +14,10 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/loggerutils"
"github.com/docker/docker/pkg/urlutil"
"github.com/docker/go-units"
"github.com/fluent/fluent-logger-golang/fluent"
"github.com/pkg/errors"
)
type fluentd struct {
@ -25,9 +28,17 @@ type fluentd struct {
extra map[string]string
}
type location struct {
protocol string
host string
port int
path string
}
const (
name = "fluentd"
defaultProtocol = "tcp"
defaultHost = "127.0.0.1"
defaultPort = 24224
defaultBufferLimit = 1024 * 1024
@ -60,7 +71,7 @@ func init() {
// the context. The supported context configuration variable is
// fluentd-address.
func New(ctx logger.Context) (logger.Logger, error) {
host, port, err := parseAddress(ctx.Config[addressKey])
loc, err := parseAddress(ctx.Config[addressKey])
if err != nil {
return nil, err
}
@ -107,12 +118,14 @@ func New(ctx logger.Context) (logger.Logger, error) {
}
fluentConfig := fluent.Config{
FluentPort: port,
FluentHost: host,
BufferLimit: bufferLimit,
RetryWait: retryWait,
MaxRetry: maxRetries,
AsyncConnect: asyncConnect,
FluentPort: loc.port,
FluentHost: loc.host,
FluentNetwork: loc.protocol,
FluentSocketPath: loc.path,
BufferLimit: bufferLimit,
RetryWait: retryWait,
MaxRetry: maxRetries,
AsyncConnect: asyncConnect,
}
logrus.WithField("container", ctx.ContainerID).WithField("config", fluentConfig).
@ -172,29 +185,65 @@ func ValidateLogOpt(cfg map[string]string) error {
}
}
if _, _, err := parseAddress(cfg["fluentd-address"]); err != nil {
if _, err := parseAddress(cfg["fluentd-address"]); err != nil {
return err
}
return nil
}
func parseAddress(address string) (string, int, error) {
func parseAddress(address string) (*location, error) {
if address == "" {
return defaultHost, defaultPort, nil
return &location{
protocol: defaultProtocol,
host: defaultHost,
port: defaultPort,
path: "",
}, nil
}
protocol := defaultProtocol
givenAddress := address
if urlutil.IsTransportURL(address) {
url, err := url.Parse(address)
if err != nil {
return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
}
// unix and unixgram socket
if url.Scheme == "unix" || url.Scheme == "unixgram" {
return &location{
protocol: url.Scheme,
host: "",
port: 0,
path: url.Path,
}, nil
}
// tcp|udp
protocol = url.Scheme
address = url.Host
}
host, port, err := net.SplitHostPort(address)
if err != nil {
if !strings.Contains(err.Error(), "missing port in address") {
return "", 0, fmt.Errorf("invalid fluentd-address %s: %s", address, err)
return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
}
return host, defaultPort, nil
return &location{
protocol: protocol,
host: host,
port: defaultPort,
path: "",
}, nil
}
portnum, err := strconv.Atoi(port)
if err != nil {
return "", 0, fmt.Errorf("invalid fluentd-address %s: %s", address, err)
return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
}
return host, portnum, nil
return &location{
protocol: protocol,
host: host,
port: portnum,
path: "",
}, nil
}