mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
only close LogDriver after LogCopier is done
this prevents the copier from sending messages in the buffer to the closed driver. If the copied took longer than the timeout to drain the buffer, this aborts the copier read loop and return back so we can cleanup resources properly. Signed-off-by: Daniel Dao <dqminh@cloudflare.com>
This commit is contained in:
parent
2af5574795
commit
bfa80edf4b
3 changed files with 74 additions and 20 deletions
|
@ -369,6 +369,9 @@ func (m *containerMonitor) resetContainer(lock bool) {
|
|||
select {
|
||||
case <-time.After(loggerCloseTimeout):
|
||||
logrus.Warnf("Logger didn't exit in time: logs may be truncated")
|
||||
container.LogCopier.Close()
|
||||
// always waits for the LogCopier to finished before closing
|
||||
<-exit
|
||||
case <-exit:
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,14 +20,16 @@ type Copier struct {
|
|||
srcs map[string]io.Reader
|
||||
dst Logger
|
||||
copyJobs sync.WaitGroup
|
||||
closed chan struct{}
|
||||
}
|
||||
|
||||
// NewCopier creates a new Copier
|
||||
func NewCopier(cid string, srcs map[string]io.Reader, dst Logger) *Copier {
|
||||
return &Copier{
|
||||
cid: cid,
|
||||
srcs: srcs,
|
||||
dst: dst,
|
||||
cid: cid,
|
||||
srcs: srcs,
|
||||
dst: dst,
|
||||
closed: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -44,24 +46,28 @@ func (c *Copier) copySrc(name string, src io.Reader) {
|
|||
reader := bufio.NewReader(src)
|
||||
|
||||
for {
|
||||
line, err := reader.ReadBytes('\n')
|
||||
line = bytes.TrimSuffix(line, []byte{'\n'})
|
||||
|
||||
// ReadBytes can return full or partial output even when it failed.
|
||||
// e.g. it can return a full entry and EOF.
|
||||
if err == nil || len(line) > 0 {
|
||||
if logErr := c.dst.Log(&Message{ContainerID: c.cid, Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
|
||||
logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
logrus.Errorf("Error scanning log stream: %s", err)
|
||||
}
|
||||
select {
|
||||
case <-c.closed:
|
||||
return
|
||||
}
|
||||
default:
|
||||
line, err := reader.ReadBytes('\n')
|
||||
line = bytes.TrimSuffix(line, []byte{'\n'})
|
||||
|
||||
// ReadBytes can return full or partial output even when it failed.
|
||||
// e.g. it can return a full entry and EOF.
|
||||
if err == nil || len(line) > 0 {
|
||||
if logErr := c.dst.Log(&Message{ContainerID: c.cid, Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
|
||||
logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
logrus.Errorf("Error scanning log stream: %s", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -69,3 +75,12 @@ func (c *Copier) copySrc(name string, src io.Reader) {
|
|||
func (c *Copier) Wait() {
|
||||
c.copyJobs.Wait()
|
||||
}
|
||||
|
||||
// Close closes the copier
|
||||
func (c *Copier) Close() {
|
||||
select {
|
||||
case <-c.closed:
|
||||
default:
|
||||
close(c.closed)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,9 +10,15 @@ import (
|
|||
|
||||
type TestLoggerJSON struct {
|
||||
*json.Encoder
|
||||
delay time.Duration
|
||||
}
|
||||
|
||||
func (l *TestLoggerJSON) Log(m *Message) error { return l.Encode(m) }
|
||||
func (l *TestLoggerJSON) Log(m *Message) error {
|
||||
if l.delay > 0 {
|
||||
time.Sleep(l.delay)
|
||||
}
|
||||
return l.Encode(m)
|
||||
}
|
||||
|
||||
func (l *TestLoggerJSON) Close() error { return nil }
|
||||
|
||||
|
@ -94,3 +100,33 @@ func TestCopier(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCopierSlow(t *testing.T) {
|
||||
stdoutLine := "Line that thinks that it is log line from docker stdout"
|
||||
var stdout bytes.Buffer
|
||||
for i := 0; i < 30; i++ {
|
||||
if _, err := stdout.WriteString(stdoutLine + "\n"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
var jsonBuf bytes.Buffer
|
||||
//encoder := &encodeCloser{Encoder: json.NewEncoder(&jsonBuf)}
|
||||
jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf), delay: 100 * time.Millisecond}
|
||||
|
||||
cid := "a7317399f3f857173c6179d44823594f8294678dea9999662e5c625b5a1c7657"
|
||||
c := NewCopier(cid, map[string]io.Reader{"stdout": &stdout}, jsonLog)
|
||||
c.Run()
|
||||
wait := make(chan struct{})
|
||||
go func() {
|
||||
c.Wait()
|
||||
close(wait)
|
||||
}()
|
||||
<-time.After(150 * time.Millisecond)
|
||||
c.Close()
|
||||
select {
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
t.Fatalf("failed to exit in time after the copier is closed")
|
||||
case <-wait:
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue