From b6a42673abb8c2df5920343b9144a4ce422b2d50 Mon Sep 17 00:00:00 2001 From: Alexander Morozov Date: Wed, 18 Mar 2015 11:44:14 -0700 Subject: [PATCH] Wait for copier finishing it's work before closing logger Signed-off-by: Alexander Morozov --- daemon/container.go | 8 +++++--- daemon/logger/copier.go | 13 +++++++++++-- daemon/logger/copier_test.go | 11 ++++++++++- daemon/monitor.go | 13 +++++++++++++ 4 files changed, 39 insertions(+), 6 deletions(-) diff --git a/daemon/container.go b/daemon/container.go index 05dcd76264..31bce1317a 100644 --- a/daemon/container.go +++ b/daemon/container.go @@ -105,6 +105,7 @@ type Container struct { execCommands *execStore // logDriver for closing logDriver logger.Logger + logCopier *logger.Copier AppliedVolumesFrom map[string]struct{} } @@ -1383,11 +1384,12 @@ func (container *Container) startLogging() error { return fmt.Errorf("Unknown logging driver: %s", cfg.Type) } - if copier, err := logger.NewCopier(container.ID, map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l); err != nil { + copier, err := logger.NewCopier(container.ID, map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l) + if err != nil { return err - } else { - copier.Run() } + container.logCopier = copier + copier.Run() container.logDriver = l return nil diff --git a/daemon/logger/copier.go b/daemon/logger/copier.go index 5b75c919ef..462e42346d 100644 --- a/daemon/logger/copier.go +++ b/daemon/logger/copier.go @@ -3,6 +3,7 @@ package logger import ( "bufio" "io" + "sync" "time" "github.com/Sirupsen/logrus" @@ -15,8 +16,9 @@ type Copier struct { // cid is container id for which we copying logs cid string // srcs is map of name -> reader pairs, for example "stdout", "stderr" - srcs map[string]io.Reader - dst Logger + srcs map[string]io.Reader + dst Logger + copyJobs sync.WaitGroup } // NewCopier creates new Copier @@ -31,11 +33,13 @@ func NewCopier(cid string, srcs map[string]io.Reader, dst Logger) (*Copier, erro // Run starts logs copying func (c *Copier) Run() { for src, w := range c.srcs { + c.copyJobs.Add(1) go c.copySrc(src, w) } } func (c *Copier) copySrc(name string, src io.Reader) { + defer c.copyJobs.Done() scanner := bufio.NewScanner(src) for scanner.Scan() { if err := c.dst.Log(&Message{ContainerID: c.cid, Line: scanner.Bytes(), Source: name, Timestamp: time.Now().UTC()}); err != nil { @@ -46,3 +50,8 @@ func (c *Copier) copySrc(name string, src io.Reader) { logrus.Errorf("Error scanning log stream: %s", err) } } + +// Wait waits until all copying is done +func (c *Copier) Wait() { + c.copyJobs.Wait() +} diff --git a/daemon/logger/copier_test.go b/daemon/logger/copier_test.go index 0071af3a5f..45f76ac8e8 100644 --- a/daemon/logger/copier_test.go +++ b/daemon/logger/copier_test.go @@ -70,7 +70,16 @@ func TestCopier(t *testing.T) { t.Fatal(err) } c.Run() - time.Sleep(100 * time.Millisecond) + wait := make(chan struct{}) + go func() { + c.Wait() + close(wait) + }() + select { + case <-time.After(1 * time.Second): + t.Fatal("Copier failed to do its work in 1 second") + case <-wait: + } dec := json.NewDecoder(&jsonBuf) for { var msg Message diff --git a/daemon/monitor.go b/daemon/monitor.go index 97d451310b..7c18b7a38c 100644 --- a/daemon/monitor.go +++ b/daemon/monitor.go @@ -303,7 +303,20 @@ func (m *containerMonitor) resetContainer(lock bool) { } if container.logDriver != nil { + if container.logCopier != nil { + exit := make(chan struct{}) + go func() { + container.logCopier.Wait() + close(exit) + }() + select { + case <-time.After(1 * time.Second): + log.Warnf("Logger didn't exit in time: logs may be truncated") + case <-exit: + } + } container.logDriver.Close() + container.logCopier = nil container.logDriver = nil }