Update to latest go-gelf version and add tests

Signed-off-by: Ghislain Bourgeois <ghislain.bourgeois@gmail.com>
This commit is contained in:
Ghislain Bourgeois 2017-09-05 13:42:18 -04:00
parent 945d80cd6a
commit b24c8e07f1
10 changed files with 773 additions and 442 deletions

View File

@ -23,7 +23,7 @@ import (
const name = "gelf"
type gelfLogger struct {
writer *gelf.Writer
writer gelf.Writer
info logger.Info
hostname string
rawExtra json.RawMessage
@ -90,7 +90,7 @@ func New(info logger.Info) (logger.Logger, error) {
}
// create new gelfWriter
gelfWriter, err := gelf.NewWriter(address)
gelfWriter, err := gelf.NewUDPWriter(address)
if err != nil {
return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address, err)
}
@ -135,7 +135,7 @@ func (s *gelfLogger) Log(msg *logger.Message) error {
Host: s.hostname,
Short: string(msg.Line),
TimeUnix: float64(msg.Timestamp.UnixNano()/int64(time.Millisecond)) / 1000.0,
Level: level,
Level: int32(level),
RawExtra: s.rawExtra,
}
logger.PutMessage(msg)

View File

@ -0,0 +1,68 @@
// +build linux
package gelf
import (
"testing"
)
//Validate parseAddress
func TestParseAddress(t *testing.T) {
host, err := parseAddress("udp://127.0.0.1:12201")
if err != nil {
t.Fatal(err)
}
if host != "127.0.0.1:12201" {
t.Fatalf("Expected host 127.0.0.1, got %s", host)
}
_, err = parseAddress("127.0.0.1:12201")
if err == nil {
t.Fatal("Expected error requiring protocol")
}
_, err = parseAddress("http://127.0.0.1:12201")
if err == nil {
t.Fatal("Expected error restricting protocol")
}
}
//Validate options
func TestValidateLogOpt(t *testing.T) {
err := ValidateLogOpt(map[string]string{
"gelf-address": "udp://127.0.0.1:12201",
"tag": "testtag",
"labels": "testlabel",
"env": "testenv",
"env-regex": "testenv-regex",
"gelf-compression-level": "9",
"gelf-compression-type": "gzip",
})
if err != nil {
t.Fatal(err)
}
err = ValidateLogOpt(map[string]string{
"gelf-address": "udp://127.0.0.1:12201",
"gelf-compression-level": "ultra",
"gelf-compression-type": "zlib",
})
if err == nil {
t.Fatal("Expected compression level error")
}
err = ValidateLogOpt(map[string]string{
"gelf-address": "udp://127.0.0.1:12201",
"gelf-compression-type": "rar",
})
if err == nil {
t.Fatal("Expected compression type error")
}
err = ValidateLogOpt(map[string]string{
"invalid": "invalid",
})
if err == nil {
t.Fatal("Expected unknown option error")
}
}

View File

@ -76,7 +76,7 @@ github.com/syndtr/gocapability 2c00daeb6c3b45114c80ac44119e7b8801fdd852
github.com/golang/protobuf 7a211bcf3bce0e3f1d74f9894916e6f116ae83b4
# gelf logging driver deps
github.com/Graylog2/go-gelf 7029da823dad4ef3a876df61065156acb703b2ea
github.com/Graylog2/go-gelf v2
github.com/fluent/fluent-logger-golang v1.2.1
# fluent-logger-golang deps

View File

@ -1,17 +1,53 @@
go-gelf - GELF library and writer for Go
go-gelf - GELF Library and Writer for Go
========================================
GELF is graylog2's UDP logging format. This library provides an API
that applications can use to log messages directly to a graylog2
server, along with an `io.Writer` that can be use to redirect the
standard library's log messages (or `os.Stdout`), to a graylog2 server.
[GELF] (Graylog Extended Log Format) is an application-level logging
protocol that avoids many of the shortcomings of [syslog]. While it
can be run over any stream or datagram transport protocol, it has
special support ([chunking]) to allow long messages to be split over
multiple datagrams.
Versions
--------
In order to enable versionning of this package with Go, this project
is using GoPkg.in. The default branch of this project will be v1
for some time to prevent breaking clients. We encourage all project
to change their imports to the new GoPkg.in URIs as soon as possible.
To see up to date code, make sure to switch to the master branch.
v1.0.0
------
This implementation currently supports UDP and TCP as a transport
protocol. TLS is unsupported.
The library provides an API that applications can use to log messages
directly to a Graylog server and an `io.Writer` that can be used to
redirect the standard library's log messages (`os.Stdout`) to a
Graylog server.
[GELF]: http://docs.graylog.org/en/2.2/pages/gelf.html
[syslog]: https://tools.ietf.org/html/rfc5424
[chunking]: http://docs.graylog.org/en/2.2/pages/gelf.html#chunked-gelf
Installing
----------
go-gelf is go get-able:
go get github.com/Graylog2/go-gelf/gelf
go get gopkg.in/Graylog2/go-gelf.v1/gelf
or
go get github.com/Graylog2/go-gelf/gelf
This will get you version 1.0.0, with only UDP support and legacy API.
Newer versions are available through GoPkg.in:
go get gopkg.in/Graylog2/go-gelf.v2/gelf
Usage
-----
@ -21,50 +57,55 @@ having your `main` function (or even `init`) call `log.SetOutput()`.
By using an `io.MultiWriter`, we can log to both stdout and graylog -
giving us both centralized and local logs. (Redundancy is nice).
package main
```golang
package main
import (
"flag"
"github.com/Graylog2/go-gelf/gelf"
"io"
"log"
"os"
)
import (
"flag"
"gopkg.in/Graylog2/go-gelf.v2/gelf"
"io"
"log"
"os"
)
func main() {
var graylogAddr string
func main() {
var graylogAddr string
flag.StringVar(&graylogAddr, "graylog", "", "graylog server addr")
flag.Parse()
flag.StringVar(&graylogAddr, "graylog", "", "graylog server addr")
flag.Parse()
if graylogAddr != "" {
gelfWriter, err := gelf.NewWriter(graylogAddr)
if err != nil {
log.Fatalf("gelf.NewWriter: %s", err)
}
// log to both stderr and graylog2
log.SetOutput(io.MultiWriter(os.Stderr, gelfWriter))
log.Printf("logging to stderr & graylog2@'%s'", graylogAddr)
}
if graylogAddr != "" {
// If using UDP
gelfWriter, err := gelf.NewUDPWriter(graylogAddr)
// If using TCP
//gelfWriter, err := gelf.NewTCPWriter(graylogAddr)
if err != nil {
log.Fatalf("gelf.NewWriter: %s", err)
}
// log to both stderr and graylog2
log.SetOutput(io.MultiWriter(os.Stderr, gelfWriter))
log.Printf("logging to stderr & graylog2@'%s'", graylogAddr)
}
// From here on out, any calls to log.Print* functions
// will appear on stdout, and be sent over UDP to the
// specified Graylog2 server.
// From here on out, any calls to log.Print* functions
// will appear on stdout, and be sent over UDP or TCP to the
// specified Graylog2 server.
log.Printf("Hello gray World")
// ...
}
log.Printf("Hello gray World")
// ...
}
```
The above program can be invoked as:
go run test.go -graylog=localhost:12201
go run test.go -graylog=localhost:12201
Because GELF messages are sent over UDP, graylog server availability
doesn't impact application performance or response time. There is a
small, fixed overhead per log call, regardless of whether the target
When using UDP messages may be dropped or re-ordered. However, Graylog
server availability will not impact application performance; there is
a small, fixed overhead per log call regardless of whether the target
server is reachable or not.
To Do
-----

147
vendor/github.com/Graylog2/go-gelf/gelf/message.go generated vendored Normal file
View File

@ -0,0 +1,147 @@
package gelf
import (
"bytes"
"encoding/json"
"time"
)
// Message represents the contents of the GELF message. It is gzipped
// before sending.
type Message struct {
Version string `json:"version"`
Host string `json:"host"`
Short string `json:"short_message"`
Full string `json:"full_message,omitempty"`
TimeUnix float64 `json:"timestamp"`
Level int32 `json:"level,omitempty"`
Facility string `json:"facility,omitempty"`
Extra map[string]interface{} `json:"-"`
RawExtra json.RawMessage `json:"-"`
}
// Syslog severity levels
const (
LOG_EMERG = 0
LOG_ALERT = 1
LOG_CRIT = 2
LOG_ERR = 3
LOG_WARNING = 4
LOG_NOTICE = 5
LOG_INFO = 6
LOG_DEBUG = 7
)
func (m *Message) MarshalJSONBuf(buf *bytes.Buffer) error {
b, err := json.Marshal(m)
if err != nil {
return err
}
// write up until the final }
if _, err = buf.Write(b[:len(b)-1]); err != nil {
return err
}
if len(m.Extra) > 0 {
eb, err := json.Marshal(m.Extra)
if err != nil {
return err
}
// merge serialized message + serialized extra map
if err = buf.WriteByte(','); err != nil {
return err
}
// write serialized extra bytes, without enclosing quotes
if _, err = buf.Write(eb[1 : len(eb)-1]); err != nil {
return err
}
}
if len(m.RawExtra) > 0 {
if err := buf.WriteByte(','); err != nil {
return err
}
// write serialized extra bytes, without enclosing quotes
if _, err = buf.Write(m.RawExtra[1 : len(m.RawExtra)-1]); err != nil {
return err
}
}
// write final closing quotes
return buf.WriteByte('}')
}
func (m *Message) UnmarshalJSON(data []byte) error {
i := make(map[string]interface{}, 16)
if err := json.Unmarshal(data, &i); err != nil {
return err
}
for k, v := range i {
if k[0] == '_' {
if m.Extra == nil {
m.Extra = make(map[string]interface{}, 1)
}
m.Extra[k] = v
continue
}
switch k {
case "version":
m.Version = v.(string)
case "host":
m.Host = v.(string)
case "short_message":
m.Short = v.(string)
case "full_message":
m.Full = v.(string)
case "timestamp":
m.TimeUnix = v.(float64)
case "level":
m.Level = int32(v.(float64))
case "facility":
m.Facility = v.(string)
}
}
return nil
}
func (m *Message) toBytes() (messageBytes []byte, err error) {
buf := newBuffer()
defer bufPool.Put(buf)
if err = m.MarshalJSONBuf(buf); err != nil {
return nil, err
}
messageBytes = buf.Bytes()
return messageBytes, nil
}
func constructMessage(p []byte, hostname string, facility string, file string, line int) (m *Message) {
// remove trailing and leading whitespace
p = bytes.TrimSpace(p)
// If there are newlines in the message, use the first line
// for the short message and set the full message to the
// original input. If the input has no newlines, stick the
// whole thing in Short.
short := p
full := []byte("")
if i := bytes.IndexRune(p, '\n'); i > 0 {
short = p[:i]
full = p
}
m = &Message{
Version: "1.1",
Host: hostname,
Short: string(short),
Full: string(full),
TimeUnix: float64(time.Now().Unix()),
Level: 6, // info
Facility: facility,
Extra: map[string]interface{}{
"_file": file,
"_line": line,
},
}
return m
}

93
vendor/github.com/Graylog2/go-gelf/gelf/tcpreader.go generated vendored Normal file
View File

@ -0,0 +1,93 @@
package gelf
import (
"bufio"
"encoding/json"
"fmt"
"net"
)
type TCPReader struct {
listener *net.TCPListener
conn net.Conn
messages chan []byte
}
func newTCPReader(addr string) (*TCPReader, chan string, error) {
var err error
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, nil, fmt.Errorf("ResolveTCPAddr('%s'): %s", addr, err)
}
listener, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
return nil, nil, fmt.Errorf("ListenTCP: %s", err)
}
r := &TCPReader{
listener: listener,
messages: make(chan []byte, 100), // Make a buffered channel with at most 100 messages
}
signal := make(chan string, 1)
go r.listenUntilCloseSignal(signal)
return r, signal, nil
}
func (r *TCPReader) listenUntilCloseSignal(signal chan string) {
defer func() { signal <- "done" }()
defer r.listener.Close()
for {
conn, err := r.listener.Accept()
if err != nil {
break
}
go handleConnection(conn, r.messages)
select {
case sig := <-signal:
if sig == "stop" {
break
}
default:
}
}
}
func (r *TCPReader) addr() string {
return r.listener.Addr().String()
}
func handleConnection(conn net.Conn, messages chan<- []byte) {
defer conn.Close()
reader := bufio.NewReader(conn)
var b []byte
var err error
for {
if b, err = reader.ReadBytes(0); err != nil {
continue
}
if len(b) > 0 {
messages <- b
}
}
}
func (r *TCPReader) readMessage() (*Message, error) {
b := <-r.messages
var msg Message
if err := json.Unmarshal(b[:len(b)-1], &msg); err != nil {
return nil, fmt.Errorf("json.Unmarshal: %s", err)
}
return &msg, nil
}
func (r *TCPReader) Close() {
r.listener.Close()
}

