diff --git a/container/container.go b/container/container.go index 0043c7091c..e39083bce9 100644 --- a/container/container.go +++ b/container/container.go @@ -25,6 +25,7 @@ import ( "github.com/docker/docker/daemon/network" "github.com/docker/docker/image" "github.com/docker/docker/layer" + "github.com/docker/docker/libcontainerd" "github.com/docker/docker/pkg/idtools" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/promise" @@ -1012,3 +1013,46 @@ func (container *Container) CancelAttachContext() { } container.attachContext.mu.Unlock() } + +func (container *Container) startLogging() error { + if container.HostConfig.LogConfig.Type == "none" { + return nil // do not start logging routines + } + + l, err := container.StartLogger(container.HostConfig.LogConfig) + if err != nil { + return fmt.Errorf("Failed to initialize logging driver: %v", err) + } + + copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l) + container.LogCopier = copier + copier.Run() + container.LogDriver = l + + // set LogPath field only for json-file logdriver + if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok { + container.LogPath = jl.LogPath() + } + + return nil +} + +// InitializeStdio is called by libcontainerd to connect the stdio. +func (container *Container) InitializeStdio(iop libcontainerd.IOPipe) error { + if err := container.startLogging(); err != nil { + container.Reset(false) + return err + } + + container.StreamConfig.CopyToPipe(iop) + + if container.Stdin() == nil && !container.Config.Tty { + if iop.Stdin != nil { + if err := iop.Stdin.Close(); err != nil { + logrus.Error("error closing stdin: %+v", err) + } + } + } + + return nil +} diff --git a/daemon/daemon.go b/daemon/daemon.go index 8c64facce2..f2e02ac287 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -185,7 +185,7 @@ func (daemon *Daemon) restore() error { if c.IsRunning() || c.IsPaused() { c.RestartManager().Cancel() // manually start containers because some need to wait for swarm networking - if err := daemon.containerd.Restore(c.ID); err != nil { + if err := daemon.containerd.Restore(c.ID, c.InitializeStdio); err != nil { logrus.Errorf("Failed to restore %s with containerd: %s", c.ID, err) return } diff --git a/daemon/exec.go b/daemon/exec.go index 50fb15ccda..6e04dc2b0a 100644 --- a/daemon/exec.go +++ b/daemon/exec.go @@ -212,7 +212,7 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R attachErr := container.AttachStreams(ctx, ec.StreamConfig, ec.OpenStdin, true, ec.Tty, cStdin, cStdout, cStderr, ec.DetachKeys) - systemPid, err := d.containerd.AddProcess(ctx, c.ID, name, p) + systemPid, err := d.containerd.AddProcess(ctx, c.ID, name, p, ec.InitializeStdio) if err != nil { return err } diff --git a/daemon/exec/exec.go b/daemon/exec/exec.go index c29e89c438..d07a883687 100644 --- a/daemon/exec/exec.go +++ b/daemon/exec/exec.go @@ -1,8 +1,11 @@ package exec import ( + "runtime" "sync" + "github.com/Sirupsen/logrus" + "github.com/docker/docker/libcontainerd" "github.com/docker/docker/pkg/stringid" "github.com/docker/docker/runconfig" ) @@ -39,6 +42,21 @@ func NewConfig() *Config { } } +// InitializeStdio is called by libcontainerd to connect the stdio. +func (c *Config) InitializeStdio(iop libcontainerd.IOPipe) error { + c.StreamConfig.CopyToPipe(iop) + + if c.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" { + if iop.Stdin != nil { + if err := iop.Stdin.Close(); err != nil { + logrus.Error("error closing exec stdin: %+v", err) + } + } + } + + return nil +} + // Store keeps track of the exec configurations. type Store struct { commands map[string]*Config diff --git a/daemon/logs.go b/daemon/logs.go index 0e1fdfaa5a..cc34b82083 100644 --- a/daemon/logs.go +++ b/daemon/logs.go @@ -14,7 +14,6 @@ import ( timetypes "github.com/docker/docker/api/types/time" "github.com/docker/docker/container" "github.com/docker/docker/daemon/logger" - "github.com/docker/docker/daemon/logger/jsonfilelog" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/stdcopy" ) @@ -121,30 +120,6 @@ func (daemon *Daemon) getLogger(container *container.Container) (logger.Logger, return container.StartLogger(container.HostConfig.LogConfig) } -// StartLogging initializes and starts the container logging stream. -func (daemon *Daemon) StartLogging(container *container.Container) error { - if container.HostConfig.LogConfig.Type == "none" { - return nil // do not start logging routines - } - - l, err := container.StartLogger(container.HostConfig.LogConfig) - if err != nil { - return fmt.Errorf("Failed to initialize logging driver: %v", err) - } - - copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l) - container.LogCopier = copier - copier.Run() - container.LogDriver = l - - // set LogPath field only for json-file logdriver - if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok { - container.LogPath = jl.LogPath() - } - - return nil -} - // mergeLogConfig merges the daemon log config to the container's log config if the container's log driver is not specified. func (daemon *Daemon) mergeAndVerifyLogConfig(cfg *containertypes.LogConfig) error { if cfg.Type == "" { diff --git a/daemon/monitor.go b/daemon/monitor.go index a5e779d976..49f9c49ea2 100644 --- a/daemon/monitor.go +++ b/daemon/monitor.go @@ -3,17 +3,14 @@ package daemon import ( "errors" "fmt" - "io" "runtime" "strconv" "time" "github.com/Sirupsen/logrus" "github.com/docker/docker/api/types" - "github.com/docker/docker/daemon/exec" "github.com/docker/docker/libcontainerd" "github.com/docker/docker/restartmanager" - "github.com/docker/docker/runconfig" ) // StateChanged updates daemon state changes from containerd @@ -133,69 +130,3 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error { return nil } - -// AttachStreams is called by libcontainerd to connect the stdio. -func (daemon *Daemon) AttachStreams(id string, iop libcontainerd.IOPipe) error { - var ( - s *runconfig.StreamConfig - ec *exec.Config - ) - - c := daemon.containers.Get(id) - if c == nil { - var err error - ec, err = daemon.getExecConfig(id) - if err != nil { - return fmt.Errorf("no such exec/container: %s", id) - } - s = ec.StreamConfig - } else { - s = c.StreamConfig - if err := daemon.StartLogging(c); err != nil { - c.Reset(false) - return err - } - } - - copyFunc := func(w io.Writer, r io.Reader) { - s.Add(1) - go func() { - if _, err := io.Copy(w, r); err != nil { - logrus.Errorf("%v stream copy error: %v", id, err) - } - s.Done() - }() - } - - if iop.Stdout != nil { - copyFunc(s.Stdout(), iop.Stdout) - } - if iop.Stderr != nil { - copyFunc(s.Stderr(), iop.Stderr) - } - - if stdin := s.Stdin(); stdin != nil { - if iop.Stdin != nil { - go func() { - io.Copy(iop.Stdin, stdin) - if err := iop.Stdin.Close(); err != nil { - logrus.Error(err) - } - }() - } - } else { - //TODO(swernli): On Windows, not closing stdin when no tty is requested by the exec Config - // results in a hang. We should re-evaluate generalizing this fix for all OSes if - // we can determine that is the right thing to do more generally. - if (c != nil && !c.Config.Tty) || (ec != nil && !ec.Tty && runtime.GOOS == "windows") { - // tty is enabled, so dont close containerd's iopipe stdin. - if iop.Stdin != nil { - if err := iop.Stdin.Close(); err != nil { - logrus.Error(err) - } - } - } - } - - return nil -} diff --git a/daemon/monitor_windows.go b/daemon/monitor_windows.go index c5319e8876..9648b1b415 100644 --- a/daemon/monitor_windows.go +++ b/daemon/monitor_windows.go @@ -37,7 +37,7 @@ func (daemon *Daemon) postRunProcessing(container *container.Container, e libcon // Create a new servicing container, which will start, complete the update, and merge back the // results if it succeeded, all as part of the below function call. - if err := daemon.containerd.Create((container.ID + "_servicing"), "", "", *spec, newOpts...); err != nil { + if err := daemon.containerd.Create((container.ID + "_servicing"), "", "", *spec, container.InitializeStdio, newOpts...); err != nil { container.SetExitCode(-1) return fmt.Errorf("Post-run update servicing failed: %s", err) } diff --git a/daemon/start.go b/daemon/start.go index c38cddfb3a..2cde100702 100644 --- a/daemon/start.go +++ b/daemon/start.go @@ -149,7 +149,7 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint container.ResetRestartManager(true) } - if err := daemon.containerd.Create(container.ID, checkpoint, container.CheckpointDir(), *spec, createOptions...); err != nil { + if err := daemon.containerd.Create(container.ID, checkpoint, container.CheckpointDir(), *spec, container.InitializeStdio, createOptions...); err != nil { errDesc := grpc.ErrorDesc(err) logrus.Errorf("Create container failed with error: %s", errDesc) // if we receive an internal error from the initial start of a container then lets diff --git a/libcontainerd/client_linux.go b/libcontainerd/client_linux.go index a315e9a577..90e6d4cc2d 100644 --- a/libcontainerd/client_linux.go +++ b/libcontainerd/client_linux.go @@ -34,7 +34,7 @@ type client struct { // AddProcess is the handler for adding a process to an already running // container. It's called through docker exec. It returns the system pid of the // exec'd process. -func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) (int, error) { +func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio StdioCallback) (int, error) { clnt.lock(containerID) defer clnt.unlock(containerID) container, err := clnt.getContainer(containerID) @@ -116,14 +116,10 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly container.processes[processFriendlyName] = p - clnt.unlock(containerID) - - if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil { - clnt.lock(containerID) + if err := attachStdio(*iopipe); err != nil { p.closeFifos(iopipe) return -1, err } - clnt.lock(containerID) return int(resp.SystemPid), nil } @@ -153,7 +149,7 @@ func (clnt *client) prepareBundleDir(uid, gid int) (string, error) { return p, nil } -func (clnt *client) Create(containerID string, checkpoint string, checkpointDir string, spec specs.Spec, options ...CreateOption) (err error) { +func (clnt *client) Create(containerID string, checkpoint string, checkpointDir string, spec specs.Spec, attachStdio StdioCallback, options ...CreateOption) (err error) { clnt.lock(containerID) defer clnt.unlock(containerID) @@ -195,7 +191,7 @@ func (clnt *client) Create(containerID string, checkpoint string, checkpointDir return err } - return container.start(checkpoint, checkpointDir) + return container.start(checkpoint, checkpointDir, attachStdio) } func (clnt *client) Signal(containerID string, sig int) error { @@ -404,7 +400,7 @@ func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier { return w } -func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, options ...CreateOption) (err error) { +func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, attachStdio StdioCallback, options ...CreateOption) (err error) { clnt.lock(cont.Id) defer clnt.unlock(cont.Id) @@ -445,7 +441,7 @@ func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Ev return err }) - if err := clnt.backend.AttachStreams(containerID, *iopipe); err != nil { + if err := attachStdio(*iopipe); err != nil { container.closeFifos(iopipe) return err } @@ -537,7 +533,7 @@ func (clnt *client) getContainerLastEvent(id string) (*containerd.Event, error) return ev, err } -func (clnt *client) Restore(containerID string, options ...CreateOption) error { +func (clnt *client) Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error { // Synchronize with live events clnt.remote.Lock() defer clnt.remote.Unlock() @@ -585,7 +581,7 @@ func (clnt *client) Restore(containerID string, options ...CreateOption) error { // container is still alive if clnt.liveRestore { - if err := clnt.restore(cont, ev, options...); err != nil { + if err := clnt.restore(cont, ev, attachStdio, options...); err != nil { logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err) } return nil diff --git a/libcontainerd/client_windows.go b/libcontainerd/client_windows.go index 9e3e1043a0..eca9b2d9fa 100644 --- a/libcontainerd/client_windows.go +++ b/libcontainerd/client_windows.go @@ -94,7 +94,7 @@ const defaultOwner = "docker" // }, // "Servicing": false //} -func (clnt *client) Create(containerID string, checkpoint string, checkpointDir string, spec specs.Spec, options ...CreateOption) error { +func (clnt *client) Create(containerID string, checkpoint string, checkpointDir string, spec specs.Spec, attachStdio StdioCallback, options ...CreateOption) error { clnt.lock(containerID) defer clnt.unlock(containerID) logrus.Debugln("libcontainerd: client.Create() with spec", spec) @@ -253,7 +253,7 @@ func (clnt *client) Create(containerID string, checkpoint string, checkpointDir // internal structure, start will keep HCS in sync by deleting the // container there. logrus.Debugf("libcontainerd: Create() id=%s, Calling start()", containerID) - if err := container.start(); err != nil { + if err := container.start(attachStdio); err != nil { clnt.deleteContainer(containerID) return err } @@ -266,7 +266,7 @@ func (clnt *client) Create(containerID string, checkpoint string, checkpointDir // AddProcess is the handler for adding a process to an already running // container. It's called through docker exec. It returns the system pid of the // exec'd process. -func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, procToAdd Process) (int, error) { +func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, procToAdd Process, attachStdio StdioCallback) (int, error) { clnt.lock(containerID) defer clnt.unlock(containerID) container, err := clnt.getContainer(containerID) @@ -343,18 +343,11 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly // Add the process to the container's list of processes container.processes[processFriendlyName] = proc - // Make sure the lock is not held while calling back into the daemon - clnt.unlock(containerID) - // Tell the engine to attach streams back to the client - if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil { - clnt.lock(containerID) + if err := attachStdio(*iopipe); err != nil { return -1, err } - // Lock again so that the defer unlock doesn't fail. (I really don't like this code) - clnt.lock(containerID) - // Spin up a go routine waiting for exit to handle cleanup go container.waitExit(proc, false) @@ -544,7 +537,7 @@ func (clnt *client) Stats(containerID string) (*Stats, error) { } // Restore is the handler for restoring a container -func (clnt *client) Restore(containerID string, unusedOnWindows ...CreateOption) error { +func (clnt *client) Restore(containerID string, _ StdioCallback, unusedOnWindows ...CreateOption) error { // TODO Windows: Implement this. For now, just tell the backend the container exited. logrus.Debugf("libcontainerd: Restore(%s)", containerID) return clnt.backend.StateChanged(containerID, StateInfo{ diff --git a/libcontainerd/container_linux.go b/libcontainerd/container_linux.go index 16ca72f297..90ea44c95e 100644 --- a/libcontainerd/container_linux.go +++ b/libcontainerd/container_linux.go @@ -88,7 +88,7 @@ func (ctr *container) spec() (*specs.Spec, error) { return &spec, nil } -func (ctr *container) start(checkpoint string, checkpointDir string) error { +func (ctr *container) start(checkpoint string, checkpointDir string, attachStdio StdioCallback) error { spec, err := ctr.spec() if err != nil { return nil @@ -107,7 +107,7 @@ func (ctr *container) start(checkpoint string, checkpointDir string) error { // we need to delay stdin closure after container start or else "stdin close" // event will be rejected by containerd. - // stdin closure happens in AttachStreams + // stdin closure happens in attachStdio stdin := iopipe.Stdin iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error { var err error @@ -141,7 +141,7 @@ func (ctr *container) start(checkpoint string, checkpointDir string) error { } ctr.client.appendContainer(ctr) - if err := ctr.client.backend.AttachStreams(ctr.containerID, *iopipe); err != nil { + if err := attachStdio(*iopipe); err != nil { ctr.closeFifos(iopipe) return err } diff --git a/libcontainerd/container_windows.go b/libcontainerd/container_windows.go index 5f7ddb4939..21fb6ac835 100644 --- a/libcontainerd/container_windows.go +++ b/libcontainerd/container_windows.go @@ -40,7 +40,7 @@ func (ctr *container) newProcess(friendlyName string) *process { // start starts a created container. // Caller needs to lock container ID before calling this method. -func (ctr *container) start() error { +func (ctr *container) start(attachStdio StdioCallback) error { var err error isServicing := false @@ -147,7 +147,7 @@ func (ctr *container) start() error { ctr.client.appendContainer(ctr) - if err := ctr.client.backend.AttachStreams(ctr.containerID, *iopipe); err != nil { + if err := attachStdio(*iopipe); err != nil { // OK to return the error here, as waitExit will handle tear-down in HCS return err } diff --git a/libcontainerd/types.go b/libcontainerd/types.go index f808a08151..e2d3df6e7a 100644 --- a/libcontainerd/types.go +++ b/libcontainerd/types.go @@ -31,19 +31,18 @@ type CommonStateInfo struct { // FIXME: event? // Backend defines callbacks that the client of the library needs to implement. type Backend interface { StateChanged(containerID string, state StateInfo) error - AttachStreams(processFriendlyName string, io IOPipe) error } // Client provides access to containerd features. type Client interface { - Create(containerID string, checkpoint string, checkpointDir string, spec specs.Spec, options ...CreateOption) error + Create(containerID string, checkpoint string, checkpointDir string, spec specs.Spec, attachStdio StdioCallback, options ...CreateOption) error Signal(containerID string, sig int) error SignalProcess(containerID string, processFriendlyName string, sig int) error - AddProcess(ctx context.Context, containerID, processFriendlyName string, process Process) (int, error) + AddProcess(ctx context.Context, containerID, processFriendlyName string, process Process, attachStdio StdioCallback) (int, error) Resize(containerID, processFriendlyName string, width, height int) error Pause(containerID string) error Resume(containerID string) error - Restore(containerID string, options ...CreateOption) error + Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error Stats(containerID string) (*Stats, error) GetPidsForContainer(containerID string) ([]int, error) Summary(containerID string) ([]Summary, error) @@ -58,6 +57,9 @@ type CreateOption interface { Apply(interface{}) error } +// StdioCallback is called to connect a container or process stdio. +type StdioCallback func(IOPipe) error + // IOPipe contains the stdio streams. type IOPipe struct { Stdin io.WriteCloser diff --git a/plugin/manager.go b/plugin/manager.go index cd17e11c8b..c574aa108a 100644 --- a/plugin/manager.go +++ b/plugin/manager.go @@ -100,24 +100,6 @@ func (pm *Manager) StateChanged(id string, e libcontainerd.StateInfo) error { return nil } -// AttachStreams attaches io streams to the plugin -func (pm *Manager) AttachStreams(id string, iop libcontainerd.IOPipe) error { - iop.Stdin.Close() - - logger := logrus.New() - logger.Hooks.Add(logHook{id}) - // TODO: cache writer per id - w := logger.Writer() - go func() { - io.Copy(w, iop.Stdout) - }() - go func() { - // TODO: update logrus and use logger.WriterLevel - io.Copy(w, iop.Stderr) - }() - return nil -} - func (pm *Manager) init() error { dt, err := os.Open(filepath.Join(pm.libRoot, "plugins.json")) if err != nil { @@ -169,3 +151,22 @@ func (l logHook) Fire(entry *logrus.Entry) error { entry.Data = logrus.Fields{"plugin": l.id} return nil } + +func attachToLog(id string) func(libcontainerd.IOPipe) error { + return func(iop libcontainerd.IOPipe) error { + iop.Stdin.Close() + + logger := logrus.New() + logger.Hooks.Add(logHook{id}) + // TODO: cache writer per id + w := logger.Writer() + go func() { + io.Copy(w, iop.Stdout) + }() + go func() { + // TODO: update logrus and use logger.WriterLevel + io.Copy(w, iop.Stderr) + }() + return nil + } +} diff --git a/plugin/manager_linux.go b/plugin/manager_linux.go index 14a577ddc5..a06b040f42 100644 --- a/plugin/manager_linux.go +++ b/plugin/manager_linux.go @@ -26,7 +26,7 @@ func (pm *Manager) enable(p *v2.Plugin, force bool) error { p.Lock() p.Restart = true p.Unlock() - if err := pm.containerdClient.Create(p.GetID(), "", "", specs.Spec(*spec)); err != nil { + if err := pm.containerdClient.Create(p.GetID(), "", "", specs.Spec(*spec), attachToLog(p.GetID())); err != nil { return err } @@ -45,7 +45,7 @@ func (pm *Manager) enable(p *v2.Plugin, force bool) error { } func (pm *Manager) restore(p *v2.Plugin) error { - return pm.containerdClient.Restore(p.GetID()) + return pm.containerdClient.Restore(p.GetID(), attachToLog(p.GetID())) } func (pm *Manager) disable(p *v2.Plugin) error { diff --git a/runconfig/streams.go b/runconfig/streams.go index 117fd89aee..8a91bde688 100644 --- a/runconfig/streams.go +++ b/runconfig/streams.go @@ -7,8 +7,11 @@ import ( "strings" "sync" + "github.com/Sirupsen/logrus" + "github.com/docker/docker/libcontainerd" "github.com/docker/docker/pkg/broadcaster" "github.com/docker/docker/pkg/ioutils" + "github.com/docker/docker/pkg/pools" ) // StreamConfig holds information about I/O streams managed together. @@ -107,3 +110,34 @@ func (streamConfig *StreamConfig) CloseStreams() error { return nil } + +// CopyToPipe connects streamconfig with a libcontainerd.IOPipe +func (streamConfig *StreamConfig) CopyToPipe(iop libcontainerd.IOPipe) { + copyFunc := func(w io.Writer, r io.Reader) { + streamConfig.Add(1) + go func() { + if _, err := pools.Copy(w, r); err != nil { + logrus.Errorf("stream copy error: %+v", err) + } + streamConfig.Done() + }() + } + + if iop.Stdout != nil { + copyFunc(streamConfig.Stdout(), iop.Stdout) + } + if iop.Stderr != nil { + copyFunc(streamConfig.Stderr(), iop.Stderr) + } + + if stdin := streamConfig.Stdin(); stdin != nil { + if iop.Stdin != nil { + go func() { + pools.Copy(iop.Stdin, stdin) + if err := iop.Stdin.Close(); err != nil { + logrus.Error("failed to clise stdin: %+v", err) + } + }() + } + } +}