Merge pull request #34758 from ghislainbourgeois/33495-add-tcp-to-gelf-log-driver

Add TCP support for GELF log driver
This commit is contained in:
Brian Goff 2017-10-10 10:26:01 -04:00 committed by GitHub
commit 3437f0f4e5
11 changed files with 1043 additions and 462 deletions

View File

@ -23,7 +23,7 @@ import (
const name = "gelf"
type gelfLogger struct {
writer *gelf.Writer
writer gelf.Writer
info logger.Info
hostname string
rawExtra json.RawMessage
@ -89,8 +89,56 @@ func New(info logger.Info) (logger.Logger, error) {
return nil, err
}
// create new gelfWriter
gelfWriter, err := gelf.NewWriter(address)
var gelfWriter gelf.Writer
if address.Scheme == "udp" {
gelfWriter, err = newGELFUDPWriter(address.Host, info)
if err != nil {
return nil, err
}
} else if address.Scheme == "tcp" {
gelfWriter, err = newGELFTCPWriter(address.Host, info)
if err != nil {
return nil, err
}
}
return &gelfLogger{
writer: gelfWriter,
info: info,
hostname: hostname,
rawExtra: rawExtra,
}, nil
}
// create new TCP gelfWriter
func newGELFTCPWriter(address string, info logger.Info) (gelf.Writer, error) {
gelfWriter, err := gelf.NewTCPWriter(address)
if err != nil {
return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address, err)
}
if v, ok := info.Config["gelf-tcp-max-reconnect"]; ok {
i, err := strconv.Atoi(v)
if err != nil || i < 0 {
return nil, fmt.Errorf("gelf-tcp-max-reconnect must be a positive integer")
}
gelfWriter.MaxReconnect = i
}
if v, ok := info.Config["gelf-tcp-reconnect-delay"]; ok {
i, err := strconv.Atoi(v)
if err != nil || i < 0 {
return nil, fmt.Errorf("gelf-tcp-reconnect-delay must be a positive integer")
}
gelfWriter.ReconnectDelay = time.Duration(i)
}
return gelfWriter, nil
}
// create new UDP gelfWriter
func newGELFUDPWriter(address string, info logger.Info) (gelf.Writer, error) {
gelfWriter, err := gelf.NewUDPWriter(address)
if err != nil {
return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address, err)
}
@ -116,12 +164,7 @@ func New(info logger.Info) (logger.Logger, error) {
gelfWriter.CompressionLevel = val
}
return &gelfLogger{
writer: gelfWriter,
info: info,
hostname: hostname,
rawExtra: rawExtra,
}, nil
return gelfWriter, nil
}
func (s *gelfLogger) Log(msg *logger.Message) error {
@ -135,7 +178,7 @@ func (s *gelfLogger) Log(msg *logger.Message) error {
Host: s.hostname,
Short: string(msg.Line),
TimeUnix: float64(msg.Timestamp.UnixNano()/int64(time.Millisecond)) / 1000.0,
Level: level,
Level: int32(level),
RawExtra: s.rawExtra,
}
logger.PutMessage(msg)
@ -156,6 +199,11 @@ func (s *gelfLogger) Name() string {
// ValidateLogOpt looks for gelf specific log option gelf-address.
func ValidateLogOpt(cfg map[string]string) error {
address, err := parseAddress(cfg["gelf-address"])
if err != nil {
return err
}
for key, val := range cfg {
switch key {
case "gelf-address":
@ -164,46 +212,59 @@ func ValidateLogOpt(cfg map[string]string) error {
case "env":
case "env-regex":
case "gelf-compression-level":
if address.Scheme != "udp" {
return fmt.Errorf("compression is only supported on UDP")
}
i, err := strconv.Atoi(val)
if err != nil || i < flate.DefaultCompression || i > flate.BestCompression {
return fmt.Errorf("unknown value %q for log opt %q for gelf log driver", val, key)
}
case "gelf-compression-type":
if address.Scheme != "udp" {
return fmt.Errorf("compression is only supported on UDP")
}
switch val {
case "gzip", "zlib", "none":
default:
return fmt.Errorf("unknown value %q for log opt %q for gelf log driver", val, key)
}
case "gelf-tcp-max-reconnect", "gelf-tcp-reconnect-delay":
if address.Scheme != "tcp" {
return fmt.Errorf("%q is only valid for TCP", key)
}
i, err := strconv.Atoi(val)
if err != nil || i < 0 {
return fmt.Errorf("%q must be a positive integer", key)
}
default:
return fmt.Errorf("unknown log opt %q for gelf log driver", key)
}
}
_, err := parseAddress(cfg["gelf-address"])
return err
return nil
}
func parseAddress(address string) (string, error) {
func parseAddress(address string) (*url.URL, error) {
if address == "" {
return "", nil
return nil, fmt.Errorf("gelf-address is a required parameter")
}
if !urlutil.IsTransportURL(address) {
return "", fmt.Errorf("gelf-address should be in form proto://address, got %v", address)
return nil, fmt.Errorf("gelf-address should be in form proto://address, got %v", address)
}
url, err := url.Parse(address)
if err != nil {
return "", err
return nil, err
}
// we support only udp
if url.Scheme != "udp" {
return "", fmt.Errorf("gelf: endpoint needs to be UDP")
if url.Scheme != "udp" && url.Scheme != "tcp" {
return nil, fmt.Errorf("gelf: endpoint needs to be TCP or UDP")
}
// get host and port
if _, _, err = net.SplitHostPort(url.Host); err != nil {
return "", fmt.Errorf("gelf: please provide gelf-address as udp://host:port")
return nil, fmt.Errorf("gelf: please provide gelf-address as proto://host:port")
}
return url.Host, nil
return url, nil
}

View File

@ -0,0 +1,260 @@
// +build linux
package gelf
import (
"net"
"testing"
"github.com/docker/docker/daemon/logger"
)
// Validate parseAddress
func TestParseAddress(t *testing.T) {
url, err := parseAddress("udp://127.0.0.1:12201")
if err != nil {
t.Fatal(err)
}
if url.String() != "udp://127.0.0.1:12201" {
t.Fatalf("Expected address udp://127.0.0.1:12201, got %s", url.String())
}
_, err = parseAddress("127.0.0.1:12201")
if err == nil {
t.Fatal("Expected error requiring protocol")
}
_, err = parseAddress("http://127.0.0.1:12201")
if err == nil {
t.Fatal("Expected error restricting protocol")
}
}
// Validate TCP options
func TestTCPValidateLogOpt(t *testing.T) {
err := ValidateLogOpt(map[string]string{
"gelf-address": "tcp://127.0.0.1:12201",
})
if err != nil {
t.Fatal("Expected TCP to be supported")
}
err = ValidateLogOpt(map[string]string{
"gelf-address": "tcp://127.0.0.1:12201",
"gelf-compression-level": "9",
})
if err == nil {
t.Fatal("Expected TCP to reject compression level")
}
err = ValidateLogOpt(map[string]string{
"gelf-address": "tcp://127.0.0.1:12201",
"gelf-compression-type": "gzip",
})
if err == nil {
t.Fatal("Expected TCP to reject compression type")
}
err = ValidateLogOpt(map[string]string{
"gelf-address": "tcp://127.0.0.1:12201",
"gelf-tcp-max-reconnect": "5",
"gelf-tcp-reconnect-delay": "10",
})
if err != nil {
t.Fatal("Expected TCP reconnect to be a valid parameters")
}
err = ValidateLogOpt(map[string]string{
"gelf-address": "tcp://127.0.0.1:12201",
"gelf-tcp-max-reconnect": "-1",
"gelf-tcp-reconnect-delay": "-3",
})
if err == nil {
t.Fatal("Expected negative TCP reconnect to be rejected")
}
err = ValidateLogOpt(map[string]string{
"gelf-address": "tcp://127.0.0.1:12201",
"gelf-tcp-max-reconnect": "invalid",
"gelf-tcp-reconnect-delay": "invalid",
})
if err == nil {
t.Fatal("Expected TCP reconnect to be required to be an int")
}
err = ValidateLogOpt(map[string]string{
"gelf-address": "udp://127.0.0.1:12201",
"gelf-tcp-max-reconnect": "1",
"gelf-tcp-reconnect-delay": "3",
})
if err == nil {
t.Fatal("Expected TCP reconnect to be invalid for UDP")
}
}
// Validate UDP options
func TestUDPValidateLogOpt(t *testing.T) {
err := ValidateLogOpt(map[string]string{
"gelf-address": "udp://127.0.0.1:12201",
"tag": "testtag",
"labels": "testlabel",
"env": "testenv",
"env-regex": "testenv-regex",
"gelf-compression-level": "9",
"gelf-compression-type": "gzip",
})
if err != nil {
t.Fatal(err)
}
err = ValidateLogOpt(map[string]string{
"gelf-address": "udp://127.0.0.1:12201",
"gelf-compression-level": "ultra",
"gelf-compression-type": "zlib",
})
if err == nil {
t.Fatal("Expected compression level error")
}
err = ValidateLogOpt(map[string]string{
"gelf-address": "udp://127.0.0.1:12201",
"gelf-compression-type": "rar",
})
if err == nil {
t.Fatal("Expected compression type error")
}
err = ValidateLogOpt(map[string]string{
"invalid": "invalid",
})
if err == nil {
t.Fatal("Expected unknown option error")
}
err = ValidateLogOpt(map[string]string{})
if err == nil {
t.Fatal("Expected required parameter error")
}
}
// Validate newGELFTCPWriter
func TestNewGELFTCPWriter(t *testing.T) {
address := "127.0.0.1:0"
tcpAddr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
t.Fatal(err)
}
listener, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
t.Fatal(err)
}
url := "tcp://" + listener.Addr().String()
info := logger.Info{
Config: map[string]string{
"gelf-address": url,
"gelf-tcp-max-reconnect": "0",
"gelf-tcp-reconnect-delay": "0",
"tag": "{{.ID}}",
},
ContainerID: "12345678901234567890",
}
writer, err := newGELFTCPWriter(listener.Addr().String(), info)
if err != nil {
t.Fatal(err)
}
err = writer.Close()
if err != nil {
t.Fatal(err)
}
err = listener.Close()
if err != nil {
t.Fatal(err)
}
}
// Validate newGELFUDPWriter
func TestNewGELFUDPWriter(t *testing.T) {
address := "127.0.0.1:0"
info := logger.Info{
Config: map[string]string{
"gelf-address": "udp://127.0.0.1:0",
"gelf-compression-level": "5",
"gelf-compression-type": "gzip",
},
}
writer, err := newGELFUDPWriter(address, info)
if err != nil {
t.Fatal(err)
}
writer.Close()
if err != nil {
t.Fatal(err)
}
}
// Validate New for TCP
func TestNewTCP(t *testing.T) {
address := "127.0.0.1:0"
tcpAddr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
t.Fatal(err)
}
listener, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
t.Fatal(err)
}
url := "tcp://" + listener.Addr().String()
info := logger.Info{
Config: map[string]string{
"gelf-address": url,
"gelf-tcp-max-reconnect": "0",
"gelf-tcp-reconnect-delay": "0",
},
ContainerID: "12345678901234567890",
}
logger, err := New(info)
if err != nil {
t.Fatal(err)
}
err = logger.Close()
if err != nil {
t.Fatal(err)
}
err = listener.Close()
if err != nil {
t.Fatal(err)
}
}
// Validate New for UDP
func TestNewUDP(t *testing.T) {
info := logger.Info{
Config: map[string]string{
"gelf-address": "udp://127.0.0.1:0",
"gelf-compression-level": "5",
"gelf-compression-type": "gzip",
},
ContainerID: "12345678901234567890",
}
logger, err := New(info)
if err != nil {
t.Fatal(err)
}
err = logger.Close()
if err != nil {
t.Fatal(err)
}
}

View File

@ -1,3 +0,0 @@
// +build !linux
package gelf

View File

@ -79,7 +79,7 @@ github.com/syndtr/gocapability 2c00daeb6c3b45114c80ac44119e7b8801fdd852
github.com/golang/protobuf 7a211bcf3bce0e3f1d74f9894916e6f116ae83b4
# gelf logging driver deps
github.com/Graylog2/go-gelf 7029da823dad4ef3a876df61065156acb703b2ea
github.com/Graylog2/go-gelf v2
github.com/fluent/fluent-logger-golang v1.2.1
# fluent-logger-golang deps

View File

@ -1,17 +1,53 @@
go-gelf - GELF library and writer for Go
go-gelf - GELF Library and Writer for Go
========================================
GELF is graylog2's UDP logging format. This library provides an API
that applications can use to log messages directly to a graylog2
server, along with an `io.Writer` that can be use to redirect the
standard library's log messages (or `os.Stdout`), to a graylog2 server.
[GELF] (Graylog Extended Log Format) is an application-level logging
protocol that avoids many of the shortcomings of [syslog]. While it
can be run over any stream or datagram transport protocol, it has
special support ([chunking]) to allow long messages to be split over
multiple datagrams.
Versions
--------
In order to enable versionning of this package with Go, this project
is using GoPkg.in. The default branch of this project will be v1
for some time to prevent breaking clients. We encourage all project
to change their imports to the new GoPkg.in URIs as soon as possible.
To see up to date code, make sure to switch to the master branch.
v1.0.0
------
This implementation currently supports UDP and TCP as a transport
protocol. TLS is unsupported.
The library provides an API that applications can use to log messages
directly to a Graylog server and an `io.Writer` that can be used to
redirect the standard library's log messages (`os.Stdout`) to a
Graylog server.
[GELF]: http://docs.graylog.org/en/2.2/pages/gelf.html
[syslog]: https://tools.ietf.org/html/rfc5424
[chunking]: http://docs.graylog.org/en/2.2/pages/gelf.html#chunked-gelf
Installing
----------
go-gelf is go get-able:
go get github.com/Graylog2/go-gelf/gelf
go get gopkg.in/Graylog2/go-gelf.v1/gelf
or
go get github.com/Graylog2/go-gelf/gelf
This will get you version 1.0.0, with only UDP support and legacy API.
Newer versions are available through GoPkg.in:
go get gopkg.in/Graylog2/go-gelf.v2/gelf
Usage
-----
@ -21,50 +57,55 @@ having your `main` function (or even `init`) call `log.SetOutput()`.
By using an `io.MultiWriter`, we can log to both stdout and graylog -
giving us both centralized and local logs. (Redundancy is nice).
package main
```golang
package main
import (
"flag"
"github.com/Graylog2/go-gelf/gelf"
"io"
"log"
"os"
)
import (
"flag"
"gopkg.in/Graylog2/go-gelf.v2/gelf"
"io"
"log"
"os"
)
func main() {
var graylogAddr string
func main() {
var graylogAddr string
flag.StringVar(&graylogAddr, "graylog", "", "graylog server addr")
flag.Parse()
flag.StringVar(&graylogAddr, "graylog", "", "graylog server addr")
flag.Parse()
if graylogAddr != "" {
gelfWriter, err := gelf.NewWriter(graylogAddr)
if err != nil {
log.Fatalf("gelf.NewWriter: %s", err)
}
// log to both stderr and graylog2
log.SetOutput(io.MultiWriter(os.Stderr, gelfWriter))
log.Printf("logging to stderr & graylog2@'%s'", graylogAddr)
}
if graylogAddr != "" {
// If using UDP
gelfWriter, err := gelf.NewUDPWriter(graylogAddr)
// If using TCP
//gelfWriter, err := gelf.NewTCPWriter(graylogAddr)
if err != nil {
log.Fatalf("gelf.NewWriter: %s", err)
}
// log to both stderr and graylog2
log.SetOutput(io.MultiWriter(os.Stderr, gelfWriter))
log.Printf("logging to stderr & graylog2@'%s'", graylogAddr)
}
// From here on out, any calls to log.Print* functions
// will appear on stdout, and be sent over UDP to the
// specified Graylog2 server.
// From here on out, any calls to log.Print* functions
// will appear on stdout, and be sent over UDP or TCP to the
// specified Graylog2 server.
log.Printf("Hello gray World")
// ...
}
log.Printf("Hello gray World")
// ...
}
```
The above program can be invoked as:
go run test.go -graylog=localhost:12201
go run test.go -graylog=localhost:12201
Because GELF messages are sent over UDP, graylog server availability
doesn't impact application performance or response time. There is a
small, fixed overhead per log call, regardless of whether the target
When using UDP messages may be dropped or re-ordered. However, Graylog
server availability will not impact application performance; there is
a small, fixed overhead per log call regardless of whether the target
server is reachable or not.
To Do
-----

147
vendor/github.com/Graylog2/go-gelf/gelf/message.go generated vendored Normal file
View File

@ -0,0 +1,147 @@
package gelf
import (
"bytes"
"encoding/json"
"time"
)
// Message represents the contents of the GELF message. It is gzipped
// before sending.
type Message struct {
Version string `json:"version"`
Host string `json:"host"`
Short string `json:"short_message"`
Full string `json:"full_message,omitempty"`
TimeUnix float64 `json:"timestamp"`
Level int32 `json:"level,omitempty"`
Facility string `json:"facility,omitempty"`
Extra map[string]interface{} `json:"-"`
RawExtra json.RawMessage `json:"-"`
}
// Syslog severity levels
const (
LOG_EMERG = 0
LOG_ALERT = 1
LOG_CRIT = 2
LOG_ERR = 3
LOG_WARNING = 4
LOG_NOTICE = 5
LOG_INFO = 6
LOG_DEBUG = 7
)
func (m *Message) MarshalJSONBuf(buf *bytes.Buffer) error {
b, err := json.Marshal(m)
if err != nil {
return err
}
// write up until the final }
if _, err = buf.Write(b[:len(b)-1]); err != nil {
return err
}
if len(m.Extra) > 0 {
eb, err := json.Marshal(m.Extra)
if err != nil {
return err
}
// merge serialized message + serialized extra map
if err = buf.WriteByte(','); err != nil {
return err
}
// write serialized extra bytes, without enclosing quotes
if _, err = buf.Write(eb[1 : len(eb)-1]); err != nil {
return err
}
}
if len(m.RawExtra) > 0 {
if err := buf.WriteByte(','); err != nil {
return err
}
// write serialized extra bytes, without enclosing quotes
if _, err = buf.Write(m.RawExtra[1 : len(m.RawExtra)-1]); err != nil {
return err
}
}
// write final closing quotes
return buf.WriteByte('}')
}
func (m *Message) UnmarshalJSON(data []byte) error {
i := make(map[string]interface{}, 16)
if err := json.Unmarshal(data, &i); err != nil {
return err
}
for k, v := range i {
if k[0] == '_' {
if m.Extra == nil {
m.Extra = make(map[string]interface{}, 1)
}
m.Extra[k] = v
continue
}
switch k {
case "version":
m.Version = v.(string)
case "host":
m.Host = v.(string)
case "short_message":
m.Short = v.(string)
case "full_message":
m.Full = v.(string)
case "timestamp":
m.TimeUnix = v.(float64)
case "level":
m.Level = int32(v.(float64))
case "facility":
m.Facility = v.(string)
}
}
return nil
}
func (m *Message) toBytes() (messageBytes []byte, err error) {
buf := newBuffer()
defer bufPool.Put(buf)
if err = m.MarshalJSONBuf(buf); err != nil {
return nil, err
}
messageBytes = buf.Bytes()
return messageBytes, nil
}
func constructMessage(p []byte, hostname string, facility string, file string, line int) (m *Message) {
// remove trailing and leading whitespace
p = bytes.TrimSpace(p)
// If there are newlines in the message, use the first line
// for the short message and set the full message to the
// original input. If the input has no newlines, stick the
// whole thing in Short.
short := p
full := []byte("")
if i := bytes.IndexRune(p, '\n'); i > 0 {
short = p[:i]
full = p
}
m = &Message{
Version: "1.1",
Host: hostname,
Short: string(short),
Full: string(full),
TimeUnix: float64(time.Now().Unix()),
Level: 6, // info
Facility: facility,
Extra: map[string]interface{}{
"_file": file,
"_line": line,
},
}
return m
}

93
vendor/github.com/Graylog2/go-gelf/gelf/tcpreader.go generated vendored Normal file
View File

@ -0,0 +1,93 @@
package gelf
import (
"bufio"
"encoding/json"
"fmt"
"net"
)
type TCPReader struct {
listener *net.TCPListener
conn net.Conn
messages chan []byte
}
func newTCPReader(addr string) (*TCPReader, chan string, error) {
var err error
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, nil, fmt.Errorf("ResolveTCPAddr('%s'): %s", addr, err)
}
listener, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
return nil, nil, fmt.Errorf("ListenTCP: %s", err)
}
r := &TCPReader{
listener: listener,
messages: make(chan []byte, 100), // Make a buffered channel with at most 100 messages
}
signal := make(chan string, 1)
go r.listenUntilCloseSignal(signal)
return r, signal, nil
}
func (r *TCPReader) listenUntilCloseSignal(signal chan string) {
defer func() { signal <- "done" }()
defer r.listener.Close()
for {
conn, err := r.listener.Accept()
if err != nil {
break
}
go handleConnection(conn, r.messages)
select {
case sig := <-signal:
if sig == "stop" {
break
}
default:
}
}
}
func (r *TCPReader) addr() string {
return r.listener.Addr().String()
}
func handleConnection(conn net.Conn, messages chan<- []byte) {
defer conn.Close()
reader := bufio.NewReader(conn)
var b []byte
var err error
for {
if b, err = reader.ReadBytes(0); err != nil {
continue
}
if len(b) > 0 {
messages <- b
}
}
}
func (r *TCPReader) readMessage() (*Message, error) {
b := <-r.messages
var msg Message
if err := json.Unmarshal(b[:len(b)-1], &msg); err != nil {
return nil, fmt.Errorf("json.Unmarshal: %s", err)
}
return &msg, nil
}
func (r *TCPReader) Close() {
r.listener.Close()
}

