// Copyright 2012 SocialCode. All rights reserved. // Use of this source code is governed by the MIT // license that can be found in the LICENSE file. package gelf import ( "bytes" "compress/flate" "compress/gzip" "compress/zlib" "crypto/rand" "fmt" "io" "net" "os" "path" "sync" ) type UDPWriter struct { GelfWriter CompressionLevel int // one of the consts from compress/flate CompressionType CompressType } // What compression type the writer should use when sending messages // to the graylog2 server type CompressType int const ( CompressGzip CompressType = iota CompressZlib CompressNone ) // Used to control GELF chunking. Should be less than (MTU - len(UDP // header)). // // TODO: generate dynamically using Path MTU Discovery? const ( ChunkSize = 1420 chunkedHeaderLen = 12 chunkedDataLen = ChunkSize - chunkedHeaderLen ) var ( magicChunked = []byte{0x1e, 0x0f} magicZlib = []byte{0x78} magicGzip = []byte{0x1f, 0x8b} ) // numChunks returns the number of GELF chunks necessary to transmit // the given compressed buffer. func numChunks(b []byte) int { lenB := len(b) if lenB <= ChunkSize { return 1 } return len(b)/chunkedDataLen + 1 } // New returns a new GELF Writer. This writer can be used to send the // output of the standard Go log functions to a central GELF server by // passing it to log.SetOutput() func NewUDPWriter(addr string) (*UDPWriter, error) { var err error w := new(UDPWriter) w.CompressionLevel = flate.BestSpeed if w.conn, err = net.Dial("udp", addr); err != nil { return nil, err } if w.hostname, err = os.Hostname(); err != nil { return nil, err } w.Facility = path.Base(os.Args[0]) return w, nil } // writes the gzip compressed byte array to the connection as a series // of GELF chunked messages. The format is documented at // http://docs.graylog.org/en/2.1/pages/gelf.html as: // // 2-byte magic (0x1e 0x0f), 8 byte id, 1 byte sequence id, 1 byte // total, chunk-data func (w *GelfWriter) writeChunked(zBytes []byte) (err error) { b := make([]byte, 0, ChunkSize) buf := bytes.NewBuffer(b) nChunksI := numChunks(zBytes) if nChunksI > 128 { return fmt.Errorf("msg too large, would need %d chunks", nChunksI) } nChunks := uint8(nChunksI) // use urandom to get a unique message id msgId := make([]byte, 8) n, err := io.ReadFull(rand.Reader, msgId) if err != nil || n != 8 { return fmt.Errorf("rand.Reader: %d/%s", n, err) } bytesLeft := len(zBytes) for i := uint8(0); i < nChunks; i++ { buf.Reset() // manually write header. Don't care about // host/network byte order, because the spec only // deals in individual bytes. buf.Write(magicChunked) //magic buf.Write(msgId) buf.WriteByte(i) buf.WriteByte(nChunks) // slice out our chunk from zBytes chunkLen := chunkedDataLen if chunkLen > bytesLeft { chunkLen = bytesLeft } off := int(i) * chunkedDataLen chunk := zBytes[off : off+chunkLen] buf.Write(chunk) // write this chunk, and make sure the write was good n, err := w.conn.Write(buf.Bytes()) if err != nil { return fmt.Errorf("Write (chunk %d/%d): %s", i, nChunks, err) } if n != len(buf.Bytes()) { return fmt.Errorf("Write len: (chunk %d/%d) (%d/%d)", i, nChunks, n, len(buf.Bytes())) } bytesLeft -= chunkLen } if bytesLeft != 0 { return fmt.Errorf("error: %d bytes left after sending", bytesLeft) } return nil } // 1k bytes buffer by default var bufPool = sync.Pool{ New: func() interface{} { return bytes.NewBuffer(make([]byte, 0, 1024)) }, } func newBuffer() *bytes.Buffer { b := bufPool.Get().(*bytes.Buffer) if b != nil { b.Reset() return b } return bytes.NewBuffer(nil) } // WriteMessage sends the specified message to the GELF server // specified in the call to New(). It assumes all the fields are // filled out appropriately. In general, clients will want to use // Write, rather than WriteMessage. func (w *UDPWriter) WriteMessage(m *Message) (err error) { mBuf := newBuffer() defer bufPool.Put(mBuf) if err = m.MarshalJSONBuf(mBuf); err != nil { return err } mBytes := mBuf.Bytes() var ( zBuf *bytes.Buffer zBytes []byte ) var zw io.WriteCloser switch w.CompressionType { case CompressGzip: zBuf = newBuffer() defer bufPool.Put(zBuf) zw, err = gzip.NewWriterLevel(zBuf, w.CompressionLevel) case CompressZlib: zBuf = newBuffer() defer bufPool.Put(zBuf) zw, err = zlib.NewWriterLevel(zBuf, w.CompressionLevel) case CompressNone: zBytes = mBytes default: panic(fmt.Sprintf("unknown compression type %d", w.CompressionType)) } if zw != nil { if err != nil { return } if _, err = zw.Write(mBytes); err != nil { zw.Close() return } zw.Close() zBytes = zBuf.Bytes() } if numChunks(zBytes) > 1 { return w.writeChunked(zBytes) } n, err := w.conn.Write(zBytes) if err != nil { return } if n != len(zBytes) { return fmt.Errorf("bad write (%d/%d)", n, len(zBytes)) } return nil } // Write encodes the given string in a GELF message and sends it to // the server specified in New(). func (w *UDPWriter) Write(p []byte) (n int, err error) { // 1 for the function that called us. file, line := getCallerIgnoringLogMulti(1) m := constructMessage(p, w.hostname, w.Facility, file, line) if err = w.WriteMessage(m); err != nil { return 0, err } return len(p), nil }