From e136d3ef930407c8815b72d84f57c63980df7f6d Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Mon, 17 Oct 2016 14:39:52 -0700 Subject: [PATCH] Move stdio attach from libcontainerd backend to callback Signed-off-by: Tonis Tiigi (cherry picked from commit 37a3be2449d2a314305615ffcc287a598a829dba) --- container/container.go | 44 +++++++++++++++++++++++ daemon/daemon.go | 2 +- daemon/exec.go | 2 +- daemon/exec/exec.go | 18 ++++++++++ daemon/logs.go | 25 ------------- daemon/monitor.go | 56 ------------------------------ daemon/monitor_windows.go | 2 +- daemon/start.go | 2 +- libcontainerd/client_linux.go | 24 ++++++------- libcontainerd/client_solaris.go | 6 ++-- libcontainerd/client_windows.go | 20 ++++------- libcontainerd/container.go | 1 + libcontainerd/container_linux.go | 8 ++--- libcontainerd/container_windows.go | 6 ++-- libcontainerd/types.go | 10 +++--- plugin/manager.go | 37 ++++++++++---------- plugin/manager_linux.go | 4 +-- runconfig/streams.go | 34 ++++++++++++++++++ 18 files changed, 157 insertions(+), 144 deletions(-) diff --git a/container/container.go b/container/container.go index d34226922a..5635ecdd92 100644 --- a/container/container.go +++ b/container/container.go @@ -22,6 +22,7 @@ import ( "github.com/docker/docker/daemon/network" "github.com/docker/docker/image" "github.com/docker/docker/layer" + "github.com/docker/docker/libcontainerd" "github.com/docker/docker/pkg/idtools" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/promise" @@ -972,3 +973,46 @@ func (container *Container) CancelAttachContext() { } container.attachContext.mu.Unlock() } + +func (container *Container) startLogging() error { + if container.HostConfig.LogConfig.Type == "none" { + return nil // do not start logging routines + } + + l, err := container.StartLogger(container.HostConfig.LogConfig) + if err != nil { + return fmt.Errorf("Failed to initialize logging driver: %v", err) + } + + copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l) + container.LogCopier = copier + copier.Run() + container.LogDriver = l + + // set LogPath field only for json-file logdriver + if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok { + container.LogPath = jl.LogPath() + } + + return nil +} + +// InitializeStdio is called by libcontainerd to connect the stdio. +func (container *Container) InitializeStdio(iop libcontainerd.IOPipe) error { + if err := container.startLogging(); err != nil { + container.Reset(false) + return err + } + + container.StreamConfig.CopyToPipe(iop) + + if container.Stdin() == nil && !container.Config.Tty { + if iop.Stdin != nil { + if err := iop.Stdin.Close(); err != nil { + logrus.Error("error closing stdin: %+v", err) + } + } + } + + return nil +} diff --git a/daemon/daemon.go b/daemon/daemon.go index 8c8d6dafc2..7fa200530f 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -175,7 +175,7 @@ func (daemon *Daemon) restore() error { defer wg.Done() rm := c.RestartManager(false) if c.IsRunning() || c.IsPaused() { - if err := daemon.containerd.Restore(c.ID, libcontainerd.WithRestartManager(rm)); err != nil { + if err := daemon.containerd.Restore(c.ID, c.InitializeStdio, libcontainerd.WithRestartManager(rm)); err != nil { logrus.Errorf("Failed to restore %s with containerd: %s", c.ID, err) return } diff --git a/daemon/exec.go b/daemon/exec.go index d57b6875d8..5584f3b5da 100644 --- a/daemon/exec.go +++ b/daemon/exec.go @@ -204,7 +204,7 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R attachErr := container.AttachStreams(ctx, ec.StreamConfig, ec.OpenStdin, true, ec.Tty, cStdin, cStdout, cStderr, ec.DetachKeys) - if err := d.containerd.AddProcess(ctx, c.ID, name, p); err != nil { + if err := d.containerd.AddProcess(ctx, c.ID, name, p, ec.InitializeStdio); err != nil { return err } diff --git a/daemon/exec/exec.go b/daemon/exec/exec.go index bbeb1c16a6..e950866620 100644 --- a/daemon/exec/exec.go +++ b/daemon/exec/exec.go @@ -1,8 +1,11 @@ package exec import ( + "runtime" "sync" + "github.com/Sirupsen/logrus" + "github.com/docker/docker/libcontainerd" "github.com/docker/docker/pkg/stringid" "github.com/docker/docker/runconfig" ) @@ -37,6 +40,21 @@ func NewConfig() *Config { } } +// InitializeStdio is called by libcontainerd to connect the stdio. +func (c *Config) InitializeStdio(iop libcontainerd.IOPipe) error { + c.StreamConfig.CopyToPipe(iop) + + if c.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" { + if iop.Stdin != nil { + if err := iop.Stdin.Close(); err != nil { + logrus.Error("error closing exec stdin: %+v", err) + } + } + } + + return nil +} + // Store keeps track of the exec configurations. type Store struct { commands map[string]*Config diff --git a/daemon/logs.go b/daemon/logs.go index 1b285c691d..e171c1508e 100644 --- a/daemon/logs.go +++ b/daemon/logs.go @@ -12,7 +12,6 @@ import ( "github.com/docker/docker/api/types/backend" "github.com/docker/docker/container" "github.com/docker/docker/daemon/logger" - "github.com/docker/docker/daemon/logger/jsonfilelog" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/stdcopy" containertypes "github.com/docker/engine-api/types/container" @@ -120,30 +119,6 @@ func (daemon *Daemon) getLogger(container *container.Container) (logger.Logger, return container.StartLogger(container.HostConfig.LogConfig) } -// StartLogging initializes and starts the container logging stream. -func (daemon *Daemon) StartLogging(container *container.Container) error { - if container.HostConfig.LogConfig.Type == "none" { - return nil // do not start logging routines - } - - l, err := container.StartLogger(container.HostConfig.LogConfig) - if err != nil { - return fmt.Errorf("Failed to initialize logging driver: %v", err) - } - - copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l) - container.LogCopier = copier - copier.Run() - container.LogDriver = l - - // set LogPath field only for json-file logdriver - if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok { - container.LogPath = jl.LogPath() - } - - return nil -} - // mergeLogConfig merges the daemon log config to the container's log config if the container's log driver is not specified. func (daemon *Daemon) mergeAndVerifyLogConfig(cfg *containertypes.LogConfig) error { if cfg.Type == "" { diff --git a/daemon/monitor.go b/daemon/monitor.go index 52d333f8a8..542d6b3b9e 100644 --- a/daemon/monitor.go +++ b/daemon/monitor.go @@ -3,13 +3,11 @@ package daemon import ( "errors" "fmt" - "io" "runtime" "strconv" "github.com/Sirupsen/logrus" "github.com/docker/docker/libcontainerd" - "github.com/docker/docker/runconfig" ) // StateChanged updates daemon state changes from containerd @@ -100,57 +98,3 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error { return nil } - -// AttachStreams is called by libcontainerd to connect the stdio. -func (daemon *Daemon) AttachStreams(id string, iop libcontainerd.IOPipe) error { - var s *runconfig.StreamConfig - c := daemon.containers.Get(id) - if c == nil { - ec, err := daemon.getExecConfig(id) - if err != nil { - return fmt.Errorf("no such exec/container: %s", id) - } - s = ec.StreamConfig - } else { - s = c.StreamConfig - if err := daemon.StartLogging(c); err != nil { - c.Reset(false) - return err - } - } - - copyFunc := func(w io.Writer, r io.Reader) { - s.Add(1) - go func() { - if _, err := io.Copy(w, r); err != nil { - logrus.Errorf("%v stream copy error: %v", id, err) - } - s.Done() - }() - } - - if iop.Stdout != nil { - copyFunc(s.Stdout(), iop.Stdout) - } - if iop.Stderr != nil { - copyFunc(s.Stderr(), iop.Stderr) - } - - if stdin := s.Stdin(); stdin != nil { - if iop.Stdin != nil { - go func() { - io.Copy(iop.Stdin, stdin) - iop.Stdin.Close() - }() - } - } else { - if c != nil && !c.Config.Tty { - // tty is enabled, so dont close containerd's iopipe stdin. - if iop.Stdin != nil { - iop.Stdin.Close() - } - } - } - - return nil -} diff --git a/daemon/monitor_windows.go b/daemon/monitor_windows.go index b500ee60b9..9748de9201 100644 --- a/daemon/monitor_windows.go +++ b/daemon/monitor_windows.go @@ -28,7 +28,7 @@ func (daemon *Daemon) postRunProcessing(container *container.Container, e libcon // Create a new servicing container, which will start, complete the update, and merge back the // results if it succeeded, all as part of the below function call. - if err := daemon.containerd.Create((container.ID + "_servicing"), *spec, servicingOption); err != nil { + if err := daemon.containerd.Create((container.ID + "_servicing"), *spec, container.InitializeStdio, servicingOption); err != nil { container.SetExitCode(-1) return fmt.Errorf("Post-run update servicing failed: %s", err) } diff --git a/daemon/start.go b/daemon/start.go index 9af41f8915..470cb80a1c 100644 --- a/daemon/start.go +++ b/daemon/start.go @@ -141,7 +141,7 @@ func (daemon *Daemon) containerStart(container *container.Container) (err error) createOptions = append(createOptions, *copts...) } - if err := daemon.containerd.Create(container.ID, *spec, createOptions...); err != nil { + if err := daemon.containerd.Create(container.ID, *spec, container.InitializeStdio, createOptions...); err != nil { errDesc := grpc.ErrorDesc(err) logrus.Errorf("Create container failed with error: %s", errDesc) // if we receive an internal error from the initial start of a container then lets diff --git a/libcontainerd/client_linux.go b/libcontainerd/client_linux.go index 715a42f0d2..39b0999d3f 100644 --- a/libcontainerd/client_linux.go +++ b/libcontainerd/client_linux.go @@ -31,7 +31,10 @@ type client struct { liveRestore bool } -func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) error { +// AddProcess is the handler for adding a process to an already running +// container. It's called through docker exec. It returns the system pid of the +// exec'd process. +func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio StdioCallback) error { clnt.lock(containerID) defer clnt.unlock(containerID) container, err := clnt.getContainer(containerID) @@ -112,14 +115,10 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly container.processes[processFriendlyName] = p - clnt.unlock(containerID) - - if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil { - clnt.lock(containerID) + if err := attachStdio(*iopipe); err != nil { p.closeFifos(iopipe) return err } - clnt.lock(containerID) return nil } @@ -149,7 +148,7 @@ func (clnt *client) prepareBundleDir(uid, gid int) (string, error) { return p, nil } -func (clnt *client) Create(containerID string, spec Spec, options ...CreateOption) (err error) { +func (clnt *client) Create(containerID string, spec Spec, attachStdio StdioCallback, options ...CreateOption) (err error) { clnt.lock(containerID) defer clnt.unlock(containerID) @@ -175,6 +174,7 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio if err := container.clean(); err != nil { return err } + container.attachStdio = attachStdio // hack for v1.12 backport defer func() { if err != nil { @@ -196,7 +196,7 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio return err } - return container.start() + return container.start(attachStdio) } func (clnt *client) Signal(containerID string, sig int) error { @@ -405,7 +405,7 @@ func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier { return w } -func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, options ...CreateOption) (err error) { +func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, attachStdio StdioCallback, options ...CreateOption) (err error) { clnt.lock(cont.Id) defer clnt.unlock(cont.Id) @@ -446,7 +446,7 @@ func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Ev return err }) - if err := clnt.backend.AttachStreams(containerID, *iopipe); err != nil { + if err := attachStdio(*iopipe); err != nil { container.closeFifos(iopipe) return err } @@ -538,7 +538,7 @@ func (clnt *client) getContainerLastEvent(id string) (*containerd.Event, error) return ev, err } -func (clnt *client) Restore(containerID string, options ...CreateOption) error { +func (clnt *client) Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error { // Synchronize with live events clnt.remote.Lock() defer clnt.remote.Unlock() @@ -586,7 +586,7 @@ func (clnt *client) Restore(containerID string, options ...CreateOption) error { // container is still alive if clnt.liveRestore { - if err := clnt.restore(cont, ev, options...); err != nil { + if err := clnt.restore(cont, ev, attachStdio, options...); err != nil { logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err) } return nil diff --git a/libcontainerd/client_solaris.go b/libcontainerd/client_solaris.go index 1c14d301b5..78087ebc54 100644 --- a/libcontainerd/client_solaris.go +++ b/libcontainerd/client_solaris.go @@ -8,11 +8,11 @@ type client struct { // Platform specific properties below here. } -func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) error { +func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio StdioCallback) error { return nil } -func (clnt *client) Create(containerID string, spec Spec, options ...CreateOption) (err error) { +func (clnt *client) Create(containerID string, spec Spec, attachStdio StdioCallback, options ...CreateOption) (err error) { return nil } @@ -37,7 +37,7 @@ func (clnt *client) Stats(containerID string) (*Stats, error) { } // Restore is the handler for restoring a container -func (clnt *client) Restore(containerID string, unusedOnWindows ...CreateOption) error { +func (clnt *client) Restore(containerID string, attachStdio StdioCallback, unusedOnWindows ...CreateOption) error { return nil } diff --git a/libcontainerd/client_windows.go b/libcontainerd/client_windows.go index 8731a20098..562dacf5b1 100644 --- a/libcontainerd/client_windows.go +++ b/libcontainerd/client_windows.go @@ -38,7 +38,7 @@ const defaultOwner = "docker" // Create is the entrypoint to create a container from a spec, and if successfully // created, start it too. -func (clnt *client) Create(containerID string, spec Spec, options ...CreateOption) error { +func (clnt *client) Create(containerID string, spec Spec, attachStdio StdioCallback, options ...CreateOption) error { logrus.Debugln("libcontainerd: client.Create() with spec", spec) configuration := &hcsshim.ContainerConfig{ @@ -143,7 +143,8 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio }, commandLine: strings.Join(spec.Process.Args, " "), }, - processes: make(map[string]*process), + processes: make(map[string]*process), + attachStdio: attachStdio, }, ociSpec: spec, hcsContainer: hcsContainer, @@ -160,7 +161,7 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio // internal structure, start will keep HCS in sync by deleting the // container there. logrus.Debugf("libcontainerd: Create() id=%s, Calling start()", containerID) - if err := container.start(); err != nil { + if err := container.start(attachStdio); err != nil { clnt.deleteContainer(containerID) return err } @@ -172,7 +173,7 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio // AddProcess is the handler for adding a process to an already running // container. It's called through docker exec. -func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, procToAdd Process) error { +func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, procToAdd Process, attachStdio StdioCallback) error { clnt.lock(containerID) defer clnt.unlock(containerID) container, err := clnt.getContainer(containerID) @@ -251,18 +252,11 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly // Add the process to the container's list of processes container.processes[processFriendlyName] = proc - // Make sure the lock is not held while calling back into the daemon - clnt.unlock(containerID) - // Tell the engine to attach streams back to the client - if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil { - clnt.lock(containerID) + if err := attachStdio(*iopipe); err != nil { return err } - // Lock again so that the defer unlock doesn't fail. (I really don't like this code) - clnt.lock(containerID) - // Spin up a go routine waiting for exit to handle cleanup go container.waitExit(proc, false) @@ -371,7 +365,7 @@ func (clnt *client) Stats(containerID string) (*Stats, error) { } // Restore is the handler for restoring a container -func (clnt *client) Restore(containerID string, unusedOnWindows ...CreateOption) error { +func (clnt *client) Restore(containerID string, _ StdioCallback, unusedOnWindows ...CreateOption) error { // TODO Windows: Implement this. For now, just tell the backend the container exited. logrus.Debugf("libcontainerd: Restore(%s)", containerID) return clnt.backend.StateChanged(containerID, StateInfo{ diff --git a/libcontainerd/container.go b/libcontainerd/container.go index 30bc95028c..f5d29f42cf 100644 --- a/libcontainerd/container.go +++ b/libcontainerd/container.go @@ -20,6 +20,7 @@ type containerCommon struct { restarting bool processes map[string]*process startedAt time.Time + attachStdio StdioCallback // hack for v1.12 backport } // WithRestartManager sets the restartmanager to be used with the container. diff --git a/libcontainerd/container_linux.go b/libcontainerd/container_linux.go index e6907040c0..b1cf7ed0fa 100644 --- a/libcontainerd/container_linux.go +++ b/libcontainerd/container_linux.go @@ -89,7 +89,7 @@ func (ctr *container) spec() (*specs.Spec, error) { return &spec, nil } -func (ctr *container) start() error { +func (ctr *container) start(attachStdio StdioCallback) error { spec, err := ctr.spec() if err != nil { return nil @@ -108,7 +108,7 @@ func (ctr *container) start() error { // we need to delay stdin closure after container start or else "stdin close" // event will be rejected by containerd. - // stdin closure happens in AttachStreams + // stdin closure happens in attachStdio stdin := iopipe.Stdin iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error { var err error @@ -140,7 +140,7 @@ func (ctr *container) start() error { } ctr.client.appendContainer(ctr) - if err := ctr.client.backend.AttachStreams(ctr.containerID, *iopipe); err != nil { + if err := attachStdio(*iopipe); err != nil { ctr.closeFifos(iopipe) return err } @@ -224,7 +224,7 @@ func (ctr *container) handleEvent(e *containerd.Event) error { defer ctr.client.unlock(ctr.containerID) ctr.restarting = false if err == nil { - if err = ctr.start(); err != nil { + if err = ctr.start(ctr.attachStdio); err != nil { logrus.Errorf("libcontainerd: error restarting %v", err) } } diff --git a/libcontainerd/container_windows.go b/libcontainerd/container_windows.go index 334880d970..b27437626f 100644 --- a/libcontainerd/container_windows.go +++ b/libcontainerd/container_windows.go @@ -36,7 +36,7 @@ func (ctr *container) newProcess(friendlyName string) *process { } } -func (ctr *container) start() error { +func (ctr *container) start(attachStdio StdioCallback) error { var err error isServicing := false @@ -143,7 +143,7 @@ func (ctr *container) start() error { ctr.client.appendContainer(ctr) - if err := ctr.client.backend.AttachStreams(ctr.containerID, *iopipe); err != nil { + if err := attachStdio(*iopipe); err != nil { // OK to return the error here, as waitExit will handle tear-down in HCS return err } @@ -258,7 +258,7 @@ func (ctr *container) waitExit(process *process, isFirstProcessToStart bool) err ctr.restarting = false ctr.client.deleteContainer(ctr.friendlyName) if err == nil { - if err = ctr.client.Create(ctr.containerID, ctr.ociSpec, ctr.options...); err != nil { + if err = ctr.client.Create(ctr.containerID, ctr.ociSpec, ctr.attachStdio, ctr.options...); err != nil { logrus.Errorf("libcontainerd: error restarting %v", err) } } diff --git a/libcontainerd/types.go b/libcontainerd/types.go index 8b9d4bbd13..0c11c41012 100644 --- a/libcontainerd/types.go +++ b/libcontainerd/types.go @@ -31,19 +31,18 @@ type CommonStateInfo struct { // FIXME: event? // Backend defines callbacks that the client of the library needs to implement. type Backend interface { StateChanged(containerID string, state StateInfo) error - AttachStreams(processFriendlyName string, io IOPipe) error } // Client provides access to containerd features. type Client interface { - Create(containerID string, spec Spec, options ...CreateOption) error + Create(containerID string, spec Spec, attachStdio StdioCallback, options ...CreateOption) error Signal(containerID string, sig int) error SignalProcess(containerID string, processFriendlyName string, sig int) error - AddProcess(ctx context.Context, containerID, processFriendlyName string, process Process) error + AddProcess(ctx context.Context, containerID, processFriendlyName string, process Process, attachStdio StdioCallback) error Resize(containerID, processFriendlyName string, width, height int) error Pause(containerID string) error Resume(containerID string) error - Restore(containerID string, options ...CreateOption) error + Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error Stats(containerID string) (*Stats, error) GetPidsForContainer(containerID string) ([]int, error) Summary(containerID string) ([]Summary, error) @@ -55,6 +54,9 @@ type CreateOption interface { Apply(interface{}) error } +// StdioCallback is called to connect a container or process stdio. +type StdioCallback func(IOPipe) error + // IOPipe contains the stdio streams. type IOPipe struct { Stdin io.WriteCloser diff --git a/plugin/manager.go b/plugin/manager.go index 254db32f40..d351487259 100644 --- a/plugin/manager.go +++ b/plugin/manager.go @@ -273,24 +273,6 @@ func (pm *Manager) StateChanged(id string, e libcontainerd.StateInfo) error { return nil } -// AttachStreams attaches io streams to the plugin -func (pm *Manager) AttachStreams(id string, iop libcontainerd.IOPipe) error { - iop.Stdin.Close() - - logger := logrus.New() - logger.Hooks.Add(logHook{id}) - // TODO: cache writer per id - w := logger.Writer() - go func() { - io.Copy(w, iop.Stdout) - }() - go func() { - // TODO: update logrus and use logger.WriterLevel - io.Copy(w, iop.Stderr) - }() - return nil -} - func (pm *Manager) init() error { dt, err := os.Open(filepath.Join(pm.libRoot, "plugins.json")) if err != nil { @@ -447,3 +429,22 @@ func computePrivileges(m *types.PluginManifest) types.PluginPrivileges { } return privileges } + +func attachToLog(id string) func(libcontainerd.IOPipe) error { + return func(iop libcontainerd.IOPipe) error { + iop.Stdin.Close() + + logger := logrus.New() + logger.Hooks.Add(logHook{id}) + // TODO: cache writer per id + w := logger.Writer() + go func() { + io.Copy(w, iop.Stdout) + }() + go func() { + // TODO: update logrus and use logger.WriterLevel + io.Copy(w, iop.Stderr) + }() + return nil + } +} diff --git a/plugin/manager_linux.go b/plugin/manager_linux.go index d18874d603..8e21157fcd 100644 --- a/plugin/manager_linux.go +++ b/plugin/manager_linux.go @@ -30,7 +30,7 @@ func (pm *Manager) enable(p *plugin, force bool) error { } p.restartManager = restartmanager.New(container.RestartPolicy{Name: "always"}, 0) - if err := pm.containerdClient.Create(p.PluginObj.ID, libcontainerd.Spec(*spec), libcontainerd.WithRestartManager(p.restartManager)); err != nil { // POC-only + if err := pm.containerdClient.Create(p.PluginObj.ID, libcontainerd.Spec(*spec), attachToLog(p.PluginObj.ID), libcontainerd.WithRestartManager(p.restartManager)); err != nil { // POC-only if err := p.restartManager.Cancel(); err != nil { logrus.Errorf("enable: restartManager.Cancel failed due to %v", err) } @@ -62,7 +62,7 @@ func (pm *Manager) enable(p *plugin, force bool) error { func (pm *Manager) restore(p *plugin) error { p.restartManager = restartmanager.New(container.RestartPolicy{Name: "always"}, 0) - return pm.containerdClient.Restore(p.PluginObj.ID, libcontainerd.WithRestartManager(p.restartManager)) + return pm.containerdClient.Restore(p.PluginObj.ID, attachToLog(p.PluginObj.ID), libcontainerd.WithRestartManager(p.restartManager)) } func (pm *Manager) initSpec(p *plugin) (*specs.Spec, error) { diff --git a/runconfig/streams.go b/runconfig/streams.go index 117fd89aee..8a91bde688 100644 --- a/runconfig/streams.go +++ b/runconfig/streams.go @@ -7,8 +7,11 @@ import ( "strings" "sync" + "github.com/Sirupsen/logrus" + "github.com/docker/docker/libcontainerd" "github.com/docker/docker/pkg/broadcaster" "github.com/docker/docker/pkg/ioutils" + "github.com/docker/docker/pkg/pools" ) // StreamConfig holds information about I/O streams managed together. @@ -107,3 +110,34 @@ func (streamConfig *StreamConfig) CloseStreams() error { return nil } + +// CopyToPipe connects streamconfig with a libcontainerd.IOPipe +func (streamConfig *StreamConfig) CopyToPipe(iop libcontainerd.IOPipe) { + copyFunc := func(w io.Writer, r io.Reader) { + streamConfig.Add(1) + go func() { + if _, err := pools.Copy(w, r); err != nil { + logrus.Errorf("stream copy error: %+v", err) + } + streamConfig.Done() + }() + } + + if iop.Stdout != nil { + copyFunc(streamConfig.Stdout(), iop.Stdout) + } + if iop.Stderr != nil { + copyFunc(streamConfig.Stderr(), iop.Stderr) + } + + if stdin := streamConfig.Stdin(); stdin != nil { + if iop.Stdin != nil { + go func() { + pools.Copy(iop.Stdin, stdin) + if err := iop.Stdin.Close(); err != nil { + logrus.Error("failed to clise stdin: %+v", err) + } + }() + } + } +}