bump fluent/fluent-logger-golang v1.4.0

- Add RequestAck to enable at-least-once message transferring
- Add Async option to update sending message in asynchronous way
- Deprecate AsyncConnect (Use Async instead)

full diff: https://github.com/fluent/fluent-logger-golang/compare/v1.3.0...v1.4.0

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
Sebastiaan van Stijn 2019-04-15 02:11:27 +02:00
parent ad9362bb15
commit 688e67e1d3
No known key found for this signature in database
GPG Key ID: 76698F39D527CE8C
8 changed files with 663 additions and 201 deletions

View File

@ -133,7 +133,7 @@ func New(info logger.Info) (logger.Logger, error) {
BufferLimit: bufferLimit,
RetryWait: retryWait,
MaxRetry: maxRetries,
AsyncConnect: asyncConnect,
Async: asyncConnect,
SubSecondPrecision: subSecondPrecision,
}

View File

@ -95,7 +95,7 @@ github.com/golang/protobuf aa810b61a9c79d51363740d207bb
github.com/Graylog2/go-gelf 4143646226541087117ff2f83334ea48b3201841
# fluent-logger-golang deps
github.com/fluent/fluent-logger-golang 8bbc2356beaf021b04c9bd5cdc76ea5a7ccb40ec # v1.3.0
github.com/fluent/fluent-logger-golang 7a6c9dcd7f14c2ed5d8c55c11b894e5455ee311b # v1.4.0
github.com/philhofer/fwd bb6d471dc95d4fe11e432687f8b70ff496cf3136 # v1.0.0
github.com/tinylib/msgp 3b556c64540842d4f82967be066a7f7fffc3adad

View File

