2015-05-29 17:00:46 -04:00
|
|
|
// Copyright 2012 SocialCode. All rights reserved.
|
|
|
|
// Use of this source code is governed by the MIT
|
|
|
|
// license that can be found in the LICENSE file.
|
|
|
|
|
|
|
|
package gelf
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"compress/gzip"
|
|
|
|
"compress/zlib"
|
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"net"
|
|
|
|
"strings"
|
|
|
|
"sync"
|
|
|
|
)
|
|
|
|
|
|
|
|
type Reader struct {
|
|
|
|
mu sync.Mutex
|
|
|
|
conn net.Conn
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewReader(addr string) (*Reader, error) {
|
|
|
|
var err error
|
|
|
|
udpAddr, err := net.ResolveUDPAddr("udp", addr)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("ResolveUDPAddr('%s'): %s", addr, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
conn, err := net.ListenUDP("udp", udpAddr)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("ListenUDP: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
r := new(Reader)
|
|
|
|
r.conn = conn
|
|
|
|
return r, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *Reader) Addr() string {
|
|
|
|
return r.conn.LocalAddr().String()
|
|
|
|
}
|
|
|
|
|
|
|
|
// FIXME: this will discard data if p isn't big enough to hold the
|
|
|
|
// full message.
|
|
|
|
func (r *Reader) Read(p []byte) (int, error) {
|
|
|
|
msg, err := r.ReadMessage()
|
|
|
|
if err != nil {
|
|
|
|
return -1, err
|
|
|
|
}
|
|
|
|
|
|
|
|
var data string
|
|
|
|
|
|
|
|
if msg.Full == "" {
|
|
|
|
data = msg.Short
|
|
|
|
} else {
|
|
|
|
data = msg.Full
|
|
|
|
}
|
|
|
|
|
|
|
|
return strings.NewReader(data).Read(p)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *Reader) ReadMessage() (*Message, error) {
|
|
|
|
cBuf := make([]byte, ChunkSize)
|
|
|
|
var (
|
|
|
|
err error
|
|
|
|
n, length int
|
|
|
|
cid, ocid []byte
|
|
|
|
seq, total uint8
|
|
|
|
cHead []byte
|
|
|
|
cReader io.Reader
|
|
|
|
chunks [][]byte
|
|
|
|
)
|
|
|
|
|
|
|
|
for got := 0; got < 128 && (total == 0 || got < int(total)); got++ {
|
|
|
|
if n, err = r.conn.Read(cBuf); err != nil {
|
|
|
|
return nil, fmt.Errorf("Read: %s", err)
|
|
|
|
}
|
|
|
|
cHead, cBuf = cBuf[:2], cBuf[:n]
|
|
|
|
|
|
|
|
if bytes.Equal(cHead, magicChunked) {
|
|
|
|
//fmt.Printf("chunked %v\n", cBuf[:14])
|
|
|
|
cid, seq, total = cBuf[2:2+8], cBuf[2+8], cBuf[2+8+1]
|
|
|
|
if ocid != nil && !bytes.Equal(cid, ocid) {
|
|
|
|
return nil, fmt.Errorf("out-of-band message %v (awaited %v)", cid, ocid)
|
|
|
|
} else if ocid == nil {
|
|
|
|
ocid = cid
|
|
|
|
chunks = make([][]byte, total)
|
|
|
|
}
|
|
|
|
n = len(cBuf) - chunkedHeaderLen
|
|
|
|
//fmt.Printf("setting chunks[%d]: %d\n", seq, n)
|
|
|
|
chunks[seq] = append(make([]byte, 0, n), cBuf[chunkedHeaderLen:]...)
|
|
|
|
length += n
|
|
|
|
} else { //not chunked
|
|
|
|
if total > 0 {
|
|
|
|
return nil, fmt.Errorf("out-of-band message (not chunked)")
|
|
|
|
}
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
//fmt.Printf("\nchunks: %v\n", chunks)
|
|
|
|
|
|
|
|
if length > 0 {
|
|
|
|
if cap(cBuf) < length {
|
|
|
|
cBuf = append(cBuf, make([]byte, 0, length-cap(cBuf))...)
|
|
|
|
}
|
|
|
|
cBuf = cBuf[:0]
|
|
|
|
for i := range chunks {
|
|
|
|
//fmt.Printf("appending %d %v\n", i, chunks[i])
|
|
|
|
cBuf = append(cBuf, chunks[i]...)
|
|
|
|
}
|
|
|
|
cHead = cBuf[:2]
|
|
|
|
}
|
|
|
|
|
|
|
|
// the data we get from the wire is compressed
|
|
|
|
if bytes.Equal(cHead, magicGzip) {
|
|
|
|
cReader, err = gzip.NewReader(bytes.NewReader(cBuf))
|
|
|
|
} else if cHead[0] == magicZlib[0] &&
|
|
|
|
(int(cHead[0])*256+int(cHead[1]))%31 == 0 {
|
|
|
|
// zlib is slightly more complicated, but correct
|
|
|
|
cReader, err = zlib.NewReader(bytes.NewReader(cBuf))
|
|
|
|
} else {
|
2016-01-21 07:28:25 -05:00
|
|
|
// compliance with https://github.com/Graylog2/graylog2-server
|
|
|
|
// treating all messages as uncompressed if they are not gzip, zlib or
|
|
|
|
// chunked
|
|
|
|
cReader = bytes.NewReader(cBuf)
|
2015-05-29 17:00:46 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("NewReader: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
msg := new(Message)
|
2016-01-21 07:28:25 -05:00
|
|
|
if err := json.NewDecoder(cReader).Decode(&msg); err != nil {
|
2015-05-29 17:00:46 -04:00
|
|
|
return nil, fmt.Errorf("json.Unmarshal: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return msg, nil
|
|
|
|
}
|