1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Add TCP support for GELF log driver

Signed-off-by: Ghislain Bourgeois <ghislain.bourgeois@gmail.com>
This commit is contained in:
Ghislain Bourgeois 2017-09-06 17:45:26 -04:00
parent e21f7b6e76
commit e17f351114
2 changed files with 224 additions and 60 deletions

View file

@ -47,10 +47,6 @@ func New(info logger.Info) (logger.Logger, error) {
return nil, err return nil, err
} }
if address.Scheme == "tcp" {
return nil, fmt.Errorf("gelf: TCP not yet implemented")
}
// collect extra data for GELF message // collect extra data for GELF message
hostname, err := info.Hostname() hostname, err := info.Hostname()
if err != nil { if err != nil {
@ -93,10 +89,58 @@ func New(info logger.Info) (logger.Logger, error) {
return nil, err return nil, err
} }
// create new gelfWriter var gelfWriter gelf.Writer
gelfWriter, err := gelf.NewUDPWriter(address.String()) if address.Scheme == "udp" {
gelfWriter, err = newGELFUDPWriter(address.Host, info)
if err != nil { if err != nil {
return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address.String(), err) return nil, err
}
} else if address.Scheme == "tcp" {
gelfWriter, err = newGELFTCPWriter(address.Host, info)
if err != nil {
return nil, err
}
}
return &gelfLogger{
writer: gelfWriter,
info: info,
hostname: hostname,
rawExtra: rawExtra,
}, nil
}
// create new TCP gelfWriter
func newGELFTCPWriter(address string, info logger.Info) (gelf.Writer, error) {
gelfWriter, err := gelf.NewTCPWriter(address)
if err != nil {
return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address, err)
}
if v, ok := info.Config["gelf-tcp-max-reconnect"]; ok {
i, err := strconv.Atoi(v)
if err != nil || i < 0 {
return nil, fmt.Errorf("gelf-tcp-max-reconnect must be a positive integer")
}
gelfWriter.MaxReconnect = i
}
if v, ok := info.Config["gelf-tcp-reconnect-delay"]; ok {
i, err := strconv.Atoi(v)
if err != nil || i < 0 {
return nil, fmt.Errorf("gelf-tcp-reconnect-delay must be a positive integer")
}
gelfWriter.ReconnectDelay = time.Duration(i)
}
return gelfWriter, nil
}
// create new UDP gelfWriter
func newGELFUDPWriter(address string, info logger.Info) (gelf.Writer, error) {
gelfWriter, err := gelf.NewUDPWriter(address)
if err != nil {
return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address, err)
} }
if v, ok := info.Config["gelf-compression-type"]; ok { if v, ok := info.Config["gelf-compression-type"]; ok {
@ -120,12 +164,7 @@ func New(info logger.Info) (logger.Logger, error) {
gelfWriter.CompressionLevel = val gelfWriter.CompressionLevel = val
} }
return &gelfLogger{ return gelfWriter, nil
writer: gelfWriter,
info: info,
hostname: hostname,
rawExtra: rawExtra,
}, nil
} }
func (s *gelfLogger) Log(msg *logger.Message) error { func (s *gelfLogger) Log(msg *logger.Message) error {

View file

@ -3,10 +3,13 @@
package gelf package gelf
import ( import (
"net"
"testing" "testing"
"github.com/docker/docker/daemon/logger"
) )
//Validate parseAddress // Validate parseAddress
func TestParseAddress(t *testing.T) { func TestParseAddress(t *testing.T) {
url, err := parseAddress("udp://127.0.0.1:12201") url, err := parseAddress("udp://127.0.0.1:12201")
if err != nil { if err != nil {
@ -27,52 +30,7 @@ func TestParseAddress(t *testing.T) {
} }
} }
//Validate UDP options // Validate TCP options
func TestUDPValidateLogOpt(t *testing.T) {
err := ValidateLogOpt(map[string]string{
"gelf-address": "udp://127.0.0.1:12201",
"tag": "testtag",
"labels": "testlabel",
"env": "testenv",
"env-regex": "testenv-regex",
"gelf-compression-level": "9",
"gelf-compression-type": "gzip",
})
if err != nil {
t.Fatal(err)
}
err = ValidateLogOpt(map[string]string{
"gelf-address": "udp://127.0.0.1:12201",
"gelf-compression-level": "ultra",
"gelf-compression-type": "zlib",
})
if err == nil {
t.Fatal("Expected compression level error")
}
err = ValidateLogOpt(map[string]string{
"gelf-address": "udp://127.0.0.1:12201",
"gelf-compression-type": "rar",
})
if err == nil {
t.Fatal("Expected compression type error")
}
err = ValidateLogOpt(map[string]string{
"invalid": "invalid",
})
if err == nil {
t.Fatal("Expected unknown option error")
}
err = ValidateLogOpt(map[string]string{})
if err == nil {
t.Fatal("Expected required parameter error")
}
}
//Validate TCP options
func TestTCPValidateLogOpt(t *testing.T) { func TestTCPValidateLogOpt(t *testing.T) {
err := ValidateLogOpt(map[string]string{ err := ValidateLogOpt(map[string]string{
"gelf-address": "tcp://127.0.0.1:12201", "gelf-address": "tcp://127.0.0.1:12201",
@ -133,3 +91,170 @@ func TestTCPValidateLogOpt(t *testing.T) {
t.Fatal("Expected TCP reconnect to be invalid for UDP") t.Fatal("Expected TCP reconnect to be invalid for UDP")
} }
} }
// Validate UDP options
func TestUDPValidateLogOpt(t *testing.T) {
err := ValidateLogOpt(map[string]string{
"gelf-address": "udp://127.0.0.1:12201",
"tag": "testtag",
"labels": "testlabel",
"env": "testenv",
"env-regex": "testenv-regex",
"gelf-compression-level": "9",
"gelf-compression-type": "gzip",
})
if err != nil {
t.Fatal(err)
}
err = ValidateLogOpt(map[string]string{
"gelf-address": "udp://127.0.0.1:12201",
"gelf-compression-level": "ultra",
"gelf-compression-type": "zlib",
})
if err == nil {
t.Fatal("Expected compression level error")
}
err = ValidateLogOpt(map[string]string{
"gelf-address": "udp://127.0.0.1:12201",
"gelf-compression-type": "rar",
})
if err == nil {
t.Fatal("Expected compression type error")
}
err = ValidateLogOpt(map[string]string{
"invalid": "invalid",
})
if err == nil {
t.Fatal("Expected unknown option error")
}
err = ValidateLogOpt(map[string]string{})
if err == nil {
t.Fatal("Expected required parameter error")
}
}
// Validate newGELFTCPWriter
func TestNewGELFTCPWriter(t *testing.T) {
address := "127.0.0.1:0"
tcpAddr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
t.Fatal(err)
}
listener, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
t.Fatal(err)
}
url := "tcp://" + listener.Addr().String()
info := logger.Info{
Config: map[string]string{
"gelf-address": url,
"gelf-tcp-max-reconnect": "0",
"gelf-tcp-reconnect-delay": "0",
"tag": "{{.ID}}",
},
ContainerID: "12345678901234567890",
}
writer, err := newGELFTCPWriter(listener.Addr().String(), info)
if err != nil {
t.Fatal(err)
}
err = writer.Close()
if err != nil {
t.Fatal(err)
}
err = listener.Close()
if err != nil {
t.Fatal(err)
}
}
// Validate newGELFUDPWriter
func TestNewGELFUDPWriter(t *testing.T) {
address := "127.0.0.1:0"
info := logger.Info{
Config: map[string]string{
"gelf-address": "udp://127.0.0.1:0",
"gelf-compression-level": "5",
"gelf-compression-type": "gzip",
},
}
writer, err := newGELFUDPWriter(address, info)
if err != nil {
t.Fatal(err)
}
writer.Close()
if err != nil {
t.Fatal(err)
}
}
// Validate New for TCP
func TestNewTCP(t *testing.T) {
address := "127.0.0.1:0"
tcpAddr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
t.Fatal(err)
}
listener, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
t.Fatal(err)
}
url := "tcp://" + listener.Addr().String()
info := logger.Info{
Config: map[string]string{
"gelf-address": url,
"gelf-tcp-max-reconnect": "0",
"gelf-tcp-reconnect-delay": "0",
},
ContainerID: "12345678901234567890",
}
logger, err := New(info)
if err != nil {
t.Fatal(err)
}
err = logger.Close()
if err != nil {
t.Fatal(err)
}
err = listener.Close()
if err != nil {
t.Fatal(err)
}
}
// Validate New for UDP
func TestNewUDP(t *testing.T) {
info := logger.Info{
Config: map[string]string{
"gelf-address": "udp://127.0.0.1:0",
"gelf-compression-level": "5",
"gelf-compression-type": "gzip",
},
ContainerID: "12345678901234567890",
}
logger, err := New(info)
if err != nil {
t.Fatal(err)
}
err = logger.Close()
if err != nil {
t.Fatal(err)
}
}