From 688e67e1d3a7d6cd6ab2ec2ca231167c34a63208 Mon Sep 17 00:00:00 2001 From: Sebastiaan van Stijn Date: Mon, 15 Apr 2019 02:11:27 +0200 Subject: [PATCH] bump fluent/fluent-logger-golang v1.4.0 - Add RequestAck to enable at-least-once message transferring - Add Async option to update sending message in asynchronous way - Deprecate AsyncConnect (Use Async instead) full diff: https://github.com/fluent/fluent-logger-golang/compare/v1.3.0...v1.4.0 Signed-off-by: Sebastiaan van Stijn --- daemon/logger/fluentd/fluentd.go | 2 +- vendor.conf | 2 +- .../fluent/fluent-logger-golang/README.md | 10 + .../fluent-logger-golang/fluent/fluent.go | 244 +++++--- .../fluent-logger-golang/fluent/proto.go | 14 +- .../fluent-logger-golang/fluent/proto_gen.go | 554 +++++++++++++++--- .../fluent/test_message_gen.go | 36 +- .../fluent-logger-golang/fluent/version.go | 2 +- 8 files changed, 663 insertions(+), 201 deletions(-) diff --git a/daemon/logger/fluentd/fluentd.go b/daemon/logger/fluentd/fluentd.go index 19652d6439..cf7f3e9985 100644 --- a/daemon/logger/fluentd/fluentd.go +++ b/daemon/logger/fluentd/fluentd.go @@ -133,7 +133,7 @@ func New(info logger.Info) (logger.Logger, error) { BufferLimit: bufferLimit, RetryWait: retryWait, MaxRetry: maxRetries, - AsyncConnect: asyncConnect, + Async: asyncConnect, SubSecondPrecision: subSecondPrecision, } diff --git a/vendor.conf b/vendor.conf index 55c4e6f467..a1c7aaa872 100644 --- a/vendor.conf +++ b/vendor.conf @@ -95,7 +95,7 @@ github.com/golang/protobuf aa810b61a9c79d51363740d207bb github.com/Graylog2/go-gelf 4143646226541087117ff2f83334ea48b3201841 # fluent-logger-golang deps -github.com/fluent/fluent-logger-golang 8bbc2356beaf021b04c9bd5cdc76ea5a7ccb40ec # v1.3.0 +github.com/fluent/fluent-logger-golang 7a6c9dcd7f14c2ed5d8c55c11b894e5455ee311b # v1.4.0 github.com/philhofer/fwd bb6d471dc95d4fe11e432687f8b70ff496cf3136 # v1.0.0 github.com/tinylib/msgp 3b556c64540842d4f82967be066a7f7fffc3adad diff --git a/vendor/github.com/fluent/fluent-logger-golang/README.md b/vendor/github.com/fluent/fluent-logger-golang/README.md index cbb8bdc542..5930602619 100644 --- a/vendor/github.com/fluent/fluent-logger-golang/README.md +++ b/vendor/github.com/fluent/fluent-logger-golang/README.md @@ -64,6 +64,16 @@ f := fluent.New(fluent.Config{FluentPort: 80, FluentHost: "example.com"}) Sets the timeout for Write call of logger.Post. Since the default is zero value, Write will not time out. +### Async + +Enable asynchronous I/O (connect and write) for sending events to Fluentd. +The default is false. + +### RequestAck + +Sets whether to request acknowledgment from Fluentd to increase the reliability +of the connection. The default is false. + ## Tests ``` go test diff --git a/vendor/github.com/fluent/fluent-logger-golang/fluent/fluent.go b/vendor/github.com/fluent/fluent-logger-golang/fluent/fluent.go index 4693c5c3b5..5bbd52668e 100644 --- a/vendor/github.com/fluent/fluent-logger-golang/fluent/fluent.go +++ b/vendor/github.com/fluent/fluent-logger-golang/fluent/fluent.go @@ -6,12 +6,17 @@ import ( "fmt" "math" "net" + "os" "reflect" "strconv" "sync" "time" + "bytes" + "encoding/base64" + "encoding/binary" "github.com/tinylib/msgp/msgp" + "math/rand" ) const ( @@ -21,8 +26,9 @@ const ( defaultPort = 24224 defaultTimeout = 3 * time.Second defaultWriteTimeout = time.Duration(0) // Write() will not time out - defaultBufferLimit = 8 * 1024 * 1024 + defaultBufferLimit = 8 * 1024 defaultRetryWait = 500 + defaultMaxRetryWait = 60000 defaultMaxRetry = 13 defaultReconnectWaitIncreRate = 1.5 // Default sub-second precision value to false since it is only compatible @@ -40,24 +46,36 @@ type Config struct { BufferLimit int `json:"buffer_limit"` RetryWait int `json:"retry_wait"` MaxRetry int `json:"max_retry"` + MaxRetryWait int `json:"max_retry_wait"` TagPrefix string `json:"tag_prefix"` - AsyncConnect bool `json:"async_connect"` - MarshalAsJSON bool `json:"marshal_as_json"` + Async bool `json:"async"` + // Deprecated: Use Async instead + AsyncConnect bool `json:"async_connect"` + MarshalAsJSON bool `json:"marshal_as_json"` // Sub-second precision timestamps are only possible for those using fluentd // v0.14+ and serializing their messages with msgpack. SubSecondPrecision bool `json:"sub_second_precision"` + + // RequestAck sends the chunk option with a unique ID. The server will + // respond with an acknowledgement. This option improves the reliability + // of the message transmission. + RequestAck bool `json:"request_ack"` +} + +type msgToSend struct { + data []byte + ack string } type Fluent struct { Config - mubuff sync.Mutex - pending []byte + pending chan *msgToSend + wg sync.WaitGroup - muconn sync.Mutex - conn net.Conn - reconnecting bool + muconn sync.Mutex + conn net.Conn } // New creates a new Logger. @@ -89,11 +107,22 @@ func New(config Config) (f *Fluent, err error) { if config.MaxRetry == 0 { config.MaxRetry = defaultMaxRetry } + if config.MaxRetryWait == 0 { + config.MaxRetryWait = defaultMaxRetryWait + } if config.AsyncConnect { - f = &Fluent{Config: config, reconnecting: true} - go f.reconnect() + fmt.Fprintf(os.Stderr, "fluent#New: AsyncConnect is now deprecated, please use Async instead") + config.Async = config.Async || config.AsyncConnect + } + if config.Async { + f = &Fluent{ + Config: config, + pending: make(chan *msgToSend, config.BufferLimit), + } + f.wg.Add(1) + go f.run() } else { - f = &Fluent{Config: config, reconnecting: false} + f = &Fluent{Config: config} err = f.connect() } return @@ -173,28 +202,25 @@ func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) err } func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{}) error { - var data []byte + var msg *msgToSend var err error - if data, err = f.EncodeData(tag, tm, message); err != nil { + if msg, err = f.EncodeData(tag, tm, message); err != nil { return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%v", message, err) } - return f.postRawData(data) + return f.postRawData(msg) } // Deprecated: Use EncodeAndPostData instead -func (f *Fluent) PostRawData(data []byte) { - f.postRawData(data) +func (f *Fluent) PostRawData(msg *msgToSend) { + f.postRawData(msg) } -func (f *Fluent) postRawData(data []byte) error { - if err := f.appendBuffer(data); err != nil { - return err +func (f *Fluent) postRawData(msg *msgToSend) error { + if f.Config.Async { + return f.appendBuffer(msg) } - if err := f.send(); err != nil { - f.close() - return err - } - return nil + // Synchronous write + return f.write(msg) } // For sending forward protocol adopted JSON @@ -207,43 +233,80 @@ type MessageChunk struct { // So, it should write JSON marshaler by hand. func (chunk *MessageChunk) MarshalJSON() ([]byte, error) { data, err := json.Marshal(chunk.message.Record) - return []byte(fmt.Sprintf("[\"%s\",%d,%s,null]", chunk.message.Tag, - chunk.message.Time, data)), err + if err != nil { + return nil, err + } + option, err := json.Marshal(chunk.message.Option) + if err != nil { + return nil, err + } + return []byte(fmt.Sprintf("[\"%s\",%d,%s,%s]", chunk.message.Tag, + chunk.message.Time, data, option)), err } -func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (data []byte, err error) { +// getUniqueID returns a base64 encoded unique ID that can be used for chunk/ack +// mechanism, see +// https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#option +func getUniqueID(timeUnix int64) (string, error) { + buf := bytes.NewBuffer(nil) + enc := base64.NewEncoder(base64.StdEncoding, buf) + if err := binary.Write(enc, binary.LittleEndian, timeUnix); err != nil { + enc.Close() + return "", err + } + if err := binary.Write(enc, binary.LittleEndian, rand.Uint64()); err != nil { + enc.Close() + return "", err + } + // encoder needs to be closed before buf.String(), defer does not work + // here + enc.Close() + return buf.String(), nil +} + +func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg *msgToSend, err error) { + option := make(map[string]string) + msg = &msgToSend{} timeUnix := tm.Unix() + if f.Config.RequestAck { + var err error + msg.ack, err = getUniqueID(timeUnix) + if err != nil { + return nil, err + } + option["chunk"] = msg.ack + } if f.Config.MarshalAsJSON { - msg := Message{Tag: tag, Time: timeUnix, Record: message} - chunk := &MessageChunk{message: msg} - data, err = json.Marshal(chunk) + m := Message{Tag: tag, Time: timeUnix, Record: message, Option: option} + chunk := &MessageChunk{message: m} + msg.data, err = json.Marshal(chunk) } else if f.Config.SubSecondPrecision { - msg := &MessageExt{Tag: tag, Time: EventTime(tm), Record: message} - data, err = msg.MarshalMsg(nil) + m := &MessageExt{Tag: tag, Time: EventTime(tm), Record: message, Option: option} + msg.data, err = m.MarshalMsg(nil) } else { - msg := &Message{Tag: tag, Time: timeUnix, Record: message} - data, err = msg.MarshalMsg(nil) + m := &Message{Tag: tag, Time: timeUnix, Record: message, Option: option} + msg.data, err = m.MarshalMsg(nil) } return } -// Close closes the connection. +// Close closes the connection, waiting for pending logs to be sent func (f *Fluent) Close() (err error) { - if len(f.pending) > 0 { - err = f.send() + if f.Config.Async { + close(f.pending) + f.wg.Wait() } f.close() return } // appendBuffer appends data to buffer with lock. -func (f *Fluent) appendBuffer(data []byte) error { - f.mubuff.Lock() - defer f.mubuff.Unlock() - if len(f.pending)+len(data) > f.Config.BufferLimit { - return errors.New(fmt.Sprintf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit)) +func (f *Fluent) appendBuffer(msg *msgToSend) error { + select { + case f.pending <- msg: + default: + return fmt.Errorf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit) } - f.pending = append(f.pending, data...) return nil } @@ -259,8 +322,6 @@ func (f *Fluent) close() { // connect establishes a new connection using the specified transport. func (f *Fluent) connect() (err error) { - f.muconn.Lock() - defer f.muconn.Unlock() switch f.Config.FluentNetwork { case "tcp": @@ -270,63 +331,78 @@ func (f *Fluent) connect() (err error) { default: err = net.UnknownNetworkError(f.Config.FluentNetwork) } + return err +} - if err == nil { - f.reconnecting = false +func (f *Fluent) run() { + for { + select { + case entry, ok := <-f.pending: + if !ok { + f.wg.Done() + return + } + err := f.write(entry) + if err != nil { + fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339)) + } + } } - return } func e(x, y float64) int { return int(math.Pow(x, y)) } -func (f *Fluent) reconnect() { - for i := 0; ; i++ { - err := f.connect() - if err == nil { - f.send() - return +func (f *Fluent) write(msg *msgToSend) error { + + for i := 0; i < f.Config.MaxRetry; i++ { + + // Connect if needed + f.muconn.Lock() + if f.conn == nil { + err := f.connect() + if err != nil { + f.muconn.Unlock() + waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1)) + if waitTime > f.Config.MaxRetryWait { + waitTime = f.Config.MaxRetryWait + } + time.Sleep(time.Duration(waitTime) * time.Millisecond) + continue + } } - if i == f.Config.MaxRetry { - // TODO: What we can do when connection failed MaxRetry times? - panic("fluent#reconnect: failed to reconnect!") - } - waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1)) - time.Sleep(time.Duration(waitTime) * time.Millisecond) - } -} + f.muconn.Unlock() -func (f *Fluent) send() error { - f.muconn.Lock() - defer f.muconn.Unlock() - - if f.conn == nil { - if f.reconnecting == false { - f.reconnecting = true - go f.reconnect() - } - return errors.New("fluent#send: can't send logs, client is reconnecting") - } - - f.mubuff.Lock() - defer f.mubuff.Unlock() - - var err error - if len(f.pending) > 0 { + // We're connected, write msg t := f.Config.WriteTimeout if time.Duration(0) < t { f.conn.SetWriteDeadline(time.Now().Add(t)) } else { f.conn.SetWriteDeadline(time.Time{}) } - _, err = f.conn.Write(f.pending) + _, err := f.conn.Write(msg.data) if err != nil { - f.conn.Close() - f.conn = nil + f.close() } else { - f.pending = f.pending[:0] + // Acknowledgment check + if msg.ack != "" { + resp := &AckResp{} + if f.Config.MarshalAsJSON { + dec := json.NewDecoder(f.conn) + err = dec.Decode(resp) + } else { + r := msgp.NewReader(f.conn) + err = resp.DecodeMsg(r) + } + if err != nil || resp.Ack != msg.ack { + f.close() + continue + } + } + return err } } - return err + + return fmt.Errorf("fluent#write: failed to reconnect, max retry: %v", f.Config.MaxRetry) } diff --git a/vendor/github.com/fluent/fluent-logger-golang/fluent/proto.go b/vendor/github.com/fluent/fluent-logger-golang/fluent/proto.go index 158e22da71..76fc860ee3 100644 --- a/vendor/github.com/fluent/fluent-logger-golang/fluent/proto.go +++ b/vendor/github.com/fluent/fluent-logger-golang/fluent/proto.go @@ -16,9 +16,9 @@ type Entry struct { //msgp:tuple Forward type Forward struct { - Tag string `msg:"tag"` - Entries []Entry `msg:"entries"` - Option interface{} `msg:"option"` + Tag string `msg:"tag"` + Entries []Entry `msg:"entries"` + Option map[string]string } //msgp:tuple Message @@ -26,7 +26,7 @@ type Message struct { Tag string `msg:"tag"` Time int64 `msg:"time"` Record interface{} `msg:"record"` - Option interface{} `msg:"option"` + Option map[string]string } //msgp:tuple MessageExt @@ -34,7 +34,11 @@ type MessageExt struct { Tag string `msg:"tag"` Time EventTime `msg:"time,extension"` Record interface{} `msg:"record"` - Option interface{} `msg:"option"` + Option map[string]string +} + +type AckResp struct { + Ack string `json:"ack" msg:"ack"` } // EventTime is an extension to the serialized time value. It builds in support diff --git a/vendor/github.com/fluent/fluent-logger-golang/fluent/proto_gen.go b/vendor/github.com/fluent/fluent-logger-golang/fluent/proto_gen.go index 5b88a688f8..e808880c6a 100644 --- a/vendor/github.com/fluent/fluent-logger-golang/fluent/proto_gen.go +++ b/vendor/github.com/fluent/fluent-logger-golang/fluent/proto_gen.go @@ -1,30 +1,134 @@ package fluent -// NOTE: THIS FILE WAS PRODUCED BY THE -// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp) -// DO NOT EDIT +// Code generated by github.com/tinylib/msgp DO NOT EDIT. import ( "github.com/tinylib/msgp/msgp" ) // DecodeMsg implements msgp.Decodable -func (z *Entry) DecodeMsg(dc *msgp.Reader) (err error) { - var zxvk uint32 - zxvk, err = dc.ReadArrayHeader() +func (z *AckResp) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "ack": + z.Ack, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Ack") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z AckResp) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 1 + // write "ack" + err = en.Append(0x81, 0xa3, 0x61, 0x63, 0x6b) if err != nil { return } - if zxvk != 2 { - err = msgp.ArrayError{Wanted: 2, Got: zxvk} + err = en.WriteString(z.Ack) + if err != nil { + err = msgp.WrapError(err, "Ack") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z AckResp) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 1 + // string "ack" + o = append(o, 0x81, 0xa3, 0x61, 0x63, 0x6b) + o = msgp.AppendString(o, z.Ack) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *AckResp) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "ack": + z.Ack, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Ack") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z AckResp) Msgsize() (s int) { + s = 1 + 4 + msgp.StringPrefixSize + len(z.Ack) + return +} + +// DecodeMsg implements msgp.Decodable +func (z *Entry) DecodeMsg(dc *msgp.Reader) (err error) { + var zb0001 uint32 + zb0001, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + if zb0001 != 2 { + err = msgp.ArrayError{Wanted: 2, Got: zb0001} return } z.Time, err = dc.ReadInt64() if err != nil { + err = msgp.WrapError(err, "Time") return } z.Record, err = dc.ReadIntf() if err != nil { + err = msgp.WrapError(err, "Record") return } return @@ -35,14 +139,16 @@ func (z Entry) EncodeMsg(en *msgp.Writer) (err error) { // array header, size 2 err = en.Append(0x92) if err != nil { - return err + return } err = en.WriteInt64(z.Time) if err != nil { + err = msgp.WrapError(err, "Time") return } err = en.WriteIntf(z.Record) if err != nil { + err = msgp.WrapError(err, "Record") return } return @@ -56,6 +162,7 @@ func (z Entry) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.AppendInt64(o, z.Time) o, err = msgp.AppendIntf(o, z.Record) if err != nil { + err = msgp.WrapError(err, "Record") return } return @@ -63,21 +170,24 @@ func (z Entry) MarshalMsg(b []byte) (o []byte, err error) { // UnmarshalMsg implements msgp.Unmarshaler func (z *Entry) UnmarshalMsg(bts []byte) (o []byte, err error) { - var zbzg uint32 - zbzg, bts, err = msgp.ReadArrayHeaderBytes(bts) + var zb0001 uint32 + zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { + err = msgp.WrapError(err) return } - if zbzg != 2 { - err = msgp.ArrayError{Wanted: 2, Got: zbzg} + if zb0001 != 2 { + err = msgp.ArrayError{Wanted: 2, Got: zb0001} return } z.Time, bts, err = msgp.ReadInt64Bytes(bts) if err != nil { + err = msgp.WrapError(err, "Time") return } z.Record, bts, err = msgp.ReadIntfBytes(bts) if err != nil { + err = msgp.WrapError(err, "Record") return } o = bts @@ -92,52 +202,83 @@ func (z Entry) Msgsize() (s int) { // DecodeMsg implements msgp.Decodable func (z *Forward) DecodeMsg(dc *msgp.Reader) (err error) { - var zcmr uint32 - zcmr, err = dc.ReadArrayHeader() + var zb0001 uint32 + zb0001, err = dc.ReadArrayHeader() if err != nil { + err = msgp.WrapError(err) return } - if zcmr != 3 { - err = msgp.ArrayError{Wanted: 3, Got: zcmr} + if zb0001 != 3 { + err = msgp.ArrayError{Wanted: 3, Got: zb0001} return } z.Tag, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "Tag") return } - var zajw uint32 - zajw, err = dc.ReadArrayHeader() + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() if err != nil { + err = msgp.WrapError(err, "Entries") return } - if cap(z.Entries) >= int(zajw) { - z.Entries = (z.Entries)[:zajw] + if cap(z.Entries) >= int(zb0002) { + z.Entries = (z.Entries)[:zb0002] } else { - z.Entries = make([]Entry, zajw) + z.Entries = make([]Entry, zb0002) } - for zbai := range z.Entries { - var zwht uint32 - zwht, err = dc.ReadArrayHeader() + for za0001 := range z.Entries { + var zb0003 uint32 + zb0003, err = dc.ReadArrayHeader() if err != nil { + err = msgp.WrapError(err, "Entries", za0001) return } - if zwht != 2 { - err = msgp.ArrayError{Wanted: 2, Got: zwht} + if zb0003 != 2 { + err = msgp.ArrayError{Wanted: 2, Got: zb0003} return } - z.Entries[zbai].Time, err = dc.ReadInt64() + z.Entries[za0001].Time, err = dc.ReadInt64() if err != nil { + err = msgp.WrapError(err, "Entries", za0001, "Time") return } - z.Entries[zbai].Record, err = dc.ReadIntf() + z.Entries[za0001].Record, err = dc.ReadIntf() if err != nil { + err = msgp.WrapError(err, "Entries", za0001, "Record") return } } - z.Option, err = dc.ReadIntf() + var zb0004 uint32 + zb0004, err = dc.ReadMapHeader() if err != nil { + err = msgp.WrapError(err, "Option") return } + if z.Option == nil { + z.Option = make(map[string]string, zb0004) + } else if len(z.Option) > 0 { + for key := range z.Option { + delete(z.Option, key) + } + } + for zb0004 > 0 { + zb0004-- + var za0002 string + var za0003 string + za0002, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Option") + return + } + za0003, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Option", za0002) + return + } + z.Option[za0002] = za0003 + } return } @@ -146,35 +287,52 @@ func (z *Forward) EncodeMsg(en *msgp.Writer) (err error) { // array header, size 3 err = en.Append(0x93) if err != nil { - return err + return } err = en.WriteString(z.Tag) if err != nil { + err = msgp.WrapError(err, "Tag") return } err = en.WriteArrayHeader(uint32(len(z.Entries))) if err != nil { + err = msgp.WrapError(err, "Entries") return } - for zbai := range z.Entries { + for za0001 := range z.Entries { // array header, size 2 err = en.Append(0x92) - if err != nil { - return err - } - err = en.WriteInt64(z.Entries[zbai].Time) if err != nil { return } - err = en.WriteIntf(z.Entries[zbai].Record) + err = en.WriteInt64(z.Entries[za0001].Time) if err != nil { + err = msgp.WrapError(err, "Entries", za0001, "Time") + return + } + err = en.WriteIntf(z.Entries[za0001].Record) + if err != nil { + err = msgp.WrapError(err, "Entries", za0001, "Record") return } } - err = en.WriteIntf(z.Option) + err = en.WriteMapHeader(uint32(len(z.Option))) if err != nil { + err = msgp.WrapError(err, "Option") return } + for za0002, za0003 := range z.Option { + err = en.WriteString(za0002) + if err != nil { + err = msgp.WrapError(err, "Option") + return + } + err = en.WriteString(za0003) + if err != nil { + err = msgp.WrapError(err, "Option", za0002) + return + } + } return } @@ -185,70 +343,103 @@ func (z *Forward) MarshalMsg(b []byte) (o []byte, err error) { o = append(o, 0x93) o = msgp.AppendString(o, z.Tag) o = msgp.AppendArrayHeader(o, uint32(len(z.Entries))) - for zbai := range z.Entries { + for za0001 := range z.Entries { // array header, size 2 o = append(o, 0x92) - o = msgp.AppendInt64(o, z.Entries[zbai].Time) - o, err = msgp.AppendIntf(o, z.Entries[zbai].Record) + o = msgp.AppendInt64(o, z.Entries[za0001].Time) + o, err = msgp.AppendIntf(o, z.Entries[za0001].Record) if err != nil { + err = msgp.WrapError(err, "Entries", za0001, "Record") return } } - o, err = msgp.AppendIntf(o, z.Option) - if err != nil { - return + o = msgp.AppendMapHeader(o, uint32(len(z.Option))) + for za0002, za0003 := range z.Option { + o = msgp.AppendString(o, za0002) + o = msgp.AppendString(o, za0003) } return } // UnmarshalMsg implements msgp.Unmarshaler func (z *Forward) UnmarshalMsg(bts []byte) (o []byte, err error) { - var zhct uint32 - zhct, bts, err = msgp.ReadArrayHeaderBytes(bts) + var zb0001 uint32 + zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { + err = msgp.WrapError(err) return } - if zhct != 3 { - err = msgp.ArrayError{Wanted: 3, Got: zhct} + if zb0001 != 3 { + err = msgp.ArrayError{Wanted: 3, Got: zb0001} return } z.Tag, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "Tag") return } - var zcua uint32 - zcua, bts, err = msgp.ReadArrayHeaderBytes(bts) + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { + err = msgp.WrapError(err, "Entries") return } - if cap(z.Entries) >= int(zcua) { - z.Entries = (z.Entries)[:zcua] + if cap(z.Entries) >= int(zb0002) { + z.Entries = (z.Entries)[:zb0002] } else { - z.Entries = make([]Entry, zcua) + z.Entries = make([]Entry, zb0002) } - for zbai := range z.Entries { - var zxhx uint32 - zxhx, bts, err = msgp.ReadArrayHeaderBytes(bts) + for za0001 := range z.Entries { + var zb0003 uint32 + zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { + err = msgp.WrapError(err, "Entries", za0001) return } - if zxhx != 2 { - err = msgp.ArrayError{Wanted: 2, Got: zxhx} + if zb0003 != 2 { + err = msgp.ArrayError{Wanted: 2, Got: zb0003} return } - z.Entries[zbai].Time, bts, err = msgp.ReadInt64Bytes(bts) + z.Entries[za0001].Time, bts, err = msgp.ReadInt64Bytes(bts) if err != nil { + err = msgp.WrapError(err, "Entries", za0001, "Time") return } - z.Entries[zbai].Record, bts, err = msgp.ReadIntfBytes(bts) + z.Entries[za0001].Record, bts, err = msgp.ReadIntfBytes(bts) if err != nil { + err = msgp.WrapError(err, "Entries", za0001, "Record") return } } - z.Option, bts, err = msgp.ReadIntfBytes(bts) + var zb0004 uint32 + zb0004, bts, err = msgp.ReadMapHeaderBytes(bts) if err != nil { + err = msgp.WrapError(err, "Option") return } + if z.Option == nil { + z.Option = make(map[string]string, zb0004) + } else if len(z.Option) > 0 { + for key := range z.Option { + delete(z.Option, key) + } + } + for zb0004 > 0 { + var za0002 string + var za0003 string + zb0004-- + za0002, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Option") + return + } + za0003, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Option", za0002) + return + } + z.Option[za0002] = za0003 + } o = bts return } @@ -256,40 +447,75 @@ func (z *Forward) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *Forward) Msgsize() (s int) { s = 1 + msgp.StringPrefixSize + len(z.Tag) + msgp.ArrayHeaderSize - for zbai := range z.Entries { - s += 1 + msgp.Int64Size + msgp.GuessSize(z.Entries[zbai].Record) + for za0001 := range z.Entries { + s += 1 + msgp.Int64Size + msgp.GuessSize(z.Entries[za0001].Record) + } + s += msgp.MapHeaderSize + if z.Option != nil { + for za0002, za0003 := range z.Option { + _ = za0003 + s += msgp.StringPrefixSize + len(za0002) + msgp.StringPrefixSize + len(za0003) + } } - s += msgp.GuessSize(z.Option) return } // DecodeMsg implements msgp.Decodable func (z *Message) DecodeMsg(dc *msgp.Reader) (err error) { - var zlqf uint32 - zlqf, err = dc.ReadArrayHeader() + var zb0001 uint32 + zb0001, err = dc.ReadArrayHeader() if err != nil { + err = msgp.WrapError(err) return } - if zlqf != 4 { - err = msgp.ArrayError{Wanted: 4, Got: zlqf} + if zb0001 != 4 { + err = msgp.ArrayError{Wanted: 4, Got: zb0001} return } z.Tag, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "Tag") return } z.Time, err = dc.ReadInt64() if err != nil { + err = msgp.WrapError(err, "Time") return } z.Record, err = dc.ReadIntf() if err != nil { + err = msgp.WrapError(err, "Record") return } - z.Option, err = dc.ReadIntf() + var zb0002 uint32 + zb0002, err = dc.ReadMapHeader() if err != nil { + err = msgp.WrapError(err, "Option") return } + if z.Option == nil { + z.Option = make(map[string]string, zb0002) + } else if len(z.Option) > 0 { + for key := range z.Option { + delete(z.Option, key) + } + } + for zb0002 > 0 { + zb0002-- + var za0001 string + var za0002 string + za0001, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Option") + return + } + za0002, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Option", za0001) + return + } + z.Option[za0001] = za0002 + } return } @@ -298,24 +524,40 @@ func (z *Message) EncodeMsg(en *msgp.Writer) (err error) { // array header, size 4 err = en.Append(0x94) if err != nil { - return err + return } err = en.WriteString(z.Tag) if err != nil { + err = msgp.WrapError(err, "Tag") return } err = en.WriteInt64(z.Time) if err != nil { + err = msgp.WrapError(err, "Time") return } err = en.WriteIntf(z.Record) if err != nil { + err = msgp.WrapError(err, "Record") return } - err = en.WriteIntf(z.Option) + err = en.WriteMapHeader(uint32(len(z.Option))) if err != nil { + err = msgp.WrapError(err, "Option") return } + for za0001, za0002 := range z.Option { + err = en.WriteString(za0001) + if err != nil { + err = msgp.WrapError(err, "Option") + return + } + err = en.WriteString(za0002) + if err != nil { + err = msgp.WrapError(err, "Option", za0001) + return + } + } return } @@ -328,79 +570,145 @@ func (z *Message) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.AppendInt64(o, z.Time) o, err = msgp.AppendIntf(o, z.Record) if err != nil { + err = msgp.WrapError(err, "Record") return } - o, err = msgp.AppendIntf(o, z.Option) - if err != nil { - return + o = msgp.AppendMapHeader(o, uint32(len(z.Option))) + for za0001, za0002 := range z.Option { + o = msgp.AppendString(o, za0001) + o = msgp.AppendString(o, za0002) } return } // UnmarshalMsg implements msgp.Unmarshaler func (z *Message) UnmarshalMsg(bts []byte) (o []byte, err error) { - var zdaf uint32 - zdaf, bts, err = msgp.ReadArrayHeaderBytes(bts) + var zb0001 uint32 + zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { + err = msgp.WrapError(err) return } - if zdaf != 4 { - err = msgp.ArrayError{Wanted: 4, Got: zdaf} + if zb0001 != 4 { + err = msgp.ArrayError{Wanted: 4, Got: zb0001} return } z.Tag, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "Tag") return } z.Time, bts, err = msgp.ReadInt64Bytes(bts) if err != nil { + err = msgp.WrapError(err, "Time") return } z.Record, bts, err = msgp.ReadIntfBytes(bts) if err != nil { + err = msgp.WrapError(err, "Record") return } - z.Option, bts, err = msgp.ReadIntfBytes(bts) + var zb0002 uint32 + zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) if err != nil { + err = msgp.WrapError(err, "Option") return } + if z.Option == nil { + z.Option = make(map[string]string, zb0002) + } else if len(z.Option) > 0 { + for key := range z.Option { + delete(z.Option, key) + } + } + for zb0002 > 0 { + var za0001 string + var za0002 string + zb0002-- + za0001, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Option") + return + } + za0002, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Option", za0001) + return + } + z.Option[za0001] = za0002 + } o = bts return } // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *Message) Msgsize() (s int) { - s = 1 + msgp.StringPrefixSize + len(z.Tag) + msgp.Int64Size + msgp.GuessSize(z.Record) + msgp.GuessSize(z.Option) + s = 1 + msgp.StringPrefixSize + len(z.Tag) + msgp.Int64Size + msgp.GuessSize(z.Record) + msgp.MapHeaderSize + if z.Option != nil { + for za0001, za0002 := range z.Option { + _ = za0002 + s += msgp.StringPrefixSize + len(za0001) + msgp.StringPrefixSize + len(za0002) + } + } return } // DecodeMsg implements msgp.Decodable func (z *MessageExt) DecodeMsg(dc *msgp.Reader) (err error) { - var zpks uint32 - zpks, err = dc.ReadArrayHeader() + var zb0001 uint32 + zb0001, err = dc.ReadArrayHeader() if err != nil { + err = msgp.WrapError(err) return } - if zpks != 4 { - err = msgp.ArrayError{Wanted: 4, Got: zpks} + if zb0001 != 4 { + err = msgp.ArrayError{Wanted: 4, Got: zb0001} return } z.Tag, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "Tag") return } err = dc.ReadExtension(&z.Time) if err != nil { + err = msgp.WrapError(err, "Time") return } z.Record, err = dc.ReadIntf() if err != nil { + err = msgp.WrapError(err, "Record") return } - z.Option, err = dc.ReadIntf() + var zb0002 uint32 + zb0002, err = dc.ReadMapHeader() if err != nil { + err = msgp.WrapError(err, "Option") return } + if z.Option == nil { + z.Option = make(map[string]string, zb0002) + } else if len(z.Option) > 0 { + for key := range z.Option { + delete(z.Option, key) + } + } + for zb0002 > 0 { + zb0002-- + var za0001 string + var za0002 string + za0001, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Option") + return + } + za0002, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Option", za0001) + return + } + z.Option[za0001] = za0002 + } return } @@ -409,24 +717,40 @@ func (z *MessageExt) EncodeMsg(en *msgp.Writer) (err error) { // array header, size 4 err = en.Append(0x94) if err != nil { - return err + return } err = en.WriteString(z.Tag) if err != nil { + err = msgp.WrapError(err, "Tag") return } err = en.WriteExtension(&z.Time) if err != nil { + err = msgp.WrapError(err, "Time") return } err = en.WriteIntf(z.Record) if err != nil { + err = msgp.WrapError(err, "Record") return } - err = en.WriteIntf(z.Option) + err = en.WriteMapHeader(uint32(len(z.Option))) if err != nil { + err = msgp.WrapError(err, "Option") return } + for za0001, za0002 := range z.Option { + err = en.WriteString(za0001) + if err != nil { + err = msgp.WrapError(err, "Option") + return + } + err = en.WriteString(za0002) + if err != nil { + err = msgp.WrapError(err, "Option", za0001) + return + } + } return } @@ -438,52 +762,90 @@ func (z *MessageExt) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.AppendString(o, z.Tag) o, err = msgp.AppendExtension(o, &z.Time) if err != nil { + err = msgp.WrapError(err, "Time") return } o, err = msgp.AppendIntf(o, z.Record) if err != nil { + err = msgp.WrapError(err, "Record") return } - o, err = msgp.AppendIntf(o, z.Option) - if err != nil { - return + o = msgp.AppendMapHeader(o, uint32(len(z.Option))) + for za0001, za0002 := range z.Option { + o = msgp.AppendString(o, za0001) + o = msgp.AppendString(o, za0002) } return } // UnmarshalMsg implements msgp.Unmarshaler func (z *MessageExt) UnmarshalMsg(bts []byte) (o []byte, err error) { - var zjfb uint32 - zjfb, bts, err = msgp.ReadArrayHeaderBytes(bts) + var zb0001 uint32 + zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { + err = msgp.WrapError(err) return } - if zjfb != 4 { - err = msgp.ArrayError{Wanted: 4, Got: zjfb} + if zb0001 != 4 { + err = msgp.ArrayError{Wanted: 4, Got: zb0001} return } z.Tag, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "Tag") return } bts, err = msgp.ReadExtensionBytes(bts, &z.Time) if err != nil { + err = msgp.WrapError(err, "Time") return } z.Record, bts, err = msgp.ReadIntfBytes(bts) if err != nil { + err = msgp.WrapError(err, "Record") return } - z.Option, bts, err = msgp.ReadIntfBytes(bts) + var zb0002 uint32 + zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) if err != nil { + err = msgp.WrapError(err, "Option") return } + if z.Option == nil { + z.Option = make(map[string]string, zb0002) + } else if len(z.Option) > 0 { + for key := range z.Option { + delete(z.Option, key) + } + } + for zb0002 > 0 { + var za0001 string + var za0002 string + zb0002-- + za0001, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Option") + return + } + za0002, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Option", za0001) + return + } + z.Option[za0001] = za0002 + } o = bts return } // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *MessageExt) Msgsize() (s int) { - s = 1 + msgp.StringPrefixSize + len(z.Tag) + msgp.ExtensionPrefixSize + z.Time.Len() + msgp.GuessSize(z.Record) + msgp.GuessSize(z.Option) + s = 1 + msgp.StringPrefixSize + len(z.Tag) + msgp.ExtensionPrefixSize + z.Time.Len() + msgp.GuessSize(z.Record) + msgp.MapHeaderSize + if z.Option != nil { + for za0001, za0002 := range z.Option { + _ = za0002 + s += msgp.StringPrefixSize + len(za0001) + msgp.StringPrefixSize + len(za0002) + } + } return } diff --git a/vendor/github.com/fluent/fluent-logger-golang/fluent/test_message_gen.go b/vendor/github.com/fluent/fluent-logger-golang/fluent/test_message_gen.go index 17a45e22a3..591e1a9757 100644 --- a/vendor/github.com/fluent/fluent-logger-golang/fluent/test_message_gen.go +++ b/vendor/github.com/fluent/fluent-logger-golang/fluent/test_message_gen.go @@ -1,8 +1,6 @@ package fluent -// NOTE: THIS FILE WAS PRODUCED BY THE -// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp) -// DO NOT EDIT +// Code generated by github.com/tinylib/msgp DO NOT EDIT. import ( "github.com/tinylib/msgp/msgp" @@ -12,31 +10,36 @@ import ( func (z *TestMessage) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte _ = field - var zxvk uint32 - zxvk, err = dc.ReadMapHeader() + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() if err != nil { + err = msgp.WrapError(err) return } - for zxvk > 0 { - zxvk-- + for zb0001 > 0 { + zb0001-- field, err = dc.ReadMapKeyPtr() if err != nil { + err = msgp.WrapError(err) return } switch msgp.UnsafeString(field) { case "foo": z.Foo, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "Foo") return } case "hoge": z.Hoge, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "Hoge") return } default: err = dc.Skip() if err != nil { + err = msgp.WrapError(err) return } } @@ -50,19 +53,21 @@ func (z TestMessage) EncodeMsg(en *msgp.Writer) (err error) { // write "foo" err = en.Append(0x82, 0xa3, 0x66, 0x6f, 0x6f) if err != nil { - return err + return } err = en.WriteString(z.Foo) if err != nil { + err = msgp.WrapError(err, "Foo") return } // write "hoge" err = en.Append(0xa4, 0x68, 0x6f, 0x67, 0x65) if err != nil { - return err + return } err = en.WriteString(z.Hoge) if err != nil { + err = msgp.WrapError(err, "Hoge") return } return @@ -85,31 +90,36 @@ func (z TestMessage) MarshalMsg(b []byte) (o []byte, err error) { func (z *TestMessage) UnmarshalMsg(bts []byte) (o []byte, err error) { var field []byte _ = field - var zbzg uint32 - zbzg, bts, err = msgp.ReadMapHeaderBytes(bts) + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) if err != nil { + err = msgp.WrapError(err) return } - for zbzg > 0 { - zbzg-- + for zb0001 > 0 { + zb0001-- field, bts, err = msgp.ReadMapKeyZC(bts) if err != nil { + err = msgp.WrapError(err) return } switch msgp.UnsafeString(field) { case "foo": z.Foo, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "Foo") return } case "hoge": z.Hoge, bts, err = msgp.ReadStringBytes(bts) if err != nil { + err = msgp.WrapError(err, "Hoge") return } default: bts, err = msgp.Skip(bts) if err != nil { + err = msgp.WrapError(err) return } } diff --git a/vendor/github.com/fluent/fluent-logger-golang/fluent/version.go b/vendor/github.com/fluent/fluent-logger-golang/fluent/version.go index c6ec7e41e2..83e8932492 100644 --- a/vendor/github.com/fluent/fluent-logger-golang/fluent/version.go +++ b/vendor/github.com/fluent/fluent-logger-golang/fluent/version.go @@ -1,3 +1,3 @@ package fluent -const Version = "1.3.0" +const Version = "1.4.0"