diff --git a/commands.go b/commands.go index 7f8f9eec0b..f0e1695b3f 100644 --- a/commands.go +++ b/commands.go @@ -1105,10 +1105,7 @@ func (cli *DockerCli) CmdLogs(args ...string) error { return nil } - if err := cli.hijack("POST", "/containers/"+cmd.Arg(0)+"/attach?logs=1&stdout=1", false, nil, cli.out); err != nil { - return err - } - if err := cli.hijack("POST", "/containers/"+cmd.Arg(0)+"/attach?logs=1&stderr=1", false, nil, cli.err); err != nil { + if err := cli.hijack("POST", "/containers/"+cmd.Arg(0)+"/attach?logs=1&stdout=1&stderr=1", false, nil, cli.out); err != nil { return err } return nil diff --git a/commands_test.go b/commands_test.go index 3f4c53db03..233c6337d4 100644 --- a/commands_test.go +++ b/commands_test.go @@ -59,7 +59,6 @@ func assertPipe(input, output string, r io.Reader, w io.Writer, count int) error return nil } - // TestRunHostname checks that 'docker run -h' correctly sets a custom hostname func TestRunHostname(t *testing.T) { stdout, stdoutPipe := io.Pipe() @@ -91,7 +90,6 @@ func TestRunHostname(t *testing.T) { } - // TestAttachStdin checks attaching to stdin without stdout and stderr. // 'docker run -i -a stdin' should sends the client's stdin to the command, // then detach from it and print the container id. @@ -144,15 +142,17 @@ func TestRunAttachStdin(t *testing.T) { }) // Check logs - if cmdLogs, err := container.ReadLog("stdout"); err != nil { + if cmdLogs, err := container.ReadLog("json"); err != nil { t.Fatal(err) } else { if output, err := ioutil.ReadAll(cmdLogs); err != nil { t.Fatal(err) } else { - expectedLog := "hello\nhi there\n" - if string(output) != expectedLog { - t.Fatalf("Unexpected logs: should be '%s', not '%s'\n", expectedLog, output) + expectedLogs := []string{"{\"log\":\"hello\\n\",\"stream\":\"stdout\"", "{\"log\":\"hi there\\n\",\"stream\":\"stdout\""} + for _, expectedLog := range expectedLogs { + if !strings.Contains(string(output), expectedLog) { + t.Fatalf("Unexpected logs: should contains '%s', it is not '%s'\n", expectedLog, output) + } } } } diff --git a/container.go b/container.go index 4443ad52a3..d0a0dd7714 100644 --- a/container.go +++ b/container.go @@ -653,10 +653,10 @@ func (container *Container) Start(hostConfig *HostConfig) error { container.cmd = exec.Command("lxc-start", params...) // Setup logging of stdout and stderr to disk - if err := container.runtime.LogToDisk(container.stdout, container.logPath("stdout")); err != nil { + if err := container.runtime.LogToDisk(container.stdout, container.logPath("json"), "stdout"); err != nil { return err } - if err := container.runtime.LogToDisk(container.stderr, container.logPath("stderr")); err != nil { + if err := container.runtime.LogToDisk(container.stderr, container.logPath("json"), "stderr"); err != nil { return err } @@ -715,13 +715,13 @@ func (container *Container) StdinPipe() (io.WriteCloser, error) { func (container *Container) StdoutPipe() (io.ReadCloser, error) { reader, writer := io.Pipe() - container.stdout.AddWriter(writer) + container.stdout.AddWriter(writer, "") return utils.NewBufReader(reader), nil } func (container *Container) StderrPipe() (io.ReadCloser, error) { reader, writer := io.Pipe() - container.stderr.AddWriter(writer) + container.stderr.AddWriter(writer, "") return utils.NewBufReader(reader), nil } diff --git a/runtime.go b/runtime.go index a3abbc3b50..f4c5b4d380 100644 --- a/runtime.go +++ b/runtime.go @@ -167,12 +167,12 @@ func (runtime *Runtime) Register(container *Container) error { return nil } -func (runtime *Runtime) LogToDisk(src *utils.WriteBroadcaster, dst string) error { +func (runtime *Runtime) LogToDisk(src *utils.WriteBroadcaster, dst, stream string) error { log, err := os.OpenFile(dst, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600) if err != nil { return err } - src.AddWriter(log) + src.AddWriter(log, stream) return nil } diff --git a/server.go b/server.go index 954bbb208f..b92ed8fd73 100644 --- a/server.go +++ b/server.go @@ -2,6 +2,7 @@ package docker import ( "bufio" + "encoding/json" "errors" "fmt" "github.com/dotcloud/docker/auth" @@ -1055,20 +1056,41 @@ func (srv *Server) ContainerAttach(name string, logs, stream, stdin, stdout, std } //logs if logs { - if stdout { - cLog, err := container.ReadLog("stdout") - if err != nil { - utils.Debugf("Error reading logs (stdout): %s", err) - } else if _, err := io.Copy(out, cLog); err != nil { - utils.Debugf("Error streaming logs (stdout): %s", err) + cLog, err := container.ReadLog("json") + if err != nil && os.IsNotExist(err) { + // Legacy logs + utils.Debugf("Old logs format") + if stdout { + cLog, err := container.ReadLog("stdout") + if err != nil { + utils.Debugf("Error reading logs (stdout): %s", err) + } else if _, err := io.Copy(out, cLog); err != nil { + utils.Debugf("Error streaming logs (stdout): %s", err) + } } - } - if stderr { - cLog, err := container.ReadLog("stderr") - if err != nil { - utils.Debugf("Error reading logs (stderr): %s", err) - } else if _, err := io.Copy(out, cLog); err != nil { - utils.Debugf("Error streaming logs (stderr): %s", err) + if stderr { + cLog, err := container.ReadLog("stderr") + if err != nil { + utils.Debugf("Error reading logs (stderr): %s", err) + } else if _, err := io.Copy(out, cLog); err != nil { + utils.Debugf("Error streaming logs (stderr): %s", err) + } + } + } else if err != nil { + utils.Debugf("Error reading logs (json): %s", err) + } else { + dec := json.NewDecoder(cLog) + for { + var l utils.JSONLog + if err := dec.Decode(&l); err == io.EOF { + break + } else if err != nil { + utils.Debugf("Error streaming logs: %s", err) + break + } + if (l.Stream == "stdout" && stdout) || (l.Stream == "stderr" && stderr) { + fmt.Fprintf(out, "%s", l.Log) + } } } } diff --git a/utils/utils.go b/utils/utils.go index c85731a2ad..77b3f879cd 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -248,30 +248,54 @@ func (r *bufReader) Close() error { type WriteBroadcaster struct { sync.Mutex - writers map[io.WriteCloser]struct{} + buf *bytes.Buffer + writers map[StreamWriter]bool } -func (w *WriteBroadcaster) AddWriter(writer io.WriteCloser) { +type StreamWriter struct { + wc io.WriteCloser + stream string +} + +func (w *WriteBroadcaster) AddWriter(writer io.WriteCloser, stream string) { w.Lock() - w.writers[writer] = struct{}{} + sw := StreamWriter{wc: writer, stream: stream} + w.writers[sw] = true w.Unlock() } -// FIXME: Is that function used? -// FIXME: This relies on the concrete writer type used having equality operator -func (w *WriteBroadcaster) RemoveWriter(writer io.WriteCloser) { - w.Lock() - delete(w.writers, writer) - w.Unlock() +type JSONLog struct { + Log string `json:"log,omitempty"` + Stream string `json:"stream,omitempty"` + Created time.Time `json:"time"` } func (w *WriteBroadcaster) Write(p []byte) (n int, err error) { w.Lock() defer w.Unlock() - for writer := range w.writers { - if n, err := writer.Write(p); err != nil || n != len(p) { + w.buf.Write(p) + for sw := range w.writers { + lp := p + if sw.stream != "" { + lp = nil + for { + line, err := w.buf.ReadString('\n') + if err != nil { + w.buf.Write([]byte(line)) + break + } + b, err := json.Marshal(&JSONLog{Log: line, Stream: sw.stream, Created: time.Now()}) + if err != nil { + // On error, evict the writer + delete(w.writers, sw) + continue + } + lp = append(lp, b...) + } + } + if n, err := sw.wc.Write(lp); err != nil || n != len(lp) { // On error, evict the writer - delete(w.writers, writer) + delete(w.writers, sw) } } return len(p), nil @@ -280,15 +304,15 @@ func (w *WriteBroadcaster) Write(p []byte) (n int, err error) { func (w *WriteBroadcaster) CloseWriters() error { w.Lock() defer w.Unlock() - for writer := range w.writers { - writer.Close() + for sw := range w.writers { + sw.wc.Close() } - w.writers = make(map[io.WriteCloser]struct{}) + w.writers = make(map[StreamWriter]bool) return nil } func NewWriteBroadcaster() *WriteBroadcaster { - return &WriteBroadcaster{writers: make(map[io.WriteCloser]struct{})} + return &WriteBroadcaster{writers: make(map[StreamWriter]bool), buf: bytes.NewBuffer(nil)} } func GetTotalUsedFds() int {