diff --git a/contrib/completion/bash/docker b/contrib/completion/bash/docker index f386791df7..e6fca33862 100644 --- a/contrib/completion/bash/docker +++ b/contrib/completion/bash/docker @@ -713,7 +713,7 @@ __docker_complete_log_drivers() { __docker_complete_log_options() { # see docs/reference/logging/index.md - local awslogs_options="awslogs-region awslogs-group awslogs-stream" + local awslogs_options="awslogs-region awslogs-group awslogs-stream awslogs-create-group" local fluentd_options="env fluentd-address fluentd-async-connect fluentd-buffer-limit fluentd-retry-wait fluentd-max-retries labels tag" local gcplogs_options="env gcp-log-cmd gcp-project labels" local gelf_options="env gelf-address gelf-compression-level gelf-compression-type labels tag" diff --git a/contrib/completion/zsh/_docker b/contrib/completion/zsh/_docker index 813f7a7181..a76414c921 100644 --- a/contrib/completion/zsh/_docker +++ b/contrib/completion/zsh/_docker @@ -223,7 +223,7 @@ __docker_get_log_options() { local log_driver=${opt_args[--log-driver]:-"all"} local -a awslogs_options fluentd_options gelf_options journald_options json_file_options logentries_options syslog_options splunk_options - awslogs_options=("awslogs-region" "awslogs-group" "awslogs-stream") + awslogs_options=("awslogs-region" "awslogs-group" "awslogs-stream" "awslogs-create-group") fluentd_options=("env" "fluentd-address" "fluentd-async-connect" "fluentd-buffer-limit" "fluentd-retry-wait" "fluentd-max-retries" "labels" "tag") gcplogs_options=("env" "gcp-log-cmd" "gcp-project" "labels") gelf_options=("env" "gelf-address" "gelf-compression-level" "gelf-compression-type" "labels" "tag") diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index 086eda550c..78995f3fad 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -2,11 +2,13 @@ package awslogs import ( + "bytes" "errors" "fmt" "os" "runtime" "sort" + "strconv" "strings" "sync" "time" @@ -21,6 +23,7 @@ import ( "github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger/loggerutils" "github.com/docker/docker/dockerversion" + "github.com/docker/docker/pkg/templates" ) const ( @@ -29,6 +32,7 @@ const ( regionEnvKey = "AWS_REGION" logGroupKey = "awslogs-group" logStreamKey = "awslogs-stream" + logCreateGroupKey = "awslogs-create-group" tagKey = "tag" batchPublishFrequency = 5 * time.Second @@ -43,21 +47,24 @@ const ( resourceAlreadyExistsCode = "ResourceAlreadyExistsException" dataAlreadyAcceptedCode = "DataAlreadyAcceptedException" invalidSequenceTokenCode = "InvalidSequenceTokenException" + resourceNotFoundCode = "ResourceNotFoundException" userAgentHeader = "User-Agent" ) type logStream struct { - logStreamName string - logGroupName string - client api - messages chan *logger.Message - lock sync.RWMutex - closed bool - sequenceToken *string + logStreamName string + logGroupName string + logCreateGroup bool + client api + messages chan *logger.Message + lock sync.RWMutex + closed bool + sequenceToken *string } type api interface { + CreateLogGroup(*cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) } @@ -84,7 +91,7 @@ func init() { // New creates an awslogs logger using the configuration passed in on the // context. Supported context configuration variables are awslogs-region, -// awslogs-group, and awslogs-stream. When available, configuration is +// awslogs-group, awslogs-stream, and awslogs-create-group. When available, configuration is // also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID, // AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and // the EC2 Instance Metadata Service. @@ -94,6 +101,13 @@ func New(info logger.Info) (logger.Logger, error) { if err != nil { return nil, err } + logCreateGroup := false + if info.Config[logCreateGroupKey] != "" { + logCreateGroup, err = strconv.ParseBool(info.Config[logCreateGroupKey]) + if err != nil { + return nil, err + } + } if info.Config[logStreamKey] != "" { logStreamName = info.Config[logStreamKey] @@ -103,10 +117,11 @@ func New(info logger.Info) (logger.Logger, error) { return nil, err } containerStream := &logStream{ - logStreamName: logStreamName, - logGroupName: logGroupName, - client: client, - messages: make(chan *logger.Message, 4096), + logStreamName: logStreamName, + logGroupName: logGroupName, + logCreateGroup: logCreateGroup, + client: client, + messages: make(chan *logger.Message, 4096), } err = containerStream.create() if err != nil { @@ -117,6 +132,19 @@ func New(info logger.Info) (logger.Logger, error) { return containerStream, nil } +func parseLogGroup(info logger.Info, groupTemplate string) (string, error) { + tmpl, err := templates.NewParse("log-group", groupTemplate) + if err != nil { + return "", err + } + buf := new(bytes.Buffer) + if err := tmpl.Execute(buf, &info); err != nil { + return "", err + } + + return buf.String(), nil +} + // newRegionFinder is a variable such that the implementation // can be swapped out for unit tests. var newRegionFinder = func() regionFinder { @@ -192,8 +220,50 @@ func (l *logStream) Close() error { return nil } -// create creates a log stream for the instance of the awslogs logging driver +// create creates log group and log stream for the instance of the awslogs logging driver func (l *logStream) create() error { + if err := l.createLogStream(); err != nil { + if l.logCreateGroup { + if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == resourceNotFoundCode { + if err := l.createLogGroup(); err != nil { + return err + } + return l.createLogStream() + } + } + return err + } + + return nil +} + +// createLogGroup creates a log group for the instance of the awslogs logging driver +func (l *logStream) createLogGroup() error { + if _, err := l.client.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ + LogGroupName: aws.String(l.logGroupName), + }); err != nil { + if awsErr, ok := err.(awserr.Error); ok { + fields := logrus.Fields{ + "errorCode": awsErr.Code(), + "message": awsErr.Message(), + "origError": awsErr.OrigErr(), + "logGroupName": l.logGroupName, + "logCreateGroup": l.logCreateGroup, + } + if awsErr.Code() == resourceAlreadyExistsCode { + // Allow creation to succeed + logrus.WithFields(fields).Info("Log group already exists") + return nil + } + logrus.WithFields(fields).Error("Failed to create log group") + } + return err + } + return nil +} + +// createLogStream creates a log stream for the instance of the awslogs logging driver +func (l *logStream) createLogStream() error { input := &cloudwatchlogs.CreateLogStreamInput{ LogGroupName: aws.String(l.logGroupName), LogStreamName: aws.String(l.logStreamName), @@ -349,12 +419,13 @@ func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenc } // ValidateLogOpt looks for awslogs-specific log options awslogs-region, -// awslogs-group, and awslogs-stream +// awslogs-group, awslogs-stream, awslogs-create-group func ValidateLogOpt(cfg map[string]string) error { for key := range cfg { switch key { case logGroupKey: case logStreamKey: + case logCreateGroupKey: case regionKey: case tagKey: default: @@ -364,6 +435,11 @@ func ValidateLogOpt(cfg map[string]string) error { if cfg[logGroupKey] == "" { return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey) } + if cfg[logCreateGroupKey] != "" { + if _, err := strconv.ParseBool(cfg[logCreateGroupKey]); err != nil { + return fmt.Errorf("must specify valid value for log opt '%s': %v", logCreateGroupKey, err) + } + } return nil } diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go index a7ac7be4d8..ac0bb09c39 100644 --- a/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -106,6 +106,37 @@ func TestCreateSuccess(t *testing.T) { } } +func TestCreateLogGroupSuccess(t *testing.T) { + mockClient := newMockClient() + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + logCreateGroup: true, + } + mockClient.createLogGroupResult <- &createLogGroupResult{} + mockClient.createLogStreamResult <- &createLogStreamResult{} + + err := stream.create() + + if err != nil { + t.Errorf("Received unexpected err: %v\n", err) + } + argument := <-mockClient.createLogStreamArgument + if argument.LogGroupName == nil { + t.Fatal("Expected non-nil LogGroupName") + } + if *argument.LogGroupName != groupName { + t.Errorf("Expected LogGroupName to be %s", groupName) + } + if argument.LogStreamName == nil { + t.Fatal("Expected non-nil LogStreamName") + } + if *argument.LogStreamName != streamName { + t.Errorf("Expected LogStreamName to be %s", streamName) + } +} + func TestCreateError(t *testing.T) { mockClient := newMockClient() stream := &logStream{ diff --git a/daemon/logger/awslogs/cwlogsiface_mock_test.go b/daemon/logger/awslogs/cwlogsiface_mock_test.go index b768a3d7ec..82bb34b0a6 100644 --- a/daemon/logger/awslogs/cwlogsiface_mock_test.go +++ b/daemon/logger/awslogs/cwlogsiface_mock_test.go @@ -3,12 +3,19 @@ package awslogs import "github.com/aws/aws-sdk-go/service/cloudwatchlogs" type mockcwlogsclient struct { + createLogGroupArgument chan *cloudwatchlogs.CreateLogGroupInput + createLogGroupResult chan *createLogGroupResult createLogStreamArgument chan *cloudwatchlogs.CreateLogStreamInput createLogStreamResult chan *createLogStreamResult putLogEventsArgument chan *cloudwatchlogs.PutLogEventsInput putLogEventsResult chan *putLogEventsResult } +type createLogGroupResult struct { + successResult *cloudwatchlogs.CreateLogGroupOutput + errorResult error +} + type createLogStreamResult struct { successResult *cloudwatchlogs.CreateLogStreamOutput errorResult error @@ -21,6 +28,8 @@ type putLogEventsResult struct { func newMockClient() *mockcwlogsclient { return &mockcwlogsclient{ + createLogGroupArgument: make(chan *cloudwatchlogs.CreateLogGroupInput, 1), + createLogGroupResult: make(chan *createLogGroupResult, 1), createLogStreamArgument: make(chan *cloudwatchlogs.CreateLogStreamInput, 1), createLogStreamResult: make(chan *createLogStreamResult, 1), putLogEventsArgument: make(chan *cloudwatchlogs.PutLogEventsInput, 1), @@ -37,6 +46,12 @@ func newMockClientBuffered(buflen int) *mockcwlogsclient { } } +func (m *mockcwlogsclient) CreateLogGroup(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) { + m.createLogGroupArgument <- input + output := <-m.createLogGroupResult + return output.successResult, output.errorResult +} + func (m *mockcwlogsclient) CreateLogStream(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { m.createLogStreamArgument <- input output := <-m.createLogStreamResult