diff --git a/container/container.go b/container/container.go index 2e033dffaa..77d978070b 100644 --- a/container/container.go +++ b/container/container.go @@ -19,6 +19,7 @@ import ( containertypes "github.com/docker/docker/api/types/container" mounttypes "github.com/docker/docker/api/types/mount" networktypes "github.com/docker/docker/api/types/network" + "github.com/docker/docker/container/stream" "github.com/docker/docker/daemon/exec" "github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger/jsonfilelog" @@ -65,7 +66,7 @@ func (DetachError) Error() string { // CommonContainer holds the fields for a container which are // applicable across all platforms supported by the daemon. type CommonContainer struct { - *runconfig.StreamConfig + StreamConfig *stream.Config // embed for Container to support states directly. *State `json:"State"` // Needed for remote api version <= 1.11 Root string `json:"-"` // Path to the "home" of the container, including metadata. @@ -109,7 +110,7 @@ func NewBaseContainer(id, root string) *Container { ExecCommands: exec.NewStore(), Root: root, MountPoints: make(map[string]*volume.MountPoint), - StreamConfig: runconfig.NewStreamConfig(), + StreamConfig: stream.NewConfig(), attachContext: &attachContext{}, }, } @@ -377,7 +378,7 @@ func (container *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr // AttachStreams connects streams to a TTY. // Used by exec too. Should this move somewhere else? -func AttachStreams(ctx context.Context, streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error { +func AttachStreams(ctx context.Context, streamConfig *stream.Config, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error { var ( cStdout, cStderr io.ReadCloser cStdin io.WriteCloser @@ -1064,6 +1065,26 @@ func (container *Container) startLogging() error { return nil } +// StdinPipe gets the stdin stream of the container +func (container *Container) StdinPipe() io.WriteCloser { + return container.StreamConfig.StdinPipe() +} + +// StdoutPipe gets the stdout stream of the container +func (container *Container) StdoutPipe() io.ReadCloser { + return container.StreamConfig.StdoutPipe() +} + +// StderrPipe gets the stderr stream of the container +func (container *Container) StderrPipe() io.ReadCloser { + return container.StreamConfig.StderrPipe() +} + +// CloseStreams closes the container's stdio streams +func (container *Container) CloseStreams() error { + return container.StreamConfig.CloseStreams() +} + // InitializeStdio is called by libcontainerd to connect the stdio. func (container *Container) InitializeStdio(iop libcontainerd.IOPipe) error { if err := container.startLogging(); err != nil { @@ -1073,7 +1094,7 @@ func (container *Container) InitializeStdio(iop libcontainerd.IOPipe) error { container.StreamConfig.CopyToPipe(iop) - if container.Stdin() == nil && !container.Config.Tty { + if container.StreamConfig.Stdin() == nil && !container.Config.Tty { if iop.Stdin != nil { if err := iop.Stdin.Close(); err != nil { logrus.Warnf("error closing stdin: %+v", err) diff --git a/container/monitor.go b/container/monitor.go index 6a7ece654d..f05e72b25f 100644 --- a/container/monitor.go +++ b/container/monitor.go @@ -23,7 +23,7 @@ func (container *Container) Reset(lock bool) { // Re-create a brand new stdin pipe once the container exited if container.Config.OpenStdin { - container.NewInputPipes() + container.StreamConfig.NewInputPipes() } if container.LogDriver != nil { diff --git a/runconfig/streams.go b/container/stream/streams.go similarity index 60% rename from runconfig/streams.go rename to container/stream/streams.go index 36d0810b00..a45e31f211 100644 --- a/runconfig/streams.go +++ b/container/stream/streams.go @@ -1,4 +1,4 @@ -package runconfig +package stream import ( "fmt" @@ -14,16 +14,16 @@ import ( "github.com/docker/docker/pkg/pools" ) -// StreamConfig holds information about I/O streams managed together. +// Config holds information about I/O streams managed together. // -// streamConfig.StdinPipe returns a WriteCloser which can be used to feed data +// config.StdinPipe returns a WriteCloser which can be used to feed data // to the standard input of the streamConfig's active process. -// streamConfig.StdoutPipe and streamConfig.StderrPipe each return a ReadCloser +// config.StdoutPipe and streamConfig.StderrPipe each return a ReadCloser // which can be used to retrieve the standard output (and error) generated // by the container's active process. The output (and error) are actually // copied and delivered to all StdoutPipe and StderrPipe consumers, using // a kind of "broadcaster". -type StreamConfig struct { +type Config struct { sync.WaitGroup stdout *broadcaster.Unbuffered stderr *broadcaster.Unbuffered @@ -31,76 +31,76 @@ type StreamConfig struct { stdinPipe io.WriteCloser } -// NewStreamConfig creates a stream config and initializes +// NewConfig creates a stream config and initializes // the standard err and standard out to new unbuffered broadcasters. -func NewStreamConfig() *StreamConfig { - return &StreamConfig{ +func NewConfig() *Config { + return &Config{ stderr: new(broadcaster.Unbuffered), stdout: new(broadcaster.Unbuffered), } } // Stdout returns the standard output in the configuration. -func (streamConfig *StreamConfig) Stdout() *broadcaster.Unbuffered { - return streamConfig.stdout +func (c *Config) Stdout() *broadcaster.Unbuffered { + return c.stdout } // Stderr returns the standard error in the configuration. -func (streamConfig *StreamConfig) Stderr() *broadcaster.Unbuffered { - return streamConfig.stderr +func (c *Config) Stderr() *broadcaster.Unbuffered { + return c.stderr } // Stdin returns the standard input in the configuration. -func (streamConfig *StreamConfig) Stdin() io.ReadCloser { - return streamConfig.stdin +func (c *Config) Stdin() io.ReadCloser { + return c.stdin } // StdinPipe returns an input writer pipe as an io.WriteCloser. -func (streamConfig *StreamConfig) StdinPipe() io.WriteCloser { - return streamConfig.stdinPipe +func (c *Config) StdinPipe() io.WriteCloser { + return c.stdinPipe } // StdoutPipe creates a new io.ReadCloser with an empty bytes pipe. // It adds this new out pipe to the Stdout broadcaster. -func (streamConfig *StreamConfig) StdoutPipe() io.ReadCloser { +func (c *Config) StdoutPipe() io.ReadCloser { bytesPipe := ioutils.NewBytesPipe() - streamConfig.stdout.Add(bytesPipe) + c.stdout.Add(bytesPipe) return bytesPipe } // StderrPipe creates a new io.ReadCloser with an empty bytes pipe. // It adds this new err pipe to the Stderr broadcaster. -func (streamConfig *StreamConfig) StderrPipe() io.ReadCloser { +func (c *Config) StderrPipe() io.ReadCloser { bytesPipe := ioutils.NewBytesPipe() - streamConfig.stderr.Add(bytesPipe) + c.stderr.Add(bytesPipe) return bytesPipe } // NewInputPipes creates new pipes for both standard inputs, Stdin and StdinPipe. -func (streamConfig *StreamConfig) NewInputPipes() { - streamConfig.stdin, streamConfig.stdinPipe = io.Pipe() +func (c *Config) NewInputPipes() { + c.stdin, c.stdinPipe = io.Pipe() } // NewNopInputPipe creates a new input pipe that will silently drop all messages in the input. -func (streamConfig *StreamConfig) NewNopInputPipe() { - streamConfig.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard) +func (c *Config) NewNopInputPipe() { + c.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard) } // CloseStreams ensures that the configured streams are properly closed. -func (streamConfig *StreamConfig) CloseStreams() error { +func (c *Config) CloseStreams() error { var errors []string - if streamConfig.stdin != nil { - if err := streamConfig.stdin.Close(); err != nil { + if c.stdin != nil { + if err := c.stdin.Close(); err != nil { errors = append(errors, fmt.Sprintf("error close stdin: %s", err)) } } - if err := streamConfig.stdout.Clean(); err != nil { + if err := c.stdout.Clean(); err != nil { errors = append(errors, fmt.Sprintf("error close stdout: %s", err)) } - if err := streamConfig.stderr.Clean(); err != nil { + if err := c.stderr.Clean(); err != nil { errors = append(errors, fmt.Sprintf("error close stderr: %s", err)) } @@ -112,25 +112,25 @@ func (streamConfig *StreamConfig) CloseStreams() error { } // CopyToPipe connects streamconfig with a libcontainerd.IOPipe -func (streamConfig *StreamConfig) CopyToPipe(iop libcontainerd.IOPipe) { +func (c *Config) CopyToPipe(iop libcontainerd.IOPipe) { copyFunc := func(w io.Writer, r io.Reader) { - streamConfig.Add(1) + c.Add(1) go func() { if _, err := pools.Copy(w, r); err != nil { logrus.Errorf("stream copy error: %+v", err) } - streamConfig.Done() + c.Done() }() } if iop.Stdout != nil { - copyFunc(streamConfig.Stdout(), iop.Stdout) + copyFunc(c.Stdout(), iop.Stdout) } if iop.Stderr != nil { - copyFunc(streamConfig.Stderr(), iop.Stderr) + copyFunc(c.Stderr(), iop.Stderr) } - if stdin := streamConfig.Stdin(); stdin != nil { + if stdin := c.Stdin(); stdin != nil { if iop.Stdin != nil { go func() { pools.Copy(iop.Stdin, stdin) diff --git a/daemon/container.go b/daemon/container.go index d171d27414..d27ed27dbf 100644 --- a/daemon/container.go +++ b/daemon/container.go @@ -91,9 +91,9 @@ func (daemon *Daemon) load(id string) (*container.Container, error) { func (daemon *Daemon) Register(c *container.Container) error { // Attach to stdout and stderr if c.Config.OpenStdin { - c.NewInputPipes() + c.StreamConfig.NewInputPipes() } else { - c.NewNopInputPipe() + c.StreamConfig.NewNopInputPipe() } daemon.containers.Add(c.ID, c) diff --git a/daemon/exec.go b/daemon/exec.go index 6e04dc2b0a..b9cc590674 100644 --- a/daemon/exec.go +++ b/daemon/exec.go @@ -195,9 +195,9 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R } if ec.OpenStdin { - ec.NewInputPipes() + ec.StreamConfig.NewInputPipes() } else { - ec.NewNopInputPipe() + ec.StreamConfig.NewNopInputPipe() } p := libcontainerd.Process{ diff --git a/daemon/exec/exec.go b/daemon/exec/exec.go index 093eeef6bd..933136f965 100644 --- a/daemon/exec/exec.go +++ b/daemon/exec/exec.go @@ -5,9 +5,9 @@ import ( "sync" "github.com/Sirupsen/logrus" + "github.com/docker/docker/container/stream" "github.com/docker/docker/libcontainerd" "github.com/docker/docker/pkg/stringid" - "github.com/docker/docker/runconfig" ) // Config holds the configurations for execs. The Daemon keeps @@ -15,30 +15,30 @@ import ( // examined both during and after completion. type Config struct { sync.Mutex - *runconfig.StreamConfig - ID string - Running bool - ExitCode *int - OpenStdin bool - OpenStderr bool - OpenStdout bool - CanRemove bool - ContainerID string - DetachKeys []byte - Entrypoint string - Args []string - Tty bool - Privileged bool - User string - Env []string - Pid int + StreamConfig *stream.Config + ID string + Running bool + ExitCode *int + OpenStdin bool + OpenStderr bool + OpenStdout bool + CanRemove bool + ContainerID string + DetachKeys []byte + Entrypoint string + Args []string + Tty bool + Privileged bool + User string + Env []string + Pid int } // NewConfig initializes the a new exec configuration func NewConfig() *Config { return &Config{ ID: stringid.GenerateNonCryptoID(), - StreamConfig: runconfig.NewStreamConfig(), + StreamConfig: stream.NewConfig(), } } @@ -46,7 +46,7 @@ func NewConfig() *Config { func (c *Config) InitializeStdio(iop libcontainerd.IOPipe) error { c.StreamConfig.CopyToPipe(iop) - if c.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" { + if c.StreamConfig.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" { if iop.Stdin != nil { if err := iop.Stdin.Close(); err != nil { logrus.Errorf("error closing exec stdin: %+v", err) @@ -57,6 +57,11 @@ func (c *Config) InitializeStdio(iop libcontainerd.IOPipe) error { return nil } +// CloseStreams closes the stdio streams for the exec +func (c *Config) CloseStreams() error { + return c.StreamConfig.CloseStreams() +} + // Store keeps track of the exec configurations. type Store struct { commands map[string]*Config diff --git a/daemon/monitor.go b/daemon/monitor.go index aaffc2183b..ee0d1fcce0 100644 --- a/daemon/monitor.go +++ b/daemon/monitor.go @@ -39,7 +39,7 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error { } c.Lock() - c.Wait() + c.StreamConfig.Wait() c.Reset(false) restart, wait, err := c.RestartManager().ShouldRestart(e.ExitCode, false, time.Since(c.StartedAt)) @@ -88,7 +88,7 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error { defer execConfig.Unlock() execConfig.ExitCode = &ec execConfig.Running = false - execConfig.Wait() + execConfig.StreamConfig.Wait() if err := execConfig.CloseStreams(); err != nil { logrus.Errorf("%s: %s", c.ID, err) }