mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
2650b1b6ab
When there is no event for a container, it can be because of a machine crash: the container state on the persistent disk no longer matches what was in `/run`. This creates an unkillable container in Docker, because containerd does not see it and it is not running, yet Docker thinks it is and cannot be told otherwise. This fixes the issue by checking whether containerd has the container running when there is no event, instead of just returning. Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
601 lines
15 KiB
Go
601 lines
15 KiB
Go
package libcontainerd
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
containerd "github.com/docker/containerd/api/grpc/types"
|
|
"github.com/docker/docker/pkg/idtools"
|
|
"github.com/docker/docker/pkg/mount"
|
|
specs "github.com/opencontainers/specs/specs-go"
|
|
"golang.org/x/net/context"
|
|
)
|
|
|
|
// client is the platform-specific libcontainerd client. It embeds the shared
// clientCommon state and adds the fields this file uses to talk to containerd
// (through remote.apiClient) and to track container exits.
type client struct {
	clientCommon

	// Platform specific properties below here.
	remote *remote
	q      queue
	// exitNotifiers maps a container ID to the notifier whose channel is
	// closed when that container's exit is observed (see Restore, which waits
	// on it while killing a container).
	exitNotifiers map[string]*exitNotifier
	// liveRestore selects what Restore does with containers that are still
	// alive: re-attach to them (true) or kill them (false).
	liveRestore bool
}
|
|
|
|
func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) error {
|
|
clnt.lock(containerID)
|
|
defer clnt.unlock(containerID)
|
|
container, err := clnt.getContainer(containerID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
spec, err := container.spec()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sp := spec.Process
|
|
sp.Args = specp.Args
|
|
sp.Terminal = specp.Terminal
|
|
if specp.Env != nil {
|
|
sp.Env = specp.Env
|
|
}
|
|
if specp.Cwd != nil {
|
|
sp.Cwd = *specp.Cwd
|
|
}
|
|
if specp.User != nil {
|
|
sp.User = specs.User{
|
|
UID: specp.User.UID,
|
|
GID: specp.User.GID,
|
|
AdditionalGids: specp.User.AdditionalGids,
|
|
}
|
|
}
|
|
if specp.Capabilities != nil {
|
|
sp.Capabilities = specp.Capabilities
|
|
}
|
|
|
|
p := container.newProcess(processFriendlyName)
|
|
|
|
r := &containerd.AddProcessRequest{
|
|
Args: sp.Args,
|
|
Cwd: sp.Cwd,
|
|
Terminal: sp.Terminal,
|
|
Id: containerID,
|
|
Env: sp.Env,
|
|
User: &containerd.User{
|
|
Uid: sp.User.UID,
|
|
Gid: sp.User.GID,
|
|
AdditionalGids: sp.User.AdditionalGids,
|
|
},
|
|
Pid: processFriendlyName,
|
|
Stdin: p.fifo(syscall.Stdin),
|
|
Stdout: p.fifo(syscall.Stdout),
|
|
Stderr: p.fifo(syscall.Stderr),
|
|
Capabilities: sp.Capabilities,
|
|
ApparmorProfile: sp.ApparmorProfile,
|
|
SelinuxLabel: sp.SelinuxLabel,
|
|
NoNewPrivileges: sp.NoNewPrivileges,
|
|
Rlimits: convertRlimits(sp.Rlimits),
|
|
}
|
|
|
|
iopipe, err := p.openFifos(sp.Terminal)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := clnt.remote.apiClient.AddProcess(ctx, r); err != nil {
|
|
p.closeFifos(iopipe)
|
|
return err
|
|
}
|
|
|
|
container.processes[processFriendlyName] = p
|
|
|
|
clnt.unlock(containerID)
|
|
|
|
if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil {
|
|
return err
|
|
}
|
|
clnt.lock(containerID)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (clnt *client) prepareBundleDir(uid, gid int) (string, error) {
|
|
root, err := filepath.Abs(clnt.remote.stateDir)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if uid == 0 && gid == 0 {
|
|
return root, nil
|
|
}
|
|
p := string(filepath.Separator)
|
|
for _, d := range strings.Split(root, string(filepath.Separator))[1:] {
|
|
p = filepath.Join(p, d)
|
|
fi, err := os.Stat(p)
|
|
if err != nil && !os.IsNotExist(err) {
|
|
return "", err
|
|
}
|
|
if os.IsNotExist(err) || fi.Mode()&1 == 0 {
|
|
p = fmt.Sprintf("%s.%d.%d", p, uid, gid)
|
|
if err := idtools.MkdirAs(p, 0700, uid, gid); err != nil && !os.IsExist(err) {
|
|
return "", err
|
|
}
|
|
}
|
|
}
|
|
return p, nil
|
|
}
|
|
|
|
func (clnt *client) Create(containerID string, spec Spec, options ...CreateOption) (err error) {
|
|
clnt.lock(containerID)
|
|
defer clnt.unlock(containerID)
|
|
|
|
if ctr, err := clnt.getContainer(containerID); err == nil {
|
|
if ctr.restarting {
|
|
ctr.restartManager.Cancel()
|
|
ctr.clean()
|
|
} else {
|
|
return fmt.Errorf("Container %s is already active", containerID)
|
|
}
|
|
}
|
|
|
|
uid, gid, err := getRootIDs(specs.Spec(spec))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
dir, err := clnt.prepareBundleDir(uid, gid)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
container := clnt.newContainer(filepath.Join(dir, containerID), options...)
|
|
if err := container.clean(); err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func() {
|
|
if err != nil {
|
|
container.clean()
|
|
clnt.deleteContainer(containerID)
|
|
}
|
|
}()
|
|
|
|
if err := idtools.MkdirAllAs(container.dir, 0700, uid, gid); err != nil && !os.IsExist(err) {
|
|
return err
|
|
}
|
|
|
|
f, err := os.Create(filepath.Join(container.dir, configFilename))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
if err := json.NewEncoder(f).Encode(spec); err != nil {
|
|
return err
|
|
}
|
|
|
|
return container.start()
|
|
}
|
|
|
|
func (clnt *client) Signal(containerID string, sig int) error {
|
|
clnt.lock(containerID)
|
|
defer clnt.unlock(containerID)
|
|
_, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{
|
|
Id: containerID,
|
|
Pid: InitFriendlyName,
|
|
Signal: uint32(sig),
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (clnt *client) SignalProcess(containerID string, pid string, sig int) error {
|
|
clnt.lock(containerID)
|
|
defer clnt.unlock(containerID)
|
|
_, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{
|
|
Id: containerID,
|
|
Pid: pid,
|
|
Signal: uint32(sig),
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (clnt *client) Resize(containerID, processFriendlyName string, width, height int) error {
|
|
clnt.lock(containerID)
|
|
defer clnt.unlock(containerID)
|
|
if _, err := clnt.getContainer(containerID); err != nil {
|
|
return err
|
|
}
|
|
_, err := clnt.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
|
|
Id: containerID,
|
|
Pid: processFriendlyName,
|
|
Width: uint32(width),
|
|
Height: uint32(height),
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (clnt *client) Pause(containerID string) error {
|
|
return clnt.setState(containerID, StatePause)
|
|
}
|
|
|
|
func (clnt *client) setState(containerID, state string) error {
|
|
clnt.lock(containerID)
|
|
container, err := clnt.getContainer(containerID)
|
|
if err != nil {
|
|
clnt.unlock(containerID)
|
|
return err
|
|
}
|
|
if container.systemPid == 0 {
|
|
clnt.unlock(containerID)
|
|
return fmt.Errorf("No active process for container %s", containerID)
|
|
}
|
|
st := "running"
|
|
if state == StatePause {
|
|
st = "paused"
|
|
}
|
|
chstate := make(chan struct{})
|
|
_, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
|
|
Id: containerID,
|
|
Pid: InitFriendlyName,
|
|
Status: st,
|
|
})
|
|
if err != nil {
|
|
clnt.unlock(containerID)
|
|
return err
|
|
}
|
|
container.pauseMonitor.append(state, chstate)
|
|
clnt.unlock(containerID)
|
|
<-chstate
|
|
return nil
|
|
}
|
|
|
|
func (clnt *client) Resume(containerID string) error {
|
|
return clnt.setState(containerID, StateResume)
|
|
}
|
|
|
|
func (clnt *client) Stats(containerID string) (*Stats, error) {
|
|
resp, err := clnt.remote.apiClient.Stats(context.Background(), &containerd.StatsRequest{containerID})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return (*Stats)(resp), nil
|
|
}
|
|
|
|
// Take care of the old 1.11.0 behavior in case the version upgrade
|
|
// happened without a clean daemon shutdown
|
|
func (clnt *client) cleanupOldRootfs(containerID string) {
|
|
// Unmount and delete the bundle folder
|
|
if mts, err := mount.GetMounts(); err == nil {
|
|
for _, mts := range mts {
|
|
if strings.HasSuffix(mts.Mountpoint, containerID+"/rootfs") {
|
|
if err := syscall.Unmount(mts.Mountpoint, syscall.MNT_DETACH); err == nil {
|
|
os.RemoveAll(strings.TrimSuffix(mts.Mountpoint, "/rootfs"))
|
|
}
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (clnt *client) setExited(containerID string, exitCode uint32) error {
|
|
clnt.lock(containerID)
|
|
defer clnt.unlock(containerID)
|
|
|
|
err := clnt.backend.StateChanged(containerID, StateInfo{
|
|
CommonStateInfo: CommonStateInfo{
|
|
State: StateExit,
|
|
ExitCode: exitCode,
|
|
}})
|
|
|
|
clnt.cleanupOldRootfs(containerID)
|
|
|
|
return err
|
|
}
|
|
|
|
func (clnt *client) GetPidsForContainer(containerID string) ([]int, error) {
|
|
cont, err := clnt.getContainerdContainer(containerID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pids := make([]int, len(cont.Pids))
|
|
for i, p := range cont.Pids {
|
|
pids[i] = int(p)
|
|
}
|
|
return pids, nil
|
|
}
|
|
|
|
// Summary returns a summary of the processes running in a container.
|
|
// This is a no-op on Linux.
|
|
func (clnt *client) Summary(containerID string) ([]Summary, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func (clnt *client) getContainerdContainer(containerID string) (*containerd.Container, error) {
|
|
resp, err := clnt.remote.apiClient.State(context.Background(), &containerd.StateRequest{Id: containerID})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, cont := range resp.Containers {
|
|
if cont.Id == containerID {
|
|
return cont, nil
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("invalid state response")
|
|
}
|
|
|
|
func (clnt *client) newContainer(dir string, options ...CreateOption) *container {
|
|
container := &container{
|
|
containerCommon: containerCommon{
|
|
process: process{
|
|
dir: dir,
|
|
processCommon: processCommon{
|
|
containerID: filepath.Base(dir),
|
|
client: clnt,
|
|
friendlyName: InitFriendlyName,
|
|
},
|
|
},
|
|
processes: make(map[string]*process),
|
|
},
|
|
}
|
|
for _, option := range options {
|
|
if err := option.Apply(container); err != nil {
|
|
logrus.Errorf("libcontainerd: newContainer(): %v", err)
|
|
}
|
|
}
|
|
return container
|
|
}
|
|
|
|
func (clnt *client) UpdateResources(containerID string, resources Resources) error {
|
|
clnt.lock(containerID)
|
|
defer clnt.unlock(containerID)
|
|
container, err := clnt.getContainer(containerID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if container.systemPid == 0 {
|
|
return fmt.Errorf("No active process for container %s", containerID)
|
|
}
|
|
_, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
|
|
Id: containerID,
|
|
Pid: InitFriendlyName,
|
|
Resources: (*containerd.UpdateResource)(&resources),
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (clnt *client) getExitNotifier(containerID string) *exitNotifier {
|
|
clnt.mapMutex.RLock()
|
|
defer clnt.mapMutex.RUnlock()
|
|
return clnt.exitNotifiers[containerID]
|
|
}
|
|
|
|
func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier {
|
|
clnt.mapMutex.Lock()
|
|
w, ok := clnt.exitNotifiers[containerID]
|
|
defer clnt.mapMutex.Unlock()
|
|
if !ok {
|
|
w = &exitNotifier{c: make(chan struct{}), client: clnt}
|
|
clnt.exitNotifiers[containerID] = w
|
|
}
|
|
return w
|
|
}
|
|
|
|
// restore re-attaches this client to a container that containerd still
// reports as alive. Restore calls it when liveRestore is enabled.
//
// It builds a container object for cont (bundle path and system PID), opens
// its FIFOs using the Terminal flag of cont's init process, hands the streams
// to the backend, registers the container and reports StateRestore. If
// lastEvent is a pause or resume event, that state is then replayed to the
// backend; any other non-nil lastEvent is only logged.
//
// err is a named result. Any error returned after the "already active" check
// makes the deferred cleanup remove the container from the client again.
func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, options ...CreateOption) (err error) {
	clnt.lock(cont.Id)
	defer clnt.unlock(cont.Id)

	logrus.Debugf("libcontainerd: restore container %s state %s", cont.Id, cont.Status)

	containerID := cont.Id
	if _, err := clnt.getContainer(containerID); err == nil {
		return fmt.Errorf("container %s is already active", containerID)
	}

	// Registered only after the "already active" check, so a container that is
	// already tracked is never deleted by this cleanup.
	defer func() {
		if err != nil {
			clnt.deleteContainer(cont.Id)
		}
	}()

	container := clnt.newContainer(cont.BundlePath, options...)
	container.systemPid = systemPid(cont)

	// Take the Terminal flag from the init process as reported by containerd
	// (false if no init process is listed).
	var terminal bool
	for _, p := range cont.Processes {
		if p.Pid == InitFriendlyName {
			terminal = p.Terminal
		}
	}

	iopipe, err := container.openFifos(terminal)
	if err != nil {
		return err
	}

	// NOTE(review): if AttachStreams fails, the FIFOs opened above are not
	// closed here (AddProcess closes its FIFOs on failure) — confirm intended.
	if err := clnt.backend.AttachStreams(containerID, *iopipe); err != nil {
		return err
	}

	clnt.appendContainer(container)

	err = clnt.backend.StateChanged(containerID, StateInfo{
		CommonStateInfo: CommonStateInfo{
			State: StateRestore,
			Pid:   container.systemPid,
		}})

	if err != nil {
		return err
	}

	if lastEvent != nil {
		// This should only be a pause or resume event
		if lastEvent.Type == StatePause || lastEvent.Type == StateResume {
			return clnt.backend.StateChanged(containerID, StateInfo{
				CommonStateInfo: CommonStateInfo{
					State: lastEvent.Type,
					Pid:   container.systemPid,
				}})
		}

		logrus.Warnf("libcontainerd: unexpected backlog event: %#v", lastEvent)
	}

	return nil
}
|
|
|
|
func (clnt *client) getContainerLastEvent(containerID string) (*containerd.Event, error) {
|
|
er := &containerd.EventsRequest{
|
|
Timestamp: clnt.remote.restoreFromTimestamp,
|
|
StoredOnly: true,
|
|
Id: containerID,
|
|
}
|
|
events, err := clnt.remote.apiClient.Events(context.Background(), er)
|
|
if err != nil {
|
|
logrus.Errorf("libcontainerd: failed to get container events stream for %s: %q", er.Id, err)
|
|
return nil, err
|
|
}
|
|
|
|
var ev *containerd.Event
|
|
for {
|
|
e, err := events.Recv()
|
|
if err != nil {
|
|
if err.Error() == "EOF" {
|
|
break
|
|
}
|
|
logrus.Errorf("libcontainerd: failed to get container event for %s: %q", containerID, err)
|
|
return nil, err
|
|
}
|
|
|
|
logrus.Debugf("libcontainerd: received past event %#v", e)
|
|
|
|
switch e.Type {
|
|
case StateExit, StatePause, StateResume:
|
|
ev = e
|
|
}
|
|
}
|
|
|
|
return ev, nil
|
|
}
|
|
|
|
// Restore reconciles this client's view of containerID with containerd, for
// example after a daemon restart. It acquires clnt.remote's lock to
// synchronize with live event handling.
//
//   - If containerd does not know the container ("container not found") or
//     reports it as "Stopped", the backend is told it exited via setExited.
//     The exception is when no relevant past event exists but this client
//     already tracks the container; then Restore returns without changes.
//   - If the container is still alive and liveRestore is enabled, it is
//     re-attached via restore. Errors from restore are logged, not returned.
//   - Otherwise the container is killed. Restore sends SIGTERM, then SIGKILL
//     if no exit is observed within 10s. If there is still no exit 2s later,
//     the container is removed from the client and reported as exited with
//     code 255. clnt.remote's lock is released while waiting and re-acquired
//     before returning, so the deferred Unlock stays balanced.
func (clnt *client) Restore(containerID string, options ...CreateOption) error {
	// Synchronize with live events
	clnt.remote.Lock()
	defer clnt.remote.Unlock()
	// Check that containerd still knows this container.
	//
	// In the unlikely event that Restore processes this container's past
	// events before the main event loop does, those events will be
	// processed twice. This is not an issue: all those events do is set
	// the container to exactly the same state again.
	cont, err := clnt.getContainerdContainer(containerID)
	// Get its last event
	ev, eerr := clnt.getContainerLastEvent(containerID)
	if err != nil || cont.Status == "Stopped" {
		if err != nil && !strings.Contains(err.Error(), "container not found") {
			// Legitimate error
			return err
		}

		if ev == nil {
			if _, err := clnt.getContainer(containerID); err == nil {
				// No past event was found, yet this client already tracks
				// the container. We presumably already consumed all of its
				// events, including the "exit" one, so return to avoid
				// overriding the exit code.
				logrus.Warnf("libcontainerd: restore was called on a fully synced container (%s)", containerID)
				return nil
			}
			// The container is not running (e.g. after a machine crash), so
			// synthesize an exit event to fix the state within docker.
			ev = &containerd.Event{
				Type:   StateExit,
				Status: 1,
			}
		}

		// get the exit status for this container
		// NOTE(review): when getContainerLastEvent failed (eerr != nil), ev is
		// the synthesized event above, but ec stays 0 rather than 1 —
		// confirm this is intended.
		ec := uint32(0)
		if eerr == nil && ev.Type == StateExit {
			ec = ev.Status
		}
		// The error from setExited is ignored here.
		clnt.setExited(containerID, ec)

		return nil
	}

	// container is still alive
	if clnt.liveRestore {
		if err := clnt.restore(cont, ev, options...); err != nil {
			logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err)
		}
		return nil
	}

	// Kill the container if liveRestore == false
	// The notifier is obtained before signalling so the exit cannot be missed.
	w := clnt.getOrCreateExitNotifier(containerID)
	clnt.lock(cont.Id)
	container := clnt.newContainer(cont.BundlePath)
	container.systemPid = systemPid(cont)
	clnt.appendContainer(container)
	clnt.unlock(cont.Id)

	container.discardFifos()

	if err := clnt.Signal(containerID, int(syscall.SIGTERM)); err != nil {
		logrus.Errorf("libcontainerd: error sending sigterm to %v: %v", containerID, err)
	}
	// Let the main loop handle the exit event
	clnt.remote.Unlock()
	select {
	case <-time.After(10 * time.Second):
		if err := clnt.Signal(containerID, int(syscall.SIGKILL)); err != nil {
			logrus.Errorf("libcontainerd: error sending sigkill to %v: %v", containerID, err)
		}
		select {
		case <-time.After(2 * time.Second):
		case <-w.wait():
			// relock because of the defer
			clnt.remote.Lock()
			return nil
		}
	case <-w.wait():
		// relock because of the defer
		clnt.remote.Lock()
		return nil
	}
	// relock because of the defer
	clnt.remote.Lock()

	// No exit was observed even after SIGKILL: drop the container and report
	// it as exited with code 255.
	clnt.deleteContainer(containerID)

	return clnt.setExited(containerID, uint32(255))
}
|
|
|
|
// exitNotifier lets callers wait for a container's exit. wait() returns a
// channel that close() closes; once ensures close's work runs at most once.
type exitNotifier struct {
	// id is the container ID under which this notifier is stored in
	// client.exitNotifiers; close() uses it to remove its own entry.
	id     string
	client *client
	c      chan struct{}
	once   sync.Once
}
|
|
|
|
func (en *exitNotifier) close() {
|
|
en.once.Do(func() {
|
|
close(en.c)
|
|
en.client.mapMutex.Lock()
|
|
if en == en.client.exitNotifiers[en.id] {
|
|
delete(en.client.exitNotifiers, en.id)
|
|
}
|
|
en.client.mapMutex.Unlock()
|
|
})
|
|
}
|
|
func (en *exitNotifier) wait() <-chan struct{} {
|
|
return en.c
|
|
}
|