1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Refactor libcontainerd to minimize c8d RPCs

The containerd client is very chatty at the best of times. Because the
libcontainerd API is stateless and references containers and processes by
string ID for every method call, the implementation is essentially
forced to use the containerd client in a way which amplifies the number
of redundant RPCs invoked to perform any operation. The libcontainerd
remote implementation has to reload the containerd container, task
and/or process metadata for nearly every operation. This in turn
amplifies the number of context switches between dockerd and containerd
to perform any container operation or handle a containerd event,
increasing the load on the system which could otherwise be allocated to
workloads.

Overhaul the libcontainerd interface to reduce the impedance mismatch
with the containerd client so that the containerd client can be used
more efficiently. Split the API out into container, task and process
interfaces which the consumer is expected to retain so that
libcontainerd can retain state---especially the analogous containerd
client objects---without having to manage any state-store inside the
libcontainerd client.

Signed-off-by: Cory Snider <csnider@mirantis.com>
This commit is contained in:
Cory Snider 2022-05-10 15:59:00 -04:00
parent 57d2d6ef62
commit 4bafaa00aa
36 changed files with 1164 additions and 1119 deletions

View file

@ -19,7 +19,6 @@ import (
mounttypes "github.com/docker/docker/api/types/mount" mounttypes "github.com/docker/docker/api/types/mount"
swarmtypes "github.com/docker/docker/api/types/swarm" swarmtypes "github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/container/stream" "github.com/docker/docker/container/stream"
"github.com/docker/docker/daemon/exec"
"github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/jsonfilelog" "github.com/docker/docker/daemon/logger/jsonfilelog"
"github.com/docker/docker/daemon/logger/local" "github.com/docker/docker/daemon/logger/local"
@ -28,6 +27,7 @@ import (
"github.com/docker/docker/errdefs" "github.com/docker/docker/errdefs"
"github.com/docker/docker/image" "github.com/docker/docker/image"
"github.com/docker/docker/layer" "github.com/docker/docker/layer"
libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
"github.com/docker/docker/pkg/containerfs" "github.com/docker/docker/pkg/containerfs"
"github.com/docker/docker/pkg/idtools" "github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/ioutils"
@ -86,7 +86,7 @@ type Container struct {
HasBeenManuallyRestarted bool `json:"-"` // used to distinguish restart caused by restart policy from the manual one HasBeenManuallyRestarted bool `json:"-"` // used to distinguish restart caused by restart policy from the manual one
MountPoints map[string]*volumemounts.MountPoint MountPoints map[string]*volumemounts.MountPoint
HostConfig *containertypes.HostConfig `json:"-"` // do not serialize the host config in the json, otherwise we'll make the container unportable HostConfig *containertypes.HostConfig `json:"-"` // do not serialize the host config in the json, otherwise we'll make the container unportable
ExecCommands *exec.Store `json:"-"` ExecCommands *ExecStore `json:"-"`
DependencyStore agentexec.DependencyGetter `json:"-"` DependencyStore agentexec.DependencyGetter `json:"-"`
SecretReferences []*swarmtypes.SecretReference SecretReferences []*swarmtypes.SecretReference
ConfigReferences []*swarmtypes.ConfigReference ConfigReferences []*swarmtypes.ConfigReference
@ -121,7 +121,7 @@ func NewBaseContainer(id, root string) *Container {
return &Container{ return &Container{
ID: id, ID: id,
State: NewState(), State: NewState(),
ExecCommands: exec.NewStore(), ExecCommands: NewExecStore(),
Root: root, Root: root,
MountPoints: make(map[string]*volumemounts.MountPoint), MountPoints: make(map[string]*volumemounts.MountPoint),
StreamConfig: stream.NewConfig(), StreamConfig: stream.NewConfig(),
@ -752,6 +752,47 @@ func (container *Container) CreateDaemonEnvironment(tty bool, linkedEnv []string
return env return env
} }
// RestoreTask reloads the containerd container handle for this container and
// tries to attach to its task, reconnecting the task's IO through
// container.InitializeStdio. Container state is not synced with containerd's
// state.
//
// The container lock is taken for the duration of the call.
//
// If the container cannot be loaded, the error from client.LoadContainer is
// returned as-is (an errdefs.NotFound error when the container does not exist
// in containerd). A task that does not exist in containerd is not treated as
// a failure: in that case nil is returned.
func (container *Container) RestoreTask(ctx context.Context, client libcontainerdtypes.Client) error {
	container.Lock()
	defer container.Unlock()

	var loadErr error
	if container.ctr, loadErr = client.LoadContainer(ctx, container.ID); loadErr != nil {
		return loadErr
	}

	var attachErr error
	container.task, attachErr = container.ctr.AttachTask(ctx, container.InitializeStdio)
	// A missing task is an expected outcome and is not reported to the caller.
	if attachErr == nil || errdefs.IsNotFound(attachErr) {
		return nil
	}
	return attachErr
}
// GetRunningTask returns the Task of a container that is in the Running state.
//
// An errdefs.Conflict error is returned when the container is not running. An
// errdefs.System error (carrying a stack trace) is returned when the container
// is in a bad state: Running is true but no Task is set.
//
// The container lock must be held when calling this method.
func (container *Container) GetRunningTask() (libcontainerdtypes.Task, error) {
	if !container.Running {
		notRunning := fmt.Errorf("container %s is not running", container.ID)
		return nil, errdefs.Conflict(notRunning)
	}

	if running, found := container.Task(); found {
		return running, nil
	}

	// Running without a task handle means the in-memory state is inconsistent.
	return nil, errdefs.System(errors.WithStack(fmt.Errorf("container %s is in Running state but has no containerd Task set", container.ID)))
}
type rio struct { type rio struct {
cio.IO cio.IO

View file

@ -1,20 +1,20 @@
package exec // import "github.com/docker/docker/daemon/exec" package container // import "github.com/docker/docker/container"
import ( import (
"context"
"runtime" "runtime"
"sync" "sync"
"github.com/containerd/containerd/cio" "github.com/containerd/containerd/cio"
"github.com/docker/docker/container/stream" "github.com/docker/docker/container/stream"
"github.com/docker/docker/libcontainerd/types"
"github.com/docker/docker/pkg/stringid" "github.com/docker/docker/pkg/stringid"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
// Config holds the configurations for execs. The Daemon keeps // ExecConfig holds the configurations for execs. The Daemon keeps
// track of both running and finished execs so that they can be // track of both running and finished execs so that they can be
// examined both during and after completion. // examined both during and after completion.
type Config struct { type ExecConfig struct {
sync.Mutex sync.Mutex
Started chan struct{} Started chan struct{}
StreamConfig *stream.Config StreamConfig *stream.Config
@ -25,7 +25,7 @@ type Config struct {
OpenStderr bool OpenStderr bool
OpenStdout bool OpenStdout bool
CanRemove bool CanRemove bool
ContainerID string Container *Container
DetachKeys []byte DetachKeys []byte
Entrypoint string Entrypoint string
Args []string Args []string
@ -34,39 +34,22 @@ type Config struct {
User string User string
WorkingDir string WorkingDir string
Env []string Env []string
Pid int Process types.Process
ConsoleSize *[2]uint ConsoleSize *[2]uint
} }
// NewConfig initializes a new exec configuration // NewExecConfig initializes a new exec configuration
func NewConfig() *Config { func NewExecConfig(c *Container) *ExecConfig {
return &Config{ return &ExecConfig{
ID: stringid.GenerateRandomID(), ID: stringid.GenerateRandomID(),
Container: c,
StreamConfig: stream.NewConfig(), StreamConfig: stream.NewConfig(),
Started: make(chan struct{}), Started: make(chan struct{}),
} }
} }
// rio pairs a containerd cio.IO with the stream.Config whose streams are
// closed and waited on together with it.
type rio struct {
	cio.IO

	sc *stream.Config
}

// Close closes the embedded cio.IO and then the streams held by sc. Only the
// result of closing the streams is returned; the result of closing the
// embedded IO is discarded.
func (r *rio) Close() error {
	_ = r.IO.Close()
	streamErr := r.sc.CloseStreams()
	return streamErr
}

// Wait waits on the stream config first (with a background context) and then
// on the embedded cio.IO.
func (r *rio) Wait() {
	bg := context.Background()
	r.sc.Wait(bg)
	r.IO.Wait()
}
// InitializeStdio is called by libcontainerd to connect the stdio. // InitializeStdio is called by libcontainerd to connect the stdio.
func (c *Config) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) { func (c *ExecConfig) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
c.StreamConfig.CopyToPipe(iop) c.StreamConfig.CopyToPipe(iop)
if c.StreamConfig.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" { if c.StreamConfig.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" {
@ -81,32 +64,32 @@ func (c *Config) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
} }
// CloseStreams closes the stdio streams for the exec // CloseStreams closes the stdio streams for the exec
func (c *Config) CloseStreams() error { func (c *ExecConfig) CloseStreams() error {
return c.StreamConfig.CloseStreams() return c.StreamConfig.CloseStreams()
} }
// SetExitCode sets the exec config's exit code // SetExitCode sets the exec config's exit code
func (c *Config) SetExitCode(code int) { func (c *ExecConfig) SetExitCode(code int) {
c.ExitCode = &code c.ExitCode = &code
} }
// Store keeps track of the exec configurations. // ExecStore keeps track of the exec configurations.
type Store struct { type ExecStore struct {
byID map[string]*Config byID map[string]*ExecConfig
mu sync.RWMutex mu sync.RWMutex
} }
// NewStore initializes a new exec store. // NewExecStore initializes a new exec store.
func NewStore() *Store { func NewExecStore() *ExecStore {
return &Store{ return &ExecStore{
byID: make(map[string]*Config), byID: make(map[string]*ExecConfig),
} }
} }
// Commands returns the exec configurations in the store. // Commands returns the exec configurations in the store.
func (e *Store) Commands() map[string]*Config { func (e *ExecStore) Commands() map[string]*ExecConfig {
e.mu.RLock() e.mu.RLock()
byID := make(map[string]*Config, len(e.byID)) byID := make(map[string]*ExecConfig, len(e.byID))
for id, config := range e.byID { for id, config := range e.byID {
byID[id] = config byID[id] = config
} }
@ -115,14 +98,14 @@ func (e *Store) Commands() map[string]*Config {
} }
// Add adds a new exec configuration to the store. // Add adds a new exec configuration to the store.
func (e *Store) Add(id string, Config *Config) { func (e *ExecStore) Add(id string, Config *ExecConfig) {
e.mu.Lock() e.mu.Lock()
e.byID[id] = Config e.byID[id] = Config
e.mu.Unlock() e.mu.Unlock()
} }
// Get returns an exec configuration by its id. // Get returns an exec configuration by its id.
func (e *Store) Get(id string) *Config { func (e *ExecStore) Get(id string) *ExecConfig {
e.mu.RLock() e.mu.RLock()
res := e.byID[id] res := e.byID[id]
e.mu.RUnlock() e.mu.RUnlock()
@ -130,14 +113,14 @@ func (e *Store) Get(id string) *Config {
} }
// Delete removes an exec configuration from the store. // Delete removes an exec configuration from the store.
func (e *Store) Delete(id string, pid int) { func (e *ExecStore) Delete(id string) {
e.mu.Lock() e.mu.Lock()
delete(e.byID, id) delete(e.byID, id)
e.mu.Unlock() e.mu.Unlock()
} }
// List returns the list of exec ids in the store. // List returns the list of exec ids in the store.
func (e *Store) List() []string { func (e *ExecStore) List() []string {
var IDs []string var IDs []string
e.mu.RLock() e.mu.RLock()
for id := range e.byID { for id := range e.byID {

View file

@ -8,6 +8,7 @@ import (
"time" "time"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
units "github.com/docker/go-units" units "github.com/docker/go-units"
) )
@ -36,6 +37,14 @@ type State struct {
stopWaiters []chan<- StateStatus stopWaiters []chan<- StateStatus
removeOnlyWaiters []chan<- StateStatus removeOnlyWaiters []chan<- StateStatus
// The libcontainerd reference fields are unexported to force consumers
// to access them through the getter methods with multi-valued returns
// so that they can't forget to nil-check: the code won't compile unless
// the nil-check result is explicitly consumed or discarded.
ctr libcontainerdtypes.Container
task libcontainerdtypes.Task
} }
// StateStatus is used to return container wait results. // StateStatus is used to return container wait results.
@ -260,7 +269,7 @@ func (s *State) SetExitCode(ec int) {
} }
// SetRunning sets the state of the container to "running". // SetRunning sets the state of the container to "running".
func (s *State) SetRunning(pid int, initial bool) { func (s *State) SetRunning(ctr libcontainerdtypes.Container, tsk libcontainerdtypes.Task, initial bool) {
s.ErrorMsg = "" s.ErrorMsg = ""
s.Paused = false s.Paused = false
s.Running = true s.Running = true
@ -269,7 +278,13 @@ func (s *State) SetRunning(pid int, initial bool) {
s.Paused = false s.Paused = false
} }
s.ExitCodeValue = 0 s.ExitCodeValue = 0
s.Pid = pid s.ctr = ctr
s.task = tsk
if tsk != nil {
s.Pid = int(tsk.Pid())
} else {
s.Pid = 0
}
s.OOMKilled = false s.OOMKilled = false
if initial { if initial {
s.StartedAt = time.Now().UTC() s.StartedAt = time.Now().UTC()
@ -404,3 +419,21 @@ func (s *State) notifyAndClear(waiters *[]chan<- StateStatus) {
} }
*waiters = nil *waiters = nil
} }
// C8dContainer returns the libcontainerd Container reference stored in the
// state, along with ok, which is false when no reference is set (the stored
// value is nil).
//
// The container lock must be held when calling this method.
func (s *State) C8dContainer() (_ libcontainerdtypes.Container, ok bool) {
	stored := s.ctr
	return stored, stored != nil
}
// Task returns the libcontainerd Task reference stored in the state, along
// with ok, which is false when no reference is set (the stored value is nil).
//
// The container lock must be held when calling this method.
//
// See also: (*Container).GetRunningTask().
func (s *State) Task() (_ libcontainerdtypes.Task, ok bool) {
	stored := s.task
	return stored, stored != nil
}

View file

@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
) )
func TestIsValidHealthString(t *testing.T) { func TestIsValidHealthString(t *testing.T) {
@ -28,6 +29,13 @@ func TestIsValidHealthString(t *testing.T) {
} }
} }
// mockTask is a test double for libcontainerdtypes.Task. It embeds the
// interface so that it satisfies it, and overrides only Pid.
type mockTask struct {
	libcontainerdtypes.Task

	pid uint32
}

// Pid reports the pid the mock was configured with.
func (m *mockTask) Pid() uint32 {
	return m.pid
}
func TestStateRunStop(t *testing.T) { func TestStateRunStop(t *testing.T) {
s := NewState() s := NewState()
@ -60,7 +68,7 @@ func TestStateRunStop(t *testing.T) {
// Set the state to "Running". // Set the state to "Running".
s.Lock() s.Lock()
s.SetRunning(i, true) s.SetRunning(nil, &mockTask{pid: uint32(i)}, true)
s.Unlock() s.Unlock()
// Assert desired state. // Assert desired state.
@ -125,7 +133,7 @@ func TestStateTimeoutWait(t *testing.T) {
s := NewState() s := NewState()
s.Lock() s.Lock()
s.SetRunning(0, true) s.SetRunning(nil, nil, true)
s.Unlock() s.Unlock()
// Start a wait with a timeout. // Start a wait with a timeout.
@ -174,7 +182,7 @@ func TestCorrectStateWaitResultAfterRestart(t *testing.T) {
s := NewState() s := NewState()
s.Lock() s.Lock()
s.SetRunning(0, true) s.SetRunning(nil, nil, true)
s.Unlock() s.Unlock()
waitC := s.Wait(context.Background(), WaitConditionNotRunning) waitC := s.Wait(context.Background(), WaitConditionNotRunning)
@ -185,7 +193,7 @@ func TestCorrectStateWaitResultAfterRestart(t *testing.T) {
s.Unlock() s.Unlock()
s.Lock() s.Lock()
s.SetRunning(0, true) s.SetRunning(nil, nil, true)
s.Unlock() s.Unlock()
got := <-waitC got := <-waitC

View file

@ -57,8 +57,11 @@ func (daemon *Daemon) CheckpointCreate(name string, config types.CheckpointCreat
return err return err
} }
if !container.IsRunning() { container.Lock()
return fmt.Errorf("Container %s not running", name) tsk, err := container.GetRunningTask()
container.Unlock()
if err != nil {
return err
} }
if !validCheckpointNamePattern.MatchString(config.CheckpointID) { if !validCheckpointNamePattern.MatchString(config.CheckpointID) {
@ -70,7 +73,7 @@ func (daemon *Daemon) CheckpointCreate(name string, config types.CheckpointCreat
return fmt.Errorf("cannot checkpoint container %s: %s", name, err) return fmt.Errorf("cannot checkpoint container %s: %s", name, err)
} }
err = daemon.containerd.CreateCheckpoint(context.Background(), container.ID, checkpointDir, config.Exit) err = tsk.CreateCheckpoint(context.Background(), checkpointDir, config.Exit)
if err != nil { if err != nil {
os.RemoveAll(checkpointDir) os.RemoveAll(checkpointDir)
return fmt.Errorf("Cannot checkpoint container %s: %s", name, err) return fmt.Errorf("Cannot checkpoint container %s: %s", name, err)

View file

@ -30,7 +30,6 @@ import (
"github.com/docker/docker/daemon/config" "github.com/docker/docker/daemon/config"
ctrd "github.com/docker/docker/daemon/containerd" ctrd "github.com/docker/docker/daemon/containerd"
"github.com/docker/docker/daemon/events" "github.com/docker/docker/daemon/events"
"github.com/docker/docker/daemon/exec"
_ "github.com/docker/docker/daemon/graphdriver/register" // register graph drivers _ "github.com/docker/docker/daemon/graphdriver/register" // register graph drivers
"github.com/docker/docker/daemon/images" "github.com/docker/docker/daemon/images"
"github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger"
@ -75,7 +74,7 @@ type Daemon struct {
repository string repository string
containers container.Store containers container.Store
containersReplica container.ViewDB containersReplica container.ViewDB
execCommands *exec.Store execCommands *container.ExecStore
imageService ImageService imageService ImageService
configStore *config.Config configStore *config.Config
statsCollector *stats.Collector statsCollector *stats.Collector
@ -317,40 +316,43 @@ func (daemon *Daemon) restore() error {
logger(c).Debug("restoring container") logger(c).Debug("restoring container")
var ( var es *containerd.ExitStatus
err error
alive bool
ec uint32
exitedAt time.Time
process libcontainerdtypes.Process
)
alive, _, process, err = daemon.containerd.Restore(context.Background(), c.ID, c.InitializeStdio) if err := c.RestoreTask(context.Background(), daemon.containerd); err != nil && !errdefs.IsNotFound(err) {
if err != nil && !errdefs.IsNotFound(err) {
logger(c).WithError(err).Error("failed to restore container with containerd") logger(c).WithError(err).Error("failed to restore container with containerd")
return return
} }
logger(c).Debugf("alive: %v", alive)
if !alive { alive := false
// If process is not nil, cleanup dead container from containerd. status := containerd.Unknown
// If process is nil then the above `containerd.Restore` returned an errdefs.NotFoundError, if tsk, ok := c.Task(); ok {
// and docker's view of the container state will be updated accorrdingly via SetStopped further down. s, err := tsk.Status(context.Background())
if process != nil { if err != nil {
logger(c).Debug("cleaning up dead container process") logger(c).WithError(err).Error("failed to get task status")
ec, exitedAt, err = process.Delete(context.Background()) } else {
if err != nil && !errdefs.IsNotFound(err) { status = s.Status
logger(c).WithError(err).Error("failed to delete container from containerd") alive = status != containerd.Stopped
return if !alive {
logger(c).Debug("cleaning up dead container process")
es, err = tsk.Delete(context.Background())
if err != nil && !errdefs.IsNotFound(err) {
logger(c).WithError(err).Error("failed to delete task from containerd")
return
}
} else if !daemon.configStore.LiveRestoreEnabled {
logger(c).Debug("shutting down container considered alive by containerd")
if err := daemon.shutdownContainer(c); err != nil && !errdefs.IsNotFound(err) {
log.WithError(err).Error("error shutting down container")
return
}
status = containerd.Stopped
alive = false
c.ResetRestartManager(false)
} }
} }
} else if !daemon.configStore.LiveRestoreEnabled {
logger(c).Debug("shutting down container considered alive by containerd")
if err := daemon.shutdownContainer(c); err != nil && !errdefs.IsNotFound(err) {
log.WithError(err).Error("error shutting down container")
return
}
c.ResetRestartManager(false)
} }
// If the containerd task for the container was not found, docker's view of the
// container state will be updated accordingly via SetStopped further down.
if c.IsRunning() || c.IsPaused() { if c.IsRunning() || c.IsPaused() {
logger(c).Debug("syncing container on disk state with real state") logger(c).Debug("syncing container on disk state with real state")
@ -359,29 +361,22 @@ func (daemon *Daemon) restore() error {
switch { switch {
case c.IsPaused() && alive: case c.IsPaused() && alive:
s, err := daemon.containerd.Status(context.Background(), c.ID) logger(c).WithField("state", status).Info("restored container paused")
if err != nil { switch status {
logger(c).WithError(err).Error("failed to get container status") case containerd.Paused, containerd.Pausing:
} else { // nothing to do
logger(c).WithField("state", s).Info("restored container paused") case containerd.Unknown, containerd.Stopped, "":
switch s { log.WithField("status", status).Error("unexpected status for paused container during restore")
case containerd.Paused, containerd.Pausing: default:
// nothing to do // running
case containerd.Stopped: c.Lock()
alive = false c.Paused = false
case containerd.Unknown: daemon.setStateCounter(c)
log.Error("unknown status for paused container during restore") daemon.updateHealthMonitor(c)
default: if err := c.CheckpointTo(daemon.containersReplica); err != nil {
// running log.WithError(err).Error("failed to update paused container state")
c.Lock()
c.Paused = false
daemon.setStateCounter(c)
daemon.updateHealthMonitor(c)
if err := c.CheckpointTo(daemon.containersReplica); err != nil {
log.WithError(err).Error("failed to update paused container state")
}
c.Unlock()
} }
c.Unlock()
} }
case !c.IsPaused() && alive: case !c.IsPaused() && alive:
logger(c).Debug("restoring healthcheck") logger(c).Debug("restoring healthcheck")
@ -393,7 +388,12 @@ func (daemon *Daemon) restore() error {
if !alive { if !alive {
logger(c).Debug("setting stopped state") logger(c).Debug("setting stopped state")
c.Lock() c.Lock()
c.SetStopped(&container.ExitStatus{ExitCode: int(ec), ExitedAt: exitedAt}) var ces container.ExitStatus
if es != nil {
ces.ExitCode = int(es.ExitCode())
ces.ExitedAt = es.ExitTime()
}
c.SetStopped(&ces)
daemon.Cleanup(c) daemon.Cleanup(c)
if err := c.CheckpointTo(daemon.containersReplica); err != nil { if err := c.CheckpointTo(daemon.containersReplica); err != nil {
log.WithError(err).Error("failed to update stopped container state") log.WithError(err).Error("failed to update stopped container state")
@ -956,7 +956,7 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
if d.containersReplica, err = container.NewViewDB(); err != nil { if d.containersReplica, err = container.NewViewDB(); err != nil {
return nil, err return nil, err
} }
d.execCommands = exec.NewStore() d.execCommands = container.NewExecStore()
d.statsCollector = d.newStatsCollector(1 * time.Second) d.statsCollector = d.newStatsCollector(1 * time.Second)
d.EventsService = events.New() d.EventsService = events.New()

View file

@ -1387,10 +1387,13 @@ func copyBlkioEntry(entries []*statsV1.BlkIOEntry) []types.BlkioStatEntry {
} }
func (daemon *Daemon) stats(c *container.Container) (*types.StatsJSON, error) { func (daemon *Daemon) stats(c *container.Container) (*types.StatsJSON, error) {
if !c.IsRunning() { c.Lock()
return nil, errNotRunning(c.ID) task, err := c.GetRunningTask()
c.Unlock()
if err != nil {
return nil, err
} }
cs, err := daemon.containerd.Stats(context.Background(), c.ID) cs, err := task.Stats(context.Background())
if err != nil { if err != nil {
if strings.Contains(err.Error(), "container not found") { if strings.Contains(err.Error(), "container not found") {
return nil, containerNotFound(c.ID) return nil, containerNotFound(c.ID)

View file

@ -14,6 +14,7 @@ import (
containertypes "github.com/docker/docker/api/types/container" containertypes "github.com/docker/docker/api/types/container"
"github.com/docker/docker/container" "github.com/docker/docker/container"
"github.com/docker/docker/daemon/config" "github.com/docker/docker/daemon/config"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/libcontainerd/local" "github.com/docker/docker/libcontainerd/local"
"github.com/docker/docker/libcontainerd/remote" "github.com/docker/docker/libcontainerd/remote"
"github.com/docker/docker/libnetwork" "github.com/docker/docker/libnetwork"
@ -515,14 +516,17 @@ func driverOptions(_ *config.Config) nwconfig.Option {
} }
func (daemon *Daemon) stats(c *container.Container) (*types.StatsJSON, error) { func (daemon *Daemon) stats(c *container.Container) (*types.StatsJSON, error) {
if !c.IsRunning() { c.Lock()
return nil, errNotRunning(c.ID) task, err := c.GetRunningTask()
c.Unlock()
if err != nil {
return nil, err
} }
// Obtain the stats from HCS via libcontainerd // Obtain the stats from HCS via libcontainerd
stats, err := daemon.containerd.Stats(context.Background(), c.ID) stats, err := task.Stats(context.Background())
if err != nil { if err != nil {
if strings.Contains(err.Error(), "container not found") { if errdefs.IsNotFound(err) {
return nil, containerNotFound(c.ID) return nil, containerNotFound(c.ID)
} }
return nil, err return nil, err

View file

@ -52,7 +52,7 @@ func TestContainerDelete(t *testing.T) {
fixMsg: "Stop the container before attempting removal or force remove", fixMsg: "Stop the container before attempting removal or force remove",
initContainer: func() *container.Container { initContainer: func() *container.Container {
c := newContainerWithState(container.NewState()) c := newContainerWithState(container.NewState())
c.SetRunning(0, true) c.SetRunning(nil, nil, true)
c.SetRestarting(&container.ExitStatus{}) c.SetRestarting(&container.ExitStatus{})
return c return c
}}, }},

View file

@ -2,18 +2,19 @@ package daemon // import "github.com/docker/docker/daemon"
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"io" "io"
"runtime" "runtime"
"strings" "strings"
"time" "time"
"github.com/containerd/containerd"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
containertypes "github.com/docker/docker/api/types/container" containertypes "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/strslice" "github.com/docker/docker/api/types/strslice"
"github.com/docker/docker/container" "github.com/docker/docker/container"
"github.com/docker/docker/container/stream" "github.com/docker/docker/container/stream"
"github.com/docker/docker/daemon/exec"
"github.com/docker/docker/errdefs" "github.com/docker/docker/errdefs"
"github.com/docker/docker/pkg/pools" "github.com/docker/docker/pkg/pools"
"github.com/moby/sys/signal" "github.com/moby/sys/signal"
@ -23,7 +24,7 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
func (daemon *Daemon) registerExecCommand(container *container.Container, config *exec.Config) { func (daemon *Daemon) registerExecCommand(container *container.Container, config *container.ExecConfig) {
// Storing execs in container in order to kill them gracefully whenever the container is stopped or removed. // Storing execs in container in order to kill them gracefully whenever the container is stopped or removed.
container.ExecCommands.Add(config.ID, config) container.ExecCommands.Add(config.ID, config)
// Storing execs in daemon for easy access via Engine API. // Storing execs in daemon for easy access via Engine API.
@ -41,7 +42,7 @@ func (daemon *Daemon) ExecExists(name string) (bool, error) {
// getExecConfig looks up the exec instance by name. If the container associated // getExecConfig looks up the exec instance by name. If the container associated
// with the exec instance is stopped or paused, it will return an error. // with the exec instance is stopped or paused, it will return an error.
func (daemon *Daemon) getExecConfig(name string) (*exec.Config, error) { func (daemon *Daemon) getExecConfig(name string) (*container.ExecConfig, error) {
ec := daemon.execCommands.Get(name) ec := daemon.execCommands.Get(name)
if ec == nil { if ec == nil {
return nil, errExecNotFound(name) return nil, errExecNotFound(name)
@ -52,7 +53,7 @@ func (daemon *Daemon) getExecConfig(name string) (*exec.Config, error) {
// saying the container isn't running, we should return a 404 so that // saying the container isn't running, we should return a 404 so that
// the user sees the same error now that they will after the // the user sees the same error now that they will after the
// 5 minute clean-up loop is run which erases old/dead execs. // 5 minute clean-up loop is run which erases old/dead execs.
ctr := daemon.containers.Get(ec.ContainerID) ctr := daemon.containers.Get(ec.Container.ID)
if ctr == nil { if ctr == nil {
return nil, containerNotFound(name) return nil, containerNotFound(name)
} }
@ -68,9 +69,9 @@ func (daemon *Daemon) getExecConfig(name string) (*exec.Config, error) {
return ec, nil return ec, nil
} }
func (daemon *Daemon) unregisterExecCommand(container *container.Container, execConfig *exec.Config) { func (daemon *Daemon) unregisterExecCommand(container *container.Container, execConfig *container.ExecConfig) {
container.ExecCommands.Delete(execConfig.ID, execConfig.Pid) container.ExecCommands.Delete(execConfig.ID)
daemon.execCommands.Delete(execConfig.ID, execConfig.Pid) daemon.execCommands.Delete(execConfig.ID)
} }
func (daemon *Daemon) getActiveContainer(name string) (*container.Container, error) { func (daemon *Daemon) getActiveContainer(name string) (*container.Container, error) {
@ -110,11 +111,10 @@ func (daemon *Daemon) ContainerExecCreate(name string, config *types.ExecConfig)
} }
} }
execConfig := exec.NewConfig() execConfig := container.NewExecConfig(cntr)
execConfig.OpenStdin = config.AttachStdin execConfig.OpenStdin = config.AttachStdin
execConfig.OpenStdout = config.AttachStdout execConfig.OpenStdout = config.AttachStdout
execConfig.OpenStderr = config.AttachStderr execConfig.OpenStderr = config.AttachStderr
execConfig.ContainerID = cntr.ID
execConfig.DetachKeys = keys execConfig.DetachKeys = keys
execConfig.Entrypoint = entrypoint execConfig.Entrypoint = entrypoint
execConfig.Args = args execConfig.Args = args
@ -174,15 +174,11 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio
ec.Running = true ec.Running = true
ec.Unlock() ec.Unlock()
c := daemon.containers.Get(ec.ContainerID) logrus.Debugf("starting exec command %s in container %s", ec.ID, ec.Container.ID)
if c == nil {
return containerNotFound(ec.ContainerID)
}
logrus.Debugf("starting exec command %s in container %s", ec.ID, c.ID)
attributes := map[string]string{ attributes := map[string]string{
"execID": ec.ID, "execID": ec.ID,
} }
daemon.LogContainerEventWithAttributes(c, "exec_start: "+ec.Entrypoint+" "+strings.Join(ec.Args, " "), attributes) daemon.LogContainerEventWithAttributes(ec.Container, "exec_start: "+ec.Entrypoint+" "+strings.Join(ec.Args, " "), attributes)
defer func() { defer func() {
if err != nil { if err != nil {
@ -191,10 +187,10 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio
exitCode := 126 exitCode := 126
ec.ExitCode = &exitCode ec.ExitCode = &exitCode
if err := ec.CloseStreams(); err != nil { if err := ec.CloseStreams(); err != nil {
logrus.Errorf("failed to cleanup exec %s streams: %s", c.ID, err) logrus.Errorf("failed to cleanup exec %s streams: %s", ec.Container.ID, err)
} }
ec.Unlock() ec.Unlock()
c.ExecCommands.Delete(ec.ID, ec.Pid) ec.Container.ExecCommands.Delete(ec.ID)
} }
}() }()
@ -222,15 +218,18 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio
p := &specs.Process{} p := &specs.Process{}
if runtime.GOOS != "windows" { if runtime.GOOS != "windows" {
ctr, err := daemon.containerdCli.LoadContainer(ctx, ec.ContainerID) ctr, err := daemon.containerdCli.LoadContainer(ctx, ec.Container.ID)
if err != nil { if err != nil {
return err return err
} }
spec, err := ctr.Spec(ctx) md, err := ctr.Info(ctx, containerd.WithoutRefreshedMetadata)
if err != nil { if err != nil {
return err return err
} }
p = spec.Process spec := specs.Spec{Process: p}
if err := json.Unmarshal(md.Spec.GetValue(), &spec); err != nil {
return err
}
} }
p.Args = append([]string{ec.Entrypoint}, ec.Args...) p.Args = append([]string{ec.Entrypoint}, ec.Args...)
p.Env = ec.Env p.Env = ec.Env
@ -253,7 +252,7 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio
p.Cwd = "/" p.Cwd = "/"
} }
if err := daemon.execSetPlatformOpt(c, ec, p); err != nil { if err := daemon.execSetPlatformOpt(ctx, ec, p); err != nil {
return err return err
} }
@ -274,9 +273,16 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio
defer cancel() defer cancel()
attachErr := ec.StreamConfig.CopyStreams(copyCtx, &attachConfig) attachErr := ec.StreamConfig.CopyStreams(copyCtx, &attachConfig)
ec.Container.Lock()
tsk, err := ec.Container.GetRunningTask()
ec.Container.Unlock()
if err != nil {
return err
}
// Synchronize with libcontainerd event loop // Synchronize with libcontainerd event loop
ec.Lock() ec.Lock()
systemPid, err := daemon.containerd.Exec(ctx, c.ID, ec.ID, p, cStdin != nil, ec.InitializeStdio) ec.Process, err = tsk.Exec(ctx, ec.ID, p, cStdin != nil, ec.InitializeStdio)
// the exec context should be ready, or error happened. // the exec context should be ready, or error happened.
// close the chan to notify readiness // close the chan to notify readiness
close(ec.Started) close(ec.Started)
@ -284,18 +290,17 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio
ec.Unlock() ec.Unlock()
return translateContainerdStartErr(ec.Entrypoint, ec.SetExitCode, err) return translateContainerdStartErr(ec.Entrypoint, ec.SetExitCode, err)
} }
ec.Pid = systemPid
ec.Unlock() ec.Unlock()
select { select {
case <-ctx.Done(): case <-ctx.Done():
log := logrus. log := logrus.
WithField("container", c.ID). WithField("container", ec.Container.ID).
WithField("exec", name) WithField("exec", ec.ID)
log.Debug("Sending KILL signal to container process") log.Debug("Sending KILL signal to container process")
sigCtx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second) sigCtx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second)
defer cancelFunc() defer cancelFunc()
err := daemon.containerd.SignalProcess(sigCtx, c.ID, name, signal.SignalMap["KILL"]) err := ec.Process.Kill(sigCtx, signal.SignalMap["KILL"])
if err != nil { if err != nil {
log.WithError(err).Error("Could not send KILL signal to container process") log.WithError(err).Error("Could not send KILL signal to container process")
} }
@ -308,7 +313,7 @@ func (daemon *Daemon) ContainerExecStart(ctx context.Context, name string, optio
attributes := map[string]string{ attributes := map[string]string{
"execID": ec.ID, "execID": ec.ID,
} }
daemon.LogContainerEventWithAttributes(c, "exec_detach", attributes) daemon.LogContainerEventWithAttributes(ec.Container, "exec_detach", attributes)
} }
} }
return nil return nil
@ -325,7 +330,7 @@ func (daemon *Daemon) execCommandGC() {
for id, config := range daemon.execCommands.Commands() { for id, config := range daemon.execCommands.Commands() {
if config.CanRemove { if config.CanRemove {
cleaned++ cleaned++
daemon.execCommands.Delete(id, config.Pid) daemon.execCommands.Delete(id)
} else { } else {
if _, exists := liveExecCommands[id]; !exists { if _, exists := liveExecCommands[id]; !exists {
config.CanRemove = true config.CanRemove = true

View file

@ -5,15 +5,14 @@ import (
"github.com/containerd/containerd/pkg/apparmor" "github.com/containerd/containerd/pkg/apparmor"
"github.com/docker/docker/container" "github.com/docker/docker/container"
"github.com/docker/docker/daemon/exec"
"github.com/docker/docker/oci/caps" "github.com/docker/docker/oci/caps"
specs "github.com/opencontainers/runtime-spec/specs-go" specs "github.com/opencontainers/runtime-spec/specs-go"
) )
func (daemon *Daemon) execSetPlatformOpt(c *container.Container, ec *exec.Config, p *specs.Process) error { func (daemon *Daemon) execSetPlatformOpt(ctx context.Context, ec *container.ExecConfig, p *specs.Process) error {
if len(ec.User) > 0 { if len(ec.User) > 0 {
var err error var err error
p.User, err = getUser(c, ec.User) p.User, err = getUser(ec.Container, ec.User)
if err != nil { if err != nil {
return err return err
} }
@ -27,9 +26,9 @@ func (daemon *Daemon) execSetPlatformOpt(c *container.Container, ec *exec.Config
} }
if apparmor.HostSupports() { if apparmor.HostSupports() {
var appArmorProfile string var appArmorProfile string
if c.AppArmorProfile != "" { if ec.Container.AppArmorProfile != "" {
appArmorProfile = c.AppArmorProfile appArmorProfile = ec.Container.AppArmorProfile
} else if c.HostConfig.Privileged { } else if ec.Container.HostConfig.Privileged {
// `docker exec --privileged` does not currently disable AppArmor // `docker exec --privileged` does not currently disable AppArmor
// profiles. Privileged configuration of the container is inherited // profiles. Privileged configuration of the container is inherited
appArmorProfile = unconfinedAppArmorProfile appArmorProfile = unconfinedAppArmorProfile
@ -51,5 +50,5 @@ func (daemon *Daemon) execSetPlatformOpt(c *container.Container, ec *exec.Config
p.ApparmorProfile = appArmorProfile p.ApparmorProfile = appArmorProfile
} }
s := &specs.Spec{Process: p} s := &specs.Spec{Process: p}
return WithRlimits(daemon, c)(context.Background(), nil, nil, s) return WithRlimits(daemon, ec.Container)(ctx, nil, nil, s)
} }

View file

@ -4,13 +4,13 @@
package daemon package daemon
import ( import (
"context"
"testing" "testing"
"github.com/containerd/containerd/pkg/apparmor" "github.com/containerd/containerd/pkg/apparmor"
containertypes "github.com/docker/docker/api/types/container" containertypes "github.com/docker/docker/api/types/container"
"github.com/docker/docker/container" "github.com/docker/docker/container"
"github.com/docker/docker/daemon/config" "github.com/docker/docker/daemon/config"
"github.com/docker/docker/daemon/exec"
specs "github.com/opencontainers/runtime-spec/specs-go" specs "github.com/opencontainers/runtime-spec/specs-go"
"gotest.tools/v3/assert" "gotest.tools/v3/assert"
) )
@ -79,10 +79,10 @@ func TestExecSetPlatformOptAppArmor(t *testing.T) {
Privileged: tc.privileged, Privileged: tc.privileged,
}, },
} }
ec := &exec.Config{Privileged: execPrivileged} ec := &container.ExecConfig{Container: c, Privileged: execPrivileged}
p := &specs.Process{} p := &specs.Process{}
err := d.execSetPlatformOpt(c, ec, p) err := d.execSetPlatformOpt(context.Background(), ec, p)
assert.NilError(t, err) assert.NilError(t, err)
assert.Equal(t, p.ApparmorProfile, tc.expectedProfile) assert.Equal(t, p.ApparmorProfile, tc.expectedProfile)
}) })

View file

@ -1,13 +1,14 @@
package daemon // import "github.com/docker/docker/daemon" package daemon // import "github.com/docker/docker/daemon"
import ( import (
"context"
"github.com/docker/docker/container" "github.com/docker/docker/container"
"github.com/docker/docker/daemon/exec"
specs "github.com/opencontainers/runtime-spec/specs-go" specs "github.com/opencontainers/runtime-spec/specs-go"
) )
func (daemon *Daemon) execSetPlatformOpt(c *container.Container, ec *exec.Config, p *specs.Process) error { func (daemon *Daemon) execSetPlatformOpt(ctx context.Context, ec *container.ExecConfig, p *specs.Process) error {
if c.OS == "windows" { if ec.Container.OS == "windows" {
p.User.Username = ec.User p.User.Username = ec.User
} }
return nil return nil

View file

@ -13,7 +13,6 @@ import (
containertypes "github.com/docker/docker/api/types/container" containertypes "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/strslice" "github.com/docker/docker/api/types/strslice"
"github.com/docker/docker/container" "github.com/docker/docker/container"
"github.com/docker/docker/daemon/exec"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -69,11 +68,10 @@ func (p *cmdProbe) run(ctx context.Context, d *Daemon, cntr *container.Container
cmdSlice = append(getShell(cntr), cmdSlice...) cmdSlice = append(getShell(cntr), cmdSlice...)
} }
entrypoint, args := d.getEntrypointAndArgs(strslice.StrSlice{}, cmdSlice) entrypoint, args := d.getEntrypointAndArgs(strslice.StrSlice{}, cmdSlice)
execConfig := exec.NewConfig() execConfig := container.NewExecConfig(cntr)
execConfig.OpenStdin = false execConfig.OpenStdin = false
execConfig.OpenStdout = true execConfig.OpenStdout = true
execConfig.OpenStderr = true execConfig.OpenStderr = true
execConfig.ContainerID = cntr.ID
execConfig.DetachKeys = []byte{} execConfig.DetachKeys = []byte{}
execConfig.Entrypoint = entrypoint execConfig.Entrypoint = entrypoint
execConfig.Args = args execConfig.Args = args

View file

@ -214,11 +214,15 @@ func (daemon *Daemon) ContainerExecInspect(id string) (*backend.ExecInspect, err
return nil, errExecNotFound(id) return nil, errExecNotFound(id)
} }
if ctr := daemon.containers.Get(e.ContainerID); ctr == nil { if ctr := daemon.containers.Get(e.Container.ID); ctr == nil {
return nil, errExecNotFound(id) return nil, errExecNotFound(id)
} }
pc := inspectExecProcessConfig(e) pc := inspectExecProcessConfig(e)
var pid int
if e.Process != nil {
pid = int(e.Process.Pid())
}
return &backend.ExecInspect{ return &backend.ExecInspect{
ID: e.ID, ID: e.ID,
@ -229,9 +233,9 @@ func (daemon *Daemon) ContainerExecInspect(id string) (*backend.ExecInspect, err
OpenStdout: e.OpenStdout, OpenStdout: e.OpenStdout,
OpenStderr: e.OpenStderr, OpenStderr: e.OpenStderr,
CanRemove: e.CanRemove, CanRemove: e.CanRemove,
ContainerID: e.ContainerID, ContainerID: e.Container.ID,
DetachKeys: e.DetachKeys, DetachKeys: e.DetachKeys,
Pid: e.Pid, Pid: pid,
}, nil }, nil
} }

View file

@ -5,7 +5,6 @@ import (
"github.com/docker/docker/api/types/backend" "github.com/docker/docker/api/types/backend"
"github.com/docker/docker/api/types/versions/v1p19" "github.com/docker/docker/api/types/versions/v1p19"
"github.com/docker/docker/container" "github.com/docker/docker/container"
"github.com/docker/docker/daemon/exec"
) )
// This sets platform-specific fields // This sets platform-specific fields
@ -62,7 +61,7 @@ func (daemon *Daemon) containerInspectPre120(name string) (*v1p19.ContainerJSON,
}, nil }, nil
} }
func inspectExecProcessConfig(e *exec.Config) *backend.ExecProcessConfig { func inspectExecProcessConfig(e *container.ExecConfig) *backend.ExecProcessConfig {
return &backend.ExecProcessConfig{ return &backend.ExecProcessConfig{
Tty: e.Tty, Tty: e.Tty,
Entrypoint: e.Entrypoint, Entrypoint: e.Entrypoint,

View file

@ -6,7 +6,6 @@ import (
containertypes "github.com/docker/docker/api/types/container" containertypes "github.com/docker/docker/api/types/container"
"github.com/docker/docker/container" "github.com/docker/docker/container"
"github.com/docker/docker/daemon/config" "github.com/docker/docker/daemon/config"
"github.com/docker/docker/daemon/exec"
"gotest.tools/v3/assert" "gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp" is "gotest.tools/v3/assert/cmp"
) )
@ -16,7 +15,7 @@ func TestGetInspectData(t *testing.T) {
ID: "inspect-me", ID: "inspect-me",
HostConfig: &containertypes.HostConfig{}, HostConfig: &containertypes.HostConfig{},
State: container.NewState(), State: container.NewState(),
ExecCommands: exec.NewStore(), ExecCommands: container.NewExecStore(),
} }
d := &Daemon{ d := &Daemon{

View file

@ -4,7 +4,6 @@ import (
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/backend" "github.com/docker/docker/api/types/backend"
"github.com/docker/docker/container" "github.com/docker/docker/container"
"github.com/docker/docker/daemon/exec"
) )
// This sets platform-specific fields // This sets platform-specific fields
@ -17,7 +16,7 @@ func (daemon *Daemon) containerInspectPre120(name string) (*types.ContainerJSON,
return daemon.ContainerInspectCurrent(name, false) return daemon.ContainerInspectCurrent(name, false)
} }
func inspectExecProcessConfig(e *exec.Config) *backend.ExecProcessConfig { func inspectExecProcessConfig(e *container.ExecConfig) *backend.ExecProcessConfig {
return &backend.ExecProcessConfig{ return &backend.ExecProcessConfig{
Tty: e.Tty, Tty: e.Tty,
Entrypoint: e.Entrypoint, Entrypoint: e.Entrypoint,

View file

@ -9,7 +9,6 @@ import (
containerpkg "github.com/docker/docker/container" containerpkg "github.com/docker/docker/container"
"github.com/docker/docker/errdefs" "github.com/docker/docker/errdefs"
libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
"github.com/moby/sys/signal" "github.com/moby/sys/signal"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -65,8 +64,9 @@ func (daemon *Daemon) killWithSignal(container *containerpkg.Container, stopSign
container.Lock() container.Lock()
defer container.Unlock() defer container.Unlock()
if !container.Running { task, err := container.GetRunningTask()
return errNotRunning(container.ID) if err != nil {
return err
} }
var unpause bool var unpause bool
@ -96,8 +96,7 @@ func (daemon *Daemon) killWithSignal(container *containerpkg.Container, stopSign
return nil return nil
} }
err := daemon.containerd.SignalProcess(context.Background(), container.ID, libcontainerdtypes.InitProcessName, stopSignal) if err := task.Kill(context.Background(), stopSignal); err != nil {
if err != nil {
if errdefs.IsNotFound(err) { if errdefs.IsNotFound(err) {
unpause = false unpause = false
logrus.WithError(err).WithField("container", container.ID).WithField("action", "kill").Debug("container kill failed because of 'container not found' or 'no such process'") logrus.WithError(err).WithField("container", container.ID).WithField("action", "kill").Debug("container kill failed because of 'container not found' or 'no such process'")
@ -121,7 +120,7 @@ func (daemon *Daemon) killWithSignal(container *containerpkg.Container, stopSign
if unpause { if unpause {
// above kill signal will be sent once resume is finished // above kill signal will be sent once resume is finished
if err := daemon.containerd.Resume(context.Background(), container.ID); err != nil { if err := task.Resume(context.Background()); err != nil {
logrus.Warnf("Cannot unpause container %s: %s", container.ID, err) logrus.Warnf("Cannot unpause container %s: %s", container.ID, err)
} }
} }

View file

@ -7,6 +7,7 @@ import (
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/container" "github.com/docker/docker/container"
"github.com/docker/docker/errdefs"
libcontainerdtypes "github.com/docker/docker/libcontainerd/types" libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
"github.com/docker/docker/restartmanager" "github.com/docker/docker/restartmanager"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -25,24 +26,29 @@ func (daemon *Daemon) setStateCounter(c *container.Container) {
} }
func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontainerdtypes.EventInfo) error { func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontainerdtypes.EventInfo) error {
var exitStatus container.ExitStatus
c.Lock() c.Lock()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) tsk, ok := c.Task()
ec, et, err := daemon.containerd.DeleteTask(ctx, c.ID) if ok {
cancel() ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
if err != nil { es, err := tsk.Delete(ctx)
logrus.WithError(err).WithField("container", c.ID).Warnf("failed to delete container from containerd") cancel()
if err != nil {
logrus.WithError(err).WithField("container", c.ID).Warnf("failed to delete container from containerd")
} else {
exitStatus = container.ExitStatus{
ExitCode: int(es.ExitCode()),
ExitedAt: es.ExitTime(),
}
}
} }
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
c.StreamConfig.Wait(ctx) c.StreamConfig.Wait(ctx)
cancel() cancel()
c.Reset(false) c.Reset(false)
exitStatus := container.ExitStatus{
ExitCode: int(ec),
ExitedAt: et,
}
if e != nil { if e != nil {
exitStatus.ExitCode = int(e.ExitCode) exitStatus.ExitCode = int(e.ExitCode)
exitStatus.ExitedAt = e.ExitedAt exitStatus.ExitedAt = e.ExitedAt
@ -53,7 +59,7 @@ func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontaine
daemonShutdown := daemon.IsShuttingDown() daemonShutdown := daemon.IsShuttingDown()
execDuration := time.Since(c.StartedAt) execDuration := time.Since(c.StartedAt)
restart, wait, err := c.RestartManager().ShouldRestart(ec, daemonShutdown || c.HasBeenManuallyStopped, execDuration) restart, wait, err := c.RestartManager().ShouldRestart(uint32(exitStatus.ExitCode), daemonShutdown || c.HasBeenManuallyStopped, execDuration)
if err != nil { if err != nil {
logrus.WithError(err). logrus.WithError(err).
WithField("container", c.ID). WithField("container", c.ID).
@ -70,7 +76,7 @@ func (daemon *Daemon) handleContainerExit(c *container.Container, e *libcontaine
// restarted if/when the container is started again // restarted if/when the container is started again
daemon.stopHealthchecks(c) daemon.stopHealthchecks(c)
attributes := map[string]string{ attributes := map[string]string{
"exitCode": strconv.Itoa(int(ec)), "exitCode": strconv.Itoa(exitStatus.ExitCode),
} }
daemon.Cleanup(c) daemon.Cleanup(c)
@ -170,9 +176,18 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
// remove the exec command from the container's store only and not the // remove the exec command from the container's store only and not the
// daemon's store so that the exec command can be inspected. // daemon's store so that the exec command can be inspected.
c.ExecCommands.Delete(execConfig.ID, execConfig.Pid) c.ExecCommands.Delete(execConfig.ID)
exitCode = ec exitCode = ec
go func() {
if _, err := execConfig.Process.Delete(context.Background()); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"container": ei.ContainerID,
"process": ei.ProcessID,
}).Warn("failed to delete process")
}
}()
} }
attributes := map[string]string{ attributes := map[string]string{
"execID": ei.ProcessID, "execID": ei.ProcessID,
@ -185,7 +200,27 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
// This is here to handle start not generated by docker // This is here to handle start not generated by docker
if !c.Running { if !c.Running {
c.SetRunning(int(ei.Pid), false) ctr, err := daemon.containerd.LoadContainer(context.Background(), c.ID)
if err != nil {
if errdefs.IsNotFound(err) {
// The container was started by not-docker and so could have been deleted by
// not-docker before we got around to loading it from containerd.
logrus.WithField("container", c.ID).WithError(err).
Debug("could not load containerd container for start event")
return nil
}
return err
}
tsk, err := ctr.Task(context.Background())
if err != nil {
if errdefs.IsNotFound(err) {
logrus.WithField("container", c.ID).WithError(err).
Debug("failed to load task for externally-started container")
return nil
}
return err
}
c.SetRunning(ctr, tsk, false)
c.HasBeenManuallyStopped = false c.HasBeenManuallyStopped = false
c.HasBeenStartedBefore = true c.HasBeenStartedBefore = true
daemon.setStateCounter(c) daemon.setStateCounter(c)

View file

@ -24,8 +24,9 @@ func (daemon *Daemon) containerPause(container *container.Container) error {
defer container.Unlock() defer container.Unlock()
// We cannot Pause the container which is not running // We cannot Pause the container which is not running
if !container.Running { tsk, err := container.GetRunningTask()
return errNotRunning(container.ID) if err != nil {
return err
} }
// We cannot Pause the container which is already paused // We cannot Pause the container which is already paused
@ -38,8 +39,8 @@ func (daemon *Daemon) containerPause(container *container.Container) error {
return errContainerIsRestarting(container.ID) return errContainerIsRestarting(container.ID)
} }
if err := daemon.containerd.Pause(context.Background(), container.ID); err != nil { if err := tsk.Pause(context.Background()); err != nil {
return fmt.Errorf("Cannot pause container %s: %s", container.ID, err) return fmt.Errorf("cannot pause container %s: %s", container.ID, err)
} }
container.Paused = true container.Paused = true

View file

@ -4,8 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"time" "time"
libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
) )
// ContainerResize changes the size of the TTY of the process running // ContainerResize changes the size of the TTY of the process running
@ -16,11 +14,14 @@ func (daemon *Daemon) ContainerResize(name string, height, width int) error {
return err return err
} }
if !container.IsRunning() { container.Lock()
return errNotRunning(container.ID) tsk, err := container.GetRunningTask()
container.Unlock()
if err != nil {
return err
} }
if err = daemon.containerd.ResizeTerminal(context.Background(), container.ID, libcontainerdtypes.InitProcessName, width, height); err == nil { if err = tsk.Resize(context.Background(), uint32(width), uint32(height)); err == nil {
attributes := map[string]string{ attributes := map[string]string{
"height": fmt.Sprintf("%d", height), "height": fmt.Sprintf("%d", height),
"width": fmt.Sprintf("%d", width), "width": fmt.Sprintf("%d", width),
@ -46,7 +47,7 @@ func (daemon *Daemon) ContainerExecResize(name string, height, width int) error
select { select {
case <-ec.Started: case <-ec.Started:
return daemon.containerd.ResizeTerminal(context.Background(), ec.ContainerID, ec.ID, width, height) return ec.Process.Resize(context.Background(), uint32(width), uint32(height))
case <-timeout.C: case <-timeout.C:
return fmt.Errorf("timeout waiting for exec session ready") return fmt.Errorf("timeout waiting for exec session ready")
} }

View file

@ -8,7 +8,7 @@ import (
"testing" "testing"
"github.com/docker/docker/container" "github.com/docker/docker/container"
"github.com/docker/docker/daemon/exec" "github.com/docker/docker/libcontainerd/types"
"gotest.tools/v3/assert" "gotest.tools/v3/assert"
) )
@ -16,32 +16,28 @@ import (
func TestExecResizeNoSuchExec(t *testing.T) { func TestExecResizeNoSuchExec(t *testing.T) {
n := "TestExecResize" n := "TestExecResize"
d := &Daemon{ d := &Daemon{
execCommands: exec.NewStore(), execCommands: container.NewExecStore(),
} }
c := &container.Container{ c := &container.Container{
ExecCommands: exec.NewStore(), ExecCommands: container.NewExecStore(),
} }
ec := &exec.Config{ ec := &container.ExecConfig{
ID: n, ID: n,
Container: c,
} }
d.registerExecCommand(c, ec) d.registerExecCommand(c, ec)
err := d.ContainerExecResize("nil", 24, 8) err := d.ContainerExecResize("nil", 24, 8)
assert.ErrorContains(t, err, "No such exec instance") assert.ErrorContains(t, err, "No such exec instance")
} }
type execResizeMockContainerdClient struct { type execResizeMockProcess struct {
MockContainerdClient types.Process
ProcessID string Width, Height int
ContainerID string
Width int
Height int
} }
func (c *execResizeMockContainerdClient) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error { func (p *execResizeMockProcess) Resize(ctx context.Context, width, height uint32) error {
c.ProcessID = processID p.Width = int(width)
c.ContainerID = containerID p.Height = int(height)
c.Width = width
c.Height = height
return nil return nil
} }
@ -50,30 +46,29 @@ func TestExecResize(t *testing.T) {
n := "TestExecResize" n := "TestExecResize"
width := 24 width := 24
height := 8 height := 8
ec := &exec.Config{ mp := &execResizeMockProcess{}
ID: n,
ContainerID: n,
Started: make(chan struct{}),
}
close(ec.Started)
mc := &execResizeMockContainerdClient{}
d := &Daemon{ d := &Daemon{
execCommands: exec.NewStore(), execCommands: container.NewExecStore(),
containerd: mc,
containers: container.NewMemoryStore(), containers: container.NewMemoryStore(),
} }
c := &container.Container{ c := &container.Container{
ExecCommands: exec.NewStore(), ID: n,
ExecCommands: container.NewExecStore(),
State: &container.State{Running: true}, State: &container.State{Running: true},
} }
ec := &container.ExecConfig{
ID: n,
Container: c,
Process: mp,
Started: make(chan struct{}),
}
close(ec.Started)
d.containers.Add(n, c) d.containers.Add(n, c)
d.registerExecCommand(c, ec) d.registerExecCommand(c, ec)
err := d.ContainerExecResize(n, height, width) err := d.ContainerExecResize(n, height, width)
assert.NilError(t, err) assert.NilError(t, err)
assert.Equal(t, mc.Width, width) assert.Equal(t, mp.Width, width)
assert.Equal(t, mc.Height, height) assert.Equal(t, mp.Height, height)
assert.Equal(t, mc.ProcessID, n)
assert.Equal(t, mc.ContainerID, n)
} }
// This test is to make sure that when exec context is not ready, a timeout error should happen. // This test is to make sure that when exec context is not ready, a timeout error should happen.
@ -82,21 +77,22 @@ func TestExecResizeTimeout(t *testing.T) {
n := "TestExecResize" n := "TestExecResize"
width := 24 width := 24
height := 8 height := 8
ec := &exec.Config{ mp := &execResizeMockProcess{}
ID: n,
ContainerID: n,
Started: make(chan struct{}),
}
mc := &execResizeMockContainerdClient{}
d := &Daemon{ d := &Daemon{
execCommands: exec.NewStore(), execCommands: container.NewExecStore(),
containerd: mc,
containers: container.NewMemoryStore(), containers: container.NewMemoryStore(),
} }
c := &container.Container{ c := &container.Container{
ExecCommands: exec.NewStore(), ID: n,
ExecCommands: container.NewExecStore(),
State: &container.State{Running: true}, State: &container.State{Running: true},
} }
ec := &container.ExecConfig{
ID: n,
Container: c,
Process: mp,
Started: make(chan struct{}),
}
d.containers.Add(n, c) d.containers.Add(n, c)
d.registerExecCommand(c, ec) d.registerExecCommand(c, ec)
err := d.ContainerExecResize(n, height, width) err := d.ContainerExecResize(n, height, width)

View file

@ -178,16 +178,12 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint
ctx := context.TODO() ctx := context.TODO()
err = daemon.containerd.Create(ctx, container.ID, spec, shim, createOptions) ctr, err := daemon.containerd.NewContainer(ctx, container.ID, spec, shim, createOptions)
if err != nil { if err != nil {
if errdefs.IsConflict(err) { if errdefs.IsConflict(err) {
logrus.WithError(err).WithField("container", container.ID).Error("Container not cleaned up from containerd from previous run") logrus.WithError(err).WithField("container", container.ID).Error("Container not cleaned up from containerd from previous run")
// best effort to clean up old container object daemon.cleanupStaleContainer(ctx, container.ID)
daemon.containerd.DeleteTask(ctx, container.ID) ctr, err = daemon.containerd.NewContainer(ctx, container.ID, spec, shim, createOptions)
if err := daemon.containerd.Delete(ctx, container.ID); err != nil && !errdefs.IsNotFound(err) {
logrus.WithError(err).WithField("container", container.ID).Error("Error cleaning up stale containerd container object")
}
err = daemon.containerd.Create(ctx, container.ID, spec, shim, createOptions)
} }
if err != nil { if err != nil {
return translateContainerdStartErr(container.Path, container.SetExitCode, err) return translateContainerdStartErr(container.Path, container.SetExitCode, err)
@ -195,11 +191,11 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint
} }
// TODO(mlaventure): we need to specify checkpoint options here // TODO(mlaventure): we need to specify checkpoint options here
pid, err := daemon.containerd.Start(context.Background(), container.ID, checkpointDir, tsk, err := ctr.Start(ctx, checkpointDir,
container.StreamConfig.Stdin() != nil || container.Config.Tty, container.StreamConfig.Stdin() != nil || container.Config.Tty,
container.InitializeStdio) container.InitializeStdio)
if err != nil { if err != nil {
if err := daemon.containerd.Delete(context.Background(), container.ID); err != nil { if err := ctr.Delete(context.Background()); err != nil {
logrus.WithError(err).WithField("container", container.ID). logrus.WithError(err).WithField("container", container.ID).
Error("failed to delete failed start container") Error("failed to delete failed start container")
} }
@ -207,7 +203,7 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint
} }
container.HasBeenManuallyRestarted = false container.HasBeenManuallyRestarted = false
container.SetRunning(pid, true) container.SetRunning(ctr, tsk, true)
container.HasBeenStartedBefore = true container.HasBeenStartedBefore = true
daemon.setStateCounter(container) daemon.setStateCounter(container)
@ -224,9 +220,42 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint
return nil return nil
} }
// cleanupStaleContainer makes a best-effort attempt to remove the stale
// containerd container object registered under id, along with its task if
// one can be loaded. Nothing is returned: every failure is only logged.
func (daemon *Daemon) cleanupStaleContainer(ctx context.Context, id string) {
	logger := logrus.WithContext(ctx).WithField("container", id)

	ctr, err := daemon.containerd.LoadContainer(ctx, id)
	if err != nil {
		// Report the failure whatever its kind. A container existed with
		// this ID, so even a NotFound error is exceptional enough to be
		// worth an error-level log entry.
		logger.WithError(err).Error("Error loading stale containerd container object")
		return
	}

	tsk, err := ctr.Task(ctx)
	switch {
	case err == nil:
		if err := tsk.ForceDelete(ctx); err != nil {
			logger.WithError(err).Error("Error cleaning up stale containerd task object")
		}
	case !errdefs.IsNotFound(err):
		// A NotFound error is deliberately ignored: there is no task to
		// delete. Any other load failure is logged, and the container
		// delete below is still attempted.
		logger.WithError(err).Error("Error loading stale containerd task object")
	}

	if err := ctr.Delete(ctx); err != nil && !errdefs.IsNotFound(err) {
		logger.WithError(err).Error("Error cleaning up stale containerd container object")
	}
}
// Cleanup releases any network resources allocated to the container along with any rules // Cleanup releases any network resources allocated to the container along with any rules
// around how containers are linked together. It also unmounts the container's root filesystem. // around how containers are linked together. It also unmounts the container's root filesystem.
func (daemon *Daemon) Cleanup(container *container.Container) { func (daemon *Daemon) Cleanup(container *container.Container) {
// Microsoft HCS containers get in a bad state if host resources are
// released while the container still exists.
if ctr, ok := container.C8dContainer(); ok {
if err := ctr.Delete(context.Background()); err != nil {
logrus.Errorf("%s cleanup: failed to delete container from containerd: %v", container.ID, err)
}
}
daemon.releaseNetwork(container) daemon.releaseNetwork(container)
if err := container.UnmountIpcMount(); err != nil { if err := container.UnmountIpcMount(); err != nil {
@ -260,8 +289,4 @@ func (daemon *Daemon) Cleanup(container *container.Container) {
} }
container.CancelAttachContext() container.CancelAttachContext()
if err := daemon.containerd.Delete(context.Background(), container.ID); err != nil {
logrus.Errorf("%s cleanup: failed to delete container from containerd: %v", container.ID, err)
}
} }

View file

@ -14,6 +14,7 @@ import (
"github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/container"
"github.com/docker/docker/errdefs" "github.com/docker/docker/errdefs"
libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -150,19 +151,32 @@ func (daemon *Daemon) ContainerTop(name string, psArgs string) (*container.Conta
return nil, err return nil, err
} }
if !ctr.IsRunning() { tsk, err := func() (libcontainerdtypes.Task, error) {
return nil, errNotRunning(ctr.ID) ctr.Lock()
} defer ctr.Unlock()
if ctr.IsRestarting() { tsk, err := ctr.GetRunningTask()
return nil, errContainerIsRestarting(ctr.ID) if err != nil {
} return nil, err
}
procs, err := daemon.containerd.ListPids(context.Background(), ctr.ID) if ctr.Restarting {
return nil, errContainerIsRestarting(ctr.ID)
}
return tsk, nil
}()
if err != nil { if err != nil {
return nil, err return nil, err
} }
infos, err := tsk.Pids(context.Background())
if err != nil {
return nil, err
}
procs := make([]uint32, len(infos))
for i, p := range infos {
procs[i] = p.Pid
}
args := strings.Split(psArgs, " ") args := strings.Split(psArgs, " ")
pids := psPidsArg(procs) pids := psPidsArg(procs)
output, err := exec.Command("ps", append(args, pids)...).Output() output, err := exec.Command("ps", append(args, pids)...).Output()

View file

@ -7,6 +7,7 @@ import (
"time" "time"
containertypes "github.com/docker/docker/api/types/container" containertypes "github.com/docker/docker/api/types/container"
libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
units "github.com/docker/go-units" units "github.com/docker/go-units"
) )
@ -36,15 +37,21 @@ func (daemon *Daemon) ContainerTop(name string, psArgs string) (*containertypes.
return nil, err return nil, err
} }
if !container.IsRunning() { task, err := func() (libcontainerdtypes.Task, error) {
return nil, errNotRunning(container.ID) container.Lock()
} defer container.Unlock()
if container.IsRestarting() { task, err := container.GetRunningTask()
return nil, errContainerIsRestarting(container.ID) if err != nil {
} return nil, err
}
if container.Restarting {
return nil, errContainerIsRestarting(container.ID)
}
return task, nil
}()
s, err := daemon.containerd.Summary(context.Background(), container.ID) s, err := task.Summary(context.Background())
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -26,8 +26,12 @@ func (daemon *Daemon) containerUnpause(ctr *container.Container) error {
if !ctr.Paused { if !ctr.Paused {
return fmt.Errorf("Container %s is not paused", ctr.ID) return fmt.Errorf("Container %s is not paused", ctr.ID)
} }
tsk, err := ctr.GetRunningTask()
if err != nil {
return err
}
if err := daemon.containerd.Resume(context.Background(), ctr.ID); err != nil { if err := tsk.Resume(context.Background()); err != nil {
return fmt.Errorf("Cannot unpause container %s: %s", ctr.ID, err) return fmt.Errorf("Cannot unpause container %s: %s", ctr.ID, err)
} }

View file

@ -74,19 +74,28 @@ func (daemon *Daemon) update(name string, hostConfig *container.HostConfig) erro
ctr.UpdateMonitor(hostConfig.RestartPolicy) ctr.UpdateMonitor(hostConfig.RestartPolicy)
} }
defer daemon.LogContainerEvent(ctr, "update")
// If container is not running, update hostConfig struct is enough, // If container is not running, update hostConfig struct is enough,
// resources will be updated when the container is started again. // resources will be updated when the container is started again.
// If container is running (including paused), we need to update configs // If container is running (including paused), we need to update configs
// to the real world. // to the real world.
if ctr.IsRunning() && !ctr.IsRestarting() { ctr.Lock()
if err := daemon.containerd.UpdateResources(context.Background(), ctr.ID, toContainerdResources(hostConfig.Resources)); err != nil { isRestarting := ctr.Restarting
restoreConfig = true tsk, err := ctr.GetRunningTask()
// TODO: it would be nice if containerd responded with better errors here so we can classify this better. ctr.Unlock()
return errCannotUpdate(ctr.ID, errdefs.System(err)) if errdefs.IsConflict(err) || isRestarting {
} return nil
}
if err != nil {
return err
} }
daemon.LogContainerEvent(ctr, "update") if err := tsk.UpdateResources(context.TODO(), toContainerdResources(hostConfig.Resources)); err != nil {
restoreConfig = true
// TODO: it would be nice if containerd responded with better errors here so we can classify this better.
return errCannotUpdate(ctr.ID, errdefs.System(err))
}
return nil return nil
} }

View file

@ -1,74 +0,0 @@
//go:build linux
// +build linux
package daemon
import (
"context"
"syscall"
"time"
"github.com/containerd/containerd"
libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
// mockProcess is an empty type whose only method, Delete, does nothing.
// MockContainerdClient.Restore hands one back as its process value.
type mockProcess struct{}

// Delete performs no work. It reports exit code 0, the zero time.Time as the
// exit time, and a nil error; the context argument is ignored.
func (m *mockProcess) Delete(_ context.Context) (uint32, time.Time, error) {
	var (
		exitCode uint32
		exitedAt time.Time
	)
	return exitCode, exitedAt, nil
}
// MockContainerdClient is a mock containerd client implementation, for unit
// tests. It carries no state.
type MockContainerdClient struct {
}
// Version returns a zero-value containerd.Version and a nil error.
func (c *MockContainerdClient) Version(ctx context.Context) (containerd.Version, error) {
return containerd.Version{}, nil
}
// Restore ignores its arguments and always reports alive=false, pid=0, a
// freshly allocated *mockProcess, and a nil error.
func (c *MockContainerdClient) Restore(ctx context.Context, containerID string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, p libcontainerdtypes.Process, err error) {
return false, 0, &mockProcess{}, nil
}
// Create is a no-op: it ignores its arguments and always returns nil.
func (c *MockContainerdClient) Create(ctx context.Context, containerID string, spec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) error {
return nil
}
// Start is a no-op: it ignores its arguments and always returns pid 0 and a
// nil error.
func (c *MockContainerdClient) Start(ctx context.Context, containerID, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (pid int, err error) {
return 0, nil
}
// SignalProcess is a no-op: no signal is delivered and nil is always returned.
func (c *MockContainerdClient) SignalProcess(ctx context.Context, containerID, processID string, signal syscall.Signal) error {
return nil
}
// Exec is a no-op: it ignores its arguments and always returns 0 and a nil
// error.
func (c *MockContainerdClient) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) {
return 0, nil
}
// ResizeTerminal is a no-op: width and height are ignored and nil is always
// returned.
func (c *MockContainerdClient) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error {
return nil
}
// CloseStdin is a no-op: it ignores its arguments and always returns nil.
func (c *MockContainerdClient) CloseStdin(ctx context.Context, containerID, processID string) error {
return nil
}
// Pause is a no-op: it ignores its arguments and always returns nil.
func (c *MockContainerdClient) Pause(ctx context.Context, containerID string) error { return nil }
// Resume is a no-op: it ignores its arguments and always returns nil.
func (c *MockContainerdClient) Resume(ctx context.Context, containerID string) error { return nil }
// Stats always returns a nil *libcontainerdtypes.Stats together with a nil
// error, so callers must not dereference the result without a nil check.
func (c *MockContainerdClient) Stats(ctx context.Context, containerID string) (*libcontainerdtypes.Stats, error) {
return nil, nil
}
// ListPids always returns a nil slice and a nil error.
func (c *MockContainerdClient) ListPids(ctx context.Context, containerID string) ([]uint32, error) {
return nil, nil
}
// Summary always returns a nil slice and a nil error.
func (c *MockContainerdClient) Summary(ctx context.Context, containerID string) ([]libcontainerdtypes.Summary, error) {
return nil, nil
}
// DeleteTask is a no-op: it always returns 0, the zero time.Time, and a nil
// error.
func (c *MockContainerdClient) DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) {
return 0, time.Time{}, nil
}
// Delete is a no-op: it ignores its arguments and always returns nil.
func (c *MockContainerdClient) Delete(ctx context.Context, containerID string) error { return nil }
// Status always returns the fixed placeholder status "null" and a nil error,
// regardless of containerID.
func (c *MockContainerdClient) Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error) {
return "null", nil
}
// UpdateResources is a no-op: resources is ignored and nil is always returned.
func (c *MockContainerdClient) UpdateResources(ctx context.Context, containerID string, resources *libcontainerdtypes.Resources) error {
return nil
}
// CreateCheckpoint is a no-op: nothing is written to checkpointDir and nil is
// always returned.
func (c *MockContainerdClient) CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error {
return nil
}

File diff suppressed because it is too large Load diff

View file

@ -38,7 +38,3 @@ func createStdInCloser(pipe io.WriteCloser, process hcsshim.Process) io.WriteClo
return nil return nil
}) })
} }
// Cleanup is a no-op for this process type; it always returns nil.
func (p *process) Cleanup() error {
return nil
}

View file

@ -45,22 +45,34 @@ type client struct {
logger *logrus.Entry logger *logrus.Entry
ns string ns string
backend libcontainerdtypes.Backend backend libcontainerdtypes.Backend
eventQ queue.Queue eventQ queue.Queue
v2runcoptionsMu sync.Mutex }
// v2runcoptions is used for copying options specified on Create() to Start()
v2runcoptions map[string]v2runcoptions.Options type container struct {
client *client
c8dCtr containerd.Container
v2runcoptions *v2runcoptions.Options
}
// task wraps a containerd.Task together with the container it was obtained
// from. The embedded Task's methods are promoted onto task.
type task struct {
// Task is the underlying containerd task object.
containerd.Task
// ctr is the container value that produced this task (see newTask).
ctr *container
}
type process struct {
containerd.Process
} }
// NewClient creates a new libcontainerd client from a containerd client // NewClient creates a new libcontainerd client from a containerd client
func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b libcontainerdtypes.Backend) (libcontainerdtypes.Client, error) { func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b libcontainerdtypes.Backend) (libcontainerdtypes.Client, error) {
c := &client{ c := &client{
client: cli, client: cli,
stateDir: stateDir, stateDir: stateDir,
logger: logrus.WithField("module", "libcontainerd").WithField("namespace", ns), logger: logrus.WithField("module", "libcontainerd").WithField("namespace", ns),
ns: ns, ns: ns,
backend: b, backend: b,
v2runcoptions: make(map[string]v2runcoptions.Options),
} }
go c.processEventStream(ctx, ns) go c.processEventStream(ctx, ns)
@ -72,58 +84,36 @@ func (c *client) Version(ctx context.Context) (containerd.Version, error) {
return c.client.Version(ctx) return c.client.Version(ctx)
} }
// Restore loads the containerd container. func (c *container) newTask(t containerd.Task) *task {
// It should not be called concurrently with any other operation for the given ID. return &task{Task: t, ctr: c}
func (c *client) Restore(ctx context.Context, id string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, p libcontainerdtypes.Process, err error) { }
func (c *container) AttachTask(ctx context.Context, attachStdio libcontainerdtypes.StdioCallback) (_ libcontainerdtypes.Task, err error) {
var dio *cio.DirectIO var dio *cio.DirectIO
defer func() { defer func() {
if err != nil && dio != nil { if err != nil && dio != nil {
dio.Cancel() dio.Cancel()
dio.Close() dio.Close()
} }
err = wrapError(err)
}() }()
ctr, err := c.client.LoadContainer(ctx, id)
if err != nil {
return false, -1, nil, errors.WithStack(wrapError(err))
}
attachIO := func(fifos *cio.FIFOSet) (cio.IO, error) { attachIO := func(fifos *cio.FIFOSet) (cio.IO, error) {
// dio must be assigned to the previously defined dio for the defer above // dio must be assigned to the previously defined dio for the defer above
// to handle cleanup // to handle cleanup
dio, err = c.newDirectIO(ctx, fifos) dio, err = c.client.newDirectIO(ctx, fifos)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return attachStdio(dio) return attachStdio(dio)
} }
t, err := ctr.Task(ctx, attachIO) t, err := c.c8dCtr.Task(ctx, attachIO)
if err != nil && !containerderrors.IsNotFound(err) { if err != nil {
return false, -1, nil, errors.Wrap(wrapError(err), "error getting containerd task for container") return nil, errors.Wrap(wrapError(err), "error getting containerd task for container")
} }
return c.newTask(t), nil
if t != nil {
s, err := t.Status(ctx)
if err != nil {
return false, -1, nil, errors.Wrap(wrapError(err), "error getting task status")
}
alive = s.Status != containerd.Stopped
pid = int(t.Pid())
}
c.logger.WithFields(logrus.Fields{
"container": id,
"alive": alive,
"pid": pid,
}).Debug("restored container")
return alive, pid, &restoredProcess{
p: t,
}, nil
} }
func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) error { func (c *client) NewContainer(ctx context.Context, id string, ociSpec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) (libcontainerdtypes.Container, error) {
bdir := c.bundleDir(id) bdir := c.bundleDir(id)
c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created") c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")
@ -134,44 +124,43 @@ func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, shi
} }
opts = append(opts, newOpts...) opts = append(opts, newOpts...)
_, err := c.client.NewContainer(ctx, id, opts...) ctr, err := c.client.NewContainer(ctx, id, opts...)
if err != nil { if err != nil {
if containerderrors.IsAlreadyExists(err) { if containerderrors.IsAlreadyExists(err) {
return errors.WithStack(errdefs.Conflict(errors.New("id already in use"))) return nil, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
} }
return wrapError(err) return nil, wrapError(err)
}
created := container{
client: c,
c8dCtr: ctr,
} }
if x, ok := runtimeOptions.(*v2runcoptions.Options); ok { if x, ok := runtimeOptions.(*v2runcoptions.Options); ok {
c.v2runcoptionsMu.Lock() created.v2runcoptions = x
c.v2runcoptions[id] = *x
c.v2runcoptionsMu.Unlock()
} }
return nil return &created, nil
} }
// Start creates and starts a task for the specified containerd id // Start creates and starts a task for the specified containerd id
func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) { func (c *container) Start(ctx context.Context, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Task, error) {
ctr, err := c.getContainer(ctx, id)
if err != nil {
return -1, err
}
var ( var (
cp *types.Descriptor cp *types.Descriptor
t containerd.Task t containerd.Task
rio cio.IO rio cio.IO
stdinCloseSync = make(chan struct{}) stdinCloseSync = make(chan containerd.Process, 1)
) )
if checkpointDir != "" { if checkpointDir != "" {
// write checkpoint to the content store // write checkpoint to the content store
tar := archive.Diff(ctx, "", checkpointDir) tar := archive.Diff(ctx, "", checkpointDir)
cp, err = c.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, checkpointDir, tar) cp, err := c.client.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, checkpointDir, tar)
// remove the checkpoint when we're done // remove the checkpoint when we're done
defer func() { defer func() {
if cp != nil { if cp != nil {
err := c.client.ContentStore().Delete(context.Background(), cp.Digest) err := c.client.client.ContentStore().Delete(ctx, cp.Digest)
if err != nil { if err != nil {
c.logger.WithError(err).WithFields(logrus.Fields{ c.client.logger.WithError(err).WithFields(logrus.Fields{
"ref": checkpointDir, "ref": checkpointDir,
"digest": cp.Digest, "digest": cp.Digest,
}).Warnf("failed to delete temporary checkpoint entry") }).Warnf("failed to delete temporary checkpoint entry")
@ -179,23 +168,27 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
} }
}() }()
if err := tar.Close(); err != nil { if err := tar.Close(); err != nil {
return -1, errors.Wrap(err, "failed to close checkpoint tar stream") return nil, errors.Wrap(err, "failed to close checkpoint tar stream")
} }
if err != nil { if err != nil {
return -1, errors.Wrapf(err, "failed to upload checkpoint to containerd") return nil, errors.Wrapf(err, "failed to upload checkpoint to containerd")
} }
} }
spec, err := ctr.Spec(ctx) // Optimization: assume the relevant metadata has not changed in the
// moment since the container was created. Elide redundant RPC requests
// to refresh the metadata separately for spec and labels.
md, err := c.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
if err != nil { if err != nil {
return -1, errors.Wrap(err, "failed to retrieve spec") return nil, errors.Wrap(err, "failed to retrieve metadata")
} }
labels, err := ctr.Labels(ctx) bundle := md.Labels[DockerContainerBundlePath]
if err != nil {
return -1, errors.Wrap(err, "failed to retrieve labels") var spec specs.Spec
if err := json.Unmarshal(md.Spec.GetValue(), &spec); err != nil {
return nil, errors.Wrap(err, "failed to retrieve spec")
} }
bundle := labels[DockerContainerBundlePath] uid, gid := getSpecUser(&spec)
uid, gid := getSpecUser(spec)
taskOpts := []containerd.NewTaskOpts{ taskOpts := []containerd.NewTaskOpts{
func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error { func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
@ -206,10 +199,8 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
if runtime.GOOS != "windows" { if runtime.GOOS != "windows" {
taskOpts = append(taskOpts, func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error { taskOpts = append(taskOpts, func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
c.v2runcoptionsMu.Lock() if c.v2runcoptions != nil {
opts, ok := c.v2runcoptions[id] opts := *c.v2runcoptions
c.v2runcoptionsMu.Unlock()
if ok {
opts.IoUid = uint32(uid) opts.IoUid = uint32(uid)
opts.IoGid = uint32(gid) opts.IoGid = uint32(gid)
info.Options = &opts info.Options = &opts
@ -217,14 +208,14 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
return nil return nil
}) })
} else { } else {
taskOpts = append(taskOpts, withLogLevel(c.logger.Level)) taskOpts = append(taskOpts, withLogLevel(c.client.logger.Level))
} }
t, err = ctr.NewTask(ctx, t, err = c.c8dCtr.NewTask(ctx,
func(id string) (cio.IO, error) { func(id string) (cio.IO, error) {
fifos := newFIFOSet(bundle, libcontainerdtypes.InitProcessName, withStdin, spec.Process.Terminal) fifos := newFIFOSet(bundle, libcontainerdtypes.InitProcessName, withStdin, spec.Process.Terminal)
rio, err = c.createIO(fifos, id, libcontainerdtypes.InitProcessName, stdinCloseSync, attachStdio) rio, err = c.createIO(fifos, libcontainerdtypes.InitProcessName, stdinCloseSync, attachStdio)
return rio, err return rio, err
}, },
taskOpts..., taskOpts...,
@ -235,21 +226,21 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
rio.Cancel() rio.Cancel()
rio.Close() rio.Close()
} }
return -1, wrapError(err) return nil, errors.Wrap(wrapError(err), "failed to create task for container")
} }
// Signal c.createIO that it can call CloseIO // Signal c.createIO that it can call CloseIO
close(stdinCloseSync) stdinCloseSync <- t
if err := t.Start(ctx); err != nil { if err := t.Start(ctx); err != nil {
if _, err := t.Delete(ctx); err != nil { if _, err := t.Delete(ctx); err != nil {
c.logger.WithError(err).WithField("container", id). c.client.logger.WithError(err).WithField("container", c.c8dCtr.ID()).
Error("failed to delete task after fail start") Error("failed to delete task after fail start")
} }
return -1, wrapError(err) return nil, wrapError(err)
} }
return int(t.Pid()), nil return c.newTask(t), nil
} }
// Exec creates exec process. // Exec creates exec process.
@ -259,31 +250,21 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
// for the container main process, the stdin fifo will be created in Create not // for the container main process, the stdin fifo will be created in Create not
// the Start call. stdinCloseSync channel should be closed after Start exec // the Start call. stdinCloseSync channel should be closed after Start exec
// process. // process.
func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) { func (t *task) Exec(ctx context.Context, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (libcontainerdtypes.Process, error) {
ctr, err := c.getContainer(ctx, containerID)
if err != nil {
return -1, err
}
t, err := ctr.Task(ctx, nil)
if err != nil {
if containerderrors.IsNotFound(err) {
return -1, errors.WithStack(errdefs.InvalidParameter(errors.New("container is not running")))
}
return -1, wrapError(err)
}
var ( var (
p containerd.Process p containerd.Process
rio cio.IO rio cio.IO
stdinCloseSync = make(chan struct{}) stdinCloseSync = make(chan containerd.Process, 1)
) )
labels, err := ctr.Labels(ctx) // Optimization: assume the DockerContainerBundlePath label has not been
// updated since the container metadata was last loaded/refreshed.
md, err := t.ctr.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
if err != nil { if err != nil {
return -1, wrapError(err) return nil, wrapError(err)
} }
fifos := newFIFOSet(labels[DockerContainerBundlePath], processID, withStdin, spec.Terminal) fifos := newFIFOSet(md.Labels[DockerContainerBundlePath], processID, withStdin, spec.Terminal)
defer func() { defer func() {
if err != nil { if err != nil {
@ -294,22 +275,22 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
} }
}() }()
p, err = t.Exec(ctx, processID, spec, func(id string) (cio.IO, error) { p, err = t.Task.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
rio, err = c.createIO(fifos, containerID, processID, stdinCloseSync, attachStdio) rio, err = t.ctr.createIO(fifos, processID, stdinCloseSync, attachStdio)
return rio, err return rio, err
}) })
if err != nil { if err != nil {
close(stdinCloseSync) close(stdinCloseSync)
if containerderrors.IsAlreadyExists(err) { if containerderrors.IsAlreadyExists(err) {
return -1, errors.WithStack(errdefs.Conflict(errors.New("id already in use"))) return nil, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
} }
return -1, wrapError(err) return nil, wrapError(err)
} }
// Signal c.createIO that it can call CloseIO // Signal c.createIO that it can call CloseIO
// //
// the stdin of exec process will be created after p.Start in containerd // the stdin of exec process will be created after p.Start in containerd
defer close(stdinCloseSync) defer func() { stdinCloseSync <- p }()
if err = p.Start(ctx); err != nil { if err = p.Start(ctx); err != nil {
// use new context for cleanup because old one may be cancelled by user, but leave a timeout to make sure // use new context for cleanup because old one may be cancelled by user, but leave a timeout to make sure
@ -318,62 +299,29 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second)
defer cancel() defer cancel()
p.Delete(ctx) p.Delete(ctx)
return -1, wrapError(err) return nil, wrapError(err)
} }
return int(p.Pid()), nil return process{p}, nil
} }
func (c *client) SignalProcess(ctx context.Context, containerID, processID string, signal syscall.Signal) error { func (t *task) Kill(ctx context.Context, signal syscall.Signal) error {
p, err := c.getProcess(ctx, containerID, processID) return wrapError(t.Task.Kill(ctx, signal))
if err != nil {
return err
}
return wrapError(p.Kill(ctx, signal))
} }
func (c *client) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error { func (p process) Kill(ctx context.Context, signal syscall.Signal) error {
p, err := c.getProcess(ctx, containerID, processID) return wrapError(p.Process.Kill(ctx, signal))
if err != nil {
return err
}
return p.Resize(ctx, uint32(width), uint32(height))
} }
func (c *client) CloseStdin(ctx context.Context, containerID, processID string) error { func (t *task) Pause(ctx context.Context) error {
p, err := c.getProcess(ctx, containerID, processID) return wrapError(t.Task.Pause(ctx))
if err != nil {
return err
}
return p.CloseIO(ctx, containerd.WithStdinCloser)
} }
func (c *client) Pause(ctx context.Context, containerID string) error { func (t *task) Resume(ctx context.Context) error {
p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) return wrapError(t.Task.Resume(ctx))
if err != nil {
return err
}
return wrapError(p.(containerd.Task).Pause(ctx))
} }
func (c *client) Resume(ctx context.Context, containerID string) error { func (t *task) Stats(ctx context.Context) (*libcontainerdtypes.Stats, error) {
p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) m, err := t.Metrics(ctx)
if err != nil {
return err
}
return p.(containerd.Task).Resume(ctx)
}
func (c *client) Stats(ctx context.Context, containerID string) (*libcontainerdtypes.Stats, error) {
p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
if err != nil {
return nil, err
}
m, err := p.(containerd.Task).Metrics(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -385,32 +333,8 @@ func (c *client) Stats(ctx context.Context, containerID string) (*libcontainerdt
return libcontainerdtypes.InterfaceToStats(m.Timestamp, v), nil return libcontainerdtypes.InterfaceToStats(m.Timestamp, v), nil
} }
func (c *client) ListPids(ctx context.Context, containerID string) ([]uint32, error) { func (t *task) Summary(ctx context.Context) ([]libcontainerdtypes.Summary, error) {
p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) pis, err := t.Pids(ctx)
if err != nil {
return nil, err
}
pis, err := p.(containerd.Task).Pids(ctx)
if err != nil {
return nil, err
}
var pids []uint32
for _, i := range pis {
pids = append(pids, i.Pid)
}
return pids, nil
}
func (c *client) Summary(ctx context.Context, containerID string) ([]libcontainerdtypes.Summary, error) {
p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
if err != nil {
return nil, err
}
pis, err := p.(containerd.Task).Pids(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -431,54 +355,31 @@ func (c *client) Summary(ctx context.Context, containerID string) ([]libcontaine
return infos, nil return infos, nil
} }
type restoredProcess struct { func (t *task) Delete(ctx context.Context) (*containerd.ExitStatus, error) {
p containerd.Process s, err := t.Task.Delete(ctx)
return s, wrapError(err)
} }
func (p *restoredProcess) Delete(ctx context.Context) (uint32, time.Time, error) { func (p process) Delete(ctx context.Context) (*containerd.ExitStatus, error) {
if p.p == nil { s, err := p.Process.Delete(ctx)
return 255, time.Now(), nil return s, wrapError(err)
}
status, err := p.p.Delete(ctx)
if err != nil {
return 255, time.Now(), nil
}
return status.ExitCode(), status.ExitTime(), nil
} }
func (c *client) DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) { func (c *container) Delete(ctx context.Context) error {
p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) // Optimization: assume the DockerContainerBundlePath label has not been
if err != nil { // updated since the container metadata was last loaded/refreshed.
return 255, time.Now(), nil md, err := c.c8dCtr.Info(ctx, containerd.WithoutRefreshedMetadata)
}
status, err := p.Delete(ctx)
if err != nil {
return 255, time.Now(), nil
}
return status.ExitCode(), status.ExitTime(), nil
}
func (c *client) Delete(ctx context.Context, containerID string) error {
ctr, err := c.getContainer(ctx, containerID)
if err != nil { if err != nil {
return err return err
} }
labels, err := ctr.Labels(ctx) bundle := md.Labels[DockerContainerBundlePath]
if err != nil { if err := c.c8dCtr.Delete(ctx); err != nil {
return err
}
bundle := labels[DockerContainerBundlePath]
if err := ctr.Delete(ctx); err != nil {
return wrapError(err) return wrapError(err)
} }
c.v2runcoptionsMu.Lock()
delete(c.v2runcoptions, containerID)
c.v2runcoptionsMu.Unlock()
if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" { if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" {
if err := os.RemoveAll(bundle); err != nil { if err := os.RemoveAll(bundle); err != nil {
c.logger.WithError(err).WithFields(logrus.Fields{ c.client.logger.WithContext(ctx).WithError(err).WithFields(logrus.Fields{
"container": containerID, "container": c.c8dCtr.ID(),
"bundle": bundle, "bundle": bundle,
}).Error("failed to remove state dir") }).Error("failed to remove state dir")
} }
@ -486,28 +387,25 @@ func (c *client) Delete(ctx context.Context, containerID string) error {
return nil return nil
} }
func (c *client) Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error) { func (t *task) ForceDelete(ctx context.Context) error {
t, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) _, err := t.Task.Delete(ctx, containerd.WithProcessKill)
if err != nil { return wrapError(err)
return containerd.Unknown, err
}
s, err := t.Status(ctx)
if err != nil {
return containerd.Unknown, wrapError(err)
}
return s.Status, nil
} }
func (c *client) getCheckpointOptions(id string, exit bool) containerd.CheckpointTaskOpts { func (t *task) Status(ctx context.Context) (containerd.Status, error) {
s, err := t.Task.Status(ctx)
return s, wrapError(err)
}
// Status queries the embedded containerd.Process for its current status and
// returns it unchanged; the accompanying error is passed through wrapError.
func (p process) Status(ctx context.Context) (containerd.Status, error) {
	current, statusErr := p.Process.Status(ctx)
	return current, wrapError(statusErr)
}
func (c *container) getCheckpointOptions(exit bool) containerd.CheckpointTaskOpts {
return func(r *containerd.CheckpointTaskInfo) error { return func(r *containerd.CheckpointTaskInfo) error {
if r.Options == nil { if r.Options == nil && c.v2runcoptions != nil {
c.v2runcoptionsMu.Lock() r.Options = &v2runcoptions.CheckpointOptions{}
_, ok := c.v2runcoptions[id]
c.v2runcoptionsMu.Unlock()
if ok {
r.Options = &v2runcoptions.CheckpointOptions{Exit: exit}
}
return nil
} }
switch opts := r.Options.(type) { switch opts := r.Options.(type) {
@ -519,27 +417,21 @@ func (c *client) getCheckpointOptions(id string, exit bool) containerd.Checkpoin
} }
} }
func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error { func (t *task) CreateCheckpoint(ctx context.Context, checkpointDir string, exit bool) error {
p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName) img, err := t.Task.Checkpoint(ctx, t.ctr.getCheckpointOptions(exit))
if err != nil {
return err
}
opts := []containerd.CheckpointTaskOpts{c.getCheckpointOptions(containerID, exit)}
img, err := p.(containerd.Task).Checkpoint(ctx, opts...)
if err != nil { if err != nil {
return wrapError(err) return wrapError(err)
} }
// Whatever happens, delete the checkpoint from containerd // Whatever happens, delete the checkpoint from containerd
defer func() { defer func() {
err := c.client.ImageService().Delete(context.Background(), img.Name()) err := t.ctr.client.client.ImageService().Delete(ctx, img.Name())
if err != nil { if err != nil {
c.logger.WithError(err).WithField("digest", img.Target().Digest). t.ctr.client.logger.WithError(err).WithField("digest", img.Target().Digest).
Warnf("failed to delete checkpoint image") Warnf("failed to delete checkpoint image")
} }
}() }()
b, err := content.ReadBlob(ctx, c.client.ContentStore(), img.Target()) b, err := content.ReadBlob(ctx, t.ctr.client.client.ContentStore(), img.Target())
if err != nil { if err != nil {
return errdefs.System(errors.Wrapf(err, "failed to retrieve checkpoint data")) return errdefs.System(errors.Wrapf(err, "failed to retrieve checkpoint data"))
} }
@ -560,7 +452,7 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi
return errdefs.System(errors.Wrapf(err, "invalid checkpoint")) return errdefs.System(errors.Wrapf(err, "invalid checkpoint"))
} }
rat, err := c.client.ContentStore().ReaderAt(ctx, *cpDesc) rat, err := t.ctr.client.client.ContentStore().ReaderAt(ctx, *cpDesc)
if err != nil { if err != nil {
return errdefs.System(errors.Wrapf(err, "failed to get checkpoint reader")) return errdefs.System(errors.Wrapf(err, "failed to get checkpoint reader"))
} }
@ -573,7 +465,8 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi
return err return err
} }
func (c *client) getContainer(ctx context.Context, id string) (containerd.Container, error) { // LoadContainer loads the containerd container.
func (c *client) LoadContainer(ctx context.Context, id string) (libcontainerdtypes.Container, error) {
ctr, err := c.client.LoadContainer(ctx, id) ctr, err := c.client.LoadContainer(ctx, id)
if err != nil { if err != nil {
if containerderrors.IsNotFound(err) { if containerderrors.IsNotFound(err) {
@ -581,42 +474,25 @@ func (c *client) getContainer(ctx context.Context, id string) (containerd.Contai
} }
return nil, wrapError(err) return nil, wrapError(err)
} }
return ctr, nil return &container{client: c, c8dCtr: ctr}, nil
} }
func (c *client) getProcess(ctx context.Context, containerID, processID string) (containerd.Process, error) { func (c *container) Task(ctx context.Context) (libcontainerdtypes.Task, error) {
ctr, err := c.getContainer(ctx, containerID) t, err := c.c8dCtr.Task(ctx, nil)
if err != nil { if err != nil {
return nil, err
}
t, err := ctr.Task(ctx, nil)
if err != nil {
if containerderrors.IsNotFound(err) {
return nil, errors.WithStack(errdefs.NotFound(errors.New("container is not running")))
}
return nil, wrapError(err) return nil, wrapError(err)
} }
if processID == libcontainerdtypes.InitProcessName { return c.newTask(t), nil
return t, nil
}
p, err := t.LoadProcess(ctx, processID, nil)
if err != nil {
if containerderrors.IsNotFound(err) {
return nil, errors.WithStack(errdefs.NotFound(errors.New("no such exec")))
}
return nil, wrapError(err)
}
return p, nil
} }
// createIO creates the io to be used by a process // createIO creates the io to be used by a process
// This needs to get a pointer to interface as upon closure the process may not have yet been registered // This needs to get a pointer to interface as upon closure the process may not have yet been registered
func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, stdinCloseSync chan struct{}, attachStdio libcontainerdtypes.StdioCallback) (cio.IO, error) { func (c *container) createIO(fifos *cio.FIFOSet, processID string, stdinCloseSync chan containerd.Process, attachStdio libcontainerdtypes.StdioCallback) (cio.IO, error) {
var ( var (
io *cio.DirectIO io *cio.DirectIO
err error err error
) )
io, err = c.newDirectIO(context.Background(), fifos) io, err = c.client.newDirectIO(context.Background(), fifos)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -633,13 +509,13 @@ func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, std
// Do the rest in a new routine to avoid a deadlock if the // Do the rest in a new routine to avoid a deadlock if the
// Exec/Start call failed. // Exec/Start call failed.
go func() { go func() {
<-stdinCloseSync p, ok := <-stdinCloseSync
p, err := c.getProcess(context.Background(), containerID, processID) if !ok {
if err == nil { return
err = p.CloseIO(context.Background(), containerd.WithStdinCloser) }
if err != nil && strings.Contains(err.Error(), "transport is closing") { err = p.CloseIO(context.Background(), containerd.WithStdinCloser)
err = nil if err != nil && strings.Contains(err.Error(), "transport is closing") {
} err = nil
} }
}() }()
}) })
@ -659,51 +535,12 @@ func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventTy
c.eventQ.Append(ei.ContainerID, func() { c.eventQ.Append(ei.ContainerID, func() {
err := c.backend.ProcessEvent(ei.ContainerID, et, ei) err := c.backend.ProcessEvent(ei.ContainerID, et, ei)
if err != nil { if err != nil {
c.logger.WithError(err).WithFields(logrus.Fields{ c.logger.WithContext(ctx).WithError(err).WithFields(logrus.Fields{
"container": ei.ContainerID, "container": ei.ContainerID,
"event": et, "event": et,
"event-info": ei, "event-info": ei,
}).Error("failed to process event") }).Error("failed to process event")
} }
if et == libcontainerdtypes.EventExit && ei.ProcessID != ei.ContainerID {
p, err := c.getProcess(ctx, ei.ContainerID, ei.ProcessID)
if err != nil {
c.logger.WithError(errors.New("no such process")).
WithFields(logrus.Fields{
"error": err,
"container": ei.ContainerID,
"process": ei.ProcessID,
}).Error("exit event")
return
}
ctr, err := c.getContainer(ctx, ei.ContainerID)
if err != nil {
c.logger.WithFields(logrus.Fields{
"container": ei.ContainerID,
"error": err,
}).Error("failed to find container")
} else {
labels, err := ctr.Labels(ctx)
if err != nil {
c.logger.WithFields(logrus.Fields{
"container": ei.ContainerID,
"error": err,
}).Error("failed to get container labels")
return
}
newFIFOSet(labels[DockerContainerBundlePath], ei.ProcessID, true, false).Close()
}
_, err = p.Delete(context.Background())
if err != nil {
c.logger.WithError(err).WithFields(logrus.Fields{
"container": ei.ContainerID,
"process": ei.ProcessID,
}).Warn("failed to delete process")
}
}
}) })
} }

View file

@ -20,15 +20,10 @@ func summaryFromInterface(i interface{}) (*libcontainerdtypes.Summary, error) {
return &libcontainerdtypes.Summary{}, nil return &libcontainerdtypes.Summary{}, nil
} }
func (c *client) UpdateResources(ctx context.Context, containerID string, resources *libcontainerdtypes.Resources) error { func (t *task) UpdateResources(ctx context.Context, resources *libcontainerdtypes.Resources) error {
p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
if err != nil {
return err
}
// go doesn't like the alias in 1.8, this means this need to be // go doesn't like the alias in 1.8, this means this need to be
// platform specific // platform specific
return p.(containerd.Task).Update(ctx, containerd.WithResources((*specs.LinuxResources)(resources))) return t.Update(ctx, containerd.WithResources((*specs.LinuxResources)(resources)))
} }
func hostIDFromMap(id uint32, mp []specs.LinuxIDMapping) int { func hostIDFromMap(id uint32, mp []specs.LinuxIDMapping) int {

View file

@ -87,7 +87,7 @@ func (c *client) newDirectIO(ctx context.Context, fifos *cio.FIFOSet) (*cio.Dire
return cio.NewDirectIOFromFIFOSet(ctx, pipes.stdin, pipes.stdout, pipes.stderr, fifos), nil return cio.NewDirectIOFromFIFOSet(ctx, pipes.stdin, pipes.stdout, pipes.stderr, fifos), nil
} }
func (c *client) UpdateResources(ctx context.Context, containerID string, resources *libcontainerdtypes.Resources) error { func (t *task) UpdateResources(ctx context.Context, resources *libcontainerdtypes.Resources) error {
// TODO: (containerd): Not implemented, but don't error. // TODO: (containerd): Not implemented, but don't error.
return nil return nil
} }

View file

@ -43,32 +43,58 @@ type Backend interface {
// Process of a container // Process of a container
type Process interface { type Process interface {
Delete(context.Context) (uint32, time.Time, error) // Pid is the system specific process id
Pid() uint32
// Kill sends the provided signal to the process
Kill(ctx context.Context, signal syscall.Signal) error
// Resize changes the width and height of the process's terminal
Resize(ctx context.Context, width, height uint32) error
// Delete removes the process and any resources allocated returning the exit status
Delete(context.Context) (*containerd.ExitStatus, error)
} }
// Client provides access to containerd features. // Client provides access to containerd features.
type Client interface { type Client interface {
Version(ctx context.Context) (containerd.Version, error) Version(ctx context.Context) (containerd.Version, error)
// LoadContainer loads the metadata for a container from containerd.
LoadContainer(ctx context.Context, containerID string) (Container, error)
// NewContainer creates a new containerd container.
NewContainer(ctx context.Context, containerID string, spec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) (Container, error)
}
Restore(ctx context.Context, containerID string, attachStdio StdioCallback) (alive bool, pid int, p Process, err error) // Container provides access to a containerd container.
type Container interface {
Start(ctx context.Context, checkpointDir string, withStdin bool, attachStdio StdioCallback) (Task, error)
Task(ctx context.Context) (Task, error)
// AttachTask returns the current task for the container and reattaches
// to the IO for the running task. If no task exists for the container
// a NotFound error is returned.
//
// Clients must make sure that only one reader is attached to the task.
AttachTask(ctx context.Context, attachStdio StdioCallback) (Task, error)
// Delete removes the container and associated resources
Delete(context.Context) error
}
Create(ctx context.Context, containerID string, spec *specs.Spec, shim string, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) error // Task provides access to a running containerd container.
Start(ctx context.Context, containerID, checkpointDir string, withStdin bool, attachStdio StdioCallback) (pid int, err error) type Task interface {
SignalProcess(ctx context.Context, containerID, processID string, signal syscall.Signal) error Process
Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio StdioCallback) (int, error) // Pause suspends the execution of the task
ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error Pause(context.Context) error
CloseStdin(ctx context.Context, containerID, processID string) error // Resume the execution of the task
Pause(ctx context.Context, containerID string) error Resume(context.Context) error
Resume(ctx context.Context, containerID string) error Stats(ctx context.Context) (*Stats, error)
Stats(ctx context.Context, containerID string) (*Stats, error) // Pids returns a list of system specific process ids inside the task
ListPids(ctx context.Context, containerID string) ([]uint32, error) Pids(context.Context) ([]containerd.ProcessInfo, error)
Summary(ctx context.Context, containerID string) ([]Summary, error) Summary(ctx context.Context) ([]Summary, error)
DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) // ForceDelete forcefully kills the task's processes and deletes the task
Delete(ctx context.Context, containerID string) error ForceDelete(context.Context) error
Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error) // Status returns the executing status of the task
Status(ctx context.Context) (containerd.Status, error)
UpdateResources(ctx context.Context, containerID string, resources *Resources) error // Exec creates and starts a new process inside the task
CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error Exec(ctx context.Context, processID string, spec *specs.Process, withStdin bool, attachStdio StdioCallback) (Process, error)
UpdateResources(ctx context.Context, resources *Resources) error
CreateCheckpoint(ctx context.Context, checkpointDir string, exit bool) error
} }
// StdioCallback is called to connect a container or process stdio. // StdioCallback is called to connect a container or process stdio.

View file

@ -2,6 +2,7 @@ package containerd // import "github.com/docker/docker/plugin/executor/container
import ( import (
"context" "context"
"fmt"
"io" "io"
"sync" "sync"
"syscall" "syscall"
@ -28,6 +29,7 @@ func New(ctx context.Context, rootDir string, cli *containerd.Client, ns string,
rootDir: rootDir, rootDir: rootDir,
exitHandler: exitHandler, exitHandler: exitHandler,
runtime: runtime, runtime: runtime,
plugins: make(map[string]*c8dPlugin),
} }
client, err := libcontainerd.NewClient(ctx, cli, rootDir, ns, e) client, err := libcontainerd.NewClient(ctx, cli, rootDir, ns, e)
@ -44,41 +46,62 @@ type Executor struct {
client libcontainerdtypes.Client client libcontainerdtypes.Client
exitHandler ExitHandler exitHandler ExitHandler
runtime types.Runtime runtime types.Runtime
mu sync.Mutex // Guards plugins map
plugins map[string]*c8dPlugin
}
type c8dPlugin struct {
log *logrus.Entry
ctr libcontainerdtypes.Container
tsk libcontainerdtypes.Task
} }
// deleteTaskAndContainer deletes plugin task and then plugin container from containerd // deleteTaskAndContainer deletes plugin task and then plugin container from containerd
func deleteTaskAndContainer(ctx context.Context, cli libcontainerdtypes.Client, id string, p libcontainerdtypes.Process) { func (p c8dPlugin) deleteTaskAndContainer(ctx context.Context) {
if p != nil { if p.tsk != nil {
if _, _, err := p.Delete(ctx); err != nil && !errdefs.IsNotFound(err) { if _, err := p.tsk.Delete(ctx); err != nil && !errdefs.IsNotFound(err) {
logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd") p.log.WithError(err).Error("failed to delete plugin task from containerd")
}
} else {
if _, _, err := cli.DeleteTask(ctx, id); err != nil && !errdefs.IsNotFound(err) {
logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd")
} }
} }
if p.ctr != nil {
if err := cli.Delete(ctx, id); err != nil && !errdefs.IsNotFound(err) { if err := p.ctr.Delete(ctx); err != nil && !errdefs.IsNotFound(err) {
logrus.WithError(err).WithField("id", id).Error("failed to delete plugin container from containerd") p.log.WithError(err).Error("failed to delete plugin container from containerd")
}
} }
} }
// Create creates a new container // Create creates a new container
func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error { func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error {
ctx := context.Background() ctx := context.Background()
err := e.client.Create(ctx, id, &spec, e.runtime.Shim.Binary, e.runtime.Shim.Opts) log := logrus.WithField("plugin", id)
ctr, err := e.client.NewContainer(ctx, id, &spec, e.runtime.Shim.Binary, e.runtime.Shim.Opts)
if err != nil { if err != nil {
status, err2 := e.client.Status(ctx, id) ctr2, err2 := e.client.LoadContainer(ctx, id)
if err2 != nil { if err2 != nil {
if !errdefs.IsNotFound(err2) { if !errdefs.IsNotFound(err2) {
logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to read plugin status") log.WithError(err2).Warn("Received an error while attempting to load containerd container for plugin")
} }
} else { } else {
if status != containerd.Running && status != containerd.Unknown { status := containerd.Unknown
if err2 := e.client.Delete(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) { t, err2 := ctr2.Task(ctx)
logrus.WithError(err2).WithField("plugin", id).Error("Error cleaning up containerd container") if err2 != nil {
if !errdefs.IsNotFound(err2) {
log.WithError(err2).Warn("Received an error while attempting to load containerd task for plugin")
} }
err = e.client.Create(ctx, id, &spec, e.runtime.Shim.Binary, e.runtime.Shim.Opts) } else {
s, err2 := t.Status(ctx)
if err2 != nil {
log.WithError(err2).Warn("Received an error while attempting to read plugin status")
} else {
status = s.Status
}
}
if status != containerd.Running && status != containerd.Unknown {
if err2 := ctr2.Delete(ctx); err2 != nil && !errdefs.IsNotFound(err2) {
log.WithError(err2).Error("Error cleaning up containerd container")
}
ctr, err = e.client.NewContainer(ctx, id, &spec, e.runtime.Shim.Binary, e.runtime.Shim.Opts)
} }
} }
@ -87,34 +110,78 @@ func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteClo
} }
} }
_, err = e.client.Start(ctx, id, "", false, attachStreamsFunc(stdout, stderr)) p := c8dPlugin{log: log, ctr: ctr}
p.tsk, err = ctr.Start(ctx, "", false, attachStreamsFunc(stdout, stderr))
if err != nil { if err != nil {
deleteTaskAndContainer(ctx, e.client, id, nil) p.deleteTaskAndContainer(ctx)
return err
} }
return err e.mu.Lock()
defer e.mu.Unlock()
e.plugins[id] = &p
return nil
} }
// Restore restores a container // Restore restores a container
func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) (bool, error) { func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) (bool, error) {
alive, _, p, err := e.client.Restore(context.Background(), id, attachStreamsFunc(stdout, stderr)) ctx := context.Background()
if err != nil && !errdefs.IsNotFound(err) { p := c8dPlugin{log: logrus.WithField("plugin", id)}
ctr, err := e.client.LoadContainer(ctx, id)
if err != nil {
if errdefs.IsNotFound(err) {
return false, nil
}
return false, err return false, err
} }
if !alive { p.tsk, err = ctr.AttachTask(ctx, attachStreamsFunc(stdout, stderr))
deleteTaskAndContainer(context.Background(), e.client, id, p) if err != nil {
if errdefs.IsNotFound(err) {
p.deleteTaskAndContainer(ctx)
return false, nil
}
return false, err
} }
return alive, nil s, err := p.tsk.Status(ctx)
if err != nil {
if errdefs.IsNotFound(err) {
// Task vanished after attaching?
p.tsk = nil
p.deleteTaskAndContainer(ctx)
return false, nil
}
return false, err
}
if s.Status == containerd.Stopped {
p.deleteTaskAndContainer(ctx)
return false, nil
}
e.mu.Lock()
defer e.mu.Unlock()
e.plugins[id] = &p
return true, nil
} }
// IsRunning returns if the container with the given id is running // IsRunning returns if the container with the given id is running
func (e *Executor) IsRunning(id string) (bool, error) { func (e *Executor) IsRunning(id string) (bool, error) {
status, err := e.client.Status(context.Background(), id) e.mu.Lock()
return status == containerd.Running, err p := e.plugins[id]
e.mu.Unlock()
if p == nil {
return false, errdefs.NotFound(fmt.Errorf("unknown plugin %q", id))
}
status, err := p.tsk.Status(context.Background())
return status.Status == containerd.Running, err
} }
// Signal sends the specified signal to the container // Signal sends the specified signal to the container
func (e *Executor) Signal(id string, signal syscall.Signal) error { func (e *Executor) Signal(id string, signal syscall.Signal) error {
return e.client.SignalProcess(context.Background(), id, libcontainerdtypes.InitProcessName, signal) e.mu.Lock()
p := e.plugins[id]
e.mu.Unlock()
if p == nil {
return errdefs.NotFound(fmt.Errorf("unknown plugin %q", id))
}
return p.tsk.Kill(context.Background(), signal)
} }
// ProcessEvent handles events from containerd // ProcessEvent handles events from containerd
@ -122,7 +189,14 @@ func (e *Executor) Signal(id string, signal syscall.Signal) error {
func (e *Executor) ProcessEvent(id string, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) error { func (e *Executor) ProcessEvent(id string, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) error {
switch et { switch et {
case libcontainerdtypes.EventExit: case libcontainerdtypes.EventExit:
deleteTaskAndContainer(context.Background(), e.client, id, nil) e.mu.Lock()
p := e.plugins[id]
e.mu.Unlock()
if p == nil {
logrus.WithField("id", id).Warn("Received exit event for an unknown plugin")
} else {
p.deleteTaskAndContainer(context.Background())
}
return e.exitHandler.HandleExitEvent(ei.ContainerID) return e.exitHandler.HandleExitEvent(ei.ContainerID)
} }
return nil return nil