mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Support of CreateLogGroup for awslogs
This fix tries to address the issue raised in 29344 where it was not possible to create log group for awslogs (CloudWatch) on-demand. Log group has to be created explicitly before container is running. This behavior is inconsistent with AWS logs agent where log groups are always created as needed. There were several concerns previously (See comments in 19617 and 29344): 1. There is a limit of 500 log groups/account/region so resource might be exhausted if there is any typo or incorrect region. 2. Logs are generated for every container so CreateLogGroup (or equally, DescribeLogGroups) might be called every time, which is redundant and potentially surprising. 3. CreateLogStream and CreateLogGroup have different IAM policies. This fix addresses the issue by add `--log-opt awslogs-create-group` which by default is `false`. It requires user to explicitly request that log groups be created as needed. Related unit test has been updated. And tests have also been done manually in AWS. This fix fixes 29334. Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
This commit is contained in:
parent
cd6a61f1b1
commit
864b0c2e47
3 changed files with 136 additions and 14 deletions
|
@ -2,11 +2,13 @@
|
||||||
package awslogs
|
package awslogs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -21,6 +23,7 @@ import (
|
||||||
"github.com/docker/docker/daemon/logger"
|
"github.com/docker/docker/daemon/logger"
|
||||||
"github.com/docker/docker/daemon/logger/loggerutils"
|
"github.com/docker/docker/daemon/logger/loggerutils"
|
||||||
"github.com/docker/docker/dockerversion"
|
"github.com/docker/docker/dockerversion"
|
||||||
|
"github.com/docker/docker/pkg/templates"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -29,6 +32,7 @@ const (
|
||||||
regionEnvKey = "AWS_REGION"
|
regionEnvKey = "AWS_REGION"
|
||||||
logGroupKey = "awslogs-group"
|
logGroupKey = "awslogs-group"
|
||||||
logStreamKey = "awslogs-stream"
|
logStreamKey = "awslogs-stream"
|
||||||
|
logCreateGroupKey = "awslogs-create-group"
|
||||||
tagKey = "tag"
|
tagKey = "tag"
|
||||||
batchPublishFrequency = 5 * time.Second
|
batchPublishFrequency = 5 * time.Second
|
||||||
|
|
||||||
|
@ -43,21 +47,24 @@ const (
|
||||||
resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
|
resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
|
||||||
dataAlreadyAcceptedCode = "DataAlreadyAcceptedException"
|
dataAlreadyAcceptedCode = "DataAlreadyAcceptedException"
|
||||||
invalidSequenceTokenCode = "InvalidSequenceTokenException"
|
invalidSequenceTokenCode = "InvalidSequenceTokenException"
|
||||||
|
resourceNotFoundCode = "ResourceNotFoundException"
|
||||||
|
|
||||||
userAgentHeader = "User-Agent"
|
userAgentHeader = "User-Agent"
|
||||||
)
|
)
|
||||||
|
|
||||||
type logStream struct {
|
type logStream struct {
|
||||||
logStreamName string
|
logStreamName string
|
||||||
logGroupName string
|
logGroupName string
|
||||||
client api
|
logCreateGroup bool
|
||||||
messages chan *logger.Message
|
client api
|
||||||
lock sync.RWMutex
|
messages chan *logger.Message
|
||||||
closed bool
|
lock sync.RWMutex
|
||||||
sequenceToken *string
|
closed bool
|
||||||
|
sequenceToken *string
|
||||||
}
|
}
|
||||||
|
|
||||||
type api interface {
|
type api interface {
|
||||||
|
CreateLogGroup(*cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error)
|
||||||
CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
|
CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
|
||||||
PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
|
PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
|
||||||
}
|
}
|
||||||
|
@ -84,7 +91,7 @@ func init() {
|
||||||
|
|
||||||
// New creates an awslogs logger using the configuration passed in on the
|
// New creates an awslogs logger using the configuration passed in on the
|
||||||
// context. Supported context configuration variables are awslogs-region,
|
// context. Supported context configuration variables are awslogs-region,
|
||||||
// awslogs-group, and awslogs-stream. When available, configuration is
|
// awslogs-group, awslogs-stream, and awslogs-create-group. When available, configuration is
|
||||||
// also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID,
|
// also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID,
|
||||||
// AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and
|
// AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and
|
||||||
// the EC2 Instance Metadata Service.
|
// the EC2 Instance Metadata Service.
|
||||||
|
@ -94,6 +101,13 @@ func New(info logger.Info) (logger.Logger, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
logCreateGroup := false
|
||||||
|
if info.Config[logCreateGroupKey] != "" {
|
||||||
|
logCreateGroup, err = strconv.ParseBool(info.Config[logCreateGroupKey])
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if info.Config[logStreamKey] != "" {
|
if info.Config[logStreamKey] != "" {
|
||||||
logStreamName = info.Config[logStreamKey]
|
logStreamName = info.Config[logStreamKey]
|
||||||
|
@ -103,10 +117,11 @@ func New(info logger.Info) (logger.Logger, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
containerStream := &logStream{
|
containerStream := &logStream{
|
||||||
logStreamName: logStreamName,
|
logStreamName: logStreamName,
|
||||||
logGroupName: logGroupName,
|
logGroupName: logGroupName,
|
||||||
client: client,
|
logCreateGroup: logCreateGroup,
|
||||||
messages: make(chan *logger.Message, 4096),
|
client: client,
|
||||||
|
messages: make(chan *logger.Message, 4096),
|
||||||
}
|
}
|
||||||
err = containerStream.create()
|
err = containerStream.create()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -117,6 +132,19 @@ func New(info logger.Info) (logger.Logger, error) {
|
||||||
return containerStream, nil
|
return containerStream, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func parseLogGroup(info logger.Info, groupTemplate string) (string, error) {
|
||||||
|
tmpl, err := templates.NewParse("log-group", groupTemplate)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
buf := new(bytes.Buffer)
|
||||||
|
if err := tmpl.Execute(buf, &info); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return buf.String(), nil
|
||||||
|
}
|
||||||
|
|
||||||
// newRegionFinder is a variable such that the implementation
|
// newRegionFinder is a variable such that the implementation
|
||||||
// can be swapped out for unit tests.
|
// can be swapped out for unit tests.
|
||||||
var newRegionFinder = func() regionFinder {
|
var newRegionFinder = func() regionFinder {
|
||||||
|
@ -192,8 +220,50 @@ func (l *logStream) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// create creates a log stream for the instance of the awslogs logging driver
|
// create creates log group and log stream for the instance of the awslogs logging driver
|
||||||
func (l *logStream) create() error {
|
func (l *logStream) create() error {
|
||||||
|
if err := l.createLogStream(); err != nil {
|
||||||
|
if l.logCreateGroup {
|
||||||
|
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == resourceNotFoundCode {
|
||||||
|
if err := l.createLogGroup(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return l.createLogStream()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// createLogGroup creates a log group for the instance of the awslogs logging driver
|
||||||
|
func (l *logStream) createLogGroup() error {
|
||||||
|
if _, err := l.client.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
|
||||||
|
LogGroupName: aws.String(l.logGroupName),
|
||||||
|
}); err != nil {
|
||||||
|
if awsErr, ok := err.(awserr.Error); ok {
|
||||||
|
fields := logrus.Fields{
|
||||||
|
"errorCode": awsErr.Code(),
|
||||||
|
"message": awsErr.Message(),
|
||||||
|
"origError": awsErr.OrigErr(),
|
||||||
|
"logGroupName": l.logGroupName,
|
||||||
|
"logCreateGroup": l.logCreateGroup,
|
||||||
|
}
|
||||||
|
if awsErr.Code() == resourceAlreadyExistsCode {
|
||||||
|
// Allow creation to succeed
|
||||||
|
logrus.WithFields(fields).Info("Log group already exists")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
logrus.WithFields(fields).Error("Failed to create log group")
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// createLogStream creates a log stream for the instance of the awslogs logging driver
|
||||||
|
func (l *logStream) createLogStream() error {
|
||||||
input := &cloudwatchlogs.CreateLogStreamInput{
|
input := &cloudwatchlogs.CreateLogStreamInput{
|
||||||
LogGroupName: aws.String(l.logGroupName),
|
LogGroupName: aws.String(l.logGroupName),
|
||||||
LogStreamName: aws.String(l.logStreamName),
|
LogStreamName: aws.String(l.logStreamName),
|
||||||
|
@ -349,12 +419,13 @@ func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenc
|
||||||
}
|
}
|
||||||
|
|
||||||
// ValidateLogOpt looks for awslogs-specific log options awslogs-region,
|
// ValidateLogOpt looks for awslogs-specific log options awslogs-region,
|
||||||
// awslogs-group, and awslogs-stream
|
// awslogs-group, awslogs-stream, awslogs-create-group
|
||||||
func ValidateLogOpt(cfg map[string]string) error {
|
func ValidateLogOpt(cfg map[string]string) error {
|
||||||
for key := range cfg {
|
for key := range cfg {
|
||||||
switch key {
|
switch key {
|
||||||
case logGroupKey:
|
case logGroupKey:
|
||||||
case logStreamKey:
|
case logStreamKey:
|
||||||
|
case logCreateGroupKey:
|
||||||
case regionKey:
|
case regionKey:
|
||||||
case tagKey:
|
case tagKey:
|
||||||
default:
|
default:
|
||||||
|
@ -364,6 +435,11 @@ func ValidateLogOpt(cfg map[string]string) error {
|
||||||
if cfg[logGroupKey] == "" {
|
if cfg[logGroupKey] == "" {
|
||||||
return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey)
|
return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey)
|
||||||
}
|
}
|
||||||
|
if cfg[logCreateGroupKey] != "" {
|
||||||
|
if _, err := strconv.ParseBool(cfg[logCreateGroupKey]); err != nil {
|
||||||
|
return fmt.Errorf("must specify valid value for log opt '%s': %v", logCreateGroupKey, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -106,6 +106,37 @@ func TestCreateSuccess(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCreateLogGroupSuccess(t *testing.T) {
|
||||||
|
mockClient := newMockClient()
|
||||||
|
stream := &logStream{
|
||||||
|
client: mockClient,
|
||||||
|
logGroupName: groupName,
|
||||||
|
logStreamName: streamName,
|
||||||
|
logCreateGroup: true,
|
||||||
|
}
|
||||||
|
mockClient.createLogGroupResult <- &createLogGroupResult{}
|
||||||
|
mockClient.createLogStreamResult <- &createLogStreamResult{}
|
||||||
|
|
||||||
|
err := stream.create()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Received unexpected err: %v\n", err)
|
||||||
|
}
|
||||||
|
argument := <-mockClient.createLogStreamArgument
|
||||||
|
if argument.LogGroupName == nil {
|
||||||
|
t.Fatal("Expected non-nil LogGroupName")
|
||||||
|
}
|
||||||
|
if *argument.LogGroupName != groupName {
|
||||||
|
t.Errorf("Expected LogGroupName to be %s", groupName)
|
||||||
|
}
|
||||||
|
if argument.LogStreamName == nil {
|
||||||
|
t.Fatal("Expected non-nil LogStreamName")
|
||||||
|
}
|
||||||
|
if *argument.LogStreamName != streamName {
|
||||||
|
t.Errorf("Expected LogStreamName to be %s", streamName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestCreateError(t *testing.T) {
|
func TestCreateError(t *testing.T) {
|
||||||
mockClient := newMockClient()
|
mockClient := newMockClient()
|
||||||
stream := &logStream{
|
stream := &logStream{
|
||||||
|
|
|
@ -3,12 +3,19 @@ package awslogs
|
||||||
import "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
|
import "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
|
||||||
|
|
||||||
type mockcwlogsclient struct {
|
type mockcwlogsclient struct {
|
||||||
|
createLogGroupArgument chan *cloudwatchlogs.CreateLogGroupInput
|
||||||
|
createLogGroupResult chan *createLogGroupResult
|
||||||
createLogStreamArgument chan *cloudwatchlogs.CreateLogStreamInput
|
createLogStreamArgument chan *cloudwatchlogs.CreateLogStreamInput
|
||||||
createLogStreamResult chan *createLogStreamResult
|
createLogStreamResult chan *createLogStreamResult
|
||||||
putLogEventsArgument chan *cloudwatchlogs.PutLogEventsInput
|
putLogEventsArgument chan *cloudwatchlogs.PutLogEventsInput
|
||||||
putLogEventsResult chan *putLogEventsResult
|
putLogEventsResult chan *putLogEventsResult
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type createLogGroupResult struct {
|
||||||
|
successResult *cloudwatchlogs.CreateLogGroupOutput
|
||||||
|
errorResult error
|
||||||
|
}
|
||||||
|
|
||||||
type createLogStreamResult struct {
|
type createLogStreamResult struct {
|
||||||
successResult *cloudwatchlogs.CreateLogStreamOutput
|
successResult *cloudwatchlogs.CreateLogStreamOutput
|
||||||
errorResult error
|
errorResult error
|
||||||
|
@ -21,6 +28,8 @@ type putLogEventsResult struct {
|
||||||
|
|
||||||
func newMockClient() *mockcwlogsclient {
|
func newMockClient() *mockcwlogsclient {
|
||||||
return &mockcwlogsclient{
|
return &mockcwlogsclient{
|
||||||
|
createLogGroupArgument: make(chan *cloudwatchlogs.CreateLogGroupInput, 1),
|
||||||
|
createLogGroupResult: make(chan *createLogGroupResult, 1),
|
||||||
createLogStreamArgument: make(chan *cloudwatchlogs.CreateLogStreamInput, 1),
|
createLogStreamArgument: make(chan *cloudwatchlogs.CreateLogStreamInput, 1),
|
||||||
createLogStreamResult: make(chan *createLogStreamResult, 1),
|
createLogStreamResult: make(chan *createLogStreamResult, 1),
|
||||||
putLogEventsArgument: make(chan *cloudwatchlogs.PutLogEventsInput, 1),
|
putLogEventsArgument: make(chan *cloudwatchlogs.PutLogEventsInput, 1),
|
||||||
|
@ -37,6 +46,12 @@ func newMockClientBuffered(buflen int) *mockcwlogsclient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockcwlogsclient) CreateLogGroup(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) {
|
||||||
|
m.createLogGroupArgument <- input
|
||||||
|
output := <-m.createLogGroupResult
|
||||||
|
return output.successResult, output.errorResult
|
||||||
|
}
|
||||||
|
|
||||||
func (m *mockcwlogsclient) CreateLogStream(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) {
|
func (m *mockcwlogsclient) CreateLogStream(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) {
|
||||||
m.createLogStreamArgument <- input
|
m.createLogStreamArgument <- input
|
||||||
output := <-m.createLogStreamResult
|
output := <-m.createLogStreamResult
|
||||||
|
|
Loading…
Reference in a new issue