1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Wait for copier finishing it's work before closing logger

Signed-off-by: Alexander Morozov <lk4d4@docker.com>
This commit is contained in:
Alexander Morozov 2015-03-18 11:44:14 -07:00
parent 50bf3cbedc
commit b6a42673ab
4 changed files with 39 additions and 6 deletions

View file

@ -105,6 +105,7 @@ type Container struct {
execCommands *execStore execCommands *execStore
// logDriver for closing // logDriver for closing
logDriver logger.Logger logDriver logger.Logger
logCopier *logger.Copier
AppliedVolumesFrom map[string]struct{} AppliedVolumesFrom map[string]struct{}
} }
@ -1383,11 +1384,12 @@ func (container *Container) startLogging() error {
return fmt.Errorf("Unknown logging driver: %s", cfg.Type) return fmt.Errorf("Unknown logging driver: %s", cfg.Type)
} }
if copier, err := logger.NewCopier(container.ID, map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l); err != nil { copier, err := logger.NewCopier(container.ID, map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l)
if err != nil {
return err return err
} else {
copier.Run()
} }
container.logCopier = copier
copier.Run()
container.logDriver = l container.logDriver = l
return nil return nil

View file

@ -3,6 +3,7 @@ package logger
import ( import (
"bufio" "bufio"
"io" "io"
"sync"
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
@ -15,8 +16,9 @@ type Copier struct {
// cid is container id for which we copying logs // cid is container id for which we copying logs
cid string cid string
// srcs is map of name -> reader pairs, for example "stdout", "stderr" // srcs is map of name -> reader pairs, for example "stdout", "stderr"
srcs map[string]io.Reader srcs map[string]io.Reader
dst Logger dst Logger
copyJobs sync.WaitGroup
} }
// NewCopier creates new Copier // NewCopier creates new Copier
@ -31,11 +33,13 @@ func NewCopier(cid string, srcs map[string]io.Reader, dst Logger) (*Copier, erro
// Run starts logs copying // Run starts logs copying
func (c *Copier) Run() { func (c *Copier) Run() {
for src, w := range c.srcs { for src, w := range c.srcs {
c.copyJobs.Add(1)
go c.copySrc(src, w) go c.copySrc(src, w)
} }
} }
func (c *Copier) copySrc(name string, src io.Reader) { func (c *Copier) copySrc(name string, src io.Reader) {
defer c.copyJobs.Done()
scanner := bufio.NewScanner(src) scanner := bufio.NewScanner(src)
for scanner.Scan() { for scanner.Scan() {
if err := c.dst.Log(&Message{ContainerID: c.cid, Line: scanner.Bytes(), Source: name, Timestamp: time.Now().UTC()}); err != nil { if err := c.dst.Log(&Message{ContainerID: c.cid, Line: scanner.Bytes(), Source: name, Timestamp: time.Now().UTC()}); err != nil {
@ -46,3 +50,8 @@ func (c *Copier) copySrc(name string, src io.Reader) {
logrus.Errorf("Error scanning log stream: %s", err) logrus.Errorf("Error scanning log stream: %s", err)
} }
} }
// Wait waits until all copying is done
func (c *Copier) Wait() {
c.copyJobs.Wait()
}

View file

@ -70,7 +70,16 @@ func TestCopier(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
c.Run() c.Run()
time.Sleep(100 * time.Millisecond) wait := make(chan struct{})
go func() {
c.Wait()
close(wait)
}()
select {
case <-time.After(1 * time.Second):
t.Fatal("Copier failed to do its work in 1 second")
case <-wait:
}
dec := json.NewDecoder(&jsonBuf) dec := json.NewDecoder(&jsonBuf)
for { for {
var msg Message var msg Message

View file

@ -303,7 +303,20 @@ func (m *containerMonitor) resetContainer(lock bool) {
} }
if container.logDriver != nil { if container.logDriver != nil {
if container.logCopier != nil {
exit := make(chan struct{})
go func() {
container.logCopier.Wait()
close(exit)
}()
select {
case <-time.After(1 * time.Second):
log.Warnf("Logger didn't exit in time: logs may be truncated")
case <-exit:
}
}
container.logDriver.Close() container.logDriver.Close()
container.logCopier = nil
container.logDriver = nil container.logDriver = nil
} }