1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Merge pull request #28410 from cpuguy83/move_streamconfig

Move `StreamConfig` out of `runconfig`
This commit is contained in:
Justin Cormack 2016-11-14 21:50:52 +00:00 committed by GitHub
commit 65835bfaa6
7 changed files with 92 additions and 66 deletions

View file

@ -19,6 +19,7 @@ import (
containertypes "github.com/docker/docker/api/types/container" containertypes "github.com/docker/docker/api/types/container"
mounttypes "github.com/docker/docker/api/types/mount" mounttypes "github.com/docker/docker/api/types/mount"
networktypes "github.com/docker/docker/api/types/network" networktypes "github.com/docker/docker/api/types/network"
"github.com/docker/docker/container/stream"
"github.com/docker/docker/daemon/exec" "github.com/docker/docker/daemon/exec"
"github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/jsonfilelog" "github.com/docker/docker/daemon/logger/jsonfilelog"
@ -65,7 +66,7 @@ func (DetachError) Error() string {
// CommonContainer holds the fields for a container which are // CommonContainer holds the fields for a container which are
// applicable across all platforms supported by the daemon. // applicable across all platforms supported by the daemon.
type CommonContainer struct { type CommonContainer struct {
*runconfig.StreamConfig StreamConfig *stream.Config
// embed for Container to support states directly. // embed for Container to support states directly.
*State `json:"State"` // Needed for remote api version <= 1.11 *State `json:"State"` // Needed for remote api version <= 1.11
Root string `json:"-"` // Path to the "home" of the container, including metadata. Root string `json:"-"` // Path to the "home" of the container, including metadata.
@ -109,7 +110,7 @@ func NewBaseContainer(id, root string) *Container {
ExecCommands: exec.NewStore(), ExecCommands: exec.NewStore(),
Root: root, Root: root,
MountPoints: make(map[string]*volume.MountPoint), MountPoints: make(map[string]*volume.MountPoint),
StreamConfig: runconfig.NewStreamConfig(), StreamConfig: stream.NewConfig(),
attachContext: &attachContext{}, attachContext: &attachContext{},
}, },
} }
@ -377,7 +378,7 @@ func (container *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr
// AttachStreams connects streams to a TTY. // AttachStreams connects streams to a TTY.
// Used by exec too. Should this move somewhere else? // Used by exec too. Should this move somewhere else?
func AttachStreams(ctx context.Context, streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error { func AttachStreams(ctx context.Context, streamConfig *stream.Config, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error {
var ( var (
cStdout, cStderr io.ReadCloser cStdout, cStderr io.ReadCloser
cStdin io.WriteCloser cStdin io.WriteCloser
@ -1064,6 +1065,26 @@ func (container *Container) startLogging() error {
return nil return nil
} }
// StdinPipe gets the stdin stream of the container
func (container *Container) StdinPipe() io.WriteCloser {
return container.StreamConfig.StdinPipe()
}
// StdoutPipe gets the stdout stream of the container
func (container *Container) StdoutPipe() io.ReadCloser {
return container.StreamConfig.StdoutPipe()
}
// StderrPipe gets the stderr stream of the container
func (container *Container) StderrPipe() io.ReadCloser {
return container.StreamConfig.StderrPipe()
}
// CloseStreams closes the container's stdio streams
func (container *Container) CloseStreams() error {
return container.StreamConfig.CloseStreams()
}
// InitializeStdio is called by libcontainerd to connect the stdio. // InitializeStdio is called by libcontainerd to connect the stdio.
func (container *Container) InitializeStdio(iop libcontainerd.IOPipe) error { func (container *Container) InitializeStdio(iop libcontainerd.IOPipe) error {
if err := container.startLogging(); err != nil { if err := container.startLogging(); err != nil {
@ -1073,7 +1094,7 @@ func (container *Container) InitializeStdio(iop libcontainerd.IOPipe) error {
container.StreamConfig.CopyToPipe(iop) container.StreamConfig.CopyToPipe(iop)
if container.Stdin() == nil && !container.Config.Tty { if container.StreamConfig.Stdin() == nil && !container.Config.Tty {
if iop.Stdin != nil { if iop.Stdin != nil {
if err := iop.Stdin.Close(); err != nil { if err := iop.Stdin.Close(); err != nil {
logrus.Warnf("error closing stdin: %+v", err) logrus.Warnf("error closing stdin: %+v", err)

View file

@ -23,7 +23,7 @@ func (container *Container) Reset(lock bool) {
// Re-create a brand new stdin pipe once the container exited // Re-create a brand new stdin pipe once the container exited
if container.Config.OpenStdin { if container.Config.OpenStdin {
container.NewInputPipes() container.StreamConfig.NewInputPipes()
} }
if container.LogDriver != nil { if container.LogDriver != nil {

View file

@ -1,4 +1,4 @@
package runconfig package stream
import ( import (
"fmt" "fmt"
@ -14,16 +14,16 @@ import (
"github.com/docker/docker/pkg/pools" "github.com/docker/docker/pkg/pools"
) )
// StreamConfig holds information about I/O streams managed together. // Config holds information about I/O streams managed together.
// //
// streamConfig.StdinPipe returns a WriteCloser which can be used to feed data // config.StdinPipe returns a WriteCloser which can be used to feed data
// to the standard input of the streamConfig's active process. // to the standard input of the streamConfig's active process.
// streamConfig.StdoutPipe and streamConfig.StderrPipe each return a ReadCloser // config.StdoutPipe and streamConfig.StderrPipe each return a ReadCloser
// which can be used to retrieve the standard output (and error) generated // which can be used to retrieve the standard output (and error) generated
// by the container's active process. The output (and error) are actually // by the container's active process. The output (and error) are actually
// copied and delivered to all StdoutPipe and StderrPipe consumers, using // copied and delivered to all StdoutPipe and StderrPipe consumers, using
// a kind of "broadcaster". // a kind of "broadcaster".
type StreamConfig struct { type Config struct {
sync.WaitGroup sync.WaitGroup
stdout *broadcaster.Unbuffered stdout *broadcaster.Unbuffered
stderr *broadcaster.Unbuffered stderr *broadcaster.Unbuffered
@ -31,76 +31,76 @@ type StreamConfig struct {
stdinPipe io.WriteCloser stdinPipe io.WriteCloser
} }
// NewStreamConfig creates a stream config and initializes // NewConfig creates a stream config and initializes
// the standard err and standard out to new unbuffered broadcasters. // the standard err and standard out to new unbuffered broadcasters.
func NewStreamConfig() *StreamConfig { func NewConfig() *Config {
return &StreamConfig{ return &Config{
stderr: new(broadcaster.Unbuffered), stderr: new(broadcaster.Unbuffered),
stdout: new(broadcaster.Unbuffered), stdout: new(broadcaster.Unbuffered),
} }
} }
// Stdout returns the standard output in the configuration. // Stdout returns the standard output in the configuration.
func (streamConfig *StreamConfig) Stdout() *broadcaster.Unbuffered { func (c *Config) Stdout() *broadcaster.Unbuffered {
return streamConfig.stdout return c.stdout
} }
// Stderr returns the standard error in the configuration. // Stderr returns the standard error in the configuration.
func (streamConfig *StreamConfig) Stderr() *broadcaster.Unbuffered { func (c *Config) Stderr() *broadcaster.Unbuffered {
return streamConfig.stderr return c.stderr
} }
// Stdin returns the standard input in the configuration. // Stdin returns the standard input in the configuration.
func (streamConfig *StreamConfig) Stdin() io.ReadCloser { func (c *Config) Stdin() io.ReadCloser {
return streamConfig.stdin return c.stdin
} }
// StdinPipe returns an input writer pipe as an io.WriteCloser. // StdinPipe returns an input writer pipe as an io.WriteCloser.
func (streamConfig *StreamConfig) StdinPipe() io.WriteCloser { func (c *Config) StdinPipe() io.WriteCloser {
return streamConfig.stdinPipe return c.stdinPipe
} }
// StdoutPipe creates a new io.ReadCloser with an empty bytes pipe. // StdoutPipe creates a new io.ReadCloser with an empty bytes pipe.
// It adds this new out pipe to the Stdout broadcaster. // It adds this new out pipe to the Stdout broadcaster.
func (streamConfig *StreamConfig) StdoutPipe() io.ReadCloser { func (c *Config) StdoutPipe() io.ReadCloser {
bytesPipe := ioutils.NewBytesPipe() bytesPipe := ioutils.NewBytesPipe()
streamConfig.stdout.Add(bytesPipe) c.stdout.Add(bytesPipe)
return bytesPipe return bytesPipe
} }
// StderrPipe creates a new io.ReadCloser with an empty bytes pipe. // StderrPipe creates a new io.ReadCloser with an empty bytes pipe.
// It adds this new err pipe to the Stderr broadcaster. // It adds this new err pipe to the Stderr broadcaster.
func (streamConfig *StreamConfig) StderrPipe() io.ReadCloser { func (c *Config) StderrPipe() io.ReadCloser {
bytesPipe := ioutils.NewBytesPipe() bytesPipe := ioutils.NewBytesPipe()
streamConfig.stderr.Add(bytesPipe) c.stderr.Add(bytesPipe)
return bytesPipe return bytesPipe
} }
// NewInputPipes creates new pipes for both standard inputs, Stdin and StdinPipe. // NewInputPipes creates new pipes for both standard inputs, Stdin and StdinPipe.
func (streamConfig *StreamConfig) NewInputPipes() { func (c *Config) NewInputPipes() {
streamConfig.stdin, streamConfig.stdinPipe = io.Pipe() c.stdin, c.stdinPipe = io.Pipe()
} }
// NewNopInputPipe creates a new input pipe that will silently drop all messages in the input. // NewNopInputPipe creates a new input pipe that will silently drop all messages in the input.
func (streamConfig *StreamConfig) NewNopInputPipe() { func (c *Config) NewNopInputPipe() {
streamConfig.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard) c.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard)
} }
// CloseStreams ensures that the configured streams are properly closed. // CloseStreams ensures that the configured streams are properly closed.
func (streamConfig *StreamConfig) CloseStreams() error { func (c *Config) CloseStreams() error {
var errors []string var errors []string
if streamConfig.stdin != nil { if c.stdin != nil {
if err := streamConfig.stdin.Close(); err != nil { if err := c.stdin.Close(); err != nil {
errors = append(errors, fmt.Sprintf("error close stdin: %s", err)) errors = append(errors, fmt.Sprintf("error close stdin: %s", err))
} }
} }
if err := streamConfig.stdout.Clean(); err != nil { if err := c.stdout.Clean(); err != nil {
errors = append(errors, fmt.Sprintf("error close stdout: %s", err)) errors = append(errors, fmt.Sprintf("error close stdout: %s", err))
} }
if err := streamConfig.stderr.Clean(); err != nil { if err := c.stderr.Clean(); err != nil {
errors = append(errors, fmt.Sprintf("error close stderr: %s", err)) errors = append(errors, fmt.Sprintf("error close stderr: %s", err))
} }
@ -112,25 +112,25 @@ func (streamConfig *StreamConfig) CloseStreams() error {
} }
// CopyToPipe connects streamconfig with a libcontainerd.IOPipe // CopyToPipe connects streamconfig with a libcontainerd.IOPipe
func (streamConfig *StreamConfig) CopyToPipe(iop libcontainerd.IOPipe) { func (c *Config) CopyToPipe(iop libcontainerd.IOPipe) {
copyFunc := func(w io.Writer, r io.Reader) { copyFunc := func(w io.Writer, r io.Reader) {
streamConfig.Add(1) c.Add(1)
go func() { go func() {
if _, err := pools.Copy(w, r); err != nil { if _, err := pools.Copy(w, r); err != nil {
logrus.Errorf("stream copy error: %+v", err) logrus.Errorf("stream copy error: %+v", err)
} }
streamConfig.Done() c.Done()
}() }()
} }
if iop.Stdout != nil { if iop.Stdout != nil {
copyFunc(streamConfig.Stdout(), iop.Stdout) copyFunc(c.Stdout(), iop.Stdout)
} }
if iop.Stderr != nil { if iop.Stderr != nil {
copyFunc(streamConfig.Stderr(), iop.Stderr) copyFunc(c.Stderr(), iop.Stderr)
} }
if stdin := streamConfig.Stdin(); stdin != nil { if stdin := c.Stdin(); stdin != nil {
if iop.Stdin != nil { if iop.Stdin != nil {
go func() { go func() {
pools.Copy(iop.Stdin, stdin) pools.Copy(iop.Stdin, stdin)

View file

@ -91,9 +91,9 @@ func (daemon *Daemon) load(id string) (*container.Container, error) {
func (daemon *Daemon) Register(c *container.Container) error { func (daemon *Daemon) Register(c *container.Container) error {
// Attach to stdout and stderr // Attach to stdout and stderr
if c.Config.OpenStdin { if c.Config.OpenStdin {
c.NewInputPipes() c.StreamConfig.NewInputPipes()
} else { } else {
c.NewNopInputPipe() c.StreamConfig.NewNopInputPipe()
} }
daemon.containers.Add(c.ID, c) daemon.containers.Add(c.ID, c)

View file

@ -195,9 +195,9 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R
} }
if ec.OpenStdin { if ec.OpenStdin {
ec.NewInputPipes() ec.StreamConfig.NewInputPipes()
} else { } else {
ec.NewNopInputPipe() ec.StreamConfig.NewNopInputPipe()
} }
p := libcontainerd.Process{ p := libcontainerd.Process{

View file

@ -5,9 +5,9 @@ import (
"sync" "sync"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/docker/container/stream"
"github.com/docker/docker/libcontainerd" "github.com/docker/docker/libcontainerd"
"github.com/docker/docker/pkg/stringid" "github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/runconfig"
) )
// Config holds the configurations for execs. The Daemon keeps // Config holds the configurations for execs. The Daemon keeps
@ -15,30 +15,30 @@ import (
// examined both during and after completion. // examined both during and after completion.
type Config struct { type Config struct {
sync.Mutex sync.Mutex
*runconfig.StreamConfig StreamConfig *stream.Config
ID string ID string
Running bool Running bool
ExitCode *int ExitCode *int
OpenStdin bool OpenStdin bool
OpenStderr bool OpenStderr bool
OpenStdout bool OpenStdout bool
CanRemove bool CanRemove bool
ContainerID string ContainerID string
DetachKeys []byte DetachKeys []byte
Entrypoint string Entrypoint string
Args []string Args []string
Tty bool Tty bool
Privileged bool Privileged bool
User string User string
Env []string Env []string
Pid int Pid int
} }
// NewConfig initializes the a new exec configuration // NewConfig initializes the a new exec configuration
func NewConfig() *Config { func NewConfig() *Config {
return &Config{ return &Config{
ID: stringid.GenerateNonCryptoID(), ID: stringid.GenerateNonCryptoID(),
StreamConfig: runconfig.NewStreamConfig(), StreamConfig: stream.NewConfig(),
} }
} }
@ -46,7 +46,7 @@ func NewConfig() *Config {
func (c *Config) InitializeStdio(iop libcontainerd.IOPipe) error { func (c *Config) InitializeStdio(iop libcontainerd.IOPipe) error {
c.StreamConfig.CopyToPipe(iop) c.StreamConfig.CopyToPipe(iop)
if c.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" { if c.StreamConfig.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" {
if iop.Stdin != nil { if iop.Stdin != nil {
if err := iop.Stdin.Close(); err != nil { if err := iop.Stdin.Close(); err != nil {
logrus.Errorf("error closing exec stdin: %+v", err) logrus.Errorf("error closing exec stdin: %+v", err)
@ -57,6 +57,11 @@ func (c *Config) InitializeStdio(iop libcontainerd.IOPipe) error {
return nil return nil
} }
// CloseStreams closes the stdio streams for the exec
func (c *Config) CloseStreams() error {
return c.StreamConfig.CloseStreams()
}
// Store keeps track of the exec configurations. // Store keeps track of the exec configurations.
type Store struct { type Store struct {
commands map[string]*Config commands map[string]*Config

View file

@ -39,7 +39,7 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error {
} }
c.Lock() c.Lock()
c.Wait() c.StreamConfig.Wait()
c.Reset(false) c.Reset(false)
restart, wait, err := c.RestartManager().ShouldRestart(e.ExitCode, false, time.Since(c.StartedAt)) restart, wait, err := c.RestartManager().ShouldRestart(e.ExitCode, false, time.Since(c.StartedAt))
@ -88,7 +88,7 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error {
defer execConfig.Unlock() defer execConfig.Unlock()
execConfig.ExitCode = &ec execConfig.ExitCode = &ec
execConfig.Running = false execConfig.Running = false
execConfig.Wait() execConfig.StreamConfig.Wait()
if err := execConfig.CloseStreams(); err != nil { if err := execConfig.CloseStreams(); err != nil {
logrus.Errorf("%s: %s", c.ID, err) logrus.Errorf("%s: %s", c.ID, err)
} }