@ -64,6 +64,16 @@ f := fluent.New(fluent.Config{FluentPort: 80, FluentHost: "example.com"})
Sets the timeout for Write call of logger.Post.
Since the default is zero value, Write will not time out.
### Async
Enable asynchronous I/O (connect and write) for sending events to Fluentd.
The default is false.
### RequestAck
Sets whether to request acknowledgment from Fluentd to increase the reliability
of the connection. The default is false.
## Tests
```
go test

View File

@ -6,12 +6,17 @@ import (
"fmt"
"math"
"net"
"os"
"reflect"
"strconv"
"sync"
"time"
"bytes"
"encoding/base64"
"encoding/binary"
"github.com/tinylib/msgp/msgp"
"math/rand"
)
const (
@ -21,8 +26,9 @@ const (
defaultPort = 24224
defaultTimeout = 3 * time.Second
defaultWriteTimeout = time.Duration(0) // Write() will not time out
defaultBufferLimit = 8 * 1024 * 1024
defaultBufferLimit = 8 * 1024
defaultRetryWait = 500
defaultMaxRetryWait = 60000
defaultMaxRetry = 13
defaultReconnectWaitIncreRate = 1.5
// Default sub-second precision value to false since it is only compatible
@ -40,24 +46,36 @@ type Config struct {
BufferLimit int `json:"buffer_limit"`
RetryWait int `json:"retry_wait"`
MaxRetry int `json:"max_retry"`
MaxRetryWait int `json:"max_retry_wait"`
TagPrefix string `json:"tag_prefix"`
AsyncConnect bool `json:"async_connect"`
MarshalAsJSON bool `json:"marshal_as_json"`
Async bool `json:"async"`
// Deprecated: Use Async instead
AsyncConnect bool `json:"async_connect"`
MarshalAsJSON bool `json:"marshal_as_json"`
// Sub-second precision timestamps are only possible for those using fluentd
// v0.14+ and serializing their messages with msgpack.
SubSecondPrecision bool `json:"sub_second_precision"`
// RequestAck sends the chunk option with a unique ID. The server will
// respond with an acknowledgement. This option improves the reliability
// of the message transmission.
RequestAck bool `json:"request_ack"`
}
type msgToSend struct {
data []byte
ack string
}
type Fluent struct {
Config
mubuff sync.Mutex
pending []byte
pending chan *msgToSend
wg sync.WaitGroup
muconn sync.Mutex
conn net.Conn
reconnecting bool
muconn sync.Mutex
conn net.Conn
}
// New creates a new Logger.
@ -89,11 +107,22 @@ func New(config Config) (f *Fluent, err error) {
if config.MaxRetry == 0 {
config.MaxRetry = defaultMaxRetry
}
if config.MaxRetryWait == 0 {
config.MaxRetryWait = defaultMaxRetryWait
}
if config.AsyncConnect {
f = &Fluent{Config: config, reconnecting: true}
go f.reconnect()
fmt.Fprintf(os.Stderr, "fluent#New: AsyncConnect is now deprecated, please use Async instead")
config.Async = config.Async || config.AsyncConnect
}
if config.Async {
f = &Fluent{
Config: config,
pending: make(chan *msgToSend, config.BufferLimit),
}
f.wg.Add(1)
go f.run()
} else {
f = &Fluent{Config: config, reconnecting: false}
f = &Fluent{Config: config}
err = f.connect()
}
return
@ -173,28 +202,25 @@ func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) err
}
func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{}) error {
var data []byte
var msg *msgToSend
var err error
if data, err = f.EncodeData(tag, tm, message); err != nil {
if msg, err = f.EncodeData(tag, tm, message); err != nil {
return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%v", message, err)
}
return f.postRawData(data)
return f.postRawData(msg)
}
// Deprecated: Use EncodeAndPostData instead
func (f *Fluent) PostRawData(data []byte) {
f.postRawData(data)
func (f *Fluent) PostRawData(msg *msgToSend) {
f.postRawData(msg)
}
func (f *Fluent) postRawData(data []byte) error {
if err := f.appendBuffer(data); err != nil {
return err
func (f *Fluent) postRawData(msg *msgToSend) error {
if f.Config.Async {
return f.appendBuffer(msg)
}
if err := f.send(); err != nil {
f.close()
return err
}
return nil
// Synchronous write
return f.write(msg)
}
// For sending forward protocol adopted JSON
@ -207,43 +233,80 @@ type MessageChunk struct {
// So, it should write JSON marshaler by hand.
func (chunk *MessageChunk) MarshalJSON() ([]byte, error) {
data, err := json.Marshal(chunk.message.Record)
return []byte(fmt.Sprintf("[\"%s\",%d,%s,null]", chunk.message.Tag,
chunk.message.Time, data)), err
if err != nil {
return nil, err
}
option, err := json.Marshal(chunk.message.Option)
if err != nil {
return nil, err
}
return []byte(fmt.Sprintf("[\"%s\",%d,%s,%s]", chunk.message.Tag,
chunk.message.Time, data, option)), err
}
func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (data []byte, err error) {
// getUniqueID returns a base64 encoded unique ID that can be used for chunk/ack
// mechanism, see
// https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#option
func getUniqueID(timeUnix int64) (string, error) {
buf := bytes.NewBuffer(nil)
enc := base64.NewEncoder(base64.StdEncoding, buf)
if err := binary.Write(enc, binary.LittleEndian, timeUnix); err != nil {
enc.Close()
return "", err
}
if err := binary.Write(enc, binary.LittleEndian, rand.Uint64()); err != nil {
enc.Close()
return "", err
}
// encoder needs to be closed before buf.String(), defer does not work
// here
enc.Close()
return buf.String(), nil
}
func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg *msgToSend, err error) {
option := make(map[string]string)
msg = &msgToSend{}
timeUnix := tm.Unix()
if f.Config.RequestAck {
var err error
msg.ack, err = getUniqueID(timeUnix)
if err != nil {
return nil, err
}
option["chunk"] = msg.ack
}
if f.Config.MarshalAsJSON {
msg := Message{Tag: tag, Time: timeUnix, Record: message}
chunk := &MessageChunk{message: msg}
data, err = json.Marshal(chunk)
m := Message{Tag: tag, Time: timeUnix, Record: message, Option: option}
chunk := &MessageChunk{message: m}
msg.data, err = json.Marshal(chunk)
} else if f.Config.SubSecondPrecision {
msg := &MessageExt{Tag: tag, Time: EventTime(tm), Record: message}
data, err = msg.MarshalMsg(nil)
m := &MessageExt{Tag: tag, Time: EventTime(tm), Record: message, Option: option}
msg.data, err = m.MarshalMsg(nil)
} else {
msg := &Message{Tag: tag, Time: timeUnix, Record: message}
data, err = msg.MarshalMsg(nil)
m := &Message{Tag: tag, Time: timeUnix, Record: message, Option: option}
msg.data, err = m.MarshalMsg(nil)
}
return
}
// Close closes the connection.
// Close closes the connection, waiting for pending logs to be sent
func (f *Fluent) Close() (err error) {
if len(f.pending) > 0 {
err = f.send()
if f.Config.Async {
close(f.pending)
f.wg.Wait()
}
f.close()
return
}
// appendBuffer appends data to buffer with lock.
func (f *Fluent) appendBuffer(data []byte) error {
f.mubuff.Lock()
defer f.mubuff.Unlock()
if len(f.pending)+len(data) > f.Config.BufferLimit {
return errors.New(fmt.Sprintf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit))
func (f *Fluent) appendBuffer(msg *msgToSend) error {
select {
case f.pending <- msg:
default:
return fmt.Errorf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit)
}
f.pending = append(f.pending, data...)
return nil
}
@ -259,8 +322,6 @@ func (f *Fluent) close() {
// connect establishes a new connection using the specified transport.
func (f *Fluent) connect() (err error) {
f.muconn.Lock()
defer f.muconn.Unlock()
switch f.Config.FluentNetwork {
case "tcp":
@ -270,63 +331,78 @@ func (f *Fluent) connect() (err error) {
default:
err = net.UnknownNetworkError(f.Config.FluentNetwork)
}
return err
}
if err == nil {
f.reconnecting = false
func (f *Fluent) run() {
for {
select {
case entry, ok := <-f.pending:
if !ok {
f.wg.Done()
return
}
err := f.write(entry)
if err != nil {
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
}
}
}
return
}
func e(x, y float64) int {
return int(math.Pow(x, y))
}
func (f *Fluent) reconnect() {
for i := 0; ; i++ {
err := f.connect()
if err == nil {
f.send()
return
func (f *Fluent) write(msg *msgToSend) error {
for i := 0; i < f.Config.MaxRetry; i++ {
// Connect if needed
f.muconn.Lock()
if f.conn == nil {
err := f.connect()
if err != nil {
f.muconn.Unlock()
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
if waitTime > f.Config.MaxRetryWait {
waitTime = f.Config.MaxRetryWait
}
time.Sleep(time.Duration(waitTime) * time.Millisecond)
continue
}
}
if i == f.Config.MaxRetry {
// TODO: What we can do when connection failed MaxRetry times?
panic("fluent#reconnect: failed to reconnect!")
}
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
time.Sleep(time.Duration(waitTime) * time.Millisecond)
}
}
f.muconn.Unlock()
func (f *Fluent) send() error {
f.muconn.Lock()
defer f.muconn.Unlock()
if f.conn == nil {
if f.reconnecting == false {
f.reconnecting = true
go f.reconnect()
}
return errors.New("fluent#send: can't send logs, client is reconnecting")
}
f.mubuff.Lock()
defer f.mubuff.Unlock()
var err error
if len(f.pending) > 0 {
// We're connected, write msg
t := f.Config.WriteTimeout
if time.Duration(0) < t {
f.conn.SetWriteDeadline(time.Now().Add(t))
} else {
f.conn.SetWriteDeadline(time.Time{})
}
_, err = f.conn.Write(f.pending)
_, err := f.conn.Write(msg.data)
if err != nil {
f.conn.Close()
f.conn = nil
f.close()
} else {
f.pending = f.pending[:0]
// Acknowledgment check
if msg.ack != "" {
resp := &AckResp{}
if f.Config.MarshalAsJSON {
dec := json.NewDecoder(f.conn)
err = dec.Decode(resp)
} else {
r := msgp.NewReader(f.conn)
err = resp.DecodeMsg(r)
}
if err != nil || resp.Ack != msg.ack {
f.close()
continue
}
}
return err
}
}
return err
return fmt.Errorf("fluent#write: failed to reconnect, max retry: %v", f.Config.MaxRetry)
}

View File

@ -16,9 +16,9 @@ type Entry struct {
//msgp:tuple Forward
type Forward struct {
Tag string `msg:"tag"`
Entries []Entry `msg:"entries"`
Option interface{} `msg:"option"`
Tag string `msg:"tag"`
Entries []Entry `msg:"entries"`
Option map[string]string
}
//msgp:tuple Message
@ -26,7 +26,7 @@ type Message struct {
Tag string `msg:"tag"`
Time int64 `msg:"time"`
Record interface{} `msg:"record"`
Option interface{} `msg:"option"`
Option map[string]string
}
//msgp:tuple MessageExt
@ -34,7 +34,11 @@ type MessageExt struct {
Tag string `msg:"tag"`
Time EventTime `msg:"time,extension"`
Record interface{} `msg:"record"`
Option interface{} `msg:"option"`
Option map[string]string
}
type AckResp struct {
Ack string `json:"ack" msg:"ack"`
}
// EventTime is an extension to the serialized time value. It builds in support

View File

@ -1,30 +1,134 @@
package fluent
// NOTE: THIS FILE WAS PRODUCED BY THE
// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp)
// DO NOT EDIT
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *Entry) DecodeMsg(dc *msgp.Reader) (err error) {
var zxvk uint32
zxvk, err = dc.ReadArrayHeader()
func (z *AckResp) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "ack":
z.Ack, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Ack")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z AckResp) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 1
// write "ack"
err = en.Append(0x81, 0xa3, 0x61, 0x63, 0x6b)
if err != nil {
return
}
if zxvk != 2 {
err = msgp.ArrayError{Wanted: 2, Got: zxvk}
err = en.WriteString(z.Ack)
if err != nil {
err = msgp.WrapError(err, "Ack")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z AckResp) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 1
// string "ack"
o = append(o, 0x81, 0xa3, 0x61, 0x63, 0x6b)
o = msgp.AppendString(o, z.Ack)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *AckResp) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "ack":
z.Ack, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Ack")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z AckResp) Msgsize() (s int) {
s = 1 + 4 + msgp.StringPrefixSize + len(z.Ack)
return
}
// DecodeMsg implements msgp.Decodable
func (z *Entry) DecodeMsg(dc *msgp.Reader) (err error) {
var zb0001 uint32
zb0001, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
if zb0001 != 2 {
err = msgp.ArrayError{Wanted: 2, Got: zb0001}
return
}
z.Time, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "Time")
return
}
z.Record, err = dc.ReadIntf()
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
return
@ -35,14 +139,16 @@ func (z Entry) EncodeMsg(en *msgp.Writer) (err error) {
// array header, size 2
err = en.Append(0x92)
if err != nil {
return err
return
}
err = en.WriteInt64(z.Time)
if err != nil {
err = msgp.WrapError(err, "Time")
return
}
err = en.WriteIntf(z.Record)
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
return
@ -56,6 +162,7 @@ func (z Entry) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.AppendInt64(o, z.Time)
o, err = msgp.AppendIntf(o, z.Record)
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
return
@ -63,21 +170,24 @@ func (z Entry) MarshalMsg(b []byte) (o []byte, err error) {
// UnmarshalMsg implements msgp.Unmarshaler
func (z *Entry) UnmarshalMsg(bts []byte) (o []byte, err error) {
var zbzg uint32
zbzg, bts, err = msgp.ReadArrayHeaderBytes(bts)
var zb0001 uint32
zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
if zbzg != 2 {
err = msgp.ArrayError{Wanted: 2, Got: zbzg}
if zb0001 != 2 {
err = msgp.ArrayError{Wanted: 2, Got: zb0001}
return
}
z.Time, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Time")
return
}
z.Record, bts, err = msgp.ReadIntfBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
o = bts
@ -92,52 +202,83 @@ func (z Entry) Msgsize() (s int) {
// DecodeMsg implements msgp.Decodable
func (z *Forward) DecodeMsg(dc *msgp.Reader) (err error) {
var zcmr uint32
zcmr, err = dc.ReadArrayHeader()
var zb0001 uint32
zb0001, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
if zcmr != 3 {
err = msgp.ArrayError{Wanted: 3, Got: zcmr}
if zb0001 != 3 {
err = msgp.ArrayError{Wanted: 3, Got: zb0001}
return
}
z.Tag, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Tag")
return
}
var zajw uint32
zajw, err = dc.ReadArrayHeader()
var zb0002 uint32
zb0002, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err, "Entries")
return
}
if cap(z.Entries) >= int(zajw) {
z.Entries = (z.Entries)[:zajw]
if cap(z.Entries) >= int(zb0002) {
z.Entries = (z.Entries)[:zb0002]
} else {
z.Entries = make([]Entry, zajw)
z.Entries = make([]Entry, zb0002)
}
for zbai := range z.Entries {
var zwht uint32
zwht, err = dc.ReadArrayHeader()
for za0001 := range z.Entries {
var zb0003 uint32
zb0003, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err, "Entries", za0001)
return
}
if zwht != 2 {
err = msgp.ArrayError{Wanted: 2, Got: zwht}
if zb0003 != 2 {
err = msgp.ArrayError{Wanted: 2, Got: zb0003}
return
}
z.Entries[zbai].Time, err = dc.ReadInt64()
z.Entries[za0001].Time, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "Entries", za0001, "Time")
return
}
z.Entries[zbai].Record, err = dc.ReadIntf()
z.Entries[za0001].Record, err = dc.ReadIntf()
if err != nil {
err = msgp.WrapError(err, "Entries", za0001, "Record")
return
}
}
z.Option, err = dc.ReadIntf()
var zb0004 uint32
zb0004, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
if z.Option == nil {
z.Option = make(map[string]string, zb0004)
} else if len(z.Option) > 0 {
for key := range z.Option {
delete(z.Option, key)
}
}
for zb0004 > 0 {
zb0004--
var za0002 string
var za0003 string
za0002, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
za0003, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Option", za0002)
return
}
z.Option[za0002] = za0003
}
return
}
@ -146,35 +287,52 @@ func (z *Forward) EncodeMsg(en *msgp.Writer) (err error) {
// array header, size 3
err = en.Append(0x93)
if err != nil {
return err
return
}
err = en.WriteString(z.Tag)
if err != nil {
err = msgp.WrapError(err, "Tag")
return
}
err = en.WriteArrayHeader(uint32(len(z.Entries)))
if err != nil {
err = msgp.WrapError(err, "Entries")
return
}
for zbai := range z.Entries {
for za0001 := range z.Entries {
// array header, size 2
err = en.Append(0x92)
if err != nil {
return err
}
err = en.WriteInt64(z.Entries[zbai].Time)
if err != nil {
return
}
err = en.WriteIntf(z.Entries[zbai].Record)
err = en.WriteInt64(z.Entries[za0001].Time)
if err != nil {
err = msgp.WrapError(err, "Entries", za0001, "Time")
return
}
err = en.WriteIntf(z.Entries[za0001].Record)
if err != nil {
err = msgp.WrapError(err, "Entries", za0001, "Record")
return
}
}
err = en.WriteIntf(z.Option)
err = en.WriteMapHeader(uint32(len(z.Option)))
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
for za0002, za0003 := range z.Option {
err = en.WriteString(za0002)
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
err = en.WriteString(za0003)
if err != nil {
err = msgp.WrapError(err, "Option", za0002)
return
}
}
return
}
@ -185,70 +343,103 @@ func (z *Forward) MarshalMsg(b []byte) (o []byte, err error) {
o = append(o, 0x93)
o = msgp.AppendString(o, z.Tag)
o = msgp.AppendArrayHeader(o, uint32(len(z.Entries)))
for zbai := range z.Entries {
for za0001 := range z.Entries {
// array header, size 2
o = append(o, 0x92)
o = msgp.AppendInt64(o, z.Entries[zbai].Time)
o, err = msgp.AppendIntf(o, z.Entries[zbai].Record)
o = msgp.AppendInt64(o, z.Entries[za0001].Time)
o, err = msgp.AppendIntf(o, z.Entries[za0001].Record)
if err != nil {
err = msgp.WrapError(err, "Entries", za0001, "Record")
return
}
}
o, err = msgp.AppendIntf(o, z.Option)
if err != nil {
return
o = msgp.AppendMapHeader(o, uint32(len(z.Option)))
for za0002, za0003 := range z.Option {
o = msgp.AppendString(o, za0002)
o = msgp.AppendString(o, za0003)
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *Forward) UnmarshalMsg(bts []byte) (o []byte, err error) {
var zhct uint32
zhct, bts, err = msgp.ReadArrayHeaderBytes(bts)
var zb0001 uint32
zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
if zhct != 3 {
err = msgp.ArrayError{Wanted: 3, Got: zhct}
if zb0001 != 3 {
err = msgp.ArrayError{Wanted: 3, Got: zb0001}
return
}
z.Tag, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Tag")
return
}
var zcua uint32
zcua, bts, err = msgp.ReadArrayHeaderBytes(bts)
var zb0002 uint32
zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Entries")
return
}
if cap(z.Entries) >= int(zcua) {
z.Entries = (z.Entries)[:zcua]
if cap(z.Entries) >= int(zb0002) {
z.Entries = (z.Entries)[:zb0002]
} else {
z.Entries = make([]Entry, zcua)
z.Entries = make([]Entry, zb0002)
}
for zbai := range z.Entries {
var zxhx uint32
zxhx, bts, err = msgp.ReadArrayHeaderBytes(bts)
for za0001 := range z.Entries {
var zb0003 uint32
zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Entries", za0001)
return
}
if zxhx != 2 {
err = msgp.ArrayError{Wanted: 2, Got: zxhx}
if zb0003 != 2 {
err = msgp.ArrayError{Wanted: 2, Got: zb0003}
return
}
z.Entries[zbai].Time, bts, err = msgp.ReadInt64Bytes(bts)
z.Entries[za0001].Time, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Entries", za0001, "Time")
return
}
z.Entries[zbai].Record, bts, err = msgp.ReadIntfBytes(bts)
z.Entries[za0001].Record, bts, err = msgp.ReadIntfBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Entries", za0001, "Record")
return
}
}
z.Option, bts, err = msgp.ReadIntfBytes(bts)
var zb0004 uint32
zb0004, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
if z.Option == nil {
z.Option = make(map[string]string, zb0004)
} else if len(z.Option) > 0 {
for key := range z.Option {
delete(z.Option, key)
}
}
for zb0004 > 0 {
var za0002 string
var za0003 string
zb0004--
za0002, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
za0003, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Option", za0002)
return
}
z.Option[za0002] = za0003
}
o = bts
return
}
@ -256,40 +447,75 @@ func (z *Forward) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *Forward) Msgsize() (s int) {
s = 1 + msgp.StringPrefixSize + len(z.Tag) + msgp.ArrayHeaderSize
for zbai := range z.Entries {
s += 1 + msgp.Int64Size + msgp.GuessSize(z.Entries[zbai].Record)
for za0001 := range z.Entries {
s += 1 + msgp.Int64Size + msgp.GuessSize(z.Entries[za0001].Record)
}
s += msgp.MapHeaderSize
if z.Option != nil {
for za0002, za0003 := range z.Option {
_ = za0003
s += msgp.StringPrefixSize + len(za0002) + msgp.StringPrefixSize + len(za0003)
}
}
s += msgp.GuessSize(z.Option)
return
}
// DecodeMsg implements msgp.Decodable
func (z *Message) DecodeMsg(dc *msgp.Reader) (err error) {
var zlqf uint32
zlqf, err = dc.ReadArrayHeader()
var zb0001 uint32
zb0001, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
if zlqf != 4 {
err = msgp.ArrayError{Wanted: 4, Got: zlqf}
if zb0001 != 4 {
err = msgp.ArrayError{Wanted: 4, Got: zb0001}
return
}
z.Tag, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Tag")
return
}
z.Time, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "Time")
return
}
z.Record, err = dc.ReadIntf()
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
z.Option, err = dc.ReadIntf()
var zb0002 uint32
zb0002, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
if z.Option == nil {
z.Option = make(map[string]string, zb0002)
} else if len(z.Option) > 0 {
for key := range z.Option {
delete(z.Option, key)
}
}
for zb0002 > 0 {
zb0002--
var za0001 string
var za0002 string
za0001, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
za0002, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Option", za0001)
return
}
z.Option[za0001] = za0002
}
return
}
@ -298,24 +524,40 @@ func (z *Message) EncodeMsg(en *msgp.Writer) (err error) {
// array header, size 4
err = en.Append(0x94)
if err != nil {
return err
return
}
err = en.WriteString(z.Tag)
if err != nil {
err = msgp.WrapError(err, "Tag")
return
}
err = en.WriteInt64(z.Time)
if err != nil {
err = msgp.WrapError(err, "Time")
return
}
err = en.WriteIntf(z.Record)
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
err = en.WriteIntf(z.Option)
err = en.WriteMapHeader(uint32(len(z.Option)))
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
for za0001, za0002 := range z.Option {
err = en.WriteString(za0001)
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
err = en.WriteString(za0002)
if err != nil {
err = msgp.WrapError(err, "Option", za0001)
return
}
}
return
}
@ -328,79 +570,145 @@ func (z *Message) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.AppendInt64(o, z.Time)
o, err = msgp.AppendIntf(o, z.Record)
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
o, err = msgp.AppendIntf(o, z.Option)
if err != nil {
return
o = msgp.AppendMapHeader(o, uint32(len(z.Option)))
for za0001, za0002 := range z.Option {
o = msgp.AppendString(o, za0001)
o = msgp.AppendString(o, za0002)
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *Message) UnmarshalMsg(bts []byte) (o []byte, err error) {
var zdaf uint32
zdaf, bts, err = msgp.ReadArrayHeaderBytes(bts)
var zb0001 uint32
zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
if zdaf != 4 {
err = msgp.ArrayError{Wanted: 4, Got: zdaf}
if zb0001 != 4 {
err = msgp.ArrayError{Wanted: 4, Got: zb0001}
return
}
z.Tag, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Tag")
return
}
z.Time, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Time")
return
}
z.Record, bts, err = msgp.ReadIntfBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
z.Option, bts, err = msgp.ReadIntfBytes(bts)
var zb0002 uint32
zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
if z.Option == nil {
z.Option = make(map[string]string, zb0002)
} else if len(z.Option) > 0 {
for key := range z.Option {
delete(z.Option, key)
}
}
for zb0002 > 0 {
var za0001 string
var za0002 string
zb0002--
za0001, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
za0002, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Option", za0001)
return
}
z.Option[za0001] = za0002
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *Message) Msgsize() (s int) {
s = 1 + msgp.StringPrefixSize + len(z.Tag) + msgp.Int64Size + msgp.GuessSize(z.Record) + msgp.GuessSize(z.Option)
s = 1 + msgp.StringPrefixSize + len(z.Tag) + msgp.Int64Size + msgp.GuessSize(z.Record) + msgp.MapHeaderSize
if z.Option != nil {
for za0001, za0002 := range z.Option {
_ = za0002
s += msgp.StringPrefixSize + len(za0001) + msgp.StringPrefixSize + len(za0002)
}
}
return
}
// DecodeMsg implements msgp.Decodable
func (z *MessageExt) DecodeMsg(dc *msgp.Reader) (err error) {
var zpks uint32
zpks, err = dc.ReadArrayHeader()
var zb0001 uint32
zb0001, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
if zpks != 4 {
err = msgp.ArrayError{Wanted: 4, Got: zpks}
if zb0001 != 4 {
err = msgp.ArrayError{Wanted: 4, Got: zb0001}
return
}
z.Tag, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Tag")
return
}
err = dc.ReadExtension(&z.Time)
if err != nil {
err = msgp.WrapError(err, "Time")
return
}
z.Record, err = dc.ReadIntf()
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
z.Option, err = dc.ReadIntf()
var zb0002 uint32
zb0002, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
if z.Option == nil {
z.Option = make(map[string]string, zb0002)
} else if len(z.Option) > 0 {
for key := range z.Option {
delete(z.Option, key)
}
}
for zb0002 > 0 {
zb0002--
var za0001 string
var za0002 string
za0001, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
za0002, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Option", za0001)
return
}
z.Option[za0001] = za0002
}
return
}
@ -409,24 +717,40 @@ func (z *MessageExt) EncodeMsg(en *msgp.Writer) (err error) {
// array header, size 4
err = en.Append(0x94)
if err != nil {
return err
return
}
err = en.WriteString(z.Tag)
if err != nil {
err = msgp.WrapError(err, "Tag")
return
}
err = en.WriteExtension(&z.Time)
if err != nil {
err = msgp.WrapError(err, "Time")
return
}
err = en.WriteIntf(z.Record)
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
err = en.WriteIntf(z.Option)
err = en.WriteMapHeader(uint32(len(z.Option)))
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
for za0001, za0002 := range z.Option {
err = en.WriteString(za0001)
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
err = en.WriteString(za0002)
if err != nil {
err = msgp.WrapError(err, "Option", za0001)
return
}
}
return
}
@ -438,52 +762,90 @@ func (z *MessageExt) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.AppendString(o, z.Tag)
o, err = msgp.AppendExtension(o, &z.Time)
if err != nil {
err = msgp.WrapError(err, "Time")
return
}
o, err = msgp.AppendIntf(o, z.Record)
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
o, err = msgp.AppendIntf(o, z.Option)
if err != nil {
return
o = msgp.AppendMapHeader(o, uint32(len(z.Option)))
for za0001, za0002 := range z.Option {
o = msgp.AppendString(o, za0001)
o = msgp.AppendString(o, za0002)
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *MessageExt) UnmarshalMsg(bts []byte) (o []byte, err error) {
var zjfb uint32
zjfb, bts, err = msgp.ReadArrayHeaderBytes(bts)
var zb0001 uint32
zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
if zjfb != 4 {
err = msgp.ArrayError{Wanted: 4, Got: zjfb}
if zb0001 != 4 {
err = msgp.ArrayError{Wanted: 4, Got: zb0001}
return
}
z.Tag, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Tag")
return
}
bts, err = msgp.ReadExtensionBytes(bts, &z.Time)
if err != nil {
err = msgp.WrapError(err, "Time")
return
}
z.Record, bts, err = msgp.ReadIntfBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
z.Option, bts, err = msgp.ReadIntfBytes(bts)
var zb0002 uint32
zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
if z.Option == nil {
z.Option = make(map[string]string, zb0002)
} else if len(z.Option) > 0 {
for key := range z.Option {
delete(z.Option, key)
}
}
for zb0002 > 0 {
var za0001 string
var za0002 string
zb0002--
za0001, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
za0002, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Option", za0001)
return
}
z.Option[za0001] = za0002
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *MessageExt) Msgsize() (s int) {
s = 1 + msgp.StringPrefixSize + len(z.Tag) + msgp.ExtensionPrefixSize + z.Time.Len() + msgp.GuessSize(z.Record) + msgp.GuessSize(z.Option)
s = 1 + msgp.StringPrefixSize + len(z.Tag) + msgp.ExtensionPrefixSize + z.Time.Len() + msgp.GuessSize(z.Record) + msgp.MapHeaderSize
if z.Option != nil {
for za0001, za0002 := range z.Option {
_ = za0002
s += msgp.StringPrefixSize + len(za0001) + msgp.StringPrefixSize + len(za0002)
}
}
return
}

View File

@ -1,8 +1,6 @@
package fluent
// NOTE: THIS FILE WAS PRODUCED BY THE
// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp)
// DO NOT EDIT
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"github.com/tinylib/msgp/msgp"
@ -12,31 +10,36 @@ import (
func (z *TestMessage) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zxvk uint32
zxvk, err = dc.ReadMapHeader()
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zxvk > 0 {
zxvk--
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "foo":
z.Foo, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Foo")
return
}
case "hoge":
z.Hoge, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Hoge")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
@ -50,19 +53,21 @@ func (z TestMessage) EncodeMsg(en *msgp.Writer) (err error) {
// write "foo"
err = en.Append(0x82, 0xa3, 0x66, 0x6f, 0x6f)
if err != nil {
return err
return
}
err = en.WriteString(z.Foo)
if err != nil {
err = msgp.WrapError(err, "Foo")
return
}
// write "hoge"
err = en.Append(0xa4, 0x68, 0x6f, 0x67, 0x65)
if err != nil {
return err
return
}
err = en.WriteString(z.Hoge)
if err != nil {
err = msgp.WrapError(err, "Hoge")
return
}
return
@ -85,31 +90,36 @@ func (z TestMessage) MarshalMsg(b []byte) (o []byte, err error) {
func (z *TestMessage) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zbzg uint32
zbzg, bts, err = msgp.ReadMapHeaderBytes(bts)
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zbzg > 0 {
zbzg--
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "foo":
z.Foo, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Foo")
return
}
case "hoge":
z.Hoge, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Hoge")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}

View File

@ -1,3 +1,3 @@
package fluent
const Version = "1.3.0"
const Version = "1.4.0"