97
vendor/github.com/Graylog2/go-gelf/gelf/tcpwriter.go generated vendored Normal file
View File

@ -0,0 +1,97 @@
package gelf
import (
"fmt"
"net"
"os"
"sync"
"time"
)
const (
DefaultMaxReconnect = 3
DefaultReconnectDelay = 1
)
type TCPWriter struct {
GelfWriter
mu sync.Mutex
MaxReconnect int
ReconnectDelay time.Duration
}
func NewTCPWriter(addr string) (*TCPWriter, error) {
var err error
w := new(TCPWriter)
w.MaxReconnect = DefaultMaxReconnect
w.ReconnectDelay = DefaultReconnectDelay
w.proto = "tcp"
w.addr = addr
if w.conn, err = net.Dial("tcp", addr); err != nil {
return nil, err
}
if w.hostname, err = os.Hostname(); err != nil {
return nil, err
}
return w, nil
}
// WriteMessage sends the specified message to the GELF server
// specified in the call to New(). It assumes all the fields are
// filled out appropriately. In general, clients will want to use
// Write, rather than WriteMessage.
func (w *TCPWriter) WriteMessage(m *Message) (err error) {
messageBytes, err := m.toBytes()
if err != nil {
return err
}
messageBytes = append(messageBytes, 0)
n, err := w.writeToSocketWithReconnectAttempts(messageBytes)
if err != nil {
return err
}
if n != len(messageBytes) {
return fmt.Errorf("bad write (%d/%d)", n, len(messageBytes))
}
return nil
}
func (w *TCPWriter) Write(p []byte) (n int, err error) {
file, line := getCallerIgnoringLogMulti(1)
m := constructMessage(p, w.hostname, w.Facility, file, line)
if err = w.WriteMessage(m); err != nil {
return 0, err
}
return len(p), nil
}
func (w *TCPWriter) writeToSocketWithReconnectAttempts(zBytes []byte) (n int, err error) {
var errConn error
w.mu.Lock()
for i := 0; n <= w.MaxReconnect; i++ {
errConn = nil
n, err = w.conn.Write(zBytes)
if err != nil {
time.Sleep(w.ReconnectDelay * time.Second)
w.conn, errConn = net.Dial("tcp", w.addr)
} else {
break
}
}
w.mu.Unlock()
if errConn != nil {
return 0, fmt.Errorf("Write Failed: %s\nReconnection failed: %s", err, errConn)
}
return n, nil
}