97
vendor/github.com/Graylog2/go-gelf/gelf/tcpwriter.go generated vendored Normal file
View File

@ -0,0 +1,97 @@
package gelf
import (
"fmt"
"net"
"os"
"sync"
"time"
)
const (
DefaultMaxReconnect = 3
DefaultReconnectDelay = 1
)
type TCPWriter struct {
GelfWriter
mu sync.Mutex
MaxReconnect int
ReconnectDelay time.Duration
}
func NewTCPWriter(addr string) (*TCPWriter, error) {
var err error
w := new(TCPWriter)
w.MaxReconnect = DefaultMaxReconnect
w.ReconnectDelay = DefaultReconnectDelay
w.proto = "tcp"
w.addr = addr
if w.conn, err = net.Dial("tcp", addr); err != nil {
return nil, err
}
if w.hostname, err = os.Hostname(); err != nil {
return nil, err
}
return w, nil
}
// WriteMessage sends the specified message to the GELF server
// specified in the call to New(). It assumes all the fields are
// filled out appropriately. In general, clients will want to use
// Write, rather than WriteMessage.
func (w *TCPWriter) WriteMessage(m *Message) (err error) {
messageBytes, err := m.toBytes()
if err != nil {
return err
}
messageBytes = append(messageBytes, 0)
n, err := w.writeToSocketWithReconnectAttempts(messageBytes)
if err != nil {
return err
}
if n != len(messageBytes) {
return fmt.Errorf("bad write (%d/%d)", n, len(messageBytes))
}
return nil
}
func (w *TCPWriter) Write(p []byte) (n int, err error) {
file, line := getCallerIgnoringLogMulti(1)
m := constructMessage(p, w.hostname, w.Facility, file, line)
if err = w.WriteMessage(m); err != nil {
return 0, err
}
return len(p), nil
}
func (w *TCPWriter) writeToSocketWithReconnectAttempts(zBytes []byte) (n int, err error) {
var errConn error
w.mu.Lock()
for i := 0; n <= w.MaxReconnect; i++ {
errConn = nil
n, err = w.conn.Write(zBytes)
if err != nil {
time.Sleep(w.ReconnectDelay * time.Second)
w.conn, errConn = net.Dial("tcp", w.addr)
} else {
break
}
}
w.mu.Unlock()
if errConn != nil {
return 0, fmt.Errorf("Write Failed: %s\nReconnection failed: %s", err, errConn)
}
return n, nil
}

