mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
c178700a04
Instead of a timeout the context is cancelled on error to ensure proper cleanup of the associated fifos' goroutines. Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>
605 lines
16 KiB
Go
605 lines
16 KiB
Go
package libcontainerd
|
|
|
|
import (
	"fmt"
	"io"
	"os"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/Sirupsen/logrus"
	containerd "github.com/docker/containerd/api/grpc/types"
	"github.com/docker/docker/pkg/ioutils"
	"github.com/docker/docker/pkg/mount"
	"github.com/golang/protobuf/ptypes"
	"github.com/golang/protobuf/ptypes/timestamp"
	specs "github.com/opencontainers/runtime-spec/specs-go"
	"golang.org/x/net/context"
)
|
|
|
|
// client is the platform-specific libcontainerd client. It embeds
// clientCommon and adds the state used to talk to containerd.
type client struct {
	clientCommon

	// Platform specific properties below here.

	// remote is the containerd connection; its apiClient is used for every
	// RPC issued by this client and its lock serializes Restore.
	remote *remote
	// q is a queue owned by this client.
	// NOTE(review): not referenced in this part of the file — confirm its role.
	q queue
	// exitNotifiers maps a container ID to its exit notifier. It is accessed
	// while holding mapMutex.
	exitNotifiers map[string]*exitNotifier
	// liveRestore selects what Restore does with a container that is still
	// alive: reattach to it when true, terminate it when false.
	liveRestore bool
}
|
|
|
|
// GetServerVersion returns the connected server version information
|
|
func (clnt *client) GetServerVersion(ctx context.Context) (*ServerVersion, error) {
|
|
resp, err := clnt.remote.apiClient.GetServerVersion(ctx, &containerd.GetServerVersionRequest{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sv := &ServerVersion{
|
|
GetServerVersionResponse: *resp,
|
|
}
|
|
|
|
return sv, nil
|
|
}
|
|
|
|
// AddProcess is the handler for adding a process to an already running
|
|
// container. It's called through docker exec. It returns the system pid of the
|
|
// exec'd process.
|
|
func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio StdioCallback) (pid int, err error) {
|
|
clnt.lock(containerID)
|
|
defer clnt.unlock(containerID)
|
|
container, err := clnt.getContainer(containerID)
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
|
|
spec, err := container.spec()
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
sp := spec.Process
|
|
sp.Args = specp.Args
|
|
sp.Terminal = specp.Terminal
|
|
if len(specp.Env) > 0 {
|
|
sp.Env = specp.Env
|
|
}
|
|
if specp.Cwd != nil {
|
|
sp.Cwd = *specp.Cwd
|
|
}
|
|
if specp.User != nil {
|
|
sp.User = specs.User{
|
|
UID: specp.User.UID,
|
|
GID: specp.User.GID,
|
|
AdditionalGids: specp.User.AdditionalGids,
|
|
}
|
|
}
|
|
if specp.Capabilities != nil {
|
|
sp.Capabilities = specp.Capabilities
|
|
}
|
|
|
|
p := container.newProcess(processFriendlyName)
|
|
|
|
r := &containerd.AddProcessRequest{
|
|
Args: sp.Args,
|
|
Cwd: sp.Cwd,
|
|
Terminal: sp.Terminal,
|
|
Id: containerID,
|
|
Env: sp.Env,
|
|
User: &containerd.User{
|
|
Uid: sp.User.UID,
|
|
Gid: sp.User.GID,
|
|
AdditionalGids: sp.User.AdditionalGids,
|
|
},
|
|
Pid: processFriendlyName,
|
|
Stdin: p.fifo(syscall.Stdin),
|
|
Stdout: p.fifo(syscall.Stdout),
|
|
Stderr: p.fifo(syscall.Stderr),
|
|
Capabilities: sp.Capabilities,
|
|
ApparmorProfile: sp.ApparmorProfile,
|
|
SelinuxLabel: sp.SelinuxLabel,
|
|
NoNewPrivileges: sp.NoNewPrivileges,
|
|
Rlimits: convertRlimits(sp.Rlimits),
|
|
}
|
|
|
|
fifoCtx, cancel := context.WithCancel(context.Background())
|
|
defer func() {
|
|
if err != nil {
|
|
cancel()
|
|
}
|
|
}()
|
|
|
|
iopipe, err := p.openFifos(fifoCtx, sp.Terminal)
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
|
|
resp, err := clnt.remote.apiClient.AddProcess(ctx, r)
|
|
if err != nil {
|
|
p.closeFifos(iopipe)
|
|
return -1, err
|
|
}
|
|
|
|
var stdinOnce sync.Once
|
|
stdin := iopipe.Stdin
|
|
iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
|
|
var err error
|
|
stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
|
|
err = stdin.Close()
|
|
if err2 := p.sendCloseStdin(); err == nil {
|
|
err = err2
|
|
}
|
|
})
|
|
return err
|
|
})
|
|
|
|
container.processes[processFriendlyName] = p
|
|
|
|
if err := attachStdio(*iopipe); err != nil {
|
|
p.closeFifos(iopipe)
|
|
return -1, err
|
|
}
|
|
|
|
return int(resp.SystemPid), nil
|
|
}
|
|
|
|
func (clnt *client) SignalProcess(containerID string, pid string, sig int) error {
|
|
clnt.lock(containerID)
|
|
defer clnt.unlock(containerID)
|
|
_, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{
|
|
Id: containerID,
|
|
Pid: pid,
|
|
Signal: uint32(sig),
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (clnt *client) Resize(containerID, processFriendlyName string, width, height int) error {
|
|
clnt.lock(containerID)
|
|
defer clnt.unlock(containerID)
|
|
if _, err := clnt.getContainer(containerID); err != nil {
|
|
return err
|
|
}
|
|
_, err := clnt.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
|
|
Id: containerID,
|
|
Pid: processFriendlyName,
|
|
Width: uint32(width),
|
|
Height: uint32(height),
|
|
})
|
|
return err
|
|
}
|
|
|
|
// Pause requests the StatePause transition for container containerID via
// setState and returns its result.
func (clnt *client) Pause(containerID string) error {
	return clnt.setState(containerID, StatePause)
}
|
|
|
|
func (clnt *client) setState(containerID, state string) error {
|
|
clnt.lock(containerID)
|
|
container, err := clnt.getContainer(containerID)
|
|
if err != nil {
|
|
clnt.unlock(containerID)
|
|
return err
|
|
}
|
|
if container.systemPid == 0 {
|
|
clnt.unlock(containerID)
|
|
return fmt.Errorf("No active process for container %s", containerID)
|
|
}
|
|
st := "running"
|
|
if state == StatePause {
|
|
st = "paused"
|
|
}
|
|
chstate := make(chan struct{})
|
|
_, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
|
|
Id: containerID,
|
|
Pid: InitFriendlyName,
|
|
Status: st,
|
|
})
|
|
if err != nil {
|
|
clnt.unlock(containerID)
|
|
return err
|
|
}
|
|
container.pauseMonitor.append(state, chstate)
|
|
clnt.unlock(containerID)
|
|
<-chstate
|
|
return nil
|
|
}
|
|
|
|
// Resume requests the StateResume transition for container containerID via
// setState and returns its result.
func (clnt *client) Resume(containerID string) error {
	return clnt.setState(containerID, StateResume)
}
|
|
|
|
func (clnt *client) Stats(containerID string) (*Stats, error) {
|
|
resp, err := clnt.remote.apiClient.Stats(context.Background(), &containerd.StatsRequest{containerID})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return (*Stats)(resp), nil
|
|
}
|
|
|
|
// Take care of the old 1.11.0 behavior in case the version upgrade
|
|
// happened without a clean daemon shutdown
|
|
func (clnt *client) cleanupOldRootfs(containerID string) {
|
|
// Unmount and delete the bundle folder
|
|
if mts, err := mount.GetMounts(); err == nil {
|
|
for _, mts := range mts {
|
|
if strings.HasSuffix(mts.Mountpoint, containerID+"/rootfs") {
|
|
if err := syscall.Unmount(mts.Mountpoint, syscall.MNT_DETACH); err == nil {
|
|
os.RemoveAll(strings.TrimSuffix(mts.Mountpoint, "/rootfs"))
|
|
}
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (clnt *client) setExited(containerID string, exitCode uint32) error {
|
|
clnt.lock(containerID)
|
|
defer clnt.unlock(containerID)
|
|
|
|
err := clnt.backend.StateChanged(containerID, StateInfo{
|
|
CommonStateInfo: CommonStateInfo{
|
|
State: StateExit,
|
|
ExitCode: exitCode,
|
|
}})
|
|
|
|
clnt.cleanupOldRootfs(containerID)
|
|
|
|
return err
|
|
}
|
|
|
|
func (clnt *client) GetPidsForContainer(containerID string) ([]int, error) {
|
|
cont, err := clnt.getContainerdContainer(containerID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pids := make([]int, len(cont.Pids))
|
|
for i, p := range cont.Pids {
|
|
pids[i] = int(p)
|
|
}
|
|
return pids, nil
|
|
}
|
|
|
|
// Summary returns a summary of the processes running in a container.
// This is a no-op on Linux: it always returns a nil slice and a nil error.
func (clnt *client) Summary(containerID string) ([]Summary, error) {
	return nil, nil
}
|
|
|
|
func (clnt *client) getContainerdContainer(containerID string) (*containerd.Container, error) {
|
|
resp, err := clnt.remote.apiClient.State(context.Background(), &containerd.StateRequest{Id: containerID})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, cont := range resp.Containers {
|
|
if cont.Id == containerID {
|
|
return cont, nil
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("invalid state response")
|
|
}
|
|
|
|
func (clnt *client) UpdateResources(containerID string, resources Resources) error {
|
|
clnt.lock(containerID)
|
|
defer clnt.unlock(containerID)
|
|
container, err := clnt.getContainer(containerID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if container.systemPid == 0 {
|
|
return fmt.Errorf("No active process for container %s", containerID)
|
|
}
|
|
_, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
|
|
Id: containerID,
|
|
Pid: InitFriendlyName,
|
|
Resources: (*containerd.UpdateResource)(&resources),
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (clnt *client) getExitNotifier(containerID string) *exitNotifier {
|
|
clnt.mapMutex.RLock()
|
|
defer clnt.mapMutex.RUnlock()
|
|
return clnt.exitNotifiers[containerID]
|
|
}
|
|
|
|
func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier {
|
|
clnt.mapMutex.Lock()
|
|
w, ok := clnt.exitNotifiers[containerID]
|
|
defer clnt.mapMutex.Unlock()
|
|
if !ok {
|
|
w = &exitNotifier{c: make(chan struct{}), client: clnt}
|
|
clnt.exitNotifiers[containerID] = w
|
|
}
|
|
return w
|
|
}
|
|
|
|
func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, attachStdio StdioCallback, options ...CreateOption) (err error) {
|
|
clnt.lock(cont.Id)
|
|
defer clnt.unlock(cont.Id)
|
|
|
|
logrus.Debugf("libcontainerd: restore container %s state %s", cont.Id, cont.Status)
|
|
|
|
containerID := cont.Id
|
|
if _, err := clnt.getContainer(containerID); err == nil {
|
|
return fmt.Errorf("container %s is already active", containerID)
|
|
}
|
|
|
|
defer func() {
|
|
if err != nil {
|
|
clnt.deleteContainer(cont.Id)
|
|
}
|
|
}()
|
|
|
|
container := clnt.newContainer(cont.BundlePath, options...)
|
|
container.systemPid = systemPid(cont)
|
|
|
|
var terminal bool
|
|
for _, p := range cont.Processes {
|
|
if p.Pid == InitFriendlyName {
|
|
terminal = p.Terminal
|
|
}
|
|
}
|
|
|
|
fifoCtx, cancel := context.WithCancel(context.Background())
|
|
defer func() {
|
|
if err != nil {
|
|
cancel()
|
|
}
|
|
}()
|
|
|
|
iopipe, err := container.openFifos(fifoCtx, terminal)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var stdinOnce sync.Once
|
|
stdin := iopipe.Stdin
|
|
iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
|
|
var err error
|
|
stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
|
|
err = stdin.Close()
|
|
})
|
|
return err
|
|
})
|
|
|
|
if err := attachStdio(*iopipe); err != nil {
|
|
container.closeFifos(iopipe)
|
|
return err
|
|
}
|
|
|
|
clnt.appendContainer(container)
|
|
|
|
err = clnt.backend.StateChanged(containerID, StateInfo{
|
|
CommonStateInfo: CommonStateInfo{
|
|
State: StateRestore,
|
|
Pid: container.systemPid,
|
|
}})
|
|
|
|
if err != nil {
|
|
container.closeFifos(iopipe)
|
|
return err
|
|
}
|
|
|
|
if lastEvent != nil {
|
|
// This should only be a pause or resume event
|
|
if lastEvent.Type == StatePause || lastEvent.Type == StateResume {
|
|
return clnt.backend.StateChanged(containerID, StateInfo{
|
|
CommonStateInfo: CommonStateInfo{
|
|
State: lastEvent.Type,
|
|
Pid: container.systemPid,
|
|
}})
|
|
}
|
|
|
|
logrus.Warnf("libcontainerd: unexpected backlog event: %#v", lastEvent)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (clnt *client) getContainerLastEventSinceTime(id string, tsp *timestamp.Timestamp) (*containerd.Event, error) {
|
|
er := &containerd.EventsRequest{
|
|
Timestamp: tsp,
|
|
StoredOnly: true,
|
|
Id: id,
|
|
}
|
|
events, err := clnt.remote.apiClient.Events(context.Background(), er)
|
|
if err != nil {
|
|
logrus.Errorf("libcontainerd: failed to get container events stream for %s: %q", er.Id, err)
|
|
return nil, err
|
|
}
|
|
|
|
var ev *containerd.Event
|
|
for {
|
|
e, err := events.Recv()
|
|
if err != nil {
|
|
if err.Error() == "EOF" {
|
|
break
|
|
}
|
|
logrus.Errorf("libcontainerd: failed to get container event for %s: %q", id, err)
|
|
return nil, err
|
|
}
|
|
ev = e
|
|
logrus.Debugf("libcontainerd: received past event %#v", ev)
|
|
}
|
|
|
|
return ev, nil
|
|
}
|
|
|
|
func (clnt *client) getContainerLastEvent(id string) (*containerd.Event, error) {
|
|
ev, err := clnt.getContainerLastEventSinceTime(id, clnt.remote.restoreFromTimestamp)
|
|
if err == nil && ev == nil {
|
|
// If ev is nil and the container is running in containerd,
|
|
// we already consumed all the event of the
|
|
// container, included the "exit" one.
|
|
// Thus, we request all events containerd has in memory for
|
|
// this container in order to get the last one (which should
|
|
// be an exit event)
|
|
logrus.Warnf("libcontainerd: client is out of sync, restore was called on a fully synced container (%s).", id)
|
|
// Request all events since beginning of time
|
|
t := time.Unix(0, 0)
|
|
tsp, err := ptypes.TimestampProto(t)
|
|
if err != nil {
|
|
logrus.Errorf("libcontainerd: getLastEventSinceTime() failed to convert timestamp: %q", err)
|
|
return nil, err
|
|
}
|
|
|
|
return clnt.getContainerLastEventSinceTime(id, tsp)
|
|
}
|
|
|
|
return ev, err
|
|
}
|
|
|
|
func (clnt *client) Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error {
|
|
// Synchronize with live events
|
|
clnt.remote.Lock()
|
|
defer clnt.remote.Unlock()
|
|
// Check that containerd still knows this container.
|
|
//
|
|
// In the unlikely event that Restore for this container process
|
|
// the its past event before the main loop, the event will be
|
|
// processed twice. However, this is not an issue as all those
|
|
// events will do is change the state of the container to be
|
|
// exactly the same.
|
|
cont, err := clnt.getContainerdContainer(containerID)
|
|
// Get its last event
|
|
ev, eerr := clnt.getContainerLastEvent(containerID)
|
|
if err != nil || cont.Status == "Stopped" {
|
|
if err != nil {
|
|
logrus.Warnf("libcontainerd: failed to retrieve container %s state: %v", containerID, err)
|
|
}
|
|
if ev != nil && (ev.Pid != InitFriendlyName || ev.Type != StateExit) {
|
|
// Wait a while for the exit event
|
|
timeout := time.NewTimer(10 * time.Second)
|
|
tick := time.NewTicker(100 * time.Millisecond)
|
|
stop:
|
|
for {
|
|
select {
|
|
case <-timeout.C:
|
|
break stop
|
|
case <-tick.C:
|
|
ev, eerr = clnt.getContainerLastEvent(containerID)
|
|
if eerr != nil {
|
|
break stop
|
|
}
|
|
if ev != nil && ev.Pid == InitFriendlyName && ev.Type == StateExit {
|
|
break stop
|
|
}
|
|
}
|
|
}
|
|
timeout.Stop()
|
|
tick.Stop()
|
|
}
|
|
|
|
// get the exit status for this container, if we don't have
|
|
// one, indicate an error
|
|
ec := uint32(255)
|
|
if eerr == nil && ev != nil && ev.Pid == InitFriendlyName && ev.Type == StateExit {
|
|
ec = ev.Status
|
|
}
|
|
clnt.setExited(containerID, ec)
|
|
|
|
return nil
|
|
}
|
|
|
|
// container is still alive
|
|
if clnt.liveRestore {
|
|
if err := clnt.restore(cont, ev, attachStdio, options...); err != nil {
|
|
logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Kill the container if liveRestore == false
|
|
w := clnt.getOrCreateExitNotifier(containerID)
|
|
clnt.lock(cont.Id)
|
|
container := clnt.newContainer(cont.BundlePath)
|
|
container.systemPid = systemPid(cont)
|
|
clnt.appendContainer(container)
|
|
clnt.unlock(cont.Id)
|
|
|
|
container.discardFifos()
|
|
|
|
if err := clnt.Signal(containerID, int(syscall.SIGTERM)); err != nil {
|
|
logrus.Errorf("libcontainerd: error sending sigterm to %v: %v", containerID, err)
|
|
}
|
|
// Let the main loop handle the exit event
|
|
clnt.remote.Unlock()
|
|
select {
|
|
case <-time.After(10 * time.Second):
|
|
if err := clnt.Signal(containerID, int(syscall.SIGKILL)); err != nil {
|
|
logrus.Errorf("libcontainerd: error sending sigkill to %v: %v", containerID, err)
|
|
}
|
|
select {
|
|
case <-time.After(2 * time.Second):
|
|
case <-w.wait():
|
|
// relock because of the defer
|
|
clnt.remote.Lock()
|
|
return nil
|
|
}
|
|
case <-w.wait():
|
|
// relock because of the defer
|
|
clnt.remote.Lock()
|
|
return nil
|
|
}
|
|
// relock because of the defer
|
|
clnt.remote.Lock()
|
|
|
|
clnt.deleteContainer(containerID)
|
|
|
|
return clnt.setExited(containerID, uint32(255))
|
|
}
|
|
|
|
func (clnt *client) CreateCheckpoint(containerID string, checkpointID string, checkpointDir string, exit bool) error {
|
|
clnt.lock(containerID)
|
|
defer clnt.unlock(containerID)
|
|
if _, err := clnt.getContainer(containerID); err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err := clnt.remote.apiClient.CreateCheckpoint(context.Background(), &containerd.CreateCheckpointRequest{
|
|
Id: containerID,
|
|
Checkpoint: &containerd.Checkpoint{
|
|
Name: checkpointID,
|
|
Exit: exit,
|
|
Tcp: true,
|
|
UnixSockets: true,
|
|
Shell: false,
|
|
EmptyNS: []string{"network"},
|
|
},
|
|
CheckpointDir: checkpointDir,
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (clnt *client) DeleteCheckpoint(containerID string, checkpointID string, checkpointDir string) error {
|
|
clnt.lock(containerID)
|
|
defer clnt.unlock(containerID)
|
|
if _, err := clnt.getContainer(containerID); err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err := clnt.remote.apiClient.DeleteCheckpoint(context.Background(), &containerd.DeleteCheckpointRequest{
|
|
Id: containerID,
|
|
Name: checkpointID,
|
|
CheckpointDir: checkpointDir,
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (clnt *client) ListCheckpoints(containerID string, checkpointDir string) (*Checkpoints, error) {
|
|
clnt.lock(containerID)
|
|
defer clnt.unlock(containerID)
|
|
if _, err := clnt.getContainer(containerID); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := clnt.remote.apiClient.ListCheckpoint(context.Background(), &containerd.ListCheckpointRequest{
|
|
Id: containerID,
|
|
CheckpointDir: checkpointDir,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return (*Checkpoints)(resp), nil
|
|
}
|