231
vendor/github.com/Graylog2/go-gelf/gelf/udpwriter.go generated vendored Normal file
View File

@ -0,0 +1,231 @@
// Copyright 2012 SocialCode. All rights reserved.
// Use of this source code is governed by the MIT
// license that can be found in the LICENSE file.
package gelf
import (
"bytes"
"compress/flate"
"compress/gzip"
"compress/zlib"
"crypto/rand"
"fmt"
"io"
"net"
"os"
"path"
"sync"
)
type UDPWriter struct {
GelfWriter
CompressionLevel int // one of the consts from compress/flate
CompressionType CompressType
}
// What compression type the writer should use when sending messages
// to the graylog2 server
type CompressType int
const (
CompressGzip CompressType = iota
CompressZlib
CompressNone
)
// Used to control GELF chunking. Should be less than (MTU - len(UDP
// header)).
//
// TODO: generate dynamically using Path MTU Discovery?
const (
ChunkSize = 1420
chunkedHeaderLen = 12
chunkedDataLen = ChunkSize - chunkedHeaderLen
)
var (
magicChunked = []byte{0x1e, 0x0f}
magicZlib = []byte{0x78}
magicGzip = []byte{0x1f, 0x8b}
)
// numChunks returns the number of GELF chunks necessary to transmit
// the given compressed buffer.
func numChunks(b []byte) int {
lenB := len(b)
if lenB <= ChunkSize {
return 1
}
return len(b)/chunkedDataLen + 1
}
// New returns a new GELF Writer. This writer can be used to send the
// output of the standard Go log functions to a central GELF server by
// passing it to log.SetOutput()
func NewUDPWriter(addr string) (*UDPWriter, error) {
var err error
w := new(UDPWriter)
w.CompressionLevel = flate.BestSpeed
if w.conn, err = net.Dial("udp", addr); err != nil {
return nil, err
}
if w.hostname, err = os.Hostname(); err != nil {
return nil, err
}
w.Facility = path.Base(os.Args[0])
return w, nil
}
// writes the gzip compressed byte array to the connection as a series
// of GELF chunked messages. The format is documented at
// http://docs.graylog.org/en/2.1/pages/gelf.html as:
//
// 2-byte magic (0x1e 0x0f), 8 byte id, 1 byte sequence id, 1 byte
// total, chunk-data
func (w *GelfWriter) writeChunked(zBytes []byte) (err error) {
b := make([]byte, 0, ChunkSize)
buf := bytes.NewBuffer(b)
nChunksI := numChunks(zBytes)
if nChunksI > 128 {
return fmt.Errorf("msg too large, would need %d chunks", nChunksI)
}
nChunks := uint8(nChunksI)
// use urandom to get a unique message id
msgId := make([]byte, 8)
n, err := io.ReadFull(rand.Reader, msgId)
if err != nil || n != 8 {
return fmt.Errorf("rand.Reader: %d/%s", n, err)
}
bytesLeft := len(zBytes)
for i := uint8(0); i < nChunks; i++ {
buf.Reset()
// manually write header. Don't care about
// host/network byte order, because the spec only
// deals in individual bytes.
buf.Write(magicChunked) //magic
buf.Write(msgId)
buf.WriteByte(i)
buf.WriteByte(nChunks)
// slice out our chunk from zBytes
chunkLen := chunkedDataLen
if chunkLen > bytesLeft {
chunkLen = bytesLeft
}
off := int(i) * chunkedDataLen
chunk := zBytes[off : off+chunkLen]
buf.Write(chunk)
// write this chunk, and make sure the write was good
n, err := w.conn.Write(buf.Bytes())
if err != nil {
return fmt.Errorf("Write (chunk %d/%d): %s", i,
nChunks, err)
}
if n != len(buf.Bytes()) {
return fmt.Errorf("Write len: (chunk %d/%d) (%d/%d)",
i, nChunks, n, len(buf.Bytes()))
}
bytesLeft -= chunkLen
}
if bytesLeft != 0 {
return fmt.Errorf("error: %d bytes left after sending", bytesLeft)
}
return nil
}
// 1k bytes buffer by default
var bufPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, 1024))
},
}
func newBuffer() *bytes.Buffer {
b := bufPool.Get().(*bytes.Buffer)
if b != nil {
b.Reset()
return b
}
return bytes.NewBuffer(nil)
}
// WriteMessage sends the specified message to the GELF server
// specified in the call to New(). It assumes all the fields are
// filled out appropriately. In general, clients will want to use
// Write, rather than WriteMessage.
func (w *UDPWriter) WriteMessage(m *Message) (err error) {
mBuf := newBuffer()
defer bufPool.Put(mBuf)
if err = m.MarshalJSONBuf(mBuf); err != nil {
return err
}
mBytes := mBuf.Bytes()
var (
zBuf *bytes.Buffer
zBytes []byte
)
var zw io.WriteCloser
switch w.CompressionType {
case CompressGzip:
zBuf = newBuffer()
defer bufPool.Put(zBuf)
zw, err = gzip.NewWriterLevel(zBuf, w.CompressionLevel)
case CompressZlib:
zBuf = newBuffer()
defer bufPool.Put(zBuf)
zw, err = zlib.NewWriterLevel(zBuf, w.CompressionLevel)
case CompressNone:
zBytes = mBytes
default:
panic(fmt.Sprintf("unknown compression type %d",
w.CompressionType))
}
if zw != nil {
if err != nil {
return
}
if _, err = zw.Write(mBytes); err != nil {
zw.Close()
return
}
zw.Close()
zBytes = zBuf.Bytes()
}
if numChunks(zBytes) > 1 {
return w.writeChunked(zBytes)
}
n, err := w.conn.Write(zBytes)
if err != nil {
return
}
if n != len(zBytes) {
return fmt.Errorf("bad write (%d/%d)", n, len(zBytes))
}
return nil
}
// Write encodes the given string in a GELF message and sends it to
// the server specified in New().
func (w *UDPWriter) Write(p []byte) (n int, err error) {
// 1 for the function that called us.
file, line := getCallerIgnoringLogMulti(1)
m := constructMessage(p, w.hostname, w.Facility, file, line)
if err = w.WriteMessage(m); err != nil {
return 0, err
}
return len(p), nil
}

