1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/vendor/github.com/Graylog2/go-gelf/gelf/tcpreader.go
Ghislain Bourgeois b24c8e07f1 Update to latest go-gelf version and add tests
Signed-off-by: Ghislain Bourgeois <ghislain.bourgeois@gmail.com>
2017-09-06 15:55:58 -04:00

93 lines
1.6 KiB
Go

package gelf
import (
"bufio"
"encoding/json"
"fmt"
"net"
)
type TCPReader struct {
listener *net.TCPListener
conn net.Conn
messages chan []byte
}
func newTCPReader(addr string) (*TCPReader, chan string, error) {
var err error
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, nil, fmt.Errorf("ResolveTCPAddr('%s'): %s", addr, err)
}
listener, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
return nil, nil, fmt.Errorf("ListenTCP: %s", err)
}
r := &TCPReader{
listener: listener,
messages: make(chan []byte, 100), // Make a buffered channel with at most 100 messages
}
signal := make(chan string, 1)
go r.listenUntilCloseSignal(signal)
return r, signal, nil
}
func (r *TCPReader) listenUntilCloseSignal(signal chan string) {
defer func() { signal <- "done" }()
defer r.listener.Close()
for {
conn, err := r.listener.Accept()
if err != nil {
break
}
go handleConnection(conn, r.messages)
select {
case sig := <-signal:
if sig == "stop" {
break
}
default:
}
}
}
func (r *TCPReader) addr() string {
return r.listener.Addr().String()
}
func handleConnection(conn net.Conn, messages chan<- []byte) {
defer conn.Close()
reader := bufio.NewReader(conn)
var b []byte
var err error
for {
if b, err = reader.ReadBytes(0); err != nil {
continue
}
if len(b) > 0 {
messages <- b
}
}
}
func (r *TCPReader) readMessage() (*Message, error) {
b := <-r.messages
var msg Message
if err := json.Unmarshal(b[:len(b)-1], &msg); err != nil {
return nil, fmt.Errorf("json.Unmarshal: %s", err)
}
return &msg, nil
}
func (r *TCPReader) Close() {
r.listener.Close()
}