diff --git a/daemon/logger/gelf/gelf.go b/daemon/logger/gelf/gelf.go index b42eb61b2c..3fe62c41b3 100644 --- a/daemon/logger/gelf/gelf.go +++ b/daemon/logger/gelf/gelf.go @@ -6,9 +6,12 @@ package gelf import ( "bytes" + "compress/flate" + "encoding/json" "fmt" "net" "net/url" + "strconv" "time" "github.com/Graylog2/go-gelf/gelf" @@ -24,7 +27,7 @@ type gelfLogger struct { writer *gelf.Writer ctx logger.Context hostname string - extra map[string]interface{} + rawExtra json.RawMessage } func init() { @@ -81,17 +84,43 @@ func New(ctx logger.Context) (logger.Logger, error) { extra[k] = v } + rawExtra, err := json.Marshal(extra) + if err != nil { + return nil, err + } + // create new gelfWriter gelfWriter, err := gelf.NewWriter(address) if err != nil { return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address, err) } + if v, ok := ctx.Config["gelf-compression-type"]; ok { + switch v { + case "gzip": + gelfWriter.CompressionType = gelf.CompressGzip + case "zlib": + gelfWriter.CompressionType = gelf.CompressZlib + case "none": + gelfWriter.CompressionType = gelf.CompressNone + default: + return nil, fmt.Errorf("gelf: invalid compression type %q", v) + } + } + + if v, ok := ctx.Config["gelf-compression-level"]; ok { + val, err := strconv.Atoi(v) + if err != nil { + return nil, fmt.Errorf("gelf: invalid compression level %s, err %v", v, err) + } + gelfWriter.CompressionLevel = val + } + return &gelfLogger{ writer: gelfWriter, ctx: ctx, hostname: hostname, - extra: extra, + rawExtra: rawExtra, }, nil } @@ -107,7 +136,7 @@ func (s *gelfLogger) Log(msg *logger.Message) error { Short: string(msg.Line), TimeUnix: float64(msg.Timestamp.UnixNano()/int64(time.Millisecond)) / 1000.0, Level: level, - Extra: s.extra, + RawExtra: s.rawExtra, } if err := s.writer.WriteMessage(&m); err != nil { @@ -127,15 +156,26 @@ func (s *gelfLogger) Name() string { // ValidateLogOpt looks for gelf specific log options gelf-address, & // gelf-tag. func ValidateLogOpt(cfg map[string]string) error { - for key := range cfg { + for key, val := range cfg { switch key { case "gelf-address": case "gelf-tag": case "tag": case "labels": case "env": + case "gelf-compression-level": + i, err := strconv.Atoi(val) + if err != nil || i < flate.DefaultCompression || i > flate.BestCompression { + return fmt.Errorf("unknown value %q for log opt %q for gelf log driver", val, key) + } + case "gelf-compression-type": + switch val { + case "gzip", "zlib", "none": + default: + return fmt.Errorf("unknown value %q for log opt %q for gelf log driver", val, key) + } default: - return fmt.Errorf("unknown log opt '%s' for gelf log driver", key) + return fmt.Errorf("unknown log opt %q for gelf log driver", key) } } diff --git a/docs/admin/logging/overview.md b/docs/admin/logging/overview.md index 8338ab3286..025a3da3f6 100644 --- a/docs/admin/logging/overview.md +++ b/docs/admin/logging/overview.md @@ -152,6 +152,8 @@ The GELF logging driver supports the following options: --log-opt tag="database" --log-opt labels=label1,label2 --log-opt env=env1,env2 + --log-opt gelf-compression-type=gzip + --log-opt gelf-compression-level=1 The `gelf-address` option specifies the remote GELF server address that the driver connects to. Currently, only `udp` is supported as the transport and you must @@ -173,6 +175,14 @@ underscore (`_`). "_fizz": "buzz", // […] +The `gelf-compression-type` option can be used to change how the GELF driver +compresses each log message. The accepted values are `gzip`, `zlib` and `none`. +`gzip` is chosen by default. + +The `gelf-compression-level` option can be used to change the level of compresssion +when `gzip` or `zlib` is selected as `gelf-compression-type`. Accepted value +must be from from -1 to 9 (BestCompression). Higher levels typically +run slower but compress more. Default value is 1 (BestSpeed). ## fluentd options diff --git a/hack/vendor.sh b/hack/vendor.sh index 5303760bb8..cebbda20d7 100755 --- a/hack/vendor.sh +++ b/hack/vendor.sh @@ -68,7 +68,7 @@ clone git github.com/syndtr/gocapability 2c00daeb6c3b45114c80ac44119e7b8801fdd85 clone git github.com/golang/protobuf f7137ae6b19afbfd61a94b746fda3b3fe0491874 # gelf logging driver deps -clone git github.com/Graylog2/go-gelf 6c62a85f1d47a67f2a5144c0e745b325889a8120 +clone git github.com/Graylog2/go-gelf aab2f594e4585d43468ac57287b0dece9d806883 clone git github.com/fluent/fluent-logger-golang v1.0.0 # fluent-logger-golang deps diff --git a/vendor/src/github.com/Graylog2/go-gelf/gelf/reader.go b/vendor/src/github.com/Graylog2/go-gelf/gelf/reader.go index 305b865db7..ff719fc714 100644 --- a/vendor/src/github.com/Graylog2/go-gelf/gelf/reader.go +++ b/vendor/src/github.com/Graylog2/go-gelf/gelf/reader.go @@ -66,7 +66,6 @@ func (r *Reader) ReadMessage() (*Message, error) { var ( err error n, length int - buf bytes.Buffer cid, ocid []byte seq, total uint8 cHead []byte @@ -122,19 +121,18 @@ func (r *Reader) ReadMessage() (*Message, error) { // zlib is slightly more complicated, but correct cReader, err = zlib.NewReader(bytes.NewReader(cBuf)) } else { - return nil, fmt.Errorf("unknown magic: %x %v", cHead, cHead) + // compliance with https://github.com/Graylog2/graylog2-server + // treating all messages as uncompressed if they are not gzip, zlib or + // chunked + cReader = bytes.NewReader(cBuf) } if err != nil { return nil, fmt.Errorf("NewReader: %s", err) } - if _, err = io.Copy(&buf, cReader); err != nil { - return nil, fmt.Errorf("io.Copy: %s", err) - } - msg := new(Message) - if err := json.Unmarshal(buf.Bytes(), &msg); err != nil { + if err := json.NewDecoder(cReader).Decode(&msg); err != nil { return nil, fmt.Errorf("json.Unmarshal: %s", err) } diff --git a/vendor/src/github.com/Graylog2/go-gelf/gelf/writer.go b/vendor/src/github.com/Graylog2/go-gelf/gelf/writer.go index 6a5a0238e1..90cdb99216 100644 --- a/vendor/src/github.com/Graylog2/go-gelf/gelf/writer.go +++ b/vendor/src/github.com/Graylog2/go-gelf/gelf/writer.go @@ -41,6 +41,7 @@ type CompressType int const ( CompressGzip CompressType = iota CompressZlib + CompressNone ) // Message represents the contents of the GELF message. It is gzipped @@ -49,15 +50,14 @@ type Message struct { Version string `json:"version"` Host string `json:"host"` Short string `json:"short_message"` - Full string `json:"full_message"` + Full string `json:"full_message,omitempty"` TimeUnix float64 `json:"timestamp"` - Level int32 `json:"level"` - Facility string `json:"facility"` + Level int32 `json:"level,omitempty"` + Facility string `json:"facility,omitempty"` Extra map[string]interface{} `json:"-"` + RawExtra json.RawMessage `json:"-"` } -type innerMessage Message //against circular (Un)MarshalJSON - // Used to control GELF chunking. Should be less than (MTU - len(UDP // header)). // @@ -76,14 +76,14 @@ var ( // Syslog severity levels const ( - LOG_EMERG = int32(0) - LOG_ALERT = int32(1) - LOG_CRIT = int32(2) - LOG_ERR = int32(3) - LOG_WARNING = int32(4) - LOG_NOTICE = int32(5) - LOG_INFO = int32(6) - LOG_DEBUG = int32(7) + LOG_EMERG = int32(0) + LOG_ALERT = int32(1) + LOG_CRIT = int32(2) + LOG_ERR = int32(3) + LOG_WARNING = int32(4) + LOG_NOTICE = int32(5) + LOG_INFO = int32(6) + LOG_DEBUG = int32(7) ) // numChunks returns the number of GELF chunks necessary to transmit @@ -176,40 +176,70 @@ func (w *Writer) writeChunked(zBytes []byte) (err error) { return nil } +// 1k bytes buffer by default +var bufPool = sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(make([]byte, 0, 1024)) + }, +} + +func newBuffer() *bytes.Buffer { + b := bufPool.Get().(*bytes.Buffer) + if b != nil { + b.Reset() + return b + } + return bytes.NewBuffer(nil) +} + // WriteMessage sends the specified message to the GELF server // specified in the call to New(). It assumes all the fields are // filled out appropriately. In general, clients will want to use // Write, rather than WriteMessage. func (w *Writer) WriteMessage(m *Message) (err error) { - mBytes, err := json.Marshal(m) - if err != nil { - return + mBuf := newBuffer() + defer bufPool.Put(mBuf) + if err = m.MarshalJSONBuf(mBuf); err != nil { + return err } + mBytes := mBuf.Bytes() + + var ( + zBuf *bytes.Buffer + zBytes []byte + ) - var zBuf bytes.Buffer var zw io.WriteCloser switch w.CompressionType { case CompressGzip: - zw, err = gzip.NewWriterLevel(&zBuf, w.CompressionLevel) + zBuf = newBuffer() + defer bufPool.Put(zBuf) + zw, err = gzip.NewWriterLevel(zBuf, w.CompressionLevel) case CompressZlib: - zw, err = zlib.NewWriterLevel(&zBuf, w.CompressionLevel) + zBuf = newBuffer() + defer bufPool.Put(zBuf) + zw, err = zlib.NewWriterLevel(zBuf, w.CompressionLevel) + case CompressNone: + zBytes = mBytes default: panic(fmt.Sprintf("unknown compression type %d", w.CompressionType)) } - if err != nil { - return + if zw != nil { + if err != nil { + return + } + if _, err = zw.Write(mBytes); err != nil { + zw.Close() + return + } + zw.Close() + zBytes = zBuf.Bytes() } - if _, err = zw.Write(mBytes); err != nil { - return - } - zw.Close() - zBytes := zBuf.Bytes() if numChunks(zBytes) > 1 { return w.writeChunked(zBytes) } - n, err := w.conn.Write(zBytes) if err != nil { return @@ -222,8 +252,8 @@ func (w *Writer) WriteMessage(m *Message) (err error) { } // Close connection and interrupt blocked Read or Write operations -func (w *Writer) Close() (error) { - return w.conn.Close() +func (w *Writer) Close() error { + return w.conn.Close() } /* @@ -315,28 +345,43 @@ func (w *Writer) Write(p []byte) (n int, err error) { return len(p), nil } -func (m *Message) MarshalJSON() ([]byte, error) { - var err error - var b, eb []byte - - extra := m.Extra - b, err = json.Marshal((*innerMessage)(m)) - m.Extra = extra +func (m *Message) MarshalJSONBuf(buf *bytes.Buffer) error { + b, err := json.Marshal(m) if err != nil { - return nil, err + return err + } + // write up until the final } + if _, err = buf.Write(b[:len(b)-1]); err != nil { + return err + } + if len(m.Extra) > 0 { + eb, err := json.Marshal(m.Extra) + if err != nil { + return err + } + // merge serialized message + serialized extra map + if err = buf.WriteByte(','); err != nil { + return err + } + // write serialized extra bytes, without enclosing quotes + if _, err = buf.Write(eb[1 : len(eb)-1]); err != nil { + return err + } } - if len(extra) == 0 { - return b, nil + if len(m.RawExtra) > 0 { + if err := buf.WriteByte(','); err != nil { + return err + } + + // write serialized extra bytes, without enclosing quotes + if _, err = buf.Write(m.RawExtra[1 : len(m.RawExtra)-1]); err != nil { + return err + } } - if eb, err = json.Marshal(extra); err != nil { - return nil, err - } - - // merge serialized message + serialized extra map - b[len(b)-1] = ',' - return append(b, eb[1:len(eb)]...), nil + // write final closing quotes + return buf.WriteByte('}') } func (m *Message) UnmarshalJSON(data []byte) error {