41
vendor/github.com/Graylog2/go-gelf/gelf/utils.go generated vendored Normal file
View File

@ -0,0 +1,41 @@
package gelf
import (
"runtime"
"strings"
)
// getCaller returns the filename and the line info of a function
// further down in the call stack. Passing 0 in as callDepth would
// return info on the function calling getCallerIgnoringLog, 1 the
// parent function, and so on. Any suffixes passed to getCaller are
// path fragments like "/pkg/log/log.go", and functions in the call
// stack from that file are ignored.
func getCaller(callDepth int, suffixesToIgnore ...string) (file string, line int) {
// bump by 1 to ignore the getCaller (this) stackframe
callDepth++
outer:
for {
var ok bool
_, file, line, ok = runtime.Caller(callDepth)
if !ok {
file = "???"
line = 0
break
}
for _, s := range suffixesToIgnore {
if strings.HasSuffix(file, s) {
callDepth++
continue outer
}
}
break
}
return
}
func getCallerIgnoringLogMulti(callDepth int) (string, int) {
// the +1 is to ignore this (getCallerIgnoringLogMulti) frame
return getCaller(callDepth+1, "/pkg/log/log.go", "/pkg/io/multi.go")
}

View File

@ -5,414 +5,27 @@
package gelf
import (
"bytes"
"compress/flate"
"compress/gzip"
"compress/zlib"
"crypto/rand"
"encoding/json"
"fmt"
"io"
"net"
"os"
"path"
"runtime"
"strings"
"sync"
"time"
)
type Writer interface {
Close() error
Write([]byte) (int, error)
WriteMessage(*Message) error
}
// Writer implements io.Writer and is used to send both discrete
// messages to a graylog2 server, or data from a stream-oriented
// interface (like the functions in log).
type Writer struct {
mu sync.Mutex
conn net.Conn
hostname string
Facility string // defaults to current process name
CompressionLevel int // one of the consts from compress/flate
CompressionType CompressType
}
// What compression type the writer should use when sending messages
// to the graylog2 server
type CompressType int
const (
CompressGzip CompressType = iota
CompressZlib
CompressNone
)
// Message represents the contents of the GELF message. It is gzipped
// before sending.
type Message struct {
Version string `json:"version"`
Host string `json:"host"`
Short string `json:"short_message"`
Full string `json:"full_message,omitempty"`
TimeUnix float64 `json:"timestamp"`
Level int32 `json:"level,omitempty"`
Facility string `json:"facility,omitempty"`
Extra map[string]interface{} `json:"-"`
RawExtra json.RawMessage `json:"-"`
}
// Used to control GELF chunking. Should be less than (MTU - len(UDP
// header)).
//
// TODO: generate dynamically using Path MTU Discovery?
const (
ChunkSize = 1420
chunkedHeaderLen = 12
chunkedDataLen = ChunkSize - chunkedHeaderLen
)
var (
magicChunked = []byte{0x1e, 0x0f}
magicZlib = []byte{0x78}
magicGzip = []byte{0x1f, 0x8b}
)
// Syslog severity levels
const (
LOG_EMERG = int32(0)
LOG_ALERT = int32(1)
LOG_CRIT = int32(2)
LOG_ERR = int32(3)
LOG_WARNING = int32(4)
LOG_NOTICE = int32(5)
LOG_INFO = int32(6)
LOG_DEBUG = int32(7)
)
// numChunks returns the number of GELF chunks necessary to transmit
// the given compressed buffer.
func numChunks(b []byte) int {
lenB := len(b)
if lenB <= ChunkSize {
return 1
}
return len(b)/chunkedDataLen + 1
}
// New returns a new GELF Writer. This writer can be used to send the
// output of the standard Go log functions to a central GELF server by
// passing it to log.SetOutput()
func NewWriter(addr string) (*Writer, error) {
var err error
w := new(Writer)
w.CompressionLevel = flate.BestSpeed
if w.conn, err = net.Dial("udp", addr); err != nil {
return nil, err
}
if w.hostname, err = os.Hostname(); err != nil {
return nil, err
}
w.Facility = path.Base(os.Args[0])
return w, nil
}
// writes the gzip compressed byte array to the connection as a series
// of GELF chunked messages. The format is documented at
// http://docs.graylog.org/en/2.1/pages/gelf.html as:
//
// 2-byte magic (0x1e 0x0f), 8 byte id, 1 byte sequence id, 1 byte
// total, chunk-data
func (w *Writer) writeChunked(zBytes []byte) (err error) {
b := make([]byte, 0, ChunkSize)
buf := bytes.NewBuffer(b)
nChunksI := numChunks(zBytes)
if nChunksI > 128 {
return fmt.Errorf("msg too large, would need %d chunks", nChunksI)
}
nChunks := uint8(nChunksI)
// use urandom to get a unique message id
msgId := make([]byte, 8)
n, err := io.ReadFull(rand.Reader, msgId)
if err != nil || n != 8 {
return fmt.Errorf("rand.Reader: %d/%s", n, err)
}
bytesLeft := len(zBytes)
for i := uint8(0); i < nChunks; i++ {
buf.Reset()
// manually write header. Don't care about
// host/network byte order, because the spec only
// deals in individual bytes.
buf.Write(magicChunked) //magic
buf.Write(msgId)
buf.WriteByte(i)
buf.WriteByte(nChunks)
// slice out our chunk from zBytes
chunkLen := chunkedDataLen
if chunkLen > bytesLeft {
chunkLen = bytesLeft
}
off := int(i) * chunkedDataLen
chunk := zBytes[off : off+chunkLen]
buf.Write(chunk)
// write this chunk, and make sure the write was good
n, err := w.conn.Write(buf.Bytes())
if err != nil {
return fmt.Errorf("Write (chunk %d/%d): %s", i,
nChunks, err)
}
if n != len(buf.Bytes()) {
return fmt.Errorf("Write len: (chunk %d/%d) (%d/%d)",
i, nChunks, n, len(buf.Bytes()))
}
bytesLeft -= chunkLen
}
if bytesLeft != 0 {
return fmt.Errorf("error: %d bytes left after sending", bytesLeft)
}
return nil
}
// 1k bytes buffer by default
var bufPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, 1024))
},
}
func newBuffer() *bytes.Buffer {
b := bufPool.Get().(*bytes.Buffer)
if b != nil {
b.Reset()
return b
}
return bytes.NewBuffer(nil)
}
// WriteMessage sends the specified message to the GELF server
// specified in the call to New(). It assumes all the fields are
// filled out appropriately. In general, clients will want to use
// Write, rather than WriteMessage.
func (w *Writer) WriteMessage(m *Message) (err error) {
mBuf := newBuffer()
defer bufPool.Put(mBuf)
if err = m.MarshalJSONBuf(mBuf); err != nil {
return err
}
mBytes := mBuf.Bytes()
var (
zBuf *bytes.Buffer
zBytes []byte
)
var zw io.WriteCloser
switch w.CompressionType {
case CompressGzip:
zBuf = newBuffer()
defer bufPool.Put(zBuf)
zw, err = gzip.NewWriterLevel(zBuf, w.CompressionLevel)
case CompressZlib:
zBuf = newBuffer()
defer bufPool.Put(zBuf)
zw, err = zlib.NewWriterLevel(zBuf, w.CompressionLevel)
case CompressNone:
zBytes = mBytes
default:
panic(fmt.Sprintf("unknown compression type %d",
w.CompressionType))
}
if zw != nil {
if err != nil {
return
}
if _, err = zw.Write(mBytes); err != nil {
zw.Close()
return
}
zw.Close()
zBytes = zBuf.Bytes()
}
if numChunks(zBytes) > 1 {
return w.writeChunked(zBytes)
}
n, err := w.conn.Write(zBytes)
if err != nil {
return
}
if n != len(zBytes) {
return fmt.Errorf("bad write (%d/%d)", n, len(zBytes))
}
return nil
type GelfWriter struct {
addr string
conn net.Conn
hostname string
Facility string // defaults to current process name
proto string
}
// Close connection and interrupt blocked Read or Write operations
func (w *Writer) Close() error {
func (w *GelfWriter) Close() error {
return w.conn.Close()
}
/*
func (w *Writer) Alert(m string) (err error)
func (w *Writer) Close() error
func (w *Writer) Crit(m string) (err error)
func (w *Writer) Debug(m string) (err error)
func (w *Writer) Emerg(m string) (err error)
func (w *Writer) Err(m string) (err error)
func (w *Writer) Info(m string) (err error)
func (w *Writer) Notice(m string) (err error)
func (w *Writer) Warning(m string) (err error)
*/
// getCaller returns the filename and the line info of a function
// further down in the call stack. Passing 0 in as callDepth would
// return info on the function calling getCallerIgnoringLog, 1 the
// parent function, and so on. Any suffixes passed to getCaller are
// path fragments like "/pkg/log/log.go", and functions in the call
// stack from that file are ignored.
func getCaller(callDepth int, suffixesToIgnore ...string) (file string, line int) {
// bump by 1 to ignore the getCaller (this) stackframe
callDepth++
outer:
for {
var ok bool
_, file, line, ok = runtime.Caller(callDepth)
if !ok {
file = "???"
line = 0
break
}
for _, s := range suffixesToIgnore {
if strings.HasSuffix(file, s) {
callDepth++
continue outer
}
}
break
}
return
}
func getCallerIgnoringLogMulti(callDepth int) (string, int) {
// the +1 is to ignore this (getCallerIgnoringLogMulti) frame
return getCaller(callDepth+1, "/pkg/log/log.go", "/pkg/io/multi.go")
}
// Write encodes the given string in a GELF message and sends it to
// the server specified in New().
func (w *Writer) Write(p []byte) (n int, err error) {
// 1 for the function that called us.
file, line := getCallerIgnoringLogMulti(1)
// remove trailing and leading whitespace
p = bytes.TrimSpace(p)
// If there are newlines in the message, use the first line
// for the short message and set the full message to the
// original input. If the input has no newlines, stick the
// whole thing in Short.
short := p
full := []byte("")
if i := bytes.IndexRune(p, '\n'); i > 0 {
short = p[:i]
full = p
}
m := Message{
Version: "1.1",
Host: w.hostname,
Short: string(short),
Full: string(full),
TimeUnix: float64(time.Now().Unix()),
Level: 6, // info
Facility: w.Facility,
Extra: map[string]interface{}{
"_file": file,
"_line": line,
},
}
if err = w.WriteMessage(&m); err != nil {
return 0, err
}
return len(p), nil
}
func (m *Message) MarshalJSONBuf(buf *bytes.Buffer) error {
b, err := json.Marshal(m)
if err != nil {
return err
}
// write up until the final }
if _, err = buf.Write(b[:len(b)-1]); err != nil {
return err
}
if len(m.Extra) > 0 {
eb, err := json.Marshal(m.Extra)
if err != nil {
return err
}
// merge serialized message + serialized extra map
if err = buf.WriteByte(','); err != nil {
return err
}
// write serialized extra bytes, without enclosing quotes
if _, err = buf.Write(eb[1 : len(eb)-1]); err != nil {
return err
}
}
if len(m.RawExtra) > 0 {
if err := buf.WriteByte(','); err != nil {
return err
}
// write serialized extra bytes, without enclosing quotes
if _, err = buf.Write(m.RawExtra[1 : len(m.RawExtra)-1]); err != nil {
return err
}
}
// write final closing quotes
return buf.WriteByte('}')
}
func (m *Message) UnmarshalJSON(data []byte) error {
i := make(map[string]interface{}, 16)
if err := json.Unmarshal(data, &i); err != nil {
return err
}
for k, v := range i {
if k[0] == '_' {
if m.Extra == nil {
m.Extra = make(map[string]interface{}, 1)
}
m.Extra[k] = v
continue
}
switch k {
case "version":
m.Version = v.(string)
case "host":
m.Host = v.(string)
case "short_message":
m.Short = v.(string)
case "full_message":
m.Full = v.(string)
case "timestamp":
m.TimeUnix = v.(float64)
case "level":
m.Level = int32(v.(float64))
case "facility":
m.Facility = v.(string)
}
}
return nil
}