1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Fix fd leak on attach

With a full attach, each attach was leaking 4 goroutines.
This updates attach to use errgroup instead of the hodge-podge of
waitgroups and channels.

In addition, the detach event was never being sent.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is contained in:
Brian Goff 2018-05-31 10:01:27 -04:00
parent b85799b63f
commit 0f51477017
2 changed files with 88 additions and 97 deletions

View file

@ -3,11 +3,12 @@ package stream // import "github.com/docker/docker/container/stream"
import ( import (
"context" "context"
"io" "io"
"sync"
"github.com/docker/docker/pkg/pools" "github.com/docker/docker/pkg/pools"
"github.com/docker/docker/pkg/term" "github.com/docker/docker/pkg/term"
"github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
) )
var defaultEscapeSequence = []byte{16, 17} // ctrl-p, ctrl-q var defaultEscapeSequence = []byte{16, 17} // ctrl-p, ctrl-q
@ -57,117 +58,107 @@ func (c *Config) AttachStreams(cfg *AttachConfig) {
// CopyStreams starts goroutines to copy data in and out to/from the container // CopyStreams starts goroutines to copy data in and out to/from the container
func (c *Config) CopyStreams(ctx context.Context, cfg *AttachConfig) <-chan error { func (c *Config) CopyStreams(ctx context.Context, cfg *AttachConfig) <-chan error {
var ( var group errgroup.Group
wg sync.WaitGroup
errors = make(chan error, 3)
)
if cfg.Stdin != nil {
wg.Add(1)
}
if cfg.Stdout != nil {
wg.Add(1)
}
if cfg.Stderr != nil {
wg.Add(1)
}
// Connect stdin of container to the attach stdin stream. // Connect stdin of container to the attach stdin stream.
go func() { if cfg.Stdin != nil {
if cfg.Stdin == nil { group.Go(func() error {
return logrus.Debug("attach: stdin: begin")
} defer logrus.Debug("attach: stdin: end")
logrus.Debug("attach: stdin: begin")
var err error defer func() {
if cfg.TTY { if cfg.CloseStdin && !cfg.TTY {
_, err = copyEscapable(cfg.CStdin, cfg.Stdin, cfg.DetachKeys) cfg.CStdin.Close()
} else { } else {
_, err = pools.Copy(cfg.CStdin, cfg.Stdin) // No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
} if cfg.CStdout != nil {
cfg.CStdout.Close()
}
if cfg.CStderr != nil {
cfg.CStderr.Close()
}
}
}()
var err error
if cfg.TTY {
_, err = copyEscapable(cfg.CStdin, cfg.Stdin, cfg.DetachKeys)
} else {
_, err = pools.Copy(cfg.CStdin, cfg.Stdin)
}
if err == io.ErrClosedPipe {
err = nil
}
if err != nil {
logrus.WithError(err).Debug("error on attach stdin")
return errors.Wrap(err, "error on attach stdin")
}
return nil
})
}
attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) error {
logrus.Debugf("attach: %s: begin", name)
defer logrus.Debugf("attach: %s: end", name)
defer func() {
// Make sure stdin gets closed
if cfg.Stdin != nil {
cfg.Stdin.Close()
}
streamPipe.Close()
}()
_, err := pools.Copy(stream, streamPipe)
if err == io.ErrClosedPipe { if err == io.ErrClosedPipe {
err = nil err = nil
} }
if err != nil { if err != nil {
logrus.Errorf("attach: stdin: %s", err) logrus.WithError(err).Debugf("attach: %s", name)
errors <- err return errors.Wrapf(err, "error attaching %s stream", name)
} }
if cfg.CloseStdin && !cfg.TTY { return nil
cfg.CStdin.Close() }
} else {
// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr if cfg.Stdout != nil {
group.Go(func() error {
return attachStream("stdout", cfg.Stdout, cfg.CStdout)
})
}
if cfg.Stderr != nil {
group.Go(func() error {
return attachStream("stderr", cfg.Stderr, cfg.CStderr)
})
}
errs := make(chan error, 1)
go func() {
defer logrus.Debug("attach done")
groupErr := make(chan error, 1)
go func() {
groupErr <- group.Wait()
}()
select {
case <-ctx.Done():
// close all pipes
if cfg.CStdin != nil {
cfg.CStdin.Close()
}
if cfg.CStdout != nil { if cfg.CStdout != nil {
cfg.CStdout.Close() cfg.CStdout.Close()
} }
if cfg.CStderr != nil { if cfg.CStderr != nil {
cfg.CStderr.Close() cfg.CStderr.Close()
} }
}
logrus.Debug("attach: stdin: end")
wg.Done()
}()
attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) { // Now with these closed, wait should return.
if stream == nil { if err := group.Wait(); err != nil {
return errs <- err
} return
logrus.Debugf("attach: %s: begin", name)
_, err := pools.Copy(stream, streamPipe)
if err == io.ErrClosedPipe {
err = nil
}
if err != nil {
logrus.Errorf("attach: %s: %v", name, err)
errors <- err
}
// Make sure stdin gets closed
if cfg.Stdin != nil {
cfg.Stdin.Close()
}
streamPipe.Close()
logrus.Debugf("attach: %s: end", name)
wg.Done()
}
go attachStream("stdout", cfg.Stdout, cfg.CStdout)
go attachStream("stderr", cfg.Stderr, cfg.CStderr)
errs := make(chan error, 1)
go func() {
defer close(errs)
errs <- func() error {
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-ctx.Done():
// close all pipes
if cfg.CStdin != nil {
cfg.CStdin.Close()
}
if cfg.CStdout != nil {
cfg.CStdout.Close()
}
if cfg.CStderr != nil {
cfg.CStderr.Close()
}
<-done
} }
close(errors) errs <- ctx.Err()
for err := range errors { case err := <-groupErr:
if err != nil { errs <- err
return err }
}
}
return nil
}()
}() }()
return errs return errs

View file

@ -176,7 +176,7 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *stream.Attach
ctx := c.InitAttachContext() ctx := c.InitAttachContext()
err := <-c.StreamConfig.CopyStreams(ctx, cfg) err := <-c.StreamConfig.CopyStreams(ctx, cfg)
if err != nil { if err != nil {
if _, ok := err.(term.EscapeError); ok { if _, ok := errors.Cause(err).(term.EscapeError); ok || err == context.Canceled {
daemon.LogContainerEvent(c, "detach") daemon.LogContainerEvent(c, "detach")
} else { } else {
logrus.Errorf("attach failed with error: %v", err) logrus.Errorf("attach failed with error: %v", err)