From 1044093bb0aa12eb8972361a93b9bc8c4ddd857b Mon Sep 17 00:00:00 2001 From: Drew Erny Date: Mon, 20 Mar 2017 10:07:04 -0700 Subject: [PATCH] refactor logs and support service logs /w tty Refactor container logs system to make communicating log messages internally much simpler. Move responsibility for marshalling log messages into the REST server. Support TTY logs. Pave the way for fixing the ambiguous bytestream format. Pave the way for fixing details. Signed-off-by: Drew Erny --- api/server/httputils/write_log_stream.go | 92 ++++++++++ api/server/router/container/backend.go | 2 +- .../router/container/container_routes.go | 53 +++--- api/server/router/swarm/backend.go | 2 +- api/server/router/swarm/helpers.go | 54 +++--- api/types/backend/backend.go | 26 ++- cli/command/service/logs.go | 22 +++ daemon/cluster/executor/backend.go | 2 +- daemon/cluster/executor/container/adapter.go | 40 ++-- .../cluster/executor/container/controller.go | 62 +++---- daemon/cluster/services.go | 171 ++++++++---------- daemon/logger/logger.go | 47 ++--- daemon/logs.go | 156 +++++++++------- ...cker_cli_service_logs_experimental_test.go | 40 ++++ 14 files changed, 454 insertions(+), 315 deletions(-) create mode 100644 api/server/httputils/write_log_stream.go diff --git a/api/server/httputils/write_log_stream.go b/api/server/httputils/write_log_stream.go new file mode 100644 index 0000000000..5793a99ff4 --- /dev/null +++ b/api/server/httputils/write_log_stream.go @@ -0,0 +1,92 @@ +package httputils + +import ( + "fmt" + "io" + "sort" + "strings" + + "golang.org/x/net/context" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/backend" + "github.com/docker/docker/pkg/ioutils" + "github.com/docker/docker/pkg/jsonlog" + "github.com/docker/docker/pkg/stdcopy" +) + +// WriteLogStream writes an encoded byte stream of log messages from the +// messages channel, multiplexing them with a stdcopy.Writer if mux is true +func WriteLogStream(ctx context.Context, w 
io.Writer, msgs <-chan *backend.LogMessage, config *types.ContainerLogsOptions, mux bool) { + wf := ioutils.NewWriteFlusher(w) + defer wf.Close() + + wf.Flush() + + // this might seem like doing below is clear: + // var outStream io.Writer = wf + // however, this GREATLY DISPLEASES golint, and if you do that, it will + // fail CI. we need outstream to be type writer because if we mux streams, + // we will need to reassign all of the streams to be stdwriters, which only + // conforms to the io.Writer interface. + var outStream io.Writer + outStream = wf + errStream := outStream + sysErrStream := errStream + if mux { + sysErrStream = stdcopy.NewStdWriter(outStream, stdcopy.Systemerr) + errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr) + outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout) + } + + for { + msg, ok := <-msgs + if !ok { + return + } + // check if the message contains an error. if so, write that error to + // the system error stream and move on to the next message + if msg.Err != nil { + fmt.Fprintf(sysErrStream, "Error grabbing logs: %v\n", msg.Err) + continue + } + logLine := msg.Line + if config.Details { + logLine = append([]byte(stringAttrs(msg.Attrs)+" "), logLine...) + } + if config.Timestamps { + // TODO(dperny) the format is defined in + // daemon/logger/logger.go as logger.TimeFormat. importing + // logger is verboten (not part of backend) so idk if just + // importing the same thing from jsonlog is good enough + logLine = append([]byte(msg.Timestamp.Format(jsonlog.RFC3339NanoFixed)+" "), logLine...) 
+ } + if msg.Source == "stdout" && config.ShowStdout { + outStream.Write(logLine) + } + if msg.Source == "stderr" && config.ShowStderr { + errStream.Write(logLine) + } + } +} + +type byKey []string + +func (s byKey) Len() int { return len(s) } +func (s byKey) Less(i, j int) bool { + keyI := strings.Split(s[i], "=") + keyJ := strings.Split(s[j], "=") + return keyI[0] < keyJ[0] +} +func (s byKey) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +func stringAttrs(a backend.LogAttributes) string { + var ss byKey + for k, v := range a { + ss = append(ss, k+"="+v) + } + sort.Sort(ss) + return strings.Join(ss, ",") +} diff --git a/api/server/router/container/backend.go b/api/server/router/container/backend.go index 6f729bea16..ce0ee8c9da 100644 --- a/api/server/router/container/backend.go +++ b/api/server/router/container/backend.go @@ -51,7 +51,7 @@ type stateBackend interface { type monitorBackend interface { ContainerChanges(name string) ([]archive.Change, error) ContainerInspect(name string, size bool, version string) (interface{}, error) - ContainerLogs(ctx context.Context, name string, config *backend.ContainerLogsConfig, started chan struct{}) error + ContainerLogs(ctx context.Context, name string, config *types.ContainerLogsOptions) (<-chan *backend.LogMessage, error) ContainerStats(ctx context.Context, name string, config *backend.ContainerStatsConfig) error ContainerTop(name string, psArgs string) (*container.ContainerTopOKBody, error) diff --git a/api/server/router/container/container_routes.go b/api/server/router/container/container_routes.go index 55abf72dcb..bd151ab276 100644 --- a/api/server/router/container/container_routes.go +++ b/api/server/router/container/container_routes.go @@ -10,6 +10,7 @@ import ( "time" "github.com/Sirupsen/logrus" + "github.com/docker/docker/api" "github.com/docker/docker/api/server/httputils" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/backend" @@ -18,7 +19,6 @@ import ( 
"github.com/docker/docker/api/types/versions" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/signal" - "github.com/docker/docker/pkg/stdcopy" "golang.org/x/net/context" "golang.org/x/net/websocket" ) @@ -91,33 +91,38 @@ func (s *containerRouter) getContainersLogs(ctx context.Context, w http.Response } containerName := vars["name"] - logsConfig := &backend.ContainerLogsConfig{ - ContainerLogsOptions: types.ContainerLogsOptions{ - Follow: httputils.BoolValue(r, "follow"), - Timestamps: httputils.BoolValue(r, "timestamps"), - Since: r.Form.Get("since"), - Tail: r.Form.Get("tail"), - ShowStdout: stdout, - ShowStderr: stderr, - Details: httputils.BoolValue(r, "details"), - }, - OutStream: w, + logsConfig := &types.ContainerLogsOptions{ + Follow: httputils.BoolValue(r, "follow"), + Timestamps: httputils.BoolValue(r, "timestamps"), + Since: r.Form.Get("since"), + Tail: r.Form.Get("tail"), + ShowStdout: stdout, + ShowStderr: stderr, + Details: httputils.BoolValue(r, "details"), } - chStarted := make(chan struct{}) - if err := s.backend.ContainerLogs(ctx, containerName, logsConfig, chStarted); err != nil { - select { - case <-chStarted: - // The client may be expecting all of the data we're sending to - // be multiplexed, so mux it through the Systemerr stream, which - // will cause the client to throw an error when demuxing - stdwriter := stdcopy.NewStdWriter(logsConfig.OutStream, stdcopy.Systemerr) - fmt.Fprintf(stdwriter, "Error running logs job: %v\n", err) - default: - return err - } + // doesn't matter what version the client is on, we're using this internally only + // also do we need size? i'm thinkin no we don't + raw, err := s.backend.ContainerInspect(containerName, false, api.DefaultVersion) + if err != nil { + return err + } + container, ok := raw.(*types.ContainerJSON) + if !ok { + // %T prints the type. handy! 
+ return fmt.Errorf("expected container to be *types.ContainerJSON but got %T", raw) } + msgs, err := s.backend.ContainerLogs(ctx, containerName, logsConfig) + if err != nil { + return err + } + + // if has a tty, we're not muxing streams. if it doesn't, we are. simple. + // this is the point of no return for writing a response. once we call + // WriteLogStream, the response has been started and errors will be + // returned in band by WriteLogStream + httputils.WriteLogStream(ctx, w, msgs, logsConfig, !container.Config.Tty) return nil } diff --git a/api/server/router/swarm/backend.go b/api/server/router/swarm/backend.go index 798913b921..28b9a98018 100644 --- a/api/server/router/swarm/backend.go +++ b/api/server/router/swarm/backend.go @@ -21,7 +21,7 @@ type Backend interface { CreateService(types.ServiceSpec, string) (*basictypes.ServiceCreateResponse, error) UpdateService(string, uint64, types.ServiceSpec, basictypes.ServiceUpdateOptions) (*basictypes.ServiceUpdateResponse, error) RemoveService(string) error - ServiceLogs(context.Context, *backend.LogSelector, *backend.ContainerLogsConfig, chan struct{}) error + ServiceLogs(context.Context, *backend.LogSelector, *basictypes.ContainerLogsOptions) (<-chan *backend.LogMessage, error) GetNodes(basictypes.NodeListOptions) ([]types.Node, error) GetNode(string) (types.Node, error) UpdateNode(string, uint64, types.NodeSpec) error diff --git a/api/server/router/swarm/helpers.go b/api/server/router/swarm/helpers.go index e15eead015..af745b84c3 100644 --- a/api/server/router/swarm/helpers.go +++ b/api/server/router/swarm/helpers.go @@ -7,7 +7,6 @@ import ( "github.com/docker/docker/api/server/httputils" basictypes "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/backend" - "github.com/docker/docker/pkg/stdcopy" "golang.org/x/net/context" ) @@ -24,32 +23,43 @@ func (sr *swarmRouter) swarmLogs(ctx context.Context, w http.ResponseWriter, r * return fmt.Errorf("Bad parameters: you must choose at least 
one stream") } - logsConfig := &backend.ContainerLogsConfig{ - ContainerLogsOptions: basictypes.ContainerLogsOptions{ - Follow: httputils.BoolValue(r, "follow"), - Timestamps: httputils.BoolValue(r, "timestamps"), - Since: r.Form.Get("since"), - Tail: r.Form.Get("tail"), - ShowStdout: stdout, - ShowStderr: stderr, - Details: httputils.BoolValue(r, "details"), - }, - OutStream: w, + // there is probably a neater way to manufacture the ContainerLogsOptions + // struct, probably in the caller, to eliminate the dependency on net/http + logsConfig := &basictypes.ContainerLogsOptions{ + Follow: httputils.BoolValue(r, "follow"), + Timestamps: httputils.BoolValue(r, "timestamps"), + Since: r.Form.Get("since"), + Tail: r.Form.Get("tail"), + ShowStdout: stdout, + ShowStderr: stderr, + Details: httputils.BoolValue(r, "details"), } - chStarted := make(chan struct{}) - if err := sr.backend.ServiceLogs(ctx, selector, logsConfig, chStarted); err != nil { - select { - case <-chStarted: - // The client may be expecting all of the data we're sending to - // be multiplexed, so send it through OutStream, which will - // have been set up to handle that if needed. - stdwriter := stdcopy.NewStdWriter(w, stdcopy.Systemerr) - fmt.Fprintf(stdwriter, "Error grabbing service logs: %v\n", err) - default: + tty := false + // checking for whether logs are TTY involves iterating over every service + // and task. idk if there is a better way + for _, service := range selector.Services { + s, err := sr.backend.GetService(service) + if err != nil { + // maybe should return some context with this error? 
return err } + tty = s.Spec.TaskTemplate.ContainerSpec.TTY || tty + } + for _, task := range selector.Tasks { + t, err := sr.backend.GetTask(task) + if err != nil { + // as above + return err + } + tty = t.Spec.ContainerSpec.TTY || tty } + msgs, err := sr.backend.ServiceLogs(ctx, selector, logsConfig) + if err != nil { + return err + } + + httputils.WriteLogStream(ctx, w, msgs, logsConfig, !tty) return nil } diff --git a/api/types/backend/backend.go b/api/types/backend/backend.go index 4632c0cd9d..83efae300b 100644 --- a/api/types/backend/backend.go +++ b/api/types/backend/backend.go @@ -3,6 +3,7 @@ package backend import ( "io" + "time" "github.com/docker/docker/api/types" "github.com/docker/docker/pkg/streamformatter" @@ -25,13 +26,28 @@ type ContainerAttachConfig struct { MuxStreams bool } -// ContainerLogsConfig holds configs for logging operations. Exists -// for users of the backend to to pass it a logging configuration. -type ContainerLogsConfig struct { - types.ContainerLogsOptions - OutStream io.Writer +// LogMessage is a data structure that represents a piece of output produced by some +// container. The Line member is a slice of an array whose contents can be +// changed after a log driver's Log() method returns. +// Changes to this struct need to be reflected in the reset method in +// daemon/logger/logger.go +type LogMessage struct { + Line []byte + Source string + Timestamp time.Time + Attrs LogAttributes + Partial bool + + // Err is an error associated with a message. Completeness of a message + // with Err is not expected, though it may be partially complete (fields may + // be missing, gibberish, or nil) + Err error } +// LogAttributes is used to hold the extra attributes available in the log message +// Primarily used for converting the map type to string and sorting. +type LogAttributes map[string]string + // LogSelector is a list of services and tasks that should be returned as part // of a log stream. 
It is similar to swarmapi.LogSelector, with the difference // that the names don't have to be resolved to IDs; this is mostly to avoid diff --git a/cli/command/service/logs.go b/cli/command/service/logs.go index da2374f9dd..cfcb7ed105 100644 --- a/cli/command/service/logs.go +++ b/cli/command/service/logs.go @@ -73,6 +73,7 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error { Timestamps: opts.timestamps, Follow: opts.follow, Tail: opts.tail, + Details: true, } cli := dockerCli.Client() @@ -80,6 +81,7 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error { var ( maxLength = 1 responseBody io.ReadCloser + tty bool ) service, _, err := cli.ServiceInspectWithRaw(ctx, opts.target) @@ -89,6 +91,14 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error { return err } task, _, err := cli.TaskInspectWithRaw(ctx, opts.target) + tty = task.Spec.ContainerSpec.TTY + // TODO(dperny) hot fix until we get a nice details system squared away, + // ignores details (including task context) if we have a TTY log + if tty { + options.Details = false + } + + responseBody, err = cli.TaskLogs(ctx, opts.target, options) if err != nil { if client.IsErrTaskNotFound(err) { // if the task ALSO isn't found, rewrite the error to be clear @@ -100,6 +110,13 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error { maxLength = getMaxLength(task.Slot) responseBody, err = cli.TaskLogs(ctx, opts.target, options) } else { + tty = service.Spec.TaskTemplate.ContainerSpec.TTY + // TODO(dperny) hot fix until we get a nice details system squared away, + // ignores details (including task context) if we have a TTY log + if tty { + options.Details = false + } + responseBody, err = cli.ServiceLogs(ctx, opts.target, options) if err != nil { return err @@ -112,6 +129,11 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error { } defer responseBody.Close() + if tty { + _, err = io.Copy(dockerCli.Out(), responseBody) + return err + } + 
taskFormatter := newTaskFormatter(cli, opts, maxLength) stdout := &logWriter{ctx: ctx, opts: opts, f: taskFormatter, w: dockerCli.Out()} diff --git a/daemon/cluster/executor/backend.go b/daemon/cluster/executor/backend.go index 5fe953ac05..acd018315b 100644 --- a/daemon/cluster/executor/backend.go +++ b/daemon/cluster/executor/backend.go @@ -33,7 +33,7 @@ type Backend interface { CreateManagedContainer(config types.ContainerCreateConfig) (container.ContainerCreateCreatedBody, error) ContainerStart(name string, hostConfig *container.HostConfig, checkpoint string, checkpointDir string) error ContainerStop(name string, seconds *int) error - ContainerLogs(context.Context, string, *backend.ContainerLogsConfig, chan struct{}) error + ContainerLogs(context.Context, string, *types.ContainerLogsOptions) (<-chan *backend.LogMessage, error) ConnectContainerToNetwork(containerName, networkName string, endpointConfig *network.EndpointSettings) error ActivateContainerServiceBinding(containerName string) error DeactivateContainerServiceBinding(containerName string) error diff --git a/daemon/cluster/executor/container/adapter.go b/daemon/cluster/executor/container/adapter.go index 02ccf62c7e..1c669e68e2 100644 --- a/daemon/cluster/executor/container/adapter.go +++ b/daemon/cluster/executor/container/adapter.go @@ -396,26 +396,15 @@ func (c *containerAdapter) deactivateServiceBinding() error { return c.backend.DeactivateContainerServiceBinding(c.container.name()) } -func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (io.ReadCloser, error) { - // we can't handle the peculiarities of a TTY-attached container yet - conf := c.container.config() - if conf != nil && conf.Tty { - return nil, errors.New("logs not supported on containers with a TTY attached") - } +func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (<-chan *backend.LogMessage, error) { + apiOptions := &types.ContainerLogsOptions{ + Follow: 
options.Follow, - reader, writer := io.Pipe() - - apiOptions := &backend.ContainerLogsConfig{ - ContainerLogsOptions: types.ContainerLogsOptions{ - Follow: options.Follow, - - // TODO(stevvooe): Parse timestamp out of message. This - // absolutely needs to be done before going to production with - // this, at it is completely redundant. - Timestamps: true, - Details: false, // no clue what to do with this, let's just deprecate it. - }, - OutStream: writer, + // TODO(stevvooe): Parse timestamp out of message. This + // absolutely needs to be done before going to production with + // this, at it is completely redundant. + Timestamps: true, + Details: false, // no clue what to do with this, let's just deprecate it. } if options.Since != nil { @@ -449,14 +438,11 @@ func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscription } } } - - chStarted := make(chan struct{}) - go func() { - defer writer.Close() - c.backend.ContainerLogs(ctx, c.container.name(), apiOptions, chStarted) - }() - - return reader, nil + msgs, err := c.backend.ContainerLogs(ctx, c.container.name(), apiOptions) + if err != nil { + return nil, err + } + return msgs, nil } // todo: typed/wrapped errors diff --git a/daemon/cluster/executor/container/controller.go b/daemon/cluster/executor/container/controller.go index da09d2ee47..5c6f803509 100644 --- a/daemon/cluster/executor/container/controller.go +++ b/daemon/cluster/executor/container/controller.go @@ -1,11 +1,7 @@ package container import ( - "bufio" - "bytes" - "encoding/binary" "fmt" - "io" "os" "strconv" "strings" @@ -445,11 +441,12 @@ func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, opti return errors.Wrap(err, "container not ready for logs") } - rc, err := r.adapter.logs(ctx, options) + logsContext, cancel := context.WithCancel(ctx) + msgs, err := r.adapter.logs(logsContext, options) + defer cancel() if err != nil { return errors.Wrap(err, "failed getting container logs") } - defer rc.Close() var 
( // use a rate limiter to keep things under control but also provides some @@ -462,53 +459,38 @@ func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, opti } ) - brd := bufio.NewReader(rc) for { - // so, message header is 8 bytes, treat as uint64, pull stream off MSB - var header uint64 - if err := binary.Read(brd, binary.BigEndian, &header); err != nil { - if err == io.EOF { - return nil - } - - return errors.Wrap(err, "failed reading log header") + msg, ok := <-msgs + if !ok { + // we're done here, no more messages + return nil } - stream, size := (header>>(7<<3))&0xFF, header & ^(uint64(0xFF)<<(7<<3)) + if msg.Err != nil { + // the deferred cancel closes the adapter's log stream + return msg.Err + } - // limit here to decrease allocation back pressure. - if err := limiter.WaitN(ctx, int(size)); err != nil { + // wait here for the limiter to catch up + if err := limiter.WaitN(ctx, len(msg.Line)); err != nil { return errors.Wrap(err, "failed rate limiter") } - - buf := make([]byte, size) - _, err := io.ReadFull(brd, buf) - if err != nil { - return errors.Wrap(err, "failed reading buffer") - } - - // Timestamp is RFC3339Nano with 1 space after. 
Lop, parse, publish - parts := bytes.SplitN(buf, []byte(" "), 2) - if len(parts) != 2 { - return fmt.Errorf("invalid timestamp in log message: %v", buf) - } - - ts, err := time.Parse(time.RFC3339Nano, string(parts[0])) - if err != nil { - return errors.Wrap(err, "failed to parse timestamp") - } - - tsp, err := gogotypes.TimestampProto(ts) + tsp, err := gogotypes.TimestampProto(msg.Timestamp) if err != nil { return errors.Wrap(err, "failed to convert timestamp") } + var stream api.LogStream + if msg.Source == "stdout" { + stream = api.LogStreamStdout + } else if msg.Source == "stderr" { + stream = api.LogStreamStderr + } if err := publisher.Publish(ctx, api.LogMessage{ Context: msgctx, Timestamp: tsp, - Stream: api.LogStream(stream), - - Data: parts[1], + Stream: stream, + Data: msg.Line, }); err != nil { return errors.Wrap(err, "failed to publish log message") } diff --git a/daemon/cluster/services.go b/daemon/cluster/services.go index b12b60fcbd..8fd730eee7 100644 --- a/daemon/cluster/services.go +++ b/daemon/cluster/services.go @@ -18,9 +18,6 @@ import ( types "github.com/docker/docker/api/types/swarm" timetypes "github.com/docker/docker/api/types/time" "github.com/docker/docker/daemon/cluster/convert" - "github.com/docker/docker/daemon/logger" - "github.com/docker/docker/pkg/ioutils" - "github.com/docker/docker/pkg/stdcopy" runconfigopts "github.com/docker/docker/runconfig/opts" swarmapi "github.com/docker/swarmkit/api" gogotypes "github.com/gogo/protobuf/types" @@ -303,56 +300,44 @@ func (c *Cluster) RemoveService(input string) error { } // ServiceLogs collects service logs and writes them back to `config.OutStream` -func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector, config *backend.ContainerLogsConfig, started chan struct{}) error { +func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector, config *apitypes.ContainerLogsOptions) (<-chan *backend.LogMessage, error) { c.mu.RLock() - defer func() { - select { 
- case <-started: - // if we've started streaming logs, we are no longer holding the - // lock and do not have to release it - return - default: - // before we start, though, we're holding this lock and it needs to - // be released - c.mu.RUnlock() - } - }() + defer c.mu.RUnlock() + state := c.currentNodeState() if !state.IsActiveManager() { - return c.errNoManager(state) + return nil, c.errNoManager(state) } - swarmSelector, tty, err := convertSelector(ctx, state.controlClient, selector) + swarmSelector, err := convertSelector(ctx, state.controlClient, selector) if err != nil { - return errors.Wrap(err, "error making log selector") - } - - // TODO(dperny) this goes away when we support TTY logs, which is in the works - if tty { - return errors.New("service logs not supported on tasks with a TTY attached") + return nil, errors.Wrap(err, "error making log selector") } // set the streams we'll use stdStreams := []swarmapi.LogStream{} - if config.ContainerLogsOptions.ShowStdout { + if config.ShowStdout { stdStreams = append(stdStreams, swarmapi.LogStreamStdout) } - if config.ContainerLogsOptions.ShowStderr { + if config.ShowStderr { stdStreams = append(stdStreams, swarmapi.LogStreamStderr) } // Get tail value squared away - the number of previous log lines we look at var tail int64 + // in ContainerLogs, if the tail value is ANYTHING non-integer, we just set + // it to -1 (all). i don't agree with that, but i also think no tail value + // should be legitimate. 
if you don't pass tail, we assume you want "all" if config.Tail == "all" || config.Tail == "" { // tail of 0 means send all logs on the swarmkit side tail = 0 } else { t, err := strconv.Atoi(config.Tail) if err != nil { - return errors.New("tail value must be a positive integer or \"all\"") + return nil, errors.New("tail value must be a positive integer or \"all\"") } if t < 0 { - return errors.New("negative tail values not supported") + return nil, errors.New("negative tail values not supported") } // we actually use negative tail in swarmkit to represent messages // backwards starting from the beginning. also, -1 means no logs. so, @@ -370,12 +355,12 @@ func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector if config.Since != "" { s, n, err := timetypes.ParseTimestamps(config.Since, 0) if err != nil { - return errors.Wrap(err, "could not parse since timestamp") + return nil, errors.Wrap(err, "could not parse since timestamp") } since := time.Unix(s, n) sinceProto, err = gogotypes.TimestampProto(since) if err != nil { - return errors.Wrap(err, "could not parse timestamp to proto") + return nil, errors.Wrap(err, "could not parse timestamp to proto") } } @@ -389,106 +374,96 @@ func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector }, }) if err != nil { - return err + return nil, err } - wf := ioutils.NewWriteFlusher(config.OutStream) - defer wf.Close() - - // Release the lock before starting the stream. - // - // this feels like it could be racy because we would double unlock if we - // somehow returned right after we unlocked but before we closed, but I do - // not think such a thing is possible. i wish it were possible to atomically - // close and unlock but that might be overkill. programming is hard. 
- c.mu.RUnlock() - close(started) - - wf.Flush() - - outStream := stdcopy.NewStdWriter(wf, stdcopy.Stdout) - errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr) - - for { - // Check the context before doing anything. - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - subscribeMsg, err := stream.Recv() - if err == io.EOF { - return nil - } - if err != nil { - return err - } - - for _, msg := range subscribeMsg.Messages { - data := []byte{} - - if config.Timestamps { - ts, err := gogotypes.TimestampFromProto(msg.Timestamp) - if err != nil { - return err + messageChan := make(chan *backend.LogMessage, 1) + go func() { + defer close(messageChan) + for { + // Check the context before doing anything. + select { + case <-ctx.Done(): + return + default: + } + subscribeMsg, err := stream.Recv() + if err == io.EOF { + return + } + // if we're not io.EOF, push the message in and return + if err != nil { + select { + case <-ctx.Done(): + case messageChan <- &backend.LogMessage{Err: err}: } - data = append(data, []byte(ts.Format(logger.TimeFormat)+" ")...) + return } - data = append(data, []byte(fmt.Sprintf("%s.node.id=%s,%s.service.id=%s,%s.task.id=%s ", - contextPrefix, msg.Context.NodeID, - contextPrefix, msg.Context.ServiceID, - contextPrefix, msg.Context.TaskID, - ))...) + for _, msg := range subscribeMsg.Messages { + // make a new message + m := new(backend.LogMessage) + m.Attrs = make(backend.LogAttributes) + // add the timestamp, adding the error if it fails + m.Timestamp, err = gogotypes.TimestampFromProto(msg.Timestamp) + if err != nil { + m.Err = err + } + m.Attrs[contextPrefix+".node.id"] = msg.Context.NodeID + m.Attrs[contextPrefix+".service.id"] = msg.Context.ServiceID + m.Attrs[contextPrefix+".task.id"] = msg.Context.TaskID + switch msg.Stream { + case swarmapi.LogStreamStdout: + m.Source = "stdout" + case swarmapi.LogStreamStderr: + m.Source = "stderr" + } + m.Line = msg.Data - data = append(data, msg.Data...) 
- - switch msg.Stream { - case swarmapi.LogStreamStdout: - outStream.Write(data) - case swarmapi.LogStreamStderr: - errStream.Write(data) + // there could be a case where the reader stops accepting + // messages and the context is canceled. we need to check that + // here, or otherwise we risk blocking forever on the message + // send. + select { + case <-ctx.Done(): + return + case messageChan <- m: + } } } - } + }() + return messageChan, nil } // convertSelector takes a backend.LogSelector, which contains raw names that // may or may not be valid, and converts them to an api.LogSelector proto. It -// also returns a boolean, true if any of the services use a TTY (false -// otherwise) and an error if something fails -func convertSelector(ctx context.Context, cc swarmapi.ControlClient, selector *backend.LogSelector) (*swarmapi.LogSelector, bool, error) { - // if ANY tasks use a TTY, don't mux streams - var tty bool +// returns an error if something fails +func convertSelector(ctx context.Context, cc swarmapi.ControlClient, selector *backend.LogSelector) (*swarmapi.LogSelector, error) { // don't rely on swarmkit to resolve IDs, do it ourselves swarmSelector := &swarmapi.LogSelector{} for _, s := range selector.Services { service, err := getService(ctx, cc, s) if err != nil { - return nil, false, err + return nil, err } c := service.Spec.Task.GetContainer() if c == nil { - return nil, false, errors.New("logs only supported on container tasks") + return nil, errors.New("logs only supported on container tasks") } - // set TTY true if we have a TTY service, or if it's already true - tty = tty || c.TTY swarmSelector.ServiceIDs = append(swarmSelector.ServiceIDs, service.ID) } for _, t := range selector.Tasks { task, err := getTask(ctx, cc, t) if err != nil { - return nil, false, err + return nil, err } c := task.Spec.GetContainer() if c == nil { - return nil, false, errors.New("logs only supported on container tasks") + return nil, errors.New("logs only supported on 
container tasks") } - tty = tty || c.TTY swarmSelector.TaskIDs = append(swarmSelector.TaskIDs, task.ID) } - return swarmSelector, tty, nil + return swarmSelector, nil } // imageWithDigestString takes an image such as name or name:tag diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index 7172663aa0..1135195dc2 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -9,11 +9,10 @@ package logger import ( "errors" - "sort" - "strings" "sync" "time" + "github.com/docker/docker/api/types/backend" "github.com/docker/docker/pkg/jsonlog" ) @@ -43,14 +42,13 @@ func PutMessage(msg *Message) { // Message is datastructure that represents piece of output produced by some // container. The Line member is a slice of an array whose contents can be // changed after a log driver's Log() method returns. +// +// Message is subtyped from backend.LogMessage because there is a lot of +// internal complexity around the Message type that should not be exposed +// to any package not explicitly importing the logger type. +// // Any changes made to this struct must also be updated in the `reset` function -type Message struct { - Line []byte - Source string - Timestamp time.Time - Attrs LogAttributes - Partial bool -} +type Message backend.LogMessage // reset sets the message back to default values // This is used when putting a message back into the message pool. @@ -60,31 +58,20 @@ func (m *Message) reset() { m.Source = "" m.Attrs = nil m.Partial = false + + m.Err = nil +} + +// AsLogMessage returns a pointer to the message as a pointer to +// backend.LogMessage, which is an identical type with a different purpose +func (m *Message) AsLogMessage() *backend.LogMessage { + return (*backend.LogMessage)(m) } // LogAttributes is used to hold the extra attributes available in the log message // Primarily used for converting the map type to string and sorting. 
-type LogAttributes map[string]string -type byKey []string - -func (s byKey) Len() int { return len(s) } -func (s byKey) Less(i, j int) bool { - keyI := strings.Split(s[i], "=") - keyJ := strings.Split(s[j], "=") - return keyI[0] < keyJ[0] -} -func (s byKey) Swap(i, j int) { - s[i], s[j] = s[j], s[i] -} - -func (a LogAttributes) String() string { - var ss byKey - for k, v := range a { - ss = append(ss, k+"="+v) - } - sort.Sort(ss) - return strings.Join(ss, ",") -} +// Imported here so it can be used internally with less refactoring +type LogAttributes backend.LogAttributes // Logger is the interface for docker logging drivers. type Logger interface { diff --git a/daemon/logs.go b/daemon/logs.go index b1b0bbb910..b207fb693e 100644 --- a/daemon/logs.go +++ b/daemon/logs.go @@ -2,48 +2,57 @@ package daemon import ( "errors" - "io" "strconv" "time" "golang.org/x/net/context" "github.com/Sirupsen/logrus" + "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/backend" containertypes "github.com/docker/docker/api/types/container" timetypes "github.com/docker/docker/api/types/time" "github.com/docker/docker/container" "github.com/docker/docker/daemon/logger" - "github.com/docker/docker/pkg/ioutils" - "github.com/docker/docker/pkg/stdcopy" ) -// ContainerLogs hooks up a container's stdout and stderr streams -// configured with the given struct. -func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, config *backend.ContainerLogsConfig, started chan struct{}) error { +// ContainerLogs copies the container's log channel to the channel provided in +// the config. If ContainerLogs returns an error, no messages have been copied. +// and the channel will be closed without data. +// +// if it returns nil, the config channel will be active and return log +// messages until it runs out or the context is canceled. 
+func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, config *types.ContainerLogsOptions) (<-chan *backend.LogMessage, error) {
+	lg := logrus.WithFields(logrus.Fields{
+		"module":    "daemon",
+		"method":    "(*Daemon).ContainerLogs",
+		"container": containerName,
+	})
+
 	if !(config.ShowStdout || config.ShowStderr) {
-		return errors.New("You must choose at least one stream")
+		return nil, errors.New("You must choose at least one stream")
 	}
 	container, err := daemon.GetContainer(containerName)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	if container.RemovalInProgress || container.Dead {
-		return errors.New("can not get logs from container which is dead or marked for removal")
+		return nil, errors.New("can not get logs from container which is dead or marked for removal")
 	}
 
 	if container.HostConfig.LogConfig.Type == "none" {
-		return logger.ErrReadLogsNotSupported
+		return nil, logger.ErrReadLogsNotSupported
 	}
 
 	cLog, err := daemon.getLogger(container)
 	if err != nil {
-		return err
+		return nil, err
 	}
+
 	logReader, ok := cLog.(logger.LogReader)
 	if !ok {
-		return logger.ErrReadLogsNotSupported
+		return nil, logger.ErrReadLogsNotSupported
 	}
 
 	follow := config.Follow && container.IsRunning()
@@ -52,76 +61,91 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c
 		tailLines = -1
 	}
 
-	logrus.Debug("logs: begin stream")
-
 	var since time.Time
 	if config.Since != "" {
 		s, n, err := timetypes.ParseTimestamps(config.Since, 0)
 		if err != nil {
-			return err
+			return nil, err
 		}
 		since = time.Unix(s, n)
 	}
+
 	readConfig := logger.ReadConfig{
 		Since:  since,
 		Tail:   tailLines,
 		Follow: follow,
 	}
+
 	logs := logReader.ReadLogs(readConfig)
-	// Close logWatcher on exit
-	defer func() {
-		logs.Close()
-		if cLog != container.LogDriver {
-			// Since the logger isn't cached in the container, which
-			// occurs if it is running, it must get explicitly closed
-			// here to avoid leaking it and any file handles it has.
-			if err := cLog.Close(); err != nil {
-				logrus.Errorf("Error closing logger: %v", err)
+
+	// Past this point no further errors are returned to the caller, so we
+	// start a goroutine and return immediately with a nil error. A caller
+	// that wants to give up on the logs has to cancel the context.
+	// This goroutine functions as a shim between the logger and the caller.
+	messageChan := make(chan *backend.LogMessage, 1)
+	go func() {
+		// deferred cleanup; runs after messageChan has been closed (LIFO)
+		defer func() {
+			// This cleanup used to be deferred right after the ReadLogs call
+			// above; it now runs when the goroutine exits so the log watcher
+			// stays open for as long as messages are being forwarded.
+			// NOTE(review): an early return before ReadLogs (e.g. a bad
+			// "since" value) skips this cleanup and may leak cLog; the
+			// original code had the same behavior. TODO confirm.
+			logs.Close()
+			if cLog != container.LogDriver {
+				// Since the logger isn't cached in the container, which
+				// occurs if it is running, it must get explicitly closed
+				// here to avoid leaking it and any file handles it has.
+				if err := cLog.Close(); err != nil {
+					lg.Errorf("Error closing logger: %v", err)
+				}
+			}
+		}()
+		// close the messages channel. closing it is the only way to signal the
+		// caller that we're done with logs (other than context cancellation).
+		defer close(messageChan)
+
+		lg.Debug("begin logs")
+		for {
+			select {
+			// As the system is currently designed an error here is not
+			// expected, but we should be prepared to handle it anyway. If
+			// we do get an error, copy only the error field to a new object so
+			// we don't end up with partial data in the other fields.
+			case err := <-logs.Err:
+				lg.Errorf("Error streaming logs: %v", err)
+				select {
+				case <-ctx.Done():
+				case messageChan <- &backend.LogMessage{Err: err}:
+				}
+				return
+			case <-ctx.Done():
+				lg.Debugf("logs: end stream, ctx is done: %v", ctx.Err())
+				return
+			case msg, ok := <-logs.Msg:
+				// there is some kind of pool or ring buffer in the logger that
+				// produces these messages, and a possible future optimization
+				// might be to use that pool and reuse message objects
+				if !ok {
+					lg.Debug("end logs")
+					return
+				}
+				m := msg.AsLogMessage() // just a pointer conversion, does not copy data
+
+				// There could be a case where the reader stops accepting
+				// messages and the context is canceled. We need to check that
+				// here, or otherwise we risk blocking forever on the message
+				// send.
+				select {
+				case <-ctx.Done():
+					return
+				case messageChan <- m:
+				}
 			}
 		}
 	}()
-
-	wf := ioutils.NewWriteFlusher(config.OutStream)
-	defer wf.Close()
-	close(started)
-	wf.Flush()
-
-	var outStream io.Writer
-	outStream = wf
-	errStream := outStream
-	if !container.Config.Tty {
-		errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr)
-		outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout)
-	}
-
-	for {
-		select {
-		case err := <-logs.Err:
-			logrus.Errorf("Error streaming logs: %v", err)
-			return nil
-		case <-ctx.Done():
-			logrus.Debugf("logs: end stream, ctx is done: %v", ctx.Err())
-			return nil
-		case msg, ok := <-logs.Msg:
-			if !ok {
-				logrus.Debug("logs: end stream")
-				return nil
-			}
-			logLine := msg.Line
-			if config.Details {
-				logLine = append([]byte(msg.Attrs.String()+" "), logLine...)
-			}
-			if config.Timestamps {
-				logLine = append([]byte(msg.Timestamp.Format(logger.TimeFormat)+" "), logLine...)
-			}
-			if msg.Source == "stdout" && config.ShowStdout {
-				outStream.Write(logLine)
-			}
-			if msg.Source == "stderr" && config.ShowStderr {
-				errStream.Write(logLine)
-			}
-		}
-	}
+	return messageChan, nil
 }
 
 func (daemon *Daemon) getLogger(container *container.Container) (logger.Logger, error) {
diff --git a/integration-cli/docker_cli_service_logs_experimental_test.go b/integration-cli/docker_cli_service_logs_experimental_test.go
index 80a3b277b1..96bb48bfe1 100644
--- a/integration-cli/docker_cli_service_logs_experimental_test.go
+++ b/integration-cli/docker_cli_service_logs_experimental_test.go
@@ -254,3 +254,43 @@ func (s *DockerSwarmSuite) TestServiceLogsTaskLogs(c *check.C) {
 		}
 	}
 }
+
+func (s *DockerSwarmSuite) TestServiceLogsTTY(c *check.C) {
+	testRequires(c, ExperimentalDaemon)
+
+	d := s.AddDaemon(c, true, true)
+
+	name := "TestServiceLogsTTY"
+
+	result := icmd.RunCmd(d.Command(
+		// create a service
+		"service", "create",
+		// name it $name
+		"--name", name,
+		// use a TTY
+		"-t",
+		// busybox image, shell string
+		"busybox", "sh", "-c",
+		// echo to stdout and stderr
+		"echo out; (echo err 1>&2); sleep 10000",
+	))
+
+	result.Assert(c, icmd.Expected{})
+	id := strings.TrimSpace(result.Stdout())
+	c.Assert(id, checker.Not(checker.Equals), "")
+	// so, right here, we're basically inspecting by id and returning only
+	// the ID. if they don't match, the service doesn't exist.
+	result = icmd.RunCmd(d.Command("service", "inspect", "--format=\"{{.ID}}\"", id))
+	result.Assert(c, icmd.Expected{Out: id})
+
+	// make sure task has been deployed.
+	waitAndAssert(c, defaultReconciliationTimeout, d.CheckActiveContainerCount, checker.Equals, 1)
+	// and make sure we have all the log lines
+	waitAndAssert(c, defaultReconciliationTimeout, countLogLines(d, name), checker.Equals, 2)
+
+	cmd := d.Command("service", "logs", name)
+	result = icmd.RunCmd(cmd)
+	// for some reason there is carriage return in the output. i think this is
+	// just expected.
+	c.Assert(result, icmd.Matches, icmd.Expected{Out: "out\r\nerr\r\n"})
+}