diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go
index d6312b660a..3d6466f09d 100644
--- a/daemon/logger/awslogs/cloudwatchlogs.go
+++ b/daemon/logger/awslogs/cloudwatchlogs.go
@@ -61,6 +61,7 @@ type logStream struct {
     logStreamName    string
     logGroupName     string
     logCreateGroup   bool
+    logNonBlocking   bool
     multilinePattern *regexp.Regexp
     client           api
     messages         chan *logger.Message
@@ -129,6 +130,8 @@ func New(info logger.Info) (logger.Logger, error) {
         }
     }

+    logNonBlocking := info.Config["mode"] == "non-blocking"
+
     if info.Config[logStreamKey] != "" {
         logStreamName = info.Config[logStreamKey]
     }
@@ -142,19 +145,54 @@ func New(info logger.Info) (logger.Logger, error) {
     if err != nil {
         return nil, err
     }
+
     containerStream := &logStream{
         logStreamName:    logStreamName,
         logGroupName:     logGroupName,
         logCreateGroup:   logCreateGroup,
+        logNonBlocking:   logNonBlocking,
         multilinePattern: multilinePattern,
         client:           client,
        messages:         make(chan *logger.Message, 4096),
     }
-    err = containerStream.create()
-    if err != nil {
-        return nil, err
+
+    creationDone := make(chan bool)
+    if logNonBlocking {
+        go func() {
+            backoff := 1
+            maxBackoff := 32
+            for {
+                // If logger is closed we are done
+                containerStream.lock.RLock()
+                if containerStream.closed {
+                    containerStream.lock.RUnlock()
+                    break
+                }
+                containerStream.lock.RUnlock()
+                err := containerStream.create()
+                if err == nil {
+                    break
+                }
+
+                time.Sleep(time.Duration(backoff) * time.Second)
+                if backoff < maxBackoff {
+                    backoff *= 2
+                }
+                logrus.
+                    WithError(err).
+                    WithField("container-id", info.ContainerID).
+                    WithField("container-name", info.ContainerName).
+                    Error("Error while trying to initialize awslogs. Retrying in: ", backoff, " seconds")
+            }
+            close(creationDone)
+        }()
+    } else {
+        if err = containerStream.create(); err != nil {
+            return nil, err
+        }
+        close(creationDone)
     }
-    go containerStream.collectBatch()
+    go containerStream.collectBatch(creationDone)

     return containerStream, nil
 }
@@ -296,9 +334,18 @@ func (l *logStream) BufSize() int {
 func (l *logStream) Log(msg *logger.Message) error {
     l.lock.RLock()
     defer l.lock.RUnlock()
-    if !l.closed {
-        l.messages <- msg
+    if l.closed {
+        return errors.New("awslogs is closed")
     }
+    if l.logNonBlocking {
+        select {
+        case l.messages <- msg:
+            return nil
+        default:
+            return errors.New("awslogs buffer is full")
+        }
+    }
+    l.messages <- msg
     return nil
 }

@@ -324,7 +371,9 @@ func (l *logStream) create() error {
                 return l.createLogStream()
             }
         }
-        return err
+        if err != nil {
+            return err
+        }
     }

     return nil
@@ -401,7 +450,9 @@ var newTicker = func(freq time.Duration) *time.Ticker {
 // seconds. When events are ready to be processed for submission to CloudWatch
 // Logs, the processEvents method is called. If a multiline pattern is not
 // configured, log events are submitted to the processEvents method immediately.
-func (l *logStream) collectBatch() {
+func (l *logStream) collectBatch(created chan bool) {
+    // Wait for the logstream/group to be created
+    <-created
     ticker := newTicker(batchPublishFrequency)
     var eventBuffer []byte
     var eventBufferTimestamp int64
diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go
index 4a37d98167..2884f01e28 100644
--- a/daemon/logger/awslogs/cloudwatchlogs_test.go
+++ b/daemon/logger/awslogs/cloudwatchlogs_test.go
@@ -201,6 +201,93 @@ func TestCreateAlreadyExists(t *testing.T) {
     }
 }

+func TestLogClosed(t *testing.T) {
+    mockClient := newMockClient()
+    stream := &logStream{
+        client: mockClient,
+        closed: true,
+    }
+    err := stream.Log(&logger.Message{})
+    if err == nil {
+        t.Fatal("Expected non-nil error")
+    }
+}
+
+func TestLogBlocking(t *testing.T) {
+    mockClient := newMockClient()
+    stream := &logStream{
+        client:   mockClient,
+        messages: make(chan *logger.Message),
+    }
+
+    errorCh := make(chan error, 1)
+    started := make(chan bool)
+    go func() {
+        started <- true
+        err := stream.Log(&logger.Message{})
+        errorCh <- err
+    }()
+    <-started
+    select {
+    case err := <-errorCh:
+        t.Fatal("Expected stream.Log to block: ", err)
+    default:
+        break
+    }
+    select {
+    case <-stream.messages:
+        break
+    default:
+        t.Fatal("Expected to be able to read from stream.messages but was unable to")
+    }
+    select {
+    case err := <-errorCh:
+        if err != nil {
+            t.Fatal(err)
+        }
+    case <-time.After(30 * time.Second):
+        t.Fatal("timed out waiting for read")
+    }
+}
+
+func TestLogNonBlockingBufferEmpty(t *testing.T) {
+    mockClient := newMockClient()
+    stream := &logStream{
+        client:         mockClient,
+        messages:       make(chan *logger.Message, 1),
+        logNonBlocking: true,
+    }
+    err := stream.Log(&logger.Message{})
+    if err != nil {
+        t.Fatal(err)
+    }
+}
+
+func TestLogNonBlockingBufferFull(t *testing.T) {
+    mockClient := newMockClient()
+    stream := &logStream{
+        client:         mockClient,
+        messages:       make(chan *logger.Message, 1),
+        logNonBlocking: true,
+    }
+    stream.messages <- &logger.Message{}
+    errorCh := make(chan error)
+    started := make(chan bool)
+    go func() {
+        started <- true
+        err := stream.Log(&logger.Message{})
+        errorCh <- err
+    }()
+    <-started
+    select {
+    case err := <-errorCh:
+        if err == nil {
+            t.Fatal("Expected non-nil error")
+        }
+    case <-time.After(30 * time.Second):
+        t.Fatal("Expected Log call to not block")
+    }
+}
 func TestPublishBatchSuccess(t *testing.T) {
     mockClient := newMockClient()
     stream := &logStream{
@@ -410,8 +497,9 @@ func TestCollectBatchSimple(t *testing.T) {
             C: ticks,
         }
     }
-
-    go stream.collectBatch()
+    d := make(chan bool)
+    close(d)
+    go stream.collectBatch(d)

     stream.Log(&logger.Message{
         Line:      []byte(logline),
@@ -454,7 +542,9 @@ func TestCollectBatchTicker(t *testing.T) {
         }
     }

-    go stream.collectBatch()
+    d := make(chan bool)
+    close(d)
+    go stream.collectBatch(d)

     stream.Log(&logger.Message{
         Line:      []byte(logline + " 1"),
@@ -526,7 +616,9 @@ func TestCollectBatchMultilinePattern(t *testing.T) {
         }
     }

-    go stream.collectBatch()
+    d := make(chan bool)
+    close(d)
+    go stream.collectBatch(d)

     stream.Log(&logger.Message{
         Line:      []byte(logline),
@@ -580,7 +672,9 @@ func BenchmarkCollectBatch(b *testing.B) {
         }
     }

-    go stream.collectBatch()
+    d := make(chan bool)
+    close(d)
+    go stream.collectBatch(d)
     stream.logGenerator(10, 100)
     ticks <- time.Time{}
     stream.Close()
@@ -610,7 +704,9 @@ func BenchmarkCollectBatchMultilinePattern(b *testing.B) {
             C: ticks,
         }
     }
-    go stream.collectBatch()
+    d := make(chan bool)
+    close(d)
+    go stream.collectBatch(d)
     stream.logGenerator(10, 100)
     ticks <- time.Time{}
     stream.Close()
@@ -640,7 +736,9 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
         }
     }

-    go stream.collectBatch()
+    d := make(chan bool)
+    close(d)
+    go stream.collectBatch(d)

     stream.Log(&logger.Message{
         Line:      []byte(logline),
@@ -702,7 +800,9 @@ func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) {
         }
     }

-    go stream.collectBatch()
+    d := make(chan bool)
+    close(d)
+    go stream.collectBatch(d)

     stream.Log(&logger.Message{
         Line:      []byte(logline),
@@ -750,7 +850,9 @@ func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) {
         }
     }

-    go stream.collectBatch()
+    d := make(chan bool)
+    close(d)
+    go stream.collectBatch(d)

     // Log max event size
     longline := strings.Repeat("A", maximumBytesPerEvent)
@@ -801,7 +903,9 @@ func TestCollectBatchClose(t *testing.T) {
         }
     }

-    go stream.collectBatch()
+    d := make(chan bool)
+    close(d)
+    go stream.collectBatch(d)

     stream.Log(&logger.Message{
         Line:      []byte(logline),
@@ -844,7 +948,9 @@ func TestCollectBatchLineSplit(t *testing.T) {
         }
     }

-    go stream.collectBatch()
+    d := make(chan bool)
+    close(d)
+    go stream.collectBatch(d)

     longline := strings.Repeat("A", maximumBytesPerEvent)
     stream.Log(&logger.Message{
@@ -891,7 +997,9 @@ func TestCollectBatchMaxEvents(t *testing.T) {
         }
     }

-    go stream.collectBatch()
+    d := make(chan bool)
+    close(d)
+    go stream.collectBatch(d)

     line := "A"
     for i := 0; i <= maximumLogEventsPerPut; i++ {
@@ -946,7 +1054,9 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
         }
     }

-    go stream.collectBatch()
+    d := make(chan bool)
+    close(d)
+    go stream.collectBatch(d)

     numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes)
     // maxline is the maximum line that could be submitted after
@@ -1025,7 +1135,9 @@ func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
         }
     }

-    go stream.collectBatch()
+    d := make(chan bool)
+    close(d)
+    go stream.collectBatch(d)

     times := maximumLogEventsPerPut
     expectedEvents := []*cloudwatchlogs.InputLogEvent{}