diff --git a/daemon/cluster/executor/backend.go b/daemon/cluster/executor/backend.go index 1a1d4ff091..d0c8af6cea 100644 --- a/daemon/cluster/executor/backend.go +++ b/daemon/cluster/executor/backend.go @@ -6,6 +6,7 @@ import ( "github.com/docker/distribution" "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/backend" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/filters" @@ -28,6 +29,7 @@ type Backend interface { CreateManagedContainer(config types.ContainerCreateConfig, validateHostname bool) (container.ContainerCreateCreatedBody, error) ContainerStart(name string, hostConfig *container.HostConfig, validateHostname bool, checkpoint string, checkpointDir string) error ContainerStop(name string, seconds *int) error + ContainerLogs(context.Context, string, *backend.ContainerLogsConfig, chan struct{}) error ConnectContainerToNetwork(containerName, networkName string, endpointConfig *network.EndpointSettings) error ActivateContainerServiceBinding(containerName string) error DeactivateContainerServiceBinding(containerName string) error diff --git a/daemon/cluster/executor/container/adapter.go b/daemon/cluster/executor/container/adapter.go index 98ea358100..29e6d0b4d2 100644 --- a/daemon/cluster/executor/container/adapter.go +++ b/daemon/cluster/executor/container/adapter.go @@ -12,6 +12,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/docker/api/server/httputils" "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/backend" containertypes "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/versions" @@ -20,6 +21,7 @@ import ( "github.com/docker/swarmkit/agent/exec" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/log" + "github.com/docker/swarmkit/protobuf/ptypes" "golang.org/x/net/context" "golang.org/x/time/rate" ) @@ -376,6 +378,56 @@ func (c *containerAdapter) deactivateServiceBinding() error { return c.backend.DeactivateContainerServiceBinding(c.container.name()) } +func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (io.ReadCloser, error) { + reader, writer := io.Pipe() + + apiOptions := &backend.ContainerLogsConfig{ + ContainerLogsOptions: types.ContainerLogsOptions{ + Follow: options.Follow, + + // TODO(stevvooe): Parse timestamp out of message. This + // absolutely needs to be done before going to production with + // this, at it is completely redundant. + Timestamps: true, + Details: false, // no clue what to do with this, let's just deprecate it. + }, + OutStream: writer, + } + + if options.Since != nil { + since, err := ptypes.Timestamp(options.Since) + if err != nil { + return nil, err + } + apiOptions.Since = since.Format(time.RFC3339Nano) + } + + if options.Tail < 0 { + // See protobuf documentation for details of how this works. + apiOptions.Tail = fmt.Sprint(-options.Tail - 1) + } else if options.Tail > 0 { + return nil, fmt.Errorf("tail relative to start of logs not supported via docker API") + } + + if len(options.Streams) == 0 { + // empty == all + apiOptions.ShowStdout, apiOptions.ShowStderr = true, true + } else { + for _, stream := range options.Streams { + switch stream { + case api.LogStreamStdout: + apiOptions.ShowStdout = true + case api.LogStreamStderr: + apiOptions.ShowStderr = true + } + } + } + + chStarted := make(chan struct{}) + go c.backend.ContainerLogs(ctx, c.container.name(), apiOptions, chStarted) + return reader, nil +} + // todo: typed/wrapped errors func isContainerCreateNameConflict(err error) bool { return strings.Contains(err.Error(), "Conflict. The name") diff --git a/daemon/cluster/executor/container/controller.go b/daemon/cluster/executor/container/controller.go index 47fc2bf7a5..103f47e965 100644 --- a/daemon/cluster/executor/container/controller.go +++ b/daemon/cluster/executor/container/controller.go @@ -1,8 +1,13 @@ package container import ( + "bufio" + "bytes" + "encoding/binary" "fmt" + "io" "os" + "time" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/events" @@ -11,8 +16,10 @@ import ( "github.com/docker/swarmkit/agent/exec" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/log" + "github.com/docker/swarmkit/protobuf/ptypes" "github.com/pkg/errors" "golang.org/x/net/context" + "golang.org/x/time/rate" ) // controller implements agent.Controller against docker's API. @@ -374,6 +381,128 @@ func (r *controller) Remove(ctx context.Context) error { return nil } +// waitReady waits for a container to be "ready". +// Ready means it's past the started state. +func (r *controller) waitReady(pctx context.Context) error { + if err := r.checkClosed(); err != nil { + return err + } + + ctx, cancel := context.WithCancel(pctx) + defer cancel() + + eventq := r.adapter.events(ctx) + + ctnr, err := r.adapter.inspect(ctx) + if err != nil { + if !isUnknownContainer(err) { + return errors.Wrap(err, "inspect container failed") + } + } else { + switch ctnr.State.Status { + case "running", "exited", "dead": + return nil + } + } + + for { + select { + case event := <-eventq: + if !r.matchevent(event) { + continue + } + + switch event.Action { + case "start": + return nil + } + case <-ctx.Done(): + return ctx.Err() + case <-r.closed: + return r.err + } + } +} + +func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, options api.LogSubscriptionOptions) error { + if err := r.checkClosed(); err != nil { + return err + } + + if err := r.waitReady(ctx); err != nil { + return errors.Wrap(err, "container not ready for logs") + } + + rc, err := r.adapter.logs(ctx, options) + if err != nil { + return errors.Wrap(err, "failed getting container logs") + } + defer rc.Close() + + var ( + // use a rate limiter to keep things under control but also provides some + // ability coalesce messages. + limiter = rate.NewLimiter(rate.Every(time.Second), 10<<20) // 10 MB/s + msgctx = api.LogContext{ + NodeID: r.task.NodeID, + ServiceID: r.task.ServiceID, + TaskID: r.task.ID, + } + ) + + brd := bufio.NewReader(rc) + for { + // so, message header is 8 bytes, treat as uint64, pull stream off MSB + var header uint64 + if err := binary.Read(brd, binary.BigEndian, &header); err != nil { + if err == io.EOF { + return nil + } + + return errors.Wrap(err, "failed reading log header") + } + + stream, size := (header>>(7<<3))&0xFF, header & ^(uint64(0xFF)<<(7<<3)) + + // limit here to decrease allocation back pressure. + if err := limiter.WaitN(ctx, int(size)); err != nil { + return errors.Wrap(err, "failed rate limiter") + } + + buf := make([]byte, size) + _, err := io.ReadFull(brd, buf) + if err != nil { + return errors.Wrap(err, "failed reading buffer") + } + + // Timestamp is RFC3339Nano with 1 space after. Lop, parse, publish + parts := bytes.SplitN(buf, []byte(" "), 2) + if len(parts) != 2 { + return fmt.Errorf("invalid timestamp in log message: %v", buf) + } + + ts, err := time.Parse(time.RFC3339Nano, string(parts[0])) + if err != nil { + return errors.Wrap(err, "failed to parse timestamp") + } + + tsp, err := ptypes.TimestampProto(ts) + if err != nil { + return errors.Wrap(err, "failed to convert timestamp") + } + + if err := publisher.Publish(ctx, api.LogMessage{ + Context: msgctx, + Timestamp: tsp, + Stream: api.LogStream(stream), + + Data: parts[1], + }); err != nil { + return errors.Wrap(err, "failed to publish log message") + } + } +} + // Close the runner and clean up any ephemeral resources. func (r *controller) Close() error { select {