1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/daemon/logger/ring.go

235 lines
5.1 KiB
Go
Raw Normal View History

package logger // import "github.com/docker/docker/daemon/logger"
Implement optional ring buffer for container logs This allows the user to set a logging mode to "blocking" (default), or "non-blocking", which uses the ring buffer as a proxy to the real log driver. This allows a container to never be blocked on stdio at the cost of dropping log messages. Introduces 2 new log-opts that works for all drivers, `log-mode` and `log-size`. `log-mode` takes a value of "blocking", or "non-blocking" I chose not to implement this as a bool since it is difficult to determine if the mode was set to false vs just not set... especially difficult when merging the default daemon config with the container config. `log-size` takes a size string, e.g. `2MB`, which sets the max size of the ring buffer. When the max size is reached, it will start dropping log messages. ``` BenchmarkRingLoggerThroughputNoReceiver-8 2000000000 36.2 ns/op 856.35 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputWithReceiverDelay0-8 300000000 156 ns/op 198.48 MB/s 32 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay1-8 2000000000 36.1 ns/op 857.80 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay10-8 1000000000 36.2 ns/op 856.53 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay50-8 2000000000 34.7 ns/op 894.65 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay100-8 2000000000 35.1 ns/op 883.91 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay300-8 1000000000 35.9 ns/op 863.90 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay500-8 2000000000 35.8 ns/op 866.88 MB/s 0 B/op 0 allocs/op ``` Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2016-11-22 21:55:27 -05:00
import (
"errors"
"sync"
"sync/atomic"
"github.com/sirupsen/logrus"
Implement optional ring buffer for container logs This allows the user to set a logging mode to "blocking" (default), or "non-blocking", which uses the ring buffer as a proxy to the real log driver. This allows a container to never be blocked on stdio at the cost of dropping log messages. Introduces 2 new log-opts that works for all drivers, `log-mode` and `log-size`. `log-mode` takes a value of "blocking", or "non-blocking" I chose not to implement this as a bool since it is difficult to determine if the mode was set to false vs just not set... especially difficult when merging the default daemon config with the container config. `log-size` takes a size string, e.g. `2MB`, which sets the max size of the ring buffer. When the max size is reached, it will start dropping log messages. ``` BenchmarkRingLoggerThroughputNoReceiver-8 2000000000 36.2 ns/op 856.35 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputWithReceiverDelay0-8 300000000 156 ns/op 198.48 MB/s 32 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay1-8 2000000000 36.1 ns/op 857.80 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay10-8 1000000000 36.2 ns/op 856.53 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay50-8 2000000000 34.7 ns/op 894.65 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay100-8 2000000000 35.1 ns/op 883.91 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay300-8 1000000000 35.9 ns/op 863.90 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay500-8 2000000000 35.8 ns/op 866.88 MB/s 0 B/op 0 allocs/op ``` Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2016-11-22 21:55:27 -05:00
)
// defaultRingMaxSize is the ring buffer limit, in bytes, that NewRingLogger
// falls back to when it is given a negative size (1e6 bytes, i.e. 1MB).
const defaultRingMaxSize = 1e6
// RingLogger is a ring buffer that implements the Logger interface.
// This is used when lossy logging is OK.
type RingLogger struct {
buffer *messageRing
l Logger
logInfo Info
closeFlag int32
}
var _ SizedLogger = &RingLogger{}
Implement optional ring buffer for container logs This allows the user to set a logging mode to "blocking" (default), or "non-blocking", which uses the ring buffer as a proxy to the real log driver. This allows a container to never be blocked on stdio at the cost of dropping log messages. Introduces 2 new log-opts that works for all drivers, `log-mode` and `log-size`. `log-mode` takes a value of "blocking", or "non-blocking" I chose not to implement this as a bool since it is difficult to determine if the mode was set to false vs just not set... especially difficult when merging the default daemon config with the container config. `log-size` takes a size string, e.g. `2MB`, which sets the max size of the ring buffer. When the max size is reached, it will start dropping log messages. ``` BenchmarkRingLoggerThroughputNoReceiver-8 2000000000 36.2 ns/op 856.35 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputWithReceiverDelay0-8 300000000 156 ns/op 198.48 MB/s 32 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay1-8 2000000000 36.1 ns/op 857.80 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay10-8 1000000000 36.2 ns/op 856.53 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay50-8 2000000000 34.7 ns/op 894.65 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay100-8 2000000000 35.1 ns/op 883.91 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay300-8 1000000000 35.9 ns/op 863.90 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay500-8 2000000000 35.8 ns/op 866.88 MB/s 0 B/op 0 allocs/op ``` Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2016-11-22 21:55:27 -05:00
type ringWithReader struct {
*RingLogger
}
func (r *ringWithReader) ReadLogs(cfg ReadConfig) *LogWatcher {
reader, ok := r.l.(LogReader)
if !ok {
// something is wrong if we get here
panic("expected log reader")
}
return reader.ReadLogs(cfg)
}
func newRingLogger(driver Logger, logInfo Info, maxSize int64) *RingLogger {
l := &RingLogger{
buffer: newRing(maxSize),
l: driver,
logInfo: logInfo,
}
go l.run()
return l
}
// NewRingLogger creates a new Logger that is implemented as a RingBuffer wrapping
// the passed in logger.
func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger {
if maxSize < 0 {
maxSize = defaultRingMaxSize
}
l := newRingLogger(driver, logInfo, maxSize)
if _, ok := driver.(LogReader); ok {
return &ringWithReader{l}
}
return l
}
// BufSize returns the buffer size of the underlying logger.
// Returns -1 if the logger doesn't match SizedLogger interface.
func (r *RingLogger) BufSize() int {
if sl, ok := r.l.(SizedLogger); ok {
return sl.BufSize()
}
return -1
}
Implement optional ring buffer for container logs This allows the user to set a logging mode to "blocking" (default), or "non-blocking", which uses the ring buffer as a proxy to the real log driver. This allows a container to never be blocked on stdio at the cost of dropping log messages. Introduces 2 new log-opts that works for all drivers, `log-mode` and `log-size`. `log-mode` takes a value of "blocking", or "non-blocking" I chose not to implement this as a bool since it is difficult to determine if the mode was set to false vs just not set... especially difficult when merging the default daemon config with the container config. `log-size` takes a size string, e.g. `2MB`, which sets the max size of the ring buffer. When the max size is reached, it will start dropping log messages. ``` BenchmarkRingLoggerThroughputNoReceiver-8 2000000000 36.2 ns/op 856.35 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputWithReceiverDelay0-8 300000000 156 ns/op 198.48 MB/s 32 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay1-8 2000000000 36.1 ns/op 857.80 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay10-8 1000000000 36.2 ns/op 856.53 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay50-8 2000000000 34.7 ns/op 894.65 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay100-8 2000000000 35.1 ns/op 883.91 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay300-8 1000000000 35.9 ns/op 863.90 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay500-8 2000000000 35.8 ns/op 866.88 MB/s 0 B/op 0 allocs/op ``` Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2016-11-22 21:55:27 -05:00
// Log queues messages into the ring buffer
func (r *RingLogger) Log(msg *Message) error {
if r.closed() {
return errClosed
}
return r.buffer.Enqueue(msg)
}
// Name returns the name of the underlying logger
func (r *RingLogger) Name() string {
return r.l.Name()
}
func (r *RingLogger) closed() bool {
return atomic.LoadInt32(&r.closeFlag) == 1
}
func (r *RingLogger) setClosed() {
atomic.StoreInt32(&r.closeFlag, 1)
}
// Close closes the logger
func (r *RingLogger) Close() error {
r.setClosed()
r.buffer.Close()
// empty out the queue
var logErr bool
Implement optional ring buffer for container logs This allows the user to set a logging mode to "blocking" (default), or "non-blocking", which uses the ring buffer as a proxy to the real log driver. This allows a container to never be blocked on stdio at the cost of dropping log messages. Introduces 2 new log-opts that works for all drivers, `log-mode` and `log-size`. `log-mode` takes a value of "blocking", or "non-blocking" I chose not to implement this as a bool since it is difficult to determine if the mode was set to false vs just not set... especially difficult when merging the default daemon config with the container config. `log-size` takes a size string, e.g. `2MB`, which sets the max size of the ring buffer. When the max size is reached, it will start dropping log messages. ``` BenchmarkRingLoggerThroughputNoReceiver-8 2000000000 36.2 ns/op 856.35 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputWithReceiverDelay0-8 300000000 156 ns/op 198.48 MB/s 32 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay1-8 2000000000 36.1 ns/op 857.80 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay10-8 1000000000 36.2 ns/op 856.53 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay50-8 2000000000 34.7 ns/op 894.65 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay100-8 2000000000 35.1 ns/op 883.91 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay300-8 1000000000 35.9 ns/op 863.90 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay500-8 2000000000 35.8 ns/op 866.88 MB/s 0 B/op 0 allocs/op ``` Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2016-11-22 21:55:27 -05:00
for _, msg := range r.buffer.Drain() {
if logErr {
// some error logging a previous message, so re-insert to message pool
// and assume log driver is hosed
PutMessage(msg)
continue
}
Implement optional ring buffer for container logs This allows the user to set a logging mode to "blocking" (default), or "non-blocking", which uses the ring buffer as a proxy to the real log driver. This allows a container to never be blocked on stdio at the cost of dropping log messages. Introduces 2 new log-opts that works for all drivers, `log-mode` and `log-size`. `log-mode` takes a value of "blocking", or "non-blocking" I chose not to implement this as a bool since it is difficult to determine if the mode was set to false vs just not set... especially difficult when merging the default daemon config with the container config. `log-size` takes a size string, e.g. `2MB`, which sets the max size of the ring buffer. When the max size is reached, it will start dropping log messages. ``` BenchmarkRingLoggerThroughputNoReceiver-8 2000000000 36.2 ns/op 856.35 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputWithReceiverDelay0-8 300000000 156 ns/op 198.48 MB/s 32 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay1-8 2000000000 36.1 ns/op 857.80 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay10-8 1000000000 36.2 ns/op 856.53 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay50-8 2000000000 34.7 ns/op 894.65 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay100-8 2000000000 35.1 ns/op 883.91 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay300-8 1000000000 35.9 ns/op 863.90 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay500-8 2000000000 35.8 ns/op 866.88 MB/s 0 B/op 0 allocs/op ``` Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2016-11-22 21:55:27 -05:00
if err := r.l.Log(msg); err != nil {
logrus.WithField("driver", r.l.Name()).
WithField("container", r.logInfo.ContainerID).
WithError(err).
Errorf("Error writing log message")
logErr = true
Implement optional ring buffer for container logs This allows the user to set a logging mode to "blocking" (default), or "non-blocking", which uses the ring buffer as a proxy to the real log driver. This allows a container to never be blocked on stdio at the cost of dropping log messages. Introduces 2 new log-opts that works for all drivers, `log-mode` and `log-size`. `log-mode` takes a value of "blocking", or "non-blocking" I chose not to implement this as a bool since it is difficult to determine if the mode was set to false vs just not set... especially difficult when merging the default daemon config with the container config. `log-size` takes a size string, e.g. `2MB`, which sets the max size of the ring buffer. When the max size is reached, it will start dropping log messages. ``` BenchmarkRingLoggerThroughputNoReceiver-8 2000000000 36.2 ns/op 856.35 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputWithReceiverDelay0-8 300000000 156 ns/op 198.48 MB/s 32 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay1-8 2000000000 36.1 ns/op 857.80 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay10-8 1000000000 36.2 ns/op 856.53 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay50-8 2000000000 34.7 ns/op 894.65 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay100-8 2000000000 35.1 ns/op 883.91 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay300-8 1000000000 35.9 ns/op 863.90 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay500-8 2000000000 35.8 ns/op 866.88 MB/s 0 B/op 0 allocs/op ``` Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2016-11-22 21:55:27 -05:00
}
}
return r.l.Close()
}
// run consumes messages from the ring buffer and forwards them to the underling
// logger.
// This is run in a goroutine when the RingLogger is created
func (r *RingLogger) run() {
for {
if r.closed() {
return
}
msg, err := r.buffer.Dequeue()
if err != nil {
// buffer is closed
return
}
if err := r.l.Log(msg); err != nil {
logrus.WithField("driver", r.l.Name()).
WithField("container", r.logInfo.ContainerID).
WithError(err).
Errorf("Error writing log message")
Implement optional ring buffer for container logs This allows the user to set a logging mode to "blocking" (default), or "non-blocking", which uses the ring buffer as a proxy to the real log driver. This allows a container to never be blocked on stdio at the cost of dropping log messages. Introduces 2 new log-opts that works for all drivers, `log-mode` and `log-size`. `log-mode` takes a value of "blocking", or "non-blocking" I chose not to implement this as a bool since it is difficult to determine if the mode was set to false vs just not set... especially difficult when merging the default daemon config with the container config. `log-size` takes a size string, e.g. `2MB`, which sets the max size of the ring buffer. When the max size is reached, it will start dropping log messages. ``` BenchmarkRingLoggerThroughputNoReceiver-8 2000000000 36.2 ns/op 856.35 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputWithReceiverDelay0-8 300000000 156 ns/op 198.48 MB/s 32 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay1-8 2000000000 36.1 ns/op 857.80 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay10-8 1000000000 36.2 ns/op 856.53 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay50-8 2000000000 34.7 ns/op 894.65 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay100-8 2000000000 35.1 ns/op 883.91 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay300-8 1000000000 35.9 ns/op 863.90 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay500-8 2000000000 35.8 ns/op 866.88 MB/s 0 B/op 0 allocs/op ``` Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2016-11-22 21:55:27 -05:00
}
}
}
type messageRing struct {
mu sync.Mutex
// signals callers of `Dequeue` to wake up either on `Close` or when a new `Message` is added
Implement optional ring buffer for container logs This allows the user to set a logging mode to "blocking" (default), or "non-blocking", which uses the ring buffer as a proxy to the real log driver. This allows a container to never be blocked on stdio at the cost of dropping log messages. Introduces 2 new log-opts that works for all drivers, `log-mode` and `log-size`. `log-mode` takes a value of "blocking", or "non-blocking" I chose not to implement this as a bool since it is difficult to determine if the mode was set to false vs just not set... especially difficult when merging the default daemon config with the container config. `log-size` takes a size string, e.g. `2MB`, which sets the max size of the ring buffer. When the max size is reached, it will start dropping log messages. ``` BenchmarkRingLoggerThroughputNoReceiver-8 2000000000 36.2 ns/op 856.35 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputWithReceiverDelay0-8 300000000 156 ns/op 198.48 MB/s 32 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay1-8 2000000000 36.1 ns/op 857.80 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay10-8 1000000000 36.2 ns/op 856.53 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay50-8 2000000000 34.7 ns/op 894.65 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay100-8 2000000000 35.1 ns/op 883.91 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay300-8 1000000000 35.9 ns/op 863.90 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay500-8 2000000000 35.8 ns/op 866.88 MB/s 0 B/op 0 allocs/op ``` Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2016-11-22 21:55:27 -05:00
wait *sync.Cond
sizeBytes int64 // current buffer size
maxBytes int64 // max buffer size size
queue []*Message
closed bool
}
// newRing returns an empty, open messageRing limited to maxBytes bytes of
// queued message data, with its condition variable bound to the ring's mutex.
func newRing(maxBytes int64) *messageRing {
	// Room for 1000 messages is reserved up front by default.
	capacity := 1000
	switch maxBytes {
	case 0, 1:
		// With a limit this small the queue is not expected to hold more than
		// a single message, so do not reserve more than one slot.
		capacity = 1
	}

	ring := &messageRing{
		maxBytes: maxBytes,
		queue:    make([]*Message, 0, capacity),
	}
	ring.wait = sync.NewCond(&ring.mu)
	return ring
}
// Enqueue adds a message to the buffer queue
// If the message is too big for the buffer it drops the new message.
Implement optional ring buffer for container logs This allows the user to set a logging mode to "blocking" (default), or "non-blocking", which uses the ring buffer as a proxy to the real log driver. This allows a container to never be blocked on stdio at the cost of dropping log messages. Introduces 2 new log-opts that works for all drivers, `log-mode` and `log-size`. `log-mode` takes a value of "blocking", or "non-blocking" I chose not to implement this as a bool since it is difficult to determine if the mode was set to false vs just not set... especially difficult when merging the default daemon config with the container config. `log-size` takes a size string, e.g. `2MB`, which sets the max size of the ring buffer. When the max size is reached, it will start dropping log messages. ``` BenchmarkRingLoggerThroughputNoReceiver-8 2000000000 36.2 ns/op 856.35 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputWithReceiverDelay0-8 300000000 156 ns/op 198.48 MB/s 32 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay1-8 2000000000 36.1 ns/op 857.80 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay10-8 1000000000 36.2 ns/op 856.53 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay50-8 2000000000 34.7 ns/op 894.65 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay100-8 2000000000 35.1 ns/op 883.91 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay300-8 1000000000 35.9 ns/op 863.90 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay500-8 2000000000 35.8 ns/op 866.88 MB/s 0 B/op 0 allocs/op ``` Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2016-11-22 21:55:27 -05:00
// If there are no messages in the queue and the message is still too big, it adds the message anyway.
func (r *messageRing) Enqueue(m *Message) error {
mSize := int64(len(m.Line))
r.mu.Lock()
if r.closed {
r.mu.Unlock()
return errClosed
}
if mSize+r.sizeBytes > r.maxBytes && len(r.queue) > 0 {
r.wait.Signal()
r.mu.Unlock()
return nil
}
r.queue = append(r.queue, m)
r.sizeBytes += mSize
r.wait.Signal()
r.mu.Unlock()
return nil
}
// Dequeue pulls a message off the queue.
// If there are no messages, it waits for one.
// If the buffer is closed, it will return immediately with errClosed, even
// when messages are still queued; those remain available through Drain.
func (r *messageRing) Dequeue() (*Message, error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	for len(r.queue) == 0 && !r.closed {
		r.wait.Wait()
	}

	if r.closed {
		return nil, errClosed
	}

	msg := r.queue[0]
	// Clear the slot before reslicing so the backing array does not keep a
	// stale pointer to a message that has already been handed out.
	r.queue[0] = nil
	r.queue = r.queue[1:]
	r.sizeBytes -= int64(len(msg.Line))
	return msg, nil
}
// errClosed is the error returned by RingLogger.Log and by the messageRing
// Enqueue and Dequeue methods once the logger or ring has been closed.
var (
	errClosed = errors.New("closed")
)
// Close marks the buffer as closed so that no further messages can be added.
// Every caller blocked in Dequeue is woken up. Closing a buffer that is
// already closed does nothing.
func (r *messageRing) Close() {
	r.mu.Lock()
	defer r.mu.Unlock()

	if !r.closed {
		r.closed = true
		r.wait.Broadcast()
	}
}
// Drain removes every queued message and returns them in queue order.
// It is meant to be called after `Close()` to collect whatever was still
// waiting in the queue. The ring's size accounting is reset to zero.
func (r *messageRing) Drain() []*Message {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Hand back a copy so the caller's slice does not share the ring's storage.
	drained := make([]*Message, len(r.queue))
	copy(drained, r.queue)

	r.queue = r.queue[:0]
	r.sizeBytes = 0
	return drained
}