From b24c8e07f1f2619f0a6d73b808dffceb314e7081 Mon Sep 17 00:00:00 2001 From: Ghislain Bourgeois Date: Tue, 5 Sep 2017 13:42:18 -0400 Subject: [PATCH 1/4] Update to latest go-gelf version and add tests Signed-off-by: Ghislain Bourgeois --- daemon/logger/gelf/gelf.go | 6 +- daemon/logger/gelf/gelf_test.go | 68 +++ vendor.conf | 2 +- vendor/github.com/Graylog2/go-gelf/README.md | 117 +++-- .../Graylog2/go-gelf/gelf/message.go | 147 +++++++ .../Graylog2/go-gelf/gelf/tcpreader.go | 93 ++++ .../Graylog2/go-gelf/gelf/tcpwriter.go | 97 ++++ .../Graylog2/go-gelf/gelf/udpwriter.go | 231 ++++++++++ .../github.com/Graylog2/go-gelf/gelf/utils.go | 41 ++ .../Graylog2/go-gelf/gelf/writer.go | 413 +----------------- 10 files changed, 773 insertions(+), 442 deletions(-) create mode 100644 daemon/logger/gelf/gelf_test.go create mode 100644 vendor/github.com/Graylog2/go-gelf/gelf/message.go create mode 100644 vendor/github.com/Graylog2/go-gelf/gelf/tcpreader.go create mode 100644 vendor/github.com/Graylog2/go-gelf/gelf/tcpwriter.go create mode 100644 vendor/github.com/Graylog2/go-gelf/gelf/udpwriter.go create mode 100644 vendor/github.com/Graylog2/go-gelf/gelf/utils.go diff --git a/daemon/logger/gelf/gelf.go b/daemon/logger/gelf/gelf.go index de209ce9bf..5d36252130 100644 --- a/daemon/logger/gelf/gelf.go +++ b/daemon/logger/gelf/gelf.go @@ -23,7 +23,7 @@ import ( const name = "gelf" type gelfLogger struct { - writer *gelf.Writer + writer gelf.Writer info logger.Info hostname string rawExtra json.RawMessage @@ -90,7 +90,7 @@ func New(info logger.Info) (logger.Logger, error) { } // create new gelfWriter - gelfWriter, err := gelf.NewWriter(address) + gelfWriter, err := gelf.NewUDPWriter(address) if err != nil { return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address, err) } @@ -135,7 +135,7 @@ func (s *gelfLogger) Log(msg *logger.Message) error { Host: s.hostname, Short: string(msg.Line), TimeUnix: float64(msg.Timestamp.UnixNano()/int64(time.Millisecond)) / 1000.0, - Level: level, + Level: int32(level), RawExtra: s.rawExtra, } logger.PutMessage(msg) diff --git a/daemon/logger/gelf/gelf_test.go b/daemon/logger/gelf/gelf_test.go new file mode 100644 index 0000000000..dd83427107 --- /dev/null +++ b/daemon/logger/gelf/gelf_test.go @@ -0,0 +1,68 @@ +// +build linux + +package gelf + +import ( + "testing" +) + +//Validate parseAddress +func TestParseAddress(t *testing.T) { + host, err := parseAddress("udp://127.0.0.1:12201") + if err != nil { + t.Fatal(err) + } + if host != "127.0.0.1:12201" { + t.Fatalf("Expected host 127.0.0.1, got %s", host) + } + + _, err = parseAddress("127.0.0.1:12201") + if err == nil { + t.Fatal("Expected error requiring protocol") + } + + _, err = parseAddress("http://127.0.0.1:12201") + if err == nil { + t.Fatal("Expected error restricting protocol") + } +} + +//Validate options +func TestValidateLogOpt(t *testing.T) { + err := ValidateLogOpt(map[string]string{ + "gelf-address": "udp://127.0.0.1:12201", + "tag": "testtag", + "labels": "testlabel", + "env": "testenv", + "env-regex": "testenv-regex", + "gelf-compression-level": "9", + "gelf-compression-type": "gzip", + }) + if err != nil { + t.Fatal(err) + } + + err = ValidateLogOpt(map[string]string{ + "gelf-address": "udp://127.0.0.1:12201", + "gelf-compression-level": "ultra", + "gelf-compression-type": "zlib", + }) + if err == nil { + t.Fatal("Expected compression level error") + } + + err = ValidateLogOpt(map[string]string{ + "gelf-address": "udp://127.0.0.1:12201", + "gelf-compression-type": "rar", + }) + if err == nil { + t.Fatal("Expected compression type error") + } + + err = ValidateLogOpt(map[string]string{ + "invalid": "invalid", + }) + if err == nil { + t.Fatal("Expected unknown option error") + } +} diff --git a/vendor.conf b/vendor.conf index 5affff74e5..d305583507 100644 --- a/vendor.conf +++ b/vendor.conf @@ -76,7 +76,7 @@ github.com/syndtr/gocapability 2c00daeb6c3b45114c80ac44119e7b8801fdd852 github.com/golang/protobuf 7a211bcf3bce0e3f1d74f9894916e6f116ae83b4 # gelf logging driver deps -github.com/Graylog2/go-gelf 7029da823dad4ef3a876df61065156acb703b2ea +github.com/Graylog2/go-gelf v2 github.com/fluent/fluent-logger-golang v1.2.1 # fluent-logger-golang deps diff --git a/vendor/github.com/Graylog2/go-gelf/README.md b/vendor/github.com/Graylog2/go-gelf/README.md index 4900a350a0..81d56121b3 100644 --- a/vendor/github.com/Graylog2/go-gelf/README.md +++ b/vendor/github.com/Graylog2/go-gelf/README.md @@ -1,17 +1,53 @@ -go-gelf - GELF library and writer for Go +go-gelf - GELF Library and Writer for Go ======================================== -GELF is graylog2's UDP logging format. This library provides an API -that applications can use to log messages directly to a graylog2 -server, along with an `io.Writer` that can be use to redirect the -standard library's log messages (or `os.Stdout`), to a graylog2 server. +[GELF] (Graylog Extended Log Format) is an application-level logging +protocol that avoids many of the shortcomings of [syslog]. While it +can be run over any stream or datagram transport protocol, it has +special support ([chunking]) to allow long messages to be split over +multiple datagrams. + +Versions +-------- + +In order to enable versionning of this package with Go, this project +is using GoPkg.in. The default branch of this project will be v1 +for some time to prevent breaking clients. We encourage all project +to change their imports to the new GoPkg.in URIs as soon as possible. + +To see up to date code, make sure to switch to the master branch. + +v1.0.0 +------ + +This implementation currently supports UDP and TCP as a transport +protocol. TLS is unsupported. + +The library provides an API that applications can use to log messages +directly to a Graylog server and an `io.Writer` that can be used to +redirect the standard library's log messages (`os.Stdout`) to a +Graylog server. + +[GELF]: http://docs.graylog.org/en/2.2/pages/gelf.html +[syslog]: https://tools.ietf.org/html/rfc5424 +[chunking]: http://docs.graylog.org/en/2.2/pages/gelf.html#chunked-gelf + Installing ---------- go-gelf is go get-able: - go get github.com/Graylog2/go-gelf/gelf + go get gopkg.in/Graylog2/go-gelf.v1/gelf + + or + + go get github.com/Graylog2/go-gelf/gelf + +This will get you version 1.0.0, with only UDP support and legacy API. +Newer versions are available through GoPkg.in: + + go get gopkg.in/Graylog2/go-gelf.v2/gelf Usage ----- @@ -21,50 +57,55 @@ having your `main` function (or even `init`) call `log.SetOutput()`. By using an `io.MultiWriter`, we can log to both stdout and graylog - giving us both centralized and local logs. (Redundancy is nice). - package main +```golang +package main - import ( - "flag" - "github.com/Graylog2/go-gelf/gelf" - "io" - "log" - "os" - ) +import ( + "flag" + "gopkg.in/Graylog2/go-gelf.v2/gelf" + "io" + "log" + "os" +) - func main() { - var graylogAddr string +func main() { + var graylogAddr string - flag.StringVar(&graylogAddr, "graylog", "", "graylog server addr") - flag.Parse() + flag.StringVar(&graylogAddr, "graylog", "", "graylog server addr") + flag.Parse() - if graylogAddr != "" { - gelfWriter, err := gelf.NewWriter(graylogAddr) - if err != nil { - log.Fatalf("gelf.NewWriter: %s", err) - } - // log to both stderr and graylog2 - log.SetOutput(io.MultiWriter(os.Stderr, gelfWriter)) - log.Printf("logging to stderr & graylog2@'%s'", graylogAddr) - } + if graylogAddr != "" { + // If using UDP + gelfWriter, err := gelf.NewUDPWriter(graylogAddr) + // If using TCP + //gelfWriter, err := gelf.NewTCPWriter(graylogAddr) + if err != nil { + log.Fatalf("gelf.NewWriter: %s", err) + } + // log to both stderr and graylog2 + log.SetOutput(io.MultiWriter(os.Stderr, gelfWriter)) + log.Printf("logging to stderr & graylog2@'%s'", graylogAddr) + } - // From here on out, any calls to log.Print* functions - // will appear on stdout, and be sent over UDP to the - // specified Graylog2 server. + // From here on out, any calls to log.Print* functions + // will appear on stdout, and be sent over UDP or TCP to the + // specified Graylog2 server. - log.Printf("Hello gray World") - - // ... - } + log.Printf("Hello gray World") + // ... +} +``` The above program can be invoked as: - go run test.go -graylog=localhost:12201 + go run test.go -graylog=localhost:12201 -Because GELF messages are sent over UDP, graylog server availability -doesn't impact application performance or response time. There is a -small, fixed overhead per log call, regardless of whether the target +When using UDP messages may be dropped or re-ordered. However, Graylog +server availability will not impact application performance; there is +a small, fixed overhead per log call regardless of whether the target server is reachable or not. + To Do ----- diff --git a/vendor/github.com/Graylog2/go-gelf/gelf/message.go b/vendor/github.com/Graylog2/go-gelf/gelf/message.go new file mode 100644 index 0000000000..4e5a05ec32 --- /dev/null +++ b/vendor/github.com/Graylog2/go-gelf/gelf/message.go @@ -0,0 +1,147 @@ +package gelf + +import ( + "bytes" + "encoding/json" + "time" +) + +// Message represents the contents of the GELF message. It is gzipped +// before sending. +type Message struct { + Version string `json:"version"` + Host string `json:"host"` + Short string `json:"short_message"` + Full string `json:"full_message,omitempty"` + TimeUnix float64 `json:"timestamp"` + Level int32 `json:"level,omitempty"` + Facility string `json:"facility,omitempty"` + Extra map[string]interface{} `json:"-"` + RawExtra json.RawMessage `json:"-"` +} + +// Syslog severity levels +const ( + LOG_EMERG = 0 + LOG_ALERT = 1 + LOG_CRIT = 2 + LOG_ERR = 3 + LOG_WARNING = 4 + LOG_NOTICE = 5 + LOG_INFO = 6 + LOG_DEBUG = 7 +) + +func (m *Message) MarshalJSONBuf(buf *bytes.Buffer) error { + b, err := json.Marshal(m) + if err != nil { + return err + } + // write up until the final } + if _, err = buf.Write(b[:len(b)-1]); err != nil { + return err + } + if len(m.Extra) > 0 { + eb, err := json.Marshal(m.Extra) + if err != nil { + return err + } + // merge serialized message + serialized extra map + if err = buf.WriteByte(','); err != nil { + return err + } + // write serialized extra bytes, without enclosing quotes + if _, err = buf.Write(eb[1 : len(eb)-1]); err != nil { + return err + } + } + + if len(m.RawExtra) > 0 { + if err := buf.WriteByte(','); err != nil { + return err + } + + // write serialized extra bytes, without enclosing quotes + if _, err = buf.Write(m.RawExtra[1 : len(m.RawExtra)-1]); err != nil { + return err + } + } + + // write final closing quotes + return buf.WriteByte('}') +} + +func (m *Message) UnmarshalJSON(data []byte) error { + i := make(map[string]interface{}, 16) + if err := json.Unmarshal(data, &i); err != nil { + return err + } + for k, v := range i { + if k[0] == '_' { + if m.Extra == nil { + m.Extra = make(map[string]interface{}, 1) + } + m.Extra[k] = v + continue + } + switch k { + case "version": + m.Version = v.(string) + case "host": + m.Host = v.(string) + case "short_message": + m.Short = v.(string) + case "full_message": + m.Full = v.(string) + case "timestamp": + m.TimeUnix = v.(float64) + case "level": + m.Level = int32(v.(float64)) + case "facility": + m.Facility = v.(string) + } + } + return nil +} + +func (m *Message) toBytes() (messageBytes []byte, err error) { + buf := newBuffer() + defer bufPool.Put(buf) + if err = m.MarshalJSONBuf(buf); err != nil { + return nil, err + } + messageBytes = buf.Bytes() + return messageBytes, nil +} + +func constructMessage(p []byte, hostname string, facility string, file string, line int) (m *Message) { + // remove trailing and leading whitespace + p = bytes.TrimSpace(p) + + // If there are newlines in the message, use the first line + // for the short message and set the full message to the + // original input. If the input has no newlines, stick the + // whole thing in Short. + short := p + full := []byte("") + if i := bytes.IndexRune(p, '\n'); i > 0 { + short = p[:i] + full = p + } + + m = &Message{ + Version: "1.1", + Host: hostname, + Short: string(short), + Full: string(full), + TimeUnix: float64(time.Now().Unix()), + Level: 6, // info + Facility: facility, + Extra: map[string]interface{}{ + "_file": file, + "_line": line, + }, + } + + return m +} diff --git a/vendor/github.com/Graylog2/go-gelf/gelf/tcpreader.go b/vendor/github.com/Graylog2/go-gelf/gelf/tcpreader.go new file mode 100644 index 0000000000..8f22c9aea4 --- /dev/null +++ b/vendor/github.com/Graylog2/go-gelf/gelf/tcpreader.go @@ -0,0 +1,93 @@ +package gelf + +import ( + "bufio" + "encoding/json" + "fmt" + "net" +) + +type TCPReader struct { + listener *net.TCPListener + conn net.Conn + messages chan []byte +} + +func newTCPReader(addr string) (*TCPReader, chan string, error) { + var err error + tcpAddr, err := net.ResolveTCPAddr("tcp", addr) + if err != nil { + return nil, nil, fmt.Errorf("ResolveTCPAddr('%s'): %s", addr, err) + } + + listener, err := net.ListenTCP("tcp", tcpAddr) + if err != nil { + return nil, nil, fmt.Errorf("ListenTCP: %s", err) + } + + r := &TCPReader{ + listener: listener, + messages: make(chan []byte, 100), // Make a buffered channel with at most 100 messages + } + + signal := make(chan string, 1) + + go r.listenUntilCloseSignal(signal) + + return r, signal, nil +} + +func (r *TCPReader) listenUntilCloseSignal(signal chan string) { + defer func() { signal <- "done" }() + defer r.listener.Close() + for { + conn, err := r.listener.Accept() + if err != nil { + break + } + go handleConnection(conn, r.messages) + select { + case sig := <-signal: + if sig == "stop" { + break + } + default: + } + } +} + +func (r *TCPReader) addr() string { + return r.listener.Addr().String() +} + +func handleConnection(conn net.Conn, messages chan<- []byte) { + defer conn.Close() + reader := bufio.NewReader(conn) + + var b []byte + var err error + + for { + if b, err = reader.ReadBytes(0); err != nil { + continue + } + if len(b) > 0 { + messages <- b + } + } +} + +func (r *TCPReader) readMessage() (*Message, error) { + b := <-r.messages + + var msg Message + if err := json.Unmarshal(b[:len(b)-1], &msg); err != nil { + return nil, fmt.Errorf("json.Unmarshal: %s", err) + } + + return &msg, nil +} + +func (r *TCPReader) Close() { + r.listener.Close() +} diff --git a/vendor/github.com/Graylog2/go-gelf/gelf/tcpwriter.go b/vendor/github.com/Graylog2/go-gelf/gelf/tcpwriter.go new file mode 100644 index 0000000000..ab95cbcd02 --- /dev/null +++ b/vendor/github.com/Graylog2/go-gelf/gelf/tcpwriter.go @@ -0,0 +1,97 @@ +package gelf + +import ( + "fmt" + "net" + "os" + "sync" + "time" +) + +const ( + DefaultMaxReconnect = 3 + DefaultReconnectDelay = 1 +) + +type TCPWriter struct { + GelfWriter + mu sync.Mutex + MaxReconnect int + ReconnectDelay time.Duration +} + +func NewTCPWriter(addr string) (*TCPWriter, error) { + var err error + w := new(TCPWriter) + w.MaxReconnect = DefaultMaxReconnect + w.ReconnectDelay = DefaultReconnectDelay + w.proto = "tcp" + w.addr = addr + + if w.conn, err = net.Dial("tcp", addr); err != nil { + return nil, err + } + if w.hostname, err = os.Hostname(); err != nil { + return nil, err + } + + return w, nil +} + +// WriteMessage sends the specified message to the GELF server +// specified in the call to New(). It assumes all the fields are +// filled out appropriately. In general, clients will want to use +// Write, rather than WriteMessage. +func (w *TCPWriter) WriteMessage(m *Message) (err error) { + messageBytes, err := m.toBytes() + if err != nil { + return err + } + + messageBytes = append(messageBytes, 0) + + n, err := w.writeToSocketWithReconnectAttempts(messageBytes) + if err != nil { + return err + } + if n != len(messageBytes) { + return fmt.Errorf("bad write (%d/%d)", n, len(messageBytes)) + } + + return nil +} + +func (w *TCPWriter) Write(p []byte) (n int, err error) { + file, line := getCallerIgnoringLogMulti(1) + + m := constructMessage(p, w.hostname, w.Facility, file, line) + + if err = w.WriteMessage(m); err != nil { + return 0, err + } + + return len(p), nil +} + +func (w *TCPWriter) writeToSocketWithReconnectAttempts(zBytes []byte) (n int, err error) { + var errConn error + + w.mu.Lock() + for i := 0; n <= w.MaxReconnect; i++ { + errConn = nil + + n, err = w.conn.Write(zBytes) + if err != nil { + time.Sleep(w.ReconnectDelay * time.Second) + w.conn, errConn = net.Dial("tcp", w.addr) + } else { + break + } + } + w.mu.Unlock() + + if errConn != nil { + return 0, fmt.Errorf("Write Failed: %s\nReconnection failed: %s", err, errConn) + } + return n, nil +} diff --git a/vendor/github.com/Graylog2/go-gelf/gelf/udpwriter.go b/vendor/github.com/Graylog2/go-gelf/gelf/udpwriter.go new file mode 100644 index 0000000000..23bbd5e510 --- /dev/null +++ b/vendor/github.com/Graylog2/go-gelf/gelf/udpwriter.go @@ -0,0 +1,231 @@ +// Copyright 2012 SocialCode. All rights reserved. +// Use of this source code is governed by the MIT +// license that can be found in the LICENSE file. + +package gelf + +import ( + "bytes" + "compress/flate" + "compress/gzip" + "compress/zlib" + "crypto/rand" + "fmt" + "io" + "net" + "os" + "path" + "sync" +) + +type UDPWriter struct { + GelfWriter + CompressionLevel int // one of the consts from compress/flate + CompressionType CompressType +} + +// What compression type the writer should use when sending messages +// to the graylog2 server +type CompressType int + +const ( + CompressGzip CompressType = iota + CompressZlib + CompressNone +) + +// Used to control GELF chunking. Should be less than (MTU - len(UDP +// header)). +// +// TODO: generate dynamically using Path MTU Discovery? +const ( + ChunkSize = 1420 + chunkedHeaderLen = 12 + chunkedDataLen = ChunkSize - chunkedHeaderLen +) + +var ( + magicChunked = []byte{0x1e, 0x0f} + magicZlib = []byte{0x78} + magicGzip = []byte{0x1f, 0x8b} +) + +// numChunks returns the number of GELF chunks necessary to transmit +// the given compressed buffer. +func numChunks(b []byte) int { + lenB := len(b) + if lenB <= ChunkSize { + return 1 + } + return len(b)/chunkedDataLen + 1 +} + +// New returns a new GELF Writer. This writer can be used to send the +// output of the standard Go log functions to a central GELF server by +// passing it to log.SetOutput() +func NewUDPWriter(addr string) (*UDPWriter, error) { + var err error + w := new(UDPWriter) + w.CompressionLevel = flate.BestSpeed + + if w.conn, err = net.Dial("udp", addr); err != nil { + return nil, err + } + if w.hostname, err = os.Hostname(); err != nil { + return nil, err + } + + w.Facility = path.Base(os.Args[0]) + + return w, nil +} + +// writes the gzip compressed byte array to the connection as a series +// of GELF chunked messages. The format is documented at +// http://docs.graylog.org/en/2.1/pages/gelf.html as: +// +// 2-byte magic (0x1e 0x0f), 8 byte id, 1 byte sequence id, 1 byte +// total, chunk-data +func (w *GelfWriter) writeChunked(zBytes []byte) (err error) { + b := make([]byte, 0, ChunkSize) + buf := bytes.NewBuffer(b) + nChunksI := numChunks(zBytes) + if nChunksI > 128 { + return fmt.Errorf("msg too large, would need %d chunks", nChunksI) + } + nChunks := uint8(nChunksI) + // use urandom to get a unique message id + msgId := make([]byte, 8) + n, err := io.ReadFull(rand.Reader, msgId) + if err != nil || n != 8 { + return fmt.Errorf("rand.Reader: %d/%s", n, err) + } + + bytesLeft := len(zBytes) + for i := uint8(0); i < nChunks; i++ { + buf.Reset() + // manually write header. Don't care about + // host/network byte order, because the spec only + // deals in individual bytes. + buf.Write(magicChunked) //magic + buf.Write(msgId) + buf.WriteByte(i) + buf.WriteByte(nChunks) + // slice out our chunk from zBytes + chunkLen := chunkedDataLen + if chunkLen > bytesLeft { + chunkLen = bytesLeft + } + off := int(i) * chunkedDataLen + chunk := zBytes[off : off+chunkLen] + buf.Write(chunk) + + // write this chunk, and make sure the write was good + n, err := w.conn.Write(buf.Bytes()) + if err != nil { + return fmt.Errorf("Write (chunk %d/%d): %s", i, + nChunks, err) + } + if n != len(buf.Bytes()) { + return fmt.Errorf("Write len: (chunk %d/%d) (%d/%d)", + i, nChunks, n, len(buf.Bytes())) + } + + bytesLeft -= chunkLen + } + + if bytesLeft != 0 { + return fmt.Errorf("error: %d bytes left after sending", bytesLeft) + } + return nil +} + +// 1k bytes buffer by default +var bufPool = sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(make([]byte, 0, 1024)) + }, +} + +func newBuffer() *bytes.Buffer { + b := bufPool.Get().(*bytes.Buffer) + if b != nil { + b.Reset() + return b + } + return bytes.NewBuffer(nil) +} + +// WriteMessage sends the specified message to the GELF server +// specified in the call to New(). It assumes all the fields are +// filled out appropriately. In general, clients will want to use +// Write, rather than WriteMessage. +func (w *UDPWriter) WriteMessage(m *Message) (err error) { + mBuf := newBuffer() + defer bufPool.Put(mBuf) + if err = m.MarshalJSONBuf(mBuf); err != nil { + return err + } + mBytes := mBuf.Bytes() + + var ( + zBuf *bytes.Buffer + zBytes []byte + ) + + var zw io.WriteCloser + switch w.CompressionType { + case CompressGzip: + zBuf = newBuffer() + defer bufPool.Put(zBuf) + zw, err = gzip.NewWriterLevel(zBuf, w.CompressionLevel) + case CompressZlib: + zBuf = newBuffer() + defer bufPool.Put(zBuf) + zw, err = zlib.NewWriterLevel(zBuf, w.CompressionLevel) + case CompressNone: + zBytes = mBytes + default: + panic(fmt.Sprintf("unknown compression type %d", + w.CompressionType)) + } + if zw != nil { + if err != nil { + return + } + if _, err = zw.Write(mBytes); err != nil { + zw.Close() + return + } + zw.Close() + zBytes = zBuf.Bytes() + } + + if numChunks(zBytes) > 1 { + return w.writeChunked(zBytes) + } + n, err := w.conn.Write(zBytes) + if err != nil { + return + } + if n != len(zBytes) { + return fmt.Errorf("bad write (%d/%d)", n, len(zBytes)) + } + + return nil +} + +// Write encodes the given string in a GELF message and sends it to +// the server specified in New(). +func (w *UDPWriter) Write(p []byte) (n int, err error) { + // 1 for the function that called us. + file, line := getCallerIgnoringLogMulti(1) + + m := constructMessage(p, w.hostname, w.Facility, file, line) + + if err = w.WriteMessage(m); err != nil { + return 0, err + } + + return len(p), nil +} diff --git a/vendor/github.com/Graylog2/go-gelf/gelf/utils.go b/vendor/github.com/Graylog2/go-gelf/gelf/utils.go new file mode 100644 index 0000000000..6f1c9f7c5f --- /dev/null +++ b/vendor/github.com/Graylog2/go-gelf/gelf/utils.go @@ -0,0 +1,41 @@ +package gelf + +import ( + "runtime" + "strings" +) + +// getCaller returns the filename and the line info of a function +// further down in the call stack. Passing 0 in as callDepth would +// return info on the function calling getCallerIgnoringLog, 1 the +// parent function, and so on. Any suffixes passed to getCaller are +// path fragments like "/pkg/log/log.go", and functions in the call +// stack from that file are ignored. +func getCaller(callDepth int, suffixesToIgnore ...string) (file string, line int) { + // bump by 1 to ignore the getCaller (this) stackframe + callDepth++ +outer: + for { + var ok bool + _, file, line, ok = runtime.Caller(callDepth) + if !ok { + file = "???" + line = 0 + break + } + + for _, s := range suffixesToIgnore { + if strings.HasSuffix(file, s) { + callDepth++ + continue outer + } + } + break + } + return +} + +func getCallerIgnoringLogMulti(callDepth int) (string, int) { + // the +1 is to ignore this (getCallerIgnoringLogMulti) frame + return getCaller(callDepth+1, "/pkg/log/log.go", "/pkg/io/multi.go") +} diff --git a/vendor/github.com/Graylog2/go-gelf/gelf/writer.go b/vendor/github.com/Graylog2/go-gelf/gelf/writer.go index 38e6944a07..93c36929b4 100644 --- a/vendor/github.com/Graylog2/go-gelf/gelf/writer.go +++ b/vendor/github.com/Graylog2/go-gelf/gelf/writer.go @@ -5,414 +5,27 @@ package gelf import ( - "bytes" - "compress/flate" - "compress/gzip" - "compress/zlib" - "crypto/rand" - "encoding/json" - "fmt" - "io" "net" - "os" - "path" - "runtime" - "strings" - "sync" - "time" ) +type Writer interface { + Close() error + Write([]byte) (int, error) + WriteMessage(*Message) error +} + // Writer implements io.Writer and is used to send both discrete // messages to a graylog2 server, or data from a stream-oriented // interface (like the functions in log). -type Writer struct { - mu sync.Mutex - conn net.Conn - hostname string - Facility string // defaults to current process name - CompressionLevel int // one of the consts from compress/flate - CompressionType CompressType -} - -// What compression type the writer should use when sending messages -// to the graylog2 server -type CompressType int - -const ( - CompressGzip CompressType = iota - CompressZlib - CompressNone -) - -// Message represents the contents of the GELF message. It is gzipped -// before sending. -type Message struct { - Version string `json:"version"` - Host string `json:"host"` - Short string `json:"short_message"` - Full string `json:"full_message,omitempty"` - TimeUnix float64 `json:"timestamp"` - Level int32 `json:"level,omitempty"` - Facility string `json:"facility,omitempty"` - Extra map[string]interface{} `json:"-"` - RawExtra json.RawMessage `json:"-"` -} - -// Used to control GELF chunking. Should be less than (MTU - len(UDP -// header)). -// -// TODO: generate dynamically using Path MTU Discovery? -const ( - ChunkSize = 1420 - chunkedHeaderLen = 12 - chunkedDataLen = ChunkSize - chunkedHeaderLen -) - -var ( - magicChunked = []byte{0x1e, 0x0f} - magicZlib = []byte{0x78} - magicGzip = []byte{0x1f, 0x8b} -) - -// Syslog severity levels -const ( - LOG_EMERG = int32(0) - LOG_ALERT = int32(1) - LOG_CRIT = int32(2) - LOG_ERR = int32(3) - LOG_WARNING = int32(4) - LOG_NOTICE = int32(5) - LOG_INFO = int32(6) - LOG_DEBUG = int32(7) -) - -// numChunks returns the number of GELF chunks necessary to transmit -// the given compressed buffer. -func numChunks(b []byte) int { - lenB := len(b) - if lenB <= ChunkSize { - return 1 - } - return len(b)/chunkedDataLen + 1 -} - -// New returns a new GELF Writer. This writer can be used to send the -// output of the standard Go log functions to a central GELF server by -// passing it to log.SetOutput() -func NewWriter(addr string) (*Writer, error) { - var err error - w := new(Writer) - w.CompressionLevel = flate.BestSpeed - - if w.conn, err = net.Dial("udp", addr); err != nil { - return nil, err - } - if w.hostname, err = os.Hostname(); err != nil { - return nil, err - } - - w.Facility = path.Base(os.Args[0]) - - return w, nil -} - -// writes the gzip compressed byte array to the connection as a series -// of GELF chunked messages. The format is documented at -// http://docs.graylog.org/en/2.1/pages/gelf.html as: -// -// 2-byte magic (0x1e 0x0f), 8 byte id, 1 byte sequence id, 1 byte -// total, chunk-data -func (w *Writer) writeChunked(zBytes []byte) (err error) { - b := make([]byte, 0, ChunkSize) - buf := bytes.NewBuffer(b) - nChunksI := numChunks(zBytes) - if nChunksI > 128 { - return fmt.Errorf("msg too large, would need %d chunks", nChunksI) - } - nChunks := uint8(nChunksI) - // use urandom to get a unique message id - msgId := make([]byte, 8) - n, err := io.ReadFull(rand.Reader, msgId) - if err != nil || n != 8 { - return fmt.Errorf("rand.Reader: %d/%s", n, err) - } - - bytesLeft := len(zBytes) - for i := uint8(0); i < nChunks; i++ { - buf.Reset() - // manually write header. Don't care about - // host/network byte order, because the spec only - // deals in individual bytes. - buf.Write(magicChunked) //magic - buf.Write(msgId) - buf.WriteByte(i) - buf.WriteByte(nChunks) - // slice out our chunk from zBytes - chunkLen := chunkedDataLen - if chunkLen > bytesLeft { - chunkLen = bytesLeft - } - off := int(i) * chunkedDataLen - chunk := zBytes[off : off+chunkLen] - buf.Write(chunk) - - // write this chunk, and make sure the write was good - n, err := w.conn.Write(buf.Bytes()) - if err != nil { - return fmt.Errorf("Write (chunk %d/%d): %s", i, - nChunks, err) - } - if n != len(buf.Bytes()) { - return fmt.Errorf("Write len: (chunk %d/%d) (%d/%d)", - i, nChunks, n, len(buf.Bytes())) - } - - bytesLeft -= chunkLen - } - - if bytesLeft != 0 { - return fmt.Errorf("error: %d bytes left after sending", bytesLeft) - } - return nil -} - -// 1k bytes buffer by default -var bufPool = sync.Pool{ - New: func() interface{} { - return bytes.NewBuffer(make([]byte, 0, 1024)) - }, -} - -func newBuffer() *bytes.Buffer { - b := bufPool.Get().(*bytes.Buffer) - if b != nil { - b.Reset() - return b - } - return bytes.NewBuffer(nil) -} - -// WriteMessage sends the specified message to the GELF server -// specified in the call to New(). It assumes all the fields are -// filled out appropriately. In general, clients will want to use -// Write, rather than WriteMessage. -func (w *Writer) WriteMessage(m *Message) (err error) { - mBuf := newBuffer() - defer bufPool.Put(mBuf) - if err = m.MarshalJSONBuf(mBuf); err != nil { - return err - } - mBytes := mBuf.Bytes() - - var ( - zBuf *bytes.Buffer - zBytes []byte - ) - - var zw io.WriteCloser - switch w.CompressionType { - case CompressGzip: - zBuf = newBuffer() - defer bufPool.Put(zBuf) - zw, err = gzip.NewWriterLevel(zBuf, w.CompressionLevel) - case CompressZlib: - zBuf = newBuffer() - defer bufPool.Put(zBuf) - zw, err = zlib.NewWriterLevel(zBuf, w.CompressionLevel) - case CompressNone: - zBytes = mBytes - default: - panic(fmt.Sprintf("unknown compression type %d", - w.CompressionType)) - } - if zw != nil { - if err != nil { - return - } - if _, err = zw.Write(mBytes); err != nil { - zw.Close() - return - } - zw.Close() - zBytes = zBuf.Bytes() - } - - if numChunks(zBytes) > 1 { - return w.writeChunked(zBytes) - } - n, err := w.conn.Write(zBytes) - if err != nil { - return - } - if n != len(zBytes) { - return fmt.Errorf("bad write (%d/%d)", n, len(zBytes)) - } - - return nil +type GelfWriter struct { + addr string + conn net.Conn + hostname string + Facility string // defaults to current process name + proto string } // Close connection and interrupt blocked Read or Write operations -func (w *Writer) Close() error { +func (w *GelfWriter) Close() error { return w.conn.Close() } - -/* -func (w *Writer) Alert(m string) (err error) -func (w *Writer) Close() error -func (w *Writer) Crit(m string) (err error) -func (w *Writer) Debug(m string) (err error) -func (w *Writer) Emerg(m string) (err error) -func (w *Writer) Err(m string) (err error) -func (w *Writer) Info(m string) (err error) -func (w *Writer) Notice(m string) (err error) -func (w *Writer) Warning(m string) (err error) -*/ - -// getCaller returns the filename and the line info of a function -// further down in the call stack. Passing 0 in as callDepth would -// return info on the function calling getCallerIgnoringLog, 1 the -// parent function, and so on. Any suffixes passed to getCaller are -// path fragments like "/pkg/log/log.go", and functions in the call -// stack from that file are ignored. -func getCaller(callDepth int, suffixesToIgnore ...string) (file string, line int) { - // bump by 1 to ignore the getCaller (this) stackframe - callDepth++ -outer: - for { - var ok bool - _, file, line, ok = runtime.Caller(callDepth) - if !ok { - file = "???" - line = 0 - break - } - - for _, s := range suffixesToIgnore { - if strings.HasSuffix(file, s) { - callDepth++ - continue outer - } - } - break - } - return -} - -func getCallerIgnoringLogMulti(callDepth int) (string, int) { - // the +1 is to ignore this (getCallerIgnoringLogMulti) frame - return getCaller(callDepth+1, "/pkg/log/log.go", "/pkg/io/multi.go") -} - -// Write encodes the given string in a GELF message and sends it to -// the server specified in New(). -func (w *Writer) Write(p []byte) (n int, err error) { - - // 1 for the function that called us. - file, line := getCallerIgnoringLogMulti(1) - - // remove trailing and leading whitespace - p = bytes.TrimSpace(p) - - // If there are newlines in the message, use the first line - // for the short message and set the full message to the - // original input. If the input has no newlines, stick the - // whole thing in Short. - short := p - full := []byte("") - if i := bytes.IndexRune(p, '\n'); i > 0 { - short = p[:i] - full = p - } - - m := Message{ - Version: "1.1", - Host: w.hostname, - Short: string(short), - Full: string(full), - TimeUnix: float64(time.Now().Unix()), - Level: 6, // info - Facility: w.Facility, - Extra: map[string]interface{}{ - "_file": file, - "_line": line, - }, - } - - if err = w.WriteMessage(&m); err != nil { - return 0, err - } - - return len(p), nil -} - -func (m *Message) MarshalJSONBuf(buf *bytes.Buffer) error { - b, err := json.Marshal(m) - if err != nil { - return err - } - // write up until the final } - if _, err = buf.Write(b[:len(b)-1]); err != nil { - return err - } - if len(m.Extra) > 0 { - eb, err := json.Marshal(m.Extra) - if err != nil { - return err - } - // merge serialized message + serialized extra map - if err = buf.WriteByte(','); err != nil { - return err - } - // write serialized extra bytes, without enclosing quotes - if _, err = buf.Write(eb[1 : len(eb)-1]); err != nil { - return err - } - } - - if len(m.RawExtra) > 0 { - if err := buf.WriteByte(','); err != nil { - return err - } - - // write serialized extra bytes, without enclosing quotes - if _, err = buf.Write(m.RawExtra[1 : len(m.RawExtra)-1]); err != nil { - return err - } - } - - // write final closing quotes - return buf.WriteByte('}') -} - -func (m *Message) UnmarshalJSON(data []byte) error { - i := make(map[string]interface{}, 16) - if err := json.Unmarshal(data, &i); err != nil { - return err - } - for k, v := range i { - if k[0] == '_' { - if m.Extra == nil { - m.Extra = make(map[string]interface{}, 1) - } - m.Extra[k] = v - continue - } - switch k { - case "version": - m.Version = v.(string) - case "host": - m.Host = v.(string) - case "short_message": - m.Short = v.(string) - case "full_message": - m.Full = v.(string) - case "timestamp": - m.TimeUnix = v.(float64) - case "level": - m.Level = int32(v.(float64)) - case "facility": - m.Facility = v.(string) - } - } - return nil -} From 72f5e5e84f402f175ec08964053e1af688dad265 Mon Sep 17 00:00:00 2001 From: Ghislain Bourgeois Date: Tue, 5 Sep 2017 17:17:04 -0400 Subject: [PATCH 2/4] Remove empty gelf_unsupported.go Signed-off-by: Ghislain Bourgeois --- daemon/logger/gelf/gelf_unsupported.go | 3 --- 1 file changed, 3 deletions(-) delete mode 100644 daemon/logger/gelf/gelf_unsupported.go diff --git a/daemon/logger/gelf/gelf_unsupported.go b/daemon/logger/gelf/gelf_unsupported.go deleted file mode 100644 index 266f73b18b..0000000000 --- a/daemon/logger/gelf/gelf_unsupported.go +++ /dev/null @@ -1,3 +0,0 @@ -// +build !linux - -package gelf From e21f7b6e76d578ba2ab9a6e85f4ece49290ee084 Mon Sep 17 00:00:00 2001 From: Ghislain Bourgeois Date: Tue, 5 Sep 2017 17:17:59 -0400 Subject: [PATCH 3/4] Add support for TCP parameters Signed-off-by: Ghislain Bourgeois --- daemon/logger/gelf/gelf.go | 46 +++++++++++++++----- daemon/logger/gelf/gelf_test.go | 77 ++++++++++++++++++++++++++++++--- 2 files changed, 106 insertions(+), 17 deletions(-) diff --git a/daemon/logger/gelf/gelf.go b/daemon/logger/gelf/gelf.go index 5d36252130..a03e6199e0 100644 --- a/daemon/logger/gelf/gelf.go +++ b/daemon/logger/gelf/gelf.go @@ -47,6 +47,10 @@ func New(info logger.Info) (logger.Logger, error) { return nil, err } + if address.Scheme == "tcp" { + return nil, fmt.Errorf("gelf: TCP not yet implemented") + } + // collect extra data for GELF message hostname, err := info.Hostname() if err != nil { @@ -90,9 +94,9 @@ func New(info logger.Info) (logger.Logger, error) { } // create new gelfWriter - gelfWriter, err := gelf.NewUDPWriter(address) + gelfWriter, err := gelf.NewUDPWriter(address.String()) if err != nil { - return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address, err) + return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address.String(), err) } if v, ok := info.Config["gelf-compression-type"]; ok { @@ -156,6 +160,11 @@ func (s *gelfLogger) Name() string { // ValidateLogOpt looks for gelf specific log option gelf-address. func ValidateLogOpt(cfg map[string]string) error { + address, err := parseAddress(cfg["gelf-address"]) + if err != nil { + return err + } + for key, val := range cfg { switch key { case "gelf-address": @@ -164,46 +173,59 @@ func ValidateLogOpt(cfg map[string]string) error { case "env": case "env-regex": case "gelf-compression-level": + if address.Scheme != "udp" { + return fmt.Errorf("compression is only supported on UDP") + } i, err := strconv.Atoi(val) if err != nil || i < flate.DefaultCompression || i > flate.BestCompression { return fmt.Errorf("unknown value %q for log opt %q for gelf log driver", val, key) } case "gelf-compression-type": + if address.Scheme != "udp" { + return fmt.Errorf("compression is only supported on UDP") + } switch val { case "gzip", "zlib", "none": default: return fmt.Errorf("unknown value %q for log opt %q for gelf log driver", val, key) } + case "gelf-tcp-max-reconnect", "gelf-tcp-reconnect-delay": + if address.Scheme != "tcp" { + return fmt.Errorf("%q is only valid for TCP", key) + } + i, err := strconv.Atoi(val) + if err != nil || i < 0 { + return fmt.Errorf("%q must be a positive integer", key) + } default: return fmt.Errorf("unknown log opt %q for gelf log driver", key) } } - _, err := parseAddress(cfg["gelf-address"]) - return err + return nil } -func parseAddress(address string) (string, error) { +func parseAddress(address string) (*url.URL, error) { if address == "" { - return "", nil + return nil, fmt.Errorf("gelf-address is a required parameter") } if !urlutil.IsTransportURL(address) { - return "", fmt.Errorf("gelf-address should be in form proto://address, got %v", address) + return nil, fmt.Errorf("gelf-address should be in form proto://address, got %v", address) } url, err := url.Parse(address) if err != nil { - return "", err + return nil, err } // we support only udp - if url.Scheme != "udp" { - return "", fmt.Errorf("gelf: endpoint needs to be UDP") + if url.Scheme != "udp" && url.Scheme != "tcp" { + return nil, fmt.Errorf("gelf: endpoint needs to be TCP or UDP") } // get host and port if _, _, err = net.SplitHostPort(url.Host); err != nil { - return "", fmt.Errorf("gelf: please provide gelf-address as udp://host:port") + return nil, fmt.Errorf("gelf: please provide gelf-address as proto://host:port") } - return url.Host, nil + return url, nil } diff --git a/daemon/logger/gelf/gelf_test.go b/daemon/logger/gelf/gelf_test.go index dd83427107..d47bea531f 100644 --- a/daemon/logger/gelf/gelf_test.go +++ b/daemon/logger/gelf/gelf_test.go @@ -8,12 +8,12 @@ import ( //Validate parseAddress func TestParseAddress(t *testing.T) { - host, err := parseAddress("udp://127.0.0.1:12201") + url, err := parseAddress("udp://127.0.0.1:12201") if err != nil { t.Fatal(err) } - if host != "127.0.0.1:12201" { - t.Fatalf("Expected host 127.0.0.1, got %s", host) + if url.String() != "udp://127.0.0.1:12201" { + t.Fatalf("Expected address udp://127.0.0.1:12201, got %s", url.String()) } _, err = parseAddress("127.0.0.1:12201") @@ -27,8 +27,8 @@ func TestParseAddress(t *testing.T) { } } -//Validate options -func TestValidateLogOpt(t *testing.T) { +//Validate UDP options +func TestUDPValidateLogOpt(t *testing.T) { err := ValidateLogOpt(map[string]string{ "gelf-address": "udp://127.0.0.1:12201", "tag": "testtag", @@ -65,4 +65,71 @@ func TestValidateLogOpt(t *testing.T) { if err == nil { t.Fatal("Expected unknown option error") } + + err = ValidateLogOpt(map[string]string{}) + if err == nil { + t.Fatal("Expected required parameter error") + } +} + +//Validate TCP options +func TestTCPValidateLogOpt(t *testing.T) { + err := ValidateLogOpt(map[string]string{ + "gelf-address": "tcp://127.0.0.1:12201", + }) + if err != nil { + t.Fatal("Expected TCP to be supported") + } + + err = ValidateLogOpt(map[string]string{ + "gelf-address": "tcp://127.0.0.1:12201", + "gelf-compression-level": "9", + }) + if err == nil { + t.Fatal("Expected TCP to reject compression level") + } + + err = ValidateLogOpt(map[string]string{ + "gelf-address": "tcp://127.0.0.1:12201", + "gelf-compression-type": "gzip", + }) + if err == nil { + t.Fatal("Expected TCP to reject compression type") + } + + err = ValidateLogOpt(map[string]string{ + "gelf-address": "tcp://127.0.0.1:12201", + "gelf-tcp-max-reconnect": "5", + "gelf-tcp-reconnect-delay": "10", + }) + if err != nil { + t.Fatal("Expected TCP reconnect to be a valid parameters") + } + + err = ValidateLogOpt(map[string]string{ + "gelf-address": "tcp://127.0.0.1:12201", + "gelf-tcp-max-reconnect": "-1", + "gelf-tcp-reconnect-delay": "-3", + }) + if err == nil { + t.Fatal("Expected negative TCP reconnect to be rejected") + } + + err = ValidateLogOpt(map[string]string{ + "gelf-address": "tcp://127.0.0.1:12201", + "gelf-tcp-max-reconnect": "invalid", + "gelf-tcp-reconnect-delay": "invalid", + }) + if err == nil { + t.Fatal("Expected TCP reconnect to be required to be an int") + } + + err = ValidateLogOpt(map[string]string{ + "gelf-address": "udp://127.0.0.1:12201", + "gelf-tcp-max-reconnect": "1", + "gelf-tcp-reconnect-delay": "3", + }) + if err == nil { + t.Fatal("Expected TCP reconnect to be invalid for UDP") + } } From e17f3511144b37855d56a009d7e7622029242d2d Mon Sep 17 00:00:00 2001 From: Ghislain Bourgeois Date: Wed, 6 Sep 2017 17:45:26 -0400 Subject: [PATCH 4/4] Add TCP support for GELF log driver Signed-off-by: Ghislain Bourgeois --- daemon/logger/gelf/gelf.go | 65 ++++++++-- daemon/logger/gelf/gelf_test.go | 219 +++++++++++++++++++++++++------- 2 files changed, 224 insertions(+), 60 deletions(-) diff --git a/daemon/logger/gelf/gelf.go b/daemon/logger/gelf/gelf.go index a03e6199e0..d902c0c8e1 100644 --- a/daemon/logger/gelf/gelf.go +++ b/daemon/logger/gelf/gelf.go @@ -47,10 +47,6 @@ func New(info logger.Info) (logger.Logger, error) { return nil, err } - if address.Scheme == "tcp" { - return nil, fmt.Errorf("gelf: TCP not yet implemented") - } - // collect extra data for GELF message hostname, err := info.Hostname() if err != nil { @@ -93,10 +89,58 @@ func New(info logger.Info) (logger.Logger, error) { return nil, err } - // create new gelfWriter - gelfWriter, err := gelf.NewUDPWriter(address.String()) + var gelfWriter gelf.Writer + if address.Scheme == "udp" { + gelfWriter, err = newGELFUDPWriter(address.Host, info) + if err != nil { + return nil, err + } + } else if address.Scheme == "tcp" { + gelfWriter, err = newGELFTCPWriter(address.Host, info) + if err != nil { + return nil, err + } + } + + return &gelfLogger{ + writer: gelfWriter, + info: info, + hostname: hostname, + rawExtra: rawExtra, + }, nil +} + +// create new TCP gelfWriter +func newGELFTCPWriter(address string, info logger.Info) (gelf.Writer, error) { + gelfWriter, err := gelf.NewTCPWriter(address) if err != nil { - return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address.String(), err) + return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address, err) + } + + if v, ok := info.Config["gelf-tcp-max-reconnect"]; ok { + i, err := strconv.Atoi(v) + if err != nil || i < 0 { + return nil, fmt.Errorf("gelf-tcp-max-reconnect must be a positive integer") + } + gelfWriter.MaxReconnect = i + } + + if v, ok := info.Config["gelf-tcp-reconnect-delay"]; ok { + i, err := strconv.Atoi(v) + if err != nil || i < 0 { + return nil, fmt.Errorf("gelf-tcp-reconnect-delay must be a positive integer") + } + gelfWriter.ReconnectDelay = time.Duration(i) + } + + return gelfWriter, nil +} + +// create new UDP gelfWriter +func newGELFUDPWriter(address string, info logger.Info) (gelf.Writer, error) { + gelfWriter, err := gelf.NewUDPWriter(address) + if err != nil { + return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address, err) } if v, ok := info.Config["gelf-compression-type"]; ok { @@ -120,12 +164,7 @@ func New(info logger.Info) (logger.Logger, error) { gelfWriter.CompressionLevel = val } - return &gelfLogger{ - writer: gelfWriter, - info: info, - hostname: hostname, - rawExtra: rawExtra, - }, nil + return gelfWriter, nil } func (s *gelfLogger) Log(msg *logger.Message) error { diff --git a/daemon/logger/gelf/gelf_test.go b/daemon/logger/gelf/gelf_test.go index d47bea531f..087c531231 100644 --- a/daemon/logger/gelf/gelf_test.go +++ b/daemon/logger/gelf/gelf_test.go @@ -3,10 +3,13 @@ package gelf import ( + "net" "testing" + + "github.com/docker/docker/daemon/logger" ) -//Validate parseAddress +// Validate parseAddress func TestParseAddress(t *testing.T) { url, err := parseAddress("udp://127.0.0.1:12201") if err != nil { @@ -27,52 +30,7 @@ func TestParseAddress(t *testing.T) { } } -//Validate UDP options -func TestUDPValidateLogOpt(t *testing.T) { - err := ValidateLogOpt(map[string]string{ - "gelf-address": "udp://127.0.0.1:12201", - "tag": "testtag", - "labels": "testlabel", - "env": "testenv", - "env-regex": "testenv-regex", - "gelf-compression-level": "9", - "gelf-compression-type": "gzip", - }) - if err != nil { - t.Fatal(err) - } - - err = ValidateLogOpt(map[string]string{ - "gelf-address": "udp://127.0.0.1:12201", - "gelf-compression-level": "ultra", - "gelf-compression-type": "zlib", - }) - if err == nil { - t.Fatal("Expected compression level error") - } - - err = ValidateLogOpt(map[string]string{ - "gelf-address": "udp://127.0.0.1:12201", - "gelf-compression-type": "rar", - }) - if err == nil { - t.Fatal("Expected compression type error") - } - - err = ValidateLogOpt(map[string]string{ - "invalid": "invalid", - }) - if err == nil { - t.Fatal("Expected unknown option error") - } - - err = ValidateLogOpt(map[string]string{}) - if err == nil { - t.Fatal("Expected required parameter error") - } -} - -//Validate TCP options +// Validate TCP options func TestTCPValidateLogOpt(t *testing.T) { err := ValidateLogOpt(map[string]string{ "gelf-address": "tcp://127.0.0.1:12201", @@ -133,3 +91,170 @@ func TestTCPValidateLogOpt(t *testing.T) { t.Fatal("Expected TCP reconnect to be invalid for UDP") } } + +// Validate UDP options +func TestUDPValidateLogOpt(t *testing.T) { + err := ValidateLogOpt(map[string]string{ + "gelf-address": "udp://127.0.0.1:12201", + "tag": "testtag", + "labels": "testlabel", + "env": "testenv", + "env-regex": "testenv-regex", + "gelf-compression-level": "9", + "gelf-compression-type": "gzip", + }) + if err != nil { + t.Fatal(err) + } + + err = ValidateLogOpt(map[string]string{ + "gelf-address": "udp://127.0.0.1:12201", + "gelf-compression-level": "ultra", + "gelf-compression-type": "zlib", + }) + if err == nil { + t.Fatal("Expected compression level error") + } + + err = ValidateLogOpt(map[string]string{ + "gelf-address": "udp://127.0.0.1:12201", + "gelf-compression-type": "rar", + }) + if err == nil { + t.Fatal("Expected compression type error") + } + + err = ValidateLogOpt(map[string]string{ + "invalid": "invalid", + }) + if err == nil { + t.Fatal("Expected unknown option error") + } + + err = ValidateLogOpt(map[string]string{}) + if err == nil { + t.Fatal("Expected required parameter error") + } +} + +// Validate newGELFTCPWriter +func TestNewGELFTCPWriter(t *testing.T) { + address := "127.0.0.1:0" + tcpAddr, err := net.ResolveTCPAddr("tcp", address) + if err != nil { + t.Fatal(err) + } + + listener, err := net.ListenTCP("tcp", tcpAddr) + if err != nil { + t.Fatal(err) + } + + url := "tcp://" + listener.Addr().String() + info := logger.Info{ + Config: map[string]string{ + "gelf-address": url, + "gelf-tcp-max-reconnect": "0", + "gelf-tcp-reconnect-delay": "0", + "tag": "{{.ID}}", + }, + ContainerID: "12345678901234567890", + } + + writer, err := newGELFTCPWriter(listener.Addr().String(), info) + if err != nil { + t.Fatal(err) + } + + err = writer.Close() + if err != nil { + t.Fatal(err) + } + + err = listener.Close() + if err != nil { + t.Fatal(err) + } +} + +// Validate newGELFUDPWriter +func TestNewGELFUDPWriter(t *testing.T) { + address := "127.0.0.1:0" + info := logger.Info{ + Config: map[string]string{ + "gelf-address": "udp://127.0.0.1:0", + "gelf-compression-level": "5", + "gelf-compression-type": "gzip", + }, + } + + writer, err := newGELFUDPWriter(address, info) + if err != nil { + t.Fatal(err) + } + writer.Close() + if err != nil { + t.Fatal(err) + } +} + +// Validate New for TCP +func TestNewTCP(t *testing.T) { + address := "127.0.0.1:0" + tcpAddr, err := net.ResolveTCPAddr("tcp", address) + if err != nil { + t.Fatal(err) + } + + listener, err := net.ListenTCP("tcp", tcpAddr) + if err != nil { + t.Fatal(err) + } + + url := "tcp://" + listener.Addr().String() + info := logger.Info{ + Config: map[string]string{ + "gelf-address": url, + "gelf-tcp-max-reconnect": "0", + "gelf-tcp-reconnect-delay": "0", + }, + ContainerID: "12345678901234567890", + } + + logger, err := New(info) + if err != nil { + t.Fatal(err) + } + + err = logger.Close() + if err != nil { + t.Fatal(err) + } + + err = listener.Close() + if err != nil { + t.Fatal(err) + } +} + +// Validate New for UDP +func TestNewUDP(t *testing.T) { + info := logger.Info{ + Config: map[string]string{ + "gelf-address": "udp://127.0.0.1:0", + "gelf-compression-level": "5", + "gelf-compression-type": "gzip", + }, + ContainerID: "12345678901234567890", + } + + logger, err := New(info) + if err != nil { + t.Fatal(err) + } + + err = logger.Close() + if err != nil { + t.Fatal(err) + } +}