1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

refactor logs and support service logs /w tty

Refactor container logs system to make communicating log messages
internally much simpler. Move responsibility for marshalling log
messages into the REST server. Support TTY logs. Pave the way for fixing
the ambiguous bytestream format. Pave the way for fixing details.

Signed-off-by: Drew Erny <drew.erny@docker.com>
This commit is contained in:
Drew Erny 2017-03-20 10:07:04 -07:00
parent f6b7dc9837
commit 1044093bb0
14 changed files with 454 additions and 315 deletions

View file

@ -0,0 +1,92 @@
package httputils
import (
"fmt"
"io"
"sort"
"strings"
"golang.org/x/net/context"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/jsonlog"
"github.com/docker/docker/pkg/stdcopy"
)
// WriteLogStream writes an encoded byte stream of log messages from the
// messages channel, multiplexing them with a stdcopy.Writer if mux is true
func WriteLogStream(ctx context.Context, w io.Writer, msgs <-chan *backend.LogMessage, config *types.ContainerLogsOptions, mux bool) {
wf := ioutils.NewWriteFlusher(w)
defer wf.Close()
wf.Flush()
// this might seem like doing below is clear:
// var outStream io.Writer = wf
// however, this GREATLY DISPLEASES golint, and if you do that, it will
// fail CI. we need outstream to be type writer because if we mux streams,
// we will need to reassign all of the streams to be stdwriters, which only
// conforms to the io.Writer interface.
var outStream io.Writer
outStream = wf
errStream := outStream
sysErrStream := errStream
if mux {
sysErrStream = stdcopy.NewStdWriter(outStream, stdcopy.Systemerr)
errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr)
outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout)
}
for {
msg, ok := <-msgs
if !ok {
return
}
// check if the message contains an error. if so, write that error
// and exit
if msg.Err != nil {
fmt.Fprintf(sysErrStream, "Error grabbing logs: %v\n", msg.Err)
continue
}
logLine := msg.Line
if config.Details {
logLine = append([]byte(stringAttrs(msg.Attrs)+" "), logLine...)
}
if config.Timestamps {
// TODO(dperny) the format is defined in
// daemon/logger/logger.go as logger.TimeFormat. importing
// logger is verboten (not part of backend) so idk if just
// importing the same thing from jsonlog is good enough
logLine = append([]byte(msg.Timestamp.Format(jsonlog.RFC3339NanoFixed)+" "), logLine...)
}
if msg.Source == "stdout" && config.ShowStdout {
outStream.Write(logLine)
}
if msg.Source == "stderr" && config.ShowStderr {
errStream.Write(logLine)
}
}
}
type byKey []string
func (s byKey) Len() int { return len(s) }
func (s byKey) Less(i, j int) bool {
keyI := strings.Split(s[i], "=")
keyJ := strings.Split(s[j], "=")
return keyI[0] < keyJ[0]
}
func (s byKey) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func stringAttrs(a backend.LogAttributes) string {
var ss byKey
for k, v := range a {
ss = append(ss, k+"="+v)
}
sort.Sort(ss)
return strings.Join(ss, ",")
}

View file

