mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
156 lines
3.1 KiB
Go
156 lines
3.1 KiB
Go
package gelf
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net"
|
|
"time"
|
|
)
|
|
|
|
// TCPReader receives NUL-delimited GELF frames over TCP and queues them
// for decoding by readMessage.
type TCPReader struct {
	// listener is the TCP socket on which new connections are accepted.
	listener *net.TCPListener

	// conn is not referenced by the code visible in this part of the file.
	conn net.Conn

	// messages queues raw frames (still carrying their trailing NUL
	// delimiter) between the connection handlers and readMessage.
	messages chan []byte
}
|
|
|
|
// connChannels bundles the two control channels that belong to a single
// connection handler goroutine.
type connChannels struct {
	// drop carries the "drop" request sent to the handler.
	drop chan string

	// confirm carries the handler's "done" acknowledgement once it exits.
	confirm chan string
}
|
|
|
|
func newTCPReader(addr string) (*TCPReader, chan string, chan string, error) {
|
|
var err error
|
|
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
|
|
if err != nil {
|
|
return nil, nil, nil, fmt.Errorf("ResolveTCPAddr('%s'): %s", addr, err)
|
|
}
|
|
|
|
listener, err := net.ListenTCP("tcp", tcpAddr)
|
|
if err != nil {
|
|
return nil, nil, nil, fmt.Errorf("ListenTCP: %s", err)
|
|
}
|
|
|
|
r := &TCPReader{
|
|
listener: listener,
|
|
messages: make(chan []byte, 100), // Make a buffered channel with at most 100 messages
|
|
}
|
|
|
|
closeSignal := make(chan string, 1)
|
|
doneSignal := make(chan string, 1)
|
|
|
|
go r.listenUntilCloseSignal(closeSignal, doneSignal)
|
|
|
|
return r, closeSignal, doneSignal, nil
|
|
}
|
|
|
|
func (r *TCPReader) accepter(connections chan net.Conn) {
|
|
for {
|
|
conn, err := r.listener.Accept()
|
|
if err != nil {
|
|
break
|
|
}
|
|
connections <- conn
|
|
}
|
|
}
|
|
|
|
func (r *TCPReader) listenUntilCloseSignal(closeSignal chan string, doneSignal chan string) {
|
|
defer func() { doneSignal <- "done" }()
|
|
defer r.listener.Close()
|
|
var conns []connChannels
|
|
connectionsChannel := make(chan net.Conn, 1)
|
|
go r.accepter(connectionsChannel)
|
|
for {
|
|
select {
|
|
case conn := <-connectionsChannel:
|
|
dropSignal := make(chan string, 1)
|
|
dropConfirm := make(chan string, 1)
|
|
channels := connChannels{drop: dropSignal, confirm: dropConfirm}
|
|
go handleConnection(conn, r.messages, dropSignal, dropConfirm)
|
|
conns = append(conns, channels)
|
|
default:
|
|
}
|
|
|
|
select {
|
|
case sig := <-closeSignal:
|
|
if sig == "stop" || sig == "drop" {
|
|
if len(conns) >= 1 {
|
|
for _, s := range conns {
|
|
if s.drop != nil {
|
|
s.drop <- "drop"
|
|
<-s.confirm
|
|
conns = append(conns[:0], conns[1:]...)
|
|
}
|
|
}
|
|
if sig == "stop" {
|
|
return
|
|
}
|
|
} else if sig == "stop" {
|
|
closeSignal <- "stop"
|
|
}
|
|
if sig == "drop" {
|
|
doneSignal <- "done"
|
|
}
|
|
}
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *TCPReader) addr() string {
|
|
return r.listener.Addr().String()
|
|
}
|
|
|
|
// handleConnection reads NUL-delimited frames from conn and forwards each
// complete frame (delimiter included) to messages.
//
// The read deadline is renewed to two seconds before every read so the
// loop regularly gets a chance to look at dropSignal. Bytes received
// before a deadline expires are kept and prepended to the rest of the
// frame once it arrives.
//
// The handler returns when:
//   - the connection fails with anything other than a timeout (for
//     example the peer closed it);
//   - a "drop" request was received and at least one frame has already
//     been forwarded;
//   - a "drop" request was received earlier and the next read either
//     yields a frame or fails.
//
// On return the connection is closed and "done" is sent on dropConfirm.
func handleConnection(conn net.Conn, messages chan<- []byte, dropSignal chan string, dropConfirm chan string) {
	defer func() { dropConfirm <- "done" }()
	defer conn.Close()
	reader := bufio.NewReader(conn)

	var partial []byte // start of a frame cut short by a read timeout
	drop := false
	canDrop := false

	for {
		conn.SetDeadline(time.Now().Add(2 * time.Second))
		b, err := reader.ReadBytes(0)
		if err != nil {
			if drop {
				return
			}
			// Only a timeout is worth retrying; any other error (EOF,
			// reset, closed connection) would repeat immediately and
			// turn this loop into a busy spin.
			if ne, ok := err.(net.Error); !ok || !ne.Timeout() {
				return
			}
			// ReadBytes hands back whatever it consumed before the
			// error; keep it so the frame is not corrupted.
			partial = append(partial, b...)
		} else {
			if len(partial) > 0 {
				b = append(partial, b...)
				partial = nil
			}
			messages <- b
			canDrop = true
			if drop {
				return
			}
		}

		select {
		case sig := <-dropSignal:
			if sig == "drop" {
				drop = true
				time.Sleep(1 * time.Second)
				if canDrop {
					return
				}
			}
		default:
		}
	}
}
|
|
|
|
func (r *TCPReader) readMessage() (*Message, error) {
|
|
b := <-r.messages
|
|
|
|
var msg Message
|
|
if err := json.Unmarshal(b[:len(b)-1], &msg); err != nil {
|
|
return nil, fmt.Errorf("json.Unmarshal: %s", err)
|
|
}
|
|
|
|
return &msg, nil
|
|
}
|
|
|
|
func (r *TCPReader) Close() {
|
|
r.listener.Close()
|
|
}
|