diff --git a/container/stream/attach.go b/container/stream/attach.go index cee90b88a1..1366dcb499 100644 --- a/container/stream/attach.go +++ b/container/stream/attach.go @@ -3,11 +3,12 @@ package stream // import "github.com/docker/docker/container/stream" import ( "context" "io" - "sync" "github.com/docker/docker/pkg/pools" "github.com/docker/docker/pkg/term" + "github.com/pkg/errors" "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" ) var defaultEscapeSequence = []byte{16, 17} // ctrl-p, ctrl-q @@ -57,117 +58,107 @@ func (c *Config) AttachStreams(cfg *AttachConfig) { // CopyStreams starts goroutines to copy data in and out to/from the container func (c *Config) CopyStreams(ctx context.Context, cfg *AttachConfig) <-chan error { - var ( - wg sync.WaitGroup - errors = make(chan error, 3) - ) - - if cfg.Stdin != nil { - wg.Add(1) - } - - if cfg.Stdout != nil { - wg.Add(1) - } - - if cfg.Stderr != nil { - wg.Add(1) - } + var group errgroup.Group // Connect stdin of container to the attach stdin stream. - go func() { - if cfg.Stdin == nil { - return - } - logrus.Debug("attach: stdin: begin") + if cfg.Stdin != nil { + group.Go(func() error { + logrus.Debug("attach: stdin: begin") + defer logrus.Debug("attach: stdin: end") - var err error - if cfg.TTY { - _, err = copyEscapable(cfg.CStdin, cfg.Stdin, cfg.DetachKeys) - } else { - _, err = pools.Copy(cfg.CStdin, cfg.Stdin) - } + defer func() { + if cfg.CloseStdin && !cfg.TTY { + cfg.CStdin.Close() + } else { + // No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr + if cfg.CStdout != nil { + cfg.CStdout.Close() + } + if cfg.CStderr != nil { + cfg.CStderr.Close() + } + } + }() + + var err error + if cfg.TTY { + _, err = copyEscapable(cfg.CStdin, cfg.Stdin, cfg.DetachKeys) + } else { + _, err = pools.Copy(cfg.CStdin, cfg.Stdin) + } + if err == io.ErrClosedPipe { + err = nil + } + if err != nil { + logrus.WithError(err).Debug("error on attach stdin") + return errors.Wrap(err, "error on attach stdin") + } + return nil + }) + } + + attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) error { + logrus.Debugf("attach: %s: begin", name) + defer logrus.Debugf("attach: %s: end", name) + defer func() { + // Make sure stdin gets closed + if cfg.Stdin != nil { + cfg.Stdin.Close() + } + streamPipe.Close() + }() + + _, err := pools.Copy(stream, streamPipe) if err == io.ErrClosedPipe { err = nil } if err != nil { - logrus.Errorf("attach: stdin: %s", err) - errors <- err + logrus.WithError(err).Debugf("attach: %s", name) + return errors.Wrapf(err, "error attaching %s stream", name) } - if cfg.CloseStdin && !cfg.TTY { - cfg.CStdin.Close() - } else { - // No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr + return nil + } + + if cfg.Stdout != nil { + group.Go(func() error { + return attachStream("stdout", cfg.Stdout, cfg.CStdout) + }) + } + if cfg.Stderr != nil { + group.Go(func() error { + return attachStream("stderr", cfg.Stderr, cfg.CStderr) + }) + } + + errs := make(chan error, 1) + go func() { + defer logrus.Debug("attach done") + groupErr := make(chan error, 1) + go func() { + groupErr <- group.Wait() + }() + select { + case <-ctx.Done(): + // close all pipes + if cfg.CStdin != nil { + cfg.CStdin.Close() + } if cfg.CStdout != nil { cfg.CStdout.Close() } if cfg.CStderr != nil { cfg.CStderr.Close() } - } - logrus.Debug("attach: stdin: end") - wg.Done() - }() - attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) { - if stream == nil { - return - } - - logrus.Debugf("attach: %s: begin", name) - _, err := pools.Copy(stream, streamPipe) - if err == io.ErrClosedPipe { - err = nil - } - if err != nil { - logrus.Errorf("attach: %s: %v", name, err) - errors <- err - } - // Make sure stdin gets closed - if cfg.Stdin != nil { - cfg.Stdin.Close() - } - streamPipe.Close() - logrus.Debugf("attach: %s: end", name) - wg.Done() - } - - go attachStream("stdout", cfg.Stdout, cfg.CStdout) - go attachStream("stderr", cfg.Stderr, cfg.CStderr) - - errs := make(chan error, 1) - - go func() { - defer close(errs) - errs <- func() error { - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - select { - case <-done: - case <-ctx.Done(): - // close all pipes - if cfg.CStdin != nil { - cfg.CStdin.Close() - } - if cfg.CStdout != nil { - cfg.CStdout.Close() - } - if cfg.CStderr != nil { - cfg.CStderr.Close() - } - <-done + // Now with these closed, wait should return. + if err := group.Wait(); err != nil { + errs <- err + return } - close(errors) - for err := range errors { - if err != nil { - return err - } - } - return nil - }() + errs <- ctx.Err() + case err := <-groupErr: + errs <- err + } }() return errs diff --git a/daemon/attach.go b/daemon/attach.go index 21ed9a06b7..fb14691d24 100644 --- a/daemon/attach.go +++ b/daemon/attach.go @@ -176,7 +176,7 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *stream.Attach ctx := c.InitAttachContext() err := <-c.StreamConfig.CopyStreams(ctx, cfg) if err != nil { - if _, ok := err.(term.EscapeError); ok { + if _, ok := errors.Cause(err).(term.EscapeError); ok || err == context.Canceled { daemon.LogContainerEvent(c, "detach") } else { logrus.Errorf("attach failed with error: %v", err)