@ -51,7 +51,7 @@ type stateBackend interface {
type monitorBackend interface {
ContainerChanges(name string) ([]archive.Change, error)
ContainerInspect(name string, size bool, version string) (interface{}, error)
ContainerLogs(ctx context.Context, name string, config *backend.ContainerLogsConfig, started chan struct{}) error
ContainerLogs(ctx context.Context, name string, config *types.ContainerLogsOptions) (<-chan *backend.LogMessage, error)
ContainerStats(ctx context.Context, name string, config *backend.ContainerStatsConfig) error
ContainerTop(name string, psArgs string) (*container.ContainerTopOKBody, error)

View file

@ -10,6 +10,7 @@ import (
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api"
"github.com/docker/docker/api/server/httputils"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/backend"
@ -18,7 +19,6 @@ import (
"github.com/docker/docker/api/types/versions"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/signal"
"github.com/docker/docker/pkg/stdcopy"
"golang.org/x/net/context"
"golang.org/x/net/websocket"
)
@ -91,33 +91,38 @@ func (s *containerRouter) getContainersLogs(ctx context.Context, w http.Response
}
containerName := vars["name"]
logsConfig := &backend.ContainerLogsConfig{
ContainerLogsOptions: types.ContainerLogsOptions{
Follow: httputils.BoolValue(r, "follow"),
Timestamps: httputils.BoolValue(r, "timestamps"),
Since: r.Form.Get("since"),
Tail: r.Form.Get("tail"),
ShowStdout: stdout,
ShowStderr: stderr,
Details: httputils.BoolValue(r, "details"),
},
OutStream: w,
logsConfig := &types.ContainerLogsOptions{
Follow: httputils.BoolValue(r, "follow"),
Timestamps: httputils.BoolValue(r, "timestamps"),
Since: r.Form.Get("since"),
Tail: r.Form.Get("tail"),
ShowStdout: stdout,
ShowStderr: stderr,
Details: httputils.BoolValue(r, "details"),
}
chStarted := make(chan struct{})
if err := s.backend.ContainerLogs(ctx, containerName, logsConfig, chStarted); err != nil {
select {
case <-chStarted:
// The client may be expecting all of the data we're sending to
// be multiplexed, so mux it through the Systemerr stream, which
// will cause the client to throw an error when demuxing
stdwriter := stdcopy.NewStdWriter(logsConfig.OutStream, stdcopy.Systemerr)
fmt.Fprintf(stdwriter, "Error running logs job: %v\n", err)
default:
return err
}
// doesn't matter what version the client is on, we're using this internally only
// also do we need size? i'm thinkin no we don't
raw, err := s.backend.ContainerInspect(containerName, false, api.DefaultVersion)
if err != nil {
return err
}
container, ok := raw.(*types.ContainerJSON)
if !ok {
// %T prints the type. handy!
return fmt.Errorf("expected container to be *types.ContainerJSON but got %T", raw)
}
msgs, err := s.backend.ContainerLogs(ctx, containerName, logsConfig)
if err != nil {
return err
}
// if has a tty, we're not muxing streams. if it doesn't, we are. simple.
// this is the point of no return for writing a response. once we call
// WriteLogStream, the response has been started and errors will be
// returned in band by WriteLogStream
httputils.WriteLogStream(ctx, w, msgs, logsConfig, !container.Config.Tty)
return nil
}

View file

@ -21,7 +21,7 @@ type Backend interface {
CreateService(types.ServiceSpec, string) (*basictypes.ServiceCreateResponse, error)
UpdateService(string, uint64, types.ServiceSpec, basictypes.ServiceUpdateOptions) (*basictypes.ServiceUpdateResponse, error)
RemoveService(string) error
ServiceLogs(context.Context, *backend.LogSelector, *backend.ContainerLogsConfig, chan struct{}) error
ServiceLogs(context.Context, *backend.LogSelector, *basictypes.ContainerLogsOptions) (<-chan *backend.LogMessage, error)
GetNodes(basictypes.NodeListOptions) ([]types.Node, error)
GetNode(string) (types.Node, error)
UpdateNode(string, uint64, types.NodeSpec) error

View file

@ -7,7 +7,6 @@ import (
"github.com/docker/docker/api/server/httputils"
basictypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/pkg/stdcopy"
"golang.org/x/net/context"
)
@ -24,32 +23,43 @@ func (sr *swarmRouter) swarmLogs(ctx context.Context, w http.ResponseWriter, r *
return fmt.Errorf("Bad parameters: you must choose at least one stream")
}
logsConfig := &backend.ContainerLogsConfig{
ContainerLogsOptions: basictypes.ContainerLogsOptions{
Follow: httputils.BoolValue(r, "follow"),
Timestamps: httputils.BoolValue(r, "timestamps"),
Since: r.Form.Get("since"),
Tail: r.Form.Get("tail"),
ShowStdout: stdout,
ShowStderr: stderr,
Details: httputils.BoolValue(r, "details"),
},
OutStream: w,
// there is probably a neater way to manufacture the ContainerLogsOptions
// struct, probably in the caller, to eliminate the dependency on net/http
logsConfig := &basictypes.ContainerLogsOptions{
Follow: httputils.BoolValue(r, "follow"),
Timestamps: httputils.BoolValue(r, "timestamps"),
Since: r.Form.Get("since"),
Tail: r.Form.Get("tail"),
ShowStdout: stdout,
ShowStderr: stderr,
Details: httputils.BoolValue(r, "details"),
}
chStarted := make(chan struct{})
if err := sr.backend.ServiceLogs(ctx, selector, logsConfig, chStarted); err != nil {
select {
case <-chStarted:
// The client may be expecting all of the data we're sending to
// be multiplexed, so send it through OutStream, which will
// have been set up to handle that if needed.
stdwriter := stdcopy.NewStdWriter(w, stdcopy.Systemerr)
fmt.Fprintf(stdwriter, "Error grabbing service logs: %v\n", err)
default:
tty := false
// checking for whether logs are TTY involves iterating over every service
// and task. idk if there is a better way
for _, service := range selector.Services {
s, err := sr.backend.GetService(service)
if err != nil {
// maybe should return some context with this error?
return err
}
tty = s.Spec.TaskTemplate.ContainerSpec.TTY || tty
}
for _, task := range selector.Tasks {
t, err := sr.backend.GetTask(task)
if err != nil {
// as above
return err
}
tty = t.Spec.ContainerSpec.TTY || tty
}
msgs, err := sr.backend.ServiceLogs(ctx, selector, logsConfig)
if err != nil {
return err
}
httputils.WriteLogStream(ctx, w, msgs, logsConfig, !tty)
return nil
}

View file

@ -3,6 +3,7 @@ package backend
import (
"io"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/pkg/streamformatter"
@ -25,13 +26,28 @@ type ContainerAttachConfig struct {
MuxStreams bool
}
// ContainerLogsConfig holds configs for logging operations. Exists
// for users of the backend to to pass it a logging configuration.
type ContainerLogsConfig struct {
types.ContainerLogsOptions
OutStream io.Writer
// LogMessage is datastructure that represents piece of output produced by some
// container. The Line member is a slice of an array whose contents can be
// changed after a log driver's Log() method returns.
// changes to this struct need to be reflect in the reset method in
// daemon/logger/logger.go
type LogMessage struct {
Line []byte
Source string
Timestamp time.Time
Attrs LogAttributes
Partial bool
// Err is an error associated with a message. Completeness of a message
// with Err is not expected, tho it may be partially complete (fields may
// be missing, gibberish, or nil)
Err error
}
// LogAttributes is used to hold the extra attributes available in the log message
// Primarily used for converting the map type to string and sorting.
type LogAttributes map[string]string
// LogSelector is a list of services and tasks that should be returned as part
// of a log stream. It is similar to swarmapi.LogSelector, with the difference
// that the names don't have to be resolved to IDs; this is mostly to avoid

View file

@ -73,6 +73,7 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error {
Timestamps: opts.timestamps,
Follow: opts.follow,
Tail: opts.tail,
Details: true,
}
cli := dockerCli.Client()
@ -80,6 +81,7 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error {
var (
maxLength = 1
responseBody io.ReadCloser
tty bool
)
service, _, err := cli.ServiceInspectWithRaw(ctx, opts.target)
@ -89,6 +91,14 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error {
return err
}
task, _, err := cli.TaskInspectWithRaw(ctx, opts.target)
tty = task.Spec.ContainerSpec.TTY
// TODO(dperny) hot fix until we get a nice details system squared away,
// ignores details (including task context) if we have a TTY log
if tty {
options.Details = false
}
responseBody, err = cli.TaskLogs(ctx, opts.target, options)
if err != nil {
if client.IsErrTaskNotFound(err) {
// if the task ALSO isn't found, rewrite the error to be clear
@ -100,6 +110,13 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error {
maxLength = getMaxLength(task.Slot)
responseBody, err = cli.TaskLogs(ctx, opts.target, options)
} else {
tty = service.Spec.TaskTemplate.ContainerSpec.TTY
// TODO(dperny) hot fix until we get a nice details system squared away,
// ignores details (including task context) if we have a TTY log
if tty {
options.Details = false
}
responseBody, err = cli.ServiceLogs(ctx, opts.target, options)
if err != nil {
return err
@ -112,6 +129,11 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error {
}
defer responseBody.Close()
if tty {
_, err = io.Copy(dockerCli.Out(), responseBody)
return err
}
taskFormatter := newTaskFormatter(cli, opts, maxLength)
stdout := &logWriter{ctx: ctx, opts: opts, f: taskFormatter, w: dockerCli.Out()}

View file

@ -33,7 +33,7 @@ type Backend interface {
CreateManagedContainer(config types.ContainerCreateConfig) (container.ContainerCreateCreatedBody, error)
ContainerStart(name string, hostConfig *container.HostConfig, checkpoint string, checkpointDir string) error
ContainerStop(name string, seconds *int) error
ContainerLogs(context.Context, string, *backend.ContainerLogsConfig, chan struct{}) error
ContainerLogs(context.Context, string, *types.ContainerLogsOptions) (<-chan *backend.LogMessage, error)
ConnectContainerToNetwork(containerName, networkName string, endpointConfig *network.EndpointSettings) error
ActivateContainerServiceBinding(containerName string) error
DeactivateContainerServiceBinding(containerName string) error

View file

@ -396,26 +396,15 @@ func (c *containerAdapter) deactivateServiceBinding() error {
return c.backend.DeactivateContainerServiceBinding(c.container.name())
}
func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (io.ReadCloser, error) {
// we can't handle the peculiarities of a TTY-attached container yet
conf := c.container.config()
if conf != nil && conf.Tty {
return nil, errors.New("logs not supported on containers with a TTY attached")
}
func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (<-chan *backend.LogMessage, error) {
apiOptions := &types.ContainerLogsOptions{
Follow: options.Follow,
reader, writer := io.Pipe()
apiOptions := &backend.ContainerLogsConfig{
ContainerLogsOptions: types.ContainerLogsOptions{
Follow: options.Follow,
// TODO(stevvooe): Parse timestamp out of message. This
// absolutely needs to be done before going to production with
// this, at it is completely redundant.
Timestamps: true,
Details: false, // no clue what to do with this, let's just deprecate it.
},
OutStream: writer,
// TODO(stevvooe): Parse timestamp out of message. This
// absolutely needs to be done before going to production with
// this, at it is completely redundant.
Timestamps: true,
Details: false, // no clue what to do with this, let's just deprecate it.
}
if options.Since != nil {
@ -449,14 +438,11 @@ func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscription
}
}
}
chStarted := make(chan struct{})
go func() {
defer writer.Close()
c.backend.ContainerLogs(ctx, c.container.name(), apiOptions, chStarted)
}()
return reader, nil
msgs, err := c.backend.ContainerLogs(ctx, c.container.name(), apiOptions)
if err != nil {
return nil, err
}
return msgs, nil
}
// todo: typed/wrapped errors

View file

@ -1,11 +1,7 @@
package container
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
"os"
"strconv"
"strings"
@ -445,11 +441,12 @@ func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, opti
return errors.Wrap(err, "container not ready for logs")
}
rc, err := r.adapter.logs(ctx, options)
logsContext, cancel := context.WithCancel(ctx)
msgs, err := r.adapter.logs(logsContext, options)
defer cancel()
if err != nil {
return errors.Wrap(err, "failed getting container logs")
}
defer rc.Close()
var (
// use a rate limiter to keep things under control but also provides some
@ -462,53 +459,38 @@ func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, opti
}
)
brd := bufio.NewReader(rc)
for {
// so, message header is 8 bytes, treat as uint64, pull stream off MSB
var header uint64
if err := binary.Read(brd, binary.BigEndian, &header); err != nil {
if err == io.EOF {
return nil
}
return errors.Wrap(err, "failed reading log header")
msg, ok := <-msgs
if !ok {
// we're done here, no more messages
return nil
}
stream, size := (header>>(7<<3))&0xFF, header & ^(uint64(0xFF)<<(7<<3))
if msg.Err != nil {
// the defered cancel closes the adapter's log stream
return msg.Err
}
// limit here to decrease allocation back pressure.
if err := limiter.WaitN(ctx, int(size)); err != nil {
// wait here for the limiter to catch up
if err := limiter.WaitN(ctx, len(msg.Line)); err != nil {
return errors.Wrap(err, "failed rate limiter")
}
buf := make([]byte, size)
_, err := io.ReadFull(brd, buf)
if err != nil {
return errors.Wrap(err, "failed reading buffer")
}
// Timestamp is RFC3339Nano with 1 space after. Lop, parse, publish
parts := bytes.SplitN(buf, []byte(" "), 2)
if len(parts) != 2 {
return fmt.Errorf("invalid timestamp in log message: %v", buf)
}
ts, err := time.Parse(time.RFC3339Nano, string(parts[0]))
if err != nil {
return errors.Wrap(err, "failed to parse timestamp")
}
tsp, err := gogotypes.TimestampProto(ts)
tsp, err := gogotypes.TimestampProto(msg.Timestamp)
if err != nil {
return errors.Wrap(err, "failed to convert timestamp")
}
var stream api.LogStream
if msg.Source == "stdout" {
stream = api.LogStreamStdout
} else if msg.Source == "stderr" {
stream = api.LogStreamStderr
}
if err := publisher.Publish(ctx, api.LogMessage{
Context: msgctx,
Timestamp: tsp,
Stream: api.LogStream(stream),
Data: parts[1],
Stream: stream,
Data: msg.Line,
}); err != nil {
return errors.Wrap(err, "failed to publish log message")
}

View file

@ -18,9 +18,6 @@ import (
types "github.com/docker/docker/api/types/swarm"
timetypes "github.com/docker/docker/api/types/time"
"github.com/docker/docker/daemon/cluster/convert"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/stdcopy"
runconfigopts "github.com/docker/docker/runconfig/opts"
swarmapi "github.com/docker/swarmkit/api"
gogotypes "github.com/gogo/protobuf/types"
@ -303,56 +300,44 @@ func (c *Cluster) RemoveService(input string) error {
}
// ServiceLogs collects service logs and writes them back to `config.OutStream`
func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector, config *backend.ContainerLogsConfig, started chan struct{}) error {
func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector, config *apitypes.ContainerLogsOptions) (<-chan *backend.LogMessage, error) {
c.mu.RLock()
defer func() {
select {
case <-started:
// if we've started streaming logs, we are no longer holding the
// lock and do not have to release it
return
default:
// before we start, though, we're holding this lock and it needs to
// be released
c.mu.RUnlock()
}
}()
defer c.mu.RUnlock()
state := c.currentNodeState()
if !state.IsActiveManager() {
return c.errNoManager(state)
return nil, c.errNoManager(state)
}
swarmSelector, tty, err := convertSelector(ctx, state.controlClient, selector)
swarmSelector, err := convertSelector(ctx, state.controlClient, selector)
if err != nil {
return errors.Wrap(err, "error making log selector")
}
// TODO(dperny) this goes away when we support TTY logs, which is in the works
if tty {
return errors.New("service logs not supported on tasks with a TTY attached")
return nil, errors.Wrap(err, "error making log selector")
}
// set the streams we'll use
stdStreams := []swarmapi.LogStream{}
if config.ContainerLogsOptions.ShowStdout {
if config.ShowStdout {
stdStreams = append(stdStreams, swarmapi.LogStreamStdout)
}
if config.ContainerLogsOptions.ShowStderr {
if config.ShowStderr {
stdStreams = append(stdStreams, swarmapi.LogStreamStderr)
}
// Get tail value squared away - the number of previous log lines we look at
var tail int64
// in ContainerLogs, if the tail value is ANYTHING non-integer, we just set
// it to -1 (all). i don't agree with that, but i also think no tail value
// should be legitimate. if you don't pass tail, we assume you want "all"
if config.Tail == "all" || config.Tail == "" {
// tail of 0 means send all logs on the swarmkit side
tail = 0
} else {
t, err := strconv.Atoi(config.Tail)
if err != nil {
return errors.New("tail value must be a positive integer or \"all\"")
return nil, errors.New("tail value must be a positive integer or \"all\"")
}
if t < 0 {
return errors.New("negative tail values not supported")
return nil, errors.New("negative tail values not supported")
}
// we actually use negative tail in swarmkit to represent messages
// backwards starting from the beginning. also, -1 means no logs. so,
@ -370,12 +355,12 @@ func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector
if config.Since != "" {
s, n, err := timetypes.ParseTimestamps(config.Since, 0)
if err != nil {
return errors.Wrap(err, "could not parse since timestamp")
return nil, errors.Wrap(err, "could not parse since timestamp")
}
since := time.Unix(s, n)
sinceProto, err = gogotypes.TimestampProto(since)
if err != nil {
return errors.Wrap(err, "could not parse timestamp to proto")
return nil, errors.Wrap(err, "could not parse timestamp to proto")
}
}
@ -389,106 +374,96 @@ func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector
},
})
if err != nil {
return err
return nil, err
}
wf := ioutils.NewWriteFlusher(config.OutStream)
defer wf.Close()
// Release the lock before starting the stream.
//
// this feels like it could be racy because we would double unlock if we
// somehow returned right after we unlocked but before we closed, but I do
// not think such a thing is possible. i wish it were possible to atomically
// close and unlock but that might be overkill. programming is hard.
c.mu.RUnlock()
close(started)
wf.Flush()
outStream := stdcopy.NewStdWriter(wf, stdcopy.Stdout)
errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr)
for {
// Check the context before doing anything.
select {
case <-ctx.Done():
return ctx.Err()
default:
}
subscribeMsg, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
for _, msg := range subscribeMsg.Messages {
data := []byte{}
if config.Timestamps {
ts, err := gogotypes.TimestampFromProto(msg.Timestamp)
if err != nil {
return err
messageChan := make(chan *backend.LogMessage, 1)
go func() {
defer close(messageChan)
for {
// Check the context before doing anything.
select {
case <-ctx.Done():
return
default:
}
subscribeMsg, err := stream.Recv()
if err == io.EOF {
return
}
// if we're not io.EOF, push the message in and return
if err != nil {
select {
case <-ctx.Done():
case messageChan <- &backend.LogMessage{Err: err}:
}
data = append(data, []byte(ts.Format(logger.TimeFormat)+" ")...)
return
}
data = append(data, []byte(fmt.Sprintf("%s.node.id=%s,%s.service.id=%s,%s.task.id=%s ",
contextPrefix, msg.Context.NodeID,
contextPrefix, msg.Context.ServiceID,
contextPrefix, msg.Context.TaskID,
))...)
for _, msg := range subscribeMsg.Messages {
// make a new message
m := new(backend.LogMessage)
m.Attrs = make(backend.LogAttributes)
// add the timestamp, adding the error if it fails
m.Timestamp, err = gogotypes.TimestampFromProto(msg.Timestamp)
if err != nil {
m.Err = err
}
m.Attrs[contextPrefix+".node.id"] = msg.Context.NodeID
m.Attrs[contextPrefix+".service.id"] = msg.Context.ServiceID
m.Attrs[contextPrefix+".task.id"] = msg.Context.TaskID
switch msg.Stream {
case swarmapi.LogStreamStdout:
m.Source = "stdout"
case swarmapi.LogStreamStderr:
m.Source = "stderr"
}
m.Line = msg.Data
data = append(data, msg.Data...)
switch msg.Stream {
case swarmapi.LogStreamStdout:
outStream.Write(data)
case swarmapi.LogStreamStderr:
errStream.Write(data)
// there could be a case where the reader stops accepting
// messages and the context is canceled. we need to check that
// here, or otherwise we risk blocking forever on the message
// send.
select {
case <-ctx.Done():
return
case messageChan <- m:
}
}
}
}
}()
return messageChan, nil
}
// convertSelector takes a backend.LogSelector, which contains raw names that
// may or may not be valid, and converts them to an api.LogSelector proto. It
// also returns a boolean, true if any of the services use a TTY (false
// otherwise) and an error if something fails
func convertSelector(ctx context.Context, cc swarmapi.ControlClient, selector *backend.LogSelector) (*swarmapi.LogSelector, bool, error) {
// if ANY tasks use a TTY, don't mux streams
var tty bool
// returns an error if something fails
func convertSelector(ctx context.Context, cc swarmapi.ControlClient, selector *backend.LogSelector) (*swarmapi.LogSelector, error) {
// don't rely on swarmkit to resolve IDs, do it ourselves
swarmSelector := &swarmapi.LogSelector{}
for _, s := range selector.Services {
service, err := getService(ctx, cc, s)
if err != nil {
return nil, false, err
return nil, err
}
c := service.Spec.Task.GetContainer()
if c == nil {
return nil, false, errors.New("logs only supported on container tasks")
return nil, errors.New("logs only supported on container tasks")
}
// set TTY true if we have a TTY service, or if it's already true
tty = tty || c.TTY
swarmSelector.ServiceIDs = append(swarmSelector.ServiceIDs, service.ID)
}
for _, t := range selector.Tasks {
task, err := getTask(ctx, cc, t)
if err != nil {
return nil, false, err
return nil, err
}
c := task.Spec.GetContainer()
if c == nil {
return nil, false, errors.New("logs only supported on container tasks")
return nil, errors.New("logs only supported on container tasks")
}
tty = tty || c.TTY
swarmSelector.TaskIDs = append(swarmSelector.TaskIDs, task.ID)
}
return swarmSelector, tty, nil
return swarmSelector, nil
}
// imageWithDigestString takes an image such as name or name:tag

View file

@ -9,11 +9,10 @@ package logger
import (
"errors"
"sort"
"strings"
"sync"
"time"
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/pkg/jsonlog"
)
@ -43,14 +42,13 @@ func PutMessage(msg *Message) {
// Message is datastructure that represents piece of output produced by some
// container. The Line member is a slice of an array whose contents can be
// changed after a log driver's Log() method returns.
//
// Message is subtyped from backend.LogMessage because there is a lot of
// internal complexity around the Message type that should not be exposed
// to any package not explicitly importing the logger type.
//
// Any changes made to this struct must also be updated in the `reset` function
type Message struct {
Line []byte
Source string
Timestamp time.Time
Attrs LogAttributes
Partial bool
}
type Message backend.LogMessage
// reset sets the message back to default values
// This is used when putting a message back into the message pool.
@ -60,31 +58,20 @@ func (m *Message) reset() {
m.Source = ""
m.Attrs = nil
m.Partial = false
m.Err = nil
}
// AsLogMessage returns a pointer to the message as a pointer to
// backend.LogMessage, which is an identical type with a different purpose
func (m *Message) AsLogMessage() *backend.LogMessage {
return (*backend.LogMessage)(m)
}
// LogAttributes is used to hold the extra attributes available in the log message
// Primarily used for converting the map type to string and sorting.
type LogAttributes map[string]string
type byKey []string
func (s byKey) Len() int { return len(s) }
func (s byKey) Less(i, j int) bool {
keyI := strings.Split(s[i], "=")
keyJ := strings.Split(s[j], "=")
return keyI[0] < keyJ[0]
}
func (s byKey) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (a LogAttributes) String() string {
var ss byKey
for k, v := range a {
ss = append(ss, k+"="+v)
}
sort.Sort(ss)
return strings.Join(ss, ",")
}
// Imported here so it can be used internally with less refactoring
type LogAttributes backend.LogAttributes
// Logger is the interface for docker logging drivers.
type Logger interface {

View file

@ -2,48 +2,57 @@ package daemon
import (
"errors"
"io"
"strconv"
"time"
"golang.org/x/net/context"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/backend"
containertypes "github.com/docker/docker/api/types/container"
timetypes "github.com/docker/docker/api/types/time"
"github.com/docker/docker/container"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/stdcopy"
)
// ContainerLogs hooks up a container's stdout and stderr streams
// configured with the given struct.
func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, config *backend.ContainerLogsConfig, started chan struct{}) error {
// ContainerLogs copies the container's log channel to the channel provided in
// the config. If ContainerLogs returns an error, no messages have been copied.
// and the channel will be closed without data.
//
// if it returns nil, the config channel will be active and return log
// messages until it runs out or the context is canceled.
func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, config *types.ContainerLogsOptions) (<-chan *backend.LogMessage, error) {
lg := logrus.WithFields(logrus.Fields{
"module": "daemon",
"method": "(*Daemon).ContainerLogs",
"container": containerName,
})
if !(config.ShowStdout || config.ShowStderr) {
return errors.New("You must choose at least one stream")
return nil, errors.New("You must choose at least one stream")
}
container, err := daemon.GetContainer(containerName)
if err != nil {
return err
return nil, err
}
if container.RemovalInProgress || container.Dead {
return errors.New("can not get logs from container which is dead or marked for removal")
return nil, errors.New("can not get logs from container which is dead or marked for removal")
}
if container.HostConfig.LogConfig.Type == "none" {
return logger.ErrReadLogsNotSupported
return nil, logger.ErrReadLogsNotSupported
}
cLog, err := daemon.getLogger(container)
if err != nil {
return err
return nil, err
}
logReader, ok := cLog.(logger.LogReader)
if !ok {
return logger.ErrReadLogsNotSupported
return nil, logger.ErrReadLogsNotSupported
}
follow := config.Follow && container.IsRunning()
@ -52,76 +61,91 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c
tailLines = -1
}
logrus.Debug("logs: begin stream")
var since time.Time
if config.Since != "" {
s, n, err := timetypes.ParseTimestamps(config.Since, 0)
if err != nil {
return err
return nil, err
}
since = time.Unix(s, n)
}
readConfig := logger.ReadConfig{
Since: since,
Tail: tailLines,
Follow: follow,
}
logs := logReader.ReadLogs(readConfig)
// Close logWatcher on exit
defer func() {
logs.Close()
if cLog != container.LogDriver {
// Since the logger isn't cached in the container, which
// occurs if it is running, it must get explicitly closed
// here to avoid leaking it and any file handles it has.
if err := cLog.Close(); err != nil {
logrus.Errorf("Error closing logger: %v", err)
// past this point, we can't possibly return any errors, so we can just
// start a goroutine and return to tell the caller not to expect errors
// (if the caller wants to give up on logs, they have to cancel the context)
// this goroutine functions as a shim between the logger and the caller.
messageChan := make(chan *backend.LogMessage, 1)
go func() {
// set up some defers
defer func() {
// ok so this function, originally, was placed right after that
// logger.ReadLogs call above. I THINK that means it sets off the
// chain of events that results in the logger needing to be closed.
// i do not know if an error in time parsing above causing an early
// return will result in leaking the logger. if that is the case,
// it would also have been a bug in the original code
logs.Close()
if cLog != container.LogDriver {
// Since the logger isn't cached in the container, which
// occurs if it is running, it must get explicitly closed
// here to avoid leaking it and any file handles it has.
if err := cLog.Close(); err != nil {
logrus.Errorf("Error closing logger: %v", err)
}
}
}()
// close the messages channel. closing is the only way to signal above
// that we're doing with logs (other than context cancel i guess).
defer close(messageChan)
lg.Debug("begin logs")
for {
select {
// i do not believe as the system is currently designed any error
// is possible, but we should be prepared to handle it anyway. if
// we do get an error, copy only the error field to a new object so
// we don't end up with partial data in the other fields
case err := <-logs.Err:
lg.Errorf("Error streaming logs: %v", err)
select {
case <-ctx.Done():
case messageChan <- &backend.LogMessage{Err: err}:
}
return
case <-ctx.Done():
lg.Debug("logs: end stream, ctx is done: %v", ctx.Err())
return
case msg, ok := <-logs.Msg:
// there is some kind of pool or ring buffer in the logger that
// produces these messages, and a possible future optimization
// might be to use that pool and reuse message objects
if !ok {
lg.Debug("end logs")
return
}
m := msg.AsLogMessage() // just a pointer conversion, does not copy data
// there could be a case where the reader stops accepting
// messages and the context is canceled. we need to check that
// here, or otherwise we risk blocking forever on the message
// send.
select {
case <-ctx.Done():
return
case messageChan <- m:
}
}
}
}()
wf := ioutils.NewWriteFlusher(config.OutStream)
defer wf.Close()
close(started)
wf.Flush()
var outStream io.Writer
outStream = wf
errStream := outStream
if !container.Config.Tty {
errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr)
outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout)
}
for {
select {
case err := <-logs.Err:
logrus.Errorf("Error streaming logs: %v", err)
return nil
case <-ctx.Done():
logrus.Debugf("logs: end stream, ctx is done: %v", ctx.Err())
return nil
case msg, ok := <-logs.Msg:
if !ok {
logrus.Debug("logs: end stream")
return nil
}
logLine := msg.Line
if config.Details {
logLine = append([]byte(msg.Attrs.String()+" "), logLine...)
}
if config.Timestamps {
logLine = append([]byte(msg.Timestamp.Format(logger.TimeFormat)+" "), logLine...)
}
if msg.Source == "stdout" && config.ShowStdout {
outStream.Write(logLine)
}
if msg.Source == "stderr" && config.ShowStderr {
errStream.Write(logLine)
}
}
}
return messageChan, nil
}
func (daemon *Daemon) getLogger(container *container.Container) (logger.Logger, error) {

View file

@ -254,3 +254,43 @@ func (s *DockerSwarmSuite) TestServiceLogsTaskLogs(c *check.C) {
}
}
}
func (s *DockerSwarmSuite) TestServiceLogsTTY(c *check.C) {
testRequires(c, ExperimentalDaemon)
d := s.AddDaemon(c, true, true)
name := "TestServiceLogsTTY"
result := icmd.RunCmd(d.Command(
// create a service
"service", "create",
// name it $name
"--name", name,
// use a TTY
"-t",
// busybox image, shell string
"busybox", "sh", "-c",
// echo to stdout and stderr
"echo out; (echo err 1>&2); sleep 10000",
))
result.Assert(c, icmd.Expected{})
id := strings.TrimSpace(result.Stdout())
c.Assert(id, checker.Not(checker.Equals), "")
// so, right here, we're basically inspecting by id and returning only
// the ID. if they don't match, the service doesn't exist.
result = icmd.RunCmd(d.Command("service", "inspect", "--format=\"{{.ID}}\"", id))
result.Assert(c, icmd.Expected{Out: id})
// make sure task has been deployed.
waitAndAssert(c, defaultReconciliationTimeout, d.CheckActiveContainerCount, checker.Equals, 1)
// and make sure we have all the log lines
waitAndAssert(c, defaultReconciliationTimeout, countLogLines(d, name), checker.Equals, 2)
cmd := d.Command("service", "logs", name)
result = icmd.RunCmd(cmd)
// for some reason there is carriage return in the output. i think this is
// just expected.
c.Assert(result, icmd.Matches, icmd.Expected{Out: "out\r\nerr\r\n"})
}