231
vendor/github.com/Graylog2/go-gelf/gelf/udpwriter.go generated vendored Normal file
View File

@ -0,0 +1,231 @@
// Copyright 2012 SocialCode. All rights reserved.
// Use of this source code is governed by the MIT
// license that can be found in the LICENSE file.
package gelf
import (
"bytes"
"compress/flate"
"compress/gzip"
"compress/zlib"
"crypto/rand"
"fmt"
"io"
"net"
"os"
"path"
"sync"
)
type UDPWriter struct {
GelfWriter
CompressionLevel int // one of the consts from compress/flate
CompressionType CompressType
}
// What compression type the writer should use when sending messages
// to the graylog2 server
type CompressType int
const (
CompressGzip CompressType = iota
CompressZlib
CompressNone
)
// Used to control GELF chunking. Should be less than (MTU - len(UDP
// header)).
//
// TODO: generate dynamically using Path MTU Discovery?
const (
ChunkSize = 1420
chunkedHeaderLen = 12
chunkedDataLen = ChunkSize - chunkedHeaderLen
)
var (
magicChunked = []byte{0x1e, 0x0f}
magicZlib = []byte{0x78}
magicGzip = []byte{0x1f, 0x8b}
)
// numChunks returns the number of GELF chunks necessary to transmit
// the given compressed buffer.
func numChunks(b []byte) int {
lenB := len(b)
if lenB <= ChunkSize {
return 1
}
return len(b)/chunkedDataLen + 1
}
// New returns a new GELF Writer. This writer can be used to send the
// output of the standard Go log functions to a central GELF server by
// passing it to log.SetOutput()
func NewUDPWriter(addr string) (*UDPWriter, error) {
var err error
w := new(UDPWriter)
w.CompressionLevel = flate.BestSpeed
if w.conn, err = net.Dial("udp", addr); err != nil {
return nil, err
}
if w.hostname, err = os.Hostname(); err != nil {
return nil, err
}
w.Facility = path.Base(os.Args[0])
return w, nil
}
// writes the gzip compressed byte array to the connection as a series
// of GELF chunked messages. The format is documented at
// http://docs.graylog.org/en/2.1/pages/gelf.html as:
//
// 2-byte magic (0x1e 0x0f), 8 byte id, 1 byte sequence id, 1 byte
// total, chunk-data
func (w *GelfWriter) writeChunked(zBytes []byte) (err error) {
b := make([]byte, 0, ChunkSize)
buf := bytes.NewBuffer(b)
nChunksI := numChunks(zBytes)
if nChunksI > 128 {
return fmt.Errorf("msg too large, would need %d chunks", nChunksI)
}
nChunks := uint8(nChunksI)
// use urandom to get a unique message id
msgId := make([]byte, 8)
n, err := io.ReadFull(rand.Reader, msgId)
if err != nil || n != 8 {
return fmt.Errorf("rand.Reader: %d/%s", n, err)
}
bytesLeft := len(zBytes)
for i := uint8(0); i < nChunks; i++ {
buf.Reset()
// manually write header. Don't care about
// host/network byte order, because the spec only
// deals in individual bytes.
buf.Write(magicChunked) //magic
buf.Write(msgId)
buf.WriteByte(i)
buf.WriteByte(nChunks)
// slice out our chunk from zBytes
chunkLen := chunkedDataLen
if chunkLen > bytesLeft {
chunkLen = bytesLeft
}
off := int(i) * chunkedDataLen
chunk := zBytes[off : off+chunkLen]
buf.Write(chunk)
// write this chunk, and make sure the write was good
n, err := w.conn.Write(buf.Bytes())
if err != nil {
return fmt.Errorf("Write (chunk %d/%d): %s", i,
nChunks, err)
}
if n != len(buf.Bytes()) {
return fmt.Errorf("Write len: (chunk %d/%d) (%d/%d)",
i, nChunks, n, len(buf.Bytes()))
}
bytesLeft -= chunkLen
}
if bytesLeft != 0 {
return fmt.Errorf("error: %d bytes left after sending", bytesLeft)
}
return nil
}
// 1k bytes buffer by default
var bufPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, 1024))
},
}
func newBuffer() *bytes.Buffer {
b := bufPool.Get().(*bytes.Buffer)
if b != nil {
b.Reset()
return b
}
return bytes.NewBuffer(nil)
}
// WriteMessage sends the specified message to the GELF server
// specified in the call to New(). It assumes all the fields are
// filled out appropriately. In general, clients will want to use
// Write, rather than WriteMessage.
func (w *UDPWriter) WriteMessage(m *Message) (err error) {
mBuf := newBuffer()
defer bufPool.Put(mBuf)
if err = m.MarshalJSONBuf(mBuf); err != nil {
return err
}
mBytes := mBuf.Bytes()
var (
zBuf *bytes.Buffer
zBytes []byte
)
var zw io.WriteCloser
switch w.CompressionType {
case CompressGzip:
zBuf = newBuffer()
defer bufPool.Put(zBuf)
zw, err = gzip.NewWriterLevel(zBuf, w.CompressionLevel)
case CompressZlib:
zBuf = newBuffer()
defer bufPool.Put(zBuf)
zw, err = zlib.NewWriterLevel(zBuf, w.CompressionLevel)
case CompressNone:
zBytes = mBytes
default:
panic(fmt.Sprintf("unknown compression type %d",
w.CompressionType))
}
if zw != nil {
if err != nil {
return
}
if _, err = zw.Write(mBytes); err != nil {
zw.Close()
return
}
zw.Close()
zBytes = zBuf.Bytes()
}
if numChunks(zBytes) > 1 {
return w.writeChunked(zBytes)
}
n, err := w.conn.Write(zBytes)
if err != nil {
return
}
if n != len(zBytes) {
return fmt.Errorf("bad write (%d/%d)", n, len(zBytes))
}
return nil
}
// Write encodes the given string in a GELF message and sends it to
// the server specified in New().
func (w *UDPWriter) Write(p []byte) (n int, err error) {
// 1 for the function that called us.
file, line := getCallerIgnoringLogMulti(1)
m := constructMessage(p, w.hostname, w.Facility, file, line)
if err = w.WriteMessage(m); err != nil {
return 0, err
}
return len(p), nil
}

