1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/container/stream/attach.go
Stephen J Day 0cd4ab3f9a
pkg/package: remove promise package
The promise package represents a simple enough concurrency pattern that
replicating it in place is sufficient. To end the propagation of this
package, it has been removed and the uses have been inlined.

While this code could likely be refactored to be simpler without the
package, the changes have been minimized to reduce the possibility of
defects. Someone else may want to do further refactoring to remove
closures and reduce the number of goroutines in use.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
2017-09-21 17:56:45 -07:00

185 lines
4 KiB
Go

package stream
import (
"io"
"sync"
"golang.org/x/net/context"
"github.com/docker/docker/pkg/pools"
"github.com/docker/docker/pkg/term"
"github.com/sirupsen/logrus"
)
// defaultEscapeSequence is the detach key sequence used when the caller
// supplies none: ctrl-p (0x10) followed by ctrl-q (0x11).
var defaultEscapeSequence = []byte{0x10, 0x11}
// AttachConfig describes how a client is attached to a stream's stdio: which
// streams are wanted, the container-side pipes, and the client-side streams
// they are copied to and from.
type AttachConfig struct {
	// TTY marks the stream's stdin as a terminal. When set, the stdin copier
	// watches stdin for the detach escape sequence, and that sequence is not
	// forwarded to the underlying stream.
	TTY bool

	// DetachKeys is the detach key sequence the client uses.
	// It only matters when TTY is true.
	DetachKeys []byte

	// CloseStdin requests that the attached stream's stdin be closed once
	// copying is done, e.g. closing the attached container's stdin.
	CloseStdin bool

	// UseStdin, UseStdout and UseStderr record which streams the client asked
	// to be connected to. They are consulted instead of Std* != nil at points
	// that run before the client-side Std* streams have been wired up.
	UseStdin  bool
	UseStdout bool
	UseStderr bool

	// CStdin, CStdout and CStderr are the streams connected directly to the
	// container.
	CStdin  io.WriteCloser
	CStdout io.ReadCloser
	CStderr io.ReadCloser

	// Stdin, Stdout and Stderr are the client streams to wire up to.
	Stdin  io.ReadCloser
	Stdout io.Writer
	Stderr io.Writer
}
// AttachStreams fills in the container-side streams of cfg from c.
//
// For each stream the client requested (cfg.UseStdin, cfg.UseStdout,
// cfg.UseStderr) the matching pipe of c is stored in cfg.CStdin, cfg.CStdout
// or cfg.CStderr. Streams that were not requested are left untouched.
func (c *Config) AttachStreams(cfg *AttachConfig) {
	// Container stdin, written to by the attach copier.
	if cfg.UseStdin {
		cfg.CStdin = c.StdinPipe()
	}

	// Container stdout, read by the attach copier.
	if cfg.UseStdout {
		cfg.CStdout = c.StdoutPipe()
	}

	// Container stderr, read by the attach copier.
	if cfg.UseStderr {
		cfg.CStderr = c.StderrPipe()
	}
}
// CopyStreams launches the goroutines that copy data between the client
// streams of cfg (Stdin, Stdout, Stderr) and the container streams of cfg
// (CStdin, CStdout, CStderr).
//
// The returned channel receives exactly one value once every started copier
// has finished, and is then closed: the first error queued by a copier, or nil
// if none reported one. io.ErrClosedPipe is not treated as an error. If ctx is
// done before the copiers finish, the container-side pipes are closed and the
// copiers are still waited for before the result is delivered.
func (c *Config) CopyStreams(ctx context.Context, cfg *AttachConfig) <-chan error {
	var pending sync.WaitGroup

	// One buffer slot per possible copier, so reporting an error never blocks.
	copyErrs := make(chan error, 3)

	// Register every copier that will do work before any goroutine is started.
	for _, attached := range []bool{cfg.Stdin != nil, cfg.Stdout != nil, cfg.Stderr != nil} {
		if attached {
			pending.Add(1)
		}
	}

	// closeIfSet closes pipe unless it is nil; the Close error is ignored.
	closeIfSet := func(pipe io.Closer) {
		if pipe != nil {
			pipe.Close()
		}
	}

	// Connect stdin of container to the attach stdin stream.
	pumpStdin := func() {
		if cfg.Stdin == nil {
			return
		}
		logrus.Debug("attach: stdin: begin")

		var copyErr error
		if cfg.TTY {
			_, copyErr = copyEscapable(cfg.CStdin, cfg.Stdin, cfg.DetachKeys)
		} else {
			_, copyErr = pools.Copy(cfg.CStdin, cfg.Stdin)
		}

		switch copyErr {
		case nil, io.ErrClosedPipe:
			// Nothing to report: a closed pipe is not a failure here.
		default:
			logrus.Errorf("attach: stdin: %s", copyErr)
			copyErrs <- copyErr
		}

		if cfg.TTY || !cfg.CloseStdin {
			// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
			closeIfSet(cfg.CStdout)
			closeIfSet(cfg.CStderr)
		} else {
			// Non-TTY attach with CloseStdin: close the container's stdin instead.
			cfg.CStdin.Close()
		}

		logrus.Debug("attach: stdin: end")
		pending.Done()
	}
	go pumpStdin()

	// pumpOutput copies one container output pipe (src) to the client writer
	// (dst). label is only used in log messages.
	pumpOutput := func(label string, dst io.Writer, src io.ReadCloser) {
		if dst == nil {
			return
		}
		logrus.Debugf("attach: %s: begin", label)

		_, copyErr := pools.Copy(dst, src)
		switch copyErr {
		case nil, io.ErrClosedPipe:
			// Nothing to report: a closed pipe is not a failure here.
		default:
			logrus.Errorf("attach: %s: %v", label, copyErr)
			copyErrs <- copyErr
		}

		// Make sure stdin gets closed
		closeIfSet(cfg.Stdin)
		src.Close()

		logrus.Debugf("attach: %s: end", label)
		pending.Done()
	}
	go pumpOutput("stdout", cfg.Stdout, cfg.CStdout)
	go pumpOutput("stderr", cfg.Stderr, cfg.CStderr)

	// collect waits for every registered copier and returns the first queued
	// error, or nil if there is none.
	collect := func() error {
		finished := make(chan struct{})
		go func() {
			pending.Wait()
			close(finished)
		}()

		select {
		case <-finished:
		case <-ctx.Done():
			// close all pipes, then keep waiting for the copiers to finish
			closeIfSet(cfg.CStdin)
			closeIfSet(cfg.CStdout)
			closeIfSet(cfg.CStderr)
			<-finished
		}

		// Every copier is done, so nothing can send on copyErrs any more.
		close(copyErrs)
		for queued := range copyErrs {
			if queued == nil {
				continue
			}
			return queued
		}
		return nil
	}

	result := make(chan error, 1)
	go func() {
		defer close(result)
		result <- collect()
	}()
	return result
}
// copyEscapable copies src to dst through an escape proxy created by
// term.NewEscapeProxy for the given detach keys. An empty keys slice selects
// defaultEscapeSequence. src is closed when the copy returns.
//
// It returns the byte count and error reported by pools.Copy.
func copyEscapable(dst io.Writer, src io.ReadCloser, keys []byte) (written int64, err error) {
	detachKeys := keys
	if len(detachKeys) == 0 {
		detachKeys = defaultEscapeSequence
	}

	// The proxy is built before the close is deferred, as in the original order.
	proxy := term.NewEscapeProxy(src, detachKeys)
	defer src.Close()

	written, err = pools.Copy(dst, proxy)
	return written, err
}