package fluentd import ( "bytes" "fmt" "math" "net" "strconv" "strings" "text/template" "github.com/Sirupsen/logrus" "github.com/docker/docker/daemon/logger" "github.com/fluent/fluent-logger-golang/fluent" ) type Fluentd struct { tag string containerID string containerName string writer *fluent.Fluent } type Receiver struct { ID string FullID string Name string } const ( name = "fluentd" defaultHostName = "localhost" defaultPort = 24224 defaultTagPrefix = "docker" ) func init() { if err := logger.RegisterLogDriver(name, New); err != nil { logrus.Fatal(err) } if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil { logrus.Fatal(err) } } func parseConfig(ctx logger.Context) (string, int, string, error) { host := defaultHostName port := defaultPort tag := "docker." + ctx.ContainerID[:12] config := ctx.Config if address := config["fluentd-address"]; address != "" { if h, p, err := net.SplitHostPort(address); err != nil { if !strings.Contains(err.Error(), "missing port in address") { return "", 0, "", err } host = h } else { portnum, err := strconv.Atoi(p) if err != nil { return "", 0, "", err } host = h port = portnum } } if config["fluentd-tag"] != "" { receiver := &Receiver{ ID: ctx.ContainerID[:12], FullID: ctx.ContainerID, Name: ctx.ContainerName, } tmpl, err := template.New("tag").Parse(config["fluentd-tag"]) if err != nil { return "", 0, "", err } buf := new(bytes.Buffer) if err := tmpl.Execute(buf, receiver); err != nil { return "", 0, "", err } tag = buf.String() } return host, port, tag, nil } func New(ctx logger.Context) (logger.Logger, error) { host, port, tag, err := parseConfig(ctx) if err != nil { return nil, err } logrus.Debugf("logging driver fluentd configured for container:%s, host:%s, port:%d, tag:%s.", ctx.ContainerID, host, port, tag) // logger tries to recoonect 2**32 - 1 times // failed (and panic) after 204 years [ 1.5 ** (2**32 - 1) - 1 seconds] log, err := fluent.New(fluent.Config{FluentPort: port, FluentHost: host, RetryWait: 1000, MaxRetry: math.MaxInt32}) if err != nil { return nil, err } return &Fluentd{ tag: tag, containerID: ctx.ContainerID, containerName: ctx.ContainerName, writer: log, }, nil } func (f *Fluentd) Log(msg *logger.Message) error { data := map[string]string{ "container_id": f.containerID, "container_name": f.containerName, "source": msg.Source, "log": string(msg.Line), } // fluent-logger-golang buffers logs from failures and disconnections, // and these are transferred again automatically. return f.writer.PostWithTime(f.tag, msg.Timestamp, data) } func ValidateLogOpt(cfg map[string]string) error { for key := range cfg { switch key { case "fluentd-address": case "fluentd-tag": default: return fmt.Errorf("unknown log opt '%s' for fluentd log driver", key) } } return nil } func (f *Fluentd) Close() error { return f.writer.Close() } func (f *Fluentd) Name() string { return name }