41
vendor/github.com/Graylog2/go-gelf/gelf/utils.go generated vendored Normal file
View File

@ -0,0 +1,41 @@
package gelf
import (
"runtime"
"strings"
)
// getCaller returns the filename and the line info of a function
// further down in the call stack. Passing 0 in as callDepth would
// return info on the function calling getCallerIgnoringLog, 1 the
// parent function, and so on. Any suffixes passed to getCaller are
// path fragments like "/pkg/log/log.go", and functions in the call
// stack from that file are ignored.
func getCaller(callDepth int, suffixesToIgnore ...string) (file string, line int) {
// bump by 1 to ignore the getCaller (this) stackframe
callDepth++
outer:
for {
var ok bool
_, file, line, ok = runtime.Caller(callDepth)
if !ok {
file = "???"
line = 0
break
}
for _, s := range suffixesToIgnore {
if strings.HasSuffix(file, s) {
callDepth++
continue outer
}
}
break
}
return
}
func getCallerIgnoringLogMulti(callDepth int) (string, int) {
// the +1 is to ignore this (getCallerIgnoringLogMulti) frame
return getCaller(callDepth+1, "/pkg/log/log.go", "/pkg/io/multi.go")
}

View File

@ -5,414 +5,27 @@
package gelf
import (
"bytes"
"compress/flate"
"compress/gzip"
"compress/zlib"
"crypto/rand"
"encoding/json"
"fmt"
"io"
"net"
"os"
"path"
"runtime"
"strings"
"sync"
"time"
)
type Writer interface {
Close() error
Write([]byte) (int, error)
WriteMessage(*Message) error
}
// Writer implements io.Writer and is used to send both discrete
// messages to a graylog2 server, or data from a stream-oriented
// interface (like the functions in log).
type Writer struct {
mu sync.Mutex
conn net.Conn
hostname string
Facility string // defaults to current process name
CompressionLevel int // one of the consts from compress/flate
CompressionType CompressType
}
// What compression type the writer should use when sending messages
// to the graylog2 server
type CompressType int
const (
CompressGzip CompressType = iota
CompressZlib
CompressNone
)
// Message represents the contents of the GELF message. It is gzipped
// before sending.
type Message struct {
Version string `json:"version"`
Host string `json:"host"`
Short string `json:"short_message"`
Full string `json:"full_message,omitempty"`
TimeUnix float64 `json:"timestamp"`
Level int32 `json:"level,omitempty"`
Facility string `json:"facility,omitempty"`
Extra map[string]interface{} `json:"-"`
RawExtra json.RawMessage `json:"-"`
}
// Used to control GELF chunking. Should be less than (MTU - len(UDP
// header)).
//
// TODO: generate dynamically using Path MTU Discovery?
const (
ChunkSize = 1420
chunkedHeaderLen = 12
chunkedDataLen = ChunkSize - chunkedHeaderLen
)
var (
magicChunked = []byte{0x1e, 0x0f}
magicZlib = []byte{0x78}
magicGzip = []byte{0x1f, 0x8b}
)
// Syslog severity levels
const (
LOG_EMERG = int32(0)
LOG_ALERT = int32(1)
LOG_CRIT = int32(2)
LOG_ERR = int32(3)
LOG_WARNING = int32(4)
LOG_NOTICE = int32(5)
LOG_INFO = int32(6)
LOG_DEBUG = int32(7)
)
// numChunks returns the number of GELF chunks necessary to transmit
// the given compressed buffer.
func numChunks(b []byte) int {
lenB := len(b)
if lenB <= ChunkSize {
return 1
}
return len(b)/chunkedDataLen + 1
}
// New returns a new GELF Writer. This writer can be used to send the
// output of the standard Go log functions to a central GELF server by
// passing it to log.SetOutput()
func NewWriter(addr string) (*Writer, error) {
var err error
w := new(Writer)
w.CompressionLevel = flate.BestSpeed
if w.conn, err = net.Dial("udp", addr); err != nil {
return nil, err
}
if w.hostname, err = os.Hostname(); err != nil {
return nil, err
}
w.Facility = path.Base(os.Args[0])
return w, nil
}
// writes the gzip compressed byte array to the connection as a series
// of GELF chunked messages. The format is documented at
// http://docs.graylog.org/en/2.1/pages/gelf.html as:
//
// 2-byte magic (0x1e 0x0f), 8 byte id, 1 byte sequence id, 1 byte
// total, chunk-data
func (w *Writer) writeChunked(zBytes []byte) (err error) {
b := make([]byte, 0, ChunkSize)
buf := bytes.NewBuffer(b)
nChunksI := numChunks(zBytes)
if nChunksI > 128 {
return fmt.Errorf("msg too large, would need %d chunks", nChunksI)
}
nChunks := uint8(nChunksI)
// use urandom to get a unique message id
msgId := make([]byte, 8)
n, err := io.ReadFull(rand.Reader, msgId)
if err != nil || n != 8 {
return fmt.Errorf("rand.Reader: %d/%s", n, err)
}
bytesLeft := len(zBytes)
for i := uint8(0); i < nChunks; i++ {
buf.Reset()
// manually write header. Don't care about
// host/network byte order, because the spec only
// deals in individual bytes.
buf.Write(magicChunked) //magic
buf.Write(msgId)
buf.WriteByte(i)
buf.WriteByte(nChunks)
// slice out our chunk from zBytes
chunkLen := chunkedDataLen
if chunkLen > bytesLeft {
chunkLen = bytesLeft
}
off := int(i) * chunkedDataLen
chunk := zBytes[off : off+chunkLen]
buf.Write(chunk)
// write this chunk, and make sure the write was good
n, err := w.conn.Write(buf.Bytes())
if err != nil {
return fmt.Errorf("Write (chunk %d/%d): %s", i,
nChunks, err)
}
if n != len(buf.Bytes()) {
return fmt.Errorf("Write len: (chunk %d/%d) (%d/%d)",
i, nChunks, n, len(buf.Bytes()))
}
bytesLeft -= chunkLen
}
if bytesLeft != 0 {
return fmt.Errorf("error: %d bytes left after sending", bytesLeft)
}
return nil
}
// 1k bytes buffer by default
var bufPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, 1024))
},
}
func newBuffer() *bytes.Buffer {
b := bufPool.Get().(*bytes.Buffer)
if b != nil {
b.Reset()
return b
}
return bytes.NewBuffer(nil)
}
// WriteMessage sends the specified message to the GELF server
// specified in the call to New(). It assumes all the fields are
// filled out appropriately. In general, clients will want to use
// Write, rather than WriteMessage.
func (w *Writer) WriteMessage(m *Message) (err error) {
mBuf := newBuffer()
defer bufPool.Put(mBuf)
if err = m.MarshalJSONBuf(mBuf); err != nil {
return err
}
mBytes := mBuf.Bytes()
var (
zBuf *bytes.Buffer
zBytes []byte
)
var zw io.WriteCloser
switch w.CompressionType {
case CompressGzip:
zBuf = newBuffer()
defer bufPool.Put(zBuf)
zw, err = gzip.NewWriterLevel(zBuf, w.CompressionLevel)
case CompressZlib:
zBuf = newBuffer()
defer bufPool.Put(zBuf)
zw, err = zlib.NewWriterLevel(zBuf, w.CompressionLevel)
case CompressNone:
zBytes = mBytes
default:
panic(fmt.Sprintf("unknown compression type %d",
w.CompressionType))
}
if zw != nil {
if err != nil {
return
}
if _, err = zw.Write(mBytes); err != nil {
zw.Close()
return
}
zw.Close()
zBytes = zBuf.Bytes()
}
if numChunks(zBytes) > 1 {
return w.writeChunked(zBytes)
}
n, err := w.conn.Write(zBytes)
if err != nil {
return
}
if n != len(zBytes) {
return fmt.Errorf("bad write (%d/%d)", n, len(zBytes))
}
return nil
type GelfWriter struct {
addr string
conn net.Conn
hostname string
Facility string // defaults to current process name
proto string
}
// Close connection and interrupt blocked Read or Write operations
func (w *Writer) Close() error {
func (w *GelfWriter) Close() error {
return w.conn.Close()
}
/*
func (w *Writer) Alert(m string) (err error)
func (w *Writer) Close() error
func (w *Writer) Crit(m string) (err error)
func (w *Writer) Debug(m string) (err error)
func (w *Writer) Emerg(m string) (err error)
func (w *Writer) Err(m string) (err error)
func (w *Writer) Info(m string) (err error)
func (w *Writer) Notice(m string) (err error)
func (w *Writer) Warning(m string) (err error)
*/
// getCaller returns the filename and the line info of a function
// further down in the call stack. Passing 0 in as callDepth would
// return info on the function calling getCallerIgnoringLog, 1 the
// parent function, and so on. Any suffixes passed to getCaller are
// path fragments like "/pkg/log/log.go", and functions in the call
// stack from that file are ignored.
func getCaller(callDepth int, suffixesToIgnore ...string) (file string, line int) {
// bump by 1 to ignore the getCaller (this) stackframe
callDepth++
outer:
for {
var ok bool
_, file, line, ok = runtime.Caller(callDepth)
if !ok {
file = "???"
line = 0
break
}
for _, s := range suffixesToIgnore {
if strings.HasSuffix(file, s) {
callDepth++
continue outer
}
}
break
}
return
}
func getCallerIgnoringLogMulti(callDepth int) (string, int) {
// the +1 is to ignore this (getCallerIgnoringLogMulti) frame
return getCaller(callDepth+1, "/pkg/log/log.go", "/pkg/io/multi.go")
}
// Write encodes the given string in a GELF message and sends it to
// the server specified in New().
func (w *Writer) Write(p []byte) (n int, err error) {
// 1 for the function that called us.
file, line := getCallerIgnoringLogMulti(1)
// remove trailing and leading whitespace
p = bytes.TrimSpace(p)
// If there are newlines in the message, use the first line
// for the short message and set the full message to the
// original input. If the input has no newlines, stick the
// whole thing in Short.
short := p
full := []byte("")
if i := bytes.IndexRune(p, '\n'); i > 0 {
short = p[:i]
full = p
}
m := Message{
Version: "1.1",
Host: w.hostname,
Short: string(short),
Full: string(full),
TimeUnix: float64(time.Now().Unix()),
Level: 6, // info
Facility: w.Facility,
Extra: map[string]interface{}{
"_file": file,
"_line": line,
},
}
if err = w.WriteMessage(&m); err != nil {
return 0, err
}
return len(p), nil
}
func (m *Message) MarshalJSONBuf(buf *bytes.Buffer) error {
b, err := json.Marshal(m)
if err != nil {
return err
}
// write up until the final }
if _, err = buf.Write(b[:len(b)-1]); err != nil {
return err
}
if len(m.Extra) > 0 {
eb, err := json.Marshal(m.Extra)
if err != nil {
return err
}
// merge serialized message + serialized extra map
if err = buf.WriteByte(','); err != nil {
return err
}
// write serialized extra bytes, without enclosing quotes
if _, err = buf.Write(eb[1 : len(eb)-1]); err != nil {
return err
}
}
if len(m.RawExtra) > 0 {
if err := buf.WriteByte(','); err != nil {
return err
}
// write serialized extra bytes, without enclosing quotes
if _, err = buf.Write(m.RawExtra[1 : len(m.RawExtra)-1]); err != nil {
return err
}
}
// write final closing quotes
return buf.WriteByte('}')
}
func (m *Message) UnmarshalJSON(data []byte) error {
i := make(map[string]interface{}, 16)
if err := json.Unmarshal(data, &i); err != nil {
return err
}
for k, v := range i {
if k[0] == '_' {
if m.Extra == nil {
m.Extra = make(map[string]interface{}, 1)
}
m.Extra[k] = v
continue
}
switch k {
case "version":
m.Version = v.(string)
case "host":
m.Host = v.(string)
case "short_message":
m.Short = v.(string)
case "full_message":
m.Full = v.(string)
case "timestamp":
m.TimeUnix = v.(float64)
case "level":
m.Level = int32(v.(float64))
case "facility":
m.Facility = v.(string)
}
}
return nil
}