From 5bc11021250e39e71ca9edf62ce5c33f15c3756f Mon Sep 17 00:00:00 2001 From: Ghislain Bourgeois Date: Mon, 11 Dec 2017 15:52:50 -0500 Subject: [PATCH 1/2] Fix vendoring of go-gelf to point to specific commit ID Signed-off-by: Ghislain Bourgeois --- vendor.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vendor.conf b/vendor.conf index 9ff52908b5..22e6f4e1d7 100644 --- a/vendor.conf +++ b/vendor.conf @@ -77,7 +77,7 @@ github.com/syndtr/gocapability 2c00daeb6c3b45114c80ac44119e7b8801fdd852 github.com/golang/protobuf 7a211bcf3bce0e3f1d74f9894916e6f116ae83b4 # gelf logging driver deps -github.com/Graylog2/go-gelf v2 +github.com/Graylog2/go-gelf ba920adcee0319adcc15f52851f1458f56547975 github.com/fluent/fluent-logger-golang v1.3.0 # fluent-logger-golang deps From f9f3c49302fc80e586e3cb10af43114929556b47 Mon Sep 17 00:00:00 2001 From: Ghislain Bourgeois Date: Mon, 11 Dec 2017 09:22:51 -0500 Subject: [PATCH 2/2] Update Graylog2/go-gelf vendoring. Fixes #35613 Signed-off-by: Ghislain Bourgeois --- vendor.conf | 2 +- .../Graylog2/go-gelf/gelf/tcpreader.go | 97 +++++++++++++++---- .../Graylog2/go-gelf/gelf/tcpwriter.go | 12 ++- .../Graylog2/go-gelf/gelf/writer.go | 3 + 4 files changed, 94 insertions(+), 20 deletions(-) diff --git a/vendor.conf b/vendor.conf index 22e6f4e1d7..87f7930262 100644 --- a/vendor.conf +++ b/vendor.conf @@ -77,7 +77,7 @@ github.com/syndtr/gocapability 2c00daeb6c3b45114c80ac44119e7b8801fdd852 github.com/golang/protobuf 7a211bcf3bce0e3f1d74f9894916e6f116ae83b4 # gelf logging driver deps -github.com/Graylog2/go-gelf ba920adcee0319adcc15f52851f1458f56547975 +github.com/Graylog2/go-gelf 4143646226541087117ff2f83334ea48b3201841 github.com/fluent/fluent-logger-golang v1.3.0 # fluent-logger-golang deps diff --git a/vendor/github.com/Graylog2/go-gelf/gelf/tcpreader.go b/vendor/github.com/Graylog2/go-gelf/gelf/tcpreader.go index 8f22c9aea4..74255ec3be 100644 --- a/vendor/github.com/Graylog2/go-gelf/gelf/tcpreader.go +++ b/vendor/github.com/Graylog2/go-gelf/gelf/tcpreader.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "net" + "time" ) type TCPReader struct { @@ -13,16 +14,21 @@ type TCPReader struct { messages chan []byte } -func newTCPReader(addr string) (*TCPReader, chan string, error) { +type connChannels struct { + drop chan string + confirm chan string +} + +func newTCPReader(addr string) (*TCPReader, chan string, chan string, error) { var err error tcpAddr, err := net.ResolveTCPAddr("tcp", addr) if err != nil { - return nil, nil, fmt.Errorf("ResolveTCPAddr('%s'): %s", addr, err) + return nil, nil, nil, fmt.Errorf("ResolveTCPAddr('%s'): %s", addr, err) } listener, err := net.ListenTCP("tcp", tcpAddr) if err != nil { - return nil, nil, fmt.Errorf("ListenTCP: %s", err) + return nil, nil, nil, fmt.Errorf("ListenTCP: %s", err) } r := &TCPReader{ @@ -30,26 +36,61 @@ func newTCPReader(addr string) (*TCPReader, chan string, error) { messages: make(chan []byte, 100), // Make a buffered channel with at most 100 messages } - signal := make(chan string, 1) + closeSignal := make(chan string, 1) + doneSignal := make(chan string, 1) - go r.listenUntilCloseSignal(signal) + go r.listenUntilCloseSignal(closeSignal, doneSignal) - return r, signal, nil + return r, closeSignal, doneSignal, nil } -func (r *TCPReader) listenUntilCloseSignal(signal chan string) { - defer func() { signal <- "done" }() - defer r.listener.Close() +func (r *TCPReader) accepter(connections chan net.Conn) { for { conn, err := r.listener.Accept() if err != nil { break } - go handleConnection(conn, r.messages) + connections <- conn + } +} + +func (r *TCPReader) listenUntilCloseSignal(closeSignal chan string, doneSignal chan string) { + defer func() { doneSignal <- "done" }() + defer r.listener.Close() + var conns []connChannels + connectionsChannel := make(chan net.Conn, 1) + go r.accepter(connectionsChannel) + for { select { - case sig := <-signal: - if sig == "stop" { - break + case conn := <-connectionsChannel: + dropSignal := make(chan string, 1) + dropConfirm := make(chan string, 1) + channels := connChannels{drop: dropSignal, confirm: dropConfirm} + go handleConnection(conn, r.messages, dropSignal, dropConfirm) + conns = append(conns, channels) + default: + } + + select { + case sig := <-closeSignal: + if sig == "stop" || sig == "drop" { + if len(conns) >= 1 { + for _, s := range conns { + if s.drop != nil { + s.drop <- "drop" + <-s.confirm + conns = append(conns[:0], conns[1:]...) + } + } + if sig == "stop" { + return + } + } else if sig == "stop" { + closeSignal <- "stop" + } + if sig == "drop" { + doneSignal <- "done" + } } default: } @@ -60,19 +101,41 @@ func (r *TCPReader) addr() string { return r.listener.Addr().String() } -func handleConnection(conn net.Conn, messages chan<- []byte) { +func handleConnection(conn net.Conn, messages chan<- []byte, dropSignal chan string, dropConfirm chan string) { + defer func() { dropConfirm <- "done" }() defer conn.Close() reader := bufio.NewReader(conn) var b []byte var err error + drop := false + canDrop := false for { + conn.SetDeadline(time.Now().Add(2 * time.Second)) if b, err = reader.ReadBytes(0); err != nil { - continue - } - if len(b) > 0 { + if drop { + return + } + } else if len(b) > 0 { messages <- b + canDrop = true + if drop { + return + } + } else if drop { + return + } + select { + case sig := <-dropSignal: + if sig == "drop" { + drop = true + time.Sleep(1 * time.Second) + if canDrop { + return + } + } + default: } } } diff --git a/vendor/github.com/Graylog2/go-gelf/gelf/tcpwriter.go b/vendor/github.com/Graylog2/go-gelf/gelf/tcpwriter.go index ab95cbcd02..da1390d1d6 100644 --- a/vendor/github.com/Graylog2/go-gelf/gelf/tcpwriter.go +++ b/vendor/github.com/Graylog2/go-gelf/gelf/tcpwriter.go @@ -75,12 +75,17 @@ func (w *TCPWriter) Write(p []byte) (n int, err error) { func (w *TCPWriter) writeToSocketWithReconnectAttempts(zBytes []byte) (n int, err error) { var errConn error + var i int w.mu.Lock() - for i := 0; n <= w.MaxReconnect; i++ { + for i = 0; i <= w.MaxReconnect; i++ { errConn = nil - n, err = w.conn.Write(zBytes) + if w.conn != nil { + n, err = w.conn.Write(zBytes) + } else { + err = fmt.Errorf("Connection was nil, will attempt reconnect") + } if err != nil { time.Sleep(w.ReconnectDelay * time.Second) w.conn, errConn = net.Dial("tcp", w.addr) @@ -90,6 +95,9 @@ func (w *TCPWriter) writeToSocketWithReconnectAttempts(zBytes []byte) (n int, er } w.mu.Unlock() + if i > w.MaxReconnect { + return 0, fmt.Errorf("Maximum reconnection attempts was reached; giving up") + } if errConn != nil { return 0, fmt.Errorf("Write Failed: %s\nReconnection failed: %s", err, errConn) } diff --git a/vendor/github.com/Graylog2/go-gelf/gelf/writer.go b/vendor/github.com/Graylog2/go-gelf/gelf/writer.go index 93c36929b4..153be2c340 100644 --- a/vendor/github.com/Graylog2/go-gelf/gelf/writer.go +++ b/vendor/github.com/Graylog2/go-gelf/gelf/writer.go @@ -27,5 +27,8 @@ type GelfWriter struct { // Close connection and interrupt blocked Read or Write operations func (w *GelfWriter) Close() error { + if w.conn == nil { + return nil + } return w.conn.Close() }