From 054abff3b67bb5d66323e5418a43c845a3eac8a1 Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Tue, 22 Nov 2016 21:55:27 -0500 Subject: [PATCH] Implement optional ring buffer for container logs This allows the user to set a logging mode to "blocking" (default), or "non-blocking", which uses the ring buffer as a proxy to the real log driver. This allows a container to never be blocked on stdio at the cost of dropping log messages. Introduces 2 new log-opts that works for all drivers, `log-mode` and `log-size`. `log-mode` takes a value of "blocking", or "non-blocking" I chose not to implement this as a bool since it is difficult to determine if the mode was set to false vs just not set... especially difficult when merging the default daemon config with the container config. `log-size` takes a size string, e.g. `2MB`, which sets the max size of the ring buffer. When the max size is reached, it will start dropping log messages. ``` BenchmarkRingLoggerThroughputNoReceiver-8 2000000000 36.2 ns/op 856.35 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputWithReceiverDelay0-8 300000000 156 ns/op 198.48 MB/s 32 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay1-8 2000000000 36.1 ns/op 857.80 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay10-8 1000000000 36.2 ns/op 856.53 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay50-8 2000000000 34.7 ns/op 894.65 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay100-8 2000000000 35.1 ns/op 883.91 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay300-8 1000000000 35.9 ns/op 863.90 MB/s 0 B/op 0 allocs/op BenchmarkRingLoggerThroughputConsumeDelay500-8 2000000000 35.8 ns/op 866.88 MB/s 0 B/op 0 allocs/op ``` Signed-off-by: Brian Goff --- api/types/container/host_config.go | 11 + container/container.go | 21 +- daemon/logger/awslogs/cloudwatchlogs.go | 2 +- daemon/logger/copier.go | 19 +- daemon/logger/factory.go | 33 ++- daemon/logger/logger.go | 18 -- daemon/logger/logger_test.go | 26 --- daemon/logger/ring.go | 210 +++++++++++++++++ daemon/logger/ring_test.go | 299 ++++++++++++++++++++++++ daemon/logger/splunk/splunk_test.go | 40 ++-- docs/api/version-history.md | 2 + 11 files changed, 607 insertions(+), 74 deletions(-) delete mode 100644 daemon/logger/logger_test.go create mode 100644 daemon/logger/ring.go create mode 100644 daemon/logger/ring_test.go diff --git a/api/types/container/host_config.go b/api/types/container/host_config.go index d4a0c575f7..15a84b82d7 100644 --- a/api/types/container/host_config.go +++ b/api/types/container/host_config.go @@ -223,6 +223,17 @@ func (rp *RestartPolicy) IsSame(tp *RestartPolicy) bool { return rp.Name == tp.Name && rp.MaximumRetryCount == tp.MaximumRetryCount } +// LogMode is a type to define the available modes for logging +// These modes affect how logs are handled when log messages start piling up. +type LogMode string + +// Available logging modes +const ( + LogModeUnset = "" + LogModeBlocking LogMode = "blocking" + LogModeNonBlock LogMode = "non-blocking" +) + // LogConfig represents the logging configuration of the container. type LogConfig struct { Type string diff --git a/container/container.go b/container/container.go index 311953685e..9c9b07fe1d 100644 --- a/container/container.go +++ b/container/container.go @@ -37,6 +37,7 @@ import ( "github.com/docker/docker/runconfig" "github.com/docker/docker/volume" "github.com/docker/go-connections/nat" + "github.com/docker/go-units" "github.com/docker/libnetwork" "github.com/docker/libnetwork/netlabel" "github.com/docker/libnetwork/options" @@ -316,7 +317,7 @@ func (container *Container) CheckpointDir() string { // StartLogger starts a new logger driver for the container. func (container *Container) StartLogger() (logger.Logger, error) { cfg := container.HostConfig.LogConfig - c, err := logger.GetLogDriver(cfg.Type) + initDriver, err := logger.GetLogDriver(cfg.Type) if err != nil { return nil, fmt.Errorf("failed to get logging factory: %v", err) } @@ -341,7 +342,23 @@ func (container *Container) StartLogger() (logger.Logger, error) { return nil, err } } - return c(info) + + l, err := initDriver(info) + if err != nil { + return nil, err + } + + if containertypes.LogMode(cfg.Config["mode"]) == containertypes.LogModeNonBlock { + bufferSize := int64(-1) + if s, exists := cfg.Config["max-buffer-size"]; exists { + bufferSize, err = units.RAMInBytes(s) + if err != nil { + return nil, err + } + } + l = logger.NewRingLogger(l, info, bufferSize) + } + return l, nil } // GetProcessLabel returns the process label for the container. diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index 78995f3fad..e27b380941 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -204,7 +204,7 @@ func (l *logStream) Log(msg *logger.Message) error { defer l.lock.RUnlock() if !l.closed { // buffer up the data, making sure to copy the Line data - l.messages <- logger.CopyMessage(msg) + l.messages <- msg } return nil } diff --git a/daemon/logger/copier.go b/daemon/logger/copier.go index 10ab46e162..e8b06e58fe 100644 --- a/daemon/logger/copier.go +++ b/daemon/logger/copier.go @@ -47,7 +47,6 @@ func (c *Copier) copySrc(name string, src io.Reader) { buf := make([]byte, bufSize) n := 0 eof := false - msg := &Message{Source: name} for { select { @@ -78,13 +77,16 @@ func (c *Copier) copySrc(name string, src io.Reader) { // Break up the data that we've buffered up into lines, and log each in turn. p := 0 for q := bytes.Index(buf[p:n], []byte{'\n'}); q >= 0; q = bytes.Index(buf[p:n], []byte{'\n'}) { - msg.Line = buf[p : p+q] - msg.Timestamp = time.Now().UTC() - msg.Partial = false select { case <-c.closed: return default: + msg := &Message{ + Source: name, + Timestamp: time.Now().UTC(), + } + msg.Line = append(msg.Line, buf[p:p+q]...) + if logErr := c.dst.Log(msg); logErr != nil { logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr) } @@ -96,9 +98,14 @@ func (c *Copier) copySrc(name string, src io.Reader) { // noting that it's a partial log line. if eof || (p == 0 && n == len(buf)) { if p < n { - msg.Line = buf[p:n] - msg.Timestamp = time.Now().UTC() + msg := &Message{ + Source: name, + Timestamp: time.Now().UTC(), + Partial: true, + } + msg.Line = append(msg.Line, buf[p:n]...) msg.Partial = true + if logErr := c.dst.Log(msg); logErr != nil { logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr) } diff --git a/daemon/logger/factory.go b/daemon/logger/factory.go index 678e7cb223..192d3e0df2 100644 --- a/daemon/logger/factory.go +++ b/daemon/logger/factory.go @@ -3,6 +3,10 @@ package logger import ( "fmt" "sync" + + containertypes "github.com/docker/docker/api/types/container" + units "github.com/docker/go-units" + "github.com/pkg/errors" ) // Creator builds a logging driver instance with given context. @@ -85,6 +89,11 @@ func GetLogDriver(name string) (Creator, error) { return factory.get(name) } +var builtInLogOpts = map[string]bool{ + "mode": true, + "max-buffer-size": true, +} + // ValidateLogOpts checks the options for the given log driver. The // options supported are specific to the LogDriver implementation. func ValidateLogOpts(name string, cfg map[string]string) error { @@ -92,13 +101,35 @@ func ValidateLogOpts(name string, cfg map[string]string) error { return nil } + switch containertypes.LogMode(cfg["mode"]) { + case containertypes.LogModeBlocking, containertypes.LogModeNonBlock, containertypes.LogModeUnset: + default: + return fmt.Errorf("logger: logging mode not supported: %s", cfg["mode"]) + } + + if s, ok := cfg["max-buffer-size"]; ok { + if containertypes.LogMode(cfg["mode"]) != containertypes.LogModeNonBlock { + return fmt.Errorf("logger: max-buffer-size option is only supported with 'mode=%s'", containertypes.LogModeNonBlock) + } + if _, err := units.RAMInBytes(s); err != nil { + return errors.Wrap(err, "error parsing option max-buffer-size") + } + } + if !factory.driverRegistered(name) { return fmt.Errorf("logger: no log driver named '%s' is registered", name) } + filteredOpts := make(map[string]string, len(builtInLogOpts)) + for k, v := range cfg { + if !builtInLogOpts[k] { + filteredOpts[k] = v + } + } + validator := factory.getLogOptValidator(name) if validator != nil { - return validator(cfg) + return validator(filteredOpts) } return nil } diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index 672f57f819..dadd6f7cfc 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -37,24 +37,6 @@ type Message struct { Partial bool } -// CopyMessage creates a copy of the passed-in Message which will remain -// unchanged if the original is changed. Log drivers which buffer Messages -// rather than dispatching them during their Log() method should use this -// function to obtain a Message whose Line member's contents won't change. -func CopyMessage(msg *Message) *Message { - m := new(Message) - m.Line = make([]byte, len(msg.Line)) - copy(m.Line, msg.Line) - m.Source = msg.Source - m.Timestamp = msg.Timestamp - m.Partial = msg.Partial - m.Attrs = make(LogAttributes) - for k, v := range msg.Attrs { - m.Attrs[k] = v - } - return m -} - // LogAttributes is used to hold the extra attributes available in the log message // Primarily used for converting the map type to string and sorting. type LogAttributes map[string]string diff --git a/daemon/logger/logger_test.go b/daemon/logger/logger_test.go deleted file mode 100644 index 16e1514d2d..0000000000 --- a/daemon/logger/logger_test.go +++ /dev/null @@ -1,26 +0,0 @@ -package logger - -import ( - "reflect" - "testing" - "time" -) - -func TestCopyMessage(t *testing.T) { - msg := &Message{ - Line: []byte("test line."), - Source: "stdout", - Timestamp: time.Now(), - Attrs: LogAttributes{ - "key1": "val1", - "key2": "val2", - "key3": "val3", - }, - Partial: true, - } - - m := CopyMessage(msg) - if !reflect.DeepEqual(m, msg) { - t.Fatalf("CopyMessage failed to copy message") - } -} diff --git a/daemon/logger/ring.go b/daemon/logger/ring.go new file mode 100644 index 0000000000..9748a558c5 --- /dev/null +++ b/daemon/logger/ring.go @@ -0,0 +1,210 @@ +package logger + +import ( + "errors" + "sync" + "sync/atomic" + + "github.com/Sirupsen/logrus" +) + +const ( + defaultRingMaxSize = 1e6 // 1MB +) + +// RingLogger is a ring buffer that implements the Logger interface. +// This is used when lossy logging is OK. +type RingLogger struct { + buffer *messageRing + l Logger + logInfo Info + closeFlag int32 +} + +type ringWithReader struct { + *RingLogger +} + +func (r *ringWithReader) ReadLogs(cfg ReadConfig) *LogWatcher { + reader, ok := r.l.(LogReader) + if !ok { + // something is wrong if we get here + panic("expected log reader") + } + return reader.ReadLogs(cfg) +} + +func newRingLogger(driver Logger, logInfo Info, maxSize int64) *RingLogger { + l := &RingLogger{ + buffer: newRing(maxSize), + l: driver, + logInfo: logInfo, + } + go l.run() + return l +} + +// NewRingLogger creates a new Logger that is implemented as a RingBuffer wrapping +// the passed in logger. +func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger { + if maxSize < 0 { + maxSize = defaultRingMaxSize + } + l := newRingLogger(driver, logInfo, maxSize) + if _, ok := driver.(LogReader); ok { + return &ringWithReader{l} + } + return l +} + +// Log queues messages into the ring buffer +func (r *RingLogger) Log(msg *Message) error { + if r.closed() { + return errClosed + } + return r.buffer.Enqueue(msg) +} + +// Name returns the name of the underlying logger +func (r *RingLogger) Name() string { + return r.l.Name() +} + +func (r *RingLogger) closed() bool { + return atomic.LoadInt32(&r.closeFlag) == 1 +} + +func (r *RingLogger) setClosed() { + atomic.StoreInt32(&r.closeFlag, 1) +} + +// Close closes the logger +func (r *RingLogger) Close() error { + r.setClosed() + r.buffer.Close() + // empty out the queue + for _, msg := range r.buffer.Drain() { + if err := r.l.Log(msg); err != nil { + logrus.WithField("driver", r.l.Name()).WithField("container", r.logInfo.ContainerID).Errorf("Error writing log message: %v", r.l) + break + } + } + return r.l.Close() +} + +// run consumes messages from the ring buffer and forwards them to the underling +// logger. +// This is run in a goroutine when the RingLogger is created +func (r *RingLogger) run() { + for { + if r.closed() { + return + } + msg, err := r.buffer.Dequeue() + if err != nil { + // buffer is closed + return + } + if err := r.l.Log(msg); err != nil { + logrus.WithField("driver", r.l.Name()).WithField("container", r.logInfo.ContainerID).Errorf("Error writing log message: %v", r.l) + } + } +} + +type messageRing struct { + mu sync.Mutex + // singals callers of `Dequeue` to wake up either on `Close` or when a new `Message` is added + wait *sync.Cond + + sizeBytes int64 // current buffer size + maxBytes int64 // max buffer size size + queue []*Message + closed bool +} + +func newRing(maxBytes int64) *messageRing { + queueSize := 1000 + if maxBytes == 0 || maxBytes == 1 { + // With 0 or 1 max byte size, the maximum size of the queue would only ever be 1 + // message long. + queueSize = 1 + } + + r := &messageRing{queue: make([]*Message, 0, queueSize), maxBytes: maxBytes} + r.wait = sync.NewCond(&r.mu) + return r +} + +// Enqueue adds a message to the buffer queue +// If the message is too big for the buffer it drops the oldest messages to make room +// If there are no messages in the queue and the message is still too big, it adds the message anyway. +func (r *messageRing) Enqueue(m *Message) error { + mSize := int64(len(m.Line)) + + r.mu.Lock() + if r.closed { + r.mu.Unlock() + return errClosed + } + if mSize+r.sizeBytes > r.maxBytes && len(r.queue) > 0 { + r.wait.Signal() + r.mu.Unlock() + return nil + } + + r.queue = append(r.queue, m) + r.sizeBytes += mSize + r.wait.Signal() + r.mu.Unlock() + return nil +} + +// Dequeue pulls a message off the queue +// If there are no messages, it waits for one. +// If the buffer is closed, it will return immediately. +func (r *messageRing) Dequeue() (*Message, error) { + r.mu.Lock() + for len(r.queue) == 0 && !r.closed { + r.wait.Wait() + } + + if r.closed { + r.mu.Unlock() + return nil, errClosed + } + + msg := r.queue[0] + r.queue = r.queue[1:] + r.sizeBytes -= int64(len(msg.Line)) + r.mu.Unlock() + return msg, nil +} + +var errClosed = errors.New("closed") + +// Close closes the buffer ensuring no new messages can be added. +// Any callers waiting to dequeue a message will be woken up. +func (r *messageRing) Close() { + r.mu.Lock() + if r.closed { + r.mu.Unlock() + return + } + + r.closed = true + r.wait.Broadcast() + r.mu.Unlock() + return +} + +// Drain drains all messages from the queue. +// This can be used after `Close()` to get any remaining messages that were in queue. +func (r *messageRing) Drain() []*Message { + r.mu.Lock() + ls := make([]*Message, 0, len(r.queue)) + ls = append(ls, r.queue...) + r.sizeBytes = 0 + r.queue = r.queue[:0] + r.mu.Unlock() + return ls +} diff --git a/daemon/logger/ring_test.go b/daemon/logger/ring_test.go new file mode 100644 index 0000000000..ed08cc4222 --- /dev/null +++ b/daemon/logger/ring_test.go @@ -0,0 +1,299 @@ +package logger + +import ( + "context" + "strconv" + "testing" + "time" +) + +type mockLogger struct{ c chan *Message } + +func (l *mockLogger) Log(msg *Message) error { + l.c <- msg + return nil +} + +func (l *mockLogger) Name() string { + return "mock" +} + +func (l *mockLogger) Close() error { + return nil +} + +func TestRingLogger(t *testing.T) { + mockLog := &mockLogger{make(chan *Message)} // no buffer on this channel + ring := newRingLogger(mockLog, Info{}, 1) + defer ring.setClosed() + + // this should never block + ring.Log(&Message{Line: []byte("1")}) + ring.Log(&Message{Line: []byte("2")}) + ring.Log(&Message{Line: []byte("3")}) + + select { + case msg := <-mockLog.c: + if string(msg.Line) != "1" { + t.Fatalf("got unexpected msg: %q", string(msg.Line)) + } + case <-time.After(100 * time.Millisecond): + t.Fatal("timeout reading log message") + } + + select { + case msg := <-mockLog.c: + t.Fatalf("expected no more messages in the queue, got: %q", string(msg.Line)) + default: + } +} + +func TestRingCap(t *testing.T) { + r := newRing(5) + for i := 0; i < 10; i++ { + // queue messages with "0" to "10" + // the "5" to "10" messages should be dropped since we only allow 5 bytes in the buffer + if err := r.Enqueue(&Message{Line: []byte(strconv.Itoa(i))}); err != nil { + t.Fatal(err) + } + } + + // should have messages in the queue for "5" to "10" + for i := 0; i < 5; i++ { + m, err := r.Dequeue() + if err != nil { + t.Fatal(err) + } + if string(m.Line) != strconv.Itoa(i) { + t.Fatalf("got unexpected message for iter %d: %s", i, string(m.Line)) + } + } + + // queue a message that's bigger than the buffer cap + if err := r.Enqueue(&Message{Line: []byte("hello world")}); err != nil { + t.Fatal(err) + } + + // queue another message that's bigger than the buffer cap + if err := r.Enqueue(&Message{Line: []byte("eat a banana")}); err != nil { + t.Fatal(err) + } + + m, err := r.Dequeue() + if err != nil { + t.Fatal(err) + } + if string(m.Line) != "hello world" { + t.Fatalf("got unexpected message: %s", string(m.Line)) + } + if len(r.queue) != 0 { + t.Fatalf("expected queue to be empty, got: %d", len(r.queue)) + } +} + +func TestRingClose(t *testing.T) { + r := newRing(1) + if err := r.Enqueue(&Message{Line: []byte("hello")}); err != nil { + t.Fatal(err) + } + r.Close() + if err := r.Enqueue(&Message{}); err != errClosed { + t.Fatalf("expected errClosed, got: %v", err) + } + if len(r.queue) != 1 { + t.Fatal("expected empty queue") + } + if m, err := r.Dequeue(); err == nil || m != nil { + t.Fatal("exepcted err on Dequeue after close") + } + + ls := r.Drain() + if len(ls) != 1 { + t.Fatalf("expected one message: %v", ls) + } + if string(ls[0].Line) != "hello" { + t.Fatalf("got unexpected message: %s", string(ls[0].Line)) + } +} + +func TestRingDrain(t *testing.T) { + r := newRing(5) + for i := 0; i < 5; i++ { + if err := r.Enqueue(&Message{Line: []byte(strconv.Itoa(i))}); err != nil { + t.Fatal(err) + } + } + + ls := r.Drain() + if len(ls) != 5 { + t.Fatal("got unexpected length after drain") + } + + for i := 0; i < 5; i++ { + if string(ls[i].Line) != strconv.Itoa(i) { + t.Fatalf("got unexpected message at position %d: %s", i, string(ls[i].Line)) + } + } + if r.sizeBytes != 0 { + t.Fatalf("expected buffer size to be 0 after drain, got: %d", r.sizeBytes) + } + + ls = r.Drain() + if len(ls) != 0 { + t.Fatalf("expected 0 messages on 2nd drain: %v", ls) + } + +} + +type nopLogger struct{} + +func (nopLogger) Name() string { return "nopLogger" } +func (nopLogger) Close() error { return nil } +func (nopLogger) Log(*Message) error { return nil } + +func BenchmarkRingLoggerThroughputNoReceiver(b *testing.B) { + mockLog := &mockLogger{make(chan *Message)} + defer mockLog.Close() + l := NewRingLogger(mockLog, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkRingLoggerThroughputWithReceiverDelay0(b *testing.B) { + l := NewRingLogger(nopLogger{}, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} + +func consumeWithDelay(delay time.Duration, c <-chan *Message) (cancel func()) { + started := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + close(started) + ticker := time.NewTicker(delay) + for range ticker.C { + select { + case <-ctx.Done(): + ticker.Stop() + return + case <-c: + } + } + }() + <-started + return cancel +} + +func BenchmarkRingLoggerThroughputConsumeDelay1(b *testing.B) { + mockLog := &mockLogger{make(chan *Message)} + defer mockLog.Close() + l := NewRingLogger(mockLog, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + cancel := consumeWithDelay(1*time.Millisecond, mockLog.c) + defer cancel() + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkRingLoggerThroughputConsumeDelay10(b *testing.B) { + mockLog := &mockLogger{make(chan *Message)} + defer mockLog.Close() + l := NewRingLogger(mockLog, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + cancel := consumeWithDelay(10*time.Millisecond, mockLog.c) + defer cancel() + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkRingLoggerThroughputConsumeDelay50(b *testing.B) { + mockLog := &mockLogger{make(chan *Message)} + defer mockLog.Close() + l := NewRingLogger(mockLog, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + cancel := consumeWithDelay(50*time.Millisecond, mockLog.c) + defer cancel() + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkRingLoggerThroughputConsumeDelay100(b *testing.B) { + mockLog := &mockLogger{make(chan *Message)} + defer mockLog.Close() + l := NewRingLogger(mockLog, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + cancel := consumeWithDelay(100*time.Millisecond, mockLog.c) + defer cancel() + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkRingLoggerThroughputConsumeDelay300(b *testing.B) { + mockLog := &mockLogger{make(chan *Message)} + defer mockLog.Close() + l := NewRingLogger(mockLog, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + cancel := consumeWithDelay(300*time.Millisecond, mockLog.c) + defer cancel() + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkRingLoggerThroughputConsumeDelay500(b *testing.B) { + mockLog := &mockLogger{make(chan *Message)} + defer mockLog.Close() + l := NewRingLogger(mockLog, Info{}, -1) + msg := &Message{Line: []byte("hello humans and everyone else!")} + b.SetBytes(int64(len(msg.Line))) + + cancel := consumeWithDelay(500*time.Millisecond, mockLog.c) + defer cancel() + + for i := 0; i < b.N; i++ { + if err := l.Log(msg); err != nil { + b.Fatal(err) + } + } +} diff --git a/daemon/logger/splunk/splunk_test.go b/daemon/logger/splunk/splunk_test.go index e7e3d68744..90d5051627 100644 --- a/daemon/logger/splunk/splunk_test.go +++ b/daemon/logger/splunk/splunk_test.go @@ -133,11 +133,11 @@ func TestDefault(t *testing.T) { } message1Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil { t.Fatal(err) } message2Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("notajson"), "stdout", message2Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("notajson"), Source: "stdout", Timestamp: message2Time}); err != nil { t.Fatal(err) } @@ -262,7 +262,7 @@ func TestInlineFormatWithNonDefaultOptions(t *testing.T) { } messageTime := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("1"), "stdout", messageTime, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("1"), Source: "stdout", Timestamp: messageTime}); err != nil { t.Fatal(err) } @@ -361,11 +361,11 @@ func TestJsonFormat(t *testing.T) { } message1Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil { t.Fatal(err) } message2Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil { t.Fatal(err) } @@ -478,11 +478,11 @@ func TestRawFormat(t *testing.T) { } message1Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil { t.Fatal(err) } message2Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil { t.Fatal(err) } @@ -592,11 +592,11 @@ func TestRawFormatWithLabels(t *testing.T) { } message1Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil { t.Fatal(err) } message2Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil { t.Fatal(err) } @@ -705,11 +705,11 @@ func TestRawFormatWithoutTag(t *testing.T) { } message1Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("{\"a\":\"b\"}"), "stdout", message1Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("{\"a\":\"b\"}"), Source: "stdout", Timestamp: message1Time}); err != nil { t.Fatal(err) } message2Time := time.Now() - if err := loggerDriver.Log(&logger.Message{[]byte("notjson"), "stdout", message2Time, nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("notjson"), Source: "stdout", Timestamp: message2Time}); err != nil { t.Fatal(err) } @@ -790,7 +790,7 @@ func TestBatching(t *testing.T) { } for i := 0; i < defaultStreamChannelSize*4; i++ { - if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } } @@ -856,7 +856,7 @@ func TestFrequency(t *testing.T) { } for i := 0; i < 10; i++ { - if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } time.Sleep(15 * time.Millisecond) @@ -937,7 +937,7 @@ func TestOneMessagePerRequest(t *testing.T) { } for i := 0; i < 10; i++ { - if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } } @@ -1045,7 +1045,7 @@ func TestSkipVerify(t *testing.T) { } for i := 0; i < defaultStreamChannelSize*2; i++ { - if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } } @@ -1057,7 +1057,7 @@ func TestSkipVerify(t *testing.T) { hec.simulateServerError = false for i := defaultStreamChannelSize * 2; i < defaultStreamChannelSize*4; i++ { - if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } } @@ -1127,7 +1127,7 @@ func TestBufferMaximum(t *testing.T) { } for i := 0; i < 11; i++ { - if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } } @@ -1216,7 +1216,7 @@ func TestServerAlwaysDown(t *testing.T) { } for i := 0; i < 5; i++ { - if err := loggerDriver.Log(&logger.Message{[]byte(fmt.Sprintf("%d", i)), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte(fmt.Sprintf("%d", i)), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } } @@ -1269,7 +1269,7 @@ func TestCannotSendAfterClose(t *testing.T) { t.Fatal(err) } - if err := loggerDriver.Log(&logger.Message{[]byte("message1"), "stdout", time.Now(), nil, false}); err != nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("message1"), Source: "stdout", Timestamp: time.Now()}); err != nil { t.Fatal(err) } @@ -1278,7 +1278,7 @@ func TestCannotSendAfterClose(t *testing.T) { t.Fatal(err) } - if err := loggerDriver.Log(&logger.Message{[]byte("message2"), "stdout", time.Now(), nil, false}); err == nil { + if err := loggerDriver.Log(&logger.Message{Line: []byte("message2"), Source: "stdout", Timestamp: time.Now()}); err == nil { t.Fatal("Driver should not allow to send messages after close") } diff --git a/docs/api/version-history.md b/docs/api/version-history.md index b295eda91c..e8cb046d0c 100644 --- a/docs/api/version-history.md +++ b/docs/api/version-history.md @@ -82,6 +82,8 @@ keywords: "API, Docker, rcli, REST, documentation" * `GET /secrets/{id}` returns information on the secret `id`. * `POST /secrets/{id}/update` updates the secret `id`. * `POST /services/(id or name)/update` now accepts service name or prefix of service id as a parameter. +* `POST /containers/create` added 2 built-in log-opts that work on all logging drivers, +`mode` (`blocking`|`non-blocking`), and `max-buffer-size` (e.g. `2m`) which enables a non-blocking log buffer. ## v1.24 API changes