diff --git a/daemon/logger/copier.go b/daemon/logger/copier.go new file mode 100644 index 0000000000..5b75c919ef --- /dev/null +++ b/daemon/logger/copier.go @@ -0,0 +1,48 @@ +package logger + +import ( + "bufio" + "io" + "time" + + "github.com/Sirupsen/logrus" +) + +// Copier can copy logs from specified sources to Logger and attach +// ContainerID and Timestamp. +// Writes are concurrent, so you need implement some sync in your logger +type Copier struct { + // cid is container id for which we copying logs + cid string + // srcs is map of name -> reader pairs, for example "stdout", "stderr" + srcs map[string]io.Reader + dst Logger +} + +// NewCopier creates new Copier +func NewCopier(cid string, srcs map[string]io.Reader, dst Logger) (*Copier, error) { + return &Copier{ + cid: cid, + srcs: srcs, + dst: dst, + }, nil +} + +// Run starts logs copying +func (c *Copier) Run() { + for src, w := range c.srcs { + go c.copySrc(src, w) + } +} + +func (c *Copier) copySrc(name string, src io.Reader) { + scanner := bufio.NewScanner(src) + for scanner.Scan() { + if err := c.dst.Log(&Message{ContainerID: c.cid, Line: scanner.Bytes(), Source: name, Timestamp: time.Now().UTC()}); err != nil { + logrus.Errorf("Failed to log msg %q for logger %s: %s", scanner.Bytes(), c.dst.Name(), err) + } + } + if err := scanner.Err(); err != nil { + logrus.Errorf("Error scanning log stream: %s", err) + } +} diff --git a/daemon/logger/copier_test.go b/daemon/logger/copier_test.go new file mode 100644 index 0000000000..0071af3a5f --- /dev/null +++ b/daemon/logger/copier_test.go @@ -0,0 +1,100 @@ +package logger + +import ( + "bytes" + "encoding/json" + "io" + "testing" + "time" +) + +type TestLoggerJSON struct { + *json.Encoder +} + +func (l *TestLoggerJSON) Log(m *Message) error { + return l.Encode(m) +} + +func (l *TestLoggerJSON) Close() error { + return nil +} + +func (l *TestLoggerJSON) Name() string { + return "json" +} + +type TestLoggerText struct { + *bytes.Buffer +} + +func (l *TestLoggerText) Log(m *Message) error { + _, err := l.WriteString(m.ContainerID + " " + m.Source + " " + string(m.Line) + "\n") + return err +} + +func (l *TestLoggerText) Close() error { + return nil +} + +func (l *TestLoggerText) Name() string { + return "text" +} + +func TestCopier(t *testing.T) { + stdoutLine := "Line that thinks that it is log line from docker stdout" + stderrLine := "Line that thinks that it is log line from docker stderr" + var stdout bytes.Buffer + var stderr bytes.Buffer + for i := 0; i < 30; i++ { + if _, err := stdout.WriteString(stdoutLine + "\n"); err != nil { + t.Fatal(err) + } + if _, err := stderr.WriteString(stderrLine + "\n"); err != nil { + t.Fatal(err) + } + } + + var jsonBuf bytes.Buffer + + jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)} + + cid := "a7317399f3f857173c6179d44823594f8294678dea9999662e5c625b5a1c7657" + c, err := NewCopier(cid, + map[string]io.Reader{ + "stdout": &stdout, + "stderr": &stderr, + }, + jsonLog) + if err != nil { + t.Fatal(err) + } + c.Run() + time.Sleep(100 * time.Millisecond) + dec := json.NewDecoder(&jsonBuf) + for { + var msg Message + if err := dec.Decode(&msg); err != nil { + if err == io.EOF { + break + } + t.Fatal(err) + } + if msg.Source != "stdout" && msg.Source != "stderr" { + t.Fatalf("Wrong Source: %q, should be %q or %q", msg.Source, "stdout", "stderr") + } + if msg.ContainerID != cid { + t.Fatalf("Wrong ContainerID: %q, expected %q", msg.ContainerID, cid) + } + if msg.Source == "stdout" { + if string(msg.Line) != stdoutLine { + t.Fatalf("Wrong Line: %q, expected %q", msg.Line, stdoutLine) + } + } + if msg.Source == "stderr" { + if string(msg.Line) != stderrLine { + t.Fatalf("Wrong Line: %q, expected %q", msg.Line, stderrLine) + } + } + } +} diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go new file mode 100644 index 0000000000..078e67d8e9 --- /dev/null +++ b/daemon/logger/logger.go @@ -0,0 +1,18 @@ +package logger + +import "time" + +// Message is datastructure that represents record from some container +type Message struct { + ContainerID string + Line []byte + Source string + Timestamp time.Time +} + +// Logger is interface for docker logging drivers +type Logger interface { + Log(*Message) error + Name() string + Close() error +}