From aa3ce07c41e0da9331f0659f28fed7f35846556c Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Wed, 29 Nov 2017 19:15:20 -0500 Subject: [PATCH] Update daemon code for containerd API changes Signed-off-by: Michael Crosby --- container/container.go | 14 ++-- daemon/exec/exec.go | 14 ++-- daemon/start_unix.go | 4 +- libcontainerd/client_daemon.go | 76 ++++++++++---------- libcontainerd/client_daemon_linux.go | 7 +- libcontainerd/client_daemon_windows.go | 6 +- libcontainerd/io.go | 4 +- libcontainerd/io_unix.go | 6 +- libcontainerd/io_windows.go | 6 +- libcontainerd/remote_daemon_options_linux.go | 2 +- libcontainerd/types.go | 5 +- plugin/executor/containerd/containerd.go | 24 +++---- 12 files changed, 86 insertions(+), 82 deletions(-) diff --git a/container/container.go b/container/container.go index 3e8a370241..11814b7719 100644 --- a/container/container.go +++ b/container/container.go @@ -15,7 +15,7 @@ import ( "syscall" "time" - "github.com/containerd/containerd" + "github.com/containerd/containerd/cio" containertypes "github.com/docker/docker/api/types/container" mounttypes "github.com/docker/docker/api/types/mount" networktypes "github.com/docker/docker/api/types/network" @@ -1004,7 +1004,7 @@ func (container *Container) CloseStreams() error { } // InitializeStdio is called by libcontainerd to connect the stdio. -func (container *Container) InitializeStdio(iop *libcontainerd.IOPipe) (containerd.IO, error) { +func (container *Container) InitializeStdio(iop *libcontainerd.IOPipe) (cio.IO, error) { if err := container.startLogging(); err != nil { container.Reset(false) return nil, err @@ -1020,7 +1020,7 @@ func (container *Container) InitializeStdio(iop *libcontainerd.IOPipe) (containe } } - return &cio{IO: iop, sc: container.StreamConfig}, nil + return &rio{IO: iop, sc: container.StreamConfig}, nil } // SecretMountPath returns the path of the secret mount for the container @@ -1078,19 +1078,19 @@ func (container *Container) CreateDaemonEnvironment(tty bool, linkedEnv []string return env } -type cio struct { - containerd.IO +type rio struct { + cio.IO sc *stream.Config } -func (i *cio) Close() error { +func (i *rio) Close() error { i.IO.Close() return i.sc.CloseStreams() } -func (i *cio) Wait() { +func (i *rio) Wait() { i.sc.Wait() i.IO.Wait() diff --git a/daemon/exec/exec.go b/daemon/exec/exec.go index 7aa2383e32..abd8bf0c10 100644 --- a/daemon/exec/exec.go +++ b/daemon/exec/exec.go @@ -4,7 +4,7 @@ import ( "runtime" "sync" - "github.com/containerd/containerd" + "github.com/containerd/containerd/cio" "github.com/docker/docker/container/stream" "github.com/docker/docker/libcontainerd" "github.com/docker/docker/pkg/stringid" @@ -43,26 +43,26 @@ func NewConfig() *Config { } } -type cio struct { - containerd.IO +type rio struct { + cio.IO sc *stream.Config } -func (i *cio) Close() error { +func (i *rio) Close() error { i.IO.Close() return i.sc.CloseStreams() } -func (i *cio) Wait() { +func (i *rio) Wait() { i.sc.Wait() i.IO.Wait() } // InitializeStdio is called by libcontainerd to connect the stdio. -func (c *Config) InitializeStdio(iop *libcontainerd.IOPipe) (containerd.IO, error) { +func (c *Config) InitializeStdio(iop *libcontainerd.IOPipe) (cio.IO, error) { c.StreamConfig.CopyToPipe(iop) if c.StreamConfig.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" { @@ -73,7 +73,7 @@ func (c *Config) InitializeStdio(iop *libcontainerd.IOPipe) (containerd.IO, erro } } - return &cio{IO: iop, sc: c.StreamConfig}, nil + return &rio{IO: iop, sc: c.StreamConfig}, nil } // CloseStreams closes the stdio streams for the exec diff --git a/daemon/start_unix.go b/daemon/start_unix.go index a8402bb303..119b0a9240 100644 --- a/daemon/start_unix.go +++ b/daemon/start_unix.go @@ -7,7 +7,7 @@ import ( "os/exec" "path/filepath" - "github.com/containerd/containerd/linux/runcopts" + "github.com/containerd/containerd/linux/runctypes" "github.com/docker/docker/container" "github.com/pkg/errors" ) @@ -42,7 +42,7 @@ func (daemon *Daemon) getLibcontainerdCreateOptions(container *container.Contain if err != nil { return nil, err } - opts := &runcopts.RuncOptions{ + opts := &runctypes.RuncOptions{ Runtime: path, RuntimeRoot: filepath.Join(daemon.configStore.ExecRoot, fmt.Sprintf("runtime-%s", container.HostConfig.Runtime)), diff --git a/libcontainerd/client_daemon.go b/libcontainerd/client_daemon.go index f1b5f011f8..5c26bc257e 100644 --- a/libcontainerd/client_daemon.go +++ b/libcontainerd/client_daemon.go @@ -21,12 +21,14 @@ import ( "google.golang.org/grpc/status" "github.com/containerd/containerd" + "github.com/containerd/containerd/api/events" eventsapi "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/archive" + "github.com/containerd/containerd/cio" "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" - "github.com/containerd/containerd/linux/runcopts" + "github.com/containerd/containerd/linux/runctypes" "github.com/containerd/typeurl" "github.com/docker/docker/pkg/ioutils" "github.com/opencontainers/image-spec/specs-go/v1" @@ -70,7 +72,7 @@ func (c *client) Restore(ctx context.Context, id string, attachStdio StdioCallba c.Lock() defer c.Unlock() - var cio containerd.IO + var rio cio.IO defer func() { err = wrapError(err) }() @@ -81,20 +83,20 @@ func (c *client) Restore(ctx context.Context, id string, attachStdio StdioCallba } defer func() { - if err != nil && cio != nil { - cio.Cancel() - cio.Close() + if err != nil && rio != nil { + rio.Cancel() + rio.Close() } }() - t, err := ctr.Task(ctx, func(fifos *containerd.FIFOSet) (containerd.IO, error) { + t, err := ctr.Task(ctx, func(fifos *cio.FIFOSet) (cio.IO, error) { io, err := newIOPipe(fifos) if err != nil { return nil, err } - cio, err = attachStdio(io) - return cio, err + rio, err = attachStdio(io) + return rio, err }) if err != nil && !strings.Contains(err.Error(), "no running task found") { return false, -1, err @@ -168,7 +170,7 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin var ( cp *types.Descriptor t containerd.Task - cio containerd.IO + rio cio.IO err error stdinCloseSync = make(chan struct{}) ) @@ -203,14 +205,14 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin } uid, gid := getSpecUser(spec) t, err = ctr.ctr.NewTask(ctx, - func(id string) (containerd.IO, error) { + func(id string) (cio.IO, error) { fifos := newFIFOSet(ctr.bundleDir, id, InitProcessName, withStdin, spec.Process.Terminal) - cio, err = c.createIO(fifos, id, InitProcessName, stdinCloseSync, attachStdio) - return cio, err + rio, err = c.createIO(fifos, id, InitProcessName, stdinCloseSync, attachStdio) + return rio, err }, func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error { info.Checkpoint = cp - info.Options = &runcopts.CreateOptions{ + info.Options = &runctypes.CreateOptions{ IoUid: uint32(uid), IoGid: uint32(gid), } @@ -218,9 +220,9 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin }) if err != nil { close(stdinCloseSync) - if cio != nil { - cio.Cancel() - cio.Close() + if rio != nil { + rio.Cancel() + rio.Close() } return -1, err } @@ -259,7 +261,7 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec * var ( p containerd.Process - cio containerd.IO + rio cio.IO err error stdinCloseSync = make(chan struct{}) ) @@ -268,23 +270,23 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec * defer func() { if err != nil { - if cio != nil { - cio.Cancel() - cio.Close() + if rio != nil { + rio.Cancel() + rio.Close() } rmFIFOSet(fifos) } }() - p, err = ctr.task.Exec(ctx, processID, spec, func(id string) (containerd.IO, error) { - cio, err = c.createIO(fifos, containerID, processID, stdinCloseSync, attachStdio) - return cio, err + p, err = ctr.task.Exec(ctx, processID, spec, func(id string) (cio.IO, error) { + rio, err = c.createIO(fifos, containerID, processID, stdinCloseSync, attachStdio) + return rio, err }) if err != nil { close(stdinCloseSync) - if cio != nil { - cio.Cancel() - cio.Close() + if rio != nil { + rio.Cancel() + rio.Close() } return -1, err } @@ -569,7 +571,7 @@ func (c *client) getProcess(containerID, processID string) (containerd.Process, // createIO creates the io to be used by a process // This needs to get a pointer to interface as upon closure the process may not have yet been registered -func (c *client) createIO(fifos *containerd.FIFOSet, containerID, processID string, stdinCloseSync chan struct{}, attachStdio StdioCallback) (containerd.IO, error) { +func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, stdinCloseSync chan struct{}, attachStdio StdioCallback) (cio.IO, error) { io, err := newIOPipe(fifos) if err != nil { return nil, err @@ -601,12 +603,12 @@ func (c *client) createIO(fifos *containerd.FIFOSet, containerID, processID stri }) } - cio, err := attachStdio(io) + rio, err := attachStdio(io) if err != nil { io.Cancel() io.Close() } - return cio, err + return rio, err } func (c *client) processEvent(ctr *container, et EventType, ei EventInfo) { @@ -710,21 +712,21 @@ func (c *client) processEventStream(ctx context.Context) { c.logger.WithField("topic", ev.Topic).Debug("event") switch t := v.(type) { - case *eventsapi.TaskCreate: + case *events.TaskCreate: et = EventCreate ei = EventInfo{ ContainerID: t.ContainerID, ProcessID: t.ContainerID, Pid: t.Pid, } - case *eventsapi.TaskStart: + case *events.TaskStart: et = EventStart ei = EventInfo{ ContainerID: t.ContainerID, ProcessID: t.ContainerID, Pid: t.Pid, } - case *eventsapi.TaskExit: + case *events.TaskExit: et = EventExit ei = EventInfo{ ContainerID: t.ContainerID, @@ -733,32 +735,32 @@ func (c *client) processEventStream(ctx context.Context) { ExitCode: t.ExitStatus, ExitedAt: t.ExitedAt, } - case *eventsapi.TaskOOM: + case *events.TaskOOM: et = EventOOM ei = EventInfo{ ContainerID: t.ContainerID, OOMKilled: true, } oomKilled = true - case *eventsapi.TaskExecAdded: + case *events.TaskExecAdded: et = EventExecAdded ei = EventInfo{ ContainerID: t.ContainerID, ProcessID: t.ExecID, } - case *eventsapi.TaskExecStarted: + case *events.TaskExecStarted: et = EventExecStarted ei = EventInfo{ ContainerID: t.ContainerID, ProcessID: t.ExecID, Pid: t.Pid, } - case *eventsapi.TaskPaused: + case *events.TaskPaused: et = EventPaused ei = EventInfo{ ContainerID: t.ContainerID, } - case *eventsapi.TaskResumed: + case *events.TaskResumed: et = EventResumed ei = EventInfo{ ContainerID: t.ContainerID, diff --git a/libcontainerd/client_daemon_linux.go b/libcontainerd/client_daemon_linux.go index 14966f00b1..9a98fbdf13 100644 --- a/libcontainerd/client_daemon_linux.go +++ b/libcontainerd/client_daemon_linux.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/containerd/containerd" + "github.com/containerd/containerd/cio" "github.com/docker/docker/pkg/idtools" specs "github.com/opencontainers/runtime-spec/specs-go" "github.com/sirupsen/logrus" @@ -79,8 +80,8 @@ func prepareBundleDir(bundleDir string, ociSpec *specs.Spec) (string, error) { return p, nil } -func newFIFOSet(bundleDir, containerID, processID string, withStdin, withTerminal bool) *containerd.FIFOSet { - fifos := &containerd.FIFOSet{ +func newFIFOSet(bundleDir, containerID, processID string, withStdin, withTerminal bool) *cio.FIFOSet { + fifos := &cio.FIFOSet{ Terminal: withTerminal, Out: filepath.Join(bundleDir, processID+"-stdout"), } @@ -96,7 +97,7 @@ func newFIFOSet(bundleDir, containerID, processID string, withStdin, withTermina return fifos } -func rmFIFOSet(fset *containerd.FIFOSet) { +func rmFIFOSet(fset *cio.FIFOSet) { for _, fn := range []string{fset.Out, fset.In, fset.Err} { if fn != "" { if err := os.RemoveAll(fn); err != nil { diff --git a/libcontainerd/client_daemon_windows.go b/libcontainerd/client_daemon_windows.go index 9bb5d86f44..10309cd1f4 100644 --- a/libcontainerd/client_daemon_windows.go +++ b/libcontainerd/client_daemon_windows.go @@ -3,7 +3,7 @@ package libcontainerd import ( "fmt" - "github.com/containerd/containerd" + "github.com/containerd/containerd/cio" "github.com/containerd/containerd/windows/hcsshimtypes" specs "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" @@ -35,8 +35,8 @@ func pipeName(containerID, processID, name string) string { return fmt.Sprintf(`\\.\pipe\containerd-%s-%s-%s`, containerID, processID, name) } -func newFIFOSet(bundleDir, containerID, processID string, withStdin, withTerminal bool) *containerd.FIFOSet { - fifos := &containerd.FIFOSet{ +func newFIFOSet(bundleDir, containerID, processID string, withStdin, withTerminal bool) *cio.FIFOSet { + fifos := &cio.FIFOSet{ Terminal: withTerminal, Out: pipeName(containerID, processID, "stdout"), } diff --git a/libcontainerd/io.go b/libcontainerd/io.go index 2c4af58ce9..25a894b078 100644 --- a/libcontainerd/io.go +++ b/libcontainerd/io.go @@ -1,9 +1,9 @@ package libcontainerd -import "github.com/containerd/containerd" +import "github.com/containerd/containerd/cio" // Config returns the containerd.IOConfig of this pipe set -func (p *IOPipe) Config() containerd.IOConfig { +func (p *IOPipe) Config() cio.Config { return p.config } diff --git a/libcontainerd/io_unix.go b/libcontainerd/io_unix.go index 0c08b20136..8e597914ee 100644 --- a/libcontainerd/io_unix.go +++ b/libcontainerd/io_unix.go @@ -7,12 +7,12 @@ import ( "io" "syscall" - "github.com/containerd/containerd" + "github.com/containerd/containerd/cio" "github.com/containerd/fifo" "github.com/pkg/errors" ) -func newIOPipe(fifos *containerd.FIFOSet) (*IOPipe, error) { +func newIOPipe(fifos *cio.FIFOSet) (*IOPipe, error) { var ( err error ctx, cancel = context.WithCancel(context.Background()) @@ -20,7 +20,7 @@ func newIOPipe(fifos *containerd.FIFOSet) (*IOPipe, error) { iop = &IOPipe{ Terminal: fifos.Terminal, cancel: cancel, - config: containerd.IOConfig{ + config: cio.Config{ Terminal: fifos.Terminal, Stdin: fifos.In, Stdout: fifos.Out, diff --git a/libcontainerd/io_windows.go b/libcontainerd/io_windows.go index 312bdbd8cf..f2e5a93fe1 100644 --- a/libcontainerd/io_windows.go +++ b/libcontainerd/io_windows.go @@ -7,7 +7,7 @@ import ( "sync" winio "github.com/Microsoft/go-winio" - "github.com/containerd/containerd" + "github.com/containerd/containerd/cio" "github.com/pkg/errors" ) @@ -90,7 +90,7 @@ func (wp *winpipe) Close() error { } } -func newIOPipe(fifos *containerd.FIFOSet) (*IOPipe, error) { +func newIOPipe(fifos *cio.FIFOSet) (*IOPipe, error) { var ( err error ctx, cancel = context.WithCancel(context.Background()) @@ -98,7 +98,7 @@ func newIOPipe(fifos *containerd.FIFOSet) (*IOPipe, error) { iop = &IOPipe{ Terminal: fifos.Terminal, cancel: cancel, - config: containerd.IOConfig{ + config: cio.Config{ Terminal: fifos.Terminal, Stdin: fifos.In, Stdout: fifos.Out, diff --git a/libcontainerd/remote_daemon_options_linux.go b/libcontainerd/remote_daemon_options_linux.go index 1e5a98124a..7376c06119 100644 --- a/libcontainerd/remote_daemon_options_linux.go +++ b/libcontainerd/remote_daemon_options_linux.go @@ -27,7 +27,7 @@ type subreaper bool func (s subreaper) Apply(r Remote) error { if remote, ok := r.(*remote); ok { - remote.Subreaper = bool(s) + remote.NoSubreaper = !bool(s) return nil } return fmt.Errorf("WithSubreaper option not supported for this remote") diff --git a/libcontainerd/types.go b/libcontainerd/types.go index 9eede43a49..346fd241f1 100644 --- a/libcontainerd/types.go +++ b/libcontainerd/types.go @@ -6,6 +6,7 @@ import ( "time" "github.com/containerd/containerd" + "github.com/containerd/containerd/cio" "github.com/opencontainers/runtime-spec/specs-go" ) @@ -106,7 +107,7 @@ type Client interface { } // StdioCallback is called to connect a container or process stdio. -type StdioCallback func(*IOPipe) (containerd.IO, error) +type StdioCallback func(*IOPipe) (cio.IO, error) // IOPipe contains the stdio streams. type IOPipe struct { @@ -116,7 +117,7 @@ type IOPipe struct { Terminal bool // Whether stderr is connected on Windows cancel context.CancelFunc - config containerd.IOConfig + config cio.Config } // ServerVersion contains version information as retrieved from the diff --git a/plugin/executor/containerd/containerd.go b/plugin/executor/containerd/containerd.go index d93b8b75ec..98394679d5 100644 --- a/plugin/executor/containerd/containerd.go +++ b/plugin/executor/containerd/containerd.go @@ -6,8 +6,8 @@ import ( "path/filepath" "sync" - "github.com/containerd/containerd" - "github.com/containerd/containerd/linux/runcopts" + "github.com/containerd/containerd/cio" + "github.com/containerd/containerd/linux/runctypes" "github.com/docker/docker/api/errdefs" "github.com/docker/docker/libcontainerd" "github.com/opencontainers/runtime-spec/specs-go" @@ -46,7 +46,7 @@ type Executor struct { // Create creates a new container func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error { - opts := runcopts.RuncOptions{ + opts := runctypes.RuncOptions{ RuntimeRoot: filepath.Join(e.rootDir, "runtime-root"), } ctx := context.Background() @@ -110,37 +110,37 @@ func (e *Executor) ProcessEvent(id string, et libcontainerd.EventType, ei libcon return nil } -type cio struct { - containerd.IO +type rio struct { + cio.IO wg sync.WaitGroup } -func (c *cio) Wait() { +func (c *rio) Wait() { c.wg.Wait() c.IO.Wait() } func attachStreamsFunc(stdout, stderr io.WriteCloser) libcontainerd.StdioCallback { - return func(iop *libcontainerd.IOPipe) (containerd.IO, error) { + return func(iop *libcontainerd.IOPipe) (cio.IO, error) { if iop.Stdin != nil { iop.Stdin.Close() // closing stdin shouldn't be needed here, it should never be open panic("plugin stdin shouldn't have been created!") } - cio := &cio{IO: iop} - cio.wg.Add(2) + rio := &rio{IO: iop} + rio.wg.Add(2) go func() { io.Copy(stdout, iop.Stdout) stdout.Close() - cio.wg.Done() + rio.wg.Done() }() go func() { io.Copy(stderr, iop.Stderr) stderr.Close() - cio.wg.Done() + rio.wg.Done() }() - return cio, nil + return